Changeset 481
- Timestamp:
- 08/11/08 14:01:39 (5 months ago)
- Location:
- trunk/daemon/orbited
- Files:
-
- 1 added
- 9 modified
-
cometsession.py (modified) (5 diffs)
-
proxy.py (modified) (7 diffs)
-
static/HTMLFileFrame.js (modified) (1 diff)
-
static/Orbited.js (modified) (31 diffs)
-
static/xsdrBridge.html (modified) (4 diffs)
-
transports/__init__.py (modified) (2 diffs)
-
transports/base.py (modified) (3 diffs)
-
transports/longpoll.py (modified) (2 diffs)
-
transports/poll.py (added)
-
transports/xhrstream.py (modified) (3 diffs)
Legend:
- Unmodified
- Added
- Removed
-
trunk/daemon/orbited/cometsession.py
r474 r481 1 1 import os 2 2 import uuid 3 import base64 3 4 4 5 from zope.interface import implements … … 143 144 144 145 # Determines timeout interval after ping has been sent 145 pingTimeout = 10146 pingTimeout = 40 146 147 # Determines interval to wait before sending a ping 147 148 # since the last time we heard from the client. 148 pingInterval = 20149 pingInterval = 40 149 150 150 151 def __init__(self, root, key, peer, host, **options): … … 220 221 221 222 def parseData(self, data): 223 224 # TODO: this method is filled with areas that really should be put 225 # inside try/except blocks. We don't want errors caused by 226 # malicious IO. 227 228 222 229 self.logger.debug('RECV: ' + data) 223 230 frames = [] … … 259 266 # TODO kill the connection with error. 260 267 pass 261 data = args[2]268 data = base64.b64decode(args[2]) 262 269 # NB: parentTransport is-a FakeTCPTransport. 263 270 self.parentTransport.dataReceived(data) … … 373 380 self.cometTransport.sendPacket('close', str(packetId)) 374 381 else: 375 self.cometTransport.sendPacket('data', str(packetId), data)382 self.cometTransport.sendPacket('data', str(packetId), base64.b64encode(data)) 376 383 377 384 def resendUnackQueue(self): -
trunk/daemon/orbited/proxy.py
r468 r481 1 import base642 3 1 from twisted.internet import reactor 4 2 from twisted.internet.protocol import ClientCreator … … 27 25 self.logger.debug("connectionMade") 28 26 self.state = 'handshake' 29 self.binary = False30 27 # TODO rename this to outgoingProtocol 31 28 self.outgoingConn = None … … 38 35 self.logger.debug('dataReceived: data=%r' % data) 39 36 self.logger.debug('self.outgoingConn is', self.outgoingConn) 40 self.logger.debug('self.binary', self.binary)41 37 42 38 if self.outgoingConn: 43 39 # NB: outgoingConn is-a ProxyOutgoingProtocol 44 if self.binary:45 data = base64.b64decode(data)46 40 self.logger.debug("write (out): %r" % data) 47 41 return self.outgoingConn.transport.write(data) … … 49 43 try: 50 44 data = data.strip() 51 self.binary = (data[0] == '1') 52 host, port = data[1:].split(':') 45 host, port = data.split(':') 53 46 port = int(port) 54 47 except: … … 63 56 self.transport.loseConnection() 64 57 return 65 self.logger.access('new %s connection from %s:%s to %s:%d' % (self.binary and 'binary' or 'text',peer.host, peer.port, host, port))58 self.logger.access('new connection from %s:%s to %s:%d' % (peer.host, peer.port, host, port)) 66 59 self.state = 'connecting' 67 60 client = ClientCreator(reactor, ProxyOutgoingProtocol, self) … … 78 71 self.outgoingConn.transport.loseConnection() 79 72 80 # XXX the wording is confusing; shouldn't this be called81 # outgoingConnectionEstablished? dito for remoteConnectionLost.82 73 def outgoingConnectionEstablished(self, outgoingConn): 83 74 if self.state == 'closed': … … 92 83 93 84 def write(self, data): 94 if self.binary: 95 data = base64.b64encode(data) 85 # data = base64.b64encode(data) 96 86 self.logger.debug("write %r" % data) 97 87 self.transport.write(data) -
trunk/daemon/orbited/static/HTMLFileFrame.js
r472 r481 65 65 try { 66 66 HEARTBEAT_TIMEOUT = parent.Orbited.settings.HEARTBEAT_TIMEOUT 67 // alert('same domain.');68 67 } 69 68 catch(e) { -
trunk/daemon/orbited/static/Orbited.js
r479 r481 40 40 Orbited.settings.log = false; 41 41 Orbited.settings.HEARTBEAT_TIMEOUT = 6000 42 Orbited.settings.POLL_INTERVAL = 2000 42 43 Orbited.singleton = {} 43 44 … … 77 78 var tab = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"; 78 79 79 Orbited.base64.encode=function(/* byte[] */ba){ 80 if (window.btoa && window.btoa('1') == 'MQ==') { 81 Orbited.base64.encode = function(data) { return btoa(data) }; 82 Orbited.base64.decode = function(data) { return atob(data) }; 83 console.log('using btoa and atob') 84 return 85 } 86 87 Orbited.base64.encode=function(/* String */ba){ 80 88 // summary 81 // Encode a n array of bytesas a base64-encoded string89 // Encode a string as a base64-encoded string 82 90 var s=[], l=ba.length; 83 91 var rm=l%3; 84 92 var x=l-rm; 85 93 for (var i=0; i<x;){ 86 var t=ba [i++]<<16|ba[i++]<<8|ba[i++];94 var t=ba.charCodeAt(i++)<<16|ba.charCodeAt(i++)<<8|ba.charCodeAt(i++); 87 95 s.push(tab.charAt((t>>>18)&0x3f)); 88 96 s.push(tab.charAt((t>>>12)&0x3f)); … … 93 101 switch(rm){ 94 102 case 2:{ 95 var t=ba [i++]<<16|ba[i++]<<8;103 var t=ba.charCodeAt(i++)<<16|ba.charCodeAt(i++)<<8; 96 104 s.push(tab.charAt((t>>>18)&0x3f)); 97 105 s.push(tab.charAt((t>>>12)&0x3f)); … … 101 109 } 102 110 case 1:{ 103 var t=ba [i++]<<16;111 var t=ba.charCodeAt(i++)<<16; 104 112 s.push(tab.charAt((t>>>18)&0x3f)); 105 113 s.push(tab.charAt((t>>>12)&0x3f)); … … 111 119 return s.join(""); // string 112 120 }; 121 113 122 114 123 Orbited.base64.decode=function(/* string */str){ … … 124 133 if(i<=l){ t|=tab.indexOf(s[i++])<<6 }; 125 134 if(i<=l){ t|=tab.indexOf(s[i++]) }; 126 out.push( (t>>>16)&0xff);127 out.push( (t>>>8)&0xff);128 out.push( t&0xff);135 out.push(String.fromCharCode((t>>>16)&0xff)); 136 out.push(String.fromCharCode((t>>>8)&0xff)); 137 out.push(String.fromCharCode(t&0xff)); 129 138 } 130 139 // strip off trailing padding 131 140 while(tl--){ out.pop(); } 132 return out ; // byte[]141 return out.join(""); // string 133 142 }; 134 143 })(); … … 204 213 self.assert = function() { 205 214 if (!self.enabled) { return } 206 console.assert.apply(this, padArgs(arguments)) 215 var newArgs = [arguments[0], name + ":" ] 216 for (var i = 1; i < arguments.length; ++i) { 217 newArgs.push(arguments[i]); 218 } 219 console.assert.apply(this, newArgs) 207 220 } 208 221 self.trace = function() { … … 214 227 Orbited.Loggers.EmptyLogger = function(name) { 215 228 var self = this; 229 self.enabled = false; 216 230 self.name = name; 217 231 self.log = function() { 218 232 } 219 233 self.debug = function() { 234 if (!self.enabled) { return } 235 var newArgs = [ "<b>" + name + "</b>" ] 236 for (var i = 0; i < arguments.length; ++i) { 237 newArgs.push(arguments[i]); 238 } 239 d = document.createElement('div') 240 d.innerHTML = newArgs.join(", ") 241 document.body.appendChild(d) 220 242 } 221 243 self.info = function() { … … 391 413 throw new Error("Invalid readyState") 392 414 } 415 data = Orbited.base64.encode(data) 393 416 sendQueue.push([++packetCount, "data", data]) 394 417 ;;; self.logger.debug('sending==', sending); … … 459 482 460 483 var transportOnReadFrame = function(frame) { 461 ;;; self.logger.debug('READ FRAME'); 462 ;;; self.logger.debug('id ', frame.id); 463 ;;; self.logger.debug('name ', frame.name); 464 ;;; if (frame.args.length > 0) 465 ;;; self.logger.debug('args ', frame.args[0]); 466 ;;; self.logger.debug('---'); 484 ;;; self.logger.debug('transportOnReadFrame') 485 ;;; self.logger.debug('READ FRAME: ', frame.id, frame.name, frame.data ? frame.data.length : ''); 467 486 if (!isNaN(frame.id)) { 468 487 lastPacketId = Math.max(lastPacketId, frame.id); … … 476 495 break; 477 496 case 'data': 478 self.onread(frame.args[0]); 497 ;;; self.logger.debug('base64 decoding ' + frame.data.length + ' bytes of data') 498 data = Orbited.base64.decode(frame.data) 499 ;;; self.logger.debug('decode complete'); 500 self.onread(data); 479 501 break; 480 502 case 'open': 481 503 if (self.readyState == self.READY_STATE_OPENING) { 482 504 self.readyState = self.READY_STATE_OPEN; 505 ;;; self.logger.debug('Call self.onopen()'); 483 506 self.onopen(); 484 507 } … … 489 512 case 'ping': 490 513 // TODO: don't have a third element (remove the null). 491 sendQueue.push([++packetCount, "ping", null]) 492 if (!sending) { 493 doSend() 514 // NOTE: don't waste a request when we get a longpoll ping. 515 switch(cometTransport.name) { 516 case 'longpoll': 517 break; 518 case 'poll': 519 break; 520 default: 521 sendQueue.push([++packetCount, "ping", null]) 522 if (!sending) { 523 doSend() 524 } 525 break; 494 526 } 495 496 527 } 497 528 } … … 539 570 } 540 571 if (sendQueue.length == 0) { 572 ;;; self.logger.debug('sendQueue exhausted'); 541 573 sending = false; 542 574 return … … 548 580 // xhr = createXHR(); 549 581 xhr.onreadystatechange = function() { 550 ;;; self.logger.debug('send readyState', xhr.readyState)551 ;;; try {552 ;;; self.logger.debug('status', xhr.status);553 ;;; } catch(e) {554 ;;; self.logger.debug('no status');555 ;;; }556 582 switch(xhr.readyState) { 557 583 … … 611 637 self.onread = function() { } 612 638 self.onclose = function() { } 613 639 var buffer = "" 614 640 var session = null; 615 641 var binary = false; … … 675 701 throw new Error("invalid payload: binary mode is set"); 676 702 } 677 session.send(encodeBinary(data))678 703 } 679 704 else { 680 ;;; self.logger.debug('SEND: ', data) 681 session.send(data) 682 } 683 } 684 685 var encodeBinary = Orbited.base64.encode; 686 var decodeBinary = Orbited.base64.decode; 705 data = Orbited.utf8.fromUtf8(data) 706 } 707 ;;; self.logger.debug('SEND: ', data) 708 session.send(data) 709 } 710 711 var process = function() { 712 var result = Orbited.utf8.toUtf8(buffer) 713 var data = result[0] 714 var i = result[1] 715 buffer = buffer.slice(i) 716 if (data.length > 0) { 717 self.onread(data); 718 } 719 } 687 720 688 721 var sessionOnRead = function(data) { … … 690 723 case self.READY_STATE_OPEN: 691 724 ;;; self.logger.debug('READ: ', data) 692 binary ? self.onread(decodeBinary(data)) : self.onread(data) 693 break; 725 var data = data; 726 if (self.binary) { 727 self.onread(data); 728 } 729 else { 730 ;;; self.logger.debug('start buffer size:', buffer.length) 731 buffer += data; 732 // data.splice(0,0,buffer.length, 0) 733 // buffer.splice.apply(buffer, data) 734 process() 735 ;;; self.logger.debug('end buffer size:', buffer.length) 736 } 737 break; 694 738 case self.READY_STATE_OPENING: 695 739 switch(handshakeState) { 696 740 case 'initial': 741 // NOTE: we should only get complete payloads during 742 // the handshake. no need to buffer, then parse 743 data = Orbited.utf8.toUtf8(data)[0]; 697 744 ;;; self.logger.debug('initial'); 698 745 ;;; self.logger.debug('data', data) … … 725 772 var sessionOnOpen = function(data) { 726 773 // TODO: TCPSocket handshake 727 session.send((binary ? '1' : '0') + hostname + ':' + port + '\n') 774 var payload = hostname + ':' + port + '\n' 775 ;;; self.logger.debug('sessionOpen; sending:', payload) 776 payload = Orbited.utf8.fromUtf8(payload) 777 ;;; self.logger.debug('encoded payload:', payload) 778 X = payload 779 session.send(payload) 728 780 handshakeState = 'initial' 729 781 } … … 768 820 Orbited.XSDR = function() { 769 821 var self = this; 770 771 822 var ifr = null; 772 823 var url; … … 836 887 self.abort = function() { 837 888 if (self.readyState > 0 && self.readyState < 4) { 838 queue.push(['ABORT']); 889 // TODO: push an ABORT command (so as not to reload the iframe) 890 // queue.push(['ABORT']); 891 ;;; self.logger.debug('ABORT called'); 892 ifr.src = "about:blank" 893 document.body.removeChild(ifr) 894 ifr = null; 895 self.readyState = 4; 896 self.onreadystatechange(); 839 897 } 840 898 } … … 865 923 var data = payload[1] 866 924 self.readyState = data.readyState 925 ;;; self.logger.debug('readystatechange', self.readyState) 867 926 if (data.status) { 868 927 self.status = data.status 928 ;;; self.logger.debug('status', data.status) 869 929 } 870 930 if (data.responseText) { 871 931 self.responseText += data.responseText 932 ;;; self.logger.debug('responseText', data.responseText) 872 933 } 934 ;;; self.logger.debug('doing trigger'); 873 935 self.onreadystatechange(); 936 ;;; self.logger.debug('trigger complete'); 874 937 } 875 938 } … … 922 985 Orbited.CometTransports.XHRStream = function() { 923 986 var self = this; 987 self.name = 'xhrstream' 924 988 var url = null; 925 989 var xhr = null; … … 929 993 var retryTimer = null; 930 994 var buffer = "" 931 var currentArgs = []932 995 var retryInterval = 50 933 996 self.readyState = 0 … … 1133 1196 var receivedHeartbeat = function() { 1134 1197 window.clearTimeout(heartbeatTimer); 1135 //self.logger.debug('clearing heartbeatTimer', heartbeatTimer)1198 ;;; self.logger.debug('clearing heartbeatTimer', heartbeatTimer) 1136 1199 heartbeatTimer = window.setTimeout(function() { 1137 //self.logger.debug('timer', testtimer, 'did it');1200 ;;; self.logger.debug('timer', testtimer, 'did it'); 1138 1201 heartbeatTimeout(); 1139 1202 }, Orbited.settings.HEARTBEAT_TIMEOUT); 1140 1203 var testtimer = heartbeatTimer; 1141 1204 1142 //self.logger.debug('heartbeatTimer is now', heartbeatTimer)1205 ;;; self.logger.debug('heartbeatTimer is now', heartbeatTimer) 1143 1206 } 1144 1207 var heartbeatTimeout = function() { 1145 //self.logger.debug('heartbeat timeout... reconnect')1208 ;;; self.logger.debug('heartbeat timeout... reconnect') 1146 1209 reconnect(); 1147 1210 } … … 1154 1217 id: testAckId, 1155 1218 name: args[1], 1156 args: args.slice(2) 1157 } 1219 data: args[2] 1220 } 1221 // TODO: shouldn't we put this in a window.setTimeout so that user 1222 // code won't mess up our code? 1158 1223 self.onread(packet) 1159 1224 } … … 1167 1232 Orbited.CometTransports.XHRStream.safari3 = 1.0 1168 1233 1234 1235 1236 1237 1238 Orbited.CometTransports.LongPoll = function() { 1239 var self = this; 1240 self.name = 'longpoll' 1241 var url = null; 1242 var xhr = null; 1243 var ackId = null; 1244 var retryTimer = null; 1245 var buffer = "" 1246 var retryInterval = 50 1247 self.readyState = 0 1248 self.onReadFrame = function(frame) {} 1249 self.onclose = function() { } 1250 1251 self.close = function() { 1252 if (self.readyState == 2) { 1253 return 1254 } 1255 if (xhr != null && (xhr.readyState > 1 || xhr.readyState < 4)) { 1256 xhr.onreadystatechange = function() { } 1257 xhr.abort() 1258 xhr = null; 1259 } 1260 self.readyState = 2 1261 window.clearTimeout(retryTimer); 1262 self.onclose(); 1263 } 1264 1265 self.connect = function(_url) { 1266 if (self.readyState == 1) { 1267 throw new Error("Already Connected") 1268 } 1269 url = new Orbited.URL(_url) 1270 if (xhr == null) { 1271 if (url.isSameDomain(location.href)) { 1272 xhr = createXHR(); 1273 } 1274 else { 1275 xhr = new Orbited.XSDR(); 1276 } 1277 } 1278 url.path += '/longpoll' 1279 // url.setQsParameter('transport', 'xhrstream') 1280 self.readyState = 1 1281 open() 1282 } 1283 var open = function() { 1284 try { 1285 if (typeof(ackId) == "number") { 1286 url.setQsParameter('ack', ackId) 1287 } 1288 if (typeof(xhr)== "undefined" || xhr == null) { 1289 throw new Error("how did this happen?"); 1290 } 1291 1292 xhr.open('GET', url.render(), true) 1293 xhr.onreadystatechange = function() { 1294 ;;; self.logger.debug('readystate', xhr.readyState) 1295 switch(xhr.readyState) { 1296 case 4: 1297 try { 1298 xhr.status 1299 } 1300 catch(e) { 1301 // Expoential backoff: Every time we fail to 1302 // reconnect, double the interval. 1303 // TODO cap the max value. 1304 ;;; self.logger.debug("start reconnect Timer (couldn't access xhr.status)") 1305 retryInterval *= 2; 1306 window.setTimeout(reconnect, retryInterval) 1307 return; 1308 } 1309 switch(xhr.status) { 1310 case 200: 1311 process(); 1312 ;;; self.logger.debug("completed request, reconnect immediately") 1313 setTimeout(open, 0) 1314 break; 1315 case 404: 1316 self.close(); 1317 break 1318 case null: 1319 // NOTE: for the XSDR case: 1320 // (we can always get status, but maybe its null) 1321 retryInterval *= 2; 1322 ;;; self.logger.debug("start reconnect Timer (null xhr.status)") 1323 window.setTimeout(reconnect, retryInterval) 1324 break; 1325 default: 1326 // TODO: do we want to retry here? 1327 ;;; self.logger.debug("something broke, xhr.status=", xhr.status) 1328 self.close(); 1329 break 1330 } 1331 } 1332 } 1333 xhr.send(null); 1334 } 1335 catch(e) { 1336 self.close() 1337 } 1338 } 1339 1340 var reconnect = function() { 1341 ;;; self.logger.debug('reconnect...') 1342 if (xhr.readyState < 4 && xhr.readyState > 0) { 1343 xhr.onreadystatechange = function() { 1344 if (xhr.readyState == 4) { 1345 reconnect(); 1346 } 1347 } 1348 ;;; self.logger.debug('do abort..') 1349 xhr.abort(); 1350 window.clearTimeout(heartbeatTimer); 1351 } 1352 else { 1353 ;;; self.logger.debug('reconnect do open') 1354 offset = 0; 1355 setTimeout(open, 0) 1356 } 1357 } 1358 // 12,ab011,hello world 1359 var process = function() { 1360 var commaPos = -1; 1361 var argEnd = null; 1362 var argSize; 1363 var frame = [] 1364 var stream = xhr.responseText; 1365 var offset = 0 1366 1367 1368 var k = 0 1369 while (true) { 1370 k += 1 1371 if (k > 2000) { 1372