diff --git a/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php b/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php index 8cbf4eaf8b..d4996e3e35 100644 --- a/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php +++ b/src/infrastructure/daemon/workers/query/PhabricatorWorkerLeaseQuery.php @@ -48,21 +48,43 @@ final class PhabricatorWorkerLeaseQuery extends PhabricatorQuery { $leased = 0; foreach ($phases as $phase) { - queryfx( + + // NOTE: If we issue `UPDATE ... WHERE ... ORDER BY id ASC`, the query + // goes very, very slowly. The `ORDER BY` triggers this, although we get + // the same apparent results without it. Without the ORDER BY, binary + // read slaves complain that the query isn't repeatable. To avoid both + // problems, do a SELECT and then an UPDATE. + + $rows = queryfx_all( $conn_w, - 'UPDATE %T task - SET leaseOwner = %s, leaseExpires = UNIX_TIMESTAMP() + %d - %Q %Q %Q', + 'SELECT id, leaseOwner FROM %T %Q %Q %Q', $task_table->getTableName(), - $lease_ownership_name, - self::DEFAULT_LEASE_DURATION, $this->buildWhereClause($conn_w, $phase), $this->buildOrderClause($conn_w), $this->buildLimitClause($conn_w, $limit - $leased)); - $leased += $conn_w->getAffectedRows(); - if ($leased == $limit) { - break; + // NOTE: Sometimes, we'll race with another worker and they'll grab + // this task before we do. We could reduce how often this happens by + // selecting more tasks than we need, then shuffling them and trying + // to lock only the number we're actually after. However, the amount + // of time workers spend here should be very small relative to their + // total runtime, so keep it simple for the moment. + + if ($rows) { + queryfx( + $conn_w, + 'UPDATE %T task + SET leaseOwner = %s, leaseExpires = UNIX_TIMESTAMP() + %d + %Q', + $task_table->getTableName(), + $lease_ownership_name, + self::DEFAULT_LEASE_DURATION, + $this->buildUpdateWhereClause($conn_w, $phase, $rows)); + + $leased += $conn_w->getAffectedRows(); + if ($leased == $limit) { + break; + } } } @@ -124,6 +146,48 @@ final class PhabricatorWorkerLeaseQuery extends PhabricatorQuery { return $this->formatWhereClause($where); } + private function buildUpdateWhereClause( + AphrontDatabaseConnection $conn_w, + $phase, + array $rows) { + + $where = array(); + + // NOTE: This is basically working around the MySQL behavior that + // `IN (NULL)` doesn't match NULL. + + switch ($phase) { + case self::PHASE_UNLEASED: + $where[] = qsprintf( + $conn_w, + 'leaseOwner IS NULL'); + $where[] = qsprintf( + $conn_w, + 'id IN (%Ld)', + ipull($rows, 'id')); + break; + case self::PHASE_EXPIRED: + $in = array(); + foreach ($rows as $row) { + $in[] = qsprintf( + $conn_w, + '(%d, %s)', + $row['id'], + $row['leaseOwner']); + } + $where[] = qsprintf( + $conn_w, + '(id, leaseOwner) IN (%Q)', + '('.implode(', ', $in).')'); + break; + default: + throw new Exception("Unknown phase '{$phase}'!"); + } + + return $this->formatWhereClause($where); + + } + private function buildOrderClause(AphrontDatabaseConnection $conn_w) { return qsprintf($conn_w, 'ORDER BY id ASC'); }