Pre-requisites
- IDE with git integration such as VSCode
- Python Selenium webdriver for your browser
- Python 2.7 or 3+ and package dependencies (see below for specific packages).
- Optionally a new Python virtual environment for isolation of packages
- Selenium standalone server to enable session reuse
- The Java package required for Selenium server to run
Workflow
- Start the Selenium server in a separate process. This will start a local server process and hold the browser session context
- Initialise the environment variables for Splunk: username, password and Splunk URL
- Create a local git repository to store your work and enable the post-commit hook as shown below
- Use the IDE to edit your dashboard XML
- When ready to preview the dashboard, git commit your file. In VSCode, a macro can save, stage and commit your changes with a keyboard combination for faster iterations.
- The post-commit hook will upload the xml to Splunk and refresh the browser
Artefacts
Initialise the environment variables once before starting the development session:
#!/bin/bash
# Source this script (rather than executing it) so the exported variables
# end up in the shell that later runs git and the post-commit hook.
# -r keeps backslashes in the password literal; -s hides the typed characters.
read -r -s -p 'Password: ' splunk_password
# read -s does not echo the Enter key, so finish the prompt line ourselves.
echo
export splunk_password
export splunk_url='https://splunk-test.company.com.au'
export splunk_user='m123456'
The Git post-commit hook:
#!/bin/sh
# Paths are resolved against the directory the hook is started in.
# Every expansion is quoted so directories or values containing spaces
# are passed through as a single argument.
repo_dir=$(pwd)
# Upload the XML dashboard to Splunk
python "$repo_dir/scripts/iis-dashboard.py" --dashboard scorecard_nonprod_ibm_managed --appname MWOPS --xmlfilepath "$repo_dir/dashboards/scorecard-environment.xml" --dashboarduser="$splunk_user"
# Splunk needs time to process the dashboard after upload completes
sleep 2
# Refresh our browser to show the new changes
python "$repo_dir/scripts/webdriver.py" --url="$splunk_url" --debug
Python script to upload the dashboard XML to our Splunk App:
import sys
import os
import io
import requests
import argparse
import xml.etree.ElementTree as etree
from http.client import responses
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
'''
module docstring: module to upload xml dashboard to be used on a git push
'''
## authenticate and get the session key
def authenticate(splunkurl, user, password):
    '''
    Log the shared HTTPSESSION in to Splunk Web.

    The login page is fetched first so the session receives the ``cval``
    cookie; the credentials are then posted back together with that value.
    Cookies set by both responses stay on the module-level HTTPSESSION and
    are reused by the other functions in this script.

    splunkurl: base URL of Splunk Web; the login path is appended to it
    user:      username to log in with
    password:  password for that user
    returns:   0
    raises:    requests.HTTPError when either request returns a 4xx/5xx
               status, KeyError when no ``cval`` cookie was received
    '''
    url = splunkurl + '/en-US/account/login'

    # first retrieve the cval cookie value by GET to login page
    response = HTTPSESSION.get(url, verify=False)
    print('Authentication - get cval cookie: {} {}'.format(
        response.status_code, responses.get(response.status_code, 'Unknown')))
    # Fail fast on an HTTP error status rather than carrying on without a
    # usable login page.
    response.raise_for_status()

    auth_payload = {
        'username': user,
        'password': password,
        'cval': HTTPSESSION.cookies['cval'],
    }
    response = HTTPSESSION.post(url, data=auth_payload, verify=False)
    print('Authentication - post creds: {} {}'.format(
        response.status_code, responses.get(response.status_code, 'Unknown')))
    # Stop here on an HTTP error status instead of continuing unauthenticated.
    response.raise_for_status()
    return 0
## get dashboard
def get_dashboard_xml(splunkurl, dashboard_name, appname, xmlfilepath, download_flag):
    '''
    desc: downloads the xml definition for the provided dashboard name

    Uses the module-level HTTPSESSION (must already be authenticated) and
    the module-level splunk_user as the namespace owner in the URL.

    splunkurl:      base URL of Splunk Web
    dashboard_name: name of the view to download
    appname:        Splunk app the view belongs to
    xmlfilepath:    file the raw response is written to
    download_flag:  when true, write the response body to xmlfilepath
    returns:        0
    '''
    url = splunkurl + '/en-US/splunkd/__raw/servicesNS/' + splunk_user + '/' + appname + '/data/ui/views/' + dashboard_name
    response = HTTPSESSION.get(url, verify=False)
    print("Dashboard - downloading: {} {}".format(
        str(response.status_code), responses.get(response.status_code, 'Unknown')))

    # Only a successful response is worth parsing or saving.
    if response.status_code != 200:
        return 0

    # parse the XML response to get the dashboard xml
    # Parsing the raw bytes lets the parser honour the encoding declared in
    # the document itself.
    xmlroot = etree.fromstring(response.content)
    # NOTE(review): if the response uses an XML namespace this un-namespaced
    # lookup returns an empty list - confirm against a real response.
    xmlentry = xmlroot.findall('entry')
    print(xmlentry)

    if download_flag:
        with open(xmlfilepath, 'w') as f:
            f.write(response.text)
    return 0
## save dashboard
def save_dashboard_xml(splunkurl, dashboard_name, appname, xmlfilepath, dashboard_user):
    '''
    desc: saves the xml definition for the provided dashboard name

    Posts the contents of xmlfilepath as the ``eai:data`` form field to the
    view's endpoint, using the module-level HTTPSESSION (must already be
    authenticated so that the CSRF cookie is present).

    splunkurl:      base URL of Splunk Web
    dashboard_name: name of the view to overwrite
    appname:        Splunk app the view belongs to
    xmlfilepath:    local file holding the dashboard XML
    dashboard_user: namespace owner used in the URL
    returns:        0
    raises:         KeyError when the CSRF cookie is missing; exits the
                    process with the error text when the file cannot be read
    '''
    url = splunkurl + '/en-US/splunkd/__raw/servicesNS/' + dashboard_user + '/' + appname + '/data/ui/views/' + dashboard_name
    # required headers
    headers = {
        # The CSRF token cookie is echoed back as a header; the cookie name
        # used here carries the suffix 8000.
        'X-Splunk-Form-Key': HTTPSESSION.cookies['splunkweb_csrf_token_8000'],
        'X-Requested-With': 'XMLHttpRequest',
    }
    try:
        # The context manager closes the file even if read() fails.
        with open(xmlfilepath, 'r') as xmlfile:
            payload = {'eai:data': xmlfile.read()}
    except (OSError, ValueError) as e:
        # ValueError covers decoding errors raised while reading the text.
        sys.exit(str(e))
    response = HTTPSESSION.post(url, verify=False, data=payload, headers=headers)
    print("Dashboard - saving {} {}".format(
        str(response.status_code), responses.get(response.status_code, 'Unknown')))
    return 0
def get_dashboard_metadata(splunkurl, appname):
    '''
    desc: download the json metadata for the dashboard

    Requests the list of visible dashboards in appname whose owner is the
    module-level dashboard_user, then prints the final URL, the status and
    the raw response body. Uses the module-level HTTPSESSION.

    splunkurl: base URL of Splunk Web
    appname:   Splunk app to list dashboards for
    returns:   None
    '''
    url = '{}/en-US/splunkd/__raw/servicesNS/-/{}/data/ui/views?output_mode=json&search=((isDashboard=1+AND+isVisible=1)+AND+(eai:acl.owner=%22{}%22))'.format(splunkurl, appname, dashboard_user)
    # NOTE: an earlier draft also built a query dict here with a broader
    # filter (it additionally matched dashboards whose sharing is not "user"),
    # but it was never passed to the request. The filter embedded in the URL
    # above is the only one that has ever been applied.
    response = HTTPSESSION.get(url, verify=False)
    print('get metadata url: {}'.format(response.url))
    print("metadata - results: {} {}".format(
        str(response.status_code), responses.get(response.status_code, 'Unknown')))
    print("metadata - response: {}".format(response.content))
def api_auth():
    '''
    alternate login - placeholder only: sets an empty local url and
    returns None without contacting anything
    '''
    url = ''
def get_args():
    '''
    Parse and validate the command line of the upload script.

    --username, --password and --url default to the splunk_user,
    splunk_password and splunk_url environment variables. Every value
    except --dashboarduser must end up non-empty.

    returns: the argparse namespace with dashboard, appname, xmlfilepath,
             dashboarduser, username, password and url
    raises:  SystemExit with status 2 (after printing usage and the reason
             to stderr) when a required value is missing
    '''
    parser = argparse.ArgumentParser(description='gets and updates a dashboard')
    # saveas xml filename
    parser.add_argument('--xmlfilepath', help='file path to save XML')
    # username for the app context
    parser.add_argument('--dashboarduser', default='nobody', help='username for the dashboard context. Default nobody')
    # dashboard name
    parser.add_argument('--dashboard', help='name of the dashboard to update')
    # app name
    parser.add_argument('--appname', help='splunk app name where dashboard lives')
    # splunk user name
    parser.add_argument('--username', default=os.environ.get('splunk_user', None), help='splunk username for authentication. Either set it here or in an environment variable called splunk_user')
    # splunk password
    parser.add_argument('--password', default=os.environ.get('splunk_password', None), help='splunk password authentication. Either set it here or in an environment variable called splunk_password')
    # splunk url
    parser.add_argument('--url', default=os.environ.get('splunk_url', None), help='splunk full URL such as https://splunk.hostname.com:8080. Either set it here or in an environment variable called splunk_url')

    args = parser.parse_args()

    # sanity checks, in the order they are reported
    required = (
        ('dashboard', 'dashboard missing'),
        ('username', 'username missing'),
        ('password', 'password missing'),
        ('url', 'url missing'),
        ('appname', 'appname missing'),
        ('xmlfilepath', 'xmlfilepath missing'),
    )
    for attribute, message in required:
        if not getattr(args, attribute):
            # parser.error() prints the usage plus the message and exits with
            # status 2, so the calling hook can tell that the upload did not run.
            parser.error(message)
    return args
##
'''
#### Main
'''
if __name__ == '__main__':
# get arguments
args = get_args()
# initialise variables
splunk_user = os.environ['splunk_user']
splunk_password = os.environ['splunk_password']
splunk_url = os.environ['splunk_url']
dashboard_name = args.dashboard
appname = args.appname
xmlfilepath = args.xmlfilepath
dashboard_user = args.dashboarduser
try:
HTTPSESSION = requests.Session()
# SESS_KEY = get_session_key()
# print('session key: ' + SESS_KEY)
authenticate(splunk_url, splunk_user, splunk_password)
save_dashboard_xml(splunk_url, dashboard_name, appname, xmlfilepath, dashboard_user)
#get_dashboard_metadata(splunk_url,appname)
# dashboard_xml = get_dashboard_xml(SESS_KEY)
# print(dashboard_xml)
except Exception as e:
sys.exit('Error: ' + str(e))The Selenium wrapper to refresh the browser after uploading the dashboard XML:
import sys
import os
import io
import argparse, logging
from selenium import webdriver
from selenium.common import exceptions as seleniumException
def get_args():
    '''
    Parse the command line of the browser-refresh script.

    --debug switches the root logger to DEBUG; --url is the page to load.

    returns: the argparse namespace with ``debug`` and ``url``
    '''
    parser = argparse.ArgumentParser(description='browser automation')
    parser.add_argument('--debug', help='enable debug logging', action='store_true')
    parser.add_argument('--url', default='https://splunk-test.company.com.au', help='url to retrieve')
    args = parser.parse_args()
    if args.debug:
        # basicConfig() does nothing once the root logger has handlers, so
        # it only helps when logging is still unconfigured; setting the
        # level explicitly makes --debug work in both cases.
        logging.basicConfig(level=logging.DEBUG)
        logging.getLogger().setLevel(logging.DEBUG)
    return args
def create_new_driver():
    '''
    starts the selenium webdriver to start the browser

    Opens a new Firefox session on the Selenium server at
    http://127.0.0.1:4444/wd/hub, writes its session id and executor URL to
    ``.webdriver_session`` in the current directory as ``key=value`` lines,
    and records the same two values in the module-level SPECS dict.

    returns: the new remote webdriver
    '''
    capabilities = webdriver.DesiredCapabilities.FIREFOX.copy()
    capabilities['proxy'] = {'proxyType': 'DIRECT'}
    driver = webdriver.Remote(desired_capabilities=capabilities, command_executor='http://127.0.0.1:4444/wd/hub')
    executor_url = driver.command_executor._url

    # Persist the session details so a later run can attach to this browser.
    with open(".webdriver_session", "w") as session_file:
        session_file.write("{}={}\n".format('session_id', driver.session_id))
        session_file.write("{}={}\n".format('executor_url', executor_url))

    # The script's main section defines SPECS; create it on demand so this
    # function also works when it is called before that has happened.
    specs = globals().setdefault('SPECS', {})
    specs['session_id'] = driver.session_id
    specs['url'] = executor_url
    return driver
##
def create_driver_session(session_id, executor_url):
    '''
    Build a remote webdriver object attached to an existing session.

    RemoteWebDriver.execute is temporarily replaced so that the
    "newSession" command issued while constructing the driver is answered
    locally with the given session id; every other command is passed on to
    the original implementation.

    session_id:   id of the session to attach to
    executor_url: URL of the Selenium server that owns the session
    returns:      a remote webdriver whose session_id is session_id
    '''
    from selenium.webdriver.remote.webdriver import WebDriver as RemoteWebDriver
    # Save the original function, so we can revert our patch
    org_command_execute = RemoteWebDriver.execute

    def new_command_execute(self, command, params=None):
        if command == "newSession":
            # Mock the response
            return {'success': 0, 'value': None, 'sessionId': session_id}
        return org_command_execute(self, command, params)

    capabilities = webdriver.DesiredCapabilities.FIREFOX.copy()
    capabilities['proxy'] = {'proxyType': 'DIRECT'}

    # Patch the function before creating the driver object
    RemoteWebDriver.execute = new_command_execute
    try:
        new_driver = webdriver.Remote(command_executor=executor_url, desired_capabilities=capabilities)
        new_driver.session_id = session_id
    finally:
        # Always restore the original function: if construction fails and
        # the patch stayed in place, the next attempt to start a real
        # session would also get the mocked "newSession" answer.
        RemoteWebDriver.execute = org_command_execute
    return new_driver
##
'''
#### Main
'''
if __name__ == '__main__':
    logging.basicConfig(filename='webdriver.log', level=logging.INFO, format='%(asctime)s %(message)s')
    ARGS = get_args()
    URL = ARGS.url
    SPECS = {}
    # Tracks whether we attached to a saved session; create_driver_session()
    # answers "newSession" locally, so a dead session only shows up on the
    # first real command.
    reused_session = False
    # get session_id and executor_url from the file written by a previous run
    try:
        ids = {}
        with open(".webdriver_session") as f:
            for line in f:
                # Split on the first '=' only so values containing '=' stay
                # intact; lines without '=' are ignored.
                key, separator, value = line.partition("=")
                if separator:
                    ids[key.strip()] = value.strip()
        logging.debug('ids: {}'.format(ids))
        logging.debug('session id found: {}'.format(ids['session_id']))
        logging.debug('executor_url found {}'.format(ids['executor_url']))
        driver = create_driver_session(ids['session_id'], ids['executor_url'])
        reused_session = True
    except (OSError, KeyError, seleniumException.WebDriverException) as err:
        # Missing/unreadable file, incomplete file, or a webdriver failure.
        logging.exception('starting new session due to error {}'.format(err))
        driver = create_new_driver()
    try:
        driver.get(URL)
    except Exception as err:
        if not reused_session:
            logging.exception('Unhandled exception: {}'.format(err))
        else:
            # The saved session could not be used: fall back to a fresh
            # browser session once and load the page there.
            logging.exception('starting new session due to error {}'.format(err))
            try:
                driver = create_new_driver()
                driver.get(URL)
            except Exception as retry_err:
                logging.exception('Unhandled exception: {}'.format(retry_err))