1
0
Fork 0
mirror of https://we.phorge.it/source/phorge.git synced 2025-01-18 02:31:10 +01:00

Split the GitHub import cursor into separate repository and issues event importers

Summary:
Ref T10538. The primary GitHub event activity stream does not report minor events (labels, milestones, etc).

GitHub has a second, similar activity stream which does report these events (the "Issues Events API").

Use two separate cursors: one consumes the primary stream; the second consumes the events stream.

One possible issue with this is that we may write events in a different order than they occurred, so GitHub shows "comment, label, close" but we show "comment, close, label" or similar. This is probably OK because the secondary API doesn't seem to have any very important events (e.g., it's probably fine if label changes are out-of-order), but we can conceivably put some buffer stage in between the two if it's an issue.

Test Plan: {F1164894}

Reviewers: chad

Reviewed By: chad

Maniphest Tasks: T10538

Differential Revision: https://secure.phabricator.com/D15446
This commit is contained in:
epriestley 2016-03-09 05:35:08 -08:00
parent 1e83aef880
commit 72889c09bf
8 changed files with 361 additions and 223 deletions

View file

@ -1421,6 +1421,8 @@ phutil_register_library_map(array(
'NuanceController' => 'applications/nuance/controller/NuanceController.php',
'NuanceDAO' => 'applications/nuance/storage/NuanceDAO.php',
'NuanceGitHubEventItemType' => 'applications/nuance/item/NuanceGitHubEventItemType.php',
'NuanceGitHubImportCursor' => 'applications/nuance/cursor/NuanceGitHubImportCursor.php',
'NuanceGitHubIssuesImportCursor' => 'applications/nuance/cursor/NuanceGitHubIssuesImportCursor.php',
'NuanceGitHubRepositoryImportCursor' => 'applications/nuance/cursor/NuanceGitHubRepositoryImportCursor.php',
'NuanceGitHubRepositorySourceDefinition' => 'applications/nuance/source/NuanceGitHubRepositorySourceDefinition.php',
'NuanceImportCursor' => 'applications/nuance/cursor/NuanceImportCursor.php',
@ -5677,7 +5679,9 @@ phutil_register_library_map(array(
'NuanceController' => 'PhabricatorController',
'NuanceDAO' => 'PhabricatorLiskDAO',
'NuanceGitHubEventItemType' => 'NuanceItemType',
'NuanceGitHubRepositoryImportCursor' => 'NuanceImportCursor',
'NuanceGitHubImportCursor' => 'NuanceImportCursor',
'NuanceGitHubIssuesImportCursor' => 'NuanceGitHubImportCursor',
'NuanceGitHubRepositoryImportCursor' => 'NuanceGitHubImportCursor',
'NuanceGitHubRepositorySourceDefinition' => 'NuanceSourceDefinition',
'NuanceImportCursor' => 'Phobject',
'NuanceImportCursorData' => array(

View file

@ -0,0 +1,258 @@
<?php
abstract class NuanceGitHubImportCursor
extends NuanceImportCursor {
abstract protected function getGitHubAPIEndpointURI($user, $repository);
abstract protected function newNuanceItemFromGitHubRecord(array $record);
protected function getMaximumPage() {
return 100;
}
protected function getPageSize() {
return 100;
}
protected function getMinimumDelayBetweenPolls() {
// Even if GitHub says we can, don't poll more than once every few seconds.
// In particular, the Issue Events API does not advertise a poll interval
// in a header.
return 5;
}
final protected function shouldPullDataFromSource() {
$now = PhabricatorTime::getNow();
// Respect GitHub's poll interval header. If we made a request recently,
// don't make another one until we've waited long enough.
$ttl = $this->getCursorProperty('github.poll.ttl');
if ($ttl && ($ttl >= $now)) {
$this->logInfo(
pht(
'Respecting "%s" or minimum poll delay: waiting for %s second(s) '.
'to poll GitHub.',
'X-Poll-Interval',
new PhutilNumber(1 + ($ttl - $now))));
return false;
}
// Respect GitHub's API rate limiting. If we've exceeded the rate limit,
// wait until it resets to try again.
$limit = $this->getCursorProperty('github.limit.ttl');
if ($limit && ($limit >= $now)) {
$this->logInfo(
pht(
'Respecting "%s": waiting for %s second(s) to poll GitHub.',
'X-RateLimit-Reset',
new PhutilNumber(1 + ($limit - $now))));
return false;
}
return true;
}
final protected function pullDataFromSource() {
$viewer = $this->getViewer();
$now = PhabricatorTime::getNow();
$source = $this->getSource();
$user = $source->getSourceProperty('github.user');
$repository = $source->getSourceProperty('github.repository');
$api_token = $source->getSourceProperty('github.token');
// This API only supports fetching 10 pages of 30 events each, for a total
// of 300 events.
$etag = null;
$new_items = array();
$hit_known_items = false;
$max_page = $this->getMaximumPage();
$page_size = $this->getPageSize();
for ($page = 1; $page <= $max_page; $page++) {
$uri = $this->getGitHubAPIEndpointURI($user, $repository);
$data = array(
'page' => $page,
'per_page' => $page_size,
);
$future = id(new PhutilGitHubFuture())
->setAccessToken($api_token)
->setRawGitHubQuery($uri, $data);
if ($page == 1) {
$cursor_etag = $this->getCursorProperty('github.poll.etag');
if ($cursor_etag) {
$future->addHeader('If-None-Match', $cursor_etag);
}
}
$this->logInfo(
pht(
'Polling GitHub Repository API endpoint "%s".',
$uri));
$response = $future->resolve();
// Do this first: if we hit the rate limit, we get a response but the
// body isn't valid.
$this->updateRateLimits($response);
if ($response->getStatus()->getStatusCode() == 304) {
$this->logInfo(
pht(
'Received a 304 Not Modified from GitHub, no new events.'));
}
// This means we hit a rate limit or a "Not Modified" because of the
// "ETag" header. In either case, we should bail out.
if ($response->getStatus()->isError()) {
$this->updatePolling($response, $now, false);
$this->getCursorData()->save();
return false;
}
if ($page == 1) {
$etag = $response->getHeaderValue('ETag');
}
$records = $response->getBody();
foreach ($records as $record) {
$item = $this->newNuanceItemFromGitHubRecord($record);
$item_key = $item->getItemKey();
$this->logInfo(
pht(
'Fetched event "%s".',
$item_key));
$new_items[$item->getItemKey()] = $item;
}
if ($new_items) {
$existing = id(new NuanceItemQuery())
->setViewer($viewer)
->withSourcePHIDs(array($source->getPHID()))
->withItemKeys(array_keys($new_items))
->execute();
$existing = mpull($existing, null, 'getItemKey');
foreach ($new_items as $key => $new_item) {
if (isset($existing[$key])) {
unset($new_items[$key]);
$hit_known_items = true;
$this->logInfo(
pht(
'Event "%s" is previously known.',
$key));
}
}
}
if ($hit_known_items) {
break;
}
if (count($records) < $page_size) {
break;
}
}
// TODO: When we go through the whole queue without hitting anything we
// have seen before, we should record some sort of global event so we
// can tell the user when the bridging started or was interrupted?
if (!$hit_known_items) {
$already_polled = $this->getCursorProperty('github.polled');
if ($already_polled) {
// TODO: This is bad: we missed some items, maybe because too much
// stuff happened too fast or the daemons were broken for a long
// time.
} else {
// TODO: This is OK, we're doing the initial import.
}
}
if ($etag !== null) {
$this->updateETag($etag);
}
$this->updatePolling($response, $now, true);
// Reverse the new items so we insert them in chronological order.
$new_items = array_reverse($new_items);
$source->openTransaction();
foreach ($new_items as $new_item) {
$new_item->save();
}
$this->getCursorData()->save();
$source->saveTransaction();
foreach ($new_items as $new_item) {
$new_item->scheduleUpdate();
}
return false;
}
private function updateRateLimits(PhutilGitHubResponse $response) {
$remaining = $response->getHeaderValue('X-RateLimit-Remaining');
$limit_reset = $response->getHeaderValue('X-RateLimit-Reset');
$now = PhabricatorTime::getNow();
$limit_ttl = null;
if (strlen($remaining)) {
$remaining = (int)$remaining;
if (!$remaining) {
$limit_ttl = (int)$limit_reset;
}
}
$this->setCursorProperty('github.limit.ttl', $limit_ttl);
$this->logInfo(
pht(
'This key has %s remaining API request(s), '.
'limit resets in %s second(s).',
new PhutilNumber($remaining),
new PhutilNumber($limit_reset - $now)));
}
private function updateETag($etag) {
$this->setCursorProperty('github.poll.etag', $etag);
$this->logInfo(
pht(
'ETag for this request was "%s".',
$etag));
}
private function updatePolling(
PhutilGitHubResponse $response,
$start,
$success) {
if ($success) {
$this->setCursorProperty('github.polled', true);
}
$poll_interval = (int)$response->getHeaderValue('X-Poll-Interval');
$poll_interval = max($this->getMinimumDelayBetweenPolls(), $poll_interval);
$poll_ttl = $start + $poll_interval;
$this->setCursorProperty('github.poll.ttl', $poll_ttl);
$now = PhabricatorTime::getNow();
$this->logInfo(
pht(
'Set API poll TTL to +%s second(s) (%s second(s) from now).',
new PhutilNumber($poll_interval),
new PhutilNumber($poll_ttl - $now)));
}
}

View file

@ -0,0 +1,30 @@
<?php
final class NuanceGitHubIssuesImportCursor
extends NuanceGitHubImportCursor {
const CURSORTYPE = 'github.issues';
protected function getGitHubAPIEndpointURI($user, $repository) {
return "/repos/{$user}/{$repository}/issues/events";
}
protected function newNuanceItemFromGitHubRecord(array $record) {
$source = $this->getSource();
$id = $record['id'];
$item_key = "github.issueevent.{$id}";
$container_key = null;
return NuanceItem::initializeNewItem()
->setStatus(NuanceItem::STATUS_IMPORTING)
->setSourcePHID($source->getPHID())
->setItemType(NuanceGitHubEventItemType::ITEMTYPE)
->setItemKey($item_key)
->setItemContainerKey($container_key)
->setItemProperty('api.type', 'issue')
->setItemProperty('api.raw', $record);
}
}

View file

@ -1,236 +1,23 @@
<?php
final class NuanceGitHubRepositoryImportCursor
extends NuanceImportCursor {
extends NuanceGitHubImportCursor {
const CURSORTYPE = 'github.repository';
protected function shouldPullDataFromSource() {
$now = PhabricatorTime::getNow();
// Respect GitHub's poll interval header. If we made a request recently,
// don't make another one until we've waited long enough.
$ttl = $this->getCursorProperty('github.poll.ttl');
if ($ttl && ($ttl >= $now)) {
$this->logInfo(
pht(
'Respecting "%s": waiting for %s second(s) to poll GitHub.',
'X-Poll-Interval',
new PhutilNumber(1 + ($ttl - $now))));
return false;
}
// Respect GitHub's API rate limiting. If we've exceeded the rate limit,
// wait until it resets to try again.
$limit = $this->getCursorProperty('github.limit.ttl');
if ($limit && ($limit >= $now)) {
$this->logInfo(
pht(
'Respecting "%s": waiting for %s second(s) to poll GitHub.',
'X-RateLimit-Reset',
new PhutilNumber(1 + ($limit - $now))));
return false;
}
return true;
protected function getGitHubAPIEndpointURI($user, $repository) {
return "/repos/{$user}/{$repository}/events";
}
protected function pullDataFromSource() {
$viewer = $this->getViewer();
$now = PhabricatorTime::getNow();
$source = $this->getSource();
$user = $source->getSourceProperty('github.user');
$repository = $source->getSourceProperty('github.repository');
$api_token = $source->getSourceProperty('github.token');
// This API only supports fetching 10 pages of 30 events each, for a total
// of 300 events.
$etag = null;
$new_items = array();
$hit_known_items = false;
for ($page = 1; $page <= 10; $page++) {
$uri = "/repos/{$user}/{$repository}/events";
$data = array(
'page' => $page,
);
$future = id(new PhutilGitHubFuture())
->setAccessToken($api_token)
->setRawGitHubQuery($uri, $data);
if ($page == 1) {
$cursor_etag = $this->getCursorProperty('github.poll.etag');
if ($cursor_etag) {
$future->addHeader('If-None-Match', $cursor_etag);
}
}
$this->logInfo(
pht(
'Polling GitHub Repository API endpoint "%s".',
$uri));
$response = $future->resolve();
// Do this first: if we hit the rate limit, we get a response but the
// body isn't valid.
$this->updateRateLimits($response);
if ($response->getStatus()->getStatusCode() == 304) {
$this->logInfo(
pht(
'Received a 304 Not Modified from GitHub, no new events.'));
}
// This means we hit a rate limit or a "Not Modified" because of the
// "ETag" header. In either case, we should bail out.
if ($response->getStatus()->isError()) {
$this->updatePolling($response, $now, false);
$this->getCursorData()->save();
return false;
}
if ($page == 1) {
$etag = $response->getHeaderValue('ETag');
}
$records = $response->getBody();
foreach ($records as $record) {
$item = $this->newNuanceItemFromGitHubEvent($record);
$item_key = $item->getItemKey();
$this->logInfo(
pht(
'Fetched event "%s".',
$item_key));
$new_items[$item->getItemKey()] = $item;
}
if ($new_items) {
$existing = id(new NuanceItemQuery())
->setViewer($viewer)
->withSourcePHIDs(array($source->getPHID()))
->withItemKeys(array_keys($new_items))
->execute();
$existing = mpull($existing, null, 'getItemKey');
foreach ($new_items as $key => $new_item) {
if (isset($existing[$key])) {
unset($new_items[$key]);
$hit_known_items = true;
$this->logInfo(
pht(
'Event "%s" is previously known.',
$key));
}
}
}
if ($hit_known_items) {
break;
}
if (count($records) < 30) {
break;
}
}
// TODO: When we go through the whole queue without hitting anything we
// have seen before, we should record some sort of global event so we
// can tell the user when the bridging started or was interrupted?
if (!$hit_known_items) {
$already_polled = $this->getCursorProperty('github.polled');
if ($already_polled) {
// TODO: This is bad: we missed some items, maybe because too much
// stuff happened too fast or the daemons were broken for a long
// time.
} else {
// TODO: This is OK, we're doing the initial import.
}
}
if ($etag !== null) {
$this->updateETag($etag);
}
$this->updatePolling($response, $now, true);
// Reverse the new items so we insert them in chronological order.
$new_items = array_reverse($new_items);
$source->openTransaction();
foreach ($new_items as $new_item) {
$new_item->save();
}
$this->getCursorData()->save();
$source->saveTransaction();
foreach ($new_items as $new_item) {
$new_item->scheduleUpdate();
}
return false;
protected function getMaximumPage() {
return 10;
}
private function updateRateLimits(PhutilGitHubResponse $response) {
$remaining = $response->getHeaderValue('X-RateLimit-Remaining');
$limit_reset = $response->getHeaderValue('X-RateLimit-Reset');
$now = PhabricatorTime::getNow();
$limit_ttl = null;
if (strlen($remaining)) {
$remaining = (int)$remaining;
if (!$remaining) {
$limit_ttl = (int)$limit_reset;
}
}
$this->setCursorProperty('github.limit.ttl', $limit_ttl);
$this->logInfo(
pht(
'This key has %s remaining API request(s), '.
'limit resets in %s second(s).',
new PhutilNumber($remaining),
new PhutilNumber($limit_reset - $now)));
protected function getPageSize() {
return 30;
}
private function updateETag($etag) {
$this->setCursorProperty('github.poll.etag', $etag);
$this->logInfo(
pht(
'ETag for this request was "%s".',
$etag));
}
private function updatePolling(
PhutilGitHubResponse $response,
$start,
$success) {
if ($success) {
$this->setCursorProperty('github.polled', true);
}
$poll_interval = (int)$response->getHeaderValue('X-Poll-Interval');
$poll_ttl = $start + $poll_interval;
$this->setCursorProperty('github.poll.ttl', $poll_ttl);
$now = PhabricatorTime::getNow();
$this->logInfo(
pht(
'Set API poll TTL to +%s second(s) (%s second(s) from now).',
new PhutilNumber($poll_interval),
new PhutilNumber($poll_ttl - $now)));
}
private function newNuanceItemFromGitHubEvent(array $record) {
protected function newNuanceItemFromGitHubRecord(array $record) {
$source = $this->getSource();
$id = $record['id'];
@ -255,6 +42,7 @@ final class NuanceGitHubRepositoryImportCursor
->setItemType(NuanceGitHubEventItemType::ITEMTYPE)
->setItemKey($item_key)
->setItemContainerKey($container_key)
->setItemProperty('api.type', 'repository')
->setItemProperty('api.raw', $record);
}

View file

@ -14,6 +14,27 @@ final class NuanceGitHubEventItemType
}
public function getItemDisplayName(NuanceItem $item) {
$api_type = $item->getItemProperty('api.type');
switch ($api_type) {
case 'issue':
return $this->getGitHubIssueAPIEventDisplayName($item);
case 'repository':
return $this->getGitHubRepositoryAPIEventDisplayName($item);
default:
return pht('GitHub Event (Unknown API Type "%s")', $api_type);
}
}
private function getGitHubIssueAPIEventDisplayName(NuanceItem $item) {
$raw = $item->getItemProperty('api.raw', array());
$action = idxv($raw, array('event'));
$number = idxv($raw, array('issue', 'number'));
return pht('GitHub Issue #%d (%s)', $number, $action);
}
private function getGitHubRepositoryAPIEventDisplayName(NuanceItem $item) {
$raw = $item->getItemProperty('api.raw', array());
$repo = idxv($raw, array('repo', 'name'), pht('<unknown/unknown>'));

View file

@ -15,6 +15,11 @@ final class NuanceManagementImportWorkflow
'param' => 'source',
'help' => pht('Choose which source to import.'),
),
array(
'name' => 'cursor',
'param' => 'cursor',
'help' => pht('Import only a particular cursor.'),
),
));
}
@ -40,6 +45,36 @@ final class NuanceManagementImportWorkflow
$source->getName()));
}
$select = $args->getArg('cursor');
if (strlen($select)) {
if (empty($cursors[$select])) {
throw new PhutilArgumentUsageException(
pht(
'This source ("%s") does not have a "%s" cursor. Available '.
'cursors: %s.',
$source->getName(),
$select,
implode(', ', array_keys($cursors))));
} else {
echo tsprintf(
"%s\n",
pht(
'Importing cursor "%s" only.',
$select));
$cursors = array_select_keys($cursors, array($select));
}
} else {
echo tsprintf(
"%s\n",
pht(
'Importing all cursors: %s.',
implode(', ', array_keys($cursors))));
echo tsprintf(
"%s\n",
pht('(Use --cursor to import only a particular cursor.)'));
}
foreach ($cursors as $cursor) {
$cursor->importFromSource();
}

View file

@ -23,6 +23,8 @@ final class NuanceGitHubRepositorySourceDefinition
return array(
id(new NuanceGitHubRepositoryImportCursor())
->setCursorKey('events.repository'),
id(new NuanceGitHubIssuesImportCursor())
->setCursorKey('events.issues'),
);
}

View file

@ -114,7 +114,7 @@ abstract class NuanceSourceDefinition extends Phobject {
->setCursorData($cursor_data);
}
return $cursors;
return $map;
}
protected function newImportCursors() {