mysql.js 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. 'use strict';
  2. /**
  3. * Module dependencies.
  4. */
  5. var thunkify = require('thunkify-wrap');
  6. var ready = require('ready');
  7. var mysql = require('mysql');
  8. var config = require('../config');
  9. var logs_obj = require('../libs/logs.js');
  10. var C = require('../config/index');
  11. var logs = new logs_obj();
  12. var server = config.mysqlServers[0];
  13. var innerCon = function(connection) {
  14. var con = connection;
  15. this.has_release = false;
  16. var innerquery = function (sql, values, cb) {
  17. if (typeof values === 'function') {
  18. cb = values;
  19. values = null;
  20. }
  21. if (typeof(values) == 'object') {
  22. if (logs.isNull(C.STOP_DEBUG)) logs.addLogs("debug/debug",[sql,values]);
  23. } else {
  24. if (logs.isNull(C.STOP_DEBUG)) logs.addLogs("debug/debug",[sql]);
  25. }
  26. con.query(sql,values, function(err,rows){
  27. // if (sql.toLocaleLowerCase().indexOf("duplicate") >= 0 && !logs.isNull(rows)) {
  28. // logs.addLogs("debug/debug",["db mod duplicate:",rows]);
  29. // if (rows.insertId == 0 && rows.affectedRows == 1) rows.affectedRows = 0;
  30. // }
  31. // if (sql.toLocaleLowerCase().indexOf("update") == 0 && !logs.isNull(rows)) {
  32. // logs.addLogs("debug/debug",["db update:",rows]);
  33. // if (rows.message.indexOf("Changed: 0") >= 0) rows.affectedRows = 0;
  34. // }
  35. cb(err,[rows,null]);
  36. });
  37. };
  38. this.query = thunkify(innerquery);
  39. this.end = function () {
  40. if (true == this.has_release) return;
  41. con.end();
  42. this.has_release = true;
  43. }
  44. this.release = function (name) {
  45. if (true == this.has_release) return;
  46. con.release();
  47. this.has_release = true;
  48. }
  49. }
  50. // TODO: query timeout
  51. module.exports = function (ext_config=null){
  52. var poolconfig = {
  53. host: server.host,
  54. port: server.port,
  55. user: server.user,
  56. password: server.password,
  57. database: config.mysqlDatabase,
  58. connectionLimit: config.mysqlMaxConnections,
  59. charset: 'utf8mb4_unicode_ci',
  60. // charset: 'utf8_unicode_ci',
  61. multipleStatements: true,
  62. waitForConnections: true,
  63. acquireTimeout: config.mysqlQueryTimeout,
  64. connectTimeout: config.mysqlQueryTimeout
  65. };
  66. if (ext_config != null) poolconfig = ext_config;
  67. this.conf = poolconfig;
  68. var pool = mysql.createPool(poolconfig);
  69. var tranpoolconfig = poolconfig;
  70. var tranpool = mysql.createPool(tranpoolconfig);
  71. this.pool = pool;
  72. this.tranpool = tranpool;
  73. var that = this;
  74. this.addDebugLogs = function (data) {
  75. //logs.addLogs("debug/debug",data);
  76. };
  77. this.addLogs = function (data) {
  78. logs.addLogs("debug/debug",data);
  79. logs.addLogs("sys/sys",data);
  80. };
  81. this.addErrLogs = function (data) {
  82. logs.addLogs("debug/debug",data);
  83. logs.addLogs("sys/sys",data);
  84. logs.addLogs("err/err",data);
  85. };
  86. this.query = thunkify(function (sql, values, cb) {
  87. if (typeof(values) == 'object') {
  88. if (logs.isNull(C.STOP_DEBUG)) logs.addLogs("debug/debug",[sql,values]);
  89. } else {
  90. if (logs.isNull(C.STOP_DEBUG)) logs.addLogs("debug/debug",[sql]);
  91. }
  92. let start_time = new Date().getTime();
  93. if (typeof values === 'function') {
  94. cb = values;
  95. values = null;
  96. }
  97. pool.query(sql, values, function (err, rows) {
  98. let end_time = new Date().getTime();
  99. if (end_time >= start_time+C.slow_log_delta) {
  100. if (typeof(values) == 'object') {
  101. if (end_time - start_time > C.slow_log_delta) logs.addLogs("slow/slow",[`usetime:${end_time-start_time}`,sql,values]);
  102. } else {
  103. if (end_time - start_time > C.slow_log_delta) logs.addLogs("slow/slow",[`usetime:${end_time-start_time}`,sql]);
  104. }
  105. }
  106. if (!logs.isNull(err)) {
  107. that.addErrLogs([`usetime:${end_time-start_time}`,sql,err]);
  108. }
  109. // if (sql.toLocaleLowerCase().indexOf("duplicate") >= 0 && !logs.isNull(rows)) {
  110. // logs.addLogs("debug/debug",["mod duplicate:",rows]);
  111. // if (rows.insertId == 0 && rows.affectedRows == 1) rows.affectedRows = 0;
  112. // }
  113. // if (sql.toLocaleLowerCase().indexOf("update") == 0 && !logs.isNull(rows)) {
  114. // logs.addLogs("debug/debug",["update:",rows]);
  115. // if (rows.message.indexOf("Changed: 0") >= 0) rows.affectedRows = 0;
  116. // }
  117. cb(err, rows);
  118. });
  119. });
  120. this.getConnection = thunkify(function (name,cb) {
  121. tranpool.getConnection(function(err, connection) {
  122. var new_con = new innerCon(connection);
  123. cb(err,new_con);
  124. });
  125. });
  126. this.queryOne = thunkify(function (sql, values, cb) {
  127. if (typeof values === 'function') {
  128. cb = values;
  129. values = null;
  130. }
  131. that.query(sql, values, function (err, rows) {
  132. if (rows) {
  133. rows = rows[0];
  134. }
  135. cb(err, rows);
  136. });
  137. });
  138. this.escape = function (val) {
  139. return pool.escape(val);
  140. };
  141. ready(that);
  142. function init() {
  143. that.query('show tables', function (err, rows) {
  144. if (err) {
  145. console.error('[%s] [worker:%s] mysql init error: %s', Date(), process.pid, err);
  146. setTimeout(init, 1000);
  147. return;
  148. }
  149. console.log('[%s] [worker:%s] mysql ready, got %d tables', Date(), process.pid, rows.length);
  150. that.ready(true);
  151. });
  152. }
  153. init();
  154. };