1
0
Fork 0
mirror of https://we.phorge.it/source/phorge.git synced 2024-12-19 03:50:54 +01:00
phorge-phorge/src/applications/search/index/PhabricatorIndexEngine.php
epriestley a761f73384 Allow index extensions to skip indexing if the object has not changed
Summary:
Fixes T9890. This allows IndexExtensions to emit an object version.

Before we build indexes, we check if the indexed version is the same as the current version. If it is, we just don't call that extension.

T9890 has a case where this is useful: a script went crazy and posted thousands of comments to a single task.

Without versioning, that results in the same comments being indexed over and over again. With versioning, most of the queue could just exit without doing any work.

Test Plan:
  - Added a `sleep(1)` to the actual indexing, used `bin/search index --background` to queue up a lot of tasks, ran them with `bin/phd debug task`, saw them complete very quickly with only one actual index operation performed.
  - Used `bin/search index --trace` and `bin/search index --trace --background` to observe the behavior of queries against the index version store, which looked sensible.
  - Made comments/transactions, saw versions update.
  - Used `bin/remove destroy`, verified index versions were purged.

Reviewers: chad

Reviewed By: chad

Maniphest Tasks: T9890

Differential Revision: https://secure.phabricator.com/D14845
2015-12-21 17:27:14 -08:00

150 lines
3.5 KiB
PHP

<?php
final class PhabricatorIndexEngine extends Phobject {
private $object;
private $extensions;
private $versions;
private $parameters;
public function setParameters(array $parameters) {
$this->parameters = $parameters;
return $this;
}
public function getParameters() {
return $this->parameters;
}
public function setObject($object) {
$this->object = $object;
return $this;
}
public function getObject() {
return $this->object;
}
public function shouldIndexObject() {
$extensions = $this->newExtensions();
$parameters = $this->getParameters();
foreach ($extensions as $extension) {
$extension->setParameters($parameters);
}
$object = $this->getObject();
$versions = array();
foreach ($extensions as $key => $extension) {
$version = $extension->getIndexVersion($object);
if ($version !== null) {
$versions[$key] = (string)$version;
}
}
if (idx($parameters, 'force')) {
$current_versions = array();
} else {
$keys = array_keys($versions);
$current_versions = $this->loadIndexVersions($keys);
}
foreach ($versions as $key => $version) {
$current_version = idx($current_versions, $key);
if ($current_version === null) {
continue;
}
// If nothing has changed since we built the current index, we do not
// need to rebuild the index.
if ($current_version === $version) {
unset($extensions[$key]);
}
}
$this->extensions = $extensions;
$this->versions = $versions;
// We should index the object only if there is any work to be done.
return (bool)$this->extensions;
}
public function indexObject() {
$extensions = $this->extensions;
$object = $this->getObject();
foreach ($extensions as $key => $extension) {
$extension->indexObject($this, $object);
}
$this->saveIndexVersions($this->versions);
return $this;
}
private function newExtensions() {
$object = $this->getObject();
$extensions = PhabricatorIndexEngineExtension::getAllExtensions();
foreach ($extensions as $key => $extension) {
if (!$extension->shouldIndexObject($object)) {
unset($extensions[$key]);
}
}
return $extensions;
}
private function loadIndexVersions(array $extension_keys) {
if (!$extension_keys) {
return array();
}
$object = $this->getObject();
$object_phid = $object->getPHID();
$table = new PhabricatorSearchIndexVersion();
$conn_r = $table->establishConnection('w');
$rows = queryfx_all(
$conn_r,
'SELECT * FROM %T WHERE objectPHID = %s AND extensionKey IN (%Ls)',
$table->getTableName(),
$object_phid,
$extension_keys);
return ipull($rows, 'version', 'extensionKey');
}
private function saveIndexVersions(array $versions) {
if (!$versions) {
return;
}
$object = $this->getObject();
$object_phid = $object->getPHID();
$table = new PhabricatorSearchIndexVersion();
$conn_w = $table->establishConnection('w');
$sql = array();
foreach ($versions as $key => $version) {
$sql[] = qsprintf(
$conn_w,
'(%s, %s, %s)',
$object_phid,
$key,
$version);
}
queryfx(
$conn_w,
'INSERT INTO %T (objectPHID, extensionKey, version)
VALUES %Q
ON DUPLICATE KEY UPDATE version = VALUES(version)',
$table->getTableName(),
implode(', ', $sql));
}
}