|
- /*!
- * Cluster - Master
- * Copyright(c) 2011 LearnBoost <dev@learnboost.com>
- * MIT Licensed
- */
- /**
- * Module dependencies.
- */
- var Worker = require('./worker')
- , EventEmitter = require('events').EventEmitter
- , dirname = require('path').dirname
- , spawn = require('child_process').spawn
- , utils = require('./utils')
- , fsBinding = process.binding('fs')
- , netBinding = process.binding('net')
- , bind = netBinding.bind
- , listen = netBinding.listen
- , socket = netBinding.socket
- , socketpair = netBinding.socketpair
- , close = netBinding.close
- , unlink = fsBinding.unlink
- , dgram = require('dgram')
- , tty = require('tty')
- , net = require('net')
- , fs = require('fs')
- , os = require('os');
- /**
- * Node binary.
- */
- var node = process.execPath;
- /**
- * Start a new `Master` with the given `server` or filename to
- * a node module exporting a server.
- *
- * Options:
- *
- * - `workers` Number of workers to spawn, defaults to the number of CPUs
- * - 'working directory` Working directory defaulting to the script's dir
- * - 'backlog` Connection backlog, defaulting to 128
- * - 'socket port` Master socket port defaulting to `8989`
- * - 'timeout` Worker shutdown timeout in milliseconds, defaulting to 60,000
- * - 'user` User id / name
- * - 'group` Group id / name
- * - `title` Master process title, defaults to "cluster master"
- * - `worker title` Worker process title, defaults to "cluster worker {n}"
- *
- * Events:
- *
- * - `start`. When the IPC server is prepped
- * - `worker`. When a worker is spawned, passing the `worker`
- * - `listening`. When the server is listening for connections
- * - `closing`. When master is shutting down
- * - `close`. When master has completed shutting down
- * - `worker killed`. When a worker has died
- * - `worker exception`. Worker uncaughtException. Receives the worker / exception
- * - `worker removed`. Worker removed via `spawn(-n)`
- * - `kill`. When a `signal` is being sent to all workers
- * - `restarting`. Restart requested by REPL or signal. Receives an object
- * which can be patched in order to preserve plugin state.
- * - `restart`. Restart complete, new master established, previous died.
- * Receives an object with state preserved by the `restarting` event.
- *
- * Signals:
- *
- * - `SIGINT` hard shutdown
- * - `SIGTERM` hard shutdown
- * - `SIGQUIT` graceful shutdown
- * - `SIGUSR2` graceful restart
- *
- * @param {net.Server|String} server
- * @return {Master}
- * @api public
- */
- var Master = module.exports = function Master(server) {
- var self = this;
- this.server = server;
- this.plugins = [];
- this.children = [];
- this.state = 'active';
- this.startup = new Date;
- this._killed = 0;
- // grab server root
- this.cmd = process.argv.slice(1);
- this.dir = dirname(this.cmd[0]);
- // environment
- this.env = process.env.NODE_ENV || 'development';
- // defaults
- this.options = {
- 'backlog': 128
- , 'working directory': this.dir
- , 'socket port': 8989
- , 'socket addr': '127.0.0.1'
- , 'timeout': 60000
- , 'restart threshold': 'development' == this.env ? 5000 : 60000
- , 'restart timeout': 'development' == this.env ? 5000 : 60000
- , 'title': 'cluster'
- , 'worker title': 'cluster worker'
- };
- // parent master pid
- this.ppid = process.env.CLUSTER_PARENT_PID
- ? parseInt(process.env.CLUSTER_PARENT_PID, 10)
- : null;
- // process is a worker
- this.isWorker = !! process.env.CLUSTER_MASTER_PID;
- // process is a child (worker or master replacement)
- this.isChild = this.isWorker || !! process.env.CLUSTER_REPLACEMENT_MASTER;
- // process is master
- this.isMaster = ! this.isWorker;
- // process id
- this.pid = process.pid;
- if (this.isMaster) process.env.CLUSTER_MASTER_PID = this.pid;
- // custom worker fds, defaults to std{out,err}
- this.customFds = [1, 2];
- // resolve server filename
- if (this.isWorker && 'string' == typeof this.server) {
- this.server = require(this.resolve(this.server));
- }
- // IPC is prepped
- this.on('start', function(){
- process.chdir(self.options['working directory']);
- });
- // spawn our workers
- this.on('listening', function(){
- self.spawn(self.options.workers);
- self.listening = true;
- });
- // kill children on master exception
- if (this.isMaster) {
- process.on('uncaughtException', function(err){
- self.kill('SIGKILL');
- console.error(err.stack || String(err));
- process.exit(1);
- });
- }
- };
- /**
- * Interit from `EventEmitter.prototype`.
- */
- Master.prototype.__proto__ = EventEmitter.prototype;
- /**
- * Worker is a receiver.
- */
- require('./mixins/receiver')(Master.prototype);
- /**
- * Resolve `path` relative to the server file being executed.
- *
- * @param {String} path
- * @return {String}
- * @api public
- */
- Master.prototype.resolve = function(path){
- return '/' == path[0]
- ? path
- : this.dir + '/' + path;
- };
- /**
- * Return `true` when the environment set by `Master#in()`
- * matches __NODE_ENV__.
- *
- * @return {Boolean}
- * @api private
- */
- Master.prototype.__defineGetter__('environmentMatches', function(){
- if (this._env)
- return this.env == this._env || 'all' == this._env;
- return true;
- });
- /**
- * Invoke masters's `method` with worker `id`. (called from Worker)
- *
- * @param {Number} id
- * @param {String} method
- * @param {...} args
- * @api private
- */
- Master.prototype.call = function(id, method){
- this.sock = this.sock || dgram.createSocket('udp4');
- var msg = new Buffer(utils.frame({
- args: utils.toArray(arguments, 2)
- , method: method
- , id: id
- }));
- this.sock.send(
- msg
- , 0
- , msg.length
- , this.options['socket port']
- , this.options['socket addr']);
- };
- /**
- * Perform setup tasks then invoke `fn()` when present.
- *
- * @param {Function} fn
- * @return {Master} for chaining
- * @api public
- */
- Master.prototype.start = function(fn){
- var self = this;
- // deferred title
- process.title = this.options.title;
- // prevent listen
- if (this.preventDefault) return this;
- // env match
- if (this.environmentMatches) {
- // worker process
- if (this.isWorker) {
- this.worker = new Worker(this);
- this.worker.start();
- // master process
- } else if (fn) {
- fn();
- // standalone
- } else {
- this.on('start', function(){ self.emit('listening'); });
- if (this.isChild) this.acceptFd();
- this.setupIPC();
- }
- }
- return this;
- };
- /**
- * Defer `http.Server#listen()` call.
- *
- * @param {Number|String} port or unix domain socket path
- * @param {String|Function} host or callback
- * @param {Function} callback
- * @return {Master} for chaining
- * @api public
- */
- Master.prototype.listen = function(port, host, callback){
- var self = this;
- if (!this.environmentMatches) return this;
- if ('function' == typeof host) callback = host, host = null;
- this.port = port;
- this.host = host;
- this.callback = callback;
- return this.start(function(){
- self.on('start', function(){
- self.startListening(!self.isChild);
- });
- if (self.isChild) {
- self.acceptFd();
- } else {
- self.createSocket(function(err, fd){
- if (err) throw err;
- self.fd = fd;
- self.setupIPC();
- });
- }
- });
- };
- /**
- * Create / return IPC socket.
- *
- * @api private
- */
- Master.prototype.IPCSocket = function(){
- var self = this;
- if (this._sock) return this._sock;
- this._sock = dgram.createSocket('udp4');
- this._sock.on('message', function(msg, info){
- try {
- msg = JSON.parse(msg.toString('ascii'));
- self.invoke(msg.method, msg.args, self.children[msg.id]);
- } catch (err) {
- console.error(err.stack || String(err));
- }
- });
- return this._sock;
- };
- /**
- * Setup IPC.
- *
- * @api private
- */
- Master.prototype.setupIPC = function(){
- var self = this;
- // signal handlers
- this.registerSignalHandlers();
- // Default worker to the # of cpus
- this.defaultWorkers();
- // udp server for IPC
- this.IPCSocket().on('listening', function(){
- process.nextTick(function(){
- self.emit('start');
- });
- });
- // bind
- this.IPCSocket().bind(
- this.options['socket port']
- , this.options['socket addr']);
- };
- /**
- * Conditionally perform the following action, if
- * __NODE_ENV__ matches `env`.
- *
- * Examples:
- *
- * cluster(server)
- * .in('development').use(cluster.debug())
- * .in('development').listen(3000)
- * .in('production').listen(80);
- *
- * @param {String} env
- * @return {Master} self or stubs
- * @api public
- */
- Master.prototype.in = function(env){
- this._env = env;
- return this;
- };
- /**
- * Set option `key` to `val`.
- *
- * @param {String} key
- * @param {Mixed} val
- * @return {Master} for chaining
- * @api public
- */
- Master.prototype.set = function(key, val){
- if (this.environmentMatches) this.options[key] = val;
- return this;
- };
- /**
- * Invoke `fn(master)`.
- *
- * @param {Function} fn
- * @api public
- */
- Master.prototype.do = function(fn){
- if (this.environmentMatches) fn.call(this, this);
- return this;
- };
- /**
- * Check if `option` has been set.
- *
- * @param {String} option
- * @return {Boolean}
- * @api public
- */
- Master.prototype.has = function(option){
- return !! this.options[option];
- };
- /**
- * Use the given `plugin`.
- *
- * @param {Function} plugin
- * @return {Master} for chaining
- * @api public
- */
- Master.prototype.use = function(plugin){
- if (this.environmentMatches) {
- this.plugins.push(plugin);
- if (this.isWorker) {
- plugin.enableInWorker && plugin(this);
- } else {
- plugin(this);
- }
- }
- return this;
- };
- /**
- * Create listening socket and callback `fn(err, fd)`.
- *
- * @return {Function} fn
- * @api private
- */
- Master.prototype.createSocket = function(fn){
- var self = this
- , ipv;
- // explicit host
- if (this.host) {
- // ip
- if (ipv = net.isIP(this.host)) {
- fn(null, socket('tcp' + ipv));
- // lookup
- } else {
- require('dns').lookup(this.host, function(err, ip, ipv){
- if (err) return fn(err);
- self.host = ip;
- fn(null, socket('tcp' + ipv));
- });
- }
- // local socket
- } else if ('string' == typeof this.port) {
- fn(null, socket('unix'));
- // only port
- } else if ('number' == typeof this.port) {
- fn(null, socket('tcp4'));
- }
- };
- /**
- * Register signal handlers.
- *
- * @api private
- */
- Master.prototype.registerSignalHandlers = function(){
- var self = this;
- process.on('SIGINT', this.destroy.bind(this));
- process.on('SIGTERM', this.destroy.bind(this));
- process.on('SIGQUIT', this.close.bind(this));
- process.on('SIGUSR2', this.attemptRestart.bind(this));
- process.on('SIGCHLD', this.maintainWorkerCount.bind(this));
- };
- /**
- * Default workers to the number of cpus available.
- *
- * @api private
- */
- Master.prototype.defaultWorkers = function(){
- if (!this.has('workers')) {
- this.set('workers', os
- ? os.cpus().length
- : 1);
- }
- };
- /**
- * Restart workers only, sending `signal` defaulting
- * to __SIGQUIT__.
- *
- * @param {Type} name
- * @return {Type}
- * @api public
- */
- Master.prototype.restartWorkers = function(signal){
- this.kill(signal || 'SIGQUIT');
- };
- /**
- * Maintain worker count, re-spawning if necessary.
- *
- * @api private
- */
- Master.prototype.maintainWorkerCount = function(){
- this.children.forEach(function(worker){
- var pid = worker.proc.pid;
- if (!pid) this.workerKilled(worker);
- }, this);
- };
- /**
- * Remove `n` workers with `signal`
- * defaulting to __SIGQUIT__.
- *
- * @param {Number} n
- * @param {String} signal
- * @api public
- */
- Master.prototype.remove = function(n, signal){
- if (!arguments.length) n = 1;
- var len = this.children.length
- , worker;
- // cap at worker len
- if (n > len) n = len;
- // remove the workers
- while (n--) {
- worker = this.children.pop();
- worker.proc.kill(signal || 'SIGQUIT');
- this.emit('worker removed', worker);
- this.removeWorker(worker.id);
- }
- };
- /**
- * Remove worker `id`.
- *
- * @param {Number} id
- * @api public
- */
- Master.prototype.removeWorker = function(id){
- var worker = this.children[id];
- if (!worker) return;
- if (worker.fds) {
- close(worker.fds[0]);
- close(worker.fds[1]);
- }
- delete this.children[id];
- };
- /**
- * Spawn `n` workers.
- *
- * @param {Number} n
- * @api public
- */
- Master.prototype.spawn = function(n){
- if (!arguments.length) n = 1;
- while (n--) this.spawnWorker();
- };
- /**
- * Spawn a worker with optional `id`.
- *
- * @param {Number} id
- * @return {Worker}
- * @api private
- */
- Master.prototype.spawnWorker = function(id){
- var worker;
- // id given
- if ('number' == typeof id) {
- worker = new Worker(this).spawn(id)
- this.children[id] = worker;
- worker.id = id;
- // generate an id
- } else {
- worker = new Worker(this).spawn(this.children.length);
- this.children.push(worker);
- }
- var obj = {
- method: 'connect'
- , args: [worker.id, this.options]
- };
- worker.sock.write(utils.frame(obj), 'ascii', this.fd);
- // emit
- this.emit('worker', worker);
- return worker;
- };
- /**
- * Graceful shutdown, wait for all workers
- * to reply before exiting.
- *
- * @api public
- */
- Master.prototype.close = function(){
- this.state = 'graceful shutdown';
- this.emit('closing');
- this.kill('SIGQUIT');
- this.pendingDeaths = this.children.length;
- };
- /**
- * Hard shutdwn, immediately kill all workers.
- *
- * @api public
- */
- Master.prototype.destroy = function(){
- this.state = 'hard shutdown';
- this.emit('closing');
- this.kill('SIGKILL');
- this._destroy();
- };
- /**
- * Attempt restart, while respecting the `restart threshold`
- * setting, to help prevent recursive restarts.
- *
- * @param {String} sig
- * @api private
- */
- Master.prototype.attemptRestart = function(sig){
- var uptime = new Date - this.startup
- , threshold = this.options['restart threshold']
- , timeout = this.options['restart timeout'];
- if (this.__restarting) return;
- if (uptime < threshold) {
- this.__restarting = true;
- this.emit('cyclic restart');
- setTimeout(function(self){
- self.restart(sig);
- }, timeout, this);
- } else {
- this.restart(sig);
- }
- };
- /**
- * Restart all workers, by sending __SIGQUIT__
- * or `sig` to them, enabling master to re-spawn.
- *
- * @param {String} sig
- * @return {ChildProcess} replacement master process
- * @api public
- */
- Master.prototype.restart = function(sig){
- var data = {}
- , proc = this.spawnMaster();
- // pass object to plugins, allowing them
- // to patch it, and utilize the data in
- // the new Master
- this.emit('restarting', data);
- proc.sock.write(utils.frame({
- method: 'connectMaster'
- , args: [sig || 'SIGQUIT']
- }), 'ascii', this.fd);
- this.on('close', function(){
- proc.sock.write(utils.frame({
- method: 'masterKilled'
- , args: [data]
- }), 'ascii');
- });
- return proc;
- };
- /**
- * Spawn a new master process.
- *
- * @return {ChildProcess}
- * @api private
- */
- Master.prototype.spawnMaster = function(){
- var fds = socketpair()
- , customFds = [fds[0], 1, 2]
- , env = {};
- // merge current env
- for (var key in process.env) {
- env[key] = process.env[key];
- }
- delete env.CLUSTER_MASTER_PID;
- env.CLUSTER_REPLACEMENT_MASTER = 1;
- env.CLUSTER_PARENT_PID = this.pid;
- // spawn new master process
- var proc = spawn(node, this.cmd, {
- customFds: customFds
- , env: env
- });
-
- // unix domain socket for ICP + fd passing
- proc.sock = new net.Socket(fds[1], 'unix');
- return proc;
- };
- /**
- * Master replacement connected.
- *
- * @param {String} sig
- * @api private
- */
- Master.prototype.connectMaster = function(sig){
- var self = this;
- function kill(){
- process.kill(self.ppid, sig);
- }
- if (this.listening) return kill();
- this.on('listening', kill);
- };
- /**
- * Original master has died aka 'retired',
- * we now fire the 'restart' event.
- *
- * @param {Object} data
- * @api private
- */
- Master.prototype.masterKilled = function(data){
- this.emit('restart', data);
- };
- /**
- * Accept fd from parent master, then `setupIPC()`.
- *
- * @api private
- */
- Master.prototype.acceptFd = function(){
- var self = this
- , stdin = new net.Socket(0, 'unix');
- // set fd and start master
- stdin.setEncoding('ascii');
- stdin.on('fd', function(fd){
- self.fd = fd;
- self.setupIPC();
- });
- // frame commands from the parent master
- stdin.on('data', this.frame.bind(this));
- stdin.resume();
- };
- /**
- * Close servers and emit 'close' before exiting.
- *
- * @api private
- */
- Master.prototype._destroy = function(){
- this.IPCSocket().close();
- if (this.fd) close(this.fd);
- this.emit('close');
- process.nextTick(process.exit.bind(process));
- };
- /**
- * Worker is connected.
- *
- * @param {Worker} worker
- * @api private
- */
- Master.prototype.connect = function(worker){
- this.emit('worker connected', worker);
- };
- /**
- * Start listening, when `shouldBind` is `true` the socket
- * will be bound, and will start listening for connections.
- *
- * @param {Boolean} shouldBind
- * @api private
- */
- Master.prototype.startListening = function(shouldBind){
- var self = this;
- // remove unix domain socket
- if ('string' == typeof this.port && shouldBind) {
- fs.unlink(this.port, function(err){
- if (err && 'ENOENT' != err.code) throw err;
- startListening();
- });
- } else {
- startListening();
- }
- // bind / listen
- function startListening() {
- if (shouldBind) {
- try {
- bind(self.fd, self.port, self.host);
- listen(self.fd, self.options.backlog);
- } catch(e) {
- self.kill('SIGKILL');
- throw e;
- }
- }
- self.callback && self.callback();
- self.emit('listening');
- }
- };
- /**
- * The given `worker` has been killed.
- * Emit the "worker killed" event, remove
- * the worker, and re-spawn depending on
- * the master state.
- *
- * @api private
- */
- Master.prototype.workerKilled = function(worker){
- // if we have many failing workers at boot
- // then we likely have a serious issue.
- if (new Date - this.startup < 20000) {
- if (++this._killed == 20) {
- console.error('');
- console.error('Cluster detected over 20 worker deaths in the first');
- console.error('20 seconds of life, there is most likely');
- console.error('a serious issue with your server.');
- console.error('');
- console.error('aborting.');
- console.error('');
- process.exit(1);
- }
- }
- // emit event
- this.emit('worker killed', worker);
- // always remove worker
- this.removeWorker(worker.id);
- // state specifics
- switch (this.state) {
- case 'hard shutdown':
- break;
- case 'graceful shutdown':
- --this.pendingDeaths || this._destroy();
- break;
- default:
- this.spawnWorker(worker.id);
- }
- };
- /**
- * `worker` received exception `err`.
- *
- * @api private
- */
- Master.prototype.workerException = function(worker, err){
- this.emit('worker exception', worker, err);
- };
- /**
- * Received worker timeout.
- *
- * @api private
- */
- Master.prototype.workerTimeout = function(worker, timeout){
- this.emit('worker timeout', worker, timeout);
- };
- /**
- * Worker waiting on `connections` to close.
- *
- * @api private
- */
- Master.prototype.workerWaiting = function(worker, connections){
- this.emit('worker waiting', worker, connections);
- };
- /**
- * Send `sig` to all worker processes, defaults to __SIGTERM__.
- *
- * @param {String} sig
- * @api public
- */
- Master.prototype.kill = function(sig){
- var self = this;
- this.emit('kill', sig);
- this.children.forEach(function(worker){
- worker.proc.kill(sig);
- });
- };
|