1
0
Fork 0
mirror of https://we.phorge.it/source/phorge.git synced 2025-01-10 14:51:06 +01:00
phorge-phorge/support/aphlict/server/lib/AphlictAdminServer.js
epriestley bbb321395a Support Aphlict clustering
Summary:
Ref T6915. This allows multiple notification servers to talk to each other:

  - Every server has a list of every other server, including itself.
  - Every server generates a unique fingerprint at startup, like "XjeHuPKPBKHUmXkB".
  - Every time a server gets a message, it marks it with its personal fingerprint, then sends it to every other server.
  - Servers do not retransmit messages that they've already seen (already marked with their fingerprint).
  - A server learns a peer's fingerprint from the receipt the peer returns when it is sent a message, and then stops sending that peer messages which are already marked with that peer's fingerprint.

This is pretty crude, and the first message to a cluster will transmit N^2 times, but N is going to be like 3 or 4 in even the most extreme cases for a very long time.

The fingerprinting stops cycles, and stops servers from sending themselves copies of messages.

We don't need to do anything more sophisticated than this because it's fine if some notifications get lost when a server dies. Clients will reconnect after a short period of time and life will continue.

Test Plan:
  - Wrote two server configs.
  - Started two servers.
  - Told Phabricator about all four services.
  - Loaded Chrome and Safari.
  - Saw them connect to different servers.
  - Sent messages in one, got notifications in the other (magic!).
  - Saw the fingerprinting stuff work on the console, no infinite retransmission of messages, etc.

(This pretty much just worked when I ran it the first time so I probably missed something?)

{F1218835}

Reviewers: chad

Reviewed By: chad

Maniphest Tasks: T6915

Differential Revision: https://secure.phabricator.com/D15711
2016-04-14 13:26:30 -07:00

197 lines
5.1 KiB
JavaScript

'use strict';
var JX = require('./javelin').JX;
require('./AphlictListenerList');
var http = require('http');
var url = require('url');
/**
 * Admin-side HTTP endpoint for the notification server.
 *
 * Handles three kinds of requests on the wrapped HTTP server:
 *
 *   - `POST /` publishes a notification to subscribed listeners and to peers.
 *   - `/status/` reports uptime, listener counts and message counters as JSON.
 *   - Anything else is answered with a 404 (or 405 for non-POST on `/`).
 *
 * Every request may carry an `instance` query parameter; it defaults to
 * "default".
 */
JX.install('AphlictAdminServer', {

  /**
   * @param server HTTP server object to attach to. Must support
   *   `on('request', ...)` and `listen(...)`.
   */
  construct: function(server) {
    this._startTime = new Date().getTime();
    this._messagesIn = 0;
    this._messagesOut = 0;

    server.on('request', JX.bind(this, this._onrequest));
    this._server = server;

    // NOTE(review): this field is never read in this file. The listener
    // lookup goes through getClientServers(), i.e. the "clientServers"
    // property below -- confirm whether setClientServers([]) was intended.
    this._clientServers = [];
  },

  // The code below reads these through getClientServers(), getLogger() and
  // getPeerList(); the accessors are presumably generated by JX.install from
  // this map -- confirm against the Javelin implementation.
  properties: {
    clientServers: null,
    logger: null,
    peerList: null
  },

  members: {
    _messagesIn: null,
    _messagesOut: null,
    _server: null,
    _startTime: null,

    /**
     * Collect the listener list of every client server for one instance.
     *
     * @param instance Instance name to look up.
     * @return Array with one listener list per configured client server;
     *   empty if no client servers have been configured.
     */
    getListenerLists: function(instance) {
      // The property defaults to null until it is set, so fall back to an
      // empty list instead of failing on `.length`.
      var clients = this.getClientServers() || [];

      var lists = [];
      for (var ii = 0; ii < clients.length; ii++) {
        lists.push(clients[ii].getListenerList(instance));
      }

      return lists;
    },

    /**
     * Forward all arguments to the configured logger's `log()` method.
     * Does nothing if no logger is configured.
     *
     * @return this
     */
    log: function() {
      var logger = this.getLogger();
      if (logger) {
        logger.log.apply(logger, arguments);
      }

      // Return `this` on both paths so the return value does not depend on
      // whether a logger happens to be configured.
      return this;
    },

    /**
     * Start listening. All arguments are passed through to the underlying
     * server's `listen()`, and its return value is returned.
     */
    listen: function() {
      return this._server.listen.apply(this._server, arguments);
    },

    /**
     * Route a single incoming HTTP request.
     *
     * @param request Incoming HTTP request.
     * @param response HTTP response to write to; always ended by this method
     *   or by the handler it delegates to.
     */
    _onrequest: function(request, response) {
      var self = this;
      var u = url.parse(request.url, true);
      var instance = u.query.instance || 'default';

      // Publishing a notification.
      if (u.pathname === '/') {
        if (request.method === 'POST') {
          var body = '';

          // Decode the stream as UTF-8 so that a multi-byte character split
          // across two chunks is reassembled correctly. Without this, each
          // Buffer chunk is stringified on its own by `+=`.
          request.setEncoding('utf8');

          request.on('data', function(data) {
            body += data;
          });

          request.on('end', function() {
            try {
              var msg = JSON.parse(body);

              self.log(
                'Received notification (' + instance + '): ' +
                JSON.stringify(msg));
              ++self._messagesIn;

              try {
                self._transmit(instance, msg, response);
              } catch (err) {
                self.log(
                  '<%s> Internal Server Error! %s',
                  request.socket.remoteAddress,
                  err);

                // writeHead() throws once headers have gone out; only set
                // the status if that is still possible.
                if (!response.headersSent) {
                  response.writeHead(500, 'Internal Server Error');
                }
              }
            } catch (err) {
              self.log(
                '<%s> Bad Request! %s',
                request.socket.remoteAddress,
                err);

              if (!response.headersSent) {
                response.writeHead(400, 'Bad Request');
              }
            } finally {
              response.end();
            }
          });
        } else {
          response.writeHead(405, 'Method Not Allowed');
          response.end();
        }
      } else if (u.pathname === '/status/') {
        this._handleStatusRequest(request, response, instance);
      } else {
        response.writeHead(404, 'Not Found');
        response.end();
      }
    },

    /**
     * Answer a status request with a JSON document describing this server.
     *
     * @param request Incoming HTTP request (unused).
     * @param response HTTP response; written and ended here.
     * @param instance Instance whose listener counts are reported.
     */
    _handleStatusRequest: function(request, response, instance) {
      var active_count = 0;
      var total_count = 0;

      // Sum the listener counts across every client server.
      var lists = this.getListenerLists(instance);
      for (var ii = 0; ii < lists.length; ii++) {
        var list = lists[ii];
        active_count += list.getActiveListenerCount();
        total_count += list.getTotalListenerCount();
      }

      var server_status = {
        'instance': instance,
        'uptime': (new Date().getTime() - this._startTime),
        'clients.active': active_count,
        'clients.total': total_count,
        'messages.in': this._messagesIn,
        'messages.out': this._messagesOut,
        'version': 7
      };

      response.writeHead(200, {'Content-Type': 'application/json'});
      response.write(JSON.stringify(server_status));
      response.end();
    },

    /**
     * Transmits a message to all subscribed listeners and to the peer list,
     * then writes a receipt containing this server's fingerprint.
     *
     * The response is not ended here; the caller does that.
     *
     * @param instance Instance the message was published to.
     * @param message Decoded message body.
     * @param response HTTP response to write the receipt to.
     */
    _transmit: function(instance, message, response) {
      var peer_list = this.getPeerList();

      // addFingerprint() presumably returns a falsy value when the message
      // should not be transmitted again (e.g. already seen by this server)
      // -- confirm against the peer list implementation.
      message = peer_list.addFingerprint(message);
      if (message) {
        var lists = this.getListenerLists(instance);

        for (var ii = 0; ii < lists.length; ii++) {
          var list = lists[ii];
          var listeners = list.getListeners();
          this._transmitToListeners(list, listeners, message);
        }

        peer_list.broadcastMessage(instance, message);
      }

      // Respond to the caller with our fingerprint so it can stop sending
      // us traffic we don't need to know about if it's a peer. In particular,
      // this stops us from broadcasting messages to ourselves if we appear
      // in the cluster list.
      var receipt = {
        fingerprint: peer_list.getFingerprint()
      };

      response.writeHead(200, {'Content-Type': 'application/json'});
      response.write(JSON.stringify(receipt));
    },

    /**
     * Write a message to every listener subscribed to at least one of the
     * message's subscribers.
     *
     * @param list Listener list that owns `listeners`; a listener whose
     *   write throws is removed from it.
     * @param listeners Array of listeners to consider.
     * @param message Message to write; `message.subscribers` is used for the
     *   subscription check.
     */
    _transmitToListeners: function(list, listeners, message) {
      for (var ii = 0; ii < listeners.length; ii++) {
        var listener = listeners[ii];

        if (!listener.isSubscribedToAny(message.subscribers)) {
          continue;
        }

        try {
          listener.writeMessage(message);

          ++this._messagesOut;
          this.log(
            '<%s> Wrote Message',
            listener.getDescription());
        } catch (error) {
          // A failed write drops the listener rather than aborting delivery
          // to the remaining listeners.
          list.removeListener(listener);
          this.log(
            '<%s> Write Error: %s',
            listener.getDescription(),
            error);
        }
      }
    }
  }

});