client.js 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354
  1. var assert = require('assert');
  2. var net = require('net');
  3. var tls = require('tls');
  4. var util = require('util');
  5. var events = require('events');
  6. var StompFrame = require('./frame').StompFrame;
  7. var StompFrameEmitter = require('./parser').StompFrameEmitter;
  8. // Copied from modern node util._extend, because it didn't exist
  9. // in node 0.4.
  10. function _extend(origin, add) {
  11. // Don't do anything if add isn't an object
  12. if (!add || typeof add !== 'object') return origin;
  13. var keys = Object.keys(add);
  14. var i = keys.length;
  15. while (i--) {
  16. origin[keys[i]] = add[keys[i]];
  17. }
  18. return origin;
  19. };
  20. // Inbound frame validators
  21. var StompFrameCommands = {
  22. '1.0': {
  23. 'CONNECTED': {
  24. 'headers': { 'session': { required: true } }
  25. },
  26. 'MESSAGE' : {
  27. 'headers': {
  28. 'destination': { required: true },
  29. 'message-id': { required: true }
  30. }
  31. },
  32. 'ERROR': {},
  33. 'RECEIPT': {}
  34. },
  35. '1.1': {
  36. 'CONNECTED': {
  37. 'headers': { 'session': { required: true } }
  38. },
  39. 'MESSAGE' : {
  40. 'headers': {
  41. 'destination': { required: true },
  42. 'message-id': { required: true }
  43. }
  44. },
  45. 'ERROR': {},
  46. 'RECEIPT': {}
  47. }
  48. };
  49. function StompClient(opts) {
  50. var address, port, user, pass, protocolVersion, vhost, reconnectOpts, tlsOpts;
  51. if(arguments.length !== 1 || typeof opts === 'string') {
  52. address = opts;
  53. port = arguments[1];
  54. user = arguments[2];
  55. pass = arguments[3];
  56. protocolVersion = arguments[4];
  57. vhost = arguments[5];
  58. reconnectOpts = arguments[6];
  59. tlsOpts = arguments[7];
  60. if(tlsOpts === true) {
  61. tlsOpts = {};
  62. }
  63. }
  64. else {
  65. address = opts.address || opts.host;
  66. port = opts.port;
  67. user = opts.user;
  68. pass = opts.pass;
  69. protocolVersion = opts.protocolVersion;
  70. vhost = opts.vhost;
  71. reconnectOpts = opts.reconnectOpts;
  72. tlsOpts = opts.tls;
  73. // If boolean then TLS options are mixed in with other options
  74. if(tlsOpts === true) {
  75. tlsOpts = opts;
  76. }
  77. }
  78. events.EventEmitter.call(this);
  79. this.user = (user || '');
  80. this.pass = (pass || '');
  81. this.address = (address || '127.0.0.1');
  82. this.port = (port || 61613);
  83. this.version = (protocolVersion || '1.0');
  84. this.subscriptions = {};
  85. assert(StompFrameCommands[this.version], 'STOMP version '+this.version+' is not supported');
  86. this._stompFrameEmitter = new StompFrameEmitter(StompFrameCommands[this.version]);
  87. this.vhost = vhost || null;
  88. this.reconnectOpts = reconnectOpts || {};
  89. this.tls = tlsOpts;
  90. this._retryNumber = 0;
  91. this._retryDelay = this.reconnectOpts.delay;
  92. return this;
  93. }
  94. util.inherits(StompClient, events.EventEmitter);
  95. StompClient.prototype.connect = function (connectedCallback, errorCallback) {
  96. var self = this;
  97. //reset this field.
  98. delete this._disconnectCallback;
  99. if (errorCallback) {
  100. self.on('error', errorCallback);
  101. }
  102. var connectEvent;
  103. if(this.tls) {
  104. self.stream = tls.connect(self.port, self.address, this.tls);
  105. connectEvent = 'secureConnect';
  106. }
  107. else {
  108. self.stream = net.createConnection(self.port, self.address);
  109. connectEvent = 'connect';
  110. }
  111. self.stream.on(connectEvent, self.onConnect.bind(this));
  112. self.stream.on('error', function(err) {
  113. process.nextTick(function() {
  114. //clear all of the stomp frame emitter listeners - we don't need them, we've disconnected.
  115. self._stompFrameEmitter.removeAllListeners();
  116. });
  117. if (self._retryNumber < self.reconnectOpts.retries) {
  118. if (self._retryNumber === 0) {
  119. //we're disconnected, but we're going to try and reconnect.
  120. self.emit('reconnecting');
  121. }
  122. self._reconnectTimer = setTimeout(function() {
  123. self.connect();
  124. }, self._retryNumber++ * self.reconnectOpts.delay)
  125. } else {
  126. if (self._retryNumber === self.reconnectOpts.retries) {
  127. err.message += ' [reconnect attempts reached]';
  128. err.reconnectionFailed = true;
  129. }
  130. self.emit('error', err);
  131. }
  132. });
  133. if (connectedCallback) {
  134. self.on('connect', connectedCallback);
  135. }
  136. return this;
  137. };
  138. StompClient.prototype.disconnect = function (callback) {
  139. var self = this;
  140. //just a bit of housekeeping. Remove the no-longer-useful reconnect timer.
  141. if (self._reconnectTimer) {
  142. clearTimeout(self._reconnectTimer);
  143. }
  144. if (this.stream) {
  145. //provide a default no-op function as the callback is optional
  146. this._disconnectCallback = callback || function() {};
  147. var frame = new StompFrame({
  148. command: 'DISCONNECT'
  149. }).send(this.stream);
  150. process.nextTick(function() {
  151. self.stream.end();
  152. });
  153. }
  154. return this;
  155. };
  156. StompClient.prototype.onConnect = function() {
  157. var self = this;
  158. // First set up the frame parser
  159. var frameEmitter = self._stompFrameEmitter;
  160. self.stream.on('data', function(data) {
  161. frameEmitter.handleData(data);
  162. });
  163. self.stream.on('end', function() {
  164. if (self._disconnectCallback) {
  165. self._disconnectCallback();
  166. } else {
  167. self.stream.emit('error', new Error('Server has gone away'));
  168. }
  169. });
  170. frameEmitter.on('MESSAGE', function(frame) {
  171. var subscribed = self.subscriptions[frame.headers.destination];
  172. // .unsubscribe() deletes the subscribed callbacks from the subscriptions,
  173. // but until that UNSUBSCRIBE message is processed, we might still get
  174. // MESSAGE. Check to make sure we don't call .map() on null.
  175. if (subscribed) {
  176. subscribed.listeners.map(function(callback) {
  177. callback(frame.body, frame.headers);
  178. });
  179. }
  180. self.emit('message', frame.body, frame.headers);
  181. });
  182. frameEmitter.on('CONNECTED', function(frame) {
  183. if (self._retryNumber > 0) {
  184. //handle a reconnection differently to the initial connection.
  185. self.emit('reconnect', frame.headers.session, self._retryNumber);
  186. self._retryNumber = 0;
  187. } else {
  188. self.emit('connect', frame.headers.session);
  189. }
  190. });
  191. frameEmitter.on('ERROR', function(frame) {
  192. var er = new Error(frame.headers.message);
  193. // frame.headers used to be passed as er, so put the headers on er object
  194. _extend(er, frame.headers);
  195. self.emit('error', er, frame.body);
  196. });
  197. frameEmitter.on('parseError', function(err) {
  198. // XXX(sam) err should be an Error object to more easily track the
  199. // point of error detection, but it isn't, so create one now.
  200. var er = new Error(err.message);
  201. if (err.details) {
  202. er.details = err.details;
  203. }
  204. self.emit('error', er);
  205. self.stream.destroy();
  206. });
  207. // Send the CONNECT frame
  208. var headers = {
  209. 'login': self.user,
  210. 'passcode': self.pass
  211. };
  212. if(this.vhost && this.version === '1.1')
  213. headers.host = this.vhost;
  214. var frame = new StompFrame({
  215. command: 'CONNECT',
  216. headers: headers
  217. }).send(self.stream);
  218. //if we've just reconnected, we'll need to re-subscribe
  219. for (var queue in self.subscriptions) {
  220. new StompFrame({
  221. command: 'SUBSCRIBE',
  222. headers: self.subscriptions[queue].headers
  223. }).send(self.stream);
  224. }
  225. };
  226. StompClient.prototype.subscribe = function(queue, _headers, _callback) {
  227. // Allow _headers or callback in any order, for backwards compat: so headers
  228. // is whichever arg is not a function, callback is whatever is left over.
  229. var callback;
  230. if (typeof _headers === 'function') {
  231. callback = _headers;
  232. _headers = null;
  233. }
  234. if (typeof _callback === 'function') {
  235. callback = _callback;
  236. _callback = null;
  237. }
  238. // Error now, preventing errors thrown from inside the 'MESSAGE' event handler
  239. assert(callback, 'callback is mandatory on subscribe');
  240. var headers = _extend({}, _headers || _callback);
  241. headers.destination = queue;
  242. if (!(queue in this.subscriptions)) {
  243. this.subscriptions[queue] = {
  244. listeners: [],
  245. headers: headers
  246. };
  247. new StompFrame({
  248. command: 'SUBSCRIBE',
  249. headers: headers
  250. }).send(this.stream);
  251. }
  252. this.subscriptions[queue].listeners.push(callback);
  253. return this;
  254. };
  255. // no need to pass a callback parameter as there is no acknowledgment for
  256. // successful UNSUBSCRIBE from the STOMP server
  257. StompClient.prototype.unsubscribe = function (queue, headers) {
  258. headers = _extend({}, headers);
  259. headers.destination = queue;
  260. new StompFrame({
  261. command: 'UNSUBSCRIBE',
  262. headers: headers
  263. }).send(this.stream);
  264. delete this.subscriptions[queue];
  265. return this;
  266. };
  267. StompClient.prototype.publish = function(queue, message, headers) {
  268. headers = _extend({}, headers);
  269. headers.destination = queue;
  270. new StompFrame({
  271. command: 'SEND',
  272. headers: headers,
  273. body: message
  274. }).send(this.stream);
  275. return this;
  276. };
  277. function sendAckNack(acknack, messageId, subscription, transaction) {
  278. var headers = {
  279. 'message-id': messageId,
  280. 'subscription': subscription
  281. };
  282. if(transaction) {
  283. headers['transaction'] = transaction;
  284. }
  285. new StompFrame({
  286. command: acknack,
  287. headers: headers
  288. }).send(this.stream);
  289. }
  290. StompClient.prototype.ack = function(messageId, subscription, transaction) {
  291. sendAckNack.call(this, 'ACK', messageId, subscription, transaction);
  292. return this;
  293. };
  294. StompClient.prototype.nack = function(messageId, subscription, transaction) {
  295. sendAckNack.call(this, 'NACK', messageId, subscription, transaction);
  296. return this;
  297. };
  298. Object.defineProperty(StompClient.prototype, 'writable', {
  299. get: function(){
  300. return this.stream && this.stream.writable;
  301. }
  302. });
  303. module.exports = StompClient;
  304. module.exports.StompClient = StompClient;
  305. module.exports.Errors = {
  306. streamNotWritable: 15201
  307. };