123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230 |
- var staticCache = require('koa-static-cache');
- var koa = require('koa.io');
- var path = require('path');
- var fs = require('fs');
- var co = require('co');
- function rpcServer() {
- this.app = koa();
- this.cid = 0;
- this.response_cb_map = {};
- this.static_cb_map = {};
- this.err_cb_map = {};
- this.all_ctx_dic = {};
- this.BeatTimerId = -1;
- var that = this;
- var rpc_svr = this;
- this.nextId = function() {
- var new_id = ++rpc_svr.cid;
- if (rpc_svr.cid > 999999999) {
- rpc_svr.cid = 0;
- }
- return new_id;
- };
- this.setBeatTime = function(beat_time, timeout) {
- if (that.BeatTimerId > 0) {
- clearInterval(that.BeatTimerId);
- that.BeatTimerId = -1;
- }
- if (beat_time <= 0) return;
- var timeout = timeout || 20000;
- that.BeatTimerId = setInterval(function(){
- for (var key in that.all_ctx_dic) {
- (function(){
- var cur_ctx = that.all_ctx_dic[key];
- that.emit(cur_ctx,'rpc_beat',"",{
- "success": function*(){
- //console.log("rpc server beat suc");
- },
- "timeout_time": timeout,
- "timeout_cb": function*(){
- console.log('server beat timeout');
- cur_ctx.timeout = true;
- cur_ctx.disconnect();
- },
- "error": function* (){console.log('server beat err');}
- });
- })();
- }
- }, beat_time);
- };
- this.app.io.route("rpc_response", function* (next,data){
- if ("rpc_cid" in data && data.rpc_cid in that.response_cb_map) {
- var suc_cb = that.response_cb_map[data.rpc_cid];
- var suc_data = data.rpc_data || {};
- yield suc_cb(suc_data);
- }
- });
- this.app.io.route("rpc_request", function* (next,data){
- if (!("rpc_route" in data)) {
- return;
- }
- var rpc_route = data.rpc_route;
- if (!(rpc_route in that.static_cb_map)) {
- rpc_route = '*';
- }
- if (rpc_route in that.static_cb_map) {
- var route_cb = that.static_cb_map[rpc_route];
- var rpc_data = data.rpc_data || {};
- var ctx = this;
- try {
- if (that.beforeCallback) {
- var before_res = yield that.beforeCallback(rpc_route,this,rpc_data);
- if (!before_res) return;
- }
- yield route_cb(next, this, rpc_data, function(response) {
- var response_json = {};
- response_json["rpc_cid"] = data.rpc_cid;
- response_json["rpc_data"] = response;
- ctx.emit('rpc_response', response_json);
- });
- } catch (e) {
- if (that.catchException) {
- yield that.catchException(rpc_route, this, rpc_data, e);
- }
- console.log(e.stack);
- }
- }
- });
- function internalio() {
- this.use = function(cb) {
- that.app.io.use(function* (next) {
- var sockid = this.socket.id;
- that.all_ctx_dic[sockid] = this;
- yield cb(next, this);
- delete that.all_ctx_dic[sockid];
- if (sockid in that.err_cb_map) {
- var sub_map = that.err_cb_map[sockid];
- for (var key in sub_map) {
- var err_cb = sub_map[key];
- yield err_cb();
- }
- delete that.err_cb_map[sockid];
- }
- });
- };
- this.route = function(route, cb) {
- that.static_cb_map[route] = cb;
- that.app.io.route(route, function* (next,msg) {
- try {
- if (that.beforeCallback) {
- var before_res = yield that.beforeCallback(route,this,msg);
- if (!before_res) return;
- }
- yield cb(next,this,msg);
- } catch (e) {
- if (that.catchException) {
- yield that.catchException(route,this,msg,e);
- }
- console.log(e.stack);
- }
- });
- };
- // for beat
- this.route("rpc_beat", function* (next,ctx,msg,cb){
- if (cb) cb("");
- });
- }
- this.io = new internalio();
- this.use = function(options) {
- that.app.use(options);
- }
- this.listen = function(port, options) {
- that.app.listen(port, options);
- }
- this.sleep = function(ms) {
- return function(done) {
- setTimeout(done,ms);
- }
- }
- //options:
- // success: callback for success response
- // error: callback for error
- // if has success callback:
- // timeout_time: time for timeout ms
- // timeout_cb: callback for timeout
- this.emit = function(ctx,route,msg,options) {
- options = options || {};
- //if (!options.success && !options.error) {// no callback
- if (false) {// no callback
- if (msg) {
- ctx.emit(route,msg);
- } else {
- ctx.emit(route);
- }
- } else {
- var sid = ctx.socket.id;
- var new_id =String(that.nextId());
- var timeout_id;
- if (options.success) {
- // timeout callback
- if (options.timeout_cb) {
- var timeout = options.timeout_time || 10000;
- timeout_id = setTimeout(function() {
- var sub_err_map = that.err_cb_map[sid] || {};
- if (new_id in sub_err_map) {delete sub_err_map[new_id];} // rm err cb
- if (new_id in that.response_cb_map) {
- delete that.response_cb_map[new_id]; // rm success cb
- if(typeof(options.timeout_cb)=="function"){
- co(options.timeout_cb()); // exe timeout cb
- //options.timeout_cb();
- }else{
- console.log(options.timeout_cb+'is not a function');
- }
- }
- }, timeout);
- }
- // success callback
- var suc_cb_cb = function*(suc_data) {
- if (new_id in that.response_cb_map) {delete that.response_cb_map[new_id];}
- var sub_err_map = that.err_cb_map[sid] || {};
- if (new_id in sub_err_map) {delete sub_err_map[new_id];} // rm err cb
- if (timeout_id) {clearTimeout(timeout_id);} // rm timeout cb
- yield options.success(suc_data); // exe success cb
- };
- that.response_cb_map[new_id] = suc_cb_cb;
- }
- // err callback
- if (options.error) {
- var e_cb_cb = function*() {
- if (new_id in that.response_cb_map) {delete that.response_cb_map[new_id];} // rm success cb
- if (timeout_id) {clearTimeout(timeout_id);} // rm timeout cb
- yield options.error(); // exe error cb
- }
- var sub_cb_map = that.err_cb_map[sid] || {};
- sub_cb_map[new_id] = e_cb_cb;
- that.err_cb_map[sid] = sub_cb_map;
- }
- var new_msg = {};
- new_msg["rpc_cid"] = new_id;
- new_msg["rpc_route"] = route;
- if (msg) {
- new_msg["rpc_data"] = msg;
- ctx.emit("rpc_request",new_msg);
- } else {
- ctx.emit("rpc_request",new_msg);
- }
- }
- };
- }
- module.exports = rpcServer;
|