diff --git a/resources/sql/autopatches/20160424.locks.1.sql b/resources/sql/autopatches/20160424.locks.1.sql new file mode 100644 index 0000000000..0edea13689 --- /dev/null +++ b/resources/sql/autopatches/20160424.locks.1.sql @@ -0,0 +1,2 @@ +ALTER TABLE {$NAMESPACE}_repository.repository_workingcopyversion + ADD lockOwner VARCHAR(255) COLLATE {$COLLATE_TEXT}; diff --git a/src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php b/src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php index f388fc2ad6..e3c70fecd9 100644 --- a/src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php +++ b/src/applications/diffusion/protocol/DiffusionRepositoryClusterEngine.php @@ -11,9 +11,11 @@ final class DiffusionRepositoryClusterEngine extends Phobject { private $repository; private $viewer; + private $logger; + private $clusterWriteLock; private $clusterWriteVersion; - private $logger; + private $clusterWriteOwner; /* -( Configuring Synchronization )---------------------------------------- */ @@ -247,9 +249,14 @@ final class DiffusionRepositoryClusterEngine extends Phobject { $device = AlmanacKeys::getLiveDevice(); $device_phid = $device->getPHID(); + $table = new PhabricatorRepositoryWorkingCopyVersion(); + $locked_connection = $table->establishConnection('w'); + $write_lock = PhabricatorRepositoryWorkingCopyVersion::getWriteLock( $repository_phid); + $write_lock->useSpecificConnection($locked_connection); + $lock_wait = phutil_units('2 minutes in seconds'); $this->logLine( @@ -290,7 +297,7 @@ final class DiffusionRepositoryClusterEngine extends Phobject { throw new Exception( pht( 'An previous write to this repository was interrupted; refusing '. - 'new writes. This issue resolves operator intervention to resolve, '. + 'new writes. This issue requires operator intervention to resolve, '. 'see "Write Interruptions" in the "Cluster: Repositories" in the '. 'documentation for instructions.')); } @@ -302,14 +309,20 @@ final class DiffusionRepositoryClusterEngine extends Phobject { throw $ex; } + $pid = getmypid(); + $hash = Filesystem::readRandomCharacters(12); + $this->clusterWriteOwner = "{$pid}.{$hash}"; + PhabricatorRepositoryWorkingCopyVersion::willWrite( + $locked_connection, $repository_phid, $device_phid, array( 'userPHID' => $viewer->getPHID(), 'epoch' => PhabricatorTime::getNow(), 'devicePHID' => $device_phid, - )); + ), + $this->clusterWriteOwner); $this->clusterWriteVersion = $max_version; $this->clusterWriteLock = $write_lock; @@ -337,31 +350,105 @@ final class DiffusionRepositoryClusterEngine extends Phobject { $device = AlmanacKeys::getLiveDevice(); $device_phid = $device->getPHID(); - // NOTE: This means we're still bumping the version when pushes fail. We - // could select only un-rejected events instead to bump a little less - // often. + // It is possible that we've lost the global lock while receiving the push. + // For example, the master database may have been restarted between the + // time we acquired the global lock and now, when the push has finished. - $new_log = id(new PhabricatorRepositoryPushEventQuery()) - ->setViewer(PhabricatorUser::getOmnipotentUser()) - ->withRepositoryPHIDs(array($repository_phid)) - ->setLimit(1) - ->executeOne(); + // We wrote a durable lock while we were holding the the global lock, + // essentially upgrading our lock. We can still safely release this upgraded + // lock even if we're no longer holding the global lock. - $old_version = $this->clusterWriteVersion; - if ($new_log) { - $new_version = $new_log->getID(); - } else { - $new_version = $old_version; + // If we fail to release the lock, the repository will be frozen until + // an operator can figure out what happened, so we try pretty hard to + // reconnect to the database and release the lock. + + $now = PhabricatorTime::getNow(); + $duration = phutil_units('5 minutes in seconds'); + $try_until = $now + $duration; + + $did_release = false; + $already_failed = false; + while (PhabricatorTime::getNow() <= $try_until) { + try { + // NOTE: This means we're still bumping the version when pushes fail. We + // could select only un-rejected events instead to bump a little less + // often. + + $new_log = id(new PhabricatorRepositoryPushEventQuery()) + ->setViewer(PhabricatorUser::getOmnipotentUser()) + ->withRepositoryPHIDs(array($repository_phid)) + ->setLimit(1) + ->executeOne(); + + $old_version = $this->clusterWriteVersion; + if ($new_log) { + $new_version = $new_log->getID(); + } else { + $new_version = $old_version; + } + + PhabricatorRepositoryWorkingCopyVersion::didWrite( + $repository_phid, + $device_phid, + $this->clusterWriteVersion, + $new_log->getID(), + $this->clusterWriteOwner); + $did_release = true; + break; + } catch (AphrontConnectionQueryException $ex) { + $connection_exception = $ex; + } catch (AphrontConnectionLostQueryException $ex) { + $connection_exception = $ex; + } + + if (!$already_failed) { + $already_failed = true; + $this->logLine( + pht('CRITICAL. Failed to release cluster write lock!')); + + $this->logLine( + pht( + 'The connection to the master database was lost while receiving '. + 'the write.')); + + $this->logLine( + pht( + 'This process will spend %s more second(s) attempting to '. + 'recover, then give up.', + new PhutilNumber($duration))); + } + + sleep(1); } - PhabricatorRepositoryWorkingCopyVersion::didWrite( - $repository_phid, - $device_phid, - $this->clusterWriteVersion, - $new_log->getID()); + if ($did_release) { + if ($already_failed) { + $this->logLine( + pht('RECOVERED. Link to master database was restored.')); + } + $this->logLine(pht('Released cluster write lock.')); + } else { + throw new Exception( + pht( + 'Failed to reconnect to master database and release held write '. + 'lock ("%s") on device "%s" for repository "%s" after trying '. + 'for %s seconds(s). This repository will be frozen.', + $this->clusterWriteOwner, + $device->getName(), + $this->getDisplayName(), + new PhutilNumber($duration))); + } + + // We can continue even if we've lost this lock, everything is still + // consistent. + try { + $this->clusterWriteLock->unlock(); + } catch (Exception $ex) { + // Ignore. + } - $this->clusterWriteLock->unlock(); $this->clusterWriteLock = null; + $this->clusterWriteOwner = null; } diff --git a/src/applications/diffusion/ssh/DiffusionGitSSHWorkflow.php b/src/applications/diffusion/ssh/DiffusionGitSSHWorkflow.php index 79c00231c7..9843ca8401 100644 --- a/src/applications/diffusion/ssh/DiffusionGitSSHWorkflow.php +++ b/src/applications/diffusion/ssh/DiffusionGitSSHWorkflow.php @@ -11,6 +11,7 @@ abstract class DiffusionGitSSHWorkflow public function writeClusterEngineLogMessage($message) { parent::writeError($message); + $this->getErrorChannel()->update(); } protected function identifyRepository() { diff --git a/src/applications/diffusion/ssh/DiffusionSSHWorkflow.php b/src/applications/diffusion/ssh/DiffusionSSHWorkflow.php index b1694de814..60b929f7c4 100644 --- a/src/applications/diffusion/ssh/DiffusionSSHWorkflow.php +++ b/src/applications/diffusion/ssh/DiffusionSSHWorkflow.php @@ -55,6 +55,21 @@ abstract class DiffusionSSHWorkflow extends PhabricatorSSHWorkflow { return $this; } + protected function getCurrentDeviceName() { + $device = AlmanacKeys::getLiveDevice(); + if ($device) { + return $device->getName(); + } + + return php_uname('n'); + } + + protected function getTargetDeviceName() { + // TODO: This should use the correct device identity. + $uri = new PhutilURI($this->proxyURI); + return $uri->getDomain(); + } + protected function shouldProxy() { return (bool)$this->proxyURI; } diff --git a/src/applications/repository/storage/PhabricatorRepositoryWorkingCopyVersion.php b/src/applications/repository/storage/PhabricatorRepositoryWorkingCopyVersion.php index 0feeec759f..51abc70d35 100644 --- a/src/applications/repository/storage/PhabricatorRepositoryWorkingCopyVersion.php +++ b/src/applications/repository/storage/PhabricatorRepositoryWorkingCopyVersion.php @@ -7,6 +7,7 @@ final class PhabricatorRepositoryWorkingCopyVersion protected $devicePHID; protected $repositoryVersion; protected $isWriting; + protected $lockOwner; protected $writeProperties; protected function getConfiguration() { @@ -16,6 +17,7 @@ final class PhabricatorRepositoryWorkingCopyVersion 'repositoryVersion' => 'uint32', 'isWriting' => 'bool', 'writeProperties' => 'text?', + 'lockOwner' => 'text255?', ), self::CONFIG_KEY_SCHEMA => array( 'key_workingcopy' => array( @@ -69,29 +71,33 @@ final class PhabricatorRepositoryWorkingCopyVersion * by default. */ public static function willWrite( + AphrontDatabaseConnection $locked_connection, $repository_phid, $device_phid, - array $write_properties) { + array $write_properties, + $lock_owner) { + $version = new self(); - $conn_w = $version->establishConnection('w'); $table = $version->getTableName(); queryfx( - $conn_w, + $locked_connection, 'INSERT INTO %T (repositoryPHID, devicePHID, repositoryVersion, isWriting, - writeProperties) + writeProperties, lockOwner) VALUES - (%s, %s, %d, %d, %s) + (%s, %s, %d, %d, %s, %s) ON DUPLICATE KEY UPDATE isWriting = VALUES(isWriting), - writeProperties = VALUES(writeProperties)', + writeProperties = VALUES(writeProperties), + lockOwner = VALUES(lockOwner)', $table, $repository_phid, $device_phid, 0, 1, - phutil_json_encode($write_properties)); + phutil_json_encode($write_properties), + $lock_owner); } @@ -102,7 +108,9 @@ final class PhabricatorRepositoryWorkingCopyVersion $repository_phid, $device_phid, $old_version, - $new_version) { + $new_version, + $lock_owner) { + $version = new self(); $conn_w = $version->establishConnection('w'); $table = $version->getTableName(); @@ -111,17 +119,20 @@ final class PhabricatorRepositoryWorkingCopyVersion $conn_w, 'UPDATE %T SET repositoryVersion = %d, - isWriting = 0 + isWriting = 0, + lockOwner = NULL WHERE repositoryPHID = %s AND devicePHID = %s AND repositoryVersion = %d AND - isWriting = 1', + isWriting = 1 AND + lockOwner = %s', $table, $new_version, $repository_phid, $device_phid, - $old_version); + $old_version, + $lock_owner); } diff --git a/src/infrastructure/util/PhabricatorGlobalLock.php b/src/infrastructure/util/PhabricatorGlobalLock.php index 394f57d9a9..26e11d1899 100644 --- a/src/infrastructure/util/PhabricatorGlobalLock.php +++ b/src/infrastructure/util/PhabricatorGlobalLock.php @@ -29,6 +29,7 @@ final class PhabricatorGlobalLock extends PhutilLock { private $conn; + private $isExternalConnection = false; private static $pool = array(); @@ -74,6 +75,7 @@ final class PhabricatorGlobalLock extends PhutilLock { */ public function useSpecificConnection(AphrontDatabaseConnection $conn) { $this->conn = $conn; + $this->isExternalConnection = true; return $this; } @@ -109,29 +111,54 @@ final class PhabricatorGlobalLock extends PhutilLock { $max_allowed_timeout = 2147483; queryfx($conn, 'SET wait_timeout = %d', $max_allowed_timeout); + $lock_name = $this->getName(); + $result = queryfx_one( $conn, 'SELECT GET_LOCK(%s, %f)', - $this->getName(), + $lock_name, $wait); $ok = head($result); if (!$ok) { - throw new PhutilLockException($this->getName()); + throw new PhutilLockException($lock_name); } + $conn->rememberLock($lock_name); + $this->conn = $conn; } protected function doUnlock() { - queryfx( - $this->conn, - 'SELECT RELEASE_LOCK(%s)', - $this->getName()); + $lock_name = $this->getName(); + + $conn = $this->conn; + + try { + $result = queryfx_one( + $conn, + 'SELECT RELEASE_LOCK(%s)', + $lock_name); + $conn->forgetLock($lock_name); + } catch (Exception $ex) { + $result = array(null); + } + + $ok = head($result); + if (!$ok) { + // TODO: We could throw here, but then this lock doesn't get marked + // unlocked and we throw again later when exiting. It also doesn't + // particularly matter for any current applications. For now, just + // swallow the error. + } - $this->conn->close(); - self::$pool[] = $this->conn; $this->conn = null; + $this->isExternalConnection = false; + + if (!$this->isExternalConnection) { + $conn->close(); + self::$pool[] = $conn; + } } }