diff --git a/scripts/metrics/load_buildkite.py b/scripts/metrics/load_buildkite.py index 6d6e67b..03f591b 100644 --- a/scripts/metrics/load_buildkite.py +++ b/scripts/metrics/load_buildkite.py @@ -106,62 +106,37 @@ where a.id IS NULL""") def insert_new_builds(conn): logging.info('inserting new builds') - max_pages = 2 - while max_pages < 1000: - logging.info(f'checking page #{max_pages}') + all_builds = [] + page = 1 + while page < 10000: + logging.info(f'checking page #{page}') re = requests.get('https://api.buildkite.com/v2/organizations/llvm-project/builds', - params={'page': max_pages}, + params={'page': page}, headers={'Authorization': token}) if re.status_code != 200: logging.error(f'list builds response status: {re.status_code}') sys.exit(1) x = re.json() - existing = 0 - new = 0 - for b in x: - if (b['state'] == 'running') or (b['state'] == 'scheduled'): - new += 1 - continue - with conn.cursor() as c: - c.execute('SELECT count(1) FROM builds WHERE id = %s', (b.get('id'),)) - if c.fetchone()[0] == 0: - new += 1 - else: - existing += 1 - logging.info(f'new {new} existing {existing}') - if new == 0: - break - max_pages += 10 - max_pages += 5 - logging.info(f'will load {max_pages} pages') - page = 1 - all_builds = [] - # Read #max_pages first in order to not miss any builds that are moved due to new inserts. - while page <= max_pages: - logging.info(f'loading page {page}') - re = requests.get('https://api.buildkite.com/v2/organizations/llvm-project/builds', - params={'page': page}, - headers={'Authorization': token}) - if re.status_code != 200: - print('response status', re.status_code, re) - break - x = re.json() - if x == []: + if not x: logging.warning('empty response') break - all_builds.extend(x) page += 1 - # Now insert new builds in reverse order so that we can resume correctly if operation has failed. + all_builds.extend(x) + b = x[-1] + with conn.cursor() as c: + c.execute('SELECT count(1) FROM builds WHERE id = %s', (b.get('id'),)) + if c.fetchone()[0] != 0: + logging.info(f"found existing build {b.get('id')}") + break all_builds.reverse() logging.info(f'{len(all_builds)} builds loaded') cnt = 0 for b in all_builds: - if (b['state'] == 'running') or (b['state'] == 'scheduled'): - continue with conn.cursor() as c: c.execute('SELECT count(1) FROM builds WHERE id = %s', (b.get('id'),)) if c.fetchone()[0] == 0: - c.execute('INSERT INTO builds (id, raw) VALUES (%s, %s)', [b.get('id'), psycopg2.extras.Json(b)]) + c.execute('INSERT INTO builds (id, raw) VALUES (%s, %s) ON CONFLICT (id) DO UPDATE SET raw = %s', + [b.get('id'), psycopg2.extras.Json(b), psycopg2.extras.Json(b)]) cnt += 1 if cnt % 100 == 0: logging.info(f'{cnt} builds inserted') @@ -171,6 +146,64 @@ def insert_new_builds(conn): return cnt +def insert_all_builds(conn): + logging.info('inserting all builds') + page = 1 + cnt = 0 + while page < 100000: + logging.info(f'checking page #{page}') + re = requests.get('https://api.buildkite.com/v2/organizations/llvm-project/builds', + params={'page': page}, + headers={'Authorization': token}) + if re.status_code != 200: + logging.error(f'list builds response status: {re.status_code}') + sys.exit(1) + x = re.json() + if not x: + logging.warning('empty response') + break + page += 1 + for b in x: + with conn.cursor() as c: + c.execute('INSERT INTO builds (id, raw) VALUES (%s, %s) ON CONFLICT (id) DO NOTHING', + [b.get('id'), b]) + cnt += 1 + if cnt % 100 == 0: + logging.info(f'{cnt} builds inserted') + conn.commit() + conn.commit() + logging.info(f'{cnt} builds inserted') + return cnt + + +def update_running_builds(conn): + with conn.cursor() as c: + c.execute(""" +SELECT key, raw +FROM builds b +WHERE b.raw->>'state' IN ('running', 'scheduled', 'canceling')""") + cnt = 0 + total = c.rowcount + logging.info(f'checking {total} running builds') + for row in c: + key = row[0] + raw = row[1] + logging.info(f'running build {key}') + re = requests.get(raw['url'], headers={'Authorization': token}) + logging.info(f"{raw['url']} -> {re.status_code}") + if re.status_code != 200: + # mark build as "invalid" + continue + j = re.json() + logging.info(f"state {raw['state']} -> {j['state']}") + with conn.cursor() as u: + u.execute("""UPDATE builds SET raw = %s WHERE key = %s""", (j, key)) + cnt += 1 + if cnt % 100 == 0: + conn.commit() + conn.commit() + + def download_job_artifacts_list(conn): logging.info('download jobs artifact lsits') with conn.cursor() as c: @@ -206,7 +239,8 @@ def insert_new_jobs(conn): logging.info('inserting new jobs') with conn.cursor() as c: c.execute("""select bj.id, bj.jid, bj.job from -(select b.id, j->>'id' jid, j as job from builds b, json_array_elements(b.raw->'jobs') as j) as bj +(select b.id, j->>'id' jid, j as job from builds b, json_array_elements(b.raw->'jobs') as j +WHERE b.raw->>'state' NOT IN ('running', 'scheduled', 'canceling')) as bj left join jobs j on j.id = bj.jid where j.id IS NULL""") total = c.rowcount @@ -250,7 +284,9 @@ if __name__ == '__main__': logging.basicConfig(level='INFO', format='%(levelname)-7s %(message)s') cn = connect() logging.info('downloading buildkite data') + # insert_all_builds(cn) insert_new_builds(cn) + # update_running_builds(cn) insert_new_jobs(cn) download_job_artifacts_list(cn) download_job_artifacts(cn)