1
0
Fork 0
mirror of https://we.phorge.it/source/phorge.git synced 2024-12-22 13:30:55 +01:00

Modify the Aphlict server to transmit messages instead of broadcasting them.

Summary: Ref T4324. Ref T5284. This adds server-side support for keeping track of a set of PHIDs that the Aphlict clients have subscribed to. Instead of broadcasting a notification to all clients (after which the clients can poll `/notification/individual` in order to determine whether or not they are interested in the notification), transmit notifications only to clients that have subscribed to a PHID that is relevant to the notification.

Test Plan:
I opened up two clients on the same host (incognito tabs in Chrome). Here is the output from the server:

```
> sudo ./bin/aphlict debug
Starting Aphlict server in foreground...
Launching server:

    $ 'nodejs' '/usr/src/phabricator/src/applications/aphlict/management/../../../../support/aphlict/server/aphlict_server.js' --port='22280' --admin='22281' --host='localhost' --user='aphlict'

[Wed Jun 11 2014 19:10:27 GMT+0000 (UTC)] Started Server (PID 4546)
[Wed Jun 11 2014 19:10:36 GMT+0000 (UTC)] <FlashPolicy> Policy Request From ::ffff:192.168.1.1
[Wed Jun 11 2014 19:10:37 GMT+0000 (UTC)] <Listener/1> Connected from ::ffff:192.168.1.1
[Wed Jun 11 2014 19:10:37 GMT+0000 (UTC)] <Listener/1> Received data: {"command":"subscribe","data":["PHID-USER-cb5af6p4oepy5tlgqypi"]}
[Wed Jun 11 2014 19:10:37 GMT+0000 (UTC)] <Listener/1> Subscribed to: ["PHID-USER-cb5af6p4oepy5tlgqypi"]
[Wed Jun 11 2014 19:10:39 GMT+0000 (UTC)] <Listener/1> Received data: {"command":"subscribe","data":["PHID-USER-kfohe3ca5oe6ygykmioq"]}
[Wed Jun 11 2014 19:10:39 GMT+0000 (UTC)] <Listener/1> Subscribed to: ["PHID-USER-kfohe3ca5oe6ygykmioq"]
[Wed Jun 11 2014 19:10:42 GMT+0000 (UTC)] notification: {"key":"6023751084283587681","type":"notification","subscribers":["PHID-USER-cb5af6p4oepy5tlgqypi"]}
[Wed Jun 11 2014 19:10:42 GMT+0000 (UTC)] <Listener/1> Wrote Message
```

I verified (using the "Network" tab in Chrome) that an AJAX request to `/notification/individual/` was only made in the tab belonging to the user that triggered the test notification.

Reviewers: epriestley, #blessed_reviewers

Reviewed By: epriestley, #blessed_reviewers

Subscribers: epriestley, Korvin

Maniphest Tasks: T5284, T4324

Differential Revision: https://secure.phabricator.com/D9458
This commit is contained in:
Joshua Spence 2014-06-11 12:17:18 -07:00 committed by epriestley
parent 82f889c421
commit 84d259cea2
10 changed files with 257 additions and 45 deletions

View file

@ -8,7 +8,7 @@ return array(
'names' =>
array(
'core.pkg.css' => 'd82d2f53',
'core.pkg.js' => '88ca2043',
'core.pkg.js' => '4af4aa9d',
'darkconsole.pkg.js' => 'ca8671ce',
'differential.pkg.css' => '4a93db37',
'differential.pkg.js' => 'eca39a2c',
@ -334,9 +334,9 @@ return array(
'rsrc/image/texture/table_header.png' => '5c433037',
'rsrc/image/texture/table_header_hover.png' => '038ec3b9',
'rsrc/image/texture/table_header_tall.png' => 'd56b434f',
'rsrc/js/application/aphlict/Aphlict.js' => '493665ee',
'rsrc/js/application/aphlict/Aphlict.js' => '08be8878',
'rsrc/js/application/aphlict/behavior-aphlict-dropdown.js' => '2a2dba85',
'rsrc/js/application/aphlict/behavior-aphlict-listen.js' => '027c888a',
'rsrc/js/application/aphlict/behavior-aphlict-listen.js' => 'acda9f51',
'rsrc/js/application/auth/behavior-persona-login.js' => '9414ff18',
'rsrc/js/application/config/behavior-reorder-fields.js' => '938aed89',
'rsrc/js/application/conpherence/behavior-menu.js' => '7ee23816',
@ -477,7 +477,7 @@ return array(
'rsrc/js/phuix/PHUIXActionListView.js' => 'b5c256b8',
'rsrc/js/phuix/PHUIXActionView.js' => '6e8cefa4',
'rsrc/js/phuix/PHUIXDropdownMenu.js' => 'bd4c8dca',
'rsrc/swf/aphlict.swf' => 'f45c3edc',
'rsrc/swf/aphlict.swf' => 'd9bca85d',
),
'symbols' =>
array(
@ -525,10 +525,10 @@ return array(
'herald-rule-editor' => '22d2966a',
'herald-test-css' => '778b008e',
'inline-comment-summary-css' => '8cfd34e8',
'javelin-aphlict' => '493665ee',
'javelin-aphlict' => '08be8878',
'javelin-behavior' => '8a3ed18b',
'javelin-behavior-aphlict-dropdown' => '2a2dba85',
'javelin-behavior-aphlict-listen' => '027c888a',
'javelin-behavior-aphlict-listen' => 'acda9f51',
'javelin-behavior-aphront-basic-tokenizer' => 'b3a4b884',
'javelin-behavior-aphront-crop' => 'b98fc918',
'javelin-behavior-aphront-drag-and-drop-textarea' => '4a11ea9c',
@ -819,18 +819,6 @@ return array(
4 => 'javelin-vector',
5 => 'differential-inline-comment-editor',
),
'027c888a' =>
array(
0 => 'javelin-behavior',
1 => 'javelin-aphlict',
2 => 'javelin-stratcom',
3 => 'javelin-request',
4 => 'javelin-uri',
5 => 'javelin-dom',
6 => 'javelin-json',
7 => 'javelin-router',
8 => 'phabricator-notification',
),
'029a133d' =>
array(
0 => 'aphront-dialog-view-css',
@ -865,6 +853,11 @@ return array(
3 => 'javelin-vector',
4 => 'javelin-stratcom',
),
'08be8878' =>
array(
0 => 'javelin-install',
1 => 'javelin-util',
),
'08e56a4e' =>
array(
0 => 'javelin-install',
@ -1153,11 +1146,6 @@ return array(
2 => 'javelin-stratcom',
3 => 'phabricator-tooltip',
),
'493665ee' =>
array(
0 => 'javelin-install',
1 => 'javelin-util',
),
'4a11ea9c' =>
array(
0 => 'javelin-behavior',
@ -1620,6 +1608,18 @@ return array(
1 => 'javelin-dom',
2 => 'javelin-stratcom',
),
'acda9f51' =>
array(
0 => 'javelin-behavior',
1 => 'javelin-aphlict',
2 => 'javelin-stratcom',
3 => 'javelin-request',
4 => 'javelin-uri',
5 => 'javelin-dom',
6 => 'javelin-json',
7 => 'javelin-router',
8 => 'phabricator-notification',
),
'ad7a69ca' =>
array(
0 => 'javelin-install',

View file

@ -376,16 +376,23 @@ final class PhabricatorStandardPageView extends PhabricatorBarePageView {
$swf_uri = $response->getURI($map, 'rsrc/swf/aphlict.swf');
$enable_debug = PhabricatorEnv::getEnvConfig('notification.debug');
$subscriptions = $this->pageObjects;
if ($user) {
$subscriptions[] = $user->getPHID();
}
Javelin::initBehavior(
'aphlict-listen',
array(
'id' => $aphlict_object_id,
'containerID' => $aphlict_container_id,
'server' => $client_uri->getDomain(),
'port' => $client_uri->getPort(),
'debug' => $enable_debug,
'swfURI' => $swf_uri,
'pageObjects' => array_fill_keys($this->pageObjects, true),
'id' => $aphlict_object_id,
'containerID' => $aphlict_container_id,
'server' => $client_uri->getDomain(),
'port' => $client_uri->getPort(),
'debug' => $enable_debug,
'swfURI' => $swf_uri,
'pageObjects' => array_fill_keys($this->pageObjects, true),
'subscriptions' => $subscriptions,
));
$tail[] = phutil_tag(

View file

@ -2,6 +2,7 @@ package {
import flash.events.TimerEvent;
import flash.external.ExternalInterface;
import flash.utils.Dictionary;
import flash.utils.Timer;
@ -43,7 +44,11 @@ package {
{});
}
public function externalConnect(server:String, port:Number):void {
public function externalConnect(
server:String,
port:Number,
subscriptions:Array):void {
this.externalInvoke('connect');
this.remoteServer = server;
@ -56,6 +61,10 @@ package {
this.timer.addEventListener(TimerEvent.TIMER, this.keepalive);
this.connectToMaster();
// Send subscriptions to master.
this.log('Sending subscriptions to master.');
this.send.send('aphlict_master', 'subscribe', this.client, subscriptions);
}
/**

View file

@ -40,6 +40,11 @@ package {
*/
private var remotePort:Number;
/**
* A dictionary mapping PHID to subscribed clients.
*/
private var subscriptions:Dictionary;
private var socket:Socket;
private var readBuffer:ByteArray;
@ -50,12 +55,13 @@ package {
this.remoteServer = server;
this.remotePort = port;
this.clients = new Dictionary();
this.subscriptions = new Dictionary();
// Connect to the Aphlict Server.
this.recv.connect('aphlict_master');
this.connectToServer();
this.clients = new Dictionary();
// Start a timer and regularly purge dead clients.
this.timer = new Timer(AphlictMaster.PURGE_INTERVAL);
this.timer.addEventListener(TimerEvent.TIMER, this.purgeClients);
@ -116,6 +122,16 @@ package {
private function didConnectSocket(event:Event):void {
this.externalInvoke('connected');
// Send subscriptions
var phids = new Array();
for (var phid:String in this.subscriptions) {
phids.push(phid);
}
if (phids.length) {
this.sendSubscribeCommand(phids);
}
}
private function didCloseSocket(event:Event):void {
@ -130,6 +146,69 @@ package {
this.externalInvoke('error', event.text);
}
public function subscribe(client:String, phids:Array):void {
var newPHIDs = new Array();
for (var i:String in phids) {
var phid = phids[i];
if (!this.subscriptions[phid]) {
this.subscriptions[phid] = new Dictionary();
newPHIDs.push(phid);
}
this.subscriptions[phid][client] = true;
}
if (newPHIDs.length) {
this.sendSubscribeCommand(newPHIDs);
}
}
public function unsubscribe(client:String, phids:Array):void {
var oldPHIDs = new Array();
for (var phid:String in phids) {
if (!this.subscriptions[phid]) {
continue;
}
delete this.subscriptions[phid][client];
var empty = true;
for (var key:String in this.subscriptions[phid]) {
empty = false;
}
if (empty) {
delete this.subscriptions[phid];
oldPHIDs.push(phid);
}
}
if (oldPHIDs.length) {
this.sendUnsubscribeCommand(oldPHIDs);
}
}
private function sendSubscribeCommand(phids:Array):void {
var msg:Dictionary = new Dictionary();
msg['command'] = 'subscribe';
msg['data'] = phids;
this.log('Sending subscribe command to server.');
this.socket.writeUTF(vegas.strings.JSON.serialize(msg));
this.socket.flush();
}
private function sendUnsubscribeCommand(phids:Array):void {
var msg:Dictionary = new Dictionary();
msg['command'] = 'unsubscribe';
msg['data'] = phids;
this.log('Sending subscribe command to server.');
this.socket.writeUTF(vegas.strings.JSON.serialize(msg));
this.socket.flush();
}
private function didReceiveSocket(event:Event):void {
try {
var b:ByteArray = this.readBuffer;
@ -153,8 +232,22 @@ package {
// Send the message to all clients.
for (var client:String in this.clients) {
this.log('Sending message to client: ' + client);
this.send.send(client, 'receiveMessage', data);
var subscribed = false;
for (var i:String in data.subscribers) {
var phid = data.subscribers[i];
if (this.subscriptions[phid] &&
this.subscriptions[phid][client]) {
subscribed = true;
break;
}
}
if (subscribed) {
this.log('Sending message to client: ' + client);
this.send.send(client, 'receiveMessage', data);
}
}
} else {
break;

View file

@ -83,6 +83,61 @@ var send_server = net.createServer(function(socket) {
listener.getDescription(),
socket.remoteAddress);
var buffer = new Buffer([]);
var length = 0;
socket.on('data', function(data) {
buffer = Buffer.concat([buffer, new Buffer(data)]);
while (buffer.length) {
if (!length) {
length = buffer.readUInt16BE(0);
buffer = buffer.slice(2);
}
if (buffer.length < length) {
// We need to wait for the rest of the data.
return;
}
var message;
try {
message = JSON.parse(buffer.toString('utf8', 0, length));
} catch (err) {
debug.log('<%s> Received invalid data.', listener.getDescription());
continue;
} finally {
buffer = buffer.slice(length);
length = 0;
}
debug.log('<%s> Received data: %s',
listener.getDescription(),
JSON.stringify(message));
switch (message.command) {
case 'subscribe':
debug.log(
'<%s> Subscribed to: %s',
listener.getDescription(),
JSON.stringify(message.data));
listener.subscribe(message.data);
break;
case 'unsubscribe':
debug.log(
'<%s> Unsubscribed from: %s',
listener.getDescription(),
JSON.stringify(message.data));
listener.unsubscribe(message.data);
break;
default:
debug.log('<s> Unrecognized command.', listener.getDescription());
}
}
});
socket.on('close', function() {
clients.removeListener(listener);
debug.log('<%s> Disconnected', listener.getDescription());
@ -122,7 +177,7 @@ var receive_server = http.createServer(function(request, response) {
debug.log('notification: ' + JSON.stringify(msg));
++messages_in;
broadcast(msg);
transmit(msg);
response.writeHead(200, {'Content-Type': 'text/plain'});
} catch (err) {
@ -161,12 +216,16 @@ var receive_server = http.createServer(function(request, response) {
}).listen(config.admin, config.host);
function broadcast(data) {
var listeners = clients.getListeners();
for (var id in listeners) {
var listener = listeners[id];
function transmit(msg) {
var listeners = clients.getListeners().filter(function(client) {
return client.isSubscribedToAny(msg.subscribers);
});
for (var i = 0; i < listeners.length; i++) {
var listener = listeners[i];
try {
listener.writeMessage(data);
listener.writeMessage(msg);
++messages_out;
debug.log('<%s> Wrote Message', listener.getDescription());

View file

@ -9,11 +9,37 @@ JX.install('AphlictListener', {
members: {
_id: null,
_socket: null,
_subscriptions: {},
getID: function() {
return this._id;
},
subscribe: function(phids) {
for (var i = 0; i < phids.length; i++) {
var phid = phids[i];
this._subscriptions[phid] = true;
}
return this;
},
unsubscribe: function(phids) {
for (var i = 0; i < phids.length; i++) {
var phid = phids[i];
delete this._subscriptions[phid];
}
return this;
},
isSubscribedToAny: function(phids) {
var intersection = phids.filter(function(phid) {
return phid in this._subscriptions;
}, this);
return intersection.length > 0;
},
getSocket: function() {
return this._socket;
},

View file

@ -31,7 +31,14 @@ JX.install('AphlictListenerList', {
},
getListeners: function() {
return this._listeners;
var keys = Object.keys(this._listeners);
var listeners = [];
for (var i = 0; i < keys.length; i++) {
listeners.push(this._listeners[keys[i]]);
}
return listeners;
},
getActiveListenerCount: function() {

View file

@ -25,7 +25,7 @@
*/
JX.install('Aphlict', {
construct : function(id, server, port) {
construct : function(id, server, port, subscriptions) {
if (__DEV__) {
if (JX.Aphlict._instance) {
JX.$E('Aphlict object is sort of a singleton..!');
@ -36,6 +36,7 @@ JX.install('Aphlict', {
this._server = server;
this._port = port;
this._subscriptions = subscriptions;
// Flash puts its "objects" into global scope in an inconsistent way,
// because it was written in like 1816 when globals were awesome and IE4
@ -48,8 +49,12 @@ JX.install('Aphlict', {
members : {
_server : null,
_port : null,
_subscriptions : null,
start : function() {
this._flashContainer.connect(this._server, this._port);
this._flashContainer.connect(
this._server,
this._port,
this._subscriptions);
}
},

View file

@ -16,7 +16,13 @@ JX.behavior('aphlict-listen', function(config) {
var showing_reload = false;
function onready() {
var client = new JX.Aphlict(config.id, config.server, config.port)
var client = new JX.Aphlict(
config.id,
config.server,
config.port,
config.subscriptions);
client
.setHandler(onaphlictmessage)
.start();
}

Binary file not shown.