model_base.js 38 KB


  1. 'use strict';
  2. var mysqlLib = require('../libs/mysql.js');
  3. var C = require('../config');
  4. var F = require('../common/function.js');
  5. var _ = require('underscore');
  6. _.str = require('underscore.string');
  7. var redis = require('../libs/redis');
  8. var redisLib = require('../libs/redis');
  9. var redisClient = redisLib.redisClient;
  10. var redisCo = redisLib.redisCo;
  11. var dbcacheCli = redisLib.dbcacheCli;
  12. var dbcacheCo = redisLib.dbcacheCo;
  13. var server = C.mysqlServers[0];
  14. var com_mysql = new mysqlLib();
  15. function model_base() {
  // Alias of the instance, used by the generator methods defined below.
  var that = this;
  // Table name; empty in the base model.
  this.table = '';
  // Comma-separated primary-key column list; update()/delete() fill it via get_uni_index() while it is empty.
  this.uni_index = '';
  // Module-level mysql wrapper instance shared by every model.
  this.mysql = com_mysql;
  // Cache of per-table column defaults, filled by getFieldDefaultValue().
  this.table_default_value_map = {};
  /*
   * Allocate the next insert id for `uniid`.
   * Increments field `uniid` of the Redis hash "mycatinsertid" and returns
   * the new counter value plus a fixed offset of 99999999.
   */
  this.getNextInsertId = function* (uniid) {
    const ID_OFFSET = 99999999;
    const hash_key = "mycatinsertid";
    const counter = parseInt(yield redisCo.HINCRBY(hash_key, uniid, 1));
    // Once the counter is past this (very large) threshold and lands on a
    // multiple of 100000, the stored value is set back to 1.
    const must_reset = counter > 9007199254730993 && counter % 100000 == 0;
    if (must_reset) {
      yield redisCo.hmset(hash_key, uniid, 1);
    }
    return ID_OFFSET + counter;
  };
  /*
   * Return the next transaction sequence number.
   * Increments field 'mysql_transaction' of the Redis hash
   * "mysql_transaction_incr_key" and returns the incremented value.
   */
  this.getNextTransactionId = function* () {
    const hash_key = "mysql_transaction_incr_key";
    const hash_field = 'mysql_transaction';
    const sequence = parseInt(yield redisCo.HINCRBY(hash_key, hash_field, 1));
    // Past the threshold, on a multiple of 100000, the stored counter is set back to 1.
    const must_reset = sequence > 9999999999999 && sequence % 100000 == 0;
    if (must_reset) {
      yield redisCo.hmset(hash_key, hash_field, 1);
    }
    return sequence;
  };
  /*
   * Build and run a SELECT statement.
   * connection: transaction connection; when empty the shared that.mysql is used.
   * options object, properties:
   ** fields string, defaults to '*'
   ** where string, default empty
   ** values array bound to the placeholders; replaced by [] when where is empty
   ** limit string, default empty
   ** order string, default empty
   ** group / having strings, default empty
   ** use_redis bool, cache switch (not consulted by this implementation)
   ** join raw join clause, default empty
   ** as table alias, default empty
   ** sub_table sub-query used instead of this.table
   ** before statement put in front of the SELECT, separated by ';'
   * Returns the query result; with a connection it is element [0] of what
   * connection.query() resolved to.
   * Note: options.fields and options.values are modified in place.
   */
  this.query = function* (connection, options) {
    if (typeof options != 'object') F.throwErr('mysql query sql error:options is not object.');
    if (F.isNull(options.fields)) options.fields = '*';

    const source = F.isNull(options.sub_table) ? this.table : options.sub_table;
    const server = C.mysqlServers[0];

    // mycat syntax compatibility: every GROUP BY / ORDER BY column that is
    // not already selected is appended to the field list.
    for (const clause of ['group', 'order']) {
      if (!F.isNull(server.is_mycat) && !F.isNull(options[clause])) {
        const wanted = this.commaStrToDic(options[clause]);
        const selected = this.commaStrToDic(options.fields);
        for (const column in wanted) {
          if (!(column in selected)) {
            options.fields += "," + column;
          }
        }
      }
    }

    let sql = `SELECT ${options.fields} FROM ${source}`;
    if (!F.isNull(options.as)) sql += ` as ${options.as}`;
    if (!F.isNull(options.join)) sql += ` ${options.join}`;
    if (F.isNull(options.where)) {
      // No WHERE clause: any bound values are dropped.
      options.values = new Array();
    } else {
      sql += ` WHERE ${options.where}`;
    }
    if (!F.isNull(options.group)) sql += ` GROUP BY ${options.group}`;
    if (!F.isNull(options.having)) sql += ` HAVING ${options.having}`;
    if (!F.isNull(options.order)) sql += ` ORDER BY ${options.order}`;
    if (!F.isNull(options.limit)) sql += ` LIMIT ${options.limit}`;
    if (!F.isNull(options.before)) sql = options.before + ';' + sql;

    const in_transaction = !F.isNull(connection);
    const exec_conn = in_transaction ? connection : that.mysql;
    const started_at = new Date().getTime();
    const res = yield exec_conn.query(sql, options.values);
    if (!in_transaction) return res;

    // Timing is only logged for queries that run on a caller-supplied connection.
    const elapsed = new Date().getTime() - started_at;
    F.addDebugLogs([`con key:${connection.name}`, `usetime:${elapsed}`, sql, options.values]);
    if (elapsed > C.slow_log_delta) {
      F.addSlowLogs([`con key:${connection.name}`, `usetime:${elapsed}`, sql, options.values]);
    }
    return res[0];
  };
  /*
   * Turn a comma-separated SQL column list (fields / GROUP BY / ORDER BY)
   * into a lookup object whose keys are the referenced names.
   * - "col"            -> { col: 1 }
   * - "col desc"       -> { col: 1 }      (only the first word is kept)
   * - "expr as alias"  -> { alias: 1, expr: 1 }
   * - "fn(x) as alias" -> { alias: 1 }    (expressions with parentheses are skipped)
   * The " as " keyword is matched case-insensitively and must be surrounded
   * by single spaces.
   * Returns the lookup object (values are always 1).
   */
  this.commaStrToDic = function (str) {
    const dic = {};
    for (const raw of str.split(',')) {
      let item = _.str.trim(raw, ' ');
      if (F.isNull(item)) continue;
      // Split on the " as " keyword itself. Splitting on the bare letters
      // "as" would cut column names such as "class" or "last_login".
      const parts = item.split(/ as /i);
      if (parts.length > 1) {
        dic[_.str.trim(parts[parts.length - 1], ' ')] = 1;
        item = parts[0];
      }
      // Function calls / sub-expressions cannot be referenced by name.
      if (item.indexOf("(") >= 0 || item.indexOf(")") >= 0) continue;
      dic[item.split(' ')[0]] = 1;
    }
    return dic;
  };
  /*
   * Return a single row: the first element of query()'s result, or an empty
   * object when F.isNull() considers that result empty.
   */
  this.queryOne = function* (connection, options) {
    const rows = yield this.query(connection, options);
    if (F.isNull(rows)) {
      return {};
    }
    return rows[0];
  };
  /*
   * Insert one row.
   * connection: transaction connection, or empty to use that.mysql
   * values object (such as:{"name":"test","age":23})
   * Returns whatever the underlying query() call resolved to.
   * If the model defines preInsert(), its return value replaces `values`.
   * Note: on a mycat server the auto_index column may be added to `values`.
   */
  this.insert = function* (connection, values) {
    if (F.isNull(values)) F.throwErr('values is null');
    if (!F.isNull(that.preInsert)) {
      values = yield that.preInsert(connection, values);
    }

    const in_transaction = !F.isNull(connection);
    const exec_conn = in_transaction ? connection : that.mysql;
    const server = C.mysqlServers[0];
    // mycat: supply the sequence expression when the caller gave no id.
    if (!F.isNull(that.auto_index) && !(that.auto_index in values) && !F.isNull(server.is_mycat)) {
      values[that.auto_index] = 'next value for MYCATSEQ_GLOBAL';
    }

    const columns = Object.keys(values);
    const vals = F.values(values);
    const placeholders = columns.map(function () { return '?'; }).join(',');
    const sql = `INSERT INTO ${this.table}(${columns.join(',')}) VALUES(${placeholders})`;

    const started_at = new Date().getTime();
    const res = yield exec_conn.query(sql, vals);

    let insert_id;
    if (in_transaction) {
      const elapsed = new Date().getTime() - started_at;
      F.addDebugLogs([`con key:${connection.name}`, `usetime:${elapsed}`, sql, vals]);
      if (elapsed > C.slow_log_delta) {
        F.addSlowLogs([`con key:${connection.name}`, `usetime:${elapsed}`, sql, vals]);
      }
      insert_id = res[0].insertId;
    } else {
      insert_id = res.insertId;
    }

    // Invalidate cached query results that may be affected by the new row.
    if (C.is_open_sql_redis == true && insert_id > 0) {
      const stale_keys = yield this.getInsertRedisDelKey(values);
      yield this.delDbcache(stale_keys, connection);
    }
    return res;
  };
  /**
   * Insert a row, or update it when it already exists
   * (INSERT ... ON DUPLICATE KEY UPDATE; the table must have a unique index).
   * @param connection transaction connection, or empty to use that.mysql
   * @param values row data, values object (such as:{"name":"test","age":23})
   * @returns whatever the underlying query() call resolved to
   */
  this.insertOnUpdate = function* (connection, values) {
    if (F.isNull(values)) F.throwErr('values is null');

    const in_transaction = !F.isNull(connection);
    const exec_conn = in_transaction ? connection : that.mysql;
    const server = C.mysqlServers[0];
    // mycat: supply the sequence expression when the caller gave no id.
    if (!F.isNull(that.auto_index) && !(that.auto_index in values) && !F.isNull(server.is_mycat)) {
      values[that.auto_index] = 'next value for MYCATSEQ_GLOBAL';
    }

    const columns = Object.keys(values);
    const row_values = F.values(values);
    // The same values are bound twice: once for VALUES(...), once for UPDATE.
    const vals = row_values.concat(row_values);
    const placeholders = columns.map(function () { return '?'; }).join(',');
    const assignments = columns.map(function (column) { return `${column}=?`; }).join(',');
    const sql = `INSERT INTO ${this.table}(${columns.join(',')}) VALUES(${placeholders}) ON DUPLICATE KEY UPDATE ${assignments}`;

    const started_at = new Date().getTime();
    const res = yield exec_conn.query(sql, vals);

    let insert_id;
    if (in_transaction) {
      const elapsed = new Date().getTime() - started_at;
      F.addDebugLogs([`con key:${connection.name}`, `usetime:${elapsed}`, sql, vals]);
      if (elapsed > C.slow_log_delta) {
        F.addSlowLogs([`con key:${connection.name}`, `usetime:${elapsed}`, sql, vals]);
      }
      insert_id = res[0].insertId;
    } else {
      insert_id = res.insertId;
    }

    if (C.is_open_sql_redis == true && insert_id > 0) {
      const stale_keys = yield this.getInsertRedisDelKey(values);
      yield this.delDbcache(stale_keys, connection);
    }
    return res;
  };
  /*
   * Bulk insert.
   * values array of row objects
   * (such as:[{"name":"test1","age":23},{"name":"test2","age":23},{"name":"test3","age":23}])
   * The column list is taken from the first row.
   * NOTE(review): each row's values are bound in the order F.values() returns
   * them, so rows presumably must list their keys in the same order as the
   * first row - confirm against callers.
   */
  this.insertAll = function* (connection, values) {
    if (F.isNull(values)) F.throwErr('keys or values is null');

    const columns = Object.keys(values[0]);
    // One "(?,?,...)" group per row; the group is the same for every row.
    const row_group = "(" + columns.map(function () { return "?"; }).join(",") + ")";
    const groups = [];
    let vals = [];
    for (let row_no = 0; row_no < values.length; row_no++) {
      vals = vals.concat(F.values(values[row_no]));
      groups.push(row_group);
    }
    const sql = `INSERT INTO ${this.table}(${columns.join(',')}) VALUES ${groups.join(",")}`;

    const in_transaction = !F.isNull(connection);
    const exec_conn = in_transaction ? connection : that.mysql;
    const started_at = new Date().getTime();
    const res = yield exec_conn.query(sql, vals);
    if (in_transaction) {
      const elapsed = new Date().getTime() - started_at;
      F.addDebugLogs([`con key:${connection.name}`, `usetime:${elapsed}`, sql, vals]);
      if (elapsed > C.slow_log_delta) {
        F.addSlowLogs([`con key:${connection.name}`, `usetime:${elapsed}`, sql, vals]);
      }
    }
    return res;
  };
  /*
   * Work out which "dbcache_insert#..." keys become stale once an UPDATE has
   * written its new column values.
   * option.update_values: {column: value} pairs, copied as-is.
   * option.update_string: raw "col = expr, col2 = ?" text; a fragment whose
   *   expression contains '+' or '-' is skipped, because its resulting value
   *   is not known here.
   * before_values: the values bound to the '?' placeholders of update_string,
   *   in order of appearance.
   * Returns the key list produced by getInsertRedisDelKey(keys, false).
   */
  this.getUpdateAfterDelRedisKeys = function* (option, before_values) {
    let keys = {};
    if (!F.isNull(option.update_values)) {
      keys = JSON.parse(JSON.stringify(option.update_values));
    }
    if (!F.isNull(option.update_string)) {
      const bound = F.isNull(before_values) ? [] : before_values;
      let index = 0;
      for (const fragment of option.update_string.split(',')) {
        // Every '?' consumes one bound value, also in fragments that are
        // skipped below. Otherwise later placeholders pick up the wrong value.
        const placeholders = (fragment.match(/\?/g) || []).length;
        const eq_pos = fragment.indexOf('=');
        if (eq_pos == -1) {
          // Not an assignment (e.g. the tail of a function call containing a comma).
          index += placeholders;
          continue;
        }
        const expression = fragment.slice(eq_pos + 1);
        if (expression.indexOf('+') != -1 || expression.indexOf('-') != -1) {
          index += placeholders;
          continue;
        }
        const field_name = _.str.trim(fragment.slice(0, eq_pos), ' ');
        let field_value;
        if (placeholders > 0) {
          field_value = bound[index];
          index += placeholders;
        } else {
          field_value = _.str.trim(expression, ' ');
        }
        keys[field_name] = field_value;
      }
    }
    return yield this.getInsertRedisDelKey(keys, false);
  };
  /*
   * Build the list of "dbcache_insert#<table>.<field>#<value>" keys for a row.
   * values: {field: value} pairs.
   * is_select_default: when true (default) the table's column defaults fill
   *   in fields missing from `values`; columns whose default is 'db_null' are
   *   left out. When false, `values` is used as given.
   * Returns an empty list when the Redis flag
   * "dbcache_insert_flag#<table>" is not set.
   */
  this.getInsertRedisDelKey = function* (values, is_select_default = true) {
    const redis_keys = [];
    const insert_flag = yield redisLib.redisCo.get("dbcache_insert_flag#" + this.table);
    if (F.isNull(insert_flag)) return redis_keys;

    let lookup = values;
    if (is_select_default == true) {
      lookup = {};
      const defaults = yield this.getFieldDefaultValue(this.table);
      for (const column in defaults) {
        if (!F.isNull(values[column])) {
          lookup[column] = values[column];
        } else if (defaults[column] != 'db_null') {
          lookup[column] = defaults[column];
        }
      }
    }

    for (const column in lookup) {
      redis_keys.push(F.vsprintf("dbcache_insert#%s#%s", [this.table + '.' + column, lookup[column]]));
    }
    return redis_keys;
  };
  /*
   * Look up column default values.
   * table: table name
   * field: column to look up; when empty, all columns are returned
   * With a field: returns that column's default, or 'db_null' when the lookup
   *   fails for any reason.
   * Without a field: returns {column: default}, using 'db_null' for columns
   *   without a default; the map is cached per table on this instance.
   */
  this.getFieldDefaultValue = function* (table, field = '') {
    if (field != '') {
      try {
        const rows = yield that.mysql.query(`select default(${field}) as default_value from ${table} limit 1`);
        if (F.isNull(rows)) F.throwErr('ER_BAD_FIELD_ERROR or ER_NO_SUCH_TABLE');
        return rows[0].default_value;
      } catch (e) {
        // Any failure (including the throwErr above) is reported as "no default".
        return 'db_null';
      }
    }

    const cached = this.table_default_value_map[table];
    if (!F.isNull(cached)) return cached;

    const described = yield that.mysql.query('desc ' + table);
    if (F.isNull(described)) F.throwErr('ER_NO_SUCH_TABLE');
    const defaults = {};
    for (const column of described) {
      defaults[column.Field] = column.Default != null ? column.Default : 'db_null';
    }
    this.table_default_value_map[table] = defaults;
    return defaults;
  };
  /*
   * Build one "(v1,v2,...)," tuple for a "(pk columns) in (...)" clause from
   * a row of primary-key values, and collect the cache keys of that row.
   * res: row object keyed by column name; columns missing from it become null
   * to_de_redkey: array that receives one "dbcache#<table>.<column>#<value>"
   *   entry per column present in `res`
   * Returns the tuple text, including the trailing comma.
   * NOTE(review): values are wrapped in single quotes without escaping, so a
   * key value containing a quote would break the generated SQL.
   */
  this.getCond = function* (res, to_de_redkey) {
    const columns = this.uni_index.split(",");
    const tuple = [];
    for (const column of columns) {
      if (column in res) {
        // Plain property access; the column name is never evaluated as code.
        const idval = res[column];
        tuple.push("'" + idval + "'");
        to_de_redkey.push(F.vsprintf("dbcache#%s.%s#%s", [this.table, column, idval]));
      } else {
        tuple.push("null");
      }
    }
    return "(" + tuple.join(",") + "),";
  };
  /*
   * Read the primary-key column names of this.table from KEY_COLUMN_USAGE.
   * connection: optional transaction connection; defaults to that.mysql
   * Returns the column names joined by ','.
   * The bare table name KEY_COLUMN_USAGE is used on mycat servers, the
   * INFORMATION_SCHEMA-qualified name otherwise.
   */
  this.get_uni_index = function* (connection = null) {
    const in_transaction = !F.isNull(connection);
    const exec_conn = in_transaction ? connection : that.mysql;
    const server = C.mysqlServers[0];
    const usage_table = F.isNull(server.is_mycat)
      ? "INFORMATION_SCHEMA.KEY_COLUMN_USAGE"
      : 'KEY_COLUMN_USAGE';
    const sql = `SELECT COLUMN_NAME FROM ${usage_table} where CONSTRAINT_SCHEMA = '${that.mysql.conf.database}' and TABLE_NAME = '${this.table}' and CONSTRAINT_NAME = 'PRIMARY'`;

    let rows = yield exec_conn.query(sql);
    if (in_transaction) rows = rows[0];
    console.log("uniindex:", rows);

    let column_names = '';
    for (let n = 0; n < rows.length; n++) {
      column_names += rows[n].COLUMN_NAME + ",";
    }
    return _.str.trim(column_names, ',');
  };
  /*
   * Resolve options.where to concrete primary-key tuples and return a
   * " where (<pk columns>) in ((...),(...))" clause for UPDATE/DELETE.
   * connection: optional transaction connection; defaults to that.mysql
   * options: reads where, values and limit
   * to_de_redkey: receives the cache keys of every matched row (via getCond)
   * update_data: when non-empty, the original where text is appended with
   *   " and " and options.values are pushed onto this array (original note:
   *   the primary-key filter is added to the update where to avoid deadlocks)
   * When no row matches, a single all-null tuple is emitted.
   */
  this.getWhereSqlByUniIndex = function* (connection = null, options, to_de_redkey, update_data = null) {
    const in_transaction = !F.isNull(connection);
    const exec_conn = in_transaction ? connection : that.mysql;

    let lookup_sql = `select ${this.uni_index} from ${this.table} where ${options.where}`;
    if (!F.isNull(options.limit)) {
      lookup_sql += " limit " + options.limit;
    }
    let rows = yield exec_conn.query(lookup_sql, options.values);
    if (in_transaction) rows = rows[0];

    const tuple_sources = rows.length > 0 ? rows : [{}];
    let clause = ` where (${this.uni_index}) in (`;
    for (let n = 0; n < tuple_sources.length; n++) {
      clause += yield this.getCond(tuple_sources[n], to_de_redkey);
    }
    clause = _.str.trim(clause, ',') + ")";

    if (!F.isNull(update_data)) {
      clause = clause + " and " + options.where;
      update_data.push.apply(update_data, options.values);
    }
    return clause;
  };
  /*
   * Analyse options.update_string.
   * - Pushes "<table>.<column>" onto update_key for every comma-separated
   *   fragment (the text before its '=').
   * - Splits options.values: the first N entries (N = number of '?' in
   *   update_string) are returned, the rest are written back to
   *   options.values as the where-clause values.
   * Returns [] when update_string is empty or contains no '?'; in those
   * cases options.values is left untouched.
   */
  this.splitUpdateStr = function* (options, update_key) {
    if (F.isNull(options.update_string)) return [];

    // Collect the columns being assigned.
    for (const fragment of options.update_string.split(',')) {
      if (F.isNull(fragment)) continue;
      const column = _.str.trim(fragment.split("=")[0], ' ');
      update_key.push(this.table + "." + column);
    }

    const placeholders = options.update_string.match(new RegExp("\\?", "g"));
    if (F.isNull(placeholders)) return [];

    const set_value_count = placeholders.length;
    const before_where_values = options.values.slice(0, set_value_count);
    options.values = options.values.slice(set_value_count);
    return before_where_values;
  };
  /*
   * Tell whether a cached condition mentions any of the updated columns.
   * cond: comma-separated condition text
   * up_key_arr: array of column names to look for
   * Returns true as soon as one comma-separated piece of `cond` contains one
   * of the names as a substring, false otherwise.
   */
  this.updateKeyMatchCond = function (cond, up_key_arr) {
    return cond.split(",").some(function (piece) {
      return up_key_arr.some(function (up_key) {
        return piece.indexOf(up_key) >= 0;
      });
    });
  };
  /*
   * Remove the members of sorted set `idx_value_key` whose score lies
   * between 0 and cur_time. The removals are issued without waiting for
   * their results.
   */
  this.delTimeoutKey = function* (idx_value_key, cur_time) {
    const expired_members = yield dbcacheCo.zrangebyscore([idx_value_key, 0, cur_time]);
    for (let n = 0; n < expired_members.length; n++) {
      dbcacheCli.zrem(idx_value_key, expired_members[n]);
    }
  };
  /*
   * Invalidate cached query results registered under the given index keys.
   * to_de_redkey: list of sorted-set keys to scan
   * connection: when empty, entries are invalidated immediately; otherwise
   *   they are queued on connection.delay_rem_arr / connection.delay_del_arr
   *   (commit() processes those queues)
   * update_key: when non-empty, only entries whose condition mentions one of
   *   these columns are invalidated
   */
  this.delDbcache = function* (to_de_redkey, connection, update_key = []) {
    // The service runs as a cluster and clocks may differ, so allow a 10-minute buffer.
    const cutoff = new Date().getTime() / 1000 - 600;
    // The redis delete time has to be taken after the SQL has executed.
    const del_time = yield F.getNextNoRoundId(dbcacheCo, "dbcache");

    for (const idx_value_key of to_de_redkey) {
      yield this.delTimeoutKey(idx_value_key, cutoff);
      const members = yield dbcacheCo.zrangebyscore([idx_value_key, "(" + cutoff, '+inf']);

      for (const key_cond of members) {
        // A member is "<cache key><separator><condition>".
        const pieces = key_cond.split("#$&*^%dj53#");
        if (pieces.length <= 1) continue;
        const real_key = pieces[0];
        const cond = pieces[1];
        if (F.isNull(cond)) continue;
        if (!F.isNull(update_key) && !this.updateKeyMatchCond(cond, update_key)) continue;

        if (F.isNull(connection)) {
          dbcacheCli.zrem(idx_value_key, key_cond);
          dbcacheCli.hmset(real_key, "del_time", del_time, "value", "");
          continue;
        }

        // Inside a transaction: queue the work on the connection.
        if (F.isNull(connection.delay_rem_arr)) connection.delay_rem_arr = [];
        connection.delay_rem_arr.push(idx_value_key + "#&*%24dijk#" + key_cond);
        if (F.isNull(connection.delay_del_arr)) connection.delay_del_arr = [];
        connection.delay_del_arr.push(real_key);
      }
    }
  };
  /*
   * Run update() and return only the affectedRows count of its result.
   * With a connection the count is read from element [0] of the result.
   */
  this.save = function* (dbConnection, option) {
    const outcome = yield this.update(dbConnection, option);
    if (F.isNull(dbConnection)) {
      return outcome.affectedRows;
    }
    return outcome[0].affectedRows;
  };
  /*
   * Update rows.
   * connection: transaction connection, or empty to use that.mysql
   * options object, properties:
   ** where string, must not be empty
   ** values array (such as:["test",23]) default empty array; the values for
   **   the '?' in update_string come first, then the where values
   ** update_values object (such as:{"name":"test","age":23}) default empty
   ** update_string string custom assignment (such as:"login_times = login_times + 1") default empty
   * Returns whatever the underlying query() call resolved to. Errors are
   * logged together with the SQL built so far and then rethrown.
   * Note: options.values is modified in place.
   */
  this.update = function* (connection, options) {
    // Declared outside the try block so the catch handler can log them.
    const update_data = [];
    const update_key = [];
    let update_sql = `UPDATE ${this.table} SET `;
    try {
      if (F.isNull(this.uni_index)) {
        this.uni_index = yield this.get_uni_index(connection);
      }
      if (typeof options != 'object') F.throwErr('mysql update sql error:options is not object.');
      if (F.isNull(options.where)) F.throwErr('mysql update sql error:where can not null.');
      if (F.isNull(options.values)) options.values = new Array();

      const has_update_values = !F.isNull(options.update_values);
      if (has_update_values) {
        for (const column in options.update_values) {
          update_data.push(options.update_values[column]);
          update_sql += column + "=?,";
          update_key.push(this.table + "." + column);
        }
        update_sql = _.str.trim(update_sql, ',');
      }
      if (!F.isNull(options.update_string)) {
        update_sql += (has_update_values ? ',' : '') + options.update_string;
      }

      // Bound values are ordered: update_values, update_string '?', where.
      const before_values = yield this.splitUpdateStr(options, update_key);
      const to_de_after_redkey = yield this.getUpdateAfterDelRedisKeys(options, before_values);
      if (!F.isNull(before_values)) update_data.push.apply(update_data, before_values);

      const to_de_redkey = [];
      update_sql += yield this.getWhereSqlByUniIndex(connection, options, to_de_redkey, update_data);

      const in_transaction = !F.isNull(connection);
      const exec_conn = in_transaction ? connection : that.mysql;
      const started_at = new Date().getTime();
      const res = yield exec_conn.query(update_sql, update_data);
      if (in_transaction) {
        const elapsed = new Date().getTime() - started_at;
        F.addDebugLogs([`con key:${connection.name}`, `usetime:${elapsed}`, update_sql, update_data]);
        if (elapsed > C.slow_log_delta) {
          F.addSlowLogs([`con key:${connection.name}`, `usetime:${elapsed}`, update_sql, update_data]);
        }
      }

      // NOTE(review): the cache is kept only when the server message contains
      // exactly 'Changed: 0 Warnings' - confirm this matches the message
      // format the server really sends.
      const affect_msg = in_transaction ? res[0].message : res.message;
      if (C.is_open_sql_redis == true && affect_msg.indexOf('Changed: 0 Warnings') == -1) {
        yield this.delDbcache(to_de_redkey, connection, update_key);
        yield this.delDbcache(to_de_after_redkey, connection);
      }
      // Returned last, so a failing SQL statement never invalidates redis entries.
      return res;
    } catch (e) {
      const use_conn = !F.isNull(connection) ? 'true' : 'false';
      F.addErrLogs(["update err:", use_conn, update_sql, update_data, e.stack]);
      throw e;
    }
  };
  /*
   * Same as update(), but the returned value has the same shape with and
   * without a connection: with a connection, element [0] of update()'s
   * result is returned.
   */
  this.safeUpdate = function* (connection, options) {
    const outcome = yield this.update(connection, options);
    return F.isNull(connection) ? outcome : outcome[0];
  };
  /*
   * Delete rows.
   * connection: transaction connection, or empty to use that.mysql
   * options object, properties:
   ** where string, must not be empty
   ** values array (such as:["test",23]) default empty array
   ** limit string default empty (applied when the matching keys are looked up)
   * The rows are first resolved to primary-key tuples; the DELETE itself only
   * filters on those tuples and binds no values.
   * Returns whatever the underlying query() call resolved to.
   */
  this.delete = function* (connection, options) {
    if (F.isNull(this.uni_index)) {
      this.uni_index = yield this.get_uni_index(connection);
    }
    if (typeof options != 'object') F.throwErr('mysql delete sql error:options is not object.');
    if (F.isNull(options.where)) F.throwErr('mysql delete sql error:where can not null.');
    if (F.isNull(options.values)) options.values = new Array();

    const to_de_redkey = [];
    const key_filter = yield this.getWhereSqlByUniIndex(connection, options, to_de_redkey);
    const sql = `DELETE FROM ${this.table} ` + key_filter;

    const in_transaction = !F.isNull(connection);
    const exec_conn = in_transaction ? connection : that.mysql;
    const started_at = new Date().getTime();
    const res = yield exec_conn.query(sql);

    let affect_row;
    if (in_transaction) {
      const elapsed = new Date().getTime() - started_at;
      F.addDebugLogs([`con key:${connection.name}`, `usetime:${elapsed}`, sql]);
      if (elapsed > C.slow_log_delta) {
        F.addSlowLogs([`con key:${connection.name}`, `usetime:${elapsed}`, sql]);
      }
      affect_row = res[0].affectedRows;
    } else {
      affect_row = res.affectedRows;
    }

    if (C.is_open_sql_redis == true && affect_row > 0) {
      yield this.delDbcache(to_de_redkey, connection);
    }
    return res;
  };
  /*
   * Run an arbitrary SQL statement with bound parameters on that.mysql and
   * return its result unchanged.
   */
  this.execute_raw = function* (sql, parameter) {
    const outcome = yield that.mysql.query(sql, parameter);
    return outcome;
  };
  // Transaction helpers
  /*
   * Obtain a connection from that.mysql; `name` is passed through to
   * that.mysql.getConnection().
   */
  this.getConnection = function* (name) {
    const conn = yield that.mysql.getConnection(name);
    return conn;
  };
  /*
   * Open a transaction and, when `key` is given, take an exclusive lock by
   * inserting `key` into the lockcontrol table.
   * key: lock name; when non-empty it also becomes the connection name
   * name: connection name used when no key is given
   * Returns the connection with start_time, tr_id, name (and key) set on it,
   * or an empty value when no connection could be obtained after the retries
   * (only when that.mysql.conf.waitForConnections is off).
   * Throws the original error after rolling back when starting the
   * transaction or taking the lock fails.
   */
  this.startTransaction = function* (key, name = '') {
    if (!F.isNull(key)) name = key;

    // The id is fetched before a connection is checked out, so a failure here
    // cannot leave a connection that is never released.
    const tr_id = yield that.getNextTransactionId();

    let conn = null;
    if (that.mysql.conf.waitForConnections) {
      conn = yield this.getConnection(name);
    } else {
      // Retry up to 99 times, 10 ms apart, and stop at the first connection obtained.
      let attempts = 0;
      while (F.isNull(conn) && ++attempts < 100) {
        try {
          conn = yield this.getConnection(name);
        } catch (e) {
          yield F.sleep(10);
        }
      }
      if (F.isNull(conn)) return conn;
    }

    try {
      conn.start_time = new Date().getTime();
      yield conn.query("START TRANSACTION");
      if (!F.isNull(key)) {
        // Take the exclusive lock; the key is bound as a parameter, never interpolated.
        F.addLogs(["locks: start ","key:",key]);
        yield conn.query('insert into lockcontrol (lockkey) values (?)', [key]);
        F.addLogs(["locks: get lock suc ","key:",key]);
        conn.key = key;
        conn.name = key;
      }
      if (!F.isNull(name)) conn.name = name;
      conn.tr_id = tr_id;
      F.addLogs(['TRANSACTION log start', {tr_id:tr_id, start_time:conn.start_time, name:name, key:key}]);
      return conn;
    } catch (e) {
      if (!F.isNull(conn)) {
        try {
          if (!F.isNull(key)) {
            yield conn.query('DELETE from lockcontrol WHERE lockkey = ?', [key]);
          }
        } finally {
          // Always roll back (which releases the connection), even when the cleanup DELETE fails.
          yield this.rollback(conn);
        }
      }
      F.addErrLogs(["get connection err:",e.stack]);
      F.addLogs(['TRANSACTION log start_err', {tr_id:tr_id, name:name, key:key}]);
      throw e;
    }
  };
  /*
   * Write a slow-log entry when the time since conn.start_time exceeds
   * C.slow_log_delta, then remove start_time and name from the connection.
   * status: label recorded in the log entry
   */
  this.addSlowLog = function* (conn, status) {
    const elapsed = new Date().getTime() - conn.start_time;
    if (elapsed > C.slow_log_delta) {
      F.addSlowLogs([`usetime:${elapsed}`, "conn name:", conn.name, "status:", status]);
    }
    delete conn.start_time;
    delete conn.name;
  };
  /*
   * Commit the transaction on `conn` and release the connection.
   * - Does nothing for an empty conn; logs and returns when conn.has_release
   *   is already true.
   * - When the connection holds a lock (conn.key), its lockcontrol row is
   *   deleted before the commit.
   * - After a successful commit, the cache invalidations queued on
   *   conn.delay_rem_arr / conn.delay_del_arr (see delDbcache) are issued.
   * On error the connection is released and the error is rethrown.
   */
  this.commit = function* (conn) {
    if (F.isNull(conn)) return;
    if (conn.has_release == true) {
      F.addErrLogs(["conn commit again:",conn.name]);
      return;
    }
    // Captured up front: addSlowLog() removes conn.name before the release.
    var name = conn.name;
    var tr_id = conn.tr_id;
    try {
      if (!F.isNull(conn.key)) {
        // The lock key is bound as a parameter, never interpolated into the SQL text.
        yield conn.query('DELETE from lockcontrol WHERE lockkey = ?', [conn.key]);
        delete conn.key;
      }
      yield conn.query("commit");
      if (!F.isNull(conn.delay_rem_arr)) {
        for (let i = 0; i < conn.delay_rem_arr.length; i++) {
          // Each entry is "<index key><separator><member>".
          let key_list = conn.delay_rem_arr[i].split("#&*%24dijk#");
          let idx_value_key = key_list[0];
          let key_cond = key_list[1];
          dbcacheCli.zrem(idx_value_key,key_cond);
        }
      }
      if (!F.isNull(conn.delay_del_arr)) {
        // The redis delete time has to be taken after the SQL has executed.
        let del_time = yield F.getNextNoRoundId(dbcacheCo,"dbcache");
        for (let i = 0; i < conn.delay_del_arr.length; i++) {
          let del_key = conn.delay_del_arr[i];
          dbcacheCli.hmset(del_key,"del_time",del_time, "value", "");
        }
      }
      yield this.addSlowLog(conn,"commit");
      conn.release(name);
      F.addLogs(['TRANSACTION log commit', {tr_id:tr_id}]);
    } catch (e) {
      F.addErrLogs(["commit err:",e.stack]);
      F.addLogs(['TRANSACTION log commit_err', {tr_id:tr_id}]);
      conn.release(name);
      throw e;
    }
  };
  /*
   * Roll back the transaction on `conn` and release the connection.
   * Does nothing for an empty conn; logs and returns when conn.has_release is
   * already true. On error the connection is released and the error is
   * rethrown.
   */
  this.rollback = function* (conn) {
    if (F.isNull(conn)) return;
    if (conn.has_release == true) {
      F.addErrLogs(["conn rollback again:", conn.name]);
      return;
    }
    // Captured up front: addSlowLog() removes conn.name before the release.
    const conn_name = conn.name;
    const transaction_id = conn.tr_id;
    try {
      if (!F.isNull(conn.key)) {
        F.addLogs(["locks: rollback ", "key:", conn.key]);
        delete conn.key;
      }
      yield conn.query("rollback");
      yield this.addSlowLog(conn, "rollback");
      conn.release(conn_name);
      F.addLogs(['TRANSACTION log rollback', {tr_id: transaction_id}]);
    } catch (e) {
      F.addErrLogs(["rollback err:", e.stack]);
      F.addLogs(['TRANSACTION log rollback_err', {tr_id: transaction_id}]);
      conn.release(conn_name);
      throw e;
    }
  };
  /*
   * Run `fn` while holding the lock named `key`.
   * key: unique name of the lock
   * fn: the section to protect; must be an async (yieldable) function. It is
   *   called without arguments.
   * The transaction is committed when fn succeeds and rolled back when it
   * throws.
   * NOTE(review): an error thrown by fn is logged but not rethrown, so the
   * caller cannot tell that the protected section failed - confirm this is
   * intended.
   */
  this.lock = function* (key, fn) {
    // Opening the transaction is what takes the lock.
    const lock_conn = yield this.startTransaction(key);
    try {
      yield fn();
      yield this.commit(lock_conn);
    } catch (failure) {
      F.addErrLogs(["error: happen in lock:", failure]);
      yield this.rollback(lock_conn);
    }
  };
  /*
   * Normalise a field list for use in a cache key.
   * fields string, comma separated
   * Returns the names with all spaces removed, sorted, joined by ','.
   */
  this.formatFieldsToRedis = function (fields) {
    return fields
      .replace(/ /g, '')
      .split(',')
      .sort()
      .join(',');
  };
  /**
   * Normalize a where clause and its bound values for use in a redis key.
   *
   * @param {string} where - where clause using bind parameters
   * @param {Array} values - bound values, in the same order as the
   *   conditions appear in `where`
   * @returns {string} sorted `field=encodedValue` pairs joined with '&'
   */
  this.formatWhereToRedis = function (where, values) {
    // Unsorted on purpose: each field must line up with its value by position.
    const fieldNames = this.getPregWhereArray(where, false);
    if (fieldNames.length != values.length) F.throwErr('sql error.where must use bind params.');

    const pairs = fieldNames.map((fieldName, position) => {
      return fieldName + '=' + encodeURIComponent(values[position]);
    });
    pairs.sort();
    return pairs.join('&');
  };
  /**
   * Extract the field names referenced by a where clause.
   *
   * The clause is split on the literal ' and '; for each piece the text
   * before the first '=' is taken and surrounding spaces are trimmed.
   *
   * @param {string} where - where clause
   * @param {boolean} [sort=true] - sort the names when this equals true
   * @returns {Array<string>} the field names
   */
  this.getPregWhereArray = function (where, sort = true) {
    // Break the clause into individual conditions on ' and '.
    const fieldNames = where.split(' and ').map((condition) => {
      // Keep only what precedes the first '='.
      const equalsAt = condition.indexOf('=');
      return _.str.trim(condition.substring(0, equalsAt), ' ');
    });

    if (sort == true) {
      fieldNames.sort();
    }
    return fieldNames;
  };
  /**
   * Build the redis key under which a mysql query result is cached.
   *
   * The key starts with C.redisPre.sql_redis_prefix formatted with this
   * table's name, followed by the fields and where segments, then one
   * `name:value#` segment for each of limit / order / group / having that is
   * set on `options`.
   *
   * Each optional segment carries its own label. Previously all four were
   * labelled `limit:`, so e.g. an order value and a limit value with the same
   * text produced the same cache key.
   *
   * @param {string} fields - normalized field list
   * @param {string} where - normalized where string
   * @param {Object} options - query options; `limit`, `order`, `group` and
   *   `having` are read
   * @returns {string} the cache key
   */
  this.getQueryRedisKey = function (fields, where, options) {
    let key = _.str.sprintf(C.redisPre.sql_redis_prefix, this.table);
    key += `#fields:${fields}#where:${where}#`;

    const optionalParts = ['limit', 'order', 'group', 'having'];
    for (const part of optionalParts) {
      if (!F.isNull(options[part])) key += `${part}:${options[part]}#`;
    }
    return key;
  };
  /**
   * Record the combination of fields used by a where clause.
   *
   * The sorted field names of `where` are joined with ',' and added to the
   * redis set whose key is C.sql_table_redis_prefix formatted with this
   * table's name.
   *
   * @param {string} where - where clause
   */
  this.setWhereGroup = function* (where) {
    const fieldNames = this.getPregWhereArray(where);
    const fieldGroup = fieldNames.join(',');
    const setKey = _.str.sprintf(C.sql_table_redis_prefix, this.table);
    yield redis.redisCo.sadd(setKey, fieldGroup);
  };
  /**
   * Collect the cached redis keys that need to be refreshed for the rows
   * matched by `where` / `values`.
   *
   * For every field group stored in this table's redis set, the distinct
   * values of those fields are selected from the matching rows; each result
   * row is turned into a `*#where:field=value&...*` pattern and looked up
   * with redis KEYS.
   *
   * @param {string} where - where clause using bind parameters
   * @param {Array} values - bound values for `where`
   * @returns {Array<string>} matching redis keys (empty when nothing matches)
   */
  this.getDeleteRedisKey = function* (where, values) {
    const groupSetKey = _.str.sprintf(C.sql_table_redis_prefix, this.table);
    const fieldGroups = yield redis.redisCo.smembers(groupSetKey);
    if (F.isNull(fieldGroups)) return [];

    const matchedRows = [];
    for (const fieldGroup of fieldGroups) {
      // Build the statement fresh for each group. Formatting a shared
      // template in place consumed its placeholder on the first pass, so
      // every later group re-ran the first group's columns.
      const sql = `select distinct ${fieldGroup} from ${this.table} where ${where}`;
      const sqlRes = yield that.mysql.query(sql, values);
      if (!F.isNull(sqlRes)) matchedRows.push.apply(matchedRows, sqlRes);
    }
    if (F.isNull(matchedRows)) return [];

    const searchKeys = [];
    for (const row of matchedRows) {
      let pattern = '*#where:';
      for (const column in row) {
        pattern += column + '=' + encodeURIComponent(row[column]) + '&';
      }
      // NOTE(review): the pattern ends with '&*', while formatWhereToRedis
      // joins pairs with '&' without a trailing one — confirm that keys whose
      // where part consists of exactly these columns are meant to match.
      // NOTE(review): KEYS scans the whole keyspace; confirm this is
      // acceptable for the redis instance in use.
      const found = yield redis.redisCo.keys(pattern + '*');
      if (!F.isNull(found)) searchKeys.push.apply(searchKeys, found);
    }
    return searchKeys;
  };
  /**
   * Validate that a query's options are eligible for caching.
   *
   * Reports an error through F.throwErr when `options`, `options.values` or
   * `options.where` is null (per F.isNull), or when `options.where` contains
   * '(', '<', '>' or ' or '.
   *
   * @param {Object} options - query options; `values` and `where` are read
   */
  this.checkWhereToRedis = function (options) {
    const requirePresent = (value, message) => {
      if (F.isNull(value)) F.throwErr(message);
    };
    requirePresent(options, 'check set mysql redis error.options is null.');
    requirePresent(options.values, 'check set mysql redis error.options.values is null.');
    requirePresent(options.where, 'check set mysql redis error.options.where is null.');

    // Tokens that may not appear in a cacheable where clause, checked in order.
    const forbiddenTokens = ['(', '<', '>', ' or '];
    for (const token of forbiddenTokens) {
      if (options.where.indexOf(token) >= 0) F.throwErr('check set mysql redis error.options.where can not has "()、<>、or"');
    }
  };
  864. }
  865. module.exports = model_base;