123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207 |
- 'use strict';
- var C = require('../config');
- var F = require('../common/function');
- var _ = require('underscore');
- _.str = require('underscore.string');
- _.v = require('validator');
- var co = require('co');
- var redis = require('../libs/redis');
- var redisClient = redis.redisClient;
- var redisCo = redis.redisCo;
- var dbcacheCli = redis.dbcacheCli;
- var dbcacheCo = redis.dbcacheCo;
- function imMgr(app, common_mgr) {
- var model_map = app.model_mgr.model_map;
- var mgr_map = common_mgr.mgr_map;
- var that = this;
- this.app = app;
- this.socket_dic = {};
- this.start_time = new Date().getTime();
- this.socket_prefix = _.str.sprintf("socket#%s:%s#%s_", C.inner_host, app.port, that.start_time);
- this.clear_prefix = _.str.sprintf("socket#%s:%s#*", C.inner_host, app.port);
- this.process_id = _.str.sprintf("%s:%s", C.inner_host, app.port); // 锁用进程标识
- // 清除之前没关闭的锁
- this.clearLock = function* () {
- let cur_time = new Date().getTime();
- let lock_prefix = "locker_" + this.process_id;
- let lock_list = yield mgr_map.redis.zrange(lock_prefix, 0, cur_time, "WITHSCORES");
- console.log("############lock_list", lock_list);
- if (lock_list.length % 2 == 1) {
- F.addErrLogs(["get old socket list err:", lock_list]);
- }
- for (let i = 0; i + 1 < lock_list.length; i += 2) {
- let key = lock_list[i];
- let time = lock_list[i + 1].toString(); // 字符串格式
- let lock = {
- "suc": 1,
- "time": time,
- "key": key,
- "pid": that.process_id,
- "svrRestartTime": cur_time
- }
- co(mgr_map.redis.releaseLock(lock));
- }
- }
- // 清除启动之前socket
- this.clearSocket = function* () {
- // dbcacheCli.flushdb(); // dbcache 模块应该立马清楚
- yield F.sleep(5000); // 3秒后 不要挤到一起
- var socket_his_list = yield redisCo.keys(that.clear_prefix);
- F.addLogs(["socket_his_list:", socket_his_list]); // 清除旧socket
- for (var i = 0; i < socket_his_list.length; i++) {
- var socket_id = socket_his_list[i];
- if (socket_id.indexOf(that.socket_prefix) == -1) {
- var id_list = socket_id.split('_');
- id_list.splice(0, 1);
- var uniid = id_list.join('_');
- try {
- var ctx = {};
- ctx.uid = id_list[id_list.length - 1];
- let page_name = id_list[id_list.length - 2];
- yield mgr_map.im.delSvrMap(uniid, false);
- yield mgr_map.room.abnormalClose(uniid, ctx, page_name);
- } catch (e) {
- F.addErrLogs(["clear socket err:", e.stack]);
- }
- yield redisCo.del(socket_id);
- F.addLogs(["del socket:", uniid]);
- }
- }
- };
- this.setSocket = function* (uniid, ctx) {
- yield redisCo.set(that.socket_prefix + uniid, uniid);
- that.socket_dic[uniid] = ctx;
- };
- this.delSocket = function* (uniid) {
- delete that.socket_dic[uniid];
- yield redisCo.del(that.socket_prefix + uniid);
- };
- this.setSvrMap = function* (uniid, inner_host, ctx, page_name) {
- var socket_id = ctx.socket.id;
- delete ctx.has_del;
- yield that.setSocket(uniid, ctx);
- // yield mgr_map.redis.addUniidToBigRoom(uniid);
- mgr_map.redisSub.subscribe(uniid);
- // 添加redis登录状态
- // yield mgr_map.socketPage.insertOnlineSocket(ctx.userid, uniid, page_name);
- };
- this.delSvrMap = function* (uniid, kickoffold = true, db, kickoffold_info = null) {
- try {
- if (kickoffold) { // 向旧连接发送kickoff消息
- if (F.isNull(kickoffold_info)) {
- kickoffold_info = {'errno': 10037, 'errmsg': C.err_msg['10037']};
- }
- F.addLogs(['kickoffold', {uniid: uniid, kickoffold_info}]);
- yield that.sendReq(uniid, "kickoff", kickoffold_info);
- }
- // yield mgr_map.redis.delUniidFromBigRoom(uniid);
- yield mgr_map.redis.delPackNextId(uniid);
- mgr_map.redisSub.unsubscribe(uniid);
- yield that.delSocket(uniid);
- } catch (e) {
- F.addErrLogs(["delSvrMap:", e.stack]);
- }
- };
- // pack req data for client
- this.packReq = function* (uniid, route, data) {
- var send_data = {};
- send_data["id"] = yield mgr_map.redis.getNextReqId(uniid);
- send_data["route"] = route;
- send_data["req_data"] = data;
- return send_data;
- };
- // pack res data for client
- this.packRes = function* (id, route, data) {
- var send_data = {};
- send_data["id"] = parseInt(id);
- send_data["route"] = route;
- send_data["res_data"] = data;
- return send_data;
- };
- this.sendReq = function* (uniid, route, data, cb_data, err_cb) {
- var send_data = yield that.packReq(uniid, route, data);
- err_cb = F.isNull(err_cb) ? that.sendReqErrorCallBack : err_cb;
- var cbid = F.vsprintf("%s_%s", [uniid, send_data.id]);
- if (!F.isNull(cb_data)) yield mgr_map.redis.setSendReqCbData(cbid, cb_data);
- var res = yield that.emit(uniid, route, send_data, err_cb);
- return res;
- };
- this.getSendReqCbData = function* (uniid, msg) {
- var cbid = F.vsprintf("%s_%s", [uniid, msg.id]);
- return yield mgr_map.redis.getSendReqCbData(cbid);
- }
- this.sendRes = function* (uniid, message, data) {
- var lastId = message.id;
- var route = message.route;
- var send_data = yield that.packRes(lastId, route, data);
- send_data.ut = new Date().getTime() - message.start_time;
- var res = yield that.emit(uniid, route, send_data);
- return res;
- }
- this.emit = function* (uniid, route, msg, err_cb) {
- var res_emit = yield this.emitLocal(uniid, route, msg);
- if (res_emit == false) {
- mgr_map.redisSub.publish(uniid, route, msg, err_cb);
- }
- };
- this.emitLocal = function* (uniid, route, msg, from = '') {
- if (!F.isNull(uniid) && uniid in that.socket_dic) {
- var cur_ctx = that.socket_dic[uniid];
- msg = yield that.app.common_mgr.pregReplaceResJson(msg, uniid, cur_ctx);
- if (!F.isNull(msg.ut)) F.addOtherLogs("imrw/imrw", ['##im send: uniid:', uniid, " route:", route, msg]);
- that.app.emit(cur_ctx, route, msg);
- if (route == "kickoff") { // 如果是剔除,要打上剔除标识 从user_dic里删除
- cur_ctx.has_del = true;
- setTimeout(function () {
- cur_ctx.disconnect();
- }, 1000);
- yield that.delSocket(uniid);
- F.log("debug", "local remove old socket,uniid:%s", [uniid]);
- }
- return true;
- } else {
- if (!F.isNull(from)) F.addErrLogs(["##im send fail,sock not found:", uniid, route, msg]);
- return false;
- }
- }
- this.sendReqErrorCallBack = function (uniid, route, msg) {
- if ("res_data" in msg) {
- mgr_map.logs.addLogs("err/err", ["send res error:", route]);
- }
- if (F.isNull(uniid)) F.throwErr("send msg faile. uniid is null.");
- var id_list = uniid.split('_');
- var ctx = {};
- ctx.uid = id_list[id_list.length - 1];
- let page_name = id_list[id_list.length - 2];
- co(mgr_map.im.delSvrMap(uniid, false));
- //co(mgr_map.room.abnormalClose(uniid, ctx, page_name));
- mgr_map.logs.addLogs("err/err", "send msg to uniid:" + uniid + " faile. route:" + route + '. msg:' + JSON.stringify(msg));
- }
- };
- module.exports = imMgr;
|