im.js 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. 'use strict';
  2. var C = require('../config');
  3. var F = require('../common/function');
  4. var _ = require('underscore');
  5. _.str = require('underscore.string');
  6. _.v = require('validator');
  7. var co = require('co');
  8. var redis = require('../libs/redis');
  9. var redisClient = redis.redisClient;
  10. var redisCo = redis.redisCo;
  11. var dbcacheCli = redis.dbcacheCli;
  12. var dbcacheCo = redis.dbcacheCo;
  13. function imMgr(app, common_mgr) {
  14. var model_map = app.model_mgr.model_map;
  15. var mgr_map = common_mgr.mgr_map;
  16. var that = this;
  17. this.app = app;
  18. this.socket_dic = {};
  19. this.start_time = new Date().getTime();
  20. this.socket_prefix = _.str.sprintf("socket#%s:%s#%s_", C.inner_host, app.port, that.start_time);
  21. this.clear_prefix = _.str.sprintf("socket#%s:%s#*", C.inner_host, app.port);
  22. this.process_id = _.str.sprintf("%s:%s", C.inner_host, app.port); // 锁用进程标识
  23. // 清除之前没关闭的锁
  24. this.clearLock = function* () {
  25. let cur_time = new Date().getTime();
  26. let lock_prefix = "locker_" + this.process_id;
  27. let lock_list = yield mgr_map.redis.zrange(lock_prefix, 0, cur_time, "WITHSCORES");
  28. console.log("############lock_list", lock_list);
  29. if (lock_list.length % 2 == 1) {
  30. F.addErrLogs(["get old socket list err:", lock_list]);
  31. }
  32. for (let i = 0; i + 1 < lock_list.length; i += 2) {
  33. let key = lock_list[i];
  34. let time = lock_list[i + 1].toString(); // 字符串格式
  35. let lock = {
  36. "suc": 1,
  37. "time": time,
  38. "key": key,
  39. "pid": that.process_id,
  40. "svrRestartTime": cur_time
  41. }
  42. co(mgr_map.redis.releaseLock(lock));
  43. }
  44. }
  45. // 清除启动之前socket
  46. this.clearSocket = function* () {
  47. // dbcacheCli.flushdb(); // dbcache 模块应该立马清楚
  48. yield F.sleep(5000); // 3秒后 不要挤到一起
  49. var socket_his_list = yield redisCo.keys(that.clear_prefix);
  50. F.addLogs(["socket_his_list:", socket_his_list]); // 清除旧socket
  51. for (var i = 0; i < socket_his_list.length; i++) {
  52. var socket_id = socket_his_list[i];
  53. if (socket_id.indexOf(that.socket_prefix) == -1) {
  54. var id_list = socket_id.split('_');
  55. id_list.splice(0, 1);
  56. var uniid = id_list.join('_');
  57. try {
  58. var ctx = {};
  59. ctx.uid = id_list[id_list.length - 1];
  60. let page_name = id_list[id_list.length - 2];
  61. yield mgr_map.im.delSvrMap(uniid, false);
  62. yield mgr_map.room.abnormalClose(uniid, ctx, page_name);
  63. } catch (e) {
  64. F.addErrLogs(["clear socket err:", e.stack]);
  65. }
  66. yield redisCo.del(socket_id);
  67. F.addLogs(["del socket:", uniid]);
  68. }
  69. }
  70. };
  71. this.setSocket = function* (uniid, ctx) {
  72. yield redisCo.set(that.socket_prefix + uniid, uniid);
  73. that.socket_dic[uniid] = ctx;
  74. };
  75. this.delSocket = function* (uniid) {
  76. delete that.socket_dic[uniid];
  77. yield redisCo.del(that.socket_prefix + uniid);
  78. };
  79. this.setSvrMap = function* (uniid, inner_host, ctx, page_name) {
  80. var socket_id = ctx.socket.id;
  81. delete ctx.has_del;
  82. yield that.setSocket(uniid, ctx);
  83. // yield mgr_map.redis.addUniidToBigRoom(uniid);
  84. mgr_map.redisSub.subscribe(uniid);
  85. // 添加redis登录状态
  86. // yield mgr_map.socketPage.insertOnlineSocket(ctx.userid, uniid, page_name);
  87. };
  88. this.delSvrMap = function* (uniid, kickoffold = true, db, kickoffold_info = null) {
  89. try {
  90. if (kickoffold) { // 向旧连接发送kickoff消息
  91. if (F.isNull(kickoffold_info)) {
  92. kickoffold_info = {'errno': 10037, 'errmsg': C.err_msg['10037']};
  93. }
  94. F.addLogs(['kickoffold', {uniid: uniid, kickoffold_info}]);
  95. yield that.sendReq(uniid, "kickoff", kickoffold_info);
  96. }
  97. // yield mgr_map.redis.delUniidFromBigRoom(uniid);
  98. yield mgr_map.redis.delPackNextId(uniid);
  99. mgr_map.redisSub.unsubscribe(uniid);
  100. yield that.delSocket(uniid);
  101. } catch (e) {
  102. F.addErrLogs(["delSvrMap:", e.stack]);
  103. }
  104. };
  105. // pack req data for client
  106. this.packReq = function* (uniid, route, data) {
  107. var send_data = {};
  108. send_data["id"] = yield mgr_map.redis.getNextReqId(uniid);
  109. send_data["route"] = route;
  110. send_data["req_data"] = data;
  111. return send_data;
  112. };
  113. // pack res data for client
  114. this.packRes = function* (id, route, data) {
  115. var send_data = {};
  116. send_data["id"] = parseInt(id);
  117. send_data["route"] = route;
  118. send_data["res_data"] = data;
  119. return send_data;
  120. };
  121. this.sendReq = function* (uniid, route, data, cb_data, err_cb) {
  122. var send_data = yield that.packReq(uniid, route, data);
  123. err_cb = F.isNull(err_cb) ? that.sendReqErrorCallBack : err_cb;
  124. var cbid = F.vsprintf("%s_%s", [uniid, send_data.id]);
  125. if (!F.isNull(cb_data)) yield mgr_map.redis.setSendReqCbData(cbid, cb_data);
  126. var res = yield that.emit(uniid, route, send_data, err_cb);
  127. return res;
  128. };
  129. this.getSendReqCbData = function* (uniid, msg) {
  130. var cbid = F.vsprintf("%s_%s", [uniid, msg.id]);
  131. return yield mgr_map.redis.getSendReqCbData(cbid);
  132. }
  133. this.sendRes = function* (uniid, message, data) {
  134. var lastId = message.id;
  135. var route = message.route;
  136. var send_data = yield that.packRes(lastId, route, data);
  137. send_data.ut = new Date().getTime() - message.start_time;
  138. var res = yield that.emit(uniid, route, send_data);
  139. return res;
  140. }
  141. this.emit = function* (uniid, route, msg, err_cb) {
  142. var res_emit = yield this.emitLocal(uniid, route, msg);
  143. if (res_emit == false) {
  144. mgr_map.redisSub.publish(uniid, route, msg, err_cb);
  145. }
  146. };
  147. this.emitLocal = function* (uniid, route, msg, from = '') {
  148. if (!F.isNull(uniid) && uniid in that.socket_dic) {
  149. var cur_ctx = that.socket_dic[uniid];
  150. msg = yield that.app.common_mgr.pregReplaceResJson(msg, uniid, cur_ctx);
  151. if (!F.isNull(msg.ut)) F.addOtherLogs("imrw/imrw", ['##im send: uniid:', uniid, " route:", route, msg]);
  152. that.app.emit(cur_ctx, route, msg);
  153. if (route == "kickoff") { // 如果是剔除,要打上剔除标识 从user_dic里删除
  154. cur_ctx.has_del = true;
  155. setTimeout(function () {
  156. cur_ctx.disconnect();
  157. }, 1000);
  158. yield that.delSocket(uniid);
  159. F.log("debug", "local remove old socket,uniid:%s", [uniid]);
  160. }
  161. return true;
  162. } else {
  163. if (!F.isNull(from)) F.addErrLogs(["##im send fail,sock not found:", uniid, route, msg]);
  164. return false;
  165. }
  166. }
  167. this.sendReqErrorCallBack = function (uniid, route, msg) {
  168. if ("res_data" in msg) {
  169. mgr_map.logs.addLogs("err/err", ["send res error:", route]);
  170. }
  171. if (F.isNull(uniid)) F.throwErr("send msg faile. uniid is null.");
  172. var id_list = uniid.split('_');
  173. var ctx = {};
  174. ctx.uid = id_list[id_list.length - 1];
  175. let page_name = id_list[id_list.length - 2];
  176. co(mgr_map.im.delSvrMap(uniid, false));
  177. //co(mgr_map.room.abnormalClose(uniid, ctx, page_name));
  178. mgr_map.logs.addLogs("err/err", "send msg to uniid:" + uniid + " faile. route:" + route + '. msg:' + JSON.stringify(msg));
  179. }
  180. };
  181. module.exports = imMgr;