db_pool.js 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. var config = require('../config');
  2. var server = config.mysqlServers[0];
  3. var model_base = require('./model_base.js');
  4. const F = require('../common/function');
  5. var co = require('co');
  6. var redisLib = require('../libs/redis');
  7. var redisClient = redisLib.redisClient;
  8. var redisCo = redisLib.redisCo;
  9. var C = require('../config');
  10. var dbcacheCli = redisLib.dbcacheCli;
  11. var dbcacheCo = redisLib.dbcacheCo;
  12. var _ = require('underscore');
  13. _.str = require('underscore.string');
  14. function db_pool(table_name,model_mrg) {
  15. model_base.call(this);
  16. this.table = table_name;
  17. var model_map = model_mrg.model_map;
  18. this.addIdxDetailMap = function* (item,idx_list,idx_dbname_list,up_cond,expire,key) {
  19. for (let j = 0; j < idx_list.length; j++) {
  20. let idx = idx_list[j];
  21. let idx_dbname = idx_dbname_list[j];
  22. let idx_value = eval("item."+idx);
  23. if (F.isNull(idx_value) || F.isNull(idx_dbname) || F.isNull(up_cond)) {
  24. //TODO//F.addDebugLogs(["set dbcache null:",idx,idx_dbname,idx_value,up_cond]);
  25. return false;
  26. }
  27. // let ckey = F.vsprintf("dbcache#%s#%s#%s",[idx_dbname,idx_value,key]);
  28. // let cvalue = up_cond;
  29. // dbcacheCli.set(ckey, cvalue);
  30. // dbcacheCli.expire(ckey,expire);
  31. let ckey = F.vsprintf("dbcache#%s#%s",[idx_dbname,idx_value]);
  32. let cvalue = key+"#$&*^%dj53#"+up_cond;
  33. let expire_time = new Date().getTime()/1000 + expire;
  34. dbcacheCli.ZADD(ckey,expire_time,cvalue);
  35. dbcacheCli.expire(ckey,expire);
  36. }
  37. return true;
  38. }
  39. this.addIdxMap = function* (res,redis_param,expire,key) {
  40. if(F.isNull(redis_param.update_affect_cond)) return false;
  41. let idx_list = redis_param.idx.split(",");
  42. let idx_dbname_list = redis_param.idx_dbname.split(",");
  43. let up_cond = redis_param.update_affect_cond;
  44. if (Array.isArray(res)) {
  45. for (let i = 0; i < res.length; i++) {
  46. let item = res[i];
  47. let add_det_res = yield this.addIdxDetailMap(item,idx_list,idx_dbname_list,up_cond,expire,key);
  48. if (false == add_det_res) return false;
  49. }
  50. } else {
  51. let add_det_res = yield this.addIdxDetailMap(res,idx_list,idx_dbname_list,up_cond,expire,key);
  52. if (false == add_det_res) return false;
  53. }
  54. return true;
  55. }
  56. /**
  57. * 函数返回结果被redis缓存,注意:param_arr里不能传框架得对象,例如:不能包含事务connection
  58. * 例如 wrap('mgr_map.user_anchor.getIndexPageAnchorList',[ctx.I.start, ctx.I.step],redis_param,expire)
  59. * 因为mgr_map每个地方命名不一样,这里统一用这个命名
  60. * redis_param: {
  61. * idx: 数据集属性唯一索引 (如果返回值是数组,idx为每个元素(元素必须为字典)里的属性值,如果为字典,idx直接为字典里属性)
  62. * idx_dbname: idx 对应数据库名
  63. * update_affect_cond: 执行update时 如果这些属性名有变,则要删除该key
  64. * }
  65. */
  66. this.wrap = function* (fnname,param_arr=[],redis_param={},expire=300,cache_null=false) {
  67. let param_str = JSON.stringify(param_arr);
  68. let key = fnname+"("+param_str+")";
  69. let res = null;
  70. let next_sel_time = 0;
  71. if (C.is_open_sql_redis == true && !F.isNull(redis_param)) {
  72. let redis_res = yield dbcacheCo.hmget(key,["sel_time","del_time","value"]);
  73. if(!F.isNull(redis_res)) {
  74. let sel_time = redis_res[0];
  75. let del_time = redis_res[1];
  76. res = redis_res[2];
  77. let is_dirty = false; // 是否脏数据
  78. if (!F.isNull(sel_time) && !F.isNull(del_time) && parseInt(sel_time) < parseInt(del_time)) is_dirty = true;
  79. if (is_dirty == false && !F.isNull(res)) {
  80. return JSON.parse(res);
  81. }
  82. }
  83. dbcacheCli.expire(key, expire); //提前延迟超时避免这过程有del操作没记录
  84. next_sel_time = yield F.getNextNoRoundId(dbcacheCo,"dbcache"); // 使用redis自增id来标示顺序,必须在select执行之前
  85. }
  86. var exe_cmd = fnname+"(";
  87. for (var i = 0; i < param_arr.length; i++) {
  88. if (i+1 == param_arr.length) exe_cmd += "param_arr["+i+"]";
  89. else exe_cmd += "param_arr["+i+"],";
  90. }
  91. exe_cmd += ")";
  92. //F.addDebugLogs(["exe cmd:",exe_cmd]);
  93. res = yield eval(exe_cmd);
  94. if (C.is_open_sql_redis == true && !F.isNull(redis_param)) {
  95. if (F.isNull(res)) {
  96. if (cache_null) {
  97. } else {
  98. return res;
  99. }
  100. }
  101. if (!F.isNull(res)) {
  102. yield this.addIdxMap(res,redis_param,expire+60,key);
  103. //TODO//F.addDebugLogs(["dbcache bug set:",key]);
  104. //dbcacheCli.expire(key, expire);// 因为hmset不会修改timeout,可能执行完下一句hmset立马timeout,所以得提前设置一下
  105. yield this.dealRedisForInsert(redis_param, key, expire); // 处理insert redis的数据
  106. }
  107. yield dbcacheCo.hmset(key,"sel_time",next_sel_time, "value", JSON.stringify(res));
  108. dbcacheCli.expire(key, expire);
  109. }
  110. return res;
  111. }
  112. this.dealRedisForInsert = function* (redis_param, key, expire) {
  113. if(F.isNull(redis_param.insert_affect_cond) || F.isNull(redis_param.insert_affect_value)) return false;
  114. let where_fields_key = new Array();
  115. let where_fields_value = new Array();
  116. let insert_affect_cond_arr = redis_param.insert_affect_cond.split(',');
  117. for (var i = 0; i < insert_affect_cond_arr.length; i++) {
  118. where_fields_key.push(_.str.trim(insert_affect_cond_arr[i], ' '));
  119. }
  120. where_fields_value = redis_param.insert_affect_value;
  121. if(where_fields_key.length != where_fields_value.length) {
  122. F.addErrLogs(['wrap redis insert数组设置错误', {redis_param: redis_param}]);
  123. return false;
  124. }
  125. let insert_affect_table_dic = {};
  126. // 设置redis
  127. for (var i = 0; i < where_fields_key.length; i++) {
  128. let ckey = F.vsprintf("dbcache_insert#%s#%s",[where_fields_key[i], where_fields_value[i]]);
  129. let cvalue = key + "#$&*^%dj53#" + redis_param.update_affect_cond;
  130. let expire_time = new Date().getTime()/1000 + expire;
  131. dbcacheCli.ZADD(ckey, expire_time, cvalue);
  132. dbcacheCli.expire(ckey, expire);
  133. let table = where_fields_key[i].split('.')[0];
  134. if (table in insert_affect_table_dic) {
  135. } else {
  136. yield redisLib.redisCo.set("dbcache_insert_flag#"+table, "true"); // 添加此标志 db insert时才查redis并删除
  137. yield redisLib.redisCo.expire("dbcache_insert_flag#"+table, expire);
  138. insert_affect_table_dic[table] = 1;
  139. }
  140. }
  141. return true;
  142. }
  143. this.testwrap = function* (a0,a1,a2,a3,a4,a5,a6) {
  144. F.addDebugLogs(["test wrap:",a0,a1,a2,a3,a4,a5,a6]);
  145. return "model pool test wrap";
  146. }
  147. }
  148. module.exports = db_pool;