engines = $engines; return $this; }

  /**
   * Compute raw facts for every object yielded by an iterator and write them
   * to storage in batches.
   *
   * Facts are buffered in a map keyed by object PHID. Once the buffer holds
   * more than RAW_FACT_BUFFER_LIMIT objects it is flushed, and whatever is
   * left over is flushed after the loop ends.
   *
   * @param  $iterator  Anything usable with foreach whose values expose
   *                    getPHID() and are accepted by computeRawFacts().
   * @return            The key of the last item iterated, or null if the
   *                    iterator produced nothing.
   */
  public function processIterator($iterator) {
    $result = null;

    $raw_facts = array();
    foreach ($iterator as $key => $object) {
      $raw_facts[$object->getPHID()] = $this->computeRawFacts($object);

      if (count($raw_facts) > self::RAW_FACT_BUFFER_LIMIT) {
        $this->updateRawFacts($raw_facts);
        $raw_facts = array();
      }

      $result = $key;
    }

    // Flush whatever did not reach the buffer limit.
    if ($raw_facts) {
      $this->updateRawFacts($raw_facts);
    }

    return $result;
  }

  /**
   * Collect the raw facts every interested engine produces for one object.
   *
   * Engines whose shouldComputeRawFactsForObject() returns a falsy value are
   * skipped. The per-engine lists are flattened into a single list.
   *
   * @param  PhabricatorLiskDAO $object  Object to compute facts for.
   * @return array                       Flat list of facts from all engines.
   */
  private function computeRawFacts(PhabricatorLiskDAO $object) {
    $facts = array();

    foreach ($this->engines as $engine) {
      if (!$engine->shouldComputeRawFactsForObject($object)) {
        continue;
      }
      $facts[] = $engine->computeRawFactsForObject($object);
    }

    return array_mergev($facts);
  }

  /**
   * Replace the stored raw facts for a set of objects.
   *
   * Inside one transaction, all existing rows for the given object PHIDs are
   * deleted and the new facts are inserted in chunks of 256 rows. An object
   * that maps to an empty list simply has its existing rows removed.
   *
   * If any query fails, the transaction is killed before the exception is
   * rethrown, so the connection is not left inside an open transaction with
   * a partially applied delete.
   *
   * @param  array $map  Map of object PHID to a list of PhabricatorFactRaw.
   * @return void
   */
  private function updateRawFacts(array $map) {
    if (!$map) {
      return;
    }

    foreach ($map as $phid => $facts) {
      assert_instances_of($facts, 'PhabricatorFactRaw');
    }

    $phids = array_keys($map);

    $table = new PhabricatorFactRaw();
    $conn = $table->establishConnection('w');
    $table_name = $table->getTableName();

    // Pre-render one escaped "(...)" tuple per fact so the INSERT statements
    // below can be assembled from chunks of ready-made row fragments.
    $sql = array();
    foreach ($map as $phid => $facts) {
      foreach ($facts as $fact) {
        $sql[] = qsprintf(
          $conn,
          '(%s, %s, %s, %d, %d, %d)',
          $fact->getFactType(),
          $fact->getObjectPHID(),
          $fact->getObjectA(),
          $fact->getValueX(),
          $fact->getValueY(),
          $fact->getEpoch());
      }
    }

    $table->openTransaction();
    try {
      queryfx(
        $conn,
        'DELETE FROM %T WHERE objectPHID IN (%Ls)',
        $table_name,
        $phids);

      if ($sql) {
        foreach (array_chunk($sql, 256) as $chunk) {
          queryfx(
            $conn,
            'INSERT INTO %T (factType, objectPHID, objectA, valueX, valueY, epoch) VALUES %Q',
            $table_name,
            implode(', ', $chunk));
        }
      }
    } catch (Exception $ex) {
      // Roll back so a failed batch neither leaves the old facts deleted
      // nor leaves the transaction open on this connection.
      $table->killTransaction();
      throw $ex;
    }
    $table->saveTransaction();
  }

}