cronJobLogic.js 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. 'use strict';
  2. var C = require('../config');
  3. var F = require('../common/function');
  4. var _ = require('underscore');
  5. _.str = require('underscore.string');
  6. var async_request = require('request');
  7. var crypto = require('crypto');
  8. var co = require('co');
  9. module.exports = function (app, common_mgr) {
  10. var model_map = app.model_mgr.model_map;
  11. var mgr_map = common_mgr.mgr_map;
  12. var db_timer_db = model_map.db_timer;
  13. var db_timer_bak_db = model_map.db_timer_bak;
  14. var dbTimerPrefix = F.isNull(C.dbTimerPrefix)?"":C.dbTimerPrefix;
  15. var last_allow_time = 0;
  16. let to_delete_job_ids = [];
  17. var that = this;
  18. this.get_stop_flag = function* () {
  19. // constant 加了 stop_cron 则不执行db定时器
  20. // let stop_cron = yield model_map.constant.query('', {
  21. // fields: 'id',
  22. // where: '`key` = ?',
  23. // values: ['stop_cron']
  24. // });
  25. // return stop_cron;
  26. let job_stop_flag = yield mgr_map.redis.getJobStopFlag();
  27. return !F.isNull(job_stop_flag);
  28. };
  29. this.canExeJob = function* () {
  30. let job_count = 100;
  31. let cur_time = new Date().getTime();
  32. if (cur_time > last_allow_time + 10*1000) { // 超过10秒redis控制失效 解决mysql redis不同步问题
  33. } else {
  34. job_count = yield mgr_map.redis.checkJobs(cur_time+15000); // redis 控制 查看15秒内有无可执行任务
  35. if (F.isNull(job_count) || 0 == job_count) return 0;
  36. F.addDebugLogs(["job_count:",job_count]);
  37. job_count = job_count + 10;
  38. }
  39. last_allow_time = cur_time;
  40. return job_count;
  41. }
  42. this.getTimerNextExeJob = function* (job_count) {
  43. let jobs = [];
  44. let cur_time = new Date().getTime() + 15000; // 抽出15秒内要执行的job
  45. var dbConnection = null;
  46. try {
  47. let job_list = yield db_timer_db.query(dbConnection,{
  48. fields: '*',
  49. order: 'next_exe_timestamp',
  50. where: 'dbTimerPrefix = ? and `status` = ? and next_exe_timestamp <= ?',
  51. values: [dbTimerPrefix, db_timer_db.status.noexcute,cur_time],
  52. //forUpdate: true,
  53. //limit: job_count
  54. });
  55. if (F.isNull(job_list)) return jobs;
  56. let job_id_list = [];
  57. for (let i = 0; i < job_list.length; i++) {
  58. job_id_list.push(job_list[i].id);
  59. }
  60. let upres = yield db_timer_db.execute_raw(
  61. "update db_timer set `status` = ? where id in (?) and `status` = ?",
  62. [db_timer_db.status.excuting,job_id_list,db_timer_db.status.noexcute]);
  63. for (let i = 0; i < job_list.length; i++) {
  64. if (i + 1 == job_list.length) yield mgr_map.redis.delJob(job_list[i].id);
  65. else co(mgr_map.redis.delJob(job_list[i].id));
  66. }
  67. jobs = job_list;
  68. F.addOtherLogs('job/job',["exec job:",job_id_list]);
  69. } catch (e) {
  70. console.trace(e.stack);
  71. F.addErrLogs(["db_timer_exe err:",e.stack]);
  72. F.throwErr(e.toString());
  73. }
  74. return jobs;
  75. }
  76. this.db_timer_exe_detail = function* (next_job) {
  77. let err = '';
  78. try {
  79. let job_data = JSON.parse(next_job.data);
  80. let fn = eval("mgr_map."+next_job.key);
  81. yield fn(job_data);
  82. } catch (e) {
  83. err = e.stack;
  84. F.addErrLogs(["db_timer_exe_detail err:",e.stack]);
  85. if (F.isNull(e.code)) yield mgr_map.notice.sendDingTalk(err); // 钉钉预警
  86. }
  87. return err;
  88. };
  89. this.update_job_status = function* (next_job,err) {
  90. let new_status = F.isNull(err)?db_timer_db.status.excuteSuc:db_timer_db.status.excuteFail;
  91. if (!F.isNull(err)) {
  92. yield db_timer_bak_db.insert(null,{
  93. '`key`':next_job.key,
  94. 'oldid':next_job.id,
  95. '`data`':next_job.data,
  96. 'next_exe_timestamp':next_job.next_exe_timestamp,
  97. 'next_exe_time':next_job.next_exe_time,
  98. '`status`':new_status,
  99. 'fail_reason':C.inner_host+":"+app.port+" "+err,
  100. 'dbTimerPrefix':dbTimerPrefix
  101. });
  102. }
  103. to_delete_job_ids.push(next_job.id);
  104. //yield db_timer_db.delete(null,{
  105. // where: 'id = ?',
  106. // values: [next_job.id]
  107. //})
  108. }
  109. this.realDeteleJob = function* () {
  110. if (to_delete_job_ids.length > 0) {
  111. let jobIds = JSON.parse(JSON.stringify(to_delete_job_ids));
  112. let delLength = jobIds.length;// 这一步很关键因为下面一步会被协程切换
  113. to_delete_job_ids.splice(0, delLength);
  114. co(function* () {
  115. F.addOtherLogs('job/job',["del job:",jobIds]);
  116. yield db_timer_db.execute_raw(
  117. "delete from db_timer where id in (?)",[jobIds]);
  118. });
  119. }
  120. }
  121. this.async_run = function* (next_job) {
  122. let err = yield that.db_timer_exe_detail(next_job);
  123. yield that.update_job_status(next_job,err);
  124. }
  125. // 每秒轮询执行db定时器得任务
  126. this.db_timer_exe = function* (cur_time) {
  127. co(that.realDeteleJob()); // 删除已完成的任务
  128. try {
  129. let wait_time = F.RandomNumBoth(0,800);
  130. yield F.sleep(wait_time); // 保证每个进程的job getter都有随机机会抢到执行
  131. let lock = yield mgr_map.redis.setnx("dbtimer_lock", 60*60);
  132. if(lock == false) return true; // 一刻只保留一个job getter就够了
  133. let job_count = yield that.canExeJob();
  134. if (job_count > 0) {
  135. let next_job_list = yield that.getTimerNextExeJob(job_count);
  136. for (let i = 0; i < next_job_list.length; i++) {
  137. let next_job = next_job_list[i];
  138. co(function* () {
  139. let cur_time = new Date().getTime();
  140. let wait_time = next_job.next_exe_timestamp - cur_time;
  141. if (wait_time > 0) {
  142. yield F.sleep(wait_time); // 还没到执行时间
  143. }
  144. yield that.async_run(next_job);
  145. });
  146. }
  147. }
  148. yield mgr_map.redis.del("dbtimer_lock");
  149. } catch (e) {
  150. F.addErrLogs(["db_timer_exe err:",e.stack]);
  151. yield mgr_map.redis.del("dbtimer_lock");
  152. }
  153. };
  154. this.batchAddJob = function() {
  155. // db 定时器
  156. mgr_map.cronJob.addJob("*-*-* *:*:*", that.db_timer_exe);
  157. // 删除一个月前的db_timer_bak 每天执行一次 03:25:00
  158. mgr_map.cronJob.addJob("*-*-* 12:25:00", mgr_map.dbTimer.dealDbTimeerBakData);
  159. //mgr_map.cronJob.addJob("*-*-* *:*:*/3", mgr_map.dbTimer.exeBakRtmpJpg);
  160. //mgr_map.cronJob.addJob("*-*-* *:*:*", mgr_map.dbTimer.yace);
  161. //mgr_map.cronJob.addJob("*-*-* *:*:*/5", mgr_map.room.testpush);
  162. //mgr_map.cronJob.addJob("*-*-* 03:05:00", mgr_map.growing.batchLowerIntimacy);
  163. //// 定时QN私信汇报 每10分钟
  164. //mgr_map.cronJob.addJob("*-*-* *:*/10:00", mgr_map.consume.timerToQnCollect);
  165. //// 每20秒检查公告并发送
  166. //mgr_map.cronJob.addJob("*-*-* *:*:*/20", mgr_map.room.periodsendAnnouncement);
  167. //
  168. //// 删除创建10天前的游客账号 每天执行一次 03:15:00
  169. //mgr_map.cronJob.addJob("*-*-* 03:15:00", mgr_map.collect.dealVisitorAccount);
  170. //// 删除10天前的login_event 每天执行一次 03:30:00
  171. //mgr_map.cronJob.addJob("*-*-* 03:30:00", mgr_map.loginEvent.dealLoginEventData);
  172. //// 格式化观众每日被动私信新关系数 每天执行一次 00:00:00
  173. //mgr_map.cronJob.addJob("*-*-* 00:00:00", mgr_map.user.initDayMessageNum);
  174. };
  175. };