123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161 |
- var path = require('path');
- var express = require('express');
- var fs = require('fs');
- var co = require('co');
- var logs_obj = require('../libs/logs.js');
- var logs = new logs_obj();
- var C = require('../config/index');
- var F = require('../common/function');
- function isNull(obj) {
- if (obj == null || typeof(obj) == "undefined" || obj.length == 0) {
- return true;
- }
- if(typeof(obj) == 'object' && Object.keys(obj).length == 0) {
- return true;
- }
- return false;
- };
- function rpcWithExpress(opt) {
- this.exp = express();
- if (!isNull(opt) && "ssl" in opt) {
- var opts = {
- key: fs.readFileSync('/etc/nginx/server.key'),
- cert: fs.readFileSync('/etc/nginx/server.crt')
- };
- this.svr = require('https').createServer(opts,this.exp);
- } else {
- this.svr = require('http').createServer(this.exp);
- }
- this.io = require('socket.io')(this.svr);
- this.static_cb_map = {};
- var that = this;
- that.io.route = function (route, fn) {
- that.static_cb_map[route] = fn;
- };
- var desDecode = function (ctx,route,msg) {
- try {
- if ("ed" in msg && F.isNull(msg.is_websocket)) { // 说明是socket.io加密的
- ctx.ed = 1;
- let encode_msg = msg.ed;
- let decode_msg = F.decipher(encode_msg);
- msg = JSON.parse(decode_msg);
- F.addOtherLogs('proxy/proxy',["receive data(sockio):",decode_msg]);
- }
- } catch(e) {
- F.addOtherLogs('proxy/proxy',["socket.io decode err:",msg]);
- }
- return msg;
- }
- var desEncode = function (ctx,route,msg) {
- try {
- if (!F.isNull(ctx.ed)) { // 说明是socket.io加密的
- let encode_msg = F.cipher(JSON.stringify(msg));
- msg = {"ed":encode_msg};
- }
- } catch(e) {
- F.addOtherLogs('proxy/proxy',["socket.io encode err:",msg]);
- }
- return msg;
- }
- that.main = function* (ctx,route,msg,fn) {
- try {
- ctx.up_time = new Date().getTime();
- let start_time = new Date().getTime();
- if (that.beforeCallback) {
- var before_res = yield that.beforeCallback(route,ctx,msg);
- if (!before_res) return;
- }
- msg.start_time = start_time;
- yield fn(null, ctx, msg, null);
- let end_time = new Date().getTime();
- if (end_time > start_time+C.slow_log_delta) {
- logs.addLogs("slow/slow",[`usetime:${end_time-start_time}`,"msg:",msg]);
- }
- } catch (e) {
- if (that.catchException) {
- yield that.catchException(route, ctx, msg, e);
- }
- console.log(e.stack);
- }
- };
- that.io.on('connection', function (socket) {
- //F.addOtherLogs('proxydebug/proxydebug',[" socket connect:",socket.request.url]);
- var ctx = socket;
- ctx.up_time = new Date().getTime();
- ctx.socket = socket; // 为了统一koa接口
- var ping_msg = {
- id:0,
- route:'heartbeat',
- req_data:{}
- }
- var timerid = setInterval(function() {
- var cur_time = new Date().getTime();
- var time_delta = cur_time - ctx.up_time;
- //if (ctx.up_time + 18*1000 <= cur_time) {
- // F.addDebugLogs(['send ping msg',time_delta]);
- // ctx.emit('*',ping_msg);
- //}
- if (ctx.up_time + 30*1000 <= cur_time) {
- F.addDebugLogs(['was disconnect by cleaner',time_delta]);
- ctx.disconnect();
- }
- },1000*10);
- if (that.onconnect) co(that.onconnect(null,ctx));
- ctx.on('disconnect', function (reason) {
- clearInterval(timerid);
- if (that.ondisconnect) co(that.ondisconnect(null,ctx,reason));
- });
- for (var key in that.static_cb_map) {
- (function(){
- var k = key;
- var fn = that.static_cb_map[key];
- ctx.on(k, function (msg) {
- msg = desDecode(ctx,null,msg);
- ctx.up_time = new Date().getTime();
- //F.addDebugLogs(['up time for:',ctx.up_time]);
- co(that.main(ctx,k,msg,fn));
- });
- })();
- }
- ctx.on("all#route", function (msg) {
- msg = desDecode(ctx,null,msg);
- (function(){
- var key = "unmatchRoute";
- if (msg.route in that.static_cb_map) key = msg.route;
- var fn = that.static_cb_map[key];
- ctx.up_time = new Date().getTime();
- //F.addDebugLogs(['up time for:',ctx.up_time]);
- co(that.main(ctx,msg.route,msg,fn));
- })();
- });
- });
- that.emit = function(cur_ctx,route,msg) {
- msg = desEncode(cur_ctx,route,msg);
- cur_ctx.emit('*', msg);
- };
- }
- module.exports = rpcWithExpress;
|