rpcWithExpress.js 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. var path = require('path');
  2. var express = require('express');
  3. var fs = require('fs');
  4. var co = require('co');
  5. var logs_obj = require('../libs/logs.js');
  6. var logs = new logs_obj();
  7. var C = require('../config/index');
  8. var F = require('../common/function');
  9. function isNull(obj) {
  10. if (obj == null || typeof(obj) == "undefined" || obj.length == 0) {
  11. return true;
  12. }
  13. if(typeof(obj) == 'object' && Object.keys(obj).length == 0) {
  14. return true;
  15. }
  16. return false;
  17. };
  18. function rpcWithExpress(opt) {
  19. this.exp = express();
  20. if (!isNull(opt) && "ssl" in opt) {
  21. var opts = {
  22. key: fs.readFileSync('/etc/nginx/server.key'),
  23. cert: fs.readFileSync('/etc/nginx/server.crt')
  24. };
  25. this.svr = require('https').createServer(opts,this.exp);
  26. } else {
  27. this.svr = require('http').createServer(this.exp);
  28. }
  29. this.io = require('socket.io')(this.svr);
  30. this.static_cb_map = {};
  31. var that = this;
  32. that.io.route = function (route, fn) {
  33. that.static_cb_map[route] = fn;
  34. };
  35. var desDecode = function (ctx,route,msg) {
  36. try {
  37. if ("ed" in msg && F.isNull(msg.is_websocket)) { // 说明是socket.io加密的
  38. ctx.ed = 1;
  39. let encode_msg = msg.ed;
  40. let decode_msg = F.decipher(encode_msg);
  41. msg = JSON.parse(decode_msg);
  42. F.addOtherLogs('proxy/proxy',["receive data(sockio):",decode_msg]);
  43. }
  44. } catch(e) {
  45. F.addOtherLogs('proxy/proxy',["socket.io decode err:",msg]);
  46. }
  47. return msg;
  48. }
  49. var desEncode = function (ctx,route,msg) {
  50. try {
  51. if (!F.isNull(ctx.ed)) { // 说明是socket.io加密的
  52. let encode_msg = F.cipher(JSON.stringify(msg));
  53. msg = {"ed":encode_msg};
  54. }
  55. } catch(e) {
  56. F.addOtherLogs('proxy/proxy',["socket.io encode err:",msg]);
  57. }
  58. return msg;
  59. }
  60. that.main = function* (ctx,route,msg,fn) {
  61. try {
  62. ctx.up_time = new Date().getTime();
  63. let start_time = new Date().getTime();
  64. if (that.beforeCallback) {
  65. var before_res = yield that.beforeCallback(route,ctx,msg);
  66. if (!before_res) return;
  67. }
  68. msg.start_time = start_time;
  69. yield fn(null, ctx, msg, null);
  70. let end_time = new Date().getTime();
  71. if (end_time > start_time+C.slow_log_delta) {
  72. logs.addLogs("slow/slow",[`usetime:${end_time-start_time}`,"msg:",msg]);
  73. }
  74. } catch (e) {
  75. if (that.catchException) {
  76. yield that.catchException(route, ctx, msg, e);
  77. }
  78. console.log(e.stack);
  79. }
  80. };
  81. that.io.on('connection', function (socket) {
  82. //F.addOtherLogs('proxydebug/proxydebug',[" socket connect:",socket.request.url]);
  83. var ctx = socket;
  84. ctx.up_time = new Date().getTime();
  85. ctx.socket = socket; // 为了统一koa接口
  86. var ping_msg = {
  87. id:0,
  88. route:'heartbeat',
  89. req_data:{}
  90. }
  91. var timerid = setInterval(function() {
  92. var cur_time = new Date().getTime();
  93. var time_delta = cur_time - ctx.up_time;
  94. //if (ctx.up_time + 18*1000 <= cur_time) {
  95. // F.addDebugLogs(['send ping msg',time_delta]);
  96. // ctx.emit('*',ping_msg);
  97. //}
  98. if (ctx.up_time + 30*1000 <= cur_time) {
  99. F.addDebugLogs(['was disconnect by cleaner',time_delta]);
  100. ctx.disconnect();
  101. }
  102. },1000*10);
  103. if (that.onconnect) co(that.onconnect(null,ctx));
  104. ctx.on('disconnect', function (reason) {
  105. clearInterval(timerid);
  106. if (that.ondisconnect) co(that.ondisconnect(null,ctx,reason));
  107. });
  108. for (var key in that.static_cb_map) {
  109. (function(){
  110. var k = key;
  111. var fn = that.static_cb_map[key];
  112. ctx.on(k, function (msg) {
  113. msg = desDecode(ctx,null,msg);
  114. ctx.up_time = new Date().getTime();
  115. //F.addDebugLogs(['up time for:',ctx.up_time]);
  116. co(that.main(ctx,k,msg,fn));
  117. });
  118. })();
  119. }
  120. ctx.on("all#route", function (msg) {
  121. msg = desDecode(ctx,null,msg);
  122. (function(){
  123. var key = "unmatchRoute";
  124. if (msg.route in that.static_cb_map) key = msg.route;
  125. var fn = that.static_cb_map[key];
  126. ctx.up_time = new Date().getTime();
  127. //F.addDebugLogs(['up time for:',ctx.up_time]);
  128. co(that.main(ctx,msg.route,msg,fn));
  129. })();
  130. });
  131. });
  132. that.emit = function(cur_ctx,route,msg) {
  133. msg = desEncode(cur_ctx,route,msg);
  134. cur_ctx.emit('*', msg);
  135. };
  136. }
  137. module.exports = rpcWithExpress;