diff --git a/resources/sql/autopatches/20160411.repo.1.version.sql b/resources/sql/autopatches/20160411.repo.1.version.sql new file mode 100644 index 0000000000..bd0db5f5ce --- /dev/null +++ b/resources/sql/autopatches/20160411.repo.1.version.sql @@ -0,0 +1,8 @@ +CREATE TABLE {$NAMESPACE}_repository.repository_workingcopyversion ( + id INT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY, + repositoryPHID VARBINARY(64) NOT NULL, + devicePHID VARBINARY(64) NOT NULL, + repositoryVersion INT UNSIGNED NOT NULL, + isWriting BOOL NOT NULL, + UNIQUE KEY `key_workingcopy` (repositoryPHID, devicePHID) +) ENGINE=InnoDB, COLLATE {$COLLATE_TEXT}; diff --git a/src/__phutil_library_map__.php b/src/__phutil_library_map__.php index 72782aa8ef..a1588abcae 100644 --- a/src/__phutil_library_map__.php +++ b/src/__phutil_library_map__.php @@ -3210,6 +3210,7 @@ phutil_register_library_map(array( 'PhabricatorRepositoryURITestCase' => 'applications/repository/storage/__tests__/PhabricatorRepositoryURITestCase.php', 'PhabricatorRepositoryVCSPassword' => 'applications/repository/storage/PhabricatorRepositoryVCSPassword.php', 'PhabricatorRepositoryVersion' => 'applications/repository/constants/PhabricatorRepositoryVersion.php', + 'PhabricatorRepositoryWorkingCopyVersion' => 'applications/repository/storage/PhabricatorRepositoryWorkingCopyVersion.php', 'PhabricatorRequestExceptionHandler' => 'aphront/handler/PhabricatorRequestExceptionHandler.php', 'PhabricatorResourceSite' => 'aphront/site/PhabricatorResourceSite.php', 'PhabricatorRobotsController' => 'applications/system/controller/PhabricatorRobotsController.php', @@ -7854,6 +7855,7 @@ phutil_register_library_map(array( 'PhabricatorRepositoryURITestCase' => 'PhabricatorTestCase', 'PhabricatorRepositoryVCSPassword' => 'PhabricatorRepositoryDAO', 'PhabricatorRepositoryVersion' => 'Phobject', + 'PhabricatorRepositoryWorkingCopyVersion' => 'PhabricatorRepositoryDAO', 'PhabricatorRequestExceptionHandler' => 'AphrontRequestExceptionHandler', 'PhabricatorResourceSite' => 'PhabricatorSite', 'PhabricatorRobotsController' => 'PhabricatorController', diff --git a/src/applications/diffusion/management/DiffusionRepositoryClusterManagementPanel.php b/src/applications/diffusion/management/DiffusionRepositoryClusterManagementPanel.php index 2a6841ebdb..27991f62c7 100644 --- a/src/applications/diffusion/management/DiffusionRepositoryClusterManagementPanel.php +++ b/src/applications/diffusion/management/DiffusionRepositoryClusterManagementPanel.php @@ -44,6 +44,16 @@ final class DiffusionRepositoryClusterManagementPanel $bindings = $service->getBindings(); $bindings = mgroup($bindings, 'getDevicePHID'); + // This is an unusual read which always comes from the master. + if (PhabricatorEnv::isReadOnly()) { + $versions = array(); + } else { + $versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions( + $repository->getPHID()); + } + + $versions = mpull($versions, null, 'getDevicePHID'); + foreach ($bindings as $binding_group) { $all_disabled = true; foreach ($binding_group as $binding) { @@ -73,6 +83,27 @@ final class DiffusionRepositoryClusterManagementPanel $device = $any_binding->getDevice(); + $version = idx($versions, $device->getPHID()); + if ($version) { + $version_number = $version->getRepositoryVersion(); + $version_number = phutil_tag( + 'a', + array( + 'href' => "/diffusion/pushlog/view/{$version_number}/", + ), + $version_number); + } else { + $version_number = '-'; + } + + if ($version && $version->getIsWriting()) { + $is_writing = id(new PHUIIconView()) + ->setIcon('fa-pencil green'); + } else { + $is_writing = id(new PHUIIconView()) + ->setIcon('fa-pencil grey'); + } + $rows[] = array( $binding_icon, phutil_tag( @@ -81,6 +112,8 @@ final class DiffusionRepositoryClusterManagementPanel 'href' => $device->getURI(), ), $device->getName()), + $version_number, + $is_writing, ); } } @@ -91,11 +124,15 @@ final class DiffusionRepositoryClusterManagementPanel array( null, pht('Device'), + pht('Version'), + pht('Writing'), )) ->setColumnClasses( array( null, - 'wide', + null, + null, + 'right wide', )); $doc_href = PhabricatorEnv::getDoclink('Cluster: Repositories'); diff --git a/src/applications/diffusion/ssh/DiffusionGitReceivePackSSHWorkflow.php b/src/applications/diffusion/ssh/DiffusionGitReceivePackSSHWorkflow.php index e4eabc72ef..b138a2ef7d 100644 --- a/src/applications/diffusion/ssh/DiffusionGitReceivePackSSHWorkflow.php +++ b/src/applications/diffusion/ssh/DiffusionGitReceivePackSSHWorkflow.php @@ -21,8 +21,12 @@ final class DiffusionGitReceivePackSSHWorkflow extends DiffusionGitSSHWorkflow { if ($this->shouldProxy()) { $command = $this->getProxyCommand(); + $is_proxy = true; } else { $command = csprintf('git-receive-pack %s', $repository->getLocalPath()); + $is_proxy = false; + + $repository->synchronizeWorkingCopyBeforeWrite(); } $command = PhabricatorDaemon::sudoCommandAsDaemonUser($command); @@ -41,6 +45,10 @@ final class DiffusionGitReceivePackSSHWorkflow extends DiffusionGitSSHWorkflow { $this->waitForGitClient(); } + if (!$is_proxy) { + $repository->synchronizeWorkingCopyAfterWrite(); + } + return $err; } diff --git a/src/applications/diffusion/ssh/DiffusionGitUploadPackSSHWorkflow.php b/src/applications/diffusion/ssh/DiffusionGitUploadPackSSHWorkflow.php index 4812b960a0..926a477627 100644 --- a/src/applications/diffusion/ssh/DiffusionGitUploadPackSSHWorkflow.php +++ b/src/applications/diffusion/ssh/DiffusionGitUploadPackSSHWorkflow.php @@ -20,6 +20,7 @@ final class DiffusionGitUploadPackSSHWorkflow extends DiffusionGitSSHWorkflow { $command = $this->getProxyCommand(); } else { $command = csprintf('git-upload-pack -- %s', $repository->getLocalPath()); + $repository->synchronizeWorkingCopyBeforeRead(); } $command = PhabricatorDaemon::sudoCommandAsDaemonUser($command); diff --git a/src/applications/repository/storage/PhabricatorRepository.php b/src/applications/repository/storage/PhabricatorRepository.php index 7068657acc..498310e28f 100644 --- a/src/applications/repository/storage/PhabricatorRepository.php +++ b/src/applications/repository/storage/PhabricatorRepository.php @@ -3,6 +3,7 @@ /** * @task uri Repository URI Management * @task autoclose Autoclose + * @task sync Cluster Synchronization */ final class PhabricatorRepository extends PhabricatorRepositoryDAO implements @@ -62,6 +63,9 @@ final class PhabricatorRepository extends PhabricatorRepositoryDAO private $mostRecentCommit = self::ATTACHABLE; private $projectPHIDs = self::ATTACHABLE; + private $clusterWriteLock; + private $clusterWriteVersion; + public static function initializeNewRepository(PhabricatorUser $actor) { $app = id(new PhabricatorApplicationQuery()) ->setViewer($actor) @@ -2262,6 +2266,161 @@ final class PhabricatorRepository extends PhabricatorRepositoryDAO } +/* -( Cluster Synchronization )-------------------------------------------- */ + + + /** + * @task sync + */ + public function synchronizeWorkingCopyBeforeRead() { + $device = AlmanacKeys::getLiveDevice(); + if (!$device) { + return; + } + + $repository_phid = $this->getPHID(); + $device_phid = $device->getPHID(); + + $read_lock = PhabricatorRepositoryWorkingCopyVersion::getReadLock( + $repository_phid, + $device_phid); + + // TODO: Raise a more useful exception if we fail to grab this lock. + $read_lock->lock(phutil_units('2 minutes in seconds')); + + $versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions( + $repository_phid); + $versions = mpull($versions, null, 'getDevicePHID'); + + $this_version = idx($versions, $device_phid); + if ($this_version) { + $this_version = (int)$this_version->getRepositoryVersion(); + } else { + $this_version = 0; + } + + if ($versions) { + $max_version = (int)max(mpull($versions, 'getRepositoryVersion')); + } else { + $max_version = 0; + } + + if ($max_version > $this_version) { + $fetchable = array(); + foreach ($versions as $version) { + if ($version->getRepositoryVersion() == $max_version) { + $fetchable[] = $version->getDevicePHID(); + } + } + + // TODO: Actualy fetch the newer version from one of the nodes which has + // it. + + PhabricatorRepositoryWorkingCopyVersion::updateVersion( + $repository_phid, + $device_phid, + $max_version); + } + + $read_lock->unlock(); + + return $max_version; + } + + + /** + * @task sync + */ + public function synchronizeWorkingCopyBeforeWrite() { + $device = AlmanacKeys::getLiveDevice(); + if (!$device) { + return; + } + + $repository_phid = $this->getPHID(); + $device_phid = $device->getPHID(); + + $write_lock = PhabricatorRepositoryWorkingCopyVersion::getWriteLock( + $repository_phid); + + // TODO: Raise a more useful exception if we fail to grab this lock. + $write_lock->lock(phutil_units('2 minutes in seconds')); + + $versions = PhabricatorRepositoryWorkingCopyVersion::loadVersions( + $repository_phid); + foreach ($versions as $version) { + if (!$version->getIsWriting()) { + continue; + } + + // TODO: This should provide more help so users can resolve the issue. + throw new Exception( + pht( + 'An incomplete write was previously performed to this repository; '. + 'refusing new writes.')); + } + + $max_version = $this->synchronizeWorkingCopyBeforeRead(); + + PhabricatorRepositoryWorkingCopyVersion::willWrite( + $repository_phid, + $device_phid); + + $this->clusterWriteVersion = $max_version; + $this->clusterWriteLock = $write_lock; + } + + + /** + * @task sync + */ + public function synchronizeWorkingCopyAfterWrite() { + if (!$this->clusterWriteLock) { + throw new Exception( + pht( + 'Trying to synchronize after write, but not holding a write '. + 'lock!')); + } + + $device = AlmanacKeys::getLiveDevice(); + if (!$device) { + throw new Exception( + pht( + 'Trying to synchronize after write, but this host is not an '. + 'Almanac device.')); + } + + $repository_phid = $this->getPHID(); + $device_phid = $device->getPHID(); + + // NOTE: This means we're still bumping the version when pushes fail. We + // could select only un-rejected events instead to bump a little less + // often. + + $new_log = id(new PhabricatorRepositoryPushEventQuery()) + ->setViewer(PhabricatorUser::getOmnipotentUser()) + ->withRepositoryPHIDs(array($repository_phid)) + ->setLimit(1) + ->executeOne(); + + $old_version = $this->clusterWriteVersion; + if ($new_log) { + $new_version = $new_log->getID(); + } else { + $new_version = $old_version; + } + + PhabricatorRepositoryWorkingCopyVersion::didWrite( + $repository_phid, + $device_phid, + $this->clusterWriteVersion, + $new_log->getID()); + + $this->clusterWriteLock->unlock(); + $this->clusterWriteLock = null; + } + + /* -( Symbols )-------------------------------------------------------------*/ public function getSymbolSources() { diff --git a/src/applications/repository/storage/PhabricatorRepositoryWorkingCopyVersion.php b/src/applications/repository/storage/PhabricatorRepositoryWorkingCopyVersion.php new file mode 100644 index 0000000000..00e74a3d61 --- /dev/null +++ b/src/applications/repository/storage/PhabricatorRepositoryWorkingCopyVersion.php @@ -0,0 +1,145 @@ + false, + self::CONFIG_COLUMN_SCHEMA => array( + 'repositoryVersion' => 'uint32', + 'isWriting' => 'bool', + ), + self::CONFIG_KEY_SCHEMA => array( + 'key_workingcopy' => array( + 'columns' => array('repositoryPHID', 'devicePHID'), + 'unique' => true, + ), + ), + ) + parent::getConfiguration(); + } + + public static function loadVersions($repository_phid) { + $version = new self(); + $conn_w = $version->establishConnection('w'); + $table = $version->getTableName(); + + // This is a normal read, but force it to come from the master. + $rows = queryfx_all( + $conn_w, + 'SELECT * FROM %T WHERE repositoryPHID = %s', + $table, + $repository_phid); + + return $version->loadAllFromArray($rows); + } + + public static function getReadLock($repository_phid, $device_phid) { + $repository_hash = PhabricatorHash::digestForIndex($repository_phid); + $device_hash = PhabricatorHash::digestForIndex($device_phid); + $lock_key = "repo.read({$repository_hash}, {$device_hash})"; + + return PhabricatorGlobalLock::newLock($lock_key); + } + + public static function getWriteLock($repository_phid) { + $repository_hash = PhabricatorHash::digestForIndex($repository_phid); + $lock_key = "repo.write({$repository_hash})"; + + return PhabricatorGlobalLock::newLock($lock_key); + } + + + /** + * Before a write, set the "isWriting" flag. + * + * This allows us to detect when we lose a node partway through a write and + * may have committed and acknowledged a write on a node that lost the lock + * partway through the write and is no longer reachable. + * + * In particular, if a node loses its connection to the datbase the global + * lock is released by default. This is a durable lock which stays locked + * by default. + */ + public static function willWrite($repository_phid, $device_phid) { + $version = new self(); + $conn_w = $version->establishConnection('w'); + $table = $version->getTableName(); + + queryfx( + $conn_w, + 'INSERT INTO %T + (repositoryPHID, devicePHID, repositoryVersion, isWriting) + VALUES + (%s, %s, %d, %d) + ON DUPLICATE KEY UPDATE + isWriting = VALUES(isWriting)', + $table, + $repository_phid, + $device_phid, + 1, + 1); + } + + + /** + * After a write, update the version and release the "isWriting" lock. + */ + public static function didWrite( + $repository_phid, + $device_phid, + $old_version, + $new_version) { + $version = new self(); + $conn_w = $version->establishConnection('w'); + $table = $version->getTableName(); + + queryfx( + $conn_w, + 'UPDATE %T SET repositoryVersion = %d, isWriting = 0 + WHERE + repositoryPHID = %s AND + devicePHID = %s AND + repositoryVersion = %d AND + isWriting = 1', + $table, + $new_version, + $repository_phid, + $device_phid, + $old_version); + } + + + /** + * After a fetch, set the local version to the fetched version. + */ + public static function updateVersion( + $repository_phid, + $device_phid, + $new_version) { + $version = new self(); + $conn_w = $version->establishConnection('w'); + $table = $version->getTableName(); + + queryfx( + $conn_w, + 'INSERT INTO %T + (repositoryPHID, devicePHID, repositoryVersion, isWriting) + VALUES + (%s, %s, %d, %d) + ON DUPLICATE KEY UPDATE + repositoryVersion = VALUES(repositoryVersion)', + $table, + $repository_phid, + $device_phid, + $new_version, + 0); + } + + +}