1
0
Fork 0

Store builds in running state and check them later

This removes the wrong assumption that everything interesting is
located within the first few pages. It should also reduce API load a bit (as we
will not load an additional 5+ pages just in case).
This commit is contained in:
Mikhail Goncharov 2021-06-24 17:09:42 +02:00
parent a6c2eb7f23
commit 19e290d0cb

View file

@ -106,62 +106,37 @@ where a.id IS NULL""")
def insert_new_builds(conn): def insert_new_builds(conn):
logging.info('inserting new builds') logging.info('inserting new builds')
max_pages = 2 all_builds = []
while max_pages < 1000: page = 1
logging.info(f'checking page #{max_pages}') while page < 10000:
logging.info(f'checking page #{page}')
re = requests.get('https://api.buildkite.com/v2/organizations/llvm-project/builds', re = requests.get('https://api.buildkite.com/v2/organizations/llvm-project/builds',
params={'page': max_pages}, params={'page': page},
headers={'Authorization': token}) headers={'Authorization': token})
if re.status_code != 200: if re.status_code != 200:
logging.error(f'list builds response status: {re.status_code}') logging.error(f'list builds response status: {re.status_code}')
sys.exit(1) sys.exit(1)
x = re.json() x = re.json()
existing = 0 if not x:
new = 0
for b in x:
if (b['state'] == 'running') or (b['state'] == 'scheduled'):
new += 1
continue
with conn.cursor() as c:
c.execute('SELECT count(1) FROM builds WHERE id = %s', (b.get('id'),))
if c.fetchone()[0] == 0:
new += 1
else:
existing += 1
logging.info(f'new {new} existing {existing}')
if new == 0:
break
max_pages += 10
max_pages += 5
logging.info(f'will load {max_pages} pages')
page = 1
all_builds = []
# Read #max_pages first in order to not miss any builds that are moved due to new inserts.
while page <= max_pages:
logging.info(f'loading page {page}')
re = requests.get('https://api.buildkite.com/v2/organizations/llvm-project/builds',
params={'page': page},
headers={'Authorization': token})
if re.status_code != 200:
print('response status', re.status_code, re)
break
x = re.json()
if x == []:
logging.warning('empty response') logging.warning('empty response')
break break
all_builds.extend(x)
page += 1 page += 1
# Now insert new builds in reverse order so that we can resume correctly if operation has failed. all_builds.extend(x)
b = x[-1]
with conn.cursor() as c:
c.execute('SELECT count(1) FROM builds WHERE id = %s', (b.get('id'),))
if c.fetchone()[0] != 0:
logging.info(f"found existing build {b.get('id')}")
break
all_builds.reverse() all_builds.reverse()
logging.info(f'{len(all_builds)} builds loaded') logging.info(f'{len(all_builds)} builds loaded')
cnt = 0 cnt = 0
for b in all_builds: for b in all_builds:
if (b['state'] == 'running') or (b['state'] == 'scheduled'):
continue
with conn.cursor() as c: with conn.cursor() as c:
c.execute('SELECT count(1) FROM builds WHERE id = %s', (b.get('id'),)) c.execute('SELECT count(1) FROM builds WHERE id = %s', (b.get('id'),))
if c.fetchone()[0] == 0: if c.fetchone()[0] == 0:
c.execute('INSERT INTO builds (id, raw) VALUES (%s, %s)', [b.get('id'), psycopg2.extras.Json(b)]) c.execute('INSERT INTO builds (id, raw) VALUES (%s, %s) ON CONFLICT (id) DO UPDATE SET raw = %s',
[b.get('id'), psycopg2.extras.Json(b), psycopg2.extras.Json(b)])
cnt += 1 cnt += 1
if cnt % 100 == 0: if cnt % 100 == 0:
logging.info(f'{cnt} builds inserted') logging.info(f'{cnt} builds inserted')
@ -171,6 +146,64 @@ def insert_new_builds(conn):
return cnt return cnt
def insert_all_builds(conn):
    """Backfill: page through the Buildkite builds API and insert every build.

    Existing rows are left untouched (ON CONFLICT DO NOTHING), so the
    function is safe to re-run. Commits every 100 inserts so a failed run
    loses little progress.

    :param conn: open psycopg2 connection to the builds database.
    :return: number of rows inserted (int).
    """
    logging.info('inserting all builds')
    page = 1
    cnt = 0
    # Hard upper bound so broken API pagination cannot loop forever.
    while page < 100000:
        logging.info(f'checking page #{page}')
        re = requests.get('https://api.buildkite.com/v2/organizations/llvm-project/builds',
                          params={'page': page},
                          headers={'Authorization': token})
        if re.status_code != 200:
            logging.error(f'list builds response status: {re.status_code}')
            sys.exit(1)
        x = re.json()
        if not x:
            # An empty page means we walked past the last build.
            logging.warning('empty response')
            break
        page += 1
        for b in x:
            with conn.cursor() as c:
                # Wrap the dict in Json so psycopg2 can adapt it to jsonb;
                # passing the raw dict raises "can't adapt type 'dict'".
                c.execute('INSERT INTO builds (id, raw) VALUES (%s, %s) ON CONFLICT (id) DO NOTHING',
                          [b.get('id'), psycopg2.extras.Json(b)])
                cnt += 1
                if cnt % 100 == 0:
                    logging.info(f'{cnt} builds inserted')
                    conn.commit()
    conn.commit()
    logging.info(f'{cnt} builds inserted')
    return cnt
def update_running_builds(conn):
    """Re-fetch builds stored in a non-final state and refresh their raw JSON.

    Selects rows whose stored state is 'running', 'scheduled' or
    'canceling', re-queries each build's own URL from the Buildkite API,
    and overwrites the stored JSON with the fresh payload. Commits every
    100 updates and once at the end.

    :param conn: open psycopg2 connection to the builds database.
    """
    with conn.cursor() as c:
        c.execute("""
        SELECT key, raw
        FROM builds b
        WHERE b.raw->>'state' IN ('running', 'scheduled', 'canceling')""")
        cnt = 0
        total = c.rowcount
        logging.info(f'checking {total} running builds')
        for row in c:
            key = row[0]
            raw = row[1]
            logging.info(f'running build {key}')
            re = requests.get(raw['url'], headers={'Authorization': token})
            logging.info(f"{raw['url']} -> {re.status_code}")
            if re.status_code != 200:
                # mark build as "invalid"
                # NOTE(review): the build is currently only skipped, not
                # actually marked — TODO confirm intended handling.
                continue
            j = re.json()
            logging.info(f"state {raw['state']} -> {j['state']}")
            with conn.cursor() as u:
                # Json() wrapper is required: psycopg2 cannot adapt a
                # plain dict as a query parameter.
                u.execute("""UPDATE builds SET raw = %s WHERE key = %s""",
                          (psycopg2.extras.Json(j), key))
            cnt += 1
            if cnt % 100 == 0:
                conn.commit()
    conn.commit()
def download_job_artifacts_list(conn): def download_job_artifacts_list(conn):
logging.info('download jobs artifact lsits') logging.info('download jobs artifact lsits')
with conn.cursor() as c: with conn.cursor() as c:
@ -206,7 +239,8 @@ def insert_new_jobs(conn):
logging.info('inserting new jobs') logging.info('inserting new jobs')
with conn.cursor() as c: with conn.cursor() as c:
c.execute("""select bj.id, bj.jid, bj.job from c.execute("""select bj.id, bj.jid, bj.job from
(select b.id, j->>'id' jid, j as job from builds b, json_array_elements(b.raw->'jobs') as j) as bj (select b.id, j->>'id' jid, j as job from builds b, json_array_elements(b.raw->'jobs') as j
WHERE b.raw->>'state' NOT IN ('running', 'scheduled', 'canceling')) as bj
left join jobs j on j.id = bj.jid left join jobs j on j.id = bj.jid
where j.id IS NULL""") where j.id IS NULL""")
total = c.rowcount total = c.rowcount
@ -250,7 +284,9 @@ if __name__ == '__main__':
logging.basicConfig(level='INFO', format='%(levelname)-7s %(message)s') logging.basicConfig(level='INFO', format='%(levelname)-7s %(message)s')
cn = connect() cn = connect()
logging.info('downloading buildkite data') logging.info('downloading buildkite data')
# insert_all_builds(cn)
insert_new_builds(cn) insert_new_builds(cn)
# update_running_builds(cn)
insert_new_jobs(cn) insert_new_jobs(cn)
download_job_artifacts_list(cn) download_job_artifacts_list(cn)
download_job_artifacts(cn) download_job_artifacts(cn)