proxy.js 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. var _ = require('underscore');
  2. _.str = require('underscore.string');
  3. _.v = require('validator');
  4. var F = require('./common/function');
  5. var crypto = require('crypto')
  6. ,fs = require('fs');
  7. // var heapdump = require('heapdump');
  8. var C = require('./config');
  9. var rpcClient = require('./rpcclient/rpcForSvr');
  10. var WS_PORT = C.websocket_port;
  11. var back_svr = _.str.sprintf("http://%s:%s", C.inner_host, WS_PORT+2);
  12. var io = require('socket.io-client');
  // Last-resort handler for exceptions nothing else caught: print the error
  // and its stack to stdout, then record both in the 'proxy/proxy' log.
  process.on('uncaughtException', (error) => {
    console.log(error);
    const trace = error.stack;
    console.log(trace);
    F.addOtherLogs('proxy/proxy', ["########err down:", error, trace]);
  });
  /**
   * Encrypt a UTF-8 string with AES-128-CBC and pass the base64 ciphertext to `cb`.
   *
   * Key and IV are read from the config object (`C.aes_key`, `C.aes_iv`).
   * NOTE(review): assumes both are 16-byte strings, as AES-128-CBC requires —
   * confirm against the config file.
   * Automatic padding is left at the crypto module's default (enabled).
   *
   * @param {string} buf - Plaintext to encrypt, interpreted as UTF-8.
   * @param {function(string): void} cb - Invoked synchronously with the base64 ciphertext.
   * @throws {Error} Any error raised by `crypto` propagates to the caller; `cb` is not invoked.
   */
  function cipher(buf, cb) {
    // Declared locally: the previous code assigned to an undeclared `algorithm`,
    // which created a global and throws a ReferenceError in strict mode.
    const algorithm = 'aes-128-cbc';
    // Buffer.from() replaces the deprecated `new Buffer(...)` constructor.
    const key = Buffer.from(C.aes_key);
    const iv = Buffer.from(C.aes_iv);
    const cip = crypto.createCipheriv(algorithm, key, iv);
    const encrypted = cip.update(buf, 'utf8', 'base64') + cip.final('base64');
    cb(encrypted);
  }
  /**
   * Decrypt a base64 AES-128-CBC ciphertext and pass the UTF-8 plaintext to `cb`.
   *
   * Key and IV are read from the config object (`C.aes_key`, `C.aes_iv`).
   * NOTE(review): assumes both are 16-byte strings, as AES-128-CBC requires —
   * confirm against the config file.
   * Automatic padding is left at the crypto module's default (enabled).
   *
   * @param {string} encrypted - Base64-encoded ciphertext.
   * @param {function(string): void} cb - Invoked synchronously with the decrypted string.
   * @throws {Error} Any error raised by `crypto` (for example a malformed or
   *   truncated ciphertext) propagates to the caller; `cb` is not invoked.
   */
  function decipher(encrypted, cb) {
    // Declared locally: the previous code assigned to an undeclared `algorithm`,
    // which created a global and throws a ReferenceError in strict mode.
    const algorithm = 'aes-128-cbc';
    // Buffer.from() replaces the deprecated `new Buffer(...)` constructor.
    const key = Buffer.from(C.aes_key);
    const iv = Buffer.from(C.aes_iv);
    // Named `dec` so the local no longer shadows this function's own name.
    const dec = crypto.createDecipheriv(algorithm, key, iv);
    const decrypted = dec.update(encrypted, 'base64', 'utf8') + dec.final('utf8');
    cb(decrypted);
  }
  // WebSocket server: each accepted client gets its own socket.io connection
  // to the backend at `back_svr`, and frames are relayed in both directions.
  var socketServer = new (require('ws').Server)({port: WS_PORT});
  // Running count of successful backend connections (used only for logging).
  var connect_suc = 0;

  socketServer.on('connection', (socket) => {
    // Last-activity timestamp; refreshed whenever a frame passes in either direction.
    socket.up_time = Date.now();

    const param = {"reconnection":false,"force new connection":true,"transports":['websocket']};
    const rpc_client = io.connect(back_svr, param);
    // NOTE(review): `clients.length` only works where ws exposes `clients` as an
    // array; newer ws releases use a Set (`.size`) — confirm the installed version.
    F.addOtherLogs('proxy/proxy', ["after connect socket num:", socketServer.clients.length]);

    // Tear down the backend connection belonging to this client socket.
    const closeBackend = () => {
      rpc_client.disconnect();
      rpc_client.removeAllListeners();
    };

    // ---- events from the WebSocket client ----
    socket.on('close', () => {
      F.addOtherLogs('proxy/proxy', ["after close socket num:", socketServer.clients.length]);
      closeBackend();
    });

    socket.on('error', (error) => {
      console.log(error);
      F.addErrLogs(["proxy err:", error]);
      closeBackend();
    });

    // Tag a decoded client message as coming from a websocket and forward it
    // to the backend under the "all#route" event. Failures are logged, not thrown.
    const emitToIMsvr = (obj) => {
      try {
        obj.is_websocket = 1;
        rpc_client.emit("all#route", obj);
      } catch (err) {
        F.addOtherLogs('proxy/proxy', [" emit err:"+err.stack, obj]);
      }
    };

    socket.on("message", (str) => {
      socket.up_time = Date.now();
      try {
        const obj = JSON.parse(str);
        if ("ed" in obj) {
          // An "ed" field carries an encrypted payload. Remember that this client
          // uses encryption so every later reply is encrypted as well.
          socket.ed = true;
          decipher(obj["ed"], (decode_str) => {
            try {
              emitToIMsvr(JSON.parse(decode_str));
            } catch (e) {
              F.addOtherLogs('proxy/proxy', ["decipher parse err:"+e.stack, str]);
            }
          });
        } else {
          emitToIMsvr(obj);
        }
      } catch (err) {
        // Covers invalid JSON, non-object payloads and decryption failures.
        F.addOtherLogs('proxy/proxy', [" parse err:"+err.stack, str]);
      }
    });

    // ---- events from the backend connection ----
    rpc_client.on('connect', () => {
      ++connect_suc;
      console.log("client socket "+connect_suc+" connect suc");
    });

    rpc_client.on('disconnect', () => {
      console.log("client socket disconnect");
      socket.close();
    });

    // A backend that cannot be reached makes the client socket useless: close it.
    ['connect_error', 'connect_timeout'].forEach((evt) => {
      rpc_client.on(evt, () => {
        socket.close();
      });
    });

    // Send one frame to the client; if sending fails, log it and close the socket.
    const sendData = (send_data) => {
      try {
        socket.send(send_data);
      } catch (e) {
        F.addOtherLogs('proxy/proxy', ["send proxy err:", e.stack, send_data]);
        try {
          socket.close();
        } catch (e2) {
          F.addOtherLogs('proxy/proxy', ["close socket err", e2.stack]);
        }
      }
    };

    // NOTE(review): this listens for an event literally named '*'; confirm how
    // the backend / client library delivers messages under that name.
    rpc_client.on('*', (data) => {
      socket.up_time = Date.now();
      // `typeof null` is also "object"; JSON.stringify copes with it. The earlier
      // code then read `data.route` purely for disabled logging, which threw a
      // TypeError on a null payload — that dead access has been removed.
      const payload = (typeof data === "object") ? JSON.stringify(data) : data;

      // NOTE(review): presumably F.isNull is true while `socket.ed` was never
      // set, i.e. the client has not sent an encrypted frame — confirm in F.
      if (F.isNull(socket.ed)) {
        sendData(payload);
        return;
      }

      try {
        cipher(payload, (encode_str) => {
          sendData(JSON.stringify({"ed": encode_str}));
        });
      } catch (e) {
        // Include the stack so the failure cause is not lost.
        F.addOtherLogs('proxy/proxy', ["cipher err:", e.stack, payload]);
      }
    });
  });
  161. //var memwatch = require('memwatch');
  162. //
  163. //memwatch.on('leak', function(info) {
  164. // console.error('Memory leak detected: ', info);
  165. //});
  166. //setInterval(function(){
  167. // console.log("Before:",process.memoryUsage());
  168. // gc();
  169. // console.log("After:",process.memoryUsage());
  170. //},5000);
  171. //setInterval(function() {
  172. // for (var i=0;i<socketServer.clients.length;i++) {
  173. // var cur_time = new Date().getTime();
  174. // //console.log("curtime:",cur_time);
  175. // //console.log("connect_time:", socketServer.clients[i].connect_time);
  176. // if (cur_time > socketServer.clients[i].up_time + 60*1000) socketServer.clients[i].close();
  177. // }
  178. //}, 10000);
  // Announce the listening port on stdout and in the 'proxy/proxy' log.
  const listen_msg = 'Listening on PORT: ' + WS_PORT;
  console.log(listen_msg);
  F.addOtherLogs('proxy/proxy', [listen_msg]);