
getting workflow stages

Christian Kühnel 2020-05-06 12:06:40 +02:00
parent 27da151340
commit d55096bb7c


@@ -1,13 +1,25 @@
 #!/usr/bin/env python3
-import requests
-from typing import Optional, List, Dict
-import json
-import os
-from urllib.parse import urljoin
-import datetime
-import numpy
 import csv
+import datetime
+import hashlib
+import json
+import numpy
+import requests
+import os
+import sys
+from typing import Optional, List, Dict
+from urllib.parse import urljoin
+
+
+class Stage:
+    def __init__(self, stage_dict: Dict):
+        self.name = stage_dict['name']
+        self.success = stage_dict['status'].lower() == 'success'
+        self.start_time = datetime.datetime.fromtimestamp(stage_dict['startTimeMillis']/1000)
+        self.duration = datetime.timedelta(milliseconds=stage_dict['durationMillis'])
+
+
 class Build:
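Note: `Stage.__init__` consumes one entry from the `stages` list that Jenkins' workflow API returns per build. A minimal sketch of the payload shape it assumes, with invented values (only the key names come from the code above):

import datetime

# Hypothetical wfapi stage entry; the key names match what
# Stage.__init__ reads, the concrete values are made up.
stage_dict = {
    'name': 'build',
    'status': 'SUCCESS',
    'startTimeMillis': 1588759600000,  # epoch milliseconds
    'durationMillis': 125000,          # 125 seconds
}

# Mirrors the parsing in Stage.__init__:
success = stage_dict['status'].lower() == 'success'
start_time = datetime.datetime.fromtimestamp(stage_dict['startTimeMillis'] / 1000)
duration = datetime.timedelta(milliseconds=stage_dict['durationMillis'])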
@@ -17,6 +29,7 @@ class Build:
         self.result = build_dict['result']
         self.start_time = datetime.datetime.fromtimestamp(build_dict['timestamp']/1000)
         self.duration = datetime.timedelta(milliseconds=build_dict['duration'])
+        self.stages = []  # type: List[Stage]

     @property
     def hour(self) -> datetime.datetime:
@@ -35,10 +48,12 @@ class Build:
             day=self.start_time.day,
         )

+    def update_from_wfdata(self, wfdata: Dict):
+        self.stages = [Stage(s) for s in wfdata['stages']]

 class JenkinsStatsReader:
-    _JENKINS_DAT_FILE = 'tmp/jenkins.json'
+    _TMP_DIR = 'tmp/jenkins'

     def __init__(self):
         self.username = None  # type: Optional[str]
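With `update_from_wfdata` in place, each `Build` carries its `Stage` objects once the workflow data has been fetched, which makes per-stage aggregation straightforward. A sketch under that assumption (the helper below is illustrative, not part of the commit):

import datetime
from collections import defaultdict

def stage_durations(builds):
    # Sum the runtime of each named stage across a job's builds,
    # assuming build.stages was filled in by update_from_wfdata().
    totals = defaultdict(datetime.timedelta)
    for build in builds:
        for stage in build.stages:
            totals[stage.name] += stage.duration
    return dict(totals)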
@@ -62,28 +77,51 @@ class JenkinsStatsReader:
         return self.builds.keys()

     def get_data(self):
-        if not os.path.isfile(self._JENKINS_DAT_FILE):
-            self.fetch_data()
-        self.parse_data()
+        jobnames = self.fetch_jobsnames()
+        print('Found {} jobs: {}'.format(len(jobnames), jobnames))
+        self.get_builds(jobnames)
+        self.get_workflow_data()
         self.create_statistics('hour')
         self.create_statistics('day')

-    def fetch_data(self):
-        response = self._session.get(
-            urljoin(self.jenkins_url, 'api/json?tree=jobs[name,url,allBuilds[number,result,duration,url,timestamp]]'))
-        with open(self._JENKINS_DAT_FILE, 'w') as jenkins_data_file:
-            json.dump(response.json(), jenkins_data_file)
+    def cached_get(self, url) -> Dict:
+        m = hashlib.sha256()
+        m.update(url.encode('utf-8'))
+        filename = m.digest().hex()
+        cache_file = os.path.join(self._TMP_DIR, filename)
+        if os.path.isfile(cache_file):
+            with open(cache_file, 'r') as json_file:
+                data = json.load(json_file)
+            return data
+
+        response = self._session.get(urljoin(self.jenkins_url, url))
+        if response.status_code != 200:
+            raise IOError('Could not read data from {}:\n{}'.format(url, response.text))
+        os.makedirs(self._TMP_DIR, exist_ok=True)
+        with open(cache_file, 'w') as jenkins_data_file:
+            jenkins_data_file.write(response.text)
+        return response.json()

-    def parse_data(self):
-        with open(self._JENKINS_DAT_FILE) as jenkins_data_file:
-            build_data = json.load(jenkins_data_file)
-        for job in build_data['jobs']:
-            job_name = job['name']
-            self.builds[job_name] = [Build(job_name, b) for b in job['allBuilds']]
+    def fetch_jobsnames(self) -> List[str]:
+        data = self.cached_get('api/json?tree=jobs[name]')
+        return [job['name'] for job in data['jobs']]
+
+    def get_builds(self, job_names):
+        for job_name in job_names:
+            print('Getting builds for: {}'.format(job_name))
+            build_data = self.cached_get('job/{}/api/json?tree=allBuilds[number,result,duration,timestamp,executor]'.format(job_name))
+            self.builds[job_name] = [Build(job_name, b) for b in build_data['allBuilds']]
             print('{} has {} builds'.format(job_name, len(self.builds[job_name])))

+    def get_workflow_data(self):
+        for job_name, builds in self.builds.items():
+            for i, build in enumerate(builds):
+                wfdata = self.cached_get('job/{}/{}/wfapi/'.format(job_name, build.number))
+                build.update_from_wfdata(wfdata)
+                sys.stdout.write('\r{} [{}/{}]'.format(job_name, i, len(builds)))
+                sys.stdout.flush()
+
     def create_statistics(self, group_by: str):
+        # only look at Phab
         for job_name, builds in self.builds.items():
             print('Writing data for {}'.format(job_name))
             # TODO: add success/failure rates
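The new `cached_get` keys its on-disk cache by the SHA-256 of the request URL, so each distinct API query maps to a stable filename under `tmp/jenkins` and repeated runs skip the network. A standalone sketch of the key derivation (the URL is invented; note that `m.digest().hex()` is equivalent to `m.hexdigest()`):

import hashlib
import os

url = 'job/example-job/1/wfapi/'  # invented URL for illustration
m = hashlib.sha256()
m.update(url.encode('utf-8'))
cache_file = os.path.join('tmp/jenkins', m.digest().hex())
print(cache_file)  # -> tmp/jenkins/<64 hex characters>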