socket.js 18 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745
  1. /**
  2. * Module dependencies.
  3. */
  4. var transports = require('./transports/index');
  5. var Emitter = require('component-emitter');
  6. var debug = require('debug')('engine.io-client:socket');
  7. var index = require('indexof');
  8. var parser = require('engine.io-parser');
  9. var parseuri = require('parseuri');
  10. var parsejson = require('parsejson');
  11. var parseqs = require('parseqs');
  12. /**
  13. * Module exports.
  14. */
  15. module.exports = Socket;
  /**
   * Socket constructor.
   *
   * Accepts either `(uri, opts)` or a single options object. Settings derived
   * from the URI are written onto `opts`, every setting is then recorded on
   * the instance, and `this.open()` is called before the constructor returns.
   * May also be called without `new`.
   *
   * @param {String|Object} uri or options
   * @param {Object} opts options
   * @api public
   */
  function Socket (uri, opts) {
    if (!(this instanceof Socket)) return new Socket(uri, opts);

    opts = opts || {};

    // `Socket(opts)` form: the first argument is the options object
    if (uri && 'object' === typeof uri) {
      opts = uri;
      uri = null;
    }

    if (uri) {
      uri = parseuri(uri);
      opts.hostname = uri.host;
      opts.secure = uri.protocol === 'https' || uri.protocol === 'wss';
      opts.port = uri.port;
      if (uri.query) opts.query = uri.query;
    } else if (opts.host) {
      opts.hostname = parseuri(opts.host).host;
    }

    // an explicit `secure` option wins; otherwise follow the page protocol (if any)
    this.secure = null != opts.secure ? opts.secure
      : (global.location && 'https:' === location.protocol);

    if (opts.hostname && !opts.port) {
      // if no port is specified manually, use the protocol default
      opts.port = this.secure ? '443' : '80';
    }

    this.agent = opts.agent || false;
    this.hostname = opts.hostname ||
      (global.location ? location.hostname : 'localhost');
    this.port = opts.port || (global.location && location.port
      ? location.port
      : (this.secure ? 443 : 80));
    this.query = opts.query || {};
    if ('string' === typeof this.query) this.query = parseqs.decode(this.query);
    this.upgrade = false !== opts.upgrade;
    // normalize to exactly one trailing slash
    this.path = (opts.path || '/engine.io').replace(/\/$/, '') + '/';
    this.forceJSONP = !!opts.forceJSONP;
    this.jsonp = false !== opts.jsonp;
    this.forceBase64 = !!opts.forceBase64;
    this.enablesXDR = !!opts.enablesXDR;
    this.timestampParam = opts.timestampParam || 't';
    this.timestampRequests = opts.timestampRequests;
    // `createTransport` forwards `this.requestTimeout` to every transport, so
    // the option has to be recorded here or it is silently dropped.
    this.requestTimeout = opts.requestTimeout;
    this.transports = opts.transports || ['polling', 'websocket'];
    this.transportOptions = opts.transportOptions || {};
    this.readyState = '';
    this.writeBuffer = [];
    this.prevBufferLen = 0;
    this.policyPort = opts.policyPort || 843;
    this.rememberUpgrade = opts.rememberUpgrade || false;
    this.binaryType = null;
    this.onlyBinaryUpgrades = opts.onlyBinaryUpgrades;

    // `false` disables compression; `true` or nothing means "defaults"
    this.perMessageDeflate = false !== opts.perMessageDeflate ? (opts.perMessageDeflate || {}) : false;
    if (true === this.perMessageDeflate) this.perMessageDeflate = {};
    if (this.perMessageDeflate && null == this.perMessageDeflate.threshold) {
      this.perMessageDeflate.threshold = 1024;
    }

    // SSL options for Node.js client
    this.pfx = opts.pfx || null;
    this.key = opts.key || null;
    this.passphrase = opts.passphrase || null;
    this.cert = opts.cert || null;
    this.ca = opts.ca || null;
    this.ciphers = opts.ciphers || null;
    this.rejectUnauthorized = opts.rejectUnauthorized === undefined ? true : opts.rejectUnauthorized;
    this.forceNode = !!opts.forceNode;

    // other options for Node.js client
    var freeGlobal = typeof global === 'object' && global;
    if (freeGlobal.global === freeGlobal) {
      if (opts.extraHeaders && Object.keys(opts.extraHeaders).length > 0) {
        this.extraHeaders = opts.extraHeaders;
      }

      if (opts.localAddress) {
        this.localAddress = opts.localAddress;
      }
    }

    // set on handshake
    this.id = null;
    this.upgrades = null;
    this.pingInterval = null;
    this.pingTimeout = null;

    // set on heartbeat
    this.pingIntervalTimer = null;
    this.pingTimeoutTimer = null;

    this.open();
  }
  /**
   * Mix in `Emitter`.
   */
  Emitter(Socket.prototype);

  /**
   * Constructor-level flag, initially `false`; the socket updates it as
   * transports open, are probed, or fail.
   */
  Socket.priorWebsocketSuccess = false;

  /**
   * Protocol version, taken from the parser.
   *
   * @api public
   */
  Socket.protocol = parser.protocol; // this is an int

  /**
   * Dependencies exposed on the constructor for legacy compatibility
   * and standalone browser access.
   */
  Socket.Socket = Socket;
  Socket.Transport = require('./transport');
  Socket.transports = require('./transports/index');
  Socket.parser = require('engine.io-parser');
  /**
   * Creates transport of the given type.
   *
   * The transport receives a copy of the socket query (extended with the
   * protocol version, the transport name and, once known, the session id)
   * plus the socket-level settings. Any setting present in
   * `this.transportOptions[name]` takes precedence over the socket-level
   * value, including explicit falsy values such as `false` or `0`; only
   * `null`/`undefined` fall back to the socket setting.
   *
   * Throws whatever the transport constructor throws (including the
   * `TypeError` raised when `name` is not a known transport).
   *
   * @param {String} transport name
   * @return {Transport}
   * @api private
   */
  Socket.prototype.createTransport = function (name) {
    debug('creating transport "%s"', name);
    var query = clone(this.query);

    // append engine.io protocol identifier
    query.EIO = parser.protocol;

    // transport name
    query.transport = name;

    // per-transport options
    var options = this.transportOptions[name] || {};

    // session id if we already have one
    if (this.id) query.sid = this.id;

    var settings = {
      query: query,
      socket: this,
      protocols: options.protocols || void (0)
    };

    // settings a transport inherits from the socket unless overridden
    var inherited = [
      'agent', 'hostname', 'port', 'secure', 'path', 'forceJSONP', 'jsonp',
      'forceBase64', 'enablesXDR', 'timestampRequests', 'timestampParam',
      'policyPort', 'pfx', 'key', 'passphrase', 'cert', 'ca', 'ciphers',
      'rejectUnauthorized', 'perMessageDeflate', 'extraHeaders', 'forceNode',
      'localAddress', 'requestTimeout'
    ];
    for (var i = 0; i < inherited.length; i++) {
      var key = inherited[i];
      settings[key] = resolveTransportOption(options[key], this[key]);
    }

    return new transports[name](settings);
  };

  /**
   * Picks the per-transport value when one was supplied, else the fallback.
   * Uses a null/undefined check rather than `||` so that `false` and `0`
   * remain usable as overrides.
   *
   * @param {*} override per-transport value
   * @param {*} fallback socket-level value
   * @return {*}
   * @api private
   */
  function resolveTransportOption (override, fallback) {
    return null != override ? override : fallback;
  }
  /**
   * Returns a shallow copy of the own enumerable properties of `obj`.
   *
   * The ownership check goes through `Object.prototype.hasOwnProperty` so
   * that objects which shadow `hasOwnProperty` (for example a parsed query
   * string containing that key) or have no prototype can be copied too.
   *
   * @param {Object} obj source object
   * @return {Object} new plain object
   * @api private
   */
  function clone (obj) {
    var o = {};
    for (var i in obj) {
      if (Object.prototype.hasOwnProperty.call(obj, i)) {
        o[i] = obj[i];
      }
    }
    return o;
  }
  /**
   * Initializes transport to use and starts probe.
   *
   * Picks `websocket` when `rememberUpgrade` is set, a previous websocket
   * connection succeeded and websocket is among the configured transports;
   * otherwise picks the first configured transport. With no transports left,
   * an `error` event is emitted asynchronously and nothing else happens.
   *
   * If the chosen transport cannot be created, that transport (and only that
   * one) is dropped from `this.transports` and the selection is retried.
   *
   * @api private
   */
  Socket.prototype.open = function () {
    var name;
    if (this.rememberUpgrade && Socket.priorWebsocketSuccess && this.transports.indexOf('websocket') !== -1) {
      name = 'websocket';
    } else if (0 === this.transports.length) {
      // Emit error on next tick so it can be listened to
      var self = this;
      setTimeout(function () {
        self.emit('error', 'No transports available');
      }, 0);
      return;
    } else {
      name = this.transports[0];
    }
    this.readyState = 'opening';

    // Retry with the next transport if the transport is disabled (jsonp: false)
    var transport;
    try {
      transport = this.createTransport(name);
    } catch (e) {
      debug('transport "%s" could not be created (%s), trying the next one', name, e);
      // Remove the transport that actually failed. It is not necessarily the
      // first entry: a remembered `websocket` may sit later in the list.
      var failedAt = this.transports.indexOf(name);
      if (-1 === failedAt) {
        // always make progress, even if the entry cannot be located
        this.transports.shift();
      } else {
        this.transports.splice(failedAt, 1);
      }
      this.open();
      return;
    }

    transport.open();
    this.setTransport(transport);
  };
  /**
   * Makes `transport` the active transport of this socket.
   *
   * All listeners are removed from the transport being replaced (if there is
   * one); the new transport's `drain`, `packet`, `error` and `close` events
   * are then routed to the matching socket handlers.
   *
   * @param {Transport} transport
   * @api private
   */
  Socket.prototype.setTransport = function (transport) {
    debug('setting transport %s', transport.name);
    var socket = this;

    if (this.transport) {
      debug('clearing existing transport %s', this.transport.name);
      this.transport.removeAllListeners();
    }

    // set up transport
    this.transport = transport;

    function handleDrain () {
      socket.onDrain();
    }

    function handlePacket (packet) {
      socket.onPacket(packet);
    }

    function handleError (e) {
      socket.onError(e);
    }

    function handleClose () {
      socket.onClose('transport close');
    }

    // set up transport listeners
    transport
      .on('drain', handleDrain)
      .on('packet', handlePacket)
      .on('error', handleError)
      .on('close', handleClose);
  };
  /**
   * Probes a transport.
   *
   * Opens a second transport next to the active one and sends it a
   * `ping`/`probe` packet. On a matching `pong`/`probe` reply the active
   * transport is paused, the probe transport takes over and an `upgrade`
   * packet is sent over it. Any other first packet, a transport error or
   * close, or the socket closing ends the probe with an `upgradeError` event;
   * in all of those cases the probe transport is closed and every listener
   * registered by the probe is removed.
   *
   * @param {String} transport name
   * @api private
   */
  Socket.prototype.probe = function (name) {
    debug('probing transport "%s"', name);
    var transport = this.createTransport(name, { probe: 1 });
    var failed = false;
    var self = this;

    Socket.priorWebsocketSuccess = false;

    function onTransportOpen () {
      if (self.onlyBinaryUpgrades) {
        // relies on the emitter invoking the listener with the probe
        // transport as `this`
        var upgradeLosesBinary = !this.supportsBinary && self.transport.supportsBinary;
        failed = failed || upgradeLosesBinary;
      }
      if (failed) return;

      debug('probe transport "%s" opened', name);
      transport.send([{ type: 'ping', data: 'probe' }]);
      transport.once('packet', function (msg) {
        if (failed) return;
        if ('pong' === msg.type && 'probe' === msg.data) {
          debug('probe transport "%s" pong', name);
          self.upgrading = true;
          self.emit('upgrading', transport);
          // an `upgrading` listener may have frozen this probe
          if (!transport) return;
          Socket.priorWebsocketSuccess = 'websocket' === transport.name;

          debug('pausing current transport "%s"', self.transport.name);
          self.transport.pause(function () {
            if (failed) return;
            if ('closed' === self.readyState) return;
            debug('changing transport and sending upgrade packet');

            cleanup();

            self.setTransport(transport);
            transport.send([{ type: 'upgrade' }]);
            self.emit('upgrade', transport);
            transport = null;
            self.upgrading = false;
            self.flush();
          });
        } else {
          debug('probe transport "%s" failed', name);
          var err = new Error('probe error');
          err.transport = transport.name;
          // Tear the probe down before reporting. Otherwise the transport
          // stays open with its listeners attached and a later close would
          // report a second `upgradeError` for the same probe.
          freezeTransport();
          self.emit('upgradeError', err);
        }
      });
    }

    function freezeTransport () {
      if (failed) return;

      // Any callback called by transport should be ignored since now
      failed = true;

      cleanup();

      transport.close();
      transport = null;
    }

    // Handle any error that happens while probing
    function onerror (err) {
      var error = new Error('probe error: ' + err);
      // read the name before `freezeTransport` clears the reference
      error.transport = transport.name;

      freezeTransport();

      debug('probe transport "%s" failed because of error: %s', name, err);

      self.emit('upgradeError', error);
    }

    function onTransportClose () {
      onerror('transport closed');
    }

    // When the socket is closed while we're probing
    function onclose () {
      onerror('socket closed');
    }

    // When the socket is upgraded while we're probing
    function onupgrade (to) {
      if (transport && to.name !== transport.name) {
        debug('"%s" works - aborting "%s"', to.name, transport.name);
        freezeTransport();
      }
    }

    // Remove all listeners on the transport and on self
    function cleanup () {
      transport.removeListener('open', onTransportOpen);
      transport.removeListener('error', onerror);
      transport.removeListener('close', onTransportClose);
      self.removeListener('close', onclose);
      self.removeListener('upgrading', onupgrade);
    }

    transport.once('open', onTransportOpen);
    transport.once('error', onerror);
    transport.once('close', onTransportClose);

    this.once('close', onclose);
    this.once('upgrading', onupgrade);

    transport.open();
  };
  /**
   * Called when connection is deemed open.
   *
   * Marks the socket `open`, records whether the active transport is
   * websocket, emits `open`, flushes pending packets and, when upgrading is
   * enabled and the active transport can be paused, probes every transport
   * listed in `this.upgrades`.
   *
   * @api public
   */
  Socket.prototype.onOpen = function () {
    debug('socket open');
    this.readyState = 'open';
    Socket.priorWebsocketSuccess = 'websocket' === this.transport.name;
    this.emit('open');
    this.flush();

    // `readyState` is re-checked because an `open` listener
    // may already have closed the socket
    var shouldProbe = 'open' === this.readyState && this.upgrade && this.transport.pause;
    if (!shouldProbe) return;

    debug('starting upgrade probes');
    var total = this.upgrades.length;
    var position = 0;
    while (position < total) {
      this.probe(this.upgrades[position]);
      position++;
    }
  };
  /**
   * Handles a packet received from the active transport.
   *
   * Packets are ignored (apart from a debug line) unless the socket is
   * `opening`, `open` or `closing`. Otherwise `packet` and `heartbeat` are
   * emitted and the packet is dispatched on its type: `open` starts the
   * handshake, `pong` reschedules the ping, `error` is turned into a
   * server error, and `message` is emitted as `data` and `message`.
   *
   * @param {Object} packet
   * @api private
   */
  Socket.prototype.onPacket = function (packet) {
    var accepting = 'opening' === this.readyState || 'open' === this.readyState ||
      'closing' === this.readyState;

    if (!accepting) {
      debug('packet received with socket readyState "%s"', this.readyState);
      return;
    }

    debug('socket receive: type "%s", data "%s"', packet.type, packet.data);

    this.emit('packet', packet);

    // Socket is live - any packet counts
    this.emit('heartbeat');

    var kind = packet.type;
    if ('open' === kind) {
      this.onHandshake(parsejson(packet.data));
    } else if ('pong' === kind) {
      this.setPing();
      this.emit('pong');
    } else if ('error' === kind) {
      var serverError = new Error('server error');
      serverError.code = packet.data;
      this.onError(serverError);
    } else if ('message' === kind) {
      this.emit('data', packet.data);
      this.emit('message', packet.data);
    }
  };
  /**
   * Called upon handshake completion.
   *
   * Emits `handshake`, stores the session id, the usable upgrades and the
   * ping settings from `data`, then opens the socket. Unless an `open`
   * listener closed the socket again, the ping cycle is started and the
   * heartbeat listener is (re)attached exactly once.
   *
   * @param {Object} handshake obj
   * @api private
   */
  Socket.prototype.onHandshake = function (data) {
    this.emit('handshake', data);

    var sessionId = data.sid;
    this.id = sessionId;
    this.transport.query.sid = sessionId;
    this.upgrades = this.filterUpgrades(data.upgrades);
    this.pingInterval = data.pingInterval;
    this.pingTimeout = data.pingTimeout;

    this.onOpen();

    // In case open handler closes socket
    if ('closed' !== this.readyState) {
      this.setPing();

      // Prolong liveness of socket on heartbeat
      this.removeListener('heartbeat', this.onHeartbeat);
      this.on('heartbeat', this.onHeartbeat);
    }
  };
  /**
   * Resets ping timeout.
   *
   * Restarts the timer that closes the socket with reason `ping timeout`.
   * The delay is `timeout` when given (and truthy), otherwise
   * `pingInterval + pingTimeout`.
   *
   * @param {Number} timeout optional delay in milliseconds
   * @api private
   */
  Socket.prototype.onHeartbeat = function (timeout) {
    clearTimeout(this.pingTimeoutTimer);

    var socket = this;
    var delay = timeout || (socket.pingInterval + socket.pingTimeout);

    function expire () {
      if ('closed' !== socket.readyState) {
        socket.onClose('ping timeout');
      }
    }

    socket.pingTimeoutTimer = setTimeout(expire, delay);
  };
  /**
   * Pings server every `this.pingInterval` and expects response
   * within `this.pingTimeout` or closes connection.
   *
   * Any previously scheduled ping is cancelled first.
   *
   * @api private
   */
  Socket.prototype.setPing = function () {
    var socket = this;

    function pingNow () {
      debug('writing ping packet - expecting pong within %sms', socket.pingTimeout);
      socket.ping();
      socket.onHeartbeat(socket.pingTimeout);
    }

    clearTimeout(socket.pingIntervalTimer);
    socket.pingIntervalTimer = setTimeout(pingNow, socket.pingInterval);
  };
  /**
   * Sends a ping packet and emits `ping` once the packet's
   * callback is invoked.
   *
   * @api private
   */
  Socket.prototype.ping = function () {
    var socket = this;

    function announcePing () {
      socket.emit('ping');
    }

    this.sendPacket('ping', announcePing);
  };
  /**
   * Called on `drain` event
   *
   * Drops the packets that were handed to the transport by the last flush,
   * then either flushes what is still queued or, when nothing is left,
   * emits `drain`.
   *
   * @api private
   */
  Socket.prototype.onDrain = function () {
    this.writeBuffer.splice(0, this.prevBufferLen);

    // resetting prevBufferLen is very important: for example, when
    // upgrading, the upgrade packet is sent over, and a nonzero
    // prevBufferLen could cause problems on `drain`
    this.prevBufferLen = 0;

    if (0 !== this.writeBuffer.length) {
      this.flush();
      return;
    }

    this.emit('drain');
  };
  /**
   * Flush write buffers.
   *
   * Hands the whole write buffer to the transport and emits `flush`.
   * Nothing happens when the socket is closed, the transport is not
   * writable, an upgrade is in progress, or the buffer is empty.
   *
   * @api private
   */
  Socket.prototype.flush = function () {
    if ('closed' === this.readyState) return;
    if (!this.transport.writable) return;
    if (this.upgrading) return;
    if (!this.writeBuffer.length) return;

    debug('flushing %d packets in socket', this.writeBuffer.length);
    this.transport.send(this.writeBuffer);

    // remember how many packets were sent so that
    // `drain` can splice exactly those off writeBuffer
    this.prevBufferLen = this.writeBuffer.length;
    this.emit('flush');
  };
  /**
   * Sends a message. Also available as `write`.
   *
   * @param {String} msg message.
   * @param {Object} options options.
   * @param {Function} fn callback function.
   * @return {Socket} for chaining.
   * @api public
   */
  Socket.prototype.send = function (msg, options, fn) {
    this.sendPacket('message', msg, options, fn);
    return this;
  };
  Socket.prototype.write = Socket.prototype.send;
  /**
   * Sends a packet.
   *
   * `data` and `options` may each be omitted, in which case a function in
   * their position is taken as the callback. Nothing is queued while the
   * socket is `closing` or `closed`. Compression is on unless
   * `options.compress` is exactly `false`. The callback, when given, is
   * attached as a one-time `flush` listener.
   *
   * @param {String} packet type.
   * @param {String} data.
   * @param {Object} options.
   * @param {Function} callback function.
   * @api private
   */
  Socket.prototype.sendPacket = function (type, data, options, fn) {
    // shift arguments when optional ones were left out
    if ('function' === typeof data) {
      fn = data;
      data = undefined;
    }

    if ('function' === typeof options) {
      fn = options;
      options = null;
    }

    var shuttingDown = 'closing' === this.readyState || 'closed' === this.readyState;
    if (shuttingDown) {
      return;
    }

    options = options || {};
    options.compress = false !== options.compress;

    var packet = { type: type, data: data, options: options };

    this.emit('packetCreate', packet);
    this.writeBuffer.push(packet);
    if (fn) {
      this.once('flush', fn);
    }
    this.flush();
  };
  /**
   * Closes the connection.
   *
   * Only acts while the socket is `opening` or `open`. The socket moves to
   * `closing`; queued packets are allowed to drain first, and a running
   * upgrade is allowed to finish (or fail) before the actual close happens.
   *
   * @return {Socket} for chaining.
   * @api private
   */
  Socket.prototype.close = function () {
    var self = this;

    function close () {
      self.onClose('forced close');
      debug('socket closing - telling transport to close');
      self.transport.close();
    }

    function cleanupAndClose () {
      self.removeListener('upgrade', cleanupAndClose);
      self.removeListener('upgradeError', cleanupAndClose);
      close();
    }

    function waitForUpgrade () {
      // wait for upgrade to finish since we can't send packets while pausing a transport
      self.once('upgrade', cleanupAndClose);
      self.once('upgradeError', cleanupAndClose);
    }

    function closeOrWaitForUpgrade () {
      if (self.upgrading) {
        waitForUpgrade();
      } else {
        close();
      }
    }

    var closable = 'opening' === this.readyState || 'open' === this.readyState;
    if (closable) {
      this.readyState = 'closing';

      if (this.writeBuffer.length) {
        // let pending packets go out first
        this.once('drain', function () {
          closeOrWaitForUpgrade();
        });
      } else {
        closeOrWaitForUpgrade();
      }
    }

    return this;
  };
  /**
   * Called upon transport error
   *
   * Clears the websocket-success flag, emits `error` with `err` and
   * closes the socket with reason `transport error`.
   *
   * @param {Error} err
   * @api private
   */
  Socket.prototype.onError = function (err) {
    debug('socket error %j', err);

    Socket.priorWebsocketSuccess = false;

    this.emit('error', err);
    this.onClose('transport error', err);
  };
  /**
   * Called upon transport close.
   *
   * Does nothing unless the socket is `opening`, `open` or `closing`.
   * Otherwise timers are cleared, the transport is closed and detached, the
   * socket becomes `closed`, `close` is emitted with `reason` and `desc`,
   * and finally the write buffer is reset.
   *
   * @param {String} reason
   * @param {*} desc optional detail passed on to `close` listeners
   * @api private
   */
  Socket.prototype.onClose = function (reason, desc) {
    var state = this.readyState;
    if ('opening' !== state && 'open' !== state && 'closing' !== state) return;

    debug('socket close with reason: "%s"', reason);

    // clear timers
    clearTimeout(this.pingIntervalTimer);
    clearTimeout(this.pingTimeoutTimer);

    var activeTransport = this.transport;

    // stop event from firing again for transport
    activeTransport.removeAllListeners('close');

    // ensure transport won't stay open
    activeTransport.close();

    // ignore further transport communication
    activeTransport.removeAllListeners();

    // set ready state
    this.readyState = 'closed';

    // clear session id
    this.id = null;

    // emit close event
    this.emit('close', reason, desc);

    // buffers are reset only now, so `close` listeners
    // can still inspect what was left unsent
    this.writeBuffer = [];
    this.prevBufferLen = 0;
  };
  /**
   * Filters upgrades, returning only those matching client transports.
   *
   * The order of `upgrades` is preserved.
   *
   * @param {Array} server upgrades
   * @return {Array} upgrades that also appear in `this.transports`
   * @api private
   *
   */
  Socket.prototype.filterUpgrades = function (upgrades) {
    var supported = [];
    var total = upgrades.length;

    for (var position = 0; position < total; position++) {
      var candidate = upgrades[position];
      if (-1 !== index(this.transports, candidate)) {
        supported.push(candidate);
      }
    }

    return supported;
  };