Changeset 481

Show
Ignore:
Timestamp:
08/11/08 14:01:39 (5 months ago)
Author:
heyadayo
Message:

Changed the responsibility of base64 and unicode encoding/decoding

js - CometSession? handles all base64
js - TCPSocket handles all the unicode
py - cometsession handles all of the base64

Removed binary bit from proxy handshake. Now all data is base64 encoded. The only difference between binary and text is that the TCPSocket class will handle the unicode en/decoding if its
set to text mode.

Changes base64 and unicode to operate on strings instead of byte (int) arrays
Updated Orbited.base64 to use btoa/atob (mozilla) when present

Added long-polling and polling transports

Added body.appendchild (div) logging for when firebug, firebug lite, and log4js all fail (cross-subdomain on IE)

Tested XSDR stuff in IE and FF

Location:
trunk/daemon/orbited
Files:
1 added
9 modified

Legend:

Unmodified
Added
Removed
  • trunk/daemon/orbited/cometsession.py

    r474 r481  
    11import os 
    22import uuid 
     3import base64 
    34 
    45from zope.interface import implements 
     
    143144 
    144145    # Determines timeout interval after ping has been sent 
    145     pingTimeout = 10 
     146    pingTimeout = 40 
    146147    # Determines interval to wait before sending a ping 
    147148    # since the last time we heard from the client. 
    148     pingInterval = 20 
     149    pingInterval = 40 
    149150 
    150151    def __init__(self, root, key, peer, host, **options): 
     
    220221         
    221222    def parseData(self, data): 
     223       
     224        # TODO: this method is filled with areas that really should be put 
     225        #       inside try/except blocks. We don't want errors caused by 
     226        #       malicious IO. 
     227       
     228       
    222229        self.logger.debug('RECV: ' + data) 
    223230        frames = [] 
     
    259266                    # TODO kill the connection with error. 
    260267                    pass 
    261                 data = args[2] 
     268                data = base64.b64decode(args[2]) 
    262269                # NB: parentTransport is-a FakeTCPTransport. 
    263270                self.parentTransport.dataReceived(data) 
     
    373380            self.cometTransport.sendPacket('close', str(packetId)) 
    374381        else: 
    375             self.cometTransport.sendPacket('data', str(packetId), data) 
     382            self.cometTransport.sendPacket('data', str(packetId), base64.b64encode(data)) 
    376383     
    377384    def resendUnackQueue(self): 
  • trunk/daemon/orbited/proxy.py

    r468 r481  
    1 import base64 
    2  
    31from twisted.internet import reactor 
    42from twisted.internet.protocol import ClientCreator 
     
    2725        self.logger.debug("connectionMade") 
    2826        self.state = 'handshake' 
    29         self.binary = False 
    3027        # TODO rename this to outgoingProtocol 
    3128        self.outgoingConn = None 
     
    3835        self.logger.debug('dataReceived: data=%r' % data) 
    3936        self.logger.debug('self.outgoingConn is', self.outgoingConn) 
    40         self.logger.debug('self.binary', self.binary) 
    4137 
    4238        if self.outgoingConn: 
    4339            # NB: outgoingConn is-a ProxyOutgoingProtocol 
    44             if self.binary: 
    45                 data = base64.b64decode(data) 
    4640            self.logger.debug("write (out): %r" % data) 
    4741            return self.outgoingConn.transport.write(data) 
     
    4943            try: 
    5044                data = data.strip() 
    51                 self.binary = (data[0] == '1') 
    52                 host, port = data[1:].split(':') 
     45                host, port = data.split(':') 
    5346                port = int(port) 
    5447            except: 
     
    6356                self.transport.loseConnection() 
    6457                return 
    65             self.logger.access('new %s connection from %s:%s to %s:%d' % (self.binary and 'binary' or 'text', peer.host, peer.port, host, port)) 
     58            self.logger.access('new connection from %s:%s to %s:%d' % (peer.host, peer.port, host, port)) 
    6659            self.state = 'connecting' 
    6760            client = ClientCreator(reactor, ProxyOutgoingProtocol, self) 
     
    7871            self.outgoingConn.transport.loseConnection() 
    7972 
    80     # XXX the wording is confusing;  shouldn't this be called 
    81     #     outgoingConnectionEstablished?  dito for remoteConnectionLost. 
    8273    def outgoingConnectionEstablished(self, outgoingConn): 
    8374        if self.state == 'closed': 
     
    9283 
    9384    def write(self, data): 
    94         if self.binary: 
    95             data = base64.b64encode(data) 
     85#        data = base64.b64encode(data) 
    9686        self.logger.debug("write %r" % data) 
    9787        self.transport.write(data) 
  • trunk/daemon/orbited/static/HTMLFileFrame.js

    r472 r481  
    6565try { 
    6666    HEARTBEAT_TIMEOUT = parent.Orbited.settings.HEARTBEAT_TIMEOUT 
    67 //    alert('same domain.'); 
    6867} 
    6968catch(e) { 
  • trunk/daemon/orbited/static/Orbited.js

    r479 r481  
    4040Orbited.settings.log = false; 
    4141Orbited.settings.HEARTBEAT_TIMEOUT = 6000 
     42Orbited.settings.POLL_INTERVAL = 2000 
    4243Orbited.singleton = {} 
    4344 
     
    7778    var tab = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; 
    7879 
    79     Orbited.base64.encode=function(/* byte[] */ba){ 
     80    if (window.btoa && window.btoa('1') == 'MQ==') { 
     81        Orbited.base64.encode = function(data) { return btoa(data) }; 
     82        Orbited.base64.decode = function(data) { return atob(data) }; 
     83        console.log('using btoa and atob') 
     84        return 
     85    } 
     86 
     87    Orbited.base64.encode=function(/* String */ba){ 
    8088        //  summary 
    81         //  Encode an array of bytes as a base64-encoded string 
     89        //  Encode a string as a base64-encoded string 
    8290        var s=[], l=ba.length; 
    8391        var rm=l%3; 
    8492        var x=l-rm; 
    8593        for (var i=0; i<x;){ 
    86             var t=ba[i++]<<16|ba[i++]<<8|ba[i++]; 
     94            var t=ba.charCodeAt(i++)<<16|ba.charCodeAt(i++)<<8|ba.charCodeAt(i++); 
    8795            s.push(tab.charAt((t>>>18)&0x3f));  
    8896            s.push(tab.charAt((t>>>12)&0x3f)); 
     
    93101        switch(rm){ 
    94102            case 2:{ 
    95                 var t=ba[i++]<<16|ba[i++]<<8; 
     103                var t=ba.charCodeAt(i++)<<16|ba.charCodeAt(i++)<<8; 
    96104                s.push(tab.charAt((t>>>18)&0x3f)); 
    97105                s.push(tab.charAt((t>>>12)&0x3f)); 
     
    101109            } 
    102110            case 1:{ 
    103                 var t=ba[i++]<<16; 
     111                var t=ba.charCodeAt(i++)<<16; 
    104112                s.push(tab.charAt((t>>>18)&0x3f)); 
    105113                s.push(tab.charAt((t>>>12)&0x3f)); 
     
    111119        return s.join("");  //  string 
    112120    }; 
     121 
    113122 
    114123    Orbited.base64.decode=function(/* string */str){ 
     
    124133            if(i<=l){ t|=tab.indexOf(s[i++])<<6 }; 
    125134            if(i<=l){ t|=tab.indexOf(s[i++]) }; 
    126             out.push((t>>>16)&0xff); 
    127             out.push((t>>>8)&0xff); 
    128             out.push(t&0xff); 
     135            out.push(String.fromCharCode((t>>>16)&0xff)); 
     136            out.push(String.fromCharCode((t>>>8)&0xff)); 
     137            out.push(String.fromCharCode(t&0xff)); 
    129138        } 
    130139        // strip off trailing padding 
    131140        while(tl--){ out.pop(); } 
    132         return out; //  byte[] 
     141        return out.join(""); //  string 
    133142    }; 
    134143})(); 
     
    204213    self.assert = function() { 
    205214        if (!self.enabled) { return } 
    206         console.assert.apply(this, padArgs(arguments)) 
     215        var newArgs = [arguments[0], name + ":" ] 
     216        for (var i = 1; i < arguments.length; ++i) { 
     217            newArgs.push(arguments[i]); 
     218        } 
     219        console.assert.apply(this, newArgs) 
    207220    } 
    208221    self.trace = function() { 
     
    214227Orbited.Loggers.EmptyLogger = function(name) { 
    215228    var self = this; 
     229    self.enabled = false; 
    216230    self.name = name; 
    217231    self.log = function() { 
    218232    } 
    219233    self.debug = function() { 
     234        if (!self.enabled) { return } 
     235        var newArgs = [ "<b>" + name + "</b>" ] 
     236        for (var i = 0; i < arguments.length; ++i) { 
     237            newArgs.push(arguments[i]); 
     238        } 
     239        d = document.createElement('div') 
     240        d.innerHTML = newArgs.join(", ") 
     241        document.body.appendChild(d) 
    220242    } 
    221243    self.info = function() { 
     
    391413            throw new Error("Invalid readyState") 
    392414        } 
     415        data = Orbited.base64.encode(data) 
    393416        sendQueue.push([++packetCount, "data", data]) 
    394417;;;     self.logger.debug('sending==', sending); 
     
    459482 
    460483    var transportOnReadFrame = function(frame) { 
    461 ;;;     self.logger.debug('READ FRAME'); 
    462 ;;;     self.logger.debug('id ', frame.id); 
    463 ;;;     self.logger.debug('name ', frame.name); 
    464 ;;;     if (frame.args.length > 0) 
    465 ;;;         self.logger.debug('args ', frame.args[0]); 
    466 ;;;     self.logger.debug('---'); 
     484;;;     self.logger.debug('transportOnReadFrame') 
     485;;;     self.logger.debug('READ FRAME: ', frame.id, frame.name, frame.data ? frame.data.length : ''); 
    467486        if (!isNaN(frame.id)) { 
    468487            lastPacketId = Math.max(lastPacketId, frame.id); 
     
    476495                break; 
    477496            case 'data': 
    478                 self.onread(frame.args[0]); 
     497;;;             self.logger.debug('base64 decoding ' + frame.data.length + ' bytes of data') 
     498                data = Orbited.base64.decode(frame.data) 
     499;;;             self.logger.debug('decode complete'); 
     500                self.onread(data); 
    479501                break; 
    480502            case 'open': 
    481503                if (self.readyState == self.READY_STATE_OPENING) { 
    482504                    self.readyState = self.READY_STATE_OPEN; 
     505;;;                 self.logger.debug('Call self.onopen()'); 
    483506                    self.onopen(); 
    484507                } 
     
    489512            case 'ping': 
    490513                // TODO: don't have a third element (remove the null). 
    491                 sendQueue.push([++packetCount, "ping", null]) 
    492                 if (!sending) { 
    493                     doSend() 
     514                // NOTE: don't waste a request when we get a longpoll ping. 
     515                switch(cometTransport.name) { 
     516                    case 'longpoll': 
     517                        break; 
     518                    case 'poll': 
     519                        break; 
     520                    default: 
     521                        sendQueue.push([++packetCount, "ping", null]) 
     522                        if (!sending) { 
     523                            doSend() 
     524                        } 
     525                        break;                     
    494526                } 
    495                  
    496527        } 
    497528    } 
     
    539570        } 
    540571        if (sendQueue.length == 0) { 
     572;;;         self.logger.debug('sendQueue exhausted'); 
    541573            sending = false; 
    542574            return 
     
    548580//        xhr = createXHR(); 
    549581        xhr.onreadystatechange = function() { 
    550 ;;;         self.logger.debug('send readyState', xhr.readyState) 
    551 ;;;         try { 
    552 ;;;             self.logger.debug('status', xhr.status); 
    553 ;;;         } catch(e) { 
    554 ;;;             self.logger.debug('no status'); 
    555 ;;;         } 
    556582            switch(xhr.readyState) { 
    557583                 
     
    611637    self.onread = function() { } 
    612638    self.onclose = function() { } 
    613  
     639    var buffer = "" 
    614640    var session = null; 
    615641    var binary = false; 
     
    675701                throw new Error("invalid payload: binary mode is set"); 
    676702            } 
    677             session.send(encodeBinary(data)) 
    678703        } 
    679704        else { 
    680 ;;;         self.logger.debug('SEND: ', data) 
    681             session.send(data) 
    682         } 
    683        } 
    684  
    685     var encodeBinary = Orbited.base64.encode; 
    686     var decodeBinary = Orbited.base64.decode; 
     705            data = Orbited.utf8.fromUtf8(data) 
     706        } 
     707;;;     self.logger.debug('SEND: ', data) 
     708        session.send(data) 
     709    } 
     710 
     711    var process = function() { 
     712        var result = Orbited.utf8.toUtf8(buffer) 
     713        var data = result[0] 
     714        var i = result[1] 
     715        buffer = buffer.slice(i) 
     716        if (data.length > 0) { 
     717            self.onread(data); 
     718        } 
     719    } 
    687720 
    688721    var sessionOnRead = function(data) { 
     
    690723            case self.READY_STATE_OPEN: 
    691724;;;             self.logger.debug('READ: ', data) 
    692                 binary ? self.onread(decodeBinary(data)) : self.onread(data) 
    693                 break; 
     725                var data = data; 
     726                if (self.binary) { 
     727                    self.onread(data); 
     728                } 
     729                else { 
     730;;;                 self.logger.debug('start buffer size:', buffer.length) 
     731                    buffer += data; 
     732//                    data.splice(0,0,buffer.length, 0) 
     733//                    buffer.splice.apply(buffer, data) 
     734                    process() 
     735;;;                 self.logger.debug('end buffer size:', buffer.length) 
     736                } 
     737            break; 
    694738            case self.READY_STATE_OPENING: 
    695739                switch(handshakeState) { 
    696740                    case 'initial': 
     741                        // NOTE: we should only get complete payloads during 
     742                        //       the handshake. no need to buffer, then parse 
     743                        data = Orbited.utf8.toUtf8(data)[0]; 
    697744;;;                     self.logger.debug('initial'); 
    698745;;;                     self.logger.debug('data', data) 
     
    725772    var sessionOnOpen = function(data) { 
    726773        // TODO: TCPSocket handshake 
    727         session.send((binary ? '1' : '0') + hostname + ':' + port + '\n') 
     774        var payload = hostname + ':' + port + '\n'  
     775;;;     self.logger.debug('sessionOpen; sending:', payload) 
     776        payload = Orbited.utf8.fromUtf8(payload) 
     777;;;     self.logger.debug('encoded payload:', payload) 
     778        X = payload 
     779        session.send(payload) 
    728780        handshakeState = 'initial' 
    729781    } 
     
    768820Orbited.XSDR = function() { 
    769821    var self = this; 
    770  
    771822    var ifr = null; 
    772823    var url; 
     
    836887    self.abort = function() { 
    837888        if (self.readyState > 0 && self.readyState < 4) { 
    838             queue.push(['ABORT']); 
     889            // TODO: push an ABORT command (so as not to reload the iframe) 
     890//            queue.push(['ABORT']); 
     891;;;         self.logger.debug('ABORT called'); 
     892            ifr.src = "about:blank" 
     893            document.body.removeChild(ifr) 
     894            ifr = null; 
     895            self.readyState = 4; 
     896            self.onreadystatechange(); 
    839897        } 
    840898    } 
     
    865923                var data = payload[1] 
    866924                self.readyState = data.readyState 
     925;;;             self.logger.debug('readystatechange', self.readyState) 
    867926                if (data.status) { 
    868927                    self.status = data.status 
     928;;;                 self.logger.debug('status', data.status) 
    869929                } 
    870930                if (data.responseText) { 
    871931                    self.responseText += data.responseText 
     932;;;                 self.logger.debug('responseText', data.responseText) 
    872933                } 
     934;;;             self.logger.debug('doing trigger'); 
    873935                self.onreadystatechange(); 
     936;;;             self.logger.debug('trigger complete'); 
    874937        } 
    875938    } 
     
    922985Orbited.CometTransports.XHRStream = function() { 
    923986    var self = this; 
     987    self.name = 'xhrstream' 
    924988    var url = null; 
    925989    var xhr = null; 
     
    929993    var retryTimer = null; 
    930994    var buffer = "" 
    931     var currentArgs = [] 
    932995    var retryInterval = 50 
    933996    self.readyState = 0 
     
    11331196    var receivedHeartbeat = function() { 
    11341197        window.clearTimeout(heartbeatTimer); 
    1135 //        self.logger.debug('clearing heartbeatTimer', heartbeatTimer) 
     1198;;;     self.logger.debug('clearing heartbeatTimer', heartbeatTimer) 
    11361199        heartbeatTimer = window.setTimeout(function() {  
    1137 //            self.logger.debug('timer', testtimer, 'did it');  
     1200;;;         self.logger.debug('timer', testtimer, 'did it');  
    11381201            heartbeatTimeout(); 
    11391202        }, Orbited.settings.HEARTBEAT_TIMEOUT); 
    11401203        var testtimer = heartbeatTimer; 
    11411204 
    1142 //        self.logger.debug('heartbeatTimer is now', heartbeatTimer) 
     1205;;;     self.logger.debug('heartbeatTimer is now', heartbeatTimer) 
    11431206    } 
    11441207    var heartbeatTimeout = function() { 
    1145 //        self.logger.debug('heartbeat timeout... reconnect') 
     1208;;;     self.logger.debug('heartbeat timeout... reconnect') 
    11461209        reconnect(); 
    11471210    } 
     
    11541217            id: testAckId, 
    11551218            name: args[1], 
    1156             args: args.slice(2) 
    1157         } 
     1219            data: args[2] 
     1220        } 
     1221        // TODO: shouldn't we put this in a window.setTimeout so that user 
     1222        //       code won't mess up our code? 
    11581223        self.onread(packet) 
    11591224    } 
     
    11671232Orbited.CometTransports.XHRStream.safari3 = 1.0 
    11681233 
     1234 
     1235 
     1236 
     1237 
     1238Orbited.CometTransports.LongPoll = function() { 
     1239    var self = this; 
     1240    self.name = 'longpoll' 
     1241    var url = null; 
     1242    var xhr = null; 
     1243    var ackId = null; 
     1244    var retryTimer = null; 
     1245    var buffer = "" 
     1246    var retryInterval = 50 
     1247    self.readyState = 0 
     1248    self.onReadFrame = function(frame) {} 
     1249    self.onclose = function() { } 
     1250 
     1251    self.close = function() { 
     1252        if (self.readyState == 2) { 
     1253            return 
     1254        } 
     1255        if (xhr != null && (xhr.readyState > 1 || xhr.readyState < 4)) { 
     1256            xhr.onreadystatechange = function() { } 
     1257            xhr.abort() 
     1258            xhr = null; 
     1259        } 
     1260        self.readyState = 2 
     1261        window.clearTimeout(retryTimer); 
     1262        self.onclose(); 
     1263    } 
     1264 
     1265    self.connect = function(_url) { 
     1266        if (self.readyState == 1) { 
     1267            throw new Error("Already Connected") 
     1268        } 
     1269        url = new Orbited.URL(_url) 
     1270        if (xhr == null) { 
     1271            if (url.isSameDomain(location.href)) { 
     1272                xhr = createXHR(); 
     1273            } 
     1274            else { 
     1275                xhr = new Orbited.XSDR(); 
     1276            } 
     1277        } 
     1278        url.path += '/longpoll' 
     1279//        url.setQsParameter('transport', 'xhrstream') 
     1280        self.readyState = 1 
     1281        open() 
     1282    } 
     1283    var open = function() { 
     1284        try { 
     1285            if (typeof(ackId) == "number") { 
     1286                url.setQsParameter('ack', ackId) 
     1287            } 
     1288            if (typeof(xhr)== "undefined" || xhr == null) { 
     1289                throw new Error("how did this happen?"); 
     1290            } 
     1291             
     1292            xhr.open('GET', url.render(), true) 
     1293            xhr.onreadystatechange = function() { 
     1294;;;             self.logger.debug('readystate', xhr.readyState) 
     1295                switch(xhr.readyState) { 
     1296                   case 4: 
     1297                        try { 
     1298                            xhr.status 
     1299                        } 
     1300                        catch(e) { 
     1301                            // Expoential backoff: Every time we fail to  
     1302                            // reconnect, double the interval. 
     1303                            // TODO cap the max value.  
     1304;;;                         self.logger.debug("start reconnect Timer (couldn't access xhr.status)") 
     1305                            retryInterval *= 2; 
     1306                            window.setTimeout(reconnect, retryInterval) 
     1307                            return; 
     1308                        } 
     1309                        switch(xhr.status) { 
     1310                            case 200: 
     1311                                process(); 
     1312;;;                         self.logger.debug("completed request, reconnect immediately") 
     1313                                setTimeout(open, 0) 
     1314                                break; 
     1315                            case 404: 
     1316                                self.close(); 
     1317                                break 
     1318                            case null: 
     1319                                // NOTE: for the XSDR case:  
     1320                                // (we can always get status, but maybe its null) 
     1321                                retryInterval *= 2; 
     1322;;;                         self.logger.debug("start reconnect Timer (null xhr.status)") 
     1323                                window.setTimeout(reconnect, retryInterval) 
     1324                                break;                                 
     1325                            default: 
     1326                                // TODO: do we want to retry here? 
     1327;;;                         self.logger.debug("something broke, xhr.status=", xhr.status) 
     1328                                self.close(); 
     1329                                break 
     1330                        } 
     1331                } 
     1332            } 
     1333            xhr.send(null); 
     1334        } 
     1335        catch(e) { 
     1336            self.close() 
     1337        } 
     1338    } 
     1339 
     1340    var reconnect = function() { 
     1341;;;     self.logger.debug('reconnect...') 
     1342        if (xhr.readyState < 4 && xhr.readyState > 0) { 
     1343            xhr.onreadystatechange = function() { 
     1344                if (xhr.readyState == 4) { 
     1345                    reconnect(); 
     1346                } 
     1347            } 
     1348;;;         self.logger.debug('do abort..') 
     1349            xhr.abort(); 
     1350            window.clearTimeout(heartbeatTimer);             
     1351        } 
     1352        else { 
     1353;;;         self.logger.debug('reconnect do open') 
     1354            offset = 0; 
     1355            setTimeout(open, 0) 
     1356        } 
     1357    } 
     1358    // 12,ab011,hello world 
     1359    var process = function() { 
     1360        var commaPos = -1; 
     1361        var argEnd = null; 
     1362        var argSize; 
     1363        var frame = [] 
     1364        var stream = xhr.responseText; 
     1365        var offset = 0 
     1366 
     1367 
     1368        var k = 0 
     1369        while (true) { 
     1370            k += 1 
     1371            if (k > 2000) { 
     1372