1
0
Fork 0
mirror of https://we.phorge.it/source/phorge.git synced 2024-12-18 19:40:55 +01:00

Make cluster repositories more resistant to freezing

Summary:
Ref T10860. This allows us to recover if the connection to the database is lost during a push.

If we lose the connection to the master database during a push, we would previously freeze the repository. This is very safe, but not very operator-friendly since you have to go manually unfreeze it.

We don't need to be quite this aggressive about freezing things. The repository state is still consistent after we've "upgraded" the lock by setting `isWriting = 1`, so we're actually fine even if we lost the global lock.

Instead of just freezing the repository immediately, sit there in a loop waiting for the master to come back up for a few minutes. If it recovers, we can release the lock and everything will be OK again.

Basically, the changes are:

  - If we can't release the lock at first, sit in a loop trying really hard to release it for a while.
  - Add a unique lock identifier so we can be certain we're only releasing //our// lock no matter what else is going on.
  - Do the version reads on the same connection holding the lock, so we can be sure we haven't lost the lock before we do that read.

Test Plan:
  - Added a `sleep(10)` after accepting the write but before releasing the lock so I could run `mysqld stop` and force this issue to occur.
  - Pushed like this:

```
$ echo D >> record && git commit -am D && git push
[master 707ecc3] D
 1 file changed, 1 insertion(+)
# Push received by "local001.phacility.net", forwarding to cluster host.
# Waiting up to 120 second(s) for a cluster write lock...
# Acquired write lock immediately.
# Waiting up to 120 second(s) for a cluster read lock on "local001.phacility.net"...
# Acquired read lock immediately.
# Device "local001.phacility.net" is already a cluster leader and does not need to be synchronized.
# Ready to receive on cluster host "local001.phacility.net".
Counting objects: 3, done.
Delta compression using up to 8 threads.
Compressing objects: 100% (2/2), done.
Writing objects: 100% (3/3), 254 bytes | 0 bytes/s, done.
Total 3 (delta 1), reused 0 (delta 0)
BEGIN SLEEP
```

  - Here, I stopped `mysqld` from the CLI in another terminal window.

```
END SLEEP
# CRITICAL. Failed to release cluster write lock!
# The connection to the master database was lost while receiving the write.
# This process will spend 300 more second(s) attempting to recover, then give up.
```

  - Here, I started `mysqld` again.

```
# RECOVERED. Link to master database was restored.
# Released cluster write lock.
To ssh://local@localvault.phacility.com/diffusion/26/locktopia.git
   2cbf87c..707ecc3  master -> master
```

Reviewers: chad

Reviewed By: chad

Maniphest Tasks: T10860

Differential Revision: https://secure.phabricator.com/D15792
This commit is contained in:
epriestley 2016-04-24 10:07:35 -07:00
parent d0b5dac36b
commit 892a9a1f07
6 changed files with 184 additions and 41 deletions

View file

@ -0,0 +1,2 @@
ALTER TABLE {$NAMESPACE}_repository.repository_workingcopyversion
ADD lockOwner VARCHAR(255) COLLATE {$COLLATE_TEXT};

View file

@ -11,9 +11,11 @@ final class DiffusionRepositoryClusterEngine extends Phobject {
private $repository; private $repository;
private $viewer; private $viewer;
private $logger;
private $clusterWriteLock; private $clusterWriteLock;
private $clusterWriteVersion; private $clusterWriteVersion;
private $logger; private $clusterWriteOwner;
/* -( Configuring Synchronization )---------------------------------------- */ /* -( Configuring Synchronization )---------------------------------------- */
@ -247,9 +249,14 @@ final class DiffusionRepositoryClusterEngine extends Phobject {
$device = AlmanacKeys::getLiveDevice(); $device = AlmanacKeys::getLiveDevice();
$device_phid = $device->getPHID(); $device_phid = $device->getPHID();
$table = new PhabricatorRepositoryWorkingCopyVersion();
$locked_connection = $table->establishConnection('w');
$write_lock = PhabricatorRepositoryWorkingCopyVersion::getWriteLock( $write_lock = PhabricatorRepositoryWorkingCopyVersion::getWriteLock(
$repository_phid); $repository_phid);
$write_lock->useSpecificConnection($locked_connection);
$lock_wait = phutil_units('2 minutes in seconds'); $lock_wait = phutil_units('2 minutes in seconds');
$this->logLine( $this->logLine(
@ -290,7 +297,7 @@ final class DiffusionRepositoryClusterEngine extends Phobject {
throw new Exception( throw new Exception(
pht( pht(
'An previous write to this repository was interrupted; refusing '. 'An previous write to this repository was interrupted; refusing '.
'new writes. This issue resolves operator intervention to resolve, '. 'new writes. This issue requires operator intervention to resolve, '.
'see "Write Interruptions" in the "Cluster: Repositories" in the '. 'see "Write Interruptions" in the "Cluster: Repositories" in the '.
'documentation for instructions.')); 'documentation for instructions.'));
} }
@ -302,14 +309,20 @@ final class DiffusionRepositoryClusterEngine extends Phobject {
throw $ex; throw $ex;
} }
$pid = getmypid();
$hash = Filesystem::readRandomCharacters(12);
$this->clusterWriteOwner = "{$pid}.{$hash}";
PhabricatorRepositoryWorkingCopyVersion::willWrite( PhabricatorRepositoryWorkingCopyVersion::willWrite(
$locked_connection,
$repository_phid, $repository_phid,
$device_phid, $device_phid,
array( array(
'userPHID' => $viewer->getPHID(), 'userPHID' => $viewer->getPHID(),
'epoch' => PhabricatorTime::getNow(), 'epoch' => PhabricatorTime::getNow(),
'devicePHID' => $device_phid, 'devicePHID' => $device_phid,
)); ),
$this->clusterWriteOwner);
$this->clusterWriteVersion = $max_version; $this->clusterWriteVersion = $max_version;
$this->clusterWriteLock = $write_lock; $this->clusterWriteLock = $write_lock;
@ -337,31 +350,105 @@ final class DiffusionRepositoryClusterEngine extends Phobject {
$device = AlmanacKeys::getLiveDevice(); $device = AlmanacKeys::getLiveDevice();
$device_phid = $device->getPHID(); $device_phid = $device->getPHID();
// NOTE: This means we're still bumping the version when pushes fail. We // It is possible that we've lost the global lock while receiving the push.
// could select only un-rejected events instead to bump a little less // For example, the master database may have been restarted between the
// often. // time we acquired the global lock and now, when the push has finished.
$new_log = id(new PhabricatorRepositoryPushEventQuery()) // We wrote a durable lock while we were holding the the global lock,
->setViewer(PhabricatorUser::getOmnipotentUser()) // essentially upgrading our lock. We can still safely release this upgraded
->withRepositoryPHIDs(array($repository_phid)) // lock even if we're no longer holding the global lock.
->setLimit(1)
->executeOne();
$old_version = $this->clusterWriteVersion; // If we fail to release the lock, the repository will be frozen until
if ($new_log) { // an operator can figure out what happened, so we try pretty hard to
$new_version = $new_log->getID(); // reconnect to the database and release the lock.
} else {
$new_version = $old_version; $now = PhabricatorTime::getNow();
$duration = phutil_units('5 minutes in seconds');
$try_until = $now + $duration;
$did_release = false;
$already_failed = false;
while (PhabricatorTime::getNow() <= $try_until) {
try {
// NOTE: This means we're still bumping the version when pushes fail. We
// could select only un-rejected events instead to bump a little less
// often.
$new_log = id(new PhabricatorRepositoryPushEventQuery())
->setViewer(PhabricatorUser::getOmnipotentUser())
->withRepositoryPHIDs(array($repository_phid))
->setLimit(1)
->executeOne();
$old_version = $this->clusterWriteVersion;
if ($new_log) {
$new_version = $new_log->getID();
} else {
$new_version = $old_version;
}
PhabricatorRepositoryWorkingCopyVersion::didWrite(
$repository_phid,
$device_phid,
$this->clusterWriteVersion,
$new_log->getID(),
$this->clusterWriteOwner);
$did_release = true;
break;
} catch (AphrontConnectionQueryException $ex) {
$connection_exception = $ex;
} catch (AphrontConnectionLostQueryException $ex) {
$connection_exception = $ex;
}
if (!$already_failed) {
$already_failed = true;
$this->logLine(
pht('CRITICAL. Failed to release cluster write lock!'));
$this->logLine(
pht(
'The connection to the master database was lost while receiving '.
'the write.'));
$this->logLine(
pht(
'This process will spend %s more second(s) attempting to '.
'recover, then give up.',
new PhutilNumber($duration)));
}
sleep(1);
} }
PhabricatorRepositoryWorkingCopyVersion::didWrite( if ($did_release) {
$repository_phid, if ($already_failed) {
$device_phid, $this->logLine(
$this->clusterWriteVersion, pht('RECOVERED. Link to master database was restored.'));
$new_log->getID()); }
$this->logLine(pht('Released cluster write lock.'));
} else {
throw new Exception(
pht(
'Failed to reconnect to master database and release held write '.
'lock ("%s") on device "%s" for repository "%s" after trying '.
'for %s seconds(s). This repository will be frozen.',
$this->clusterWriteOwner,
$device->getName(),
$this->getDisplayName(),
new PhutilNumber($duration)));
}
// We can continue even if we've lost this lock, everything is still
// consistent.
try {
$this->clusterWriteLock->unlock();
} catch (Exception $ex) {
// Ignore.
}
$this->clusterWriteLock->unlock();
$this->clusterWriteLock = null; $this->clusterWriteLock = null;
$this->clusterWriteOwner = null;
} }

View file

@ -11,6 +11,7 @@ abstract class DiffusionGitSSHWorkflow
public function writeClusterEngineLogMessage($message) { public function writeClusterEngineLogMessage($message) {
parent::writeError($message); parent::writeError($message);
$this->getErrorChannel()->update();
} }
protected function identifyRepository() { protected function identifyRepository() {

View file

@ -55,6 +55,21 @@ abstract class DiffusionSSHWorkflow extends PhabricatorSSHWorkflow {
return $this; return $this;
} }
protected function getCurrentDeviceName() {
$device = AlmanacKeys::getLiveDevice();
if ($device) {
return $device->getName();
}
return php_uname('n');
}
protected function getTargetDeviceName() {
// TODO: This should use the correct device identity.
$uri = new PhutilURI($this->proxyURI);
return $uri->getDomain();
}
protected function shouldProxy() { protected function shouldProxy() {
return (bool)$this->proxyURI; return (bool)$this->proxyURI;
} }

View file

@ -7,6 +7,7 @@ final class PhabricatorRepositoryWorkingCopyVersion
protected $devicePHID; protected $devicePHID;
protected $repositoryVersion; protected $repositoryVersion;
protected $isWriting; protected $isWriting;
protected $lockOwner;
protected $writeProperties; protected $writeProperties;
protected function getConfiguration() { protected function getConfiguration() {
@ -16,6 +17,7 @@ final class PhabricatorRepositoryWorkingCopyVersion
'repositoryVersion' => 'uint32', 'repositoryVersion' => 'uint32',
'isWriting' => 'bool', 'isWriting' => 'bool',
'writeProperties' => 'text?', 'writeProperties' => 'text?',
'lockOwner' => 'text255?',
), ),
self::CONFIG_KEY_SCHEMA => array( self::CONFIG_KEY_SCHEMA => array(
'key_workingcopy' => array( 'key_workingcopy' => array(
@ -69,29 +71,33 @@ final class PhabricatorRepositoryWorkingCopyVersion
* by default. * by default.
*/ */
public static function willWrite( public static function willWrite(
AphrontDatabaseConnection $locked_connection,
$repository_phid, $repository_phid,
$device_phid, $device_phid,
array $write_properties) { array $write_properties,
$lock_owner) {
$version = new self(); $version = new self();
$conn_w = $version->establishConnection('w');
$table = $version->getTableName(); $table = $version->getTableName();
queryfx( queryfx(
$conn_w, $locked_connection,
'INSERT INTO %T 'INSERT INTO %T
(repositoryPHID, devicePHID, repositoryVersion, isWriting, (repositoryPHID, devicePHID, repositoryVersion, isWriting,
writeProperties) writeProperties, lockOwner)
VALUES VALUES
(%s, %s, %d, %d, %s) (%s, %s, %d, %d, %s, %s)
ON DUPLICATE KEY UPDATE ON DUPLICATE KEY UPDATE
isWriting = VALUES(isWriting), isWriting = VALUES(isWriting),
writeProperties = VALUES(writeProperties)', writeProperties = VALUES(writeProperties),
lockOwner = VALUES(lockOwner)',
$table, $table,
$repository_phid, $repository_phid,
$device_phid, $device_phid,
0, 0,
1, 1,
phutil_json_encode($write_properties)); phutil_json_encode($write_properties),
$lock_owner);
} }
@ -102,7 +108,9 @@ final class PhabricatorRepositoryWorkingCopyVersion
$repository_phid, $repository_phid,
$device_phid, $device_phid,
$old_version, $old_version,
$new_version) { $new_version,
$lock_owner) {
$version = new self(); $version = new self();
$conn_w = $version->establishConnection('w'); $conn_w = $version->establishConnection('w');
$table = $version->getTableName(); $table = $version->getTableName();
@ -111,17 +119,20 @@ final class PhabricatorRepositoryWorkingCopyVersion
$conn_w, $conn_w,
'UPDATE %T SET 'UPDATE %T SET
repositoryVersion = %d, repositoryVersion = %d,
isWriting = 0 isWriting = 0,
lockOwner = NULL
WHERE WHERE
repositoryPHID = %s AND repositoryPHID = %s AND
devicePHID = %s AND devicePHID = %s AND
repositoryVersion = %d AND repositoryVersion = %d AND
isWriting = 1', isWriting = 1 AND
lockOwner = %s',
$table, $table,
$new_version, $new_version,
$repository_phid, $repository_phid,
$device_phid, $device_phid,
$old_version); $old_version,
$lock_owner);
} }

View file

@ -29,6 +29,7 @@
final class PhabricatorGlobalLock extends PhutilLock { final class PhabricatorGlobalLock extends PhutilLock {
private $conn; private $conn;
private $isExternalConnection = false;
private static $pool = array(); private static $pool = array();
@ -74,6 +75,7 @@ final class PhabricatorGlobalLock extends PhutilLock {
*/ */
public function useSpecificConnection(AphrontDatabaseConnection $conn) { public function useSpecificConnection(AphrontDatabaseConnection $conn) {
$this->conn = $conn; $this->conn = $conn;
$this->isExternalConnection = true;
return $this; return $this;
} }
@ -109,29 +111,54 @@ final class PhabricatorGlobalLock extends PhutilLock {
$max_allowed_timeout = 2147483; $max_allowed_timeout = 2147483;
queryfx($conn, 'SET wait_timeout = %d', $max_allowed_timeout); queryfx($conn, 'SET wait_timeout = %d', $max_allowed_timeout);
$lock_name = $this->getName();
$result = queryfx_one( $result = queryfx_one(
$conn, $conn,
'SELECT GET_LOCK(%s, %f)', 'SELECT GET_LOCK(%s, %f)',
$this->getName(), $lock_name,
$wait); $wait);
$ok = head($result); $ok = head($result);
if (!$ok) { if (!$ok) {
throw new PhutilLockException($this->getName()); throw new PhutilLockException($lock_name);
} }
$conn->rememberLock($lock_name);
$this->conn = $conn; $this->conn = $conn;
} }
protected function doUnlock() { protected function doUnlock() {
queryfx( $lock_name = $this->getName();
$this->conn,
'SELECT RELEASE_LOCK(%s)', $conn = $this->conn;
$this->getName());
try {
$result = queryfx_one(
$conn,
'SELECT RELEASE_LOCK(%s)',
$lock_name);
$conn->forgetLock($lock_name);
} catch (Exception $ex) {
$result = array(null);
}
$ok = head($result);
if (!$ok) {
// TODO: We could throw here, but then this lock doesn't get marked
// unlocked and we throw again later when exiting. It also doesn't
// particularly matter for any current applications. For now, just
// swallow the error.
}
$this->conn->close();
self::$pool[] = $this->conn;
$this->conn = null; $this->conn = null;
$this->isExternalConnection = false;
if (!$this->isExternalConnection) {
$conn->close();
self::$pool[] = $conn;
}
} }
} }