server.js 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. var net = require('net');
  2. var fs = require('fs');
  3. var util = require('util');
  4. var crypto = require('crypto');
  5. var StompFrame = require('./frame').StompFrame;
  6. var StompFrameEmitter = require('./parser').StompFrameEmitter;
  7. //var privateKey = fs.readFileSync('CA/newkeyopen.pem', 'ascii');
  8. //var certificate = fs.readFileSync('CA/newcert.pem', 'ascii');
  9. //var certificateAuthority = fs.readFileSync('CA/demoCA/private/cakey.pem', 'ascii');
  10. /*
  11. var credentials = crypto.createCredentials({
  12. key: privateKey,
  13. cert: certificate,
  14. ca: certificateAuthority,
  15. });
  16. */
  // Commands a STOMP client is allowed to send; this list is handed to the
  // StompFrameEmitter constructor when a connection handler is created.
  var StompClientCommands = [
    'CONNECT',
    'SEND',
    'SUBSCRIBE',
    'UNSUBSCRIBE',
    'BEGIN',
    'COMMIT',
    'ACK',
    'ABORT',
    'DISCONNECT'
  ];
  /**
   * Record of a single client's subscription to a queue.
   *
   * @param {Object} stream  Connection that frames for this subscriber are written to.
   * @param {*} session      Identifier of the subscribing client's session.
   * @param {*} ack          Acknowledgement mode requested by the subscriber.
   */
  function StompSubscription(stream, session, ack) {
    var self = this;
    self.ack = ack;
    self.session = session;
    self.stream = stream;
  }

  /**
   * Deliver a frame to this subscriber by asking the frame to write itself
   * to the subscription's stream.
   *
   * @param {Object} stompFrame  Frame object exposing a send(stream) method.
   */
  StompSubscription.prototype.send = function(stompFrame) {
    var target = this.stream;
    stompFrame.send(target);
  };
  /**
   * Tell whether a queue has been created on the given registry.
   * Uses an own-property check so inherited names such as "constructor"
   * or "toString" are not mistaken for existing queues.
   */
  function stompQueueExists(queues, queue) {
    return Object.prototype.hasOwnProperty.call(queues, queue);
  }

  /**
   * Build the ERROR frame thrown when an operation names an unknown queue.
   */
  function stompQueueNotFound(queue) {
    return new StompFrame({
      command: 'ERROR',
      headers: {
        message: 'Queue does not exist'
      },
      body: 'Queue "' + queue + '" does not exist'
    });
  }

  /**
   * In-memory registry mapping queue names to their subscriptions. It also
   * hands out sequential message and session identifiers, both starting at 0.
   */
  function StompQueueManager() {
    this.queues = {};
    this.msgId = 0;
    this.sessionId = 0;
  }

  /**
   * @returns {number} The next message id (post-incremented counter).
   */
  StompQueueManager.prototype.generateMessageId = function() {
    return this.msgId++;
  };

  /**
   * @returns {number} The next session id (post-incremented counter).
   */
  StompQueueManager.prototype.generateSessionId = function() {
    return this.sessionId++;
  };

  /**
   * Register a subscriber on a queue, creating the queue on first use.
   *
   * @param {string} queue    Destination name.
   * @param {Object} stream   Connection that published frames are sent to.
   * @param {*} session       Session id of the subscribing client.
   * @param {*} ack           Acknowledgement mode for the subscription.
   */
  StompQueueManager.prototype.subscribe = function(queue, stream, session, ack) {
    if (!stompQueueExists(this.queues, queue)) {
      this.queues[queue] = [];
    }
    this.queues[queue].push(new StompSubscription(stream, session, ack));
  };

  /**
   * Send a MESSAGE frame carrying `message` to every subscriber of `queue`.
   *
   * @param {string} queue   Destination name.
   * @param {*} message      Body of the MESSAGE frame.
   * @throws {StompFrame} An ERROR frame when the queue does not exist.
   */
  StompQueueManager.prototype.publish = function(queue, message) {
    if (!stompQueueExists(this.queues, queue)) {
      throw stompQueueNotFound(queue);
    }
    // One frame (and one message id) is shared by all subscribers.
    var outgoing = new StompFrame({
      command: 'MESSAGE',
      headers: {
        'destination': queue,
        'message-id': this.generateMessageId()
      },
      body: message
    });
    this.queues[queue].forEach(function(subscription) {
      subscription.send(outgoing);
    });
  };

  /**
   * Remove every subscription on `queue` that belongs to `session`.
   *
   * @param {string} queue  Destination name.
   * @param {*} session     Session id whose subscriptions are dropped.
   * @throws {StompFrame} An ERROR frame when the queue does not exist.
   */
  StompQueueManager.prototype.unsubscribe = function(queue, session) {
    if (!stompQueueExists(this.queues, queue)) {
      throw stompQueueNotFound(queue);
    }
    // TODO: Profile this
    this.queues[queue] = this.queues[queue].filter(function(subscription) {
      return (subscription.session != session);
    });
  };
  /**
   * Attach STOMP protocol handling to one client connection.
   *
   * Incoming data is fed to a StompFrameEmitter; each parsed frame is then
   * dispatched by command. Per-connection state (authentication flag, session
   * id, subscribed queue names, open transactions) lives in this closure.
   *
   * @param {Object} stream        Client connection; must provide on() and end().
   * @param {Object} queueManager  Shared manager providing generateSessionId(),
   *                               subscribe(), unsubscribe() and publish().
   */
  function StompStreamHandler(stream, queueManager) {
    var frameEmitter = new StompFrameEmitter(StompClientCommands);
    var authenticated = false;
    var sessionId = -1;
    var subscriptions = [];
    var transactions = {};

    // Drop every subscription made on this connection, then close the stream.
    // Shared by the 'end' event and the DISCONNECT command.
    function closeSession() {
      subscriptions.forEach(function(queue) {
        queueManager.unsubscribe(queue, sessionId);
      });
      stream.end();
    }

    stream.on('data', function(data) {
      frameEmitter.handleData(data);
    });

    stream.on('end', function() {
      closeSession();
    });

    frameEmitter.on('frame', function(frame) {
      // Nothing but CONNECT is accepted until the client has connected.
      if (!authenticated && frame.command !== 'CONNECT') {
        new StompFrame({
          command: 'ERROR',
          headers: {
            message: 'Not connected'
          },
          body: 'You must first issue a CONNECT command'
        }).send(stream);
        return;
      }

      // The receipt is sent before the command itself is processed.
      if (frame.command !== 'CONNECT' && 'receipt' in frame.headers) {
        new StompFrame({
          command: 'RECEIPT',
          headers: {
            'receipt-id': frame.headers.receipt
          }
        }).send(stream);
      }

      try {
        switch (frame.command) {
          case 'CONNECT':
            // TODO: Actual authentication
            authenticated = true;
            sessionId = queueManager.generateSessionId();
            new StompFrame({
              command: 'CONNECTED',
              headers: {
                session: sessionId
              }
            }).send(stream);
            break;
          case 'SUBSCRIBE':
            queueManager.subscribe(frame.headers.destination, stream, sessionId, frame.headers.ack || "auto");
            subscriptions.push(frame.headers.destination);
            break;
          case 'UNSUBSCRIBE':
            queueManager.unsubscribe(frame.headers.destination, sessionId);
            break;
          case 'SEND':
            queueManager.publish(frame.headers.destination, frame.body);
            break;
          case 'BEGIN':
            // Own-property check: a transaction named e.g. "toString" must not
            // be reported as already existing.
            if (Object.prototype.hasOwnProperty.call(transactions, frame.headers.transaction)) {
              throw new StompFrame({
                command: 'ERROR',
                headers: {
                  message: 'Transaction already exists'
                },
                body: 'Transaction "' + frame.headers.transaction + '" already exists'
              });
            }
            transactions[frame.headers.transaction] = [];
            break;
          case 'COMMIT':
            // TODO: Actually apply the transaction, this is just an abort
            delete transactions[frame.headers.transaction];
            break;
          case 'ABORT':
            delete transactions[frame.headers.transaction];
            break;
          case 'DISCONNECT':
            closeSession();
            break;
        }
      } catch (e) {
        // Only protocol errors (thrown as StompFrame) are reported to the
        // client; anything else is a programming error and must not be masked.
        if (!(e instanceof StompFrame)) {
          throw e;
        }
        e.send(stream);
      }
    });

    // Parser failures are relayed to the client as an ERROR frame.
    frameEmitter.on('error', function(err) {
      var response = new StompFrame();
      response.setCommand('ERROR');
      response.setHeader('message', err['message']);
      if ('details' in err) {
        response.appendToBody(err['details']);
      }
      response.send(stream);
    });
  }
  /**
   * Plain-TCP STOMP server. Creates (but does not start) a net server whose
   * connections all share one StompQueueManager; call listen() to start it.
   *
   * @param {number} port  Port passed to the underlying server by listen().
   */
  function StompServer(port) {
    this.port = port;
    var queueManager = new StompQueueManager();
    this.server = net.createServer(function(stream) {
      // The socket given to a connection listener is already connected, and
      // current Node.js does not emit 'connect' on server-side sockets, so the
      // handler is attached right away instead of from a 'connect' listener.
      console.log('Received Unsecured Connection');
      new StompStreamHandler(stream, queueManager);
    });
  }
  /**
   * STOMP server that secures each connection before speaking STOMP on it.
   * Inherits listen() and stop() from StompServer.
   *
   * NOTE(review): relies on stream.setSecure() and a 'secure' event, which
   * look like legacy Node.js stream APIs; confirm the targeted runtime still
   * provides them.
   *
   * @param {number} port        Port passed to the underlying server by listen().
   * @param {Object} credentials Credentials object handed to stream.setSecure().
   */
  function SecureStompServer(port, credentials) {
    // Runs the base constructor without a port; port and server are both
    // replaced below.
    StompServer.call(this);
    var sharedQueues = new StompQueueManager();

    function onConnection(stream) {
      function secureStream() {
        console.log('Received Connection, securing');
        stream.setSecure(credentials);
      }

      // STOMP handling starts only once the stream reports it is secured.
      function startHandler() {
        new StompStreamHandler(stream, sharedQueues);
      }

      stream.on('connect', secureStream);
      stream.on('secure', startHandler);
    }

    this.port = port;
    this.server = net.createServer(onConnection);
  }
  util.inherits(SecureStompServer, StompServer);
  /**
   * Start the underlying server on the configured port, bound to 'localhost'.
   */
  StompServer.prototype.listen = function() {
    var port = this.port;
    this.server.listen(port, 'localhost');
  };
  /**
   * Close the underlying server.
   *
   * @param {*} port  Not used by this method.
   */
  StompServer.prototype.stop = function(port) {
    var server = this.server;
    server.close();
  };
  208. //new SecureStompServer(8124, credentials).listen();
  209. exports.createStompServer = function(port) {
  210. return new StompServer(port);
  211. };