From dd0b3d8b6478d76dba5828083c4aa0824bdaf524 Mon Sep 17 00:00:00 2001 From: Mikhail Goncharov Date: Mon, 29 Jan 2024 15:39:12 +0100 Subject: [PATCH] cleanup --- scripts/metrics/connect_db.sh | 22 --- scripts/metrics/load_buildkite.py | 292 ------------------------------ 2 files changed, 314 deletions(-) delete mode 100755 scripts/metrics/connect_db.sh delete mode 100644 scripts/metrics/load_buildkite.py diff --git a/scripts/metrics/connect_db.sh b/scripts/metrics/connect_db.sh deleted file mode 100755 index 0b9cfa4..0000000 --- a/scripts/metrics/connect_db.sh +++ /dev/null @@ -1,22 +0,0 @@ -#!/usr/bin/env bash -# Copyright 2021 Google LLC -# -# Licensed under the the Apache License v2.0 with LLVM Exceptions (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://llvm.org/LICENSE.txt -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# generate statistics on the llvm github repository -SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )" -cd $SCRIPT_DIR -# sleep to let proxy to came up -cloud_sql_proxy -instances=llvm-premerge-checks:us-central1:buildkite-stats=tcp:0.0.0.0:5432 & -sleep 3s -$@ diff --git a/scripts/metrics/load_buildkite.py b/scripts/metrics/load_buildkite.py deleted file mode 100644 index c344b02..0000000 --- a/scripts/metrics/load_buildkite.py +++ /dev/null @@ -1,292 +0,0 @@ -import sys -import psycopg2 -import psycopg2.extras -import logging -import requests -import os -import dateutil -import chardet -from benedict import benedict -import traceback - -psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json) - -token = f'Bearer {os.getenv("BUILDKITE_API_TOKEN")}' - - -def connect(): - return psycopg2.connect( - f"host=127.0.0.1 sslmode=disable dbname=stats user=stats password={os.getenv('DB_PASSWORD')}") - - -def download_text(url): - r = requests.get(url, allow_redirects=True, - headers={'Authorization': token}) - if r.status_code != 200: - raise Exception(f'response status {r.status_code}') - try: - return r.content.decode('utf-8').replace("\x00", "\uFFFD"), 'utf-8' - except: - pass - try: - return r.content.decode('ascii').replace("\x00", "\uFFFD"), 'ascii' - except: - pass - d = chardet.detect(r.content) - return r.content.decode(d['encoding']).replace("\x00", "\uFFFD"), d['encoding'] - - -def download_job_logs(conn): - logging.info('downloading job logs') - with conn.cursor() as c: - c.execute(f""" -select j.id, j.raw->>'raw_log_url' url -from jobs j -left join artifacts a on a.job_id = j.id and a.id=j.id -where a.id IS NULL and j.raw->>'raw_log_url' IS NOT NULL -""") - total = c.rowcount - logging.info(f'will download {total} logs') - cnt = 0 - for row in c: - cnt += 1 - job_id = row[0] - url = row[1] - meta = {'filename': 'stdout'} - try: - content, en = download_text(url) - meta['encoding'] = en - with conn.cursor() as i: - i.execute('INSERT INTO artifacts (id, job_id, content, meta) VALUES (%s, %s, %s, %s)', - [job_id, job_id, content, meta]) - except: - meta['failure'] = traceback.format_exc() - logging.error(f'download artifact failed {meta["failure"]} {url}') - with conn.cursor() as i: - i.execute('INSERT INTO artifacts (id, job_id, meta) VALUES (%s, %s, %s)', [job_id, job_id, meta]) - if cnt % 100 == 0: - logging.info(f'downloaded {cnt}/{total} logs') - conn.commit() - logging.info(f'downloaded {cnt} logs') - return True - - -def download_job_artifacts(conn): - logging.info('downloading job artifacts') - with conn.cursor() as c: - c.execute(f""" -select ja.meta from -(select j.key,j.id job_id, a->>'id' aid, a as meta from jobs j, json_array_elements(j.meta->'artifacts') as a) as ja -left join artifacts a on a.job_id = ja.job_id and a.id=ja.aid -where a.id IS NULL""") - total = c.rowcount - logging.info(f'will download {total} artifacts') - cnt = 0 - for row in c: - meta = benedict(row[0]) - cnt += 1 - try: - content, en = download_text(meta.get('download_url')) - meta['encoding'] = en - with conn.cursor() as i: - i.execute('INSERT INTO artifacts (id, job_id, content, meta) VALUES (%s, %s, %s, %s)', - [meta.get('id'), meta.get('job_id'), content, meta]) - except: - meta['failure'] = traceback.format_exc() - logging.error(f'download artifact failed {meta["failure"]} {meta.get("download_url")}') - with conn.cursor() as i: - i.execute('INSERT INTO artifacts (id, job_id, meta) VALUES (%s, %s, %s)', - [meta.get('id'), meta.get('job_id'), meta]) - if cnt % 100 == 0: - logging.info(f'downloaded {cnt}/{total} artifacts') - conn.commit() - logging.info(f'downloaded {cnt} artifacts') - return True - - -def insert_new_builds(conn): - logging.info('inserting new builds') - all_builds = [] - page = 1 - while page < 10000: - logging.info(f'checking page #{page}') - re = requests.get('https://api.buildkite.com/v2/organizations/llvm-project/builds', - params={'page': page}, - headers={'Authorization': token}) - if re.status_code != 200: - logging.error(f'list builds response status: {re.status_code}') - sys.exit(1) - x = re.json() - if not x: - logging.warning('empty response') - break - page += 1 - all_builds.extend(x) - b = x[-1] - with conn.cursor() as c: - c.execute('SELECT count(1) FROM builds WHERE id = %s', (b.get('id'),)) - if c.fetchone()[0] != 0: - logging.info(f"found existing build {b.get('id')}") - break - all_builds.reverse() - logging.info(f'{len(all_builds)} builds loaded') - cnt = 0 - for b in all_builds: - with conn.cursor() as c: - c.execute('INSERT INTO builds (id, raw) VALUES (%s, %s) ON CONFLICT (id) DO UPDATE SET raw = %s', - [b.get('id'), b, b]) - cnt += 1 - if cnt % 100 == 0: - logging.info(f'{cnt} builds inserted / updated') - conn.commit() - conn.commit() - logging.info(f'{cnt} builds inserted') - return cnt - - -def insert_all_builds(conn): - logging.info('inserting all builds') - page = 1 - cnt = 0 - while page < 100000: - logging.info(f'checking page #{page}') - re = requests.get('https://api.buildkite.com/v2/organizations/llvm-project/builds', - params={'page': page}, - headers={'Authorization': token}) - if re.status_code != 200: - logging.error(f'list builds response status: {re.status_code}') - sys.exit(1) - x = re.json() - if not x: - logging.warning('empty response') - break - page += 1 - for b in x: - with conn.cursor() as c: - c.execute('SELECT count(1) FROM builds WHERE id = %s', (b.get('id'),)) - if c.fetchone()[0] == 0: - c.execute('INSERT INTO builds (id, raw) VALUES (%s, %s)', [b.get('id'), b]) - cnt += 1 - if cnt % 100 == 0: - logging.info(f'{cnt} builds inserted') - conn.commit() - conn.commit() - logging.info(f'{cnt} builds inserted') - return cnt - - -def update_running_builds(conn): - with conn.cursor() as c: - c.execute(""" -SELECT key, raw -FROM builds b -WHERE b.raw->>'state' IN ('running', 'scheduled', 'canceling')""") - cnt = 0 - total = c.rowcount - logging.info(f'checking {total} running builds') - for row in c: - key = row[0] - raw = row[1] - logging.info(f'running build {key}') - re = requests.get(raw['url'], headers={'Authorization': token}) - logging.info(f"{raw['url']} -> {re.status_code}") - if re.status_code != 200: - # mark build as "invalid" - continue - j = re.json() - logging.info(f"state {raw['state']} -> {j['state']}") - with conn.cursor() as u: - u.execute("""UPDATE builds SET raw = %s WHERE key = %s""", (j, key)) - cnt += 1 - if cnt % 100 == 0: - conn.commit() - conn.commit() - - -def download_job_artifacts_list(conn): - logging.info('download jobs artifact lsits') - with conn.cursor() as c: - c.execute(""" -SELECT key, raw->>'artifacts_url', meta -FROM jobs -WHERE (meta->>'artifacts' IS NULL) AND (raw->>'artifacts_url' IS NOT NULL)""") - cnt = 0 - total = c.rowcount - logging.info(f'will download {total} artifact lists') - for row in c: - key = row[0] - url = row[1] - meta = row[2] - if meta is None: - meta = {} - r = requests.get(url, allow_redirects=True, headers={'Authorization': token}) - if r.status_code != 200: - logging.error(f'cannot load artifacts_url {r.status_code} {url}') - continue - meta['artifacts'] = r.json() - with conn.cursor() as i: - i.execute('UPDATE jobs SET meta = %s WHERE key = %s', (meta, key)) - cnt += 1 - if cnt % 100 == 0: - logging.info(f'downloaded {cnt}/{total} artifact lists') - conn.commit() - logging.info(f'downloaded {cnt} artifact lists') - conn.commit() - - -def insert_new_jobs(conn): - logging.info('inserting new jobs') - with conn.cursor() as c: - c.execute("""select bj.id, bj.jid, bj.job from -(select b.id, j->>'id' jid, j as job from builds b, json_array_elements(b.raw->'jobs') as j -WHERE b.raw->>'state' NOT IN ('running', 'scheduled', 'canceling')) as bj -left join jobs j on j.id = bj.jid -where j.id IS NULL""") - total = c.rowcount - cnt = 0 - logging.info(f'will insert {total} jobs') - for row in c: - build_id = row[0] - job_id = row[1] - job = benedict(row[2]) - meta = {} - # durations - runnable_at = job.get('runnable_at') - started_at = job.get('started_at') - finished_at = job.get('finished_at') - if (runnable_at is not None) and (started_at is not None) and (finished_at is not None): - runnable_at = dateutil.parser.parse(runnable_at) - started_at = dateutil.parser.parse(started_at) - finished_at = dateutil.parser.parse(finished_at) - meta['queue_time'] = (started_at - runnable_at).total_seconds() - meta['run_time'] = (finished_at - started_at).total_seconds() - meta['total_time'] = (finished_at - runnable_at).total_seconds() - # agent data - for e in job.get('agent.meta_data', []): - p = e.split('=') - if p[0] == 'queue': - meta['agent_queue'] = p[1] - if p[0] == 'name': - meta['agent_name'] = p[1] - with conn.cursor() as i: - i.execute('INSERT INTO jobs (id, build_id, raw, meta) VALUES (%s, %s, %s, %s)', - [job_id, build_id, job, meta]) - cnt += 1 - if cnt % 100 == 0: - logging.info(f'inserted {cnt}/{total} jobs') - conn.commit() - logging.info(f'inserted {cnt} jobs') - conn.commit() - - -if __name__ == '__main__': - logging.basicConfig(level='INFO', format='%(levelname)-7s %(message)s') - cn = connect() - logging.info('downloading buildkite data') - # insert_all_builds(cn) - # insert_new_builds(cn) - # update_running_builds(cn) - # insert_new_jobs(cn) - # download_job_artifacts_list(cn) - # download_job_artifacts(cn) - # download_job_logs(cn)