redisSub.js 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. 'use strict';
  2. var C = require('../config');
  3. var F = require('../common/function');
  4. var _ = require('underscore');
  5. _.str = require('underscore.string');
  6. var redis = require('redis');
  7. var co = require('co');
  8. var wrapper = require('co-redis');
  9. function redisSubMgr(app, common_mgr) {
  10. var model_map = app.model_mgr.model_map;
  11. var mgr_map = common_mgr.mgr_map;
  12. this.redisSub;
  13. this.redisPush;
  14. var that = this;
  15. this.init = function (){
  16. try {
  17. that.redisSub = redis.createClient(C.redis.port,C.redis.host,C.redis.options);
  18. that.redisSub.select(C.redis.db_sub,function(){
  19. console.log('redis subscribe select db is '+C.redis.db_sub);
  20. });
  21. that.redisPush = redis.createClient(C.redis.port,C.redis.host,C.redis.options);
  22. that.redisPush.select(C.redis.db_sub,function(){
  23. });
  24. that.redisSub.on("message", function (channel, message) {
  25. that.onMessage(channel, message);
  26. });
  27. that.redisSub.on("error", function (error) {
  28. console.log("redis subscribe error. message: " + error);
  29. });
  30. that.redisSub.on("subscribe", function (channel, count) {
  31. console.log("redis subscribe:client subscribe. channel:" + channel + " count:" + count);
  32. });
  33. that.redisSub.on("unsubscribe", function (channel, count) {
  34. console.log("redis subscribe:client unsubscribe. channel:" + channel + " count:" + count);
  35. });
  36. } catch(e) {
  37. F.log("err", "redis subscribe connect fiale");
  38. console.log(e);
  39. }
  40. }
  41. this.init();
  42. this.onMessage = function (channel, message) {
  43. try {
  44. var prefix = C.redisPre.subscribe_prefix.replace('%s', '');
  45. var uniid = channel.replace(prefix, '');
  46. var res = JSON.parse(message);
  47. co(mgr_map.im.emitLocal(uniid, res['route'], res['msg'], 'redisSub'));
  48. } catch(e) {
  49. F.log("err", "redis subscribe on message fiale");
  50. console.log(e);
  51. }
  52. };
  53. this.subscribe = function (uniid) {
  54. if(F.isNull(uniid)) return false;
  55. var channel = _.str.sprintf(C.redisPre.subscribe_prefix, uniid);
  56. that.redisSub.subscribe(channel);
  57. }
  58. this.unsubscribe = function (uniid) {
  59. if(F.isNull(uniid)) return false;
  60. var channel = _.str.sprintf(C.redisPre.subscribe_prefix, uniid);
  61. that.redisSub.unsubscribe(channel);
  62. }
  63. this.publish = function (uniid, route, msg, err_cb) {
  64. var channel = _.str.sprintf(C.redisPre.subscribe_prefix, uniid);
  65. var send_data = {
  66. "route":route,
  67. "msg":msg
  68. };
  69. F.addOtherLogs("imrw/imrw",['##im send by publish: uniid:',uniid," route:",route]);
  70. that.redisPush.publish(channel, JSON.stringify(send_data), function (err, result) {
  71. if(err || result == 0) {
  72. common_mgr.addLogs(["publish err:",err,'channel:',channel,"uniid:",uniid]);
  73. if(!F.isNull(err_cb)) {
  74. try{
  75. err_cb(uniid, route, send_data);
  76. }catch(e) {
  77. common_mgr.addLogs(["publish errcb err:",e]);
  78. }
  79. }
  80. }
  81. });
  82. }
  83. }
  84. module.exports = redisSubMgr;