1
0
Fork 0
mirror of https://we.phorge.it/source/phorge.git synced 2025-02-20 10:48:40 +01:00

Make daemons ignore "Unreachable" commits and avoid duplicate work

Summary:
Ref T9028. This improves the daemon behavior for unreachable commits. There is still no way for commits to become marked unreachable on their own.

  - When a daemon encounters an unreachable commit, fail permanently.
  - When we revive a commit, queue new daemons to process it (since some of the daemons might have failed permanently the first time around).
  - Before doing a step on a commit, check if the step has already been done and skip it if it has. This can't happen normally, but will soon be possible if a commit is repeatedly deleted and revived very quickly.
  - Steps queued with `bin/repository reparse ...` still execute normally.

Test Plan:
  - Used `bin/repository reparse` to run every step, verified they all mark the commit with the proper flag.
  - Faked the `reparse` exception in the "skip step" code, used `repository reparse` to skip every step.
  - Marked a commit as unreachable, ran `discover`, saw daemons queue for it.
  - Ran daemons with `bin/worker execute --id ...`, saw them all skip + queue the next step.
  - Marked a commit as unreachable, ran `bin/repository reparse` on it, got permanent failures immediately for each step.

Reviewers: chad

Reviewed By: chad

Maniphest Tasks: T9028

Differential Revision: https://secure.phabricator.com/D16131
This commit is contained in:
epriestley 2016-06-16 07:50:23 -07:00
parent ec89c7d63e
commit 77ee518d88
9 changed files with 148 additions and 97 deletions

View file

@ -514,10 +514,16 @@ final class PhabricatorRepositoryDiscoveryEngine
$repository->getID(),
$commit_identifier);
if ($conn_w->getAffectedRows()) {
$commit = $commit->loadOneWhere(
'repositoryID = %d AND commitIdentifier = %s',
$repository->getID(),
$commit_identifier);
// After reviving a commit, schedule new daemons for it.
$this->didDiscoverCommit($repository, $commit, $epoch);
return;
}
$commit->setRepositoryID($repository->getID());
$commit->setCommitIdentifier($commit_identifier);
$commit->setEpoch($epoch);
@ -575,21 +581,7 @@ final class PhabricatorRepositoryDiscoveryEngine
}
$commit->saveTransaction();
$this->insertTask($repository, $commit);
queryfx(
$conn_w,
'INSERT INTO %T (repositoryID, size, lastCommitID, epoch)
VALUES (%d, 1, %d, %d)
ON DUPLICATE KEY UPDATE
size = size + 1,
lastCommitID =
IF(VALUES(epoch) > epoch, VALUES(lastCommitID), lastCommitID),
epoch = IF(VALUES(epoch) > epoch, VALUES(epoch), epoch)',
PhabricatorRepository::TABLE_SUMMARY,
$repository->getID(),
$commit->getID(),
$epoch);
$this->didDiscoverCommit($repository, $commit, $epoch);
if ($this->repairMode) {
// Normally, the query should throw a duplicate key exception. If we
@ -614,6 +606,29 @@ final class PhabricatorRepositoryDiscoveryEngine
}
}
private function didDiscoverCommit(
PhabricatorRepository $repository,
PhabricatorRepositoryCommit $commit,
$epoch) {
$this->insertTask($repository, $commit);
// Update the repository summary table.
queryfx(
$commit->establishConnection('w'),
'INSERT INTO %T (repositoryID, size, lastCommitID, epoch)
VALUES (%d, 1, %d, %d)
ON DUPLICATE KEY UPDATE
size = size + 1,
lastCommitID =
IF(VALUES(epoch) > epoch, VALUES(lastCommitID), lastCommitID),
epoch = IF(VALUES(epoch) > epoch, VALUES(epoch), epoch)',
PhabricatorRepository::TABLE_SUMMARY,
$repository->getID(),
$commit->getID(),
$epoch);
}
private function didDiscoverRefs(array $refs) {
foreach ($refs as $ref) {
$this->workingSet[$ref->getIdentifier()] = true;

View file

@ -3,6 +3,10 @@
final class PhabricatorRepositoryCommitHeraldWorker
extends PhabricatorRepositoryCommitParserWorker {
protected function getImportStepFlag() {
return PhabricatorRepositoryCommit::IMPORTED_HERALD;
}
public function getRequiredLeaseTime() {
// Herald rules may take a long time to process.
return phutil_units('4 hours in seconds');
@ -12,6 +16,12 @@ final class PhabricatorRepositoryCommitHeraldWorker
PhabricatorRepository $repository,
PhabricatorRepositoryCommit $commit) {
if ($this->shouldSkipImportStep()) {
// This worker has no followup tasks, so we can just bail out
// right away without queueing anything.
return;
}
// Reload the commit to pull commit data and audit requests.
$commit = id(new DiffusionCommitQuery())
->setViewer(PhabricatorUser::getOmnipotentUser())

View file

@ -3,14 +3,18 @@
final class PhabricatorRepositoryCommitOwnersWorker
extends PhabricatorRepositoryCommitParserWorker {
protected function getImportStepFlag() {
return PhabricatorRepositoryCommit::IMPORTED_OWNERS;
}
protected function parseCommit(
PhabricatorRepository $repository,
PhabricatorRepositoryCommit $commit) {
$this->triggerOwnerAudits($repository, $commit);
$commit->writeImportStatusFlag(
PhabricatorRepositoryCommit::IMPORTED_OWNERS);
if (!$this->shouldSkipImportStep()) {
$this->triggerOwnerAudits($repository, $commit);
$commit->writeImportStatusFlag($this->getImportStepFlag());
}
if ($this->shouldQueueFollowupTasks()) {
$this->queueTask(

View file

@ -26,6 +26,14 @@ abstract class PhabricatorRepositoryCommitParserWorker
pht('Commit "%s" does not exist.', $commit_id));
}
if ($commit->isUnreachable()) {
throw new PhabricatorWorkerPermanentFailureException(
pht(
'Commit "%s" has been deleted: it is no longer reachable from '.
'any ref.',
$commit_id));
}
$this->commit = $commit;
return $commit;
@ -44,6 +52,42 @@ abstract class PhabricatorRepositoryCommitParserWorker
return !idx($this->getTaskData(), 'only');
}
protected function getImportStepFlag() {
return null;
}
final protected function shouldSkipImportStep() {
// If this step has already been performed and this is a "natural" task
// which was queued by the normal daemons, decline to do the work again.
// This mitigates races if commits are rapidly deleted and revived.
$flag = $this->getImportStepFlag();
if (!$flag) {
// This step doesn't have an associated flag.
return false;
}
$commit = $this->commit;
if (!$commit->isPartiallyImported($flag)) {
// This commit doesn't have the flag set yet.
return false;
}
if (!$this->shouldQueueFollowupTasks()) {
// This task was queued by administrative tools, so do the work even
// if it duplicates existing work.
return false;
}
$this->log(
"%s\n",
pht(
'Skipping import step; this step was previously completed for '.
'this commit.'));
return true;
}
abstract protected function parseCommit(
PhabricatorRepository $repository,
PhabricatorRepositoryCommit $commit);

View file

@ -3,6 +3,10 @@
abstract class PhabricatorRepositoryCommitChangeParserWorker
extends PhabricatorRepositoryCommitParserWorker {
protected function getImportStepFlag() {
return PhabricatorRepositoryCommit::IMPORTED_CHANGE;
}
public function getRequiredLeaseTime() {
// It can take a very long time to parse commits; some commits in the
// Facebook repository affect many millions of paths. Acquire 24h leases.
@ -23,9 +27,15 @@ abstract class PhabricatorRepositoryCommitChangeParserWorker
return;
}
$results = $this->parseCommitChanges($repository, $commit);
if ($results) {
$this->writeCommitChanges($repository, $commit, $results);
if (!$this->shouldSkipImportStep()) {
$results = $this->parseCommitChanges($repository, $commit);
if ($results) {
$this->writeCommitChanges($repository, $commit, $results);
}
$commit->writeImportStatusFlag($this->getImportStepFlag());
PhabricatorSearchWorker::queueDocumentForIndexing($commit->getPHID());
}
$this->finishParse();
@ -85,12 +95,6 @@ abstract class PhabricatorRepositoryCommitChangeParserWorker
protected function finishParse() {
$commit = $this->commit;
$commit->writeImportStatusFlag(
PhabricatorRepositoryCommit::IMPORTED_CHANGE);
PhabricatorSearchWorker::queueDocumentForIndexing($commit->getPHID());
if ($this->shouldQueueFollowupTasks()) {
$this->queueTask(
'PhabricatorRepositoryCommitOwnersWorker',

View file

@ -3,42 +3,52 @@
abstract class PhabricatorRepositoryCommitMessageParserWorker
extends PhabricatorRepositoryCommitParserWorker {
abstract protected function parseCommitWithRef(
PhabricatorRepository $repository,
PhabricatorRepositoryCommit $commit,
DiffusionCommitRef $ref);
protected function getImportStepFlag() {
return PhabricatorRepositoryCommit::IMPORTED_MESSAGE;
}
abstract protected function getFollowupTaskClass();
final protected function parseCommit(
PhabricatorRepository $repository,
PhabricatorRepositoryCommit $commit) {
$viewer = PhabricatorUser::getOmnipotentUser();
if (!$this->shouldSkipImportStep()) {
$viewer = PhabricatorUser::getOmnipotentUser();
$refs_raw = DiffusionQuery::callConduitWithDiffusionRequest(
$viewer,
DiffusionRequest::newFromDictionary(
$refs_raw = DiffusionQuery::callConduitWithDiffusionRequest(
$viewer,
DiffusionRequest::newFromDictionary(
array(
'repository' => $repository,
'user' => $viewer,
)),
'diffusion.querycommits',
array(
'repository' => $repository,
'user' => $viewer,
)),
'diffusion.querycommits',
array(
'repositoryPHID' => $repository->getPHID(),
'phids' => array($commit->getPHID()),
'bypassCache' => true,
'needMessages' => true,
));
'repositoryPHID' => $repository->getPHID(),
'phids' => array($commit->getPHID()),
'bypassCache' => true,
'needMessages' => true,
));
if (empty($refs_raw['data'])) {
throw new Exception(
pht(
'Unable to retrieve details for commit "%s"!',
$commit->getPHID()));
if (empty($refs_raw['data'])) {
throw new Exception(
pht(
'Unable to retrieve details for commit "%s"!',
$commit->getPHID()));
}
$ref = DiffusionCommitRef::newFromConduitResult(head($refs_raw['data']));
$this->updateCommitData($ref);
}
$ref = DiffusionCommitRef::newFromConduitResult(head($refs_raw['data']));
$this->parseCommitWithRef($repository, $commit, $ref);
if ($this->shouldQueueFollowupTasks()) {
$this->queueTask(
$this->getFollowupTaskClass(),
array(
'commitID' => $commit->getID(),
));
}
}
final protected function updateCommitData(DiffusionCommitRef $ref) {

View file

@ -3,20 +3,8 @@
final class PhabricatorRepositoryGitCommitMessageParserWorker
extends PhabricatorRepositoryCommitMessageParserWorker {
protected function parseCommitWithRef(
PhabricatorRepository $repository,
PhabricatorRepositoryCommit $commit,
DiffusionCommitRef $ref) {
$this->updateCommitData($ref);
if ($this->shouldQueueFollowupTasks()) {
$this->queueTask(
'PhabricatorRepositoryGitCommitChangeParserWorker',
array(
'commitID' => $commit->getID(),
));
}
protected function getFollowupTaskClass() {
return 'PhabricatorRepositoryGitCommitChangeParserWorker';
}
}

View file

@ -3,20 +3,8 @@
final class PhabricatorRepositoryMercurialCommitMessageParserWorker
extends PhabricatorRepositoryCommitMessageParserWorker {
protected function parseCommitWithRef(
PhabricatorRepository $repository,
PhabricatorRepositoryCommit $commit,
DiffusionCommitRef $ref) {
$this->updateCommitData($ref);
if ($this->shouldQueueFollowupTasks()) {
$this->queueTask(
'PhabricatorRepositoryMercurialCommitChangeParserWorker',
array(
'commitID' => $commit->getID(),
));
}
protected function getFollowupTaskClass() {
return 'PhabricatorRepositoryMercurialCommitChangeParserWorker';
}
}

View file

@ -3,20 +3,8 @@
final class PhabricatorRepositorySvnCommitMessageParserWorker
extends PhabricatorRepositoryCommitMessageParserWorker {
protected function parseCommitWithRef(
PhabricatorRepository $repository,
PhabricatorRepositoryCommit $commit,
DiffusionCommitRef $ref) {
$this->updateCommitData($ref);
if ($this->shouldQueueFollowupTasks()) {
$this->queueTask(
'PhabricatorRepositorySvnCommitChangeParserWorker',
array(
'commitID' => $commit->getID(),
));
}
protected function getFollowupTaskClass() {
return 'PhabricatorRepositorySvnCommitChangeParserWorker';
}
}