rpcserver.js 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. var staticCache = require('koa-static-cache');
  2. var koa = require('koa.io');
  3. var path = require('path');
  4. var fs = require('fs');
  5. var co = require('co');
  /**
   * RPC-style request/response layer on top of a koa.io application.
   *
   * Construct with `new rpcServer()`. The instance exposes:
   *   - app             the underlying koa.io application
   *   - io.use(cb)      register a connection-lifetime generator `cb(next, ctx)`
   *   - io.route(r, cb) register a generator handler `cb(next, ctx, msg, reply)`;
   *                     `reply` is only supplied for calls that arrive through
   *                     the "rpc_request" envelope
   *   - emit(...)       send an RPC request to one socket (see below)
   *   - setBeatTime()   start/stop the periodic "rpc_beat" heartbeat
   *   - use / listen    forwarded to the koa.io application
   *   - sleep(ms)       thunk that completes after `ms` milliseconds
   *
   * Optional hooks, read from the instance when a route runs:
   *   - beforeCallback(route, ctx, data): a falsy result skips the handler
   *   - catchException(route, ctx, data, err): called when a handler throws
   *
   * Bookkeeping maps (all plain objects):
   *   - response_cb_map  correlation id -> pending success callback
   *   - err_cb_map       socket id -> { correlation id -> error callback }
   *   - static_cb_map    route name -> handler registered through io.route
   *   - all_ctx_dic      socket id -> context currently inside io.use
   */
  function rpcServer() {
    this.app = koa();
    this.cid = 0;
    this.response_cb_map = {};
    this.static_cb_map = {};
    this.err_cb_map = {};
    this.all_ctx_dic = {};
    this.BeatTimerId = -1; // -1 means "no heartbeat timer is running"

    var that = this;

    // Own-property test. The maps are plain objects and the keys come from
    // remote peers, so `in` would also match inherited names like "constructor".
    function hasOwn(obj, key) {
      return Object.prototype.hasOwnProperty.call(obj, key);
    }

    // True when `obj` has no own enumerable keys.
    function isEmpty(obj) {
      for (var key in obj) {
        if (hasOwn(obj, key)) return false;
      }
      return true;
    }

    // True for values that can safely be probed with the `in` operator.
    function isObject(value) {
      return value !== null && typeof value === "object";
    }

    // Log a thrown value; tolerates non-Error values (and null/undefined).
    function logError(e) {
      console.log(e && e.stack ? e.stack : e);
    }

    /**
     * Produce the next correlation id.
     * @returns {number} the incremented counter; the stored counter wraps to 0
     *   once it has passed 999999999.
     */
    this.nextId = function() {
      var new_id = ++that.cid;
      if (that.cid > 999999999) {
        that.cid = 0;
      }
      return new_id;
    };

    /**
     * (Re)start or stop the heartbeat. Any running heartbeat is cancelled first.
     * @param {number} beat_time interval in ms; a value that is not greater
     *   than 0 (including undefined/NaN) only stops the heartbeat.
     * @param {number} [timeout=20000] ms to wait for each beat reply before the
     *   socket is flagged `timeout = true` and disconnected.
     */
    this.setBeatTime = function(beat_time, timeout) {
      // The handle returned by setInterval is not guaranteed to be a positive
      // number, so compare against the sentinel instead of using `> 0`.
      if (that.BeatTimerId !== -1) {
        clearInterval(that.BeatTimerId);
        that.BeatTimerId = -1;
      }
      if (!(beat_time > 0)) return;

      var beat_timeout = timeout || 20000;
      that.BeatTimerId = setInterval(function() {
        Object.keys(that.all_ctx_dic).forEach(function(sockid) {
          var cur_ctx = that.all_ctx_dic[sockid];
          that.emit(cur_ctx, 'rpc_beat', "", {
            "success": function* () {
              // beat acknowledged; nothing to do
            },
            "timeout_time": beat_timeout,
            "timeout_cb": function* () {
              console.log('server beat timeout');
              cur_ctx.timeout = true;
              cur_ctx.disconnect();
            },
            "error": function* () { console.log('server beat err'); }
          });
        });
      }, beat_time);
    };

    // Reply to a request previously sent with emit(): run its success callback.
    this.app.io.route("rpc_response", function* (next, data) {
      if (!isObject(data) || !("rpc_cid" in data)) return;
      if (!hasOwn(that.response_cb_map, data.rpc_cid)) return;
      var suc_cb = that.response_cb_map[data.rpc_cid];
      yield suc_cb(data.rpc_data || {});
    });

    // Incoming request envelope: dispatch to the handler registered for
    // data.rpc_route, falling back to the '*' handler when there is none.
    this.app.io.route("rpc_request", function* (next, data) {
      if (!isObject(data) || !("rpc_route" in data)) return;

      var rpc_route = data.rpc_route;
      if (!hasOwn(that.static_cb_map, rpc_route)) {
        rpc_route = '*';
      }
      if (!hasOwn(that.static_cb_map, rpc_route)) return;

      var route_cb = that.static_cb_map[rpc_route];
      var rpc_data = data.rpc_data || {};
      var ctx = this;
      try {
        if (that.beforeCallback) {
          var before_res = yield that.beforeCallback(rpc_route, ctx, rpc_data);
          if (!before_res) return;
        }
        yield route_cb(next, ctx, rpc_data, function(response) {
          // Echo the caller's correlation id so it can match the reply.
          ctx.emit('rpc_response', {
            "rpc_cid": data.rpc_cid,
            "rpc_data": response
          });
        });
      } catch (e) {
        if (that.catchException) {
          yield that.catchException(rpc_route, ctx, rpc_data, e);
        }
        logError(e);
      }
    });

    // Facade returned as `server.io`; wraps app.io with RPC bookkeeping.
    function internalio() {
      /**
       * Register a connection-lifetime generator `cb(next, ctx)`. The context
       * is tracked in all_ctx_dic while `cb` runs; once it finishes (normally
       * or by throwing) the context is dropped and every error callback still
       * pending for that socket is executed.
       */
      this.use = function(cb) {
        that.app.io.use(function* (next) {
          var sockid = this.socket.id;
          that.all_ctx_dic[sockid] = this;
          try {
            yield cb(next, this);
          } finally {
            // Runs on throw as well, so a failing middleware cannot leave the
            // socket registered or its error callbacks pending forever.
            delete that.all_ctx_dic[sockid];
            if (hasOwn(that.err_cb_map, sockid)) {
              var sub_map = that.err_cb_map[sockid];
              try {
                for (var key in sub_map) {
                  yield sub_map[key]();
                }
              } finally {
                delete that.err_cb_map[sockid];
              }
            }
          }
        });
      };

      /**
       * Register `cb` for `route`, both as a direct socket event (called as
       * `cb(next, ctx, msg)`) and as an "rpc_request" target (called with an
       * extra reply function as fourth argument).
       */
      this.route = function(route, cb) {
        that.static_cb_map[route] = cb;
        that.app.io.route(route, function* (next, msg) {
          try {
            if (that.beforeCallback) {
              var before_res = yield that.beforeCallback(route, this, msg);
              if (!before_res) return;
            }
            yield cb(next, this, msg);
          } catch (e) {
            if (that.catchException) {
              yield that.catchException(route, this, msg, e);
            }
            logError(e);
          }
        });
      };

      // Built-in heartbeat route: acknowledge with an empty payload.
      this.route("rpc_beat", function* (next, ctx, msg, cb) {
        if (cb) cb("");
      });
    }

    this.io = new internalio();

    // Forwarded to the koa.io application.
    this.use = function(options) {
      that.app.use(options);
    };

    // Forwarded to the koa.io application; passes its return value through.
    this.listen = function(port, options) {
      return that.app.listen(port, options);
    };

    // Thunk that invokes its callback after `ms` milliseconds.
    this.sleep = function(ms) {
      return function(done) {
        setTimeout(done, ms);
      };
    };

    /**
     * Send an RPC request to one socket as an "rpc_request" event carrying
     * { rpc_cid, rpc_route[, rpc_data] }; rpc_data is only attached when `msg`
     * is truthy.
     *
     * @param {Object} ctx     socket context; needs `socket.id` and `emit`
     * @param {string} route   route name the peer should dispatch to
     * @param {*}      msg     payload
     * @param {Object} [options]
     *   success:      generator run with the reply data
     *   error:        generator run if the socket's io.use middleware ends
     *                 before a reply arrived
     *   timeout_cb:   generator run when no reply arrived in time (only armed
     *                 together with `success`)
     *   timeout_time: timeout in ms, default 10000
     */
    this.emit = function(ctx, route, msg, options) {
      options = options || {};
      var sid = ctx.socket.id;
      var new_id = String(that.nextId());
      var timeout_id;

      // Forget this request's error callback, and the per-socket map once it
      // holds nothing else.
      function dropErrCb() {
        var sub_err_map = that.err_cb_map[sid];
        if (!sub_err_map || !hasOwn(sub_err_map, new_id)) return;
        delete sub_err_map[new_id];
        if (isEmpty(sub_err_map)) delete that.err_cb_map[sid];
      }

      if (options.success) {
        if (options.timeout_cb) {
          timeout_id = setTimeout(function() {
            dropErrCb();
            // Entry already gone: the reply or the error callback won the race.
            if (!hasOwn(that.response_cb_map, new_id)) return;
            delete that.response_cb_map[new_id];
            if (typeof options.timeout_cb === "function") {
              try {
                var running = co(options.timeout_cb());
                // Surface failures instead of leaving an unhandled rejection.
                if (running && typeof running.catch === "function") {
                  running.catch(logError);
                }
              } catch (e) {
                logError(e);
              }
            } else {
              console.log(options.timeout_cb + 'is not a function');
            }
          }, options.timeout_time || 10000);
        }

        that.response_cb_map[new_id] = function* (suc_data) {
          delete that.response_cb_map[new_id];
          dropErrCb();
          if (timeout_id) clearTimeout(timeout_id);
          yield options.success(suc_data);
        };
      }

      if (options.error) {
        var sub_cb_map = that.err_cb_map[sid] || {};
        sub_cb_map[new_id] = function* () {
          if (hasOwn(that.response_cb_map, new_id)) {
            delete that.response_cb_map[new_id];
          }
          if (timeout_id) clearTimeout(timeout_id);
          yield options.error();
        };
        that.err_cb_map[sid] = sub_cb_map;
      }

      var new_msg = {
        "rpc_cid": new_id,
        "rpc_route": route
      };
      if (msg) {
        new_msg["rpc_data"] = msg;
      }
      ctx.emit("rpc_request", new_msg);
    };
  }
  209. module.exports = rpcServer;