Blog
Improving the Splunk Dashboard development workflow

Pre-requisites

  • IDE with git integration such as VSCode
  • Python Selenium webdriver for your browser
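  • Python 3 and package dependencies (see the import lists in the scripts below for the specific packages). The upload script imports http.client, which does not exist in Python 2.7.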
  • Optionally a new Python virtual environment for isolation of packages
  • Selenium standalone server to enable session reuse
  • A Java runtime, which the Selenium server requires in order to run

Workflow

  1. Start the Selenium server in a separate process. This starts a local server process that holds the browser session context between runs.
  2. Initialise the environment variables for Splunk: username, password and Splunk URL
  3. Create a local git repository to store your work and enable the post-commit hook as shown below
  4. Use the IDE to edit your dashboard XML
  5. When ready to preview the dashboard, git commit your file. In VSCode, a macro can save, stage and commit your changes with a keyboard combination for faster iterations.
  6. The post-commit hook will upload the XML to Splunk and refresh the browser

Artefacts

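Initialise the environment variables once before starting the development session. Source the script (rather than executing it) so that the exported variables remain set in your shell: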

#!/bin/bash
# Prompt for the Splunk password and export the connection settings that the
# upload and browser-refresh scripts read from the environment.
#
# Source this file instead of executing it; otherwise the exports are lost
# when the child shell exits.

# IFS= and -r keep leading/trailing whitespace and backslashes in the
# password intact; -s hides the typed characters.
IFS= read -r -s -p 'Password: ' password
# -s also swallows the newline the user typed, so emit one for a clean prompt.
echo
export splunk_password="$password"
# Do not leave a second, unexported copy of the secret lying around.
unset password
export splunk_url='https://splunk-test.company.com.au'
export splunk_user='m123456'

The Git post-commit hook:

#!/bin/sh
# post-commit hook: upload the dashboard XML to Splunk, then refresh the
# browser so the change is visible.
#
# Assumes the hook's working directory is the repository root (the original
# relied on `pwd` in the same way) and that splunk_user / splunk_url are
# exported in the environment.

repo_root=$(pwd)

# Upload the XML dashboard to Splunk. Paths and variables are quoted so that
# directories containing spaces do not split into several arguments.
# Stop here if the upload fails: refreshing the browser would only show the
# old dashboard.
python "$repo_root/scripts/iis-dashboard.py" \
    --dashboard scorecard_nonprod_ibm_managed \
    --appname MWOPS \
    --xmlfilepath "$repo_root/dashboards/scorecard-environment.xml" \
    --dashboarduser="$splunk_user" || exit 1

# Splunk needs time to process the dashboard after upload completes
sleep 2

# Refresh our browser to show the new changes
python "$repo_root/scripts/webdriver.py" --url="$splunk_url" --debug

Python script to upload the dashboard XML to our Splunk App:

import sys
import os
import io
import requests
import argparse
import xml.etree.ElementTree as etree
from http.client import responses
import urllib3
# Silence urllib3's InsecureRequestWarning: the requests made by this script
# are sent with verify=False.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Module purpose: upload an XML dashboard definition to Splunk; intended to be
# run from a git hook.
 
## authenticate and collect the session cookies
def authenticate(splunkurl, user, password):
    '''
    Log in to Splunk Web so that the shared HTTPSESSION carries the
    authentication cookies needed by the other requests in this module.

    splunkurl: base URL of Splunk Web, without a trailing slash
    user:      login name
    password:  login password

    Returns 0 on success.
    Raises KeyError if the login page does not set a 'cval' cookie, and
    requests.HTTPError if the credentials POST returns a 4xx/5xx status.
    '''

    url = splunkurl + '/en-US/account/login'

    # The login form must echo back the 'cval' cookie handed out by a GET of
    # the login page, so fetch the page first.
    response = HTTPSESSION.get(url, verify=False)
    # responses.get(): an unlisted status code must not crash the status print.
    print('Authentication - get cval cookie: {} {}'.format(
        response.status_code, responses.get(response.status_code, 'Unknown')))

    cval = HTTPSESSION.cookies.get('cval')
    if cval is None:
        raise KeyError('no cval cookie returned by {}'.format(url))

    auth_payload = {
        'username': user,
        'password': password,
        'cval': cval
    }

    response = HTTPSESSION.post(url, data=auth_payload, verify=False)
    print('Authentication - post creds: {} {}'.format(
        response.status_code, responses.get(response.status_code, 'Unknown')))

    # Fail here instead of letting a later request fail for a less obvious reason.
    response.raise_for_status()

    return 0
 
 
## get dashboard
def get_dashboard_xml(splunkurl, dashboard_name, appname, xmlfilepath, download_flag, owner=None):
    '''
    desc: downloads the xml definition for the provided dashboard name

    splunkurl:      base URL of Splunk Web
    dashboard_name: name of the view to fetch
    appname:        Splunk app that contains the view
    xmlfilepath:    file to write the response body to
    download_flag:  when true and the request succeeds, write the body to
                    xmlfilepath
    owner:          user namespace for the request; when omitted, the
                    module-level splunk_user (assigned in the __main__ block)
                    is used, as before

    Returns 0. Uses the shared HTTPSESSION, so authenticate() must have run.
    '''

    if owner is None:
        owner = splunk_user

    url = splunkurl + '/en-US/splunkd/__raw/servicesNS/' + owner + '/' + appname + '/data/ui/views/' + dashboard_name

    response = HTTPSESSION.get(url, verify=False)

    print("Dashboard - downloading: {} {}".format(
        str(response.status_code), responses.get(response.status_code, 'Unknown')))

    # Only touch the body of a successful response; an error page need not be
    # well-formed XML and must not raise a ParseError here.
    if response.status_code != 200:
        return 0

    # Diagnostic only: list the <entry> elements found in the response.
    # NOTE(review): if the response uses an XML namespace, the un-prefixed
    # 'entry' lookup will match nothing - confirm against a real response.
    xmlroot = etree.parse(io.StringIO(response.text)).getroot()
    print(xmlroot.findall('entry'))

    if download_flag:
        with open(xmlfilepath, 'w') as f:
            f.write(response.text)

    return 0
 
## save dashboard
def save_dashboard_xml(splunkurl, dashboard_name, appname, xmlfilepath, dashboard_user):
    '''
    desc: saves the xml definition for the provided dashboard name

    splunkurl:      base URL of Splunk Web
    dashboard_name: name of the view to overwrite
    appname:        Splunk app that contains the view
    xmlfilepath:    local file holding the dashboard XML to upload
    dashboard_user: user namespace the view lives in

    Returns 0. Exits the process (sys.exit) if the XML file cannot be read.
    Raises KeyError if the session holds no CSRF token cookie. Uses the
    shared HTTPSESSION, so authenticate() must have run.
    '''

    url = splunkurl + '/en-US/splunkd/__raw/servicesNS/' + dashboard_user + '/' + appname + '/data/ui/views/' + dashboard_name

    # The original hard-coded the '_8000' cookie. The suffix looks like the
    # Splunk Web port (TODO confirm), so fall back to any cookie with the same
    # prefix when that exact name is absent.
    csrf_token = HTTPSESSION.cookies.get('splunkweb_csrf_token_8000')
    if csrf_token is None:
        for cookie in HTTPSESSION.cookies:
            if cookie.name.startswith('splunkweb_csrf_token_'):
                csrf_token = cookie.value
                break
    if csrf_token is None:
        raise KeyError('no splunkweb_csrf_token_* cookie in the session')

    # required headers
    headers = {
        'X-Splunk-Form-Key': csrf_token,
        'X-Requested-With': 'XMLHttpRequest'
    }

    # 'with' guarantees the file handle is closed even if read() fails.
    try:
        with open(xmlfilepath, 'r') as xmlfile:
            payload = {
                'eai:data': xmlfile.read()
            }
    except (OSError, UnicodeError) as e:
        sys.exit(str(e))

    response = HTTPSESSION.post(url, verify=False, data=payload, headers=headers)
    print("Dashboard - saving {} {}".format(
        str(response.status_code), responses.get(response.status_code, 'Unknown')))

    return 0
 
def get_dashboard_metadata(splunkurl, appname, owner=None):
    '''
    desc: download the json metadata for the dashboards in an app and print it

    splunkurl: base URL of Splunk Web
    appname:   Splunk app whose views are listed
    owner:     value for the eai:acl.owner filter; when omitted, the
               module-level dashboard_user (assigned in the __main__ block)
               is used, as before

    Returns None; the URL, status and raw response body are printed.
    Uses the shared HTTPSESSION, so authenticate() must have run.
    '''

    if owner is None:
        owner = dashboard_user

    # The query string is embedded in the URL. The original also built a
    # separate 'payload' dict that was never sent; it has been removed.
    url = '{}/en-US/splunkd/__raw/servicesNS/-/{}/data/ui/views?output_mode=json&search=((isDashboard=1+AND+isVisible=1)+AND+(eai:acl.owner=%22{}%22))'.format(splunkurl, appname, owner)

    response = HTTPSESSION.get(url, verify=False)

    print('get metadata url: {}'.format(response.url))
    print("metadata - results: {} {}".format(
        str(response.status_code), responses.get(response.status_code, 'Unknown')))
    print("metadata - response: {}".format(response.content))
 
 
def api_auth():
    '''
    Placeholder for an alternate login flow; not implemented, does nothing.
    '''
    return None
 
 
def get_args(argv=None):
    '''
    Parse and validate the command-line arguments.

    argv: optional list of argument strings; when None, argparse reads
          sys.argv[1:] (the previous behaviour).

    --username, --password and --url default to the splunk_user,
    splunk_password and splunk_url environment variables.

    Returns the argparse namespace. If a required value is missing, prints
    the usage and an error message and exits with status 2, so that calling
    shell scripts can detect the failure.
    '''

    parser = argparse.ArgumentParser(description='gets and updates a dashboard')

    # saveas xml filename
    parser.add_argument('--xmlfilepath', help='file path to save XML')

    # username for the app context
    parser.add_argument('--dashboarduser', default='nobody', help='username for the dashboard context. Default nobody')

    # dashboard name
    parser.add_argument('--dashboard', help='name of the dashboard to update')

    # app name
    parser.add_argument('--appname', help='splunk app name where dashboard lives')

    # splunk user name
    parser.add_argument('--username', default=os.environ.get('splunk_user', None), help='splunk username for authentication. Either set it here or in an environment variable called splunk_user')

    # splunk password
    parser.add_argument('--password', default=os.environ.get('splunk_password', None), help='splunk password authentication. Either set it here or in an environment variable called splunk_password')

    # splunk url
    parser.add_argument('--url', default=os.environ.get('splunk_url', None), help='splunk full URL such as https://splunk.hostname.com:8080. Either set it here or in an environment variable called splunk_url')

    args = parser.parse_args(argv)

    # Sanity checks, in the original order. These cannot use required=True
    # because three of the values may come from the environment instead.
    required = (
        ('dashboard', 'dashboard missing'),
        ('username', 'username missing'),
        ('password', 'password missing'),
        ('url', 'url missing'),
        ('appname', 'appname missing'),
        ('xmlfilepath', 'xmlfilepath missing'),
    )
    for attribute, message in required:
        if not getattr(args, attribute):
            # parser.error() prints the usage plus the message and exits with
            # status 2. The old exit(parser.print_usage()) exited with 0.
            parser.error(message)

    return args
 
 
##
# Main: authenticate against Splunk Web and upload the dashboard XML.
if __name__ == '__main__':

    # get arguments
    args = get_args()

    # Use the parsed values: get_args() already falls back to the
    # splunk_user / splunk_password / splunk_url environment variables, so
    # reading os.environ directly here would raise KeyError whenever the
    # values are supplied only on the command line.
    # These stay module-level names because other functions in this file
    # read splunk_user and dashboard_user as globals.
    splunk_user = args.username
    splunk_password = args.password
    splunk_url = args.url

    dashboard_name = args.dashboard
    appname = args.appname
    xmlfilepath = args.xmlfilepath
    dashboard_user = args.dashboarduser

    try:
        # One shared session so the login cookies are reused by the upload.
        HTTPSESSION = requests.Session()

        authenticate(splunk_url, splunk_user, splunk_password)
        save_dashboard_xml(splunk_url, dashboard_name, appname, xmlfilepath, dashboard_user)
    except Exception as e:
        # Top-level boundary: report the failure and exit non-zero.
        sys.exit('Error: ' + str(e))

The Selenium wrapper to refresh the browser after uploading the dashboard XML:

import sys
import os
import io
import argparse, logging
from selenium import webdriver
from selenium.common import exceptions as seleniumException
 
def get_args(argv=None):
    '''
    Parse the command-line arguments and apply the --debug flag.

    argv: optional list of argument strings; when None, argparse reads
          sys.argv[1:] (the previous behaviour).

    Returns the argparse namespace with the attributes debug and url.
    '''

    parser = argparse.ArgumentParser(description='browser automation')

    parser.add_argument('--debug', help='enable debug logging', action='store_true')
    parser.add_argument('--url', default='https://splunk-test.company.com.au', help='url to retrieve')

    args = parser.parse_args(argv)
    if args.debug:
        # basicConfig() does nothing once the root logger has handlers, and
        # the main block configures logging before calling this function.
        # Set the level explicitly so --debug takes effect in that case too.
        logging.basicConfig(level=logging.DEBUG)
        logging.getLogger().setLevel(logging.DEBUG)

    return args
 
def create_new_driver():
    '''
    Start a new Firefox session on the local Selenium server and record it
    for later reuse.

    The session id and executor URL are written to the file
    ".webdriver_session" in the current directory as key=value lines.

    Returns the new webdriver.Remote instance.
    '''

    capabilities = webdriver.DesiredCapabilities.FIREFOX.copy()
    capabilities['proxy'] = {'proxyType': 'DIRECT'}
    driver = webdriver.Remote(desired_capabilities=capabilities, command_executor='http://127.0.0.1:4444/wd/hub')

    # Persist the session details so the next run can attach to this browser.
    # 'with' closes the file even if a write fails.
    with open(".webdriver_session", "w") as session_file:
        session_file.write("{}={}\n".format('session_id', driver.session_id))
        session_file.write("{}={}\n".format('executor_url', driver.command_executor._url))

    # The original also stored both values in an undefined name 'specs',
    # which raised NameError after the browser had been started. Nothing
    # reads those values, so the assignments were removed.
    return driver
 
##
def create_driver_session(session_id, executor_url):
    '''
    Build a webdriver.Remote object attached to an existing browser session.

    RemoteWebDriver.execute is patched temporarily so that the "newSession"
    command issued while the driver object is constructed returns the saved
    session id instead of being sent on.

    session_id:   id of the session to attach to
    executor_url: URL of the Selenium command executor that owns the session

    Returns the webdriver.Remote instance with session_id set.
    '''
    from selenium.webdriver.remote.webdriver import WebDriver as RemoteWebDriver

    # Save the original function, so we can revert our patch
    org_command_execute = RemoteWebDriver.execute

    def new_command_execute(self, command, params=None):
        if command == "newSession":
            # Mock the response
            return {'success': 0, 'value': None, 'sessionId': session_id}
        return org_command_execute(self, command, params)

    capabilities = webdriver.DesiredCapabilities.FIREFOX.copy()
    capabilities['proxy'] = {'proxyType': 'DIRECT'}

    # Patch the function before creating the driver object. The finally
    # clause restores the original even if the constructor raises, so the
    # class-wide patch cannot leak into drivers created later.
    RemoteWebDriver.execute = new_command_execute
    try:
        new_driver = webdriver.Remote(command_executor=executor_url, desired_capabilities=capabilities)
    finally:
        RemoteWebDriver.execute = org_command_execute

    new_driver.session_id = session_id

    return new_driver
 
##
# Main: attach to the saved browser session (or start a new one) and load
# the requested URL.
if __name__ == '__main__':

    logging.basicConfig(filename='webdriver.log', level=logging.INFO, format='%(asctime)s %(message)s')

    ARGS = get_args()

    URL = ARGS.url

    # Try to reuse the session recorded in .webdriver_session.
    reused_session = False
    try:
        ids = {}
        with open(".webdriver_session") as f:
            for line in f:
                # partition() tolerates blank lines and values that contain
                # '='; split("=") raised ValueError on both.
                key, separator, value = line.partition("=")
                if separator:
                    ids[key] = value.rstrip()

        logging.debug('ids: {}'.format(ids))
        logging.debug('session id found: {}'.format(ids['session_id']))
        logging.debug('executor_url found {}'.format(ids['executor_url']))
        driver = create_driver_session(ids['session_id'], ids['executor_url'])
        reused_session = True
    except (OSError, KeyError, seleniumException.WebDriverException) as err:
        # KeyError: the file exists but lacks one of the two expected keys.
        logging.exception('starting new session due to error {}'.format(err))
        driver = create_new_driver()

    try:
        try:
            driver.get(URL)
        except seleniumException.WebDriverException as err:
            # A saved session that is no longer usable only shows up on the
            # first command sent to it, so fall back to a new browser here.
            if not reused_session:
                raise
            logging.exception('starting new session due to error {}'.format(err))
            driver = create_new_driver()
            driver.get(URL)
    except Exception as err:
        logging.exception('Unhandled exception: {}'.format(err))