1
0
Fork 0
mirror of https://gitlab.wikimedia.org/ladsgroup/Phabricator-maintenance-bot synced 2024-11-22 12:02:38 +01:00
Phabricator-maintenance-bot/lib.py

189 lines
5.7 KiB
Python
Raw Normal View History

2019-11-22 22:19:59 +01:00
import json
2019-11-26 12:36:02 +01:00
import sys
import time
2019-11-22 22:19:59 +01:00
import requests
class Client(object):
    """Phabricator client.

    Small wrapper around the Phabricator Conduit HTTP API (``<url>/api/...``).
    Every request is authenticated with the API token given at construction
    time. Results of PHID lookups and project column searches are cached on
    the instance for its lifetime.
    """

    def __init__(self, url, username, key):
        """Create a client.

        :param url: base URL of the Phabricator instance (no trailing
            ``/api``; that part is appended by :meth:`post`).
        :param username: stored on the instance; not used by any request
            made in this class.
        :param key: Conduit API token sent with every request.
        """
        self.url = url
        self.username = username
        # project PHID -> raw 'project.column.search' result
        self.column_cache = {}
        # object name (e.g. a task label) -> PHID
        self.phid_cache = {}
        self.session = {
            'token': key,
        }

    @classmethod
    def newFromCreds(cls):
        """Build a client from the JSON credentials file named in sys.argv[1].

        The file must contain a JSON array whose items are passed
        positionally to the constructor (url, username, key).
        """
        with open(sys.argv[1], 'r') as f:
            creds = json.load(f)
        return cls(*creds)

    def post(self, path, data):
        """Call the Conduit method ``path`` with parameters ``data``.

        :param path: Conduit method name, e.g. ``'maniphest.search'``.
        :param data: dict of method parameters. It is not modified.
        :returns: the ``result`` member of the API response.
        :raises Exception: with the server's ``error_info`` when the
            response carries a non-null ``error_code``.
        """
        # Work on a shallow copy so the API token is never written into a
        # dict owned by the caller.
        payload = dict(data)
        payload['__conduit__'] = self.session
        r = requests.post('%s/api/%s' % (self.url, path), data={
            'params': json.dumps(payload),
            'output': 'json',
        })
        resp = r.json()
        if resp['error_code'] is not None:
            raise Exception(resp['error_info'])
        return resp['result']

    def _editTask(self, object_identifier, transactions):
        """Apply ``transactions`` through a single 'maniphest.edit' call."""
        self.post('maniphest.edit', {
            'objectIdentifier': object_identifier,
            'transactions': transactions,
        })

    def lookupPhid(self, label):
        """Lookup information on a Phab object by name.

        :param label: object name passed to ``phid.lookup``.
        :returns: the PHID of the object; cached after the first lookup.
        :raises Exception: if the lookup result has no PHID for ``label``.
        """
        if label not in self.phid_cache:
            r = self.post('phid.lookup', {'names': [label]})
            if label not in r or 'phid' not in r[label]:
                raise Exception('No object found for %s' % label)
            self.phid_cache[label] = r[label]['phid']
        return self.phid_cache[label]

    def getColumns(self, project_phid):
        """Return the raw 'project.column.search' result for a project.

        The result is cached per project PHID.
        """
        if project_phid not in self.column_cache:
            self.column_cache[project_phid] = self.post(
                'project.column.search', {
                    "constraints": {
                        "projects": [project_phid]}})
        return self.column_cache[project_phid]

    def moveColumns(self, task_phid, to_column):
        """Send a 'column' transaction for the task with ``to_column``."""
        self._editTask(task_phid, [{
            'type': 'column',
            'value': [to_column],
        }])

    def setTaskDescription(self, task_phid, new_desc):
        """Replace the description of the task with ``new_desc``."""
        self._editTask(task_phid, [{
            'type': 'description',
            'value': new_desc,
        }])

    def createSubtask(self, desc, project_phids, parent_phid, title):
        """Create a new task with ``parent_phid`` as its parent.

        :param desc: description of the new task.
        :param project_phids: list of project PHIDs to add to the new task.
        :param parent_phid: PHID sent as the value of the 'parent'
            transaction.
        :param title: title of the new task.
        """
        # An empty objectIdentifier is sent so that a new object is created
        # instead of an existing one being edited.
        self._editTask('', [
            {
                'type': 'parent',
                'value': parent_phid
            },
            {
                'type': 'title',
                'value': title
            },
            {
                'type': 'description',
                'value': desc,
            },
            {
                'type': 'projects.add',
                'value': project_phids
            },
        ])

    def createParentTask(self, desc, project_phids, subtask_phid, title):
        """Create a new task that has ``subtask_phid`` as a subtask.

        :param desc: description of the new task.
        :param project_phids: list of project PHIDs to add to the new task.
        :param subtask_phid: PHID sent (as a one-item list) in the
            'subtasks.add' transaction.
        :param title: title of the new task.
        """
        self._editTask('', [
            {
                'type': 'subtasks.add',
                'value': [subtask_phid]
            },
            {
                'type': 'title',
                'value': title
            },
            {
                'type': 'description',
                'value': desc,
            },
            {
                'type': 'projects.add',
                'value': project_phids
            },
        ])

    def taskDetails(self, phid):
        """Lookup details of a Maniphest task.

        :raises Exception: if 'maniphest.query' returns no entry for
            ``phid``.
        """
        r = self.post('maniphest.query', {'phids': [phid]})
        if phid in r:
            return r[phid]
        raise Exception('No task found for phid %s' % phid)

    def getTransactions(self, phid):
        """Return the 'data' list of 'transaction.search' for an object.

        :raises Exception: if the result has no 'data' member.
        """
        r = self.post('transaction.search', {'objectIdentifier': phid})
        if 'data' in r:
            return r['data']
        raise Exception('No transaction found for phid %s' % phid)

    def removeProject(self, project_phid, task):
        """Remove a project from a task given by name instead of PHID.

        ``task`` is resolved with :meth:`lookupPhid` first.
        """
        return self.removeProjectByPhid(project_phid, self.lookupPhid(task))

    def removeProjectByPhid(self, project_phid, task_phid):
        """Remove the project ``project_phid`` from the task ``task_phid``."""
        self._editTask(task_phid, [{
            'type': 'projects.remove',
            'value': [project_phid],
        }])

    def getTasksWithProject(self, project_phid, continue_=None, statuses=None):
        """Yield the PHIDs of the tasks tagged with a project.

        Pages through 'maniphest.search' until the result cursor has no
        'after' value. Rows whose 'type' is not 'TASK' are skipped.

        :param project_phid: PHID of the project to search in.
        :param continue_: optional cursor position to start after.
        :param statuses: optional list of task statuses to filter on. The
            filter is applied to every page, not only the first one.
        """
        after = continue_
        while True:
            r = self._getTasksWithProjectContinue(
                project_phid, after, statuses=statuses)
            cursor = r['cursor']
            for case in r['data']:
                if case['type'] != 'TASK':
                    continue
                yield case['phid']
            after = cursor.get('after')
            if not after:
                break

    def _getTasksWithProjectContinue(self, project_phid, continue_=None, statuses=None):
        """Fetch one page (at most 100 rows) of 'maniphest.search' results.

        Only tasks modified within the last ``sys.argv[2]`` seconds are
        requested; the cut-off is computed from the current time on every
        call.
        """
        params = {
            'limit': 100,
            'constraints': {
                'projects': [project_phid],
                "modifiedStart": int(time.time() - int(sys.argv[2]))
            }
        }
        if continue_:
            params['after'] = continue_
        if statuses:
            params['constraints']['statuses'] = statuses
        return self.post('maniphest.search', params)

    def getTaskColumns(self, phid):
        """Return the 'columns' attachment of a task.

        Raises IndexError if the search returns no rows for ``phid``.
        """
        params = {
            "attachments": {
                "columns": {"boards": {"columns": True}}
            },
            "constraints": {
                "phids": [phid]
            }
        }
        return self.post('maniphest.search', params)[
            'data'][0]['attachments']['columns']

    def getTaskSubtasks(self, phid):
        """Return the 'maniphest.search' rows for ``phid`` with the
        'hasSubtasks' constraint set to True.
        """
        params = {
            "constraints": {
                "phids": [phid],
                "hasSubtasks": True
            }
        }
        return self.post('maniphest.search', params)[
            'data']