1
0
Fork 0
mirror of https://we.phorge.it/source/phorge.git synced 2024-11-18 04:42:40 +01:00

Modify the Aphlict client to use LocalConnection.

Summary:
Ref T4324. Currently, an Aphlict client (with a corresponding connection to the Aphlict Server) is created for every tab that a user has open. This significantly affects the scalability of Aphlict as a service. Instead, we can use `LocalConnection` instances to coordinate the communication of multiple Aphlict clients to the server.

Similar functionality existed prior to D2704, but was removed as the author was not able to get this functionality working as intended. It seems that the main issue with the initial attempt was the use of the `setTimeout` function, which seemed to be a blocking call which prevented messages from being received. I have instead used an event-based model using a `Timer` object.

Roughly this works as follows:

# The first instance will create an `AphlictClient` and an `AphlictMaster`. The `AphlictClient` will register itself with the `AphlictMaster` and will consequently be notified of incoming messages.
# The `AphlictClient` is then responsible for pinging the `AphlictMaster` at regular intervals. If the client does not ping the master in a given period of time, the master will assume that the client is dead and will remove the client from the pool.
# Similarly, the `AphlictMaster` is required to respond to pings with a "pong" response. The pong response lets the clients know that the `AphlictMaster` is still alive. If the clients do not receive a pong in a given period of time, then the clients will attempt to spawn a new master.

Test Plan: I have tested this on our Phabricator install with a few tabs opened and inspecting the console output. I will upload a screencast of my test results.

Reviewers: #blessed_reviewers, epriestley

Reviewed By: #blessed_reviewers, epriestley

Subscribers: epriestley, Korvin

Maniphest Tasks: T4324

Differential Revision: https://secure.phabricator.com/D9327
This commit is contained in:
Joshua Spence 2014-05-29 07:04:12 -07:00 committed by epriestley
parent 7f2b6412a2
commit a42ec32c98
5 changed files with 315 additions and 106 deletions

View file

@ -9,20 +9,13 @@ if [ -z "$MXMLC" ]; then
fi;
set -e
set -x
# cp -R $ROOT/externals/vegas/src $BASEDIR/src/vegas
(cd $BASEDIR && $MXMLC \
-output aphlict.swf \
$MXMLC \
-output=$ROOT/webroot/rsrc/swf/aphlict.swf \
-default-background-color=0x444444 \
-default-size=500,500 \
-warnings=true \
-debug=true \
-source-path=$ROOT/externals/vegas/src \
-static-link-runtime-shared-libraries=true \
src/Aphlict.as)
mv $BASEDIR/aphlict.swf $ROOT/webroot/rsrc/swf/aphlict.swf
# -target-player=10.2.0 \
$BASEDIR/src/AphlictClient.as

View file

@ -1,117 +1,38 @@
package {
import flash.net.*;
import flash.utils.*;
import flash.media.*;
import flash.display.*;
import flash.events.*;
import flash.display.Sprite;
import flash.external.ExternalInterface;
import flash.net.LocalConnection;
import vegas.strings.JSON;
public class Aphlict extends Sprite {
private var client:String;
/**
* A transport channel used to receive data.
*/
protected var recv:LocalConnection;
private var socket:Socket;
private var readBuffer:ByteArray;
/**
* A transport channel used to send data.
*/
protected var send:LocalConnection;
private var remoteServer:String;
private var remotePort:Number;
public function Aphlict() {
super();
ExternalInterface.addCallback('connect', this.externalConnect);
ExternalInterface.call(
'JX.Stratcom.invoke',
'aphlict-component-ready',
null,
{});
this.recv = new LocalConnection();
this.recv.client = this;
this.send = new LocalConnection();
}
public function externalConnect(server:String, port:Number):void {
this.externalInvoke('connect');
this.remoteServer = server;
this.remotePort = port;
this.connectToServer();
}
public function connectToServer():void {
var socket:Socket = new Socket();
socket.addEventListener(Event.CONNECT, didConnectSocket);
socket.addEventListener(Event.CLOSE, didCloseSocket);
socket.addEventListener(ProgressEvent.SOCKET_DATA, didReceiveSocket);
socket.addEventListener(IOErrorEvent.IO_ERROR, didIOErrorSocket);
socket.addEventListener(
SecurityErrorEvent.SECURITY_ERROR,
didSecurityErrorSocket);
socket.connect(this.remoteServer, this.remotePort);
this.readBuffer = new ByteArray();
this.socket = socket;
}
private function didConnectSocket(event:Event):void {
this.externalInvoke('connected');
}
private function didCloseSocket(event:Event):void {
this.externalInvoke('close');
}
private function didIOErrorSocket(event:IOErrorEvent):void {
this.externalInvoke('error', event.text);
}
private function didSecurityErrorSocket(event:SecurityErrorEvent):void {
this.externalInvoke('error', event.text);
}
private function didReceiveSocket(event:Event):void {
var b:ByteArray = this.readBuffer;
this.socket.readBytes(b, b.length);
do {
b = this.readBuffer;
b.position = 0;
if (b.length <= 8) {
break;
}
var msg_len:Number = parseInt(b.readUTFBytes(8), 10);
if (b.length >= msg_len + 8) {
var bytes:String = b.readUTFBytes(msg_len);
var data:Object = vegas.strings.JSON.deserialize(bytes);
var t:ByteArray = new ByteArray();
t.writeBytes(b, msg_len + 8);
this.readBuffer = t;
this.receiveMessage(data);
} else {
break;
}
} while (true);
}
public function receiveMessage(msg:Object):void {
this.externalInvoke('receive', msg);
}
public function externalInvoke(type:String, object:Object = null):void {
protected function externalInvoke(type:String, object:Object = null):void {
ExternalInterface.call('JX.Aphlict.didReceiveEvent', type, object);
}
public function log(message:String):void {
ExternalInterface.call('console.log', message);
protected function log(message:String):void {
this.externalInvoke('log', message);
}
}

View file

@ -0,0 +1,129 @@
package {
import flash.events.TimerEvent;
import flash.external.ExternalInterface;
import flash.utils.Timer;
public class AphlictClient extends Aphlict {
/**
* The connection name for this client. This will be used for the
* @{class:LocalConnection} object.
*/
private var client:String;
/**
* The expiry timestamp for the @{class:AphlictMaster}. If this time is
* elapsed then the master will be assumed to be dead and another
* @{class:AphlictClient} will create a master.
*/
private var expiry:Number = 0;
/**
* The interval at which to ping the @{class:AphlictMaster}.
*/
public static const INTERVAL:Number = 3000;
private var master:AphlictMaster;
private var timer:Timer;
private var remoteServer:String;
private var remotePort:Number;
public function AphlictClient() {
super();
ExternalInterface.addCallback('connect', this.externalConnect);
ExternalInterface.call(
'JX.Stratcom.invoke',
'aphlict-component-ready',
null,
{});
}
public function externalConnect(server:String, port:Number):void {
this.externalInvoke('connect');
this.remoteServer = server;
this.remotePort = port;
this.client = AphlictClient.generateClientId();
this.recv.connect(this.client);
this.timer = new Timer(AphlictClient.INTERVAL);
this.timer.addEventListener(TimerEvent.TIMER, this.keepalive);
this.connectToMaster();
}
/**
* Generate a unique identifier that will be used to communicate with the
* @{class:AphlictMaster}.
*/
private static function generateClientId():String {
return 'aphlict_client_' + Math.round(Math.random() * 100000);
}
/**
* Create a new connection to the @{class:AphlictMaster}.
*
* If there is no current @{class:AphlictMaster} instance, then a new master
* will be created.
*/
private function connectToMaster():void {
this.timer.stop();
// Try to become the master.
try {
this.log('Attempting to become the master...');
this.master = new AphlictMaster(this.remoteServer, this.remotePort);
this.log('I am the master.');
} catch (x:Error) {
// Couldn't become the master
this.log('Cannot become the master... probably one already exists');
}
this.send.send('aphlict_master', 'register', this.client);
this.expiry = new Date().getTime() + (5 * AphlictClient.INTERVAL);
this.log('Registered client ' + this.client);
this.timer.start();
}
/**
* Send a keepalive signal to the @{class:AphlictMaster}.
*
* If the connection to the master has expired (because the master has not
* sent a heartbeat signal), then a new connection to master will be
* created.
*/
private function keepalive(event:TimerEvent):void {
if (new Date().getTime() > this.expiry) {
this.connectToMaster();
}
this.send.send('aphlict_master', 'ping', this.client);
}
/**
* This function is used to receive the heartbeat signal from the
* @{class:AphlictMaster}.
*/
public function pong():void {
this.expiry = new Date().getTime() + (2 * AphlictClient.INTERVAL);
}
/**
* Receive a message from the Aphlict Server, via the
* @{class:AphlictMaster}.
*/
public function receiveMessage(msg:Object):void {
this.log('Received message.');
this.externalInvoke('receive', msg);
}
}
}

View file

@ -0,0 +1,166 @@
package {
import flash.events.Event;
import flash.events.IOErrorEvent;
import flash.events.ProgressEvent;
import flash.events.SecurityErrorEvent;
import flash.events.TimerEvent;
import flash.net.Socket;
import flash.utils.ByteArray;
import flash.utils.Dictionary;
import flash.utils.Timer;
import vegas.strings.JSON;
public class AphlictMaster extends Aphlict {
/**
* The pool of connected clients.
*/
private var clients:Dictionary;
/**
* A timer used to trigger periodic events.
*/
private var timer:Timer;
/**
* The interval after which clients will be considered dead and removed
* from the pool.
*/
public static const PURGE_INTERVAL:Number = 3 * AphlictClient.INTERVAL;
/**
* The hostname for the Aphlict Server.
*/
private var remoteServer:String;
/**
* The port number for the Aphlict Server.
*/
private var remotePort:Number;
private var socket:Socket;
private var readBuffer:ByteArray;
public function AphlictMaster(server:String, port:Number) {
super();
this.remoteServer = server;
this.remotePort = port;
// Connect to the Aphlict Server.
this.recv.connect('aphlict_master');
this.connectToServer();
this.clients = new Dictionary();
// Start a timer and regularly purge dead clients.
this.timer = new Timer(AphlictMaster.PURGE_INTERVAL);
this.timer.addEventListener(TimerEvent.TIMER, this.purgeClients);
this.timer.start();
}
/**
* Register a @{class:AphlictClient}.
*/
public function register(client:String):void {
if (!this.clients[client]) {
this.log('Registering client: ' + client);
this.clients[client] = new Date().getTime();
}
}
/**
* Purge stale client connections from the client pool.
*/
private function purgeClients(event:TimerEvent):void {
for (var client:String in this.clients) {
var checkin:Number = this.clients[client];
if (new Date().getTime() - checkin > AphlictMaster.PURGE_INTERVAL) {
this.log('Purging client: ' + client);
delete this.clients[client];
}
}
}
/**
* Clients will regularly "ping" the master to let us know that they are
* still alive. We will "pong" them back to let the client know that the
* master is still alive.
*/
public function ping(client:String):void {
this.clients[client] = new Date().getTime();
this.send.send(client, 'pong');
}
private function connectToServer():void {
var socket:Socket = new Socket();
socket.addEventListener(Event.CONNECT, didConnectSocket);
socket.addEventListener(Event.CLOSE, didCloseSocket);
socket.addEventListener(ProgressEvent.SOCKET_DATA, didReceiveSocket);
socket.addEventListener(IOErrorEvent.IO_ERROR, didIOErrorSocket);
socket.addEventListener(
SecurityErrorEvent.SECURITY_ERROR,
didSecurityErrorSocket);
socket.connect(this.remoteServer, this.remotePort);
this.readBuffer = new ByteArray();
this.socket = socket;
}
private function didConnectSocket(event:Event):void {
this.externalInvoke('connected');
}
private function didCloseSocket(event:Event):void {
this.externalInvoke('close');
}
private function didIOErrorSocket(event:IOErrorEvent):void {
this.externalInvoke('error', event.text);
}
private function didSecurityErrorSocket(event:SecurityErrorEvent):void {
this.externalInvoke('error', event.text);
}
private function didReceiveSocket(event:Event):void {
var b:ByteArray = this.readBuffer;
this.socket.readBytes(b, b.length);
do {
b = this.readBuffer;
b.position = 0;
if (b.length <= 8) {
break;
}
var msg_len:Number = parseInt(b.readUTFBytes(8), 10);
if (b.length >= msg_len + 8) {
var bytes:String = b.readUTFBytes(msg_len);
var data:Object = vegas.strings.JSON.deserialize(bytes);
var t:ByteArray = new ByteArray();
t.writeBytes(b, msg_len + 8);
this.readBuffer = t;
// Send the message to all clients.
for (var client:String in this.clients) {
this.log('Sending message to client: ' + client);
this.send.send(client, 'receiveMessage', data);
}
} else {
break;
}
} while (true);
}
}
}

Binary file not shown.