mirror of
https://we.phorge.it/source/phorge.git
synced 2024-11-09 16:32:39 +01:00
Push feed publishing deeper into the task queue
Summary: Ref T2852. I want to model Asana integration as a response to feed events. Currently, we queue one feed event for each HTTP hook. Instead, always queue one feed event and then have it queue any necessary followup events (now, http hooks; soon, asana). Add a script to make it easy to reproducibly fire feed event publishing. Test Plan: Republished a feed event and verified it hit configured HTTP hooks correctly. $ ./bin/feed republish 5765774156541908292 --trace >>> [2] <connect> phabricator2_feed <<< [2] <connect> 1,660 us >>> [3] <query> SELECT story.* FROM `feed_storydata` story JOIN `feed_storyreference` ref ON ref.chronologicalKey = story.chronologicalKey WHERE (ref.chronologicalKey IN (5765774156541908292)) GROUP BY story.chronologicalKey ORDER BY story.chronologicalKey DESC <<< [3] <query> 595 us >>> [4] <connect> phabricator2_differential <<< [4] <connect> 760 us >>> [5] <query> SELECT * FROM `differential_revision` WHERE phid IN ('PHID-DREV-ywqmrj5zgkdloqh5p3c5') <<< [5] <query> 478 us >>> [6] <query> SELECT * FROM `differential_revision` WHERE phid IN ('PHID-DREV-ywqmrj5zgkdloqh5p3c5') <<< [6] <query> 449 us >>> [7] <connect> phabricator2_user <<< [7] <connect> 1,062 us >>> [8] <query> SELECT * FROM `user` WHERE phid in ('PHID-USER-lqiz3yd7wmk64ejugvov') <<< [8] <query> 540 us >>> [9] <connect> phabricator2_file <<< [9] <connect> 951 us >>> [10] <query> SELECT * FROM `file` WHERE phid IN ('PHID-FILE-gq6dlsysvxbn3dgwvky7') <<< [10] <query> 498 us >>> [11] <query> SELECT * FROM `user_status` WHERE userPHID IN ('PHID-USER-lqiz3yd7wmk64ejugvov') AND UNIX_TIMESTAMP() BETWEEN dateFrom AND dateTo <<< [11] <query> 507 us Republishing story... >>> [12] <query> SELECT story.* FROM `feed_storydata` story JOIN `feed_storyreference` ref ON ref.chronologicalKey = story.chronologicalKey WHERE (ref.chronologicalKey IN (5765774156541908292)) GROUP BY story.chronologicalKey ORDER BY story.chronologicalKey DESC <<< [12] <query> 685 us >>> [13] <query> SELECT * FROM `differential_revision` WHERE phid IN ('PHID-DREV-ywqmrj5zgkdloqh5p3c5') <<< [13] <query> 489 us >>> [14] <query> SELECT * FROM `differential_revision` WHERE phid IN ('PHID-DREV-ywqmrj5zgkdloqh5p3c5') <<< [14] <query> 512 us >>> [15] <query> SELECT * FROM `user` WHERE phid in ('PHID-USER-lqiz3yd7wmk64ejugvov') <<< [15] <query> 601 us >>> [16] <query> SELECT * FROM `file` WHERE phid IN ('PHID-FILE-gq6dlsysvxbn3dgwvky7') <<< [16] <query> 405 us >>> [17] <query> SELECT * FROM `user_status` WHERE userPHID IN ('PHID-USER-lqiz3yd7wmk64ejugvov') AND UNIX_TIMESTAMP() BETWEEN dateFrom AND dateTo <<< [17] <query> 551 us >>> [18] <query> SELECT story.* FROM `feed_storydata` story JOIN `feed_storyreference` ref ON ref.chronologicalKey = story.chronologicalKey WHERE (ref.chronologicalKey IN (5765774156541908292)) GROUP BY story.chronologicalKey ORDER BY story.chronologicalKey DESC <<< [18] <query> 507 us >>> [19] <query> SELECT * FROM `differential_revision` WHERE phid IN ('PHID-DREV-ywqmrj5zgkdloqh5p3c5') <<< [19] <query> 428 us >>> [20] <query> SELECT * FROM `differential_revision` WHERE phid IN ('PHID-DREV-ywqmrj5zgkdloqh5p3c5') <<< [20] <query> 419 us >>> [21] <query> SELECT * FROM `user` WHERE phid in ('PHID-USER-lqiz3yd7wmk64ejugvov') <<< [21] <query> 591 us >>> [22] <query> SELECT * FROM `file` WHERE phid IN ('PHID-FILE-gq6dlsysvxbn3dgwvky7') <<< [22] <query> 406 us >>> [23] <query> SELECT * FROM `user_status` WHERE userPHID IN ('PHID-USER-lqiz3yd7wmk64ejugvov') AND UNIX_TIMESTAMP() BETWEEN dateFrom AND dateTo <<< [23] <query> 593 us >>> [24] <http> http://127.0.0.1/derp/ <<< [24] <http> 746,157 us [2013-06-24 20:23:26] EXCEPTION: (HTTPFutureResponseStatusHTTP) [HTTP/500] Internal Server Error Reviewers: btrahan Reviewed By: btrahan CC: aran Maniphest Tasks: T2852 Differential Revision: https://secure.phabricator.com/D6291
This commit is contained in:
parent
0495eb1586
commit
42c0f060d5
11 changed files with 212 additions and 38 deletions
1
bin/feed
Symbolic link
1
bin/feed
Symbolic link
|
@ -0,0 +1 @@
|
|||
../scripts/setup/manage_feed.php
|
22
scripts/setup/manage_feed.php
Executable file
22
scripts/setup/manage_feed.php
Executable file
|
@ -0,0 +1,22 @@
|
|||
#!/usr/bin/env php
|
||||
<?php
|
||||
|
||||
$root = dirname(dirname(dirname(__FILE__)));
|
||||
require_once $root.'/scripts/__init_script__.php';
|
||||
|
||||
$args = new PhutilArgumentParser($argv);
|
||||
$args->setTagline('manage feed');
|
||||
$args->setSynopsis(<<<EOSYNOPSIS
|
||||
**feed** __command__ [__options__]
|
||||
Test and debug feed events.
|
||||
|
||||
EOSYNOPSIS
|
||||
);
|
||||
$args->parseStandardArguments();
|
||||
|
||||
$workflows = array(
|
||||
new PhabricatorFeedManagementRepublishWorkflow(),
|
||||
new PhutilHelpArgumentWorkflow(),
|
||||
);
|
||||
|
||||
$args->parseWorkflows($workflows);
|
|
@ -575,7 +575,9 @@ phutil_register_library_map(array(
|
|||
'DrydockSSHCommandInterface' => 'applications/drydock/interface/command/DrydockSSHCommandInterface.php',
|
||||
'DrydockWebrootInterface' => 'applications/drydock/interface/webroot/DrydockWebrootInterface.php',
|
||||
'DrydockWorkingCopyBlueprint' => 'applications/drydock/blueprint/DrydockWorkingCopyBlueprint.php',
|
||||
'FeedPublisherHTTPWorker' => 'applications/feed/worker/FeedPublisherHTTPWorker.php',
|
||||
'FeedPublisherWorker' => 'applications/feed/worker/FeedPublisherWorker.php',
|
||||
'FeedPushWorker' => 'applications/feed/worker/FeedPushWorker.php',
|
||||
'HarbormasterDAO' => 'applications/harbormaster/storage/HarbormasterDAO.php',
|
||||
'HarbormasterObject' => 'applications/harbormaster/storage/HarbormasterObject.php',
|
||||
'HarbormasterRunnerWorker' => 'applications/harbormaster/worker/HarbormasterRunnerWorker.php',
|
||||
|
@ -1051,6 +1053,8 @@ phutil_register_library_map(array(
|
|||
'PhabricatorFeedController' => 'applications/feed/controller/PhabricatorFeedController.php',
|
||||
'PhabricatorFeedDAO' => 'applications/feed/storage/PhabricatorFeedDAO.php',
|
||||
'PhabricatorFeedMainController' => 'applications/feed/controller/PhabricatorFeedMainController.php',
|
||||
'PhabricatorFeedManagementRepublishWorkflow' => 'applications/feed/management/PhabricatorFeedManagementRepublishWorkflow.php',
|
||||
'PhabricatorFeedManagementWorkflow' => 'applications/feed/management/PhabricatorFeedManagementWorkflow.php',
|
||||
'PhabricatorFeedPublicStreamController' => 'applications/feed/controller/PhabricatorFeedPublicStreamController.php',
|
||||
'PhabricatorFeedQuery' => 'applications/feed/PhabricatorFeedQuery.php',
|
||||
'PhabricatorFeedStory' => 'applications/feed/story/PhabricatorFeedStory.php',
|
||||
|
@ -2456,7 +2460,9 @@ phutil_register_library_map(array(
|
|||
'DrydockSSHCommandInterface' => 'DrydockCommandInterface',
|
||||
'DrydockWebrootInterface' => 'DrydockInterface',
|
||||
'DrydockWorkingCopyBlueprint' => 'DrydockBlueprint',
|
||||
'FeedPublisherWorker' => 'PhabricatorWorker',
|
||||
'FeedPublisherHTTPWorker' => 'FeedPushWorker',
|
||||
'FeedPublisherWorker' => 'FeedPushWorker',
|
||||
'FeedPushWorker' => 'PhabricatorWorker',
|
||||
'HarbormasterDAO' => 'PhabricatorLiskDAO',
|
||||
'HarbormasterObject' => 'HarbormasterDAO',
|
||||
'HarbormasterRunnerWorker' => 'PhabricatorWorker',
|
||||
|
@ -2929,6 +2935,8 @@ phutil_register_library_map(array(
|
|||
'PhabricatorFeedController' => 'PhabricatorController',
|
||||
'PhabricatorFeedDAO' => 'PhabricatorLiskDAO',
|
||||
'PhabricatorFeedMainController' => 'PhabricatorFeedController',
|
||||
'PhabricatorFeedManagementRepublishWorkflow' => 'PhabricatorFeedManagementWorkflow',
|
||||
'PhabricatorFeedManagementWorkflow' => 'PhutilArgumentWorkflow',
|
||||
'PhabricatorFeedPublicStreamController' => 'PhabricatorFeedController',
|
||||
'PhabricatorFeedQuery' => 'PhabricatorCursorPagedPolicyAwareQuery',
|
||||
'PhabricatorFeedStory' => 'PhabricatorPolicyInterface',
|
||||
|
|
|
@ -4,12 +4,18 @@ final class PhabricatorFeedQuery
|
|||
extends PhabricatorCursorPagedPolicyAwareQuery {
|
||||
|
||||
private $filterPHIDs;
|
||||
private $chronologicalKeys;
|
||||
|
||||
public function setFilterPHIDs(array $phids) {
|
||||
$this->filterPHIDs = $phids;
|
||||
return $this;
|
||||
}
|
||||
|
||||
public function withChronologicalKeys(array $keys) {
|
||||
$this->chronologicalKeys = $keys;
|
||||
return $this;
|
||||
}
|
||||
|
||||
protected function loadPage() {
|
||||
|
||||
$story_table = new PhabricatorFeedStoryData();
|
||||
|
@ -54,6 +60,24 @@ final class PhabricatorFeedQuery
|
|||
$this->filterPHIDs);
|
||||
}
|
||||
|
||||
if ($this->chronologicalKeys) {
|
||||
// NOTE: We want to use integers in the query so we can take advantage
|
||||
// of keys, but can't use %d on 32-bit systems. Make sure all the keys
|
||||
// are integers and then format them raw.
|
||||
|
||||
$keys = $this->chronologicalKeys;
|
||||
foreach ($keys as $key) {
|
||||
if (!ctype_digit($key)) {
|
||||
throw new Exception("Key '{$key}' is not a valid chronological key!");
|
||||
}
|
||||
}
|
||||
|
||||
$where[] = qsprintf(
|
||||
$conn_r,
|
||||
'ref.chronologicalKey IN (%Q)',
|
||||
implode(', ', $keys));
|
||||
}
|
||||
|
||||
$where[] = $this->buildPagingClause($conn_r);
|
||||
|
||||
return $this->formatWhereClause($where);
|
||||
|
|
|
@ -105,12 +105,11 @@ final class PhabricatorFeedStoryPublisher {
|
|||
$this->sendNotification($chrono_key);
|
||||
}
|
||||
|
||||
$uris = PhabricatorEnv::getEnvConfig('feed.http-hooks');
|
||||
foreach ($uris as $uri) {
|
||||
$task = PhabricatorWorker::scheduleTask(
|
||||
'FeedPublisherWorker',
|
||||
array('chrono_key' => $chrono_key, 'uri' => $uri));
|
||||
}
|
||||
PhabricatorWorker::scheduleTask(
|
||||
'FeedPublisherWorker',
|
||||
array(
|
||||
'key' => $chrono_key,
|
||||
));
|
||||
|
||||
return $story;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,61 @@
|
|||
<?php
|
||||
|
||||
final class PhabricatorFeedManagementRepublishWorkflow
|
||||
extends PhabricatorFeedManagementWorkflow {
|
||||
|
||||
protected function didConstruct() {
|
||||
$this
|
||||
->setName('republish')
|
||||
->setExamples('**republish** __story_key__')
|
||||
->setSynopsis(
|
||||
pht(
|
||||
'Republish a feed event to all consumers.'))
|
||||
->setArguments(
|
||||
array(
|
||||
array(
|
||||
'name' => 'key',
|
||||
'wildcard' => true,
|
||||
),
|
||||
));
|
||||
}
|
||||
|
||||
public function execute(PhutilArgumentParser $args) {
|
||||
$console = PhutilConsole::getConsole();
|
||||
$viewer = PhabricatorUser::getOmnipotentUser();
|
||||
|
||||
$key = $args->getArg('key');
|
||||
if (count($key) < 1) {
|
||||
throw new PhutilArgumentUsageException(
|
||||
pht("Specify a story key to republish."));
|
||||
} else if (count($key) > 1) {
|
||||
throw new PhutilArgumentUsageException(
|
||||
pht("Specify exactly one story key to republish."));
|
||||
}
|
||||
$key = head($key);
|
||||
|
||||
$story = id(new PhabricatorFeedQuery())
|
||||
->setViewer($viewer)
|
||||
->withChronologicalKeys(array($key))
|
||||
->executeOne();
|
||||
|
||||
if (!$story) {
|
||||
throw new PhutilArgumentUsageException(
|
||||
pht('No story exists with key "%s"!', $key));
|
||||
}
|
||||
|
||||
$console->writeOut("%s\n", pht("Republishing story..."));
|
||||
|
||||
PhabricatorWorker::setRunAllTasksInProcess(true);
|
||||
|
||||
PhabricatorWorker::scheduleTask(
|
||||
'FeedPublisherWorker',
|
||||
array(
|
||||
'key' => $key,
|
||||
));
|
||||
|
||||
$console->writeOut("%s\n", pht("Done."));
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,10 @@
|
|||
<?php
|
||||
|
||||
abstract class PhabricatorFeedManagementWorkflow
|
||||
extends PhutilArgumentWorkflow {
|
||||
|
||||
final public function isExecutable() {
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
29
src/applications/feed/worker/FeedPublisherHTTPWorker.php
Normal file
29
src/applications/feed/worker/FeedPublisherHTTPWorker.php
Normal file
|
@ -0,0 +1,29 @@
|
|||
<?php
|
||||
|
||||
final class FeedPublisherHTTPWorker extends FeedPushWorker {
|
||||
|
||||
protected function doWork() {
|
||||
$story = $this->loadFeedStory();
|
||||
$data = $story->getStoryData();
|
||||
|
||||
$uri = idx($this->getTaskData(), 'uri');
|
||||
|
||||
$post_data = array(
|
||||
'storyID' => $data->getID(),
|
||||
'storyType' => $data->getStoryType(),
|
||||
'storyData' => $data->getStoryData(),
|
||||
'storyAuthorPHID' => $data->getAuthorPHID(),
|
||||
'epoch' => $data->getEpoch(),
|
||||
);
|
||||
|
||||
id(new HTTPFuture($uri, $post_data))
|
||||
->setMethod('POST')
|
||||
->setTimeout(30)
|
||||
->resolvex();
|
||||
}
|
||||
|
||||
public function getWaitBeforeRetry(PhabricatorWorkerTask $task) {
|
||||
return max($task->getFailureCount(), 1) * 60;
|
||||
}
|
||||
|
||||
}
|
|
@ -1,38 +1,21 @@
|
|||
<?php
|
||||
|
||||
final class FeedPublisherWorker extends PhabricatorWorker {
|
||||
final class FeedPublisherWorker extends FeedPushWorker {
|
||||
|
||||
protected function doWork() {
|
||||
$task_data = $this->getTaskData();
|
||||
$chrono_key = $task_data['chrono_key'];
|
||||
$uri = $task_data['uri'];
|
||||
$story = $this->loadFeedStory();
|
||||
|
||||
$story = id(new PhabricatorFeedStoryData())
|
||||
->loadOneWhere('chronologicalKey = %s', $chrono_key);
|
||||
|
||||
if (!$story) {
|
||||
throw new PhabricatorWorkerPermanentFailureException(
|
||||
'Feed story was deleted.'
|
||||
);
|
||||
$uris = PhabricatorEnv::getEnvConfig('feed.http-hooks');
|
||||
foreach ($uris as $uri) {
|
||||
PhabricatorWorker::scheduleTask(
|
||||
'FeedPublisherHTTPWorker',
|
||||
array(
|
||||
'key' => $story->getChronologicalKey(),
|
||||
'uri' => $uri,
|
||||
));
|
||||
}
|
||||
|
||||
$data = array(
|
||||
'storyID' => $story->getID(),
|
||||
'storyType' => $story->getStoryType(),
|
||||
'storyData' => $story->getStoryData(),
|
||||
'storyAuthorPHID' => $story->getAuthorPHID(),
|
||||
'epoch' => $story->getEpoch(),
|
||||
);
|
||||
|
||||
id(new HTTPFuture($uri, $data))
|
||||
->setMethod('POST')
|
||||
->setTimeout(30)
|
||||
->resolvex();
|
||||
|
||||
}
|
||||
|
||||
public function getWaitBeforeRetry(PhabricatorWorkerTask $task) {
|
||||
return max($task->getFailureCount(), 1) * 60;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
22
src/applications/feed/worker/FeedPushWorker.php
Normal file
22
src/applications/feed/worker/FeedPushWorker.php
Normal file
|
@ -0,0 +1,22 @@
|
|||
<?php
|
||||
|
||||
abstract class FeedPushWorker extends PhabricatorWorker {
|
||||
|
||||
protected function loadFeedStory() {
|
||||
$task_data = $this->getTaskData();
|
||||
$key = $task_data['key'];
|
||||
|
||||
$story = id(new PhabricatorFeedQuery())
|
||||
->setViewer(PhabricatorUser::getOmnipotentUser())
|
||||
->withChronologicalKeys(array($key))
|
||||
->executeOne();
|
||||
|
||||
if (!$story) {
|
||||
throw new PhabricatorWorkerPermanentFailureException(
|
||||
'Feed story does not exist..');
|
||||
}
|
||||
|
||||
return $story;
|
||||
}
|
||||
|
||||
}
|
|
@ -8,6 +8,7 @@
|
|||
abstract class PhabricatorWorker {
|
||||
|
||||
private $data;
|
||||
private static $runAllTasksInProcess = false;
|
||||
|
||||
|
||||
/* -( Configuring Retries and Failures )----------------------------------- */
|
||||
|
@ -85,10 +86,15 @@ abstract class PhabricatorWorker {
|
|||
}
|
||||
|
||||
final public static function scheduleTask($task_class, $data) {
|
||||
return id(new PhabricatorWorkerActiveTask())
|
||||
->setTaskClass($task_class)
|
||||
->setData($data)
|
||||
->save();
|
||||
if (self::$runAllTasksInProcess) {
|
||||
$worker = newv($task_class, array($data));
|
||||
$worker->doWork();
|
||||
} else {
|
||||
return id(new PhabricatorWorkerActiveTask())
|
||||
->setTaskClass($task_class)
|
||||
->setData($data)
|
||||
->save();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -154,4 +160,13 @@ abstract class PhabricatorWorker {
|
|||
return phutil_tag('pre', array(), $data);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set this flag to execute scheduled tasks synchronously, in the same
|
||||
* process. This is useful for debugging, and otherwise dramatically worse
|
||||
* in every way imaginable.
|
||||
*/
|
||||
public static function setRunAllTasksInProcess($all) {
|
||||
self::$runAllTasksInProcess = $all;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue