mirror of
https://we.phorge.it/source/phorge.git
synced 2024-11-22 06:42:42 +01:00
When updating a Ferret search index document, reuse existing rows where possible
Summary: Ref T13587. Currently, when a document is reindexed by Ferret, the old document is completely discarded and a new version is inserted to replace it. This approach is simple to implement, but can lead to exhaustion of the ngram AUTO_INCREMENT id column in reasonable circumstances. Conceptually, this approach "should" be fine and this exhaustion is an awkard implementation detail. However, since it's easy to be less wasteful when performing document updates and all the other approaches are awkward or leaky in other ways that are probably worse, use a more complex implementation to avoid executing unnecessary INSERT statements. Test Plan: - Created and indexed a new document, searched for it. - Updated a document, indexed it with `bin/search index ... --force --trace`, saw only modifications updated in the index. - Searched for newly added terms (got hits) and removed terms (no longer got hits) to verify add/delete index behavior. Maniphest Tasks: T13587 Differential Revision: https://secure.phabricator.com/D21495
This commit is contained in:
parent
5d6dddc5eb
commit
4f647fb6be
1 changed files with 261 additions and 87 deletions
|
@ -134,123 +134,297 @@ final class PhabricatorFerretFulltextEngineExtension
|
|||
$ngram_engine = new PhabricatorSearchNgramEngine();
|
||||
$ngrams = $ngram_engine->getTermNgramsFromString($ngrams_source);
|
||||
|
||||
$conn = $object->establishConnection('w');
|
||||
|
||||
if ($ngrams) {
|
||||
$common = queryfx_all(
|
||||
$conn,
|
||||
'SELECT ngram FROM %T WHERE ngram IN (%Ls)',
|
||||
$engine->getCommonNgramsTableName(),
|
||||
$ngrams);
|
||||
$common = ipull($common, 'ngram', 'ngram');
|
||||
|
||||
foreach ($ngrams as $key => $ngram) {
|
||||
if (isset($common[$ngram])) {
|
||||
unset($ngrams[$key]);
|
||||
continue;
|
||||
}
|
||||
|
||||
// NOTE: MySQL discards trailing whitespace in CHAR(X) columns.
|
||||
$trimmed_ngram = rtrim($ngram, ' ');
|
||||
if (isset($common[$trimmed_ngram])) {
|
||||
unset($ngrams[$key]);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
$object->openTransaction();
|
||||
|
||||
try {
|
||||
$conn = $object->establishConnection('w');
|
||||
$this->deleteOldDocument($engine, $object, $document);
|
||||
// See T13587. If this document already exists in the index, we try to
|
||||
// update the existing rows to avoid leaving the ngrams table heavily
|
||||
// fragmented.
|
||||
|
||||
queryfx(
|
||||
$old_document = queryfx_one(
|
||||
$conn,
|
||||
'INSERT INTO %T (objectPHID, isClosed, epochCreated, epochModified,
|
||||
authorPHID, ownerPHID) VALUES (%s, %d, %d, %d, %ns, %ns)',
|
||||
'SELECT id FROM %T WHERE objectPHID = %s',
|
||||
$engine->getDocumentTableName(),
|
||||
$object->getPHID(),
|
||||
$is_closed,
|
||||
$document->getDocumentCreated(),
|
||||
$document->getDocumentModified(),
|
||||
$author_phid,
|
||||
$owner_phid);
|
||||
$object->getPHID());
|
||||
if ($old_document) {
|
||||
$old_document_id = (int)$old_document['id'];
|
||||
} else {
|
||||
$old_document_id = null;
|
||||
}
|
||||
|
||||
$document_id = $conn->getInsertID();
|
||||
foreach ($ferret_fields as $ferret_field) {
|
||||
if ($old_document_id === null) {
|
||||
queryfx(
|
||||
$conn,
|
||||
'INSERT INTO %T (documentID, fieldKey, rawCorpus, termCorpus,
|
||||
normalCorpus) VALUES (%d, %s, %s, %s, %s)',
|
||||
$engine->getFieldTableName(),
|
||||
$document_id,
|
||||
$ferret_field['fieldKey'],
|
||||
$ferret_field['rawCorpus'],
|
||||
$ferret_field['termCorpus'],
|
||||
$ferret_field['normalCorpus']);
|
||||
}
|
||||
'INSERT INTO %T (objectPHID, isClosed, epochCreated, epochModified,
|
||||
authorPHID, ownerPHID) VALUES (%s, %d, %d, %d, %ns, %ns)',
|
||||
$engine->getDocumentTableName(),
|
||||
$object->getPHID(),
|
||||
$is_closed,
|
||||
$document->getDocumentCreated(),
|
||||
$document->getDocumentModified(),
|
||||
$author_phid,
|
||||
$owner_phid);
|
||||
$document_id = $conn->getInsertID();
|
||||
|
||||
if ($ngrams) {
|
||||
$common = queryfx_all(
|
||||
$is_new = true;
|
||||
} else {
|
||||
$document_id = $old_document_id;
|
||||
queryfx(
|
||||
$conn,
|
||||
'SELECT ngram FROM %T WHERE ngram IN (%Ls)',
|
||||
$engine->getCommonNgramsTableName(),
|
||||
$ngrams);
|
||||
$common = ipull($common, 'ngram', 'ngram');
|
||||
'UPDATE %T
|
||||
SET
|
||||
isClosed = %d,
|
||||
epochCreated = %d,
|
||||
epochModified = %d,
|
||||
authorPHID = %ns,
|
||||
ownerPHID = %ns
|
||||
WHERE id = %d',
|
||||
$engine->getDocumentTableName(),
|
||||
$is_closed,
|
||||
$document->getDocumentCreated(),
|
||||
$document->getDocumentModified(),
|
||||
$author_phid,
|
||||
$owner_phid,
|
||||
$document_id);
|
||||
|
||||
foreach ($ngrams as $key => $ngram) {
|
||||
if (isset($common[$ngram])) {
|
||||
unset($ngrams[$key]);
|
||||
continue;
|
||||
}
|
||||
|
||||
// NOTE: MySQL discards trailing whitespace in CHAR(X) columns.
|
||||
$trim_ngram = rtrim($ngram, ' ');
|
||||
if (isset($common[$ngram])) {
|
||||
unset($ngrams[$key]);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
$is_new = false;
|
||||
}
|
||||
|
||||
if ($ngrams) {
|
||||
$sql = array();
|
||||
foreach ($ngrams as $ngram) {
|
||||
$sql[] = qsprintf(
|
||||
$conn,
|
||||
'(%d, %s)',
|
||||
$document_id,
|
||||
$ngram);
|
||||
}
|
||||
$this->updateStoredFields(
|
||||
$conn,
|
||||
$is_new,
|
||||
$document_id,
|
||||
$engine,
|
||||
$ferret_fields);
|
||||
|
||||
$this->updateStoredNgrams(
|
||||
$conn,
|
||||
$is_new,
|
||||
$document_id,
|
||||
$engine,
|
||||
$ngrams);
|
||||
|
||||
foreach (PhabricatorLiskDAO::chunkSQL($sql) as $chunk) {
|
||||
queryfx(
|
||||
$conn,
|
||||
'INSERT INTO %T (documentID, ngram) VALUES %LQ',
|
||||
$engine->getNgramsTableName(),
|
||||
$chunk);
|
||||
}
|
||||
}
|
||||
} catch (Exception $ex) {
|
||||
$object->killTransaction();
|
||||
throw $ex;
|
||||
} catch (Throwable $ex) {
|
||||
$object->killTransaction();
|
||||
throw $ex;
|
||||
}
|
||||
|
||||
$object->saveTransaction();
|
||||
}
|
||||
|
||||
|
||||
private function deleteOldDocument(
|
||||
private function updateStoredFields(
|
||||
AphrontDatabaseConnection $conn,
|
||||
$is_new,
|
||||
$document_id,
|
||||
PhabricatorFerretEngine $engine,
|
||||
$object,
|
||||
PhabricatorSearchAbstractDocument $document) {
|
||||
$new_fields) {
|
||||
|
||||
$conn = $object->establishConnection('w');
|
||||
|
||||
$old_document = queryfx_one(
|
||||
$conn,
|
||||
'SELECT * FROM %T WHERE objectPHID = %s',
|
||||
$engine->getDocumentTableName(),
|
||||
$object->getPHID());
|
||||
if (!$old_document) {
|
||||
return;
|
||||
if (!$is_new) {
|
||||
$old_fields = queryfx_all(
|
||||
$conn,
|
||||
'SELECT * FROM %T WHERE documentID = %d',
|
||||
$engine->getFieldTableName(),
|
||||
$document_id);
|
||||
} else {
|
||||
$old_fields = array();
|
||||
}
|
||||
|
||||
$old_id = $old_document['id'];
|
||||
$old_fields = ipull($old_fields, null, 'fieldKey');
|
||||
$new_fields = ipull($new_fields, null, 'fieldKey');
|
||||
|
||||
queryfx(
|
||||
$conn,
|
||||
'DELETE FROM %T WHERE id = %d',
|
||||
$engine->getDocumentTableName(),
|
||||
$old_id);
|
||||
$delete_rows = array();
|
||||
$insert_rows = array();
|
||||
$update_rows = array();
|
||||
|
||||
queryfx(
|
||||
$conn,
|
||||
'DELETE FROM %T WHERE documentID = %d',
|
||||
$engine->getFieldTableName(),
|
||||
$old_id);
|
||||
foreach ($old_fields as $field_key => $old_field) {
|
||||
if (!isset($new_fields[$field_key])) {
|
||||
$delete_rows[] = $old_field;
|
||||
}
|
||||
}
|
||||
|
||||
queryfx(
|
||||
$conn,
|
||||
'DELETE FROM %T WHERE documentID = %d',
|
||||
$engine->getNgramsTableName(),
|
||||
$old_id);
|
||||
$compare_keys = array(
|
||||
'rawCorpus',
|
||||
'termCorpus',
|
||||
'normalCorpus',
|
||||
);
|
||||
|
||||
foreach ($new_fields as $field_key => $new_field) {
|
||||
if (!isset($old_fields[$field_key])) {
|
||||
$insert_rows[] = $new_field;
|
||||
continue;
|
||||
}
|
||||
|
||||
$old_field = $old_fields[$field_key];
|
||||
|
||||
$same_row = true;
|
||||
foreach ($compare_keys as $compare_key) {
|
||||
if ($old_field[$compare_key] !== $new_field[$compare_key]) {
|
||||
$same_row = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if ($same_row) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$new_field['id'] = $old_field['id'];
|
||||
$update_rows[] = $new_field;
|
||||
}
|
||||
|
||||
if ($delete_rows) {
|
||||
queryfx(
|
||||
$conn,
|
||||
'DELETE FROM %T WHERE id IN (%Ld)',
|
||||
$engine->getFieldTableName(),
|
||||
ipull($delete_rows, 'id'));
|
||||
}
|
||||
|
||||
foreach ($update_rows as $update_row) {
|
||||
queryfx(
|
||||
$conn,
|
||||
'UPDATE %T
|
||||
SET
|
||||
rawCorpus = %s,
|
||||
termCorpus = %s,
|
||||
normalCorpus = %s
|
||||
WHERE id = %d',
|
||||
$engine->getFieldTableName(),
|
||||
$update_row['rawCorpus'],
|
||||
$update_row['termCorpus'],
|
||||
$update_row['normalCorpus'],
|
||||
$update_row['id']);
|
||||
}
|
||||
|
||||
foreach ($insert_rows as $insert_row) {
|
||||
queryfx(
|
||||
$conn,
|
||||
'INSERT INTO %T (documentID, fieldKey, rawCorpus, termCorpus,
|
||||
normalCorpus) VALUES (%d, %s, %s, %s, %s)',
|
||||
$engine->getFieldTableName(),
|
||||
$document_id,
|
||||
$insert_row['fieldKey'],
|
||||
$insert_row['rawCorpus'],
|
||||
$insert_row['termCorpus'],
|
||||
$insert_row['normalCorpus']);
|
||||
}
|
||||
}
|
||||
|
||||
private function updateStoredNgrams(
|
||||
AphrontDatabaseConnection $conn,
|
||||
$is_new,
|
||||
$document_id,
|
||||
PhabricatorFerretEngine $engine,
|
||||
$new_ngrams) {
|
||||
|
||||
if ($is_new) {
|
||||
$old_ngrams = array();
|
||||
} else {
|
||||
$old_ngrams = queryfx_all(
|
||||
$conn,
|
||||
'SELECT id, ngram FROM %T WHERE documentID = %d',
|
||||
$engine->getNgramsTableName(),
|
||||
$document_id);
|
||||
}
|
||||
|
||||
$old_ngrams = ipull($old_ngrams, 'id', 'ngram');
|
||||
$new_ngrams = array_fuse($new_ngrams);
|
||||
|
||||
$delete_ids = array();
|
||||
$insert_ngrams = array();
|
||||
|
||||
// NOTE: MySQL discards trailing whitespace in CHAR(X) columns.
|
||||
|
||||
foreach ($old_ngrams as $ngram => $id) {
|
||||
if (isset($new_ngrams[$ngram])) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$untrimmed_ngram = $ngram.' ';
|
||||
if (isset($new_ngrams[$untrimmed_ngram])) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$delete_ids[] = $id;
|
||||
}
|
||||
|
||||
foreach ($new_ngrams as $ngram) {
|
||||
if (isset($old_ngrams[$ngram])) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$trimmed_ngram = rtrim($ngram, ' ');
|
||||
if (isset($old_ngrams[$trimmed_ngram])) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$insert_ngrams[] = $ngram;
|
||||
}
|
||||
|
||||
if ($delete_ids) {
|
||||
$sql = array();
|
||||
foreach ($delete_ids as $id) {
|
||||
$sql[] = qsprintf(
|
||||
$conn,
|
||||
'%d',
|
||||
$id);
|
||||
}
|
||||
|
||||
foreach (PhabricatorLiskDAO::chunkSQL($sql) as $chunk) {
|
||||
queryfx(
|
||||
$conn,
|
||||
'DELETE FROM %T WHERE id IN (%LQ)',
|
||||
$engine->getNgramsTableName(),
|
||||
$chunk);
|
||||
}
|
||||
}
|
||||
|
||||
if ($insert_ngrams) {
|
||||
$sql = array();
|
||||
foreach ($insert_ngrams as $ngram) {
|
||||
$sql[] = qsprintf(
|
||||
$conn,
|
||||
'(%d, %s)',
|
||||
$document_id,
|
||||
$ngram);
|
||||
}
|
||||
|
||||
foreach (PhabricatorLiskDAO::chunkSQL($sql) as $chunk) {
|
||||
queryfx(
|
||||
$conn,
|
||||
'INSERT INTO %T (documentID, ngram) VALUES %LQ',
|
||||
$engine->getNgramsTableName(),
|
||||
$chunk);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public function newFerretSearchFunctions() {
|
||||
|
|
Loading…
Reference in a new issue