1
0
Fork 0
mirror of https://we.phorge.it/source/phorge.git synced 2024-11-17 12:22:42 +01:00

Hold recent messages in Aphlict so they can be replayed after clients reconnect

Summary:
Ref T12563. Before broadcasting messages from the server, store them in a history buffer.

A future change will let clients retrieve them.

Test Plan:
  - Used the web frontend to look at the buffer, reloaded over time, sent messages. Saw buffer size go up as I sent messages and fall after 60 seconds.
  - Set size to 4 messages, sent a bunch of messages, saw the buffer size max out at 4 messages.

Reviewers: chad

Reviewed By: chad

Maniphest Tasks: T12563

Differential Revision: https://secure.phabricator.com/D17707
This commit is contained in:
epriestley 2017-04-17 13:46:35 -07:00
parent 8fdc1bff5f
commit 88157a9442
2 changed files with 80 additions and 2 deletions

View file

@ -103,10 +103,20 @@ final class PhabricatorConfigClusterNotificationsController
new PhutilNumber(idx($details, 'messages.in')),
new PhutilNumber(idx($details, 'messages.out')));
if (idx($details, 'history.size')) {
$history = pht(
'%s Held / %sms',
new PhutilNumber(idx($details, 'history.size')),
new PhutilNumber(idx($details, 'history.age')));
} else {
$history = pht('No Messages');
}
} else {
$uptime = null;
$clients = null;
$stats = null;
$history = null;
}
$status_view = array(
@ -126,6 +136,7 @@ final class PhabricatorConfigClusterNotificationsController
$uptime,
$clients,
$stats,
$history,
$messages,
);
}
@ -143,6 +154,7 @@ final class PhabricatorConfigClusterNotificationsController
pht('Uptime'),
pht('Clients'),
pht('Messages'),
pht('History'),
null,
))
->setColumnClasses(
@ -155,6 +167,7 @@ final class PhabricatorConfigClusterNotificationsController
null,
null,
null,
null,
'wide',
));

View file

@ -17,6 +17,7 @@ JX.install('AphlictAdminServer', {
server.on('request', JX.bind(this, this._onrequest));
this._server = server;
this._clientServers = [];
this._messageHistory = [];
},
properties: {
@ -30,6 +31,7 @@ JX.install('AphlictAdminServer', {
_messagesOut: null,
_server: null,
_startTime: null,
_messageHistory: null,
getListenerLists: function(instance) {
var clients = this.getClientServers();
@ -121,14 +123,24 @@ JX.install('AphlictAdminServer', {
total_count += list.getTotalListenerCount();
}
var now = new Date().getTime();
var history_size = this._messageHistory.length;
var history_age = null;
if (history_size) {
history_age = (now - this._messageHistory[0].timestamp);
}
var server_status = {
'instance': instance,
'uptime': (new Date().getTime() - this._startTime),
'uptime': (now - this._startTime),
'clients.active': active_count,
'clients.total': total_count,
'messages.in': this._messagesIn,
'messages.out': this._messagesOut,
'version': 7
'version': 7,
'history.size': history_size,
'history.age': history_age
};
response.writeHead(200, {'Content-Type': 'application/json'});
@ -140,6 +152,16 @@ JX.install('AphlictAdminServer', {
* Transmits a message to all subscribed listeners.
*/
_transmit: function(instance, message, response) {
var now = new Date().getTime();
this._messageHistory.push(
{
timestamp: now,
message: message
});
this._purgeHistory();
var peer_list = this.getPeerList();
message = peer_list.addFingerprint(message);
@ -191,7 +213,50 @@ JX.install('AphlictAdminServer', {
error);
}
}
},
getHistory: function(min_age) {
var history = this._messageHistory;
var results = [];
for (var ii = 0; ii < history.length; ii++) {
if (history[ii].timestamp >= min_age) {
results.push(history[ii].message);
}
}
return results;
},
_purgeHistory: function() {
var messages = this._messageHistory;
// Maximum number of messages to retain.
var size_limit = 4096;
// Find the index of the first item we're going to keep. If we have too
// many items, this will be somewhere past the beginning of the list.
var keep = Math.max(0, messages.length - size_limit);
// Maximum number of milliseconds of history to retain.
var age_limit = 60000;
// Move the index forward until we find an item that is recent enough
// to retain.
var now = new Date().getTime();
var min_age = (now - age_limit);
for (keep; keep < messages.length; keep++) {
if (messages[keep].timestamp >= min_age) {
break;
}
}
// Throw away extra messages.
if (keep) {
this._messageHistory.splice(0, keep);
}
}
}
});