1
0
Fork 0
mirror of https://we.phorge.it/source/phorge.git synced 2024-11-28 17:52:43 +01:00
phorge-phorge/src/applications/feed/PhabricatorFeedStoryPublisher.php
epriestley 42c0f060d5 Push feed publishing deeper into the task queue
Summary:
Ref T2852. I want to model Asana integration as a response to feed events. Currently, we queue one feed event for each HTTP hook.

Instead, always queue one feed event and then have it queue any necessary followup events (now, http hooks; soon, asana).

Add a script to make it easy to reproducibly fire feed event publishing.

Test Plan:
Republished a feed event and verified it hit configured HTTP hooks correctly.

  $ ./bin/feed republish 5765774156541908292 --trace
  >>> [2] <connect> phabricator2_feed
  <<< [2] <connect> 1,660 us
  >>> [3] <query> SELECT story.* FROM `feed_storydata` story JOIN `feed_storyreference` ref ON ref.chronologicalKey = story.chronologicalKey WHERE (ref.chronologicalKey IN (5765774156541908292)) GROUP BY story.chronologicalKey ORDER BY story.chronologicalKey DESC
  <<< [3] <query> 595 us
  >>> [4] <connect> phabricator2_differential
  <<< [4] <connect> 760 us
  >>> [5] <query> SELECT * FROM `differential_revision` WHERE phid IN ('PHID-DREV-ywqmrj5zgkdloqh5p3c5')
  <<< [5] <query> 478 us
  >>> [6] <query> SELECT * FROM `differential_revision` WHERE phid IN ('PHID-DREV-ywqmrj5zgkdloqh5p3c5')
  <<< [6] <query> 449 us
  >>> [7] <connect> phabricator2_user
  <<< [7] <connect> 1,062 us
  >>> [8] <query> SELECT * FROM `user` WHERE phid in ('PHID-USER-lqiz3yd7wmk64ejugvov')
  <<< [8] <query> 540 us
  >>> [9] <connect> phabricator2_file
  <<< [9] <connect> 951 us
  >>> [10] <query> SELECT * FROM `file` WHERE phid IN ('PHID-FILE-gq6dlsysvxbn3dgwvky7')
  <<< [10] <query> 498 us
  >>> [11] <query> SELECT * FROM `user_status` WHERE userPHID IN ('PHID-USER-lqiz3yd7wmk64ejugvov') AND UNIX_TIMESTAMP() BETWEEN dateFrom AND dateTo
  <<< [11] <query> 507 us
  Republishing story...
  >>> [12] <query> SELECT story.* FROM `feed_storydata` story JOIN `feed_storyreference` ref ON ref.chronologicalKey = story.chronologicalKey WHERE (ref.chronologicalKey IN (5765774156541908292)) GROUP BY story.chronologicalKey ORDER BY story.chronologicalKey DESC
  <<< [12] <query> 685 us
  >>> [13] <query> SELECT * FROM `differential_revision` WHERE phid IN ('PHID-DREV-ywqmrj5zgkdloqh5p3c5')
  <<< [13] <query> 489 us
  >>> [14] <query> SELECT * FROM `differential_revision` WHERE phid IN ('PHID-DREV-ywqmrj5zgkdloqh5p3c5')
  <<< [14] <query> 512 us
  >>> [15] <query> SELECT * FROM `user` WHERE phid in ('PHID-USER-lqiz3yd7wmk64ejugvov')
  <<< [15] <query> 601 us
  >>> [16] <query> SELECT * FROM `file` WHERE phid IN ('PHID-FILE-gq6dlsysvxbn3dgwvky7')
  <<< [16] <query> 405 us
  >>> [17] <query> SELECT * FROM `user_status` WHERE userPHID IN ('PHID-USER-lqiz3yd7wmk64ejugvov') AND UNIX_TIMESTAMP() BETWEEN dateFrom AND dateTo
  <<< [17] <query> 551 us
  >>> [18] <query> SELECT story.* FROM `feed_storydata` story JOIN `feed_storyreference` ref ON ref.chronologicalKey = story.chronologicalKey WHERE (ref.chronologicalKey IN (5765774156541908292)) GROUP BY story.chronologicalKey ORDER BY story.chronologicalKey DESC
  <<< [18] <query> 507 us
  >>> [19] <query> SELECT * FROM `differential_revision` WHERE phid IN ('PHID-DREV-ywqmrj5zgkdloqh5p3c5')
  <<< [19] <query> 428 us
  >>> [20] <query> SELECT * FROM `differential_revision` WHERE phid IN ('PHID-DREV-ywqmrj5zgkdloqh5p3c5')
  <<< [20] <query> 419 us
  >>> [21] <query> SELECT * FROM `user` WHERE phid in ('PHID-USER-lqiz3yd7wmk64ejugvov')
  <<< [21] <query> 591 us
  >>> [22] <query> SELECT * FROM `file` WHERE phid IN ('PHID-FILE-gq6dlsysvxbn3dgwvky7')
  <<< [22] <query> 406 us
  >>> [23] <query> SELECT * FROM `user_status` WHERE userPHID IN ('PHID-USER-lqiz3yd7wmk64ejugvov') AND UNIX_TIMESTAMP() BETWEEN dateFrom AND dateTo
  <<< [23] <query> 593 us
  >>> [24] <http> http://127.0.0.1/derp/
  <<< [24] <http> 746,157 us
  [2013-06-24 20:23:26] EXCEPTION: (HTTPFutureResponseStatusHTTP) [HTTP/500] Internal Server Error

Reviewers: btrahan

Reviewed By: btrahan

CC: aran

Maniphest Tasks: T2852

Differential Revision: https://secure.phabricator.com/D6291
2013-06-25 16:29:47 -07:00

218 lines
6 KiB
PHP

<?php
final class PhabricatorFeedStoryPublisher {
private $relatedPHIDs;
private $storyType;
private $storyData;
private $storyTime;
private $storyAuthorPHID;
private $primaryObjectPHID;
private $subscribedPHIDs = array();
private $mailRecipientPHIDs = array();
public function setRelatedPHIDs(array $phids) {
$this->relatedPHIDs = $phids;
return $this;
}
public function setSubscribedPHIDs(array $phids) {
$this->subscribedPHIDs = $phids;
return $this;
}
public function setPrimaryObjectPHID($phid) {
$this->primaryObjectPHID = $phid;
return $this;
}
public function setStoryType($story_type) {
$this->storyType = $story_type;
return $this;
}
public function setStoryData(array $data) {
$this->storyData = $data;
return $this;
}
public function setStoryTime($time) {
$this->storyTime = $time;
return $this;
}
public function setStoryAuthorPHID($phid) {
$this->storyAuthorPHID = $phid;
return $this;
}
public function setMailRecipientPHIDs(array $phids) {
$this->mailRecipientPHIDs = $phids;
return $this;
}
public function publish() {
$class = $this->storyType;
if (!$class) {
throw new Exception("Call setStoryType() before publishing!");
}
if (!class_exists($class)) {
throw new Exception(
"Story type must be a valid class name and must subclass ".
"PhabricatorFeedStory. ".
"'{$class}' is not a loadable class.");
}
if (!is_subclass_of($class, 'PhabricatorFeedStory')) {
throw new Exception(
"Story type must be a valid class name and must subclass ".
"PhabricatorFeedStory. ".
"'{$class}' is not a subclass of PhabricatorFeedStory.");
}
$chrono_key = $this->generateChronologicalKey();
$story = new PhabricatorFeedStoryData();
$story->setStoryType($this->storyType);
$story->setStoryData($this->storyData);
$story->setAuthorPHID((string)$this->storyAuthorPHID);
$story->setChronologicalKey($chrono_key);
$story->save();
if ($this->relatedPHIDs) {
$ref = new PhabricatorFeedStoryReference();
$sql = array();
$conn = $ref->establishConnection('w');
foreach (array_unique($this->relatedPHIDs) as $phid) {
$sql[] = qsprintf(
$conn,
'(%s, %s)',
$phid,
$chrono_key);
}
queryfx(
$conn,
'INSERT INTO %T (objectPHID, chronologicalKey) VALUES %Q',
$ref->getTableName(),
implode(', ', $sql));
}
$this->insertNotifications($chrono_key);
if (PhabricatorEnv::getEnvConfig('notification.enabled')) {
$this->sendNotification($chrono_key);
}
PhabricatorWorker::scheduleTask(
'FeedPublisherWorker',
array(
'key' => $chrono_key,
));
return $story;
}
private function insertNotifications($chrono_key) {
$subscribed_phids = $this->subscribedPHIDs;
$subscribed_phids = array_diff(
$subscribed_phids,
array($this->storyAuthorPHID));
if (!$subscribed_phids) {
return;
}
if (!$this->primaryObjectPHID) {
throw new Exception(
"You must call setPrimaryObjectPHID() if you setSubscribedPHIDs()!");
}
$notif = new PhabricatorFeedStoryNotification();
$sql = array();
$conn = $notif->establishConnection('w');
$will_receive_mail = array_fill_keys($this->mailRecipientPHIDs, true);
foreach (array_unique($subscribed_phids) as $user_phid) {
if (isset($will_receive_mail[$user_phid])) {
$mark_read = 1;
} else {
$mark_read = 0;
}
$sql[] = qsprintf(
$conn,
'(%s, %s, %s, %d)',
$this->primaryObjectPHID,
$user_phid,
$chrono_key,
$mark_read);
}
queryfx(
$conn,
'INSERT INTO %T
(primaryObjectPHID, userPHID, chronologicalKey, hasViewed)
VALUES %Q',
$notif->getTableName(),
implode(', ', $sql));
}
private function sendNotification($chrono_key) {
$server_uri = PhabricatorEnv::getEnvConfig('notification.server-uri');
$data = array(
'key' => (string)$chrono_key,
);
id(new HTTPSFuture($server_uri, $data))
->setMethod('POST')
->setTimeout(1)
->resolve();
}
/**
* We generate a unique chronological key for each story type because we want
* to be able to page through the stream with a cursor (i.e., select stories
* after ID = X) so we can efficiently perform filtering after selecting data,
* and multiple stories with the same ID make this cumbersome without putting
* a bunch of logic in the client. We could use the primary key, but that
* would prevent publishing stories which happened in the past. Since it's
* potentially useful to do that (e.g., if you're importing another data
* source) build a unique key for each story which has chronological ordering.
*
* @return string A unique, time-ordered key which identifies the story.
*/
private function generateChronologicalKey() {
// Use the epoch timestamp for the upper 32 bits of the key. Default to
// the current time if the story doesn't have an explicit timestamp.
$time = nonempty($this->storyTime, time());
// Generate a random number for the lower 32 bits of the key.
$rand = head(unpack('L', Filesystem::readRandomBytes(4)));
// On 32-bit machines, we have to get creative.
if (PHP_INT_SIZE < 8) {
// We're on a 32-bit machine.
if (function_exists('bcadd')) {
// Try to use the 'bc' extension.
return bcadd(bcmul($time, bcpow(2, 32)), $rand);
} else {
// Do the math in MySQL. TODO: If we formalize a bc dependency, get
// rid of this.
$conn_r = id(new PhabricatorFeedStoryData())->establishConnection('r');
$result = queryfx_one(
$conn_r,
'SELECT (%d << 32) + %d as N',
$time,
$rand);
return $result['N'];
}
} else {
// This is a 64 bit machine, so we can just do the math.
return ($time << 32) + $rand;
}
}
}