mirror of
https://we.phorge.it/source/phorge.git
synced 2025-01-24 13:38:19 +01:00
Move toward multi-master replicated repositories
Summary: Ref T4292. This mostly implements the locking/versioning logic for multi-master repositories. It is only active on Git SSH pathways, and doesn't actually do anything useful yet: it just does bookkeeping so far. When we read (e.g., `git fetch`) the logic goes like this: - Get the read lock (unique to device + repository). - Read all the versions of the repository on every other device. - If any node has a newer version: - Fetch the newer version. - Increment our version to be the same as the version we fetched. - Release the read lock. - Actually do the fetch. This makes sure that any time you do a read, you always read the most recently acknowledged write. You may have to wait for an internal fetch to happen (this isn't actually implemented yet) but the operation will always work like you expect it to. When we write (e.g., `git push`) the logic goes like this: - Get the write lock (unique to the repository). - Do all the read steps so we're up to date. - Mark a write pending. - Do the actual write. - Bump our version and mark our write finished. - Release the write lock. This allows you to write to any replica. Again, you might have to wait for a fetch first, but everything will work like you expect. There's one notable failure mode here: if the network connection between the repository node and the database fails during the write, the write lock might be released even though a write is ongoing. The "isWriting" column protects against that, by staying locked if we lose our connection to the database. This will currently "freeze" the repository (prevent any new writes) until an administrator can sort things out, since it'd be dangerous to continue doing writes (we may lose data). (Since we won't actually acknowledge the write, I think, we could probably smooth this out a bit and make it self-healing //most// of the time: basically, have the broken node rewind itself by updating from another good node. But that's a little more complex.) 
Test Plan: - Pushed changes to a cluster-mode repository. - Viewed web interface, saw "writing" flag and version changes. - Pulled changes. - Faked various failures, got sensible states. Reviewers: chad Reviewed By: chad Maniphest Tasks: T4292 Differential Revision: https://secure.phabricator.com/D15688
This commit is contained in:
parent
58eef68b7c
commit
4244cad990
7 changed files with 361 additions and 1 deletions
8
resources/sql/autopatches/20160411.repo.1.version.sql
Normal file
8
resources/sql/autopatches/20160411.repo.1.version.sql
Normal file
|
@ -0,0 +1,8 @@
|
|||
-- Bookkeeping table for clustered repositories. The unique key below allows
-- at most one row per (repositoryPHID, devicePHID) pair. Each row stores the
-- working copy version recorded for that device and an isWriting flag.
CREATE TABLE {$NAMESPACE}_repository.repository_workingcopyversion (
|
||||
id INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,
|
||||
repositoryPHID VARBINARY(64) NOT NULL,
|
||||
devicePHID VARBINARY(64) NOT NULL,
|
||||
repositoryVersion INT UNSIGNED NOT NULL,
|
||||
isWriting BOOL NOT NULL,
|
||||
UNIQUE KEY `key_workingcopy` (repositoryPHID, devicePHID)
|
||||
) ENGINE=InnoDB, COLLATE {$COLLATE_TEXT};
|
|
@ -3210,6 +3210,7 @@ phutil_register_library_map(array(
|
|||
'PhabricatorRepositoryURITestCase' => 'applications/repository/storage/__tests__/PhabricatorRepositoryURITestCase.php',
|
||||
'PhabricatorRepositoryVCSPassword' => 'applications/repository/storage/PhabricatorRepositoryVCSPassword.php',
|
||||
'PhabricatorRepositoryVersion' => 'applications/repository/constants/PhabricatorRepositoryVersion.php',
|
||||
'PhabricatorRepositoryWorkingCopyVersion' => 'applications/repository/storage/PhabricatorRepositoryWorkingCopyVersion.php',
|
||||
'PhabricatorRequestExceptionHandler' => 'aphront/handler/PhabricatorRequestExceptionHandler.php',
|
||||
'PhabricatorResourceSite' => 'aphront/site/PhabricatorResourceSite.php',
|
||||
'PhabricatorRobotsController' => 'applications/system/controller/PhabricatorRobotsController.php',
|
||||
|
@ -7854,6 +7855,7 @@ phutil_register_library_map(array(
|
|||
'PhabricatorRepositoryURITestCase' => 'PhabricatorTestCase',
|
||||
'PhabricatorRepositoryVCSPassword' => 'PhabricatorRepositoryDAO',
|
||||
'PhabricatorRepositoryVersion' => 'Phobject',
|
||||
'PhabricatorRepositoryWorkingCopyVersion' => 'PhabricatorRepositoryDAO',
|
||||
'PhabricatorRequestExceptionHandler' => 'AphrontRequestExceptionHandler',
|
||||
'PhabricatorResourceSite' => 'PhabricatorSite',
|
||||
'PhabricatorRobotsController' => 'PhabricatorController',
|
||||
|
|
|
@ -44,6 +44,16 @@ final class DiffusionRepositoryClusterManagementPanel
|
|||
$bindings = $service->getBindings();
|
||||
$bindings = mgroup($bindings, 'getDevicePHID');
|
||||
|
||||
// This is an unusual read which always comes from the master.
|
||||
if (PhabricatorEnv::isReadOnly()) {
|
||||
$versions = array();
|
||||
} else {
|
||||
$versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions(
|
||||
$repository->getPHID());
|
||||
}
|
||||
|
||||
$versions = mpull($versions, null, 'getDevicePHID');
|
||||
|
||||
foreach ($bindings as $binding_group) {
|
||||
$all_disabled = true;
|
||||
foreach ($binding_group as $binding) {
|
||||
|
@ -73,6 +83,27 @@ final class DiffusionRepositoryClusterManagementPanel
|
|||
|
||||
$device = $any_binding->getDevice();
|
||||
|
||||
$version = idx($versions, $device->getPHID());
|
||||
if ($version) {
|
||||
$version_number = $version->getRepositoryVersion();
|
||||
$version_number = phutil_tag(
|
||||
'a',
|
||||
array(
|
||||
'href' => "/diffusion/pushlog/view/{$version_number}/",
|
||||
),
|
||||
$version_number);
|
||||
} else {
|
||||
$version_number = '-';
|
||||
}
|
||||
|
||||
if ($version && $version->getIsWriting()) {
|
||||
$is_writing = id(new PHUIIconView())
|
||||
->setIcon('fa-pencil green');
|
||||
} else {
|
||||
$is_writing = id(new PHUIIconView())
|
||||
->setIcon('fa-pencil grey');
|
||||
}
|
||||
|
||||
$rows[] = array(
|
||||
$binding_icon,
|
||||
phutil_tag(
|
||||
|
@ -81,6 +112,8 @@ final class DiffusionRepositoryClusterManagementPanel
|
|||
'href' => $device->getURI(),
|
||||
),
|
||||
$device->getName()),
|
||||
$version_number,
|
||||
$is_writing,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -91,11 +124,15 @@ final class DiffusionRepositoryClusterManagementPanel
|
|||
array(
|
||||
null,
|
||||
pht('Device'),
|
||||
pht('Version'),
|
||||
pht('Writing'),
|
||||
))
|
||||
->setColumnClasses(
|
||||
array(
|
||||
null,
|
||||
'wide',
|
||||
null,
|
||||
null,
|
||||
'right wide',
|
||||
));
|
||||
|
||||
$doc_href = PhabricatorEnv::getDoclink('Cluster: Repositories');
|
||||
|
|
|
@ -21,8 +21,12 @@ final class DiffusionGitReceivePackSSHWorkflow extends DiffusionGitSSHWorkflow {
|
|||
|
||||
if ($this->shouldProxy()) {
|
||||
$command = $this->getProxyCommand();
|
||||
$is_proxy = true;
|
||||
} else {
|
||||
$command = csprintf('git-receive-pack %s', $repository->getLocalPath());
|
||||
$is_proxy = false;
|
||||
|
||||
$repository->synchronizeWorkingCopyBeforeWrite();
|
||||
}
|
||||
$command = PhabricatorDaemon::sudoCommandAsDaemonUser($command);
|
||||
|
||||
|
@ -41,6 +45,10 @@ final class DiffusionGitReceivePackSSHWorkflow extends DiffusionGitSSHWorkflow {
|
|||
$this->waitForGitClient();
|
||||
}
|
||||
|
||||
if (!$is_proxy) {
|
||||
$repository->synchronizeWorkingCopyAfterWrite();
|
||||
}
|
||||
|
||||
return $err;
|
||||
}
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@ final class DiffusionGitUploadPackSSHWorkflow extends DiffusionGitSSHWorkflow {
|
|||
$command = $this->getProxyCommand();
|
||||
} else {
|
||||
$command = csprintf('git-upload-pack -- %s', $repository->getLocalPath());
|
||||
$repository->synchronizeWorkingCopyBeforeRead();
|
||||
}
|
||||
$command = PhabricatorDaemon::sudoCommandAsDaemonUser($command);
|
||||
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
/**
|
||||
* @task uri Repository URI Management
|
||||
* @task autoclose Autoclose
|
||||
* @task sync Cluster Synchronization
|
||||
*/
|
||||
final class PhabricatorRepository extends PhabricatorRepositoryDAO
|
||||
implements
|
||||
|
@ -62,6 +63,9 @@ final class PhabricatorRepository extends PhabricatorRepositoryDAO
|
|||
private $mostRecentCommit = self::ATTACHABLE;
|
||||
private $projectPHIDs = self::ATTACHABLE;
|
||||
|
||||
private $clusterWriteLock;
|
||||
private $clusterWriteVersion;
|
||||
|
||||
public static function initializeNewRepository(PhabricatorUser $actor) {
|
||||
$app = id(new PhabricatorApplicationQuery())
|
||||
->setViewer($actor)
|
||||
|
@ -2262,6 +2266,161 @@ final class PhabricatorRepository extends PhabricatorRepositoryDAO
|
|||
}
|
||||
|
||||
|
||||
/* -( Cluster Synchronization )-------------------------------------------- */
|
||||
|
||||
|
||||
/**
 * Bring this device's recorded working copy version up to date before a
 * read.
 *
 * Takes the per-device read lock, loads the version rows for this
 * repository, and, if any device has recorded a newer version than this
 * device, records that newer version for this device. The actual fetch
 * from the newer device is not implemented yet; only the bookkeeping is
 * done.
 *
 * The read lock is released before returning, including when an exception
 * is raised while it is held.
 *
 * @return int|null Highest version recorded for this repository on any
 *   device (0 if there are no version rows), or null if this host is not
 *   a live Almanac device, in which case nothing is done.
 * @task sync
 */
public function synchronizeWorkingCopyBeforeRead() {
  $device = AlmanacKeys::getLiveDevice();
  if (!$device) {
    return;
  }

  $repository_phid = $this->getPHID();
  $device_phid = $device->getPHID();

  $read_lock = PhabricatorRepositoryWorkingCopyVersion::getReadLock(
    $repository_phid,
    $device_phid);

  // TODO: Raise a more useful exception if we fail to grab this lock.
  $read_lock->lock(phutil_units('2 minutes in seconds'));

  // Everything between lock() and unlock() is guarded so a failure while
  // loading or updating versions does not leave the read lock held. This
  // uses catch-and-rethrow rather than "finally" to stay compatible with
  // older PHP versions.
  try {
    $versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions(
      $repository_phid);
    $versions = mpull($versions, null, 'getDevicePHID');

    $this_version = idx($versions, $device_phid);
    if ($this_version) {
      $this_version = (int)$this_version->getRepositoryVersion();
    } else {
      // No row for this device yet: treat it as version 0.
      $this_version = 0;
    }

    if ($versions) {
      $max_version = (int)max(mpull($versions, 'getRepositoryVersion'));
    } else {
      $max_version = 0;
    }

    if ($max_version > $this_version) {
      // Devices which have the newest version. This list is not consumed
      // yet; it is kept for the fetch described in the TODO below.
      $fetchable = array();
      foreach ($versions as $version) {
        if ($version->getRepositoryVersion() == $max_version) {
          $fetchable[] = $version->getDevicePHID();
        }
      }

      // TODO: Actually fetch the newer version from one of the nodes which
      // has it.

      PhabricatorRepositoryWorkingCopyVersion::updateVersion(
        $repository_phid,
        $device_phid,
        $max_version);
    }
  } catch (Exception $ex) {
    $read_lock->unlock();
    throw $ex;
  }

  $read_lock->unlock();

  return $max_version;
}
|
||||
|
||||
|
||||
/**
 * Prepare this device for a write to the repository.
 *
 * Takes the repository-wide write lock, refuses to continue if any device
 * still has its "isWriting" flag set, brings this device up to date using
 * the read synchronization steps, and then marks a write as pending for
 * this device.
 *
 * On success the write lock remains held and is stored on this object
 * together with the pre-write version, for use by
 * @{method:synchronizeWorkingCopyAfterWrite}. If anything fails after the
 * lock is acquired, the lock is released before the exception propagates,
 * since no write will be performed by this process.
 *
 * Does nothing if this host is not a live Almanac device.
 *
 * @return void
 * @task sync
 */
public function synchronizeWorkingCopyBeforeWrite() {
  $device = AlmanacKeys::getLiveDevice();
  if (!$device) {
    return;
  }

  $repository_phid = $this->getPHID();
  $device_phid = $device->getPHID();

  $write_lock = PhabricatorRepositoryWorkingCopyVersion::getWriteLock(
    $repository_phid);

  // TODO: Raise a more useful exception if we fail to grab this lock.
  $write_lock->lock(phutil_units('2 minutes in seconds'));

  try {
    $versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions(
      $repository_phid);
    foreach ($versions as $version) {
      if (!$version->getIsWriting()) {
        continue;
      }

      // Some device marked a write as pending and never marked it as
      // finished, so the state of the repository is uncertain.

      // TODO: This should provide more help so users can resolve the issue.
      throw new Exception(
        pht(
          'An incomplete write was previously performed to this repository; '.
          'refusing new writes.'));
    }

    $max_version = $this->synchronizeWorkingCopyBeforeRead();

    PhabricatorRepositoryWorkingCopyVersion::willWrite(
      $repository_phid,
      $device_phid);
  } catch (Exception $ex) {
    // The lock has not been stored on this object yet, so nothing else
    // could release it for us. Release it here so other writers are not
    // blocked by a write which is never going to happen.
    $write_lock->unlock();
    throw $ex;
  }

  $this->clusterWriteVersion = $max_version;
  $this->clusterWriteLock = $write_lock;
}
|
||||
|
||||
|
||||
/**
 * Finish a write started with @{method:synchronizeWorkingCopyBeforeWrite}.
 *
 * Records the new version for this device, clears its pending write flag,
 * and releases the repository-wide write lock.
 *
 * If this host is not a live Almanac device and no write lock is held,
 * this method does nothing. That mirrors the "before write" step, which is
 * also a no-op on such hosts and therefore never acquires a lock.
 *
 * @return void
 * @throws Exception If this host is a device but no write lock is held, or
 *   if a write lock is held but this host is not a device.
 * @task sync
 */
public function synchronizeWorkingCopyAfterWrite() {
  $device = AlmanacKeys::getLiveDevice();

  if (!$this->clusterWriteLock) {
    if (!$device) {
      // Synchronization was skipped before the write, so there is nothing
      // to finish here.
      return;
    }

    throw new Exception(
      pht(
        'Trying to synchronize after write, but not holding a write '.
        'lock!'));
  }

  if (!$device) {
    throw new Exception(
      pht(
        'Trying to synchronize after write, but this host is not an '.
        'Almanac device.'));
  }

  $repository_phid = $this->getPHID();
  $device_phid = $device->getPHID();

  // NOTE: This means we're still bumping the version when pushes fail. We
  // could select only un-rejected events instead to bump a little less
  // often.

  // NOTE(review): this relies on the query's default ordering returning the
  // most recent push event first -- confirm against the query class.
  $new_log = id(new PhabricatorRepositoryPushEventQuery())
    ->setViewer(PhabricatorUser::getOmnipotentUser())
    ->withRepositoryPHIDs(array($repository_phid))
    ->setLimit(1)
    ->executeOne();

  // If the repository has no push events at all, keep the version
  // unchanged rather than dereferencing a missing event.
  $old_version = $this->clusterWriteVersion;
  if ($new_log) {
    $new_version = $new_log->getID();
  } else {
    $new_version = $old_version;
  }

  PhabricatorRepositoryWorkingCopyVersion::didWrite(
    $repository_phid,
    $device_phid,
    $old_version,
    $new_version);

  $this->clusterWriteLock->unlock();
  $this->clusterWriteLock = null;
}
|
||||
|
||||
|
||||
/* -( Symbols )-------------------------------------------------------------*/
|
||||
|
||||
public function getSymbolSources() {
|
||||
|
|
|
@ -0,0 +1,145 @@
|
|||
<?php
|
||||
|
||||
/**
 * Version bookkeeping for one repository working copy on one device.
 *
 * Each row records the version a device has for a repository, and whether
 * that device currently has a write in progress. The static helpers on this
 * class build the locks and issue the queries used to keep these rows
 * consistent.
 */
final class PhabricatorRepositoryWorkingCopyVersion
  extends PhabricatorRepositoryDAO {

  protected $repositoryPHID;
  protected $devicePHID;
  protected $repositoryVersion;
  protected $isWriting;

  protected function getConfiguration() {
    return array(
      self::CONFIG_TIMESTAMPS => false,
      self::CONFIG_COLUMN_SCHEMA => array(
        'repositoryVersion' => 'uint32',
        'isWriting' => 'bool',
      ),
      self::CONFIG_KEY_SCHEMA => array(
        'key_workingcopy' => array(
          'columns' => array('repositoryPHID', 'devicePHID'),
          'unique' => true,
        ),
      ),
    ) + parent::getConfiguration();
  }

  /**
   * Load the version rows for every device which has one for a repository.
   *
   * @param string $repository_phid Repository PHID.
   * @return list<PhabricatorRepositoryWorkingCopyVersion> Version rows.
   */
  public static function loadVersions($repository_phid) {
    $version = new self();
    $conn_w = $version->establishConnection('w');
    $table = $version->getTableName();

    // This is a normal read, but force it to come from the master.
    $rows = queryfx_all(
      $conn_w,
      'SELECT * FROM %T WHERE repositoryPHID = %s',
      $table,
      $repository_phid);

    return $version->loadAllFromArray($rows);
  }

  /**
   * Build the read lock, which is unique to a repository and device pair.
   *
   * @param string $repository_phid Repository PHID.
   * @param string $device_phid Device PHID.
   * @return PhabricatorGlobalLock Lock object; it is returned unlocked.
   */
  public static function getReadLock($repository_phid, $device_phid) {
    $repository_hash = PhabricatorHash::digestForIndex($repository_phid);
    $device_hash = PhabricatorHash::digestForIndex($device_phid);
    $lock_key = "repo.read({$repository_hash}, {$device_hash})";

    return PhabricatorGlobalLock::newLock($lock_key);
  }

  /**
   * Build the write lock, which is unique to a repository and shared by
   * all devices.
   *
   * @param string $repository_phid Repository PHID.
   * @return PhabricatorGlobalLock Lock object; it is returned unlocked.
   */
  public static function getWriteLock($repository_phid) {
    $repository_hash = PhabricatorHash::digestForIndex($repository_phid);
    $lock_key = "repo.write({$repository_hash})";

    return PhabricatorGlobalLock::newLock($lock_key);
  }


  /**
   * Before a write, set the "isWriting" flag.
   *
   * This allows us to detect a write which was started but never marked as
   * finished, for example because the writing node was lost partway
   * through the write.
   *
   * In particular, if a node loses its connection to the database, the
   * global lock is released by default. This flag is a durable lock which
   * stays locked by default.
   *
   * If the device has no row yet, one is created at version 0. Callers
   * treat a missing row as version 0, and @{method:didWrite} only updates
   * a row whose version still equals the version the caller observed, so
   * the new row must start at 0 for the first write to complete. If the
   * row already exists, only the flag is changed.
   *
   * @param string $repository_phid Repository PHID.
   * @param string $device_phid Device PHID.
   * @return void
   */
  public static function willWrite($repository_phid, $device_phid) {
    $version = new self();
    $conn_w = $version->establishConnection('w');
    $table = $version->getTableName();

    queryfx(
      $conn_w,
      'INSERT INTO %T
        (repositoryPHID, devicePHID, repositoryVersion, isWriting)
        VALUES
        (%s, %s, %d, %d)
        ON DUPLICATE KEY UPDATE
          isWriting = VALUES(isWriting)',
      $table,
      $repository_phid,
      $device_phid,
      0,
      1);
  }


  /**
   * After a write, update the version and release the "isWriting" lock.
   *
   * The update only applies to a row which is still at `$old_version` and
   * still flagged as writing. If no such row exists, nothing is changed and
   * no error is raised by this method.
   *
   * @param string $repository_phid Repository PHID.
   * @param string $device_phid Device PHID.
   * @param int $old_version Version the device had before the write.
   * @param int $new_version Version to record after the write.
   * @return void
   */
  public static function didWrite(
    $repository_phid,
    $device_phid,
    $old_version,
    $new_version) {
    $version = new self();
    $conn_w = $version->establishConnection('w');
    $table = $version->getTableName();

    queryfx(
      $conn_w,
      'UPDATE %T SET repositoryVersion = %d, isWriting = 0
        WHERE
          repositoryPHID = %s AND
          devicePHID = %s AND
          repositoryVersion = %d AND
          isWriting = 1',
      $table,
      $new_version,
      $repository_phid,
      $device_phid,
      $old_version);
  }


  /**
   * After a fetch, set the local version to the fetched version.
   *
   * If the device has no row yet, one is created with the "isWriting" flag
   * clear. If the row already exists, only its version is changed.
   *
   * @param string $repository_phid Repository PHID.
   * @param string $device_phid Device PHID.
   * @param int $new_version Version to record for the device.
   * @return void
   */
  public static function updateVersion(
    $repository_phid,
    $device_phid,
    $new_version) {
    $version = new self();
    $conn_w = $version->establishConnection('w');
    $table = $version->getTableName();

    queryfx(
      $conn_w,
      'INSERT INTO %T
        (repositoryPHID, devicePHID, repositoryVersion, isWriting)
        VALUES
        (%s, %s, %d, %d)
        ON DUPLICATE KEY UPDATE
          repositoryVersion = VALUES(repositoryVersion)',
      $table,
      $repository_phid,
      $device_phid,
      $new_version,
      0);
  }

}
|
Loading…
Add table
Reference in a new issue