diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c64a7bd --- /dev/null +++ b/.gitignore @@ -0,0 +1,9 @@ +.actionScriptProperties +.flexLibProperties +.project +.settings/* +.metadata + +sample/node_modules +sample/npm-debug.log +sample/client.swf diff --git a/.gitmodules b/.gitmodules index bdf558e..e69eaaf 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,6 +1,3 @@ -[submodule "library/support/web-socket-js"] - path = library/support/web-socket-js - url = git://github.com/simb/web-socket-js.git -[submodule "support/web-socket-js"] - path = support/web-socket-js - url = git://github.com/simb/web-socket-js.git +[submodule "libs/AS3WebSocket"] + path = libs/AS3WebSocket + url = https://github.com/chatziko/AS3WebSocket.git diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..7d98310 --- /dev/null +++ b/Makefile @@ -0,0 +1,24 @@ +find = $(foreach dir,$(1),$(foreach d,$(wildcard $(dir)/*),$(call find,$(d),$(2))) $(wildcard $(dir)/$(strip $(2)))) + +FLEX_HOME?=/usr/local/apache-flex-sdk + +all: bin/Flash-Socket.IO.swc +sample: sample/client.swf + +bin/Flash-Socket.IO.swc: $(call find, src, *.as) + @mkdir -p bin + $(FLEX_HOME)/bin/compc \ + --source-path=./src \ + --source-path=./libs/AS3WebSocket/AS3WebSocket/src \ + --library-path=./libs \ + --include-classes=com.pnwrain.flashsocket.FlashSocket \ + --debug=true \ + --output=$@ + +sample/client.swf: sample/client.as bin/Flash-Socket.IO.swc + $(FLEX_HOME)/bin/mxmlc \ + --library-path=bin/Flash-Socket.IO.swc \ + --debug=true \ + --output=$@ \ + sample/client.as + diff --git a/README.md b/README.md index 46d5fe5..228fa0b 100644 --- a/README.md +++ b/README.md @@ -1,13 +1,55 @@ # FlashSocket.IO -Flash library to facilitate communication between Flex applications and Socket.IO servers. +Flash client for [Socket.IO](http://socket.io/) version 1.0 and above. Connects +solely through WebSocket and supports binary data and native TLS (through +[SecureSocket](http://help.adobe.com/en_US/FlashPlatform/reference/actionscript/3/flash/net/SecureSocket.html)). -The actual websocket communication is taken care of by my fork of gimite/web-socket-js project. +## Notes for this fork -This project wraps that and facilitates the hearbeat and en/decoding of messages so they work with Socket.IO servers +This fork is based on +[redannick/FlashSocket.IO](https://github.com/redannick/FlashSocket.IO) (which +itself is based on +[jimib/FlashSocket.IO](https://github.com/jimib/FlashSocket.IO)) and contains +several improvements and bugfixes: -# Checkout + * support for both polling and websockets + * upgrade support (by default connect with polling and upgrade to websocket later) + * support for sending/receiving binary data (as + [ByteArray](http://help.adobe.com/en_US/FlashPlatform/reference/actionscript/3/flash/utils/ByteArray.html)) + * use [my fork](https://github.com/chatziko/AS3WebSocket) of + [AS3WebSocket](https://github.com/theturtle32/AS3WebSocket), with the following improvements: + * use native SecureSocket for TLS (faster, more sucure, reduces swc size by 86%) + * limited support for [permessage-deflate](https://tools.ietf.org/html/draft-ietf-hybi-permessage-compression-19) compression + (accept compressed messages from the server, although the client never compresses itself) + * separate engine.io client code + * add ERROR event for handling server-side errors + * bugfix: propertly add callback to messages for sending ACK + * bugfix: properly clean ACK callbacks + * bugfix: emit buffered message after dispatching the CONNECT event + * bugfix: stop heartbeat timer when socket abruptly closes + * bugfix: dispatch DISCONNECT when connecting is manually close()ed + * use native [JSON](http://help.adobe.com/en_US/FlashPlatform/reference/actionscript/3/JSON.html) + * remove CLOSE event (unclear semantics), use DISCONNECT instead + * add destroy() method (similar to the js socket.io-client) -Because this project makes use of git submodules you must make use of the recursive clone. +Tested with Socket.IO 1.4. + +## Sample client and server + +A sample [client](./sample/client.as) and [server](./sample/server.js) are provided. +To try them: + + * Install dependencies and start server + ``` + cd sample + npm install + npm start + ``` + + * Compile the client, eg with ```mxmlc```: + ``` + mxmlc --library-path=bin/Flash-Socket.IO.swc sample/client.as + ``` + + * Open [http://localhost:3000/](http://localhost:3000/) -git clone --recursive git://github.com/simb/FlashSocket.IO.git \ No newline at end of file diff --git a/bin/Flash-Socket.IO.swc b/bin/Flash-Socket.IO.swc new file mode 100644 index 0000000..a07bbd4 Binary files /dev/null and b/bin/Flash-Socket.IO.swc differ diff --git a/libs/AS3WebSocket b/libs/AS3WebSocket new file mode 160000 index 0000000..7f6e6cb --- /dev/null +++ b/libs/AS3WebSocket @@ -0,0 +1 @@ +Subproject commit 7f6e6cb2fa8d498b86a3205336ab2037ee853d8e diff --git a/libs/as3corelib.swc b/libs/as3corelib.swc old mode 100644 new mode 100755 index 58928ca..68477f1 Binary files a/libs/as3corelib.swc and b/libs/as3corelib.swc differ diff --git a/sample/client.as b/sample/client.as new file mode 100644 index 0000000..3f1d0fc --- /dev/null +++ b/sample/client.as @@ -0,0 +1,88 @@ +package { + +import flash.display.Sprite; +import flash.external.ExternalInterface; +import flash.text.TextField; +import flash.utils.ByteArray; + +import com.pnwrain.flashsocket.FlashSocket; +import com.pnwrain.flashsocket.events.FlashSocketEvent; + + +public class client extends Sprite { + + public var socket:FlashSocket; + public var txt:TextField; + + // log in the TextField, via trace, and with console.log + // + public function log(...args):void { + if(socket) + args.unshift(socket.transport) + + txt.appendText(args.join(' ') + "\n") + + trace(args.join(' ')) + + if(ExternalInterface.available) { + args.unshift('console.log'); + ExternalInterface.call.apply(ExternalInterface, args) + } + } + + + public function client() { + txt = new TextField(); + txt.width = 1000; + txt.height = 1000; + addChild(txt); + + FlashSocket.debug = true; + + // connect to the same url as the page we're in + var url:String = ExternalInterface.call("window.location.href.toString"); + log("connecting to: " + url); + + socket = new FlashSocket(url); + + socket.addEventListener(FlashSocketEvent.CONNECT, function(e:FlashSocketEvent):void { + log("connected"); + + log("sending bar") + socket.emit('bar', 'foo', function(ba:ByteArray):void { + log('bar: got back ByteArray: ', ba[0], ba[1]); + }); + }); + + socket.addEventListener('foo', function(e:FlashSocketEvent):void { + var s:String = e.data[0]; + var cb:Function = e.data[1]; + + log("got 'foo' from server with data: " + s); + log("sending back ByteArray with 2 bytes"); + + var ba:ByteArray = new ByteArray(); + ba[0] = 1; + ba[1] = 2; + cb(ba); + }) + + socket.addEventListener(FlashSocketEvent.DISCONNECT, function(e:FlashSocketEvent):void { + log("disconnect"); + }); + socket.addEventListener(FlashSocketEvent.SECURITY_ERROR, function(e:FlashSocketEvent):void { + log("security error"); + }); + socket.addEventListener(FlashSocketEvent.CONNECT_ERROR, function(e:FlashSocketEvent):void { + log("connect error"); + }); + socket.addEventListener(FlashSocketEvent.IO_ERROR, function(e:FlashSocketEvent):void { + log("io error"); + }); + socket.addEventListener(FlashSocketEvent.ERROR, function(e:FlashSocketEvent):void { + log("error"); + }); + } + +} // class +} // package diff --git a/sample/client.html b/sample/client.html new file mode 100644 index 0000000..ca229d4 --- /dev/null +++ b/sample/client.html @@ -0,0 +1,15 @@ + + + +
+ + + + + + diff --git a/sample/package.json b/sample/package.json new file mode 100644 index 0000000..8926853 --- /dev/null +++ b/sample/package.json @@ -0,0 +1,10 @@ +{ + "name": "flashsocket-sample", + "version": "0.1.0", + "description": "A sample server to connect using FlashSocket.IO", + "dependencies": { + "express": "^4.13.4", + "policyfile": "chatziko/FlashPolicyFileServer", + "socket.io": "^1.4.0" + } +} diff --git a/sample/server.js b/sample/server.js new file mode 100755 index 0000000..eee192e --- /dev/null +++ b/sample/server.js @@ -0,0 +1,75 @@ + +var fs = require('fs') +var http = require('http') +var https = require('https') +var express = require('express') +var policyfile = require('policyfile') +var ioServer = require('socket.io') + + +// CONFIGURATION. Set key/cert to enable https +var httpPort = 3000 +var httpsPort = 3001 +var key // = "...path to key file..." +var cert // = "...path to cert file..." + +// basic express app to serve client.html and client.swf +var expressApp = express() +expressApp.use('/', express.static('.', { index: 'client.html' })) + +// http server +var masterHttp = http.Server(expressApp) +masterHttp.listen(httpPort, function() { + console.log('listening to port', httpPort) + console.log('open http://localhost:'+httpPort+'/ in your browser') +}) + +// https server (if key/cert are set) +if(cert) { + var credentials = { + key: fs.readFileSync(key), + cert: fs.readFileSync(cert), + } + var masterHttps = https.Server(credentials, expressApp) + masterHttps.listen(httpsPort) +} + +// serve policy file on httpPort, and also on port 843 if we can +var canOpenLowPorts = !process.getuid || process.getuid() == 0 +var policyPort = canOpenLowPorts ? 843 : -1 + +policyfile.createServer().listen(policyPort, masterHttp) +if(cert) + policyfile.createServer().listen(-1, masterHttps) + +// socket.io server +var io = new ioServer(masterHttp, { + pingInterval: 5000, + pingTimeout: 5000, +}) +if(cert) + io.attach(masterHttps) + +io.on('connection', function(socket) { + console.log('client connected') + + setTimeout(function() { + console.log('sending foo') + socket.emit('foo', 'bar', function(buf) { + console.log('foo: got back', buf) + }) + }, 1000) + + socket.on('bar', function(s, cb) { + console.log("got 'bar' from client with data: " + s) + console.log("sending back Buffer with 2 bytes") + + cb(new Buffer([1, 2])) + }) + + socket.on('disconnect', function() { + console.log('client disconnected'); + }) +}) + + diff --git a/src/com/pnwrain/flashsocket/Engine.as b/src/com/pnwrain/flashsocket/Engine.as new file mode 100644 index 0000000..8a5c88a --- /dev/null +++ b/src/com/pnwrain/flashsocket/Engine.as @@ -0,0 +1,398 @@ +package com.pnwrain.flashsocket +{ + import flash.utils.setTimeout; + import flash.utils.clearTimeout; + + import com.pnwrain.flashsocket.events.FlashSocketEvent; + import com.pnwrain.flashsocket.events.EventEmitter; + + /////////////////////////// engine.io ////////////////////////////// + // + // This class handles the lower-level (engine.io) communication. + // They closely ressemble the code of the js client (engine.io-client/lib/socket.js) + // + public class Engine extends EventEmitter { + + private var opts:Object; + + private var pingInterval:int; + private var pingIntervalTimer:int; + private var pingTimeout:int; + private var pingTimeoutTimer:int; + + private var writeBuffer:Array = []; + private var prevBufferLen:int = 0; + + public var upgrading:Boolean = false; + public var readyState:String; + + public var transport:Transport; + private var upgrades:Array; + + + public function Engine(popts:Object) { + opts = popts; + opts.transports = opts.transports ? opts.transports.concat() : ['polling', 'websocket']; + opts.upgrade = opts.upgrade !== false; + + open(); + } + + private function open():void { + if(!opts.transports.length) + throw new Error('no transports'); + + readyState = 'opening'; + + var transport:Transport = Transport.create(opts.transports[0], opts); + transport.open(); + setTransport(transport); + }; + + private function setTransport(newtran:Transport):void { + FlashSocket.log('setting transport ' + newtran.name); + + if(transport) { + FlashSocket.log('clearing existing transport ' + transport.name); + transport.removeListener('drain', onDrain); + transport.removeListener('packet', onPacket); + transport.removeListener('error', onError); + transport.removeListener('close', onClose); + } + + // set up transport + transport = newtran; + + // set up transport listeners + transport.on('drain', onDrain); + transport.on('packet', onPacket); + transport.on('error', onError); + transport.on('close', onClose); + }; + + // probes a transport + private function probe(name:String):void { + FlashSocket.log('probing transport ', name); + var newtransport:Transport = Transport.create(name, opts); + var failed:Boolean = false; + + function onTransportOpen(e:FlashSocketEvent):void { + if (failed) return; + + FlashSocket.log('probe transport "%s" opened', name); + newtransport.send([{ type: 'ping', data: 'probe' }]); + newtransport.once('packet', function(e:FlashSocketEvent):void { + var msg:Object = e.data; + if (failed) return; + if ('pong' == msg.type && 'probe' == msg.data) { + FlashSocket.log('probe transport "%s" pong', name); + upgrading = true; + _emit('upgrading', newtransport); + if (!newtransport) return; + + FlashSocket.log('pausing current transport "%s"', transport.name); + transport.pause(function():void { + if (failed) return; + if ('closed' == readyState) return; + FlashSocket.log('changing transport and sending upgrade packet'); + + cleanup(); + + setTransport(newtransport); + newtransport.send([{ type: 'upgrade' }]); + _emit('upgrade', newtransport); + newtransport = null; + upgrading = false; + flush(); + }); + } else { + FlashSocket.log('probe transport failed', name); + _emit('upgradeError', { transport: name }); + } + }); + } + + function freezeTransport():void { + if (failed) return; + + // Any callback called by transport should be ignored since now + failed = true; + + cleanup(); + + newtransport.close(); + newtransport = null; + } + + //Handle any error that happens while probing + function onerror(err:*):void { + freezeTransport(); + + FlashSocket.log('probe transport failed because of error: ', name, err); + + _emit('upgradeError', { transport: name, error: "probe error: "+err }); + } + + function onTransportClose():void { + onerror("transport closed"); + } + + //When the socket is closed while we're probing + function onclose():void { + onerror("socket closed"); + } + + //When the socket is upgraded while we're probing + function onupgrade(e:FlashSocketEvent):void { + var to:Transport = e.data; + if (newtransport && to.name != newtransport.name) { + FlashSocket.log('"%s" works - aborting "%s"', to.name, newtransport.name); + freezeTransport(); + } + } + + //Remove all listeners on the transport and on self + function cleanup():void { + newtransport.removeListener('open', onTransportOpen); + newtransport.removeListener('error', onerror); + newtransport.removeListener('close', onTransportClose); + removeListener('close', onclose); + removeListener('upgrading', onupgrade); + } + + newtransport.once('open', onTransportOpen); + newtransport.once('error', onerror); + newtransport.once('close', onTransportClose); + + once('close', onclose); + once('upgrading', onupgrade); + + newtransport.open(); + + } // proble + + protected function onHandshake(hs:Object):void { + FlashSocket.log('handshake', hs); + + opts.sid = hs.sid; + upgrades = hs.upgrades.filter(function(u:*):* { return opts.transports.indexOf(u) != -1 }); + pingTimeout = hs.pingTimeout; + pingInterval = hs.pingInterval; + + onOpen(); + setPing(); + } + + private function onOpen():void { + readyState = 'open'; + flush(); + + // we check for `readyState` in case an `open` + // listener already closed the socket + if ('open' == readyState && opts.upgrade && transport.pausable) { + FlashSocket.log('starting upgrade probes', upgrades); + for(var i:int = 0; i < upgrades.length; i++) + probe(upgrades[i]); + } + } + + private function onHeartbeat(timeout:int):void { + clearTimeout(pingTimeoutTimer); + pingTimeoutTimer = setTimeout(function ():void { + if('closed' == readyState) + return; + FlashSocket.log("onHeartbeat closing", timeout) + close(); + }, timeout); + }; + + /** + * Pings server every `pingInterval` and expects response * within `pingTimeout` or closes connection. + */ + private function setPing():void { + clearTimeout(pingIntervalTimer); + pingIntervalTimer = setTimeout(function ():void { + FlashSocket.log('writing ping packet - expecting pong within ' + pingTimeout + ' ms'); + sendPacket('ping'); + onHeartbeat(pingTimeout); + }, pingInterval); + }; + + // sends engine.io packet + public function sendPacket(type:String, data:* = null, options:Object = null, doFlush:Boolean = true):void { + if('closing' == readyState || 'closed' == readyState) + return; + + options = options || {}; + options.compress = false !== options.compress; + + var packet:Object = { + type: type, + data: data, + options: options + }; + writeBuffer.push(packet); + + if(doFlush) + flush(); + }; + + public function flush():void { + if(!('closed' != readyState && transport.writable && !upgrading && writeBuffer.length)) + return; + + FlashSocket.log('flushing ' + writeBuffer.length + ' packets in socket', writeBuffer); + + transport.send(writeBuffer); + // keep track of current length of writeBuffer + // splice writeBuffer and callbackBuffer on `drain` + prevBufferLen = writeBuffer.length; + } + + protected function onPacket(e:FlashSocketEvent):void { + /* This is the lower-level engine.io protocol + * https://github.com/socketio/engine.io-protocol + open: 0 // non-ws + , close: 1 // non-ws + , ping: 2 + , pong: 3 + , message: 4 + , upgrade: 5 + , noop: 6 + */ + if(readyState != 'opening' && readyState != 'open' && readyState != 'closing') { + FlashSocket.log('packet received with socket readyState ' + readyState); + return; + } + + // Socket is live - any packet counts + if(readyState == 'open') + onHeartbeat(pingInterval + pingTimeout); + + var packet:Object = e.data; + if (packet.type == 'open') { + // data is a json connection options + onHandshake(JSON.parse(packet.data)); + + } else if(packet.type == "pong") { + FlashSocket.log('pong') + setPing(); + + } else if(packet.type == 'error') { + onError({ data: FlashSocketEvent.IO_ERROR }); + + } else if (packet.type == 'message') { + _emit('data', packet.data); + } + } + + private function onDrain(e:FlashSocketEvent):void { + writeBuffer.splice(0, prevBufferLen); + + // setting prevBufferLen = 0 is very important + // for example, when upgrading, upgrade packet is sent over, + // and a nonzero prevBufferLen could cause problems on `drain` + prevBufferLen = 0; + + if(writeBuffer.length) + flush(); + }; + + private function onClose(e:Object):void { + if(!('opening' == readyState || 'open' == readyState || 'closing' == readyState)) return; + + var reason:String = e.data; + FlashSocket.log('socket close with reason: "%s"', reason); + + // clear timers + clearTimeout(pingIntervalTimer); + clearTimeout(pingTimeoutTimer); + + // stop event from firing again for transport + transport.removeListener('close', onClose); + + // ensure transport won't stay open + transport.close(); + + // ignore further transport communication + transport.removeListener('drain', onDrain); + transport.removeListener('packet', onPacket); + transport.removeListener('error', onError); + + if('opening' == readyState && opts.transports.length > 1) { + // try next transport + opts.transports.shift(); + open(); + + } else { + // set ready state + readyState = 'closed'; + + // clear session id + opts.sid = null; + + // emit close event + _emit('close', reason); + + // clean buffers after, so users can still + // grab the buffers on `close` event + writeBuffer = []; + prevBufferLen = 0; + } + }; + + private function onError(e:Object):void { + FlashSocket.log('transport error', e.data) + + // emit 'error' unless we're going to try another transport + if(!('opening' == readyState && opts.transports.length > 1)) + _emit('error', e.data); + + onClose({ data: 'transport error: '+e.data }); + } + + public function close():void { + if ('opening' == readyState || 'open' == readyState) { + readyState = 'closing'; + + if (this.writeBuffer.length) { + this.once('drain', function():void { + if(upgrading) { + waitForUpgrade(); + } else { + close(); + } + }); + } else if (upgrading) { + waitForUpgrade(); + } else { + close(); + } + } + + function close():void { + // NOTE: + // the js client calls onClose immediately at this poing. However, + // if opening is in progress, transport.close() might not work, we need + // a handshake first to get the sid! So we just close the transport and wait for it to close + // + // onClose({ data: 'forced close' }); + + FlashSocket.log('socket closing - telling transport to close'); + transport.close(); + } + + function cleanupAndClose(e:*):void { + removeListener('upgrade', cleanupAndClose); + removeListener('upgradeError', cleanupAndClose); + close(); + } + + function waitForUpgrade():void { + // wait for upgrade to finish since we can't send packets while pausing a transport + once('upgrade', cleanupAndClose); + once('upgradeError', cleanupAndClose); + } + } + } +} diff --git a/src/com/pnwrain/flashsocket/FlashSocket.as b/src/com/pnwrain/flashsocket/FlashSocket.as index 5dd4e74..be7b2aa 100644 --- a/src/com/pnwrain/flashsocket/FlashSocket.as +++ b/src/com/pnwrain/flashsocket/FlashSocket.as @@ -1,184 +1,290 @@ -package com.pnwrain.flashsocket -{ - import com.adobe.serialization.json.JSON; +package com.pnwrain.flashsocket { + import com.adobe.net.URI + import flash.external.ExternalInterface; + import flash.utils.ByteArray; + + import socket.io.parser.Decoder; + import socket.io.parser.Encoder; + import socket.io.parser.Parser; + import socket.io.parser.ParserEvent; + import com.pnwrain.flashsocket.events.FlashSocketEvent; - - import flash.events.Event; - import flash.events.EventDispatcher; - import flash.events.IOErrorEvent; - import flash.events.SecurityErrorEvent; - import flash.system.Security; - - import mx.utils.URLUtil; - - public class FlashSocket extends EventDispatcher implements IWebSocketWrapper - { - protected var debug:Boolean = false; - protected var callerUrl:String; - protected var socketURL:String; - protected var webSocket:WebSocket; - - public function FlashSocket( url:String, protocol:String=null, proxyHost:String = null, proxyPort:int = 0, headers:String = null) - { - this.socketURL = url; - this.callerUrl = "http://localhost/socket.swf"; - - loadDefaultPolicyFile(url); - webSocket = new WebSocket(this, url, protocol, proxyHost, proxyPort, headers); - webSocket.addEventListener("event", onData); - webSocket.addEventListener(Event.CLOSE, onClose); - webSocket.addEventListener(Event.CONNECT, onConnect); - webSocket.addEventListener(IOErrorEvent.IO_ERROR, onIoError); - webSocket.addEventListener(SecurityErrorEvent.SECURITY_ERROR, onSecurityError); - } - - protected function onClose(event:Event):void{ - var fe:FlashSocketEvent = new FlashSocketEvent(FlashSocketEvent.CLOSE); - dispatchEvent(fe); - } - - protected function onConnect(event:Event):void{ - var fe:FlashSocketEvent = new FlashSocketEvent(FlashSocketEvent.CONNECT); - dispatchEvent(fe); - } - protected function onIoError(event:Event):void{ - var fe:FlashSocketEvent = new FlashSocketEvent(FlashSocketEvent.IO_ERROR); - dispatchEvent(fe); - } - protected function onSecurityError(event:Event):void{ - var fe:FlashSocketEvent = new FlashSocketEvent(FlashSocketEvent.SECURITY_ERROR); - dispatchEvent(fe); - } - - protected function loadDefaultPolicyFile(wsUrl:String):void { - var policyUrl:String = "xmlsocket://" + URLUtil.getServerName(wsUrl) + ":843"; - log("policy file: " + policyUrl); - Security.loadPolicyFile(policyUrl); - } - - public function getOrigin():String { - return (URLUtil.getProtocol(this.callerUrl) + "://" + - URLUtil.getServerNameWithPort(this.callerUrl)).toLowerCase(); - } - - public function getCallerHost():String { - return null; - //I dont think we need this - //return URLUtil.getServerName(this.callerUrl); - } - public function log(message:String):void { - if (debug) { - trace("webSocketLog: " + message); - } + import com.pnwrain.flashsocket.events.EventEmitter; + + public class FlashSocket extends EventEmitter { + + public static var debug:Boolean = false; + + private var opts:Object; + private var _uri:String; + + private var engine:Engine; + + private var ackId:int = 0; + private var acks:Object = {}; + private var _receiveBuffer:Array = []; + + public var connected:Boolean; + public var connecting:Boolean; + + private var encoder:Encoder; + private var decoder:Decoder; + + + public function FlashSocket(puri:String, popts:Object = null) { + opts = popts || {}; + uri = puri; + + encoder = new Encoder(); + decoder = new Decoder(); + decoder.addEventListener(ParserEvent.DECODED, onDecoded); + + open(); + } + + public function get uri():String { + return _uri; + } + + public function set uri(puri:String):void { + _uri = puri; + + var parsed:URI = new URI(puri) + + opts.protocol = parsed.scheme; + opts.host = parsed.authority + (parsed.port ? ':'+parsed.port : ''); + opts.query = parsed.query; + opts.channel = parsed.path || "/"; } - - public function error(message:String):void { - trace("webSocketError: " + message); - } - - public function fatal(message:String):void { - trace("webSocketError: " + message); - } - - ///////////////////////////////////////////////////////////////// - ///////////////////////////////////////////////////////////////// - protected var frame:String = '~m~'; - - protected function onData(e:*):void{ - var event:Object = (e.target as WebSocket).receiveEvents(); - var data:Object = event[0]; - - if ( data.type == "message" ){ - this._setTimeout(); - var msgs:Array = this._decode(data.data); - if (msgs && msgs.length){ - for (var i:int = 0, l:int = msgs.length; i < l; i++){ - this._onMessage(msgs[i]); + + public function get transport():String { + return engine && engine.transport ? engine.transport.name : null; + } + + private function open():void { + connecting = true; + + engine = new Engine(opts); + + engine.on('data', onData); + engine.on('close', onClose); + engine.on('error', onError); + }; + + private function onData(ev:FlashSocketEvent):void { + decoder.add(ev.data); + } + + // called when a packet is fully decoded + private function onDecoded(ev:ParserEvent):void { + /* This is the higher-level socket.io protocol + https://github.com/automattic/socket.io-protocol + Packet#CONNECT (0) + Packet#DISCONNECT (1) + Packet#EVENT (2) + Packet#ACK (3) + Packet#ERROR (4) + Packet#BINARY_EVENT (5) + Packet#BINARY_ACK (6) + */ + var args:Array; + var packet:Object = ev.packet; + + switch (packet.type) { + case Parser.CONNECT: + if (packet.nsp == opts.channel) { + connected = true; + connecting = false; + + _emit(FlashSocketEvent.CONNECT); + + emitBuffered() // after CONNECT + } + else { + sendPacket({ + type: Parser.CONNECT, nsp: opts.channel + }); } - } + break; + + case Parser.EVENT: + case Parser.BINARY_EVENT: + args = packet.data || []; + + if(null != packet.id) + // the message has packet.id so it wants an ack + args.push(function(...args):void { + sendAck(args, packet.id) + }) + + if(this.connected) + _emit(args.shift(), args) + else + _receiveBuffer.push(args); + + break; + + case Parser.ACK: + case Parser.BINARY_ACK: + args = packet.data || []; + if (this.acks.hasOwnProperty(packet.id)) { + var func:Function = this.acks[packet.id] as Function; + delete this.acks[packet.id]; + + //pass however many args the function is looking for back to it + if (args.length > func.length) + func.apply(null, args.slice(0, func.length)); + else + func.apply(null, args); + } + break; + + case Parser.DISCONNECT: + onClose(); + break; + + case Parser.ERROR: + log('3: error: ' + packet.data); + + _emit(FlashSocketEvent.ERROR, packet.data); + break; } } - private function _setTimeout():void{ - + + private function onClose(e:FlashSocketEvent = null):void { + log('engine closed', connecting, connected) + + var event:String = + connected ? FlashSocketEvent.DISCONNECT : + connecting ? FlashSocketEvent.CONNECT_ERROR : + null; + + destroy() + + if(event) + _emit(event); + }; + + private function onError(e:FlashSocketEvent = null):void { + // e.data is the actual event to emit (CONNECT_ERROR | IO_ERROR | ...) + _emit(e.data); + destroy(); } - public var sessionid:String; - public var connected:Boolean; - public var connecting:Boolean; - - private function _onMessage(message:String):void{ - if (!this.sessionid){ - this.sessionid = message; - this._onConnect(); - } else if (message.substr(0, 3) == '~h~'){ - this._onHeartbeat(message.substr(3)); - } else if (message.substr(0, 3) == '~j~'){ - var json:String = message.substring(3,message.length); - var fe:FlashSocketEvent = new FlashSocketEvent(FlashSocketEvent.MESSAGE); - fe.data = JSON.decode(json); - dispatchEvent(fe); - } else { - var fe2:FlashSocketEvent = new FlashSocketEvent(FlashSocketEvent.MESSAGE); - fe2.data = message; - dispatchEvent(fe2); + + // packet = { type: ..., data: ..., nsp: ... } + // + private function sendPacket(packet:Object):void { + for each (var ioPacket:Object in encoder.encode(packet)) + engine.sendPacket('message', ioPacket, null, false); // false = don't flush + engine.flush(); + } + + public function emit(event:String, msg:Object, callback:Function = null):void { + if(msg as Array) + (msg as Array).unshift(event); + else + msg = [event, msg]; + + var type:Number = hasBin(msg) ? Parser.BINARY_EVENT : Parser.EVENT; + var packet:Object = { type: type, data: msg, nsp: opts.channel } + + if (null != callback) { + var messageId:int = this.ackId; + this.acks[this.ackId] = callback; + this.ackId++; + packet.id = messageId } + + sendPacket(packet); } - private function _decode(data:String):Array{ - var messages:Array = [], number:*, n:*; - do { - if (data.substr(0, 3) !== frame) return messages; - data = data.substr(3); - number = '', n = ''; - for (var i:int = 0, l:int = data.length; i < l; i++){ - n = Number(data.substr(i, 1)); - if (data.substr(i, 1) == n){ - number += n; - } else { - data = unescape(data.substr(number.length + frame.length)); - number = Number(number); - break; - } - } - messages.push(data.substr(0, number)); // here - data = data.substr(number); - } while(data !== ''); - return messages; - } - - private function _onHeartbeat(heartbeat:*):void{ - var enc:String = '~h~' + heartbeat; - send( enc ); // echo - }; - - public function send(msg:Object):void{ - - if ( msg is String){ - webSocket.send(_encode(msg)); - }else if ( msg is Object ){ - webSocket.send(_encode(JSON.encode(msg), true)); - }else{ - throw("Unsupported Message Type"); + + private function sendAck(data:Array, id:String):void { + sendPacket({ + type: hasBin(data) ? Parser.BINARY_ACK : Parser.ACK, + data: data, + nsp: opts.channel, + id: id + }) + } + + // returns try if val contains a ByteArray + // + private function hasBin(val:*):Boolean { + if(val is ByteArray) { + return true; + } else if(typeof val == 'object') { + for each (var elem:* in val) + if(hasBin(elem)) + return true; } + return false; } - - private function _onConnect():void{ - this.connected = true; - this.connecting = false; - var e:FlashSocketEvent = new FlashSocketEvent(FlashSocketEvent.CONNECT); - dispatchEvent(e); - }; - - private function _encode(messages:*, json:Boolean=false):String{ - var ret:String = '', - message:String, - messages:* = (messages is Array) ? messages : [messages]; - for (var i:int = 0, l:int = messages.length; i < l; i++){ - message = messages[i] === null || messages[i] === undefined ? '' : (messages[i].toString()); - if ( json ) { - message = "~j~" + message; - } - ret += frame + message.length + frame + message; + + private function emitBuffered():void { + if(!connected) return; // just to be sure + + var i:int; + for (i = 0; i < _receiveBuffer.length; i++) { + var args:Array = _receiveBuffer[i] as Array + _emit(args.shift(), args); } - return ret; - }; + _receiveBuffer = []; + } + + // full cleanup + public function destroy():void { + connected = connecting = false; + + if(engine) { + // ignore further transport communication + engine.removeListener('data', onData); + engine.removeListener('close', onClose); + engine.removeListener('error', onError); + + engine.close(); + engine = null; + } + + if (decoder) { + decoder.removeEventListener(ParserEvent.DECODED, onDecoded); + decoder.destroy(); + decoder = null; + } + encoder = null; + acks = null; + opts = null; + _receiveBuffer = null; + } + + public function close():void { + // if connected close engine, we'll destroy when closed + if (connected || connecting) + engine.close(); + else + destroy() + } + + + /////////////////////////// logging ////////////////////////////// + // + // + public static function log(...args):void { + if(!debug) return; + + trace("FlashSocket: " + args.map(function(a:*, ...r):String { return JSON.stringify(a) }).join(' ')); + + if(ExternalInterface.available) { + args.unshift('console.log'); + ExternalInterface.call.apply(ExternalInterface, args); + } + } + + public static function error(message:String):void { + trace("FlashSocket Error: " + message); + } + + public static function fatal(message:String):void { + trace("FlashSocket Error: " + message); + } } -} \ No newline at end of file +} diff --git a/src/com/pnwrain/flashsocket/Transport.as b/src/com/pnwrain/flashsocket/Transport.as new file mode 100644 index 0000000..eab2387 --- /dev/null +++ b/src/com/pnwrain/flashsocket/Transport.as @@ -0,0 +1,132 @@ +package com.pnwrain.flashsocket +{ + import flash.utils.ByteArray; + import flash.utils.setTimeout; + import flash.utils.clearTimeout; + + import com.pnwrain.flashsocket.FlashSocket; + import com.pnwrain.flashsocket.events.FlashSocketEvent; + import com.pnwrain.flashsocket.events.EventEmitter; + import com.pnwrain.flashsocket.transports.WebSocket; + import com.pnwrain.flashsocket.transports.Polling; + + public class Transport extends EventEmitter { + + static public function create(transport:String, opts:Object):Transport { + return transport == 'polling' + ? new Polling(opts) + : new WebSocket(opts); + }; + + static protected const typeCodes:Object = { + open: 0, + close: 1, + ping: 2, + pong: 3, + message: 4, + upgrade: 5, + noop: 6 + }; + static protected const typeNames:Array = [ + 'open', 'close', 'ping', 'pong', 'message', 'upgrade', 'noop' + ]; + + + public var name:String; + protected var readyState:String; + public var opts:Object; + public var writable:Boolean = false; + public var pausable:Boolean = false; + private var connectTimeoutTimer:int = 0; + + + public function Transport(popts:Object) { + opts = popts; + } + + public function open():void { + readyState = 'opening'; + + connectTimeoutTimer = setTimeout(onConnectTimeout, opts.connectTimeout || 20000); + } + + public function close():void { + } + + public function send(packets:Array):void { + } + + public function pause(cb:Function):void { + } + + protected function decodePacket(data:*):Object { + + var packet:Object = {}; + + if(data is ByteArray) { + packet.type = typeNames[data.readUnsignedByte()]; + + // remove first byte without copy + data.position = 0; + data.writeBytes(data, 1, data.length - 1); + data.length--; + data.position = 0; // ready to read + + packet.data = data; + + } else { + // string + data = decodeURIComponent(data); + + packet.type = typeNames[int(data.charAt(0))]; + packet.data = data.substr(1); + } + + return packet; + } + + private function onConnectTimeout():void { + onError(FlashSocketEvent.CONNECT_ERROR); + } + + // should be overrided by subclasses + protected function cleanup():void { + } + + // methods to be called by subclasses on various events + protected function onOpen():void { + readyState = 'open'; + writable = true; + + clearTimeout(connectTimeoutTimer) + + _emit('open'); + } + + protected function onClose():void { + writable = false + readyState = 'closed'; + + clearTimeout(connectTimeoutTimer) + cleanup(); + + _emit('close'); + } + + protected function onPacket(packet:Object):void { + _emit('packet', packet); + } + + protected function onError(err:String):void { + FlashSocket.log('transport error', err); + + writable = false + readyState = 'closed'; + + clearTimeout(connectTimeoutTimer) + cleanup(); + + _emit('error', err); + } + } +} diff --git a/src/com/pnwrain/flashsocket/Yeast.as b/src/com/pnwrain/flashsocket/Yeast.as new file mode 100644 index 0000000..d297005 --- /dev/null +++ b/src/com/pnwrain/flashsocket/Yeast.as @@ -0,0 +1,48 @@ +// port of https://github.com/unshiftio/yeast (MIT license) +// +package com.pnwrain.flashsocket { + + public class Yeast { + + private const length:int = 64 + + private var alphabet:Array + private var map:Object + private var seed:int = 0 + private var prev:String + + public function Yeast() { + + alphabet = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz-_'.split('') + + // Map each character to its index. + // + map = [] + for (var i:int = 0; i < length; i++) + map[alphabet[i]] = i + } + + private function encode(num:int):String { + var encoded:String = '' + + do { + encoded = alphabet[num % length] + encoded + num = Math.floor(num / length) + } while (num > 0) + + return encoded + } + + public function next():String { + var now:String = encode(new Date().getTime()) + + if (now !== prev) { + seed = 0 + prev = now + return now + } else { + return now +'.'+ encode(seed++) + } + } + } +} diff --git a/src/com/pnwrain/flashsocket/events/EventEmitter.as b/src/com/pnwrain/flashsocket/events/EventEmitter.as new file mode 100644 index 0000000..551176b --- /dev/null +++ b/src/com/pnwrain/flashsocket/events/EventEmitter.as @@ -0,0 +1,49 @@ +package com.pnwrain.flashsocket.events { + + import flash.events.EventDispatcher; + import flash.events.IEventDispatcher; + import flash.utils.Dictionary; + + public class EventEmitter extends EventDispatcher { + + private var _wrapped:Dictionary; + + public function EventEmitter(target:IEventDispatcher = null) { + super(target); + } + + public function on(type:String, listener:Function):void { + addEventListener(type, listener); + } + + public function once(type:String, listener:Function):void { + if(!_wrapped) + _wrapped = new Dictionary(true); + + _wrapped[listener] = function():void { + if(!(listener in _wrapped)) return; // just to be sure + removeListener(type, listener); + + listener.apply(this, arguments); + }; + + on(type, _wrapped[listener]); + } + + public function removeListener(type:String, listener:Function):void { + // use the wrapped version, if available + if(_wrapped && listener in _wrapped) { + removeEventListener(type, _wrapped[listener]); + delete _wrapped[listener]; + } else { + removeEventListener(type, listener); + } + } + + public function _emit(type:String, data:Object = null):void { + var e:FlashSocketEvent = new FlashSocketEvent(type); + e.data = data; + dispatchEvent(e); + } + } +} diff --git a/src/com/pnwrain/flashsocket/events/FlashSocketEvent.as b/src/com/pnwrain/flashsocket/events/FlashSocketEvent.as index 2912a75..6502554 100644 --- a/src/com/pnwrain/flashsocket/events/FlashSocketEvent.as +++ b/src/com/pnwrain/flashsocket/events/FlashSocketEvent.as @@ -1,20 +1,33 @@ package com.pnwrain.flashsocket.events { import flash.events.Event; - + public class FlashSocketEvent extends Event { - public static const CLOSE:String = "close"; public static const CONNECT:String = "connect"; public static const MESSAGE:String = "message"; public static const IO_ERROR:String = "ioError"; public static const SECURITY_ERROR:String = "securityError"; - + public static const DISCONNECT:String = "disconnect"; + public static const CONNECT_ERROR:String = "connectError"; + public static const ERROR:String = "error"; + public var data:*; - + public function FlashSocketEvent(type:String, bubbles:Boolean=true, cancelable:Boolean=false) { super(type, bubbles, cancelable); } + + public override function clone():Event + { + var event:FlashSocketEvent = new FlashSocketEvent(type, bubbles, cancelable); + event.data = data; + return event; + } + public override function toString():String + { + return formatToString("FlashSocketEvent", "type", "bubbles", "cancelable", "eventPhase", "data"); + } } -} \ No newline at end of file +} diff --git a/src/com/pnwrain/flashsocket/transports/Polling.as b/src/com/pnwrain/flashsocket/transports/Polling.as new file mode 100644 index 0000000..5839de3 --- /dev/null +++ b/src/com/pnwrain/flashsocket/transports/Polling.as @@ -0,0 +1,287 @@ +package com.pnwrain.flashsocket.transports { + + import flash.events.Event; + import flash.events.IOErrorEvent; + import flash.events.HTTPStatusEvent; + import flash.events.SecurityErrorEvent; + import flash.utils.ByteArray; + import flash.net.URLRequest; + import flash.net.URLLoader; + + import com.pnwrain.flashsocket.FlashSocket; + import com.pnwrain.flashsocket.Yeast; + import com.pnwrain.flashsocket.Transport; + import com.pnwrain.flashsocket.events.FlashSocketEvent; + + public class Polling extends Transport { + + private var polling:Boolean = false; + private var pollLoader:URLLoader; + private var sendLoader:URLLoader; + private var yeast:Yeast = new Yeast(); + + + public function Polling(popts:Object) { + super(popts); + + name = 'polling'; + pausable = true; + } + + override public function open():void { + super.open(); + + poll(); + } + + private function request(data:* = null):URLRequest { + var protocol:String = opts.protocol; + var host:String = opts.host; + var query:String = opts.query; + var sid:String = opts.sid; + + var req:URLRequest = new URLRequest(); + req.method = (data ? 'POST' : 'GET'); + req.contentType = 'application/octet-stream'; + req.data = data; + req.url = protocol + "://" + host + "/socket.io/?EIO=3&transport=polling" + + "&t=" + yeast.next() + (sid ? "&sid="+sid : "") + (query ? "&"+query : ""); + + return req; + } + + private function poll():void { + polling = true; + + FlashSocket.log('polling'); + + pollLoader = new URLLoader(); + pollLoader.dataFormat = 'binary'; + pollLoader.addEventListener(Event.COMPLETE, onPollData); + pollLoader.addEventListener(HTTPStatusEvent.HTTP_STATUS, onPollHttpStatus); + pollLoader.addEventListener(IOErrorEvent.IO_ERROR, onIoError); + pollLoader.addEventListener(SecurityErrorEvent.SECURITY_ERROR, onSecurityError); + pollLoader.load(request()); + } + + override public function close():void { + FlashSocket.log("Polling.close", readyState) + if(readyState == 'opening') { + // opening phase, wait until we're open and then close + // + once('open', function(e:*):void { + close() + }) + + } else if(readyState == 'open') { + send([{ type: 'close' }]); + + readyState = 'closing'; + + // close as soon as the message is sent + once('drain', function(e:Event):void { + FlashSocket.log('Polling: close message send, closing') + onClose(); + }) + } + } + + // sends a sequence of engine.io packets: + // { + // type: close (1) | ping (2) | pong (3) | message (4) | upgrade (5) + // data: String (for text packets) | ByteArray (for binary packets) + // } + // see: https://github.com/socketio/engine.io-protocol + // + override public function send(packets:Array):void { + if(readyState != 'open' && readyState != 'opening') return + writable = false; + + var data:ByteArray = encodePayload(packets); + + sendLoader = new URLLoader(); + sendLoader.dataFormat = 'binary'; + sendLoader.addEventListener(HTTPStatusEvent.HTTP_STATUS, onSendHttpStatus); + sendLoader.addEventListener(IOErrorEvent.IO_ERROR, onIoError); + sendLoader.addEventListener(SecurityErrorEvent.SECURITY_ERROR, onSecurityError); + sendLoader.load(request(data)); + } + + // pauses polling, calls cb() after current polling/sending is finished + override public function pause(cb:Function):void { + readyState = 'pausing'; + + function pause():void { + FlashSocket.log('paused'); + readyState = 'paused'; + cb(); + } + + if(polling || !writable) { + var total:int = 0; + + if(polling) { + FlashSocket.log('we are currently polling - waiting to pause'); + total++; + once('pollComplete', function(e:*):void { + FlashSocket.log('pre-pause polling complete'); + --total || pause(); + }); + } + + if(!writable) { + FlashSocket.log('we are currently writing - waiting to pause'); + total++; + once('drain', function(e:*):void { + FlashSocket.log('pre-pause writing complete'); + --total || pause(); + }); + } + } else { + pause(); + } + } + + private function onPollData(e:Event):void { + var packets:Array = decodePayload(e.target.data); + + for each(var packet:Object in packets) { + FlashSocket.log('polling decoded packet ', packet, packet.data is String); + + // if its a close packet, we close the ongoing requests + if('close' == packet.type) { + super.onClose(); + return; + } + + // otherwise handle the message + super.onPacket(packet); + + // we consider the transport as open after processing the 'open' packet, + // so that we have the sid for future polls/sends + if('open' == packet.type) + super.onOpen(); + } + + // if an event did not trigger closing + if ('closed' != readyState) { + // if we got data we're not polling + polling = false; + _emit('pollComplete'); + + if('open' == readyState) + poll(); + else + FlashSocket.log('stopping poll - transport state ', readyState); + } + } + + private function onPollHttpStatus(event:HTTPStatusEvent):void { + if(event.status != 200) + super.onError(FlashSocketEvent.CONNECT_ERROR); + } + + private function onSendHttpStatus(event:HTTPStatusEvent):void { + if(event.status == 200) { + // suuccess + writable = true; + _emit('drain'); + } else + super.onError(FlashSocketEvent.IO_ERROR); + } + + private function onIoError(event:flash.events.Event):void { + super.onError(FlashSocketEvent.IO_ERROR); + } + + private function onSecurityError(event:flash.events.Event):void { + super.onError(FlashSocketEvent.SECURITY_ERROR); + } + + // implementation of engine.io-parser's encodePayloadAsBinary + // Encodes an array of packets in a single ByteArray + // for each packet we write: + // <0=string | 1=binary>