_stream_writable.js 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526
  1. // A bit simpler than readable streams.
  2. // Implement an async ._write(chunk, encoding, cb), and it'll handle all
  3. // the drain event emission and buffering.
  4. 'use strict';
  5. module.exports = Writable;
  6. /*<replacement>*/
  7. var processNextTick = require('process-nextick-args');
  8. /*</replacement>*/
  9. /*<replacement>*/
  10. var asyncWrite = !process.browser && ['v0.10', 'v0.9.'].indexOf(process.version.slice(0, 5)) > -1 ? setImmediate : processNextTick;
  11. /*</replacement>*/
  12. Writable.WritableState = WritableState;
  13. /*<replacement>*/
  14. var util = require('core-util-is');
  15. util.inherits = require('inherits');
  16. /*</replacement>*/
  17. /*<replacement>*/
  18. var internalUtil = {
  19. deprecate: require('util-deprecate')
  20. };
  21. /*</replacement>*/
  22. /*<replacement>*/
  23. var Stream;
  24. (function () {
  25. try {
  26. Stream = require('st' + 'ream');
  27. } catch (_) {} finally {
  28. if (!Stream) Stream = require('events').EventEmitter;
  29. }
  30. })();
  31. /*</replacement>*/
  32. var Buffer = require('buffer').Buffer;
  33. /*<replacement>*/
  34. var bufferShim = require('buffer-shims');
  35. /*</replacement>*/
  36. util.inherits(Writable, Stream);
  37. function nop() {}
  38. function WriteReq(chunk, encoding, cb) {
  39. this.chunk = chunk;
  40. this.encoding = encoding;
  41. this.callback = cb;
  42. this.next = null;
  43. }
  44. var Duplex;
  45. function WritableState(options, stream) {
  46. Duplex = Duplex || require('./_stream_duplex');
  47. options = options || {};
  48. // object stream flag to indicate whether or not this stream
  49. // contains buffers or objects.
  50. this.objectMode = !!options.objectMode;
  51. if (stream instanceof Duplex) this.objectMode = this.objectMode || !!options.writableObjectMode;
  52. // the point at which write() starts returning false
  53. // Note: 0 is a valid value, means that we always return false if
  54. // the entire buffer is not flushed immediately on write()
  55. var hwm = options.highWaterMark;
  56. var defaultHwm = this.objectMode ? 16 : 16 * 1024;
  57. this.highWaterMark = hwm || hwm === 0 ? hwm : defaultHwm;
  58. // cast to ints.
  59. this.highWaterMark = ~ ~this.highWaterMark;
  60. this.needDrain = false;
  61. // at the start of calling end()
  62. this.ending = false;
  63. // when end() has been called, and returned
  64. this.ended = false;
  65. // when 'finish' is emitted
  66. this.finished = false;
  67. // should we decode strings into buffers before passing to _write?
  68. // this is here so that some node-core streams can optimize string
  69. // handling at a lower level.
  70. var noDecode = options.decodeStrings === false;
  71. this.decodeStrings = !noDecode;
  72. // Crypto is kind of old and crusty. Historically, its default string
  73. // encoding is 'binary' so we have to make this configurable.
  74. // Everything else in the universe uses 'utf8', though.
  75. this.defaultEncoding = options.defaultEncoding || 'utf8';
  76. // not an actual buffer we keep track of, but a measurement
  77. // of how much we're waiting to get pushed to some underlying
  78. // socket or file.
  79. this.length = 0;
  80. // a flag to see when we're in the middle of a write.
  81. this.writing = false;
  82. // when true all writes will be buffered until .uncork() call
  83. this.corked = 0;
  84. // a flag to be able to tell if the onwrite cb is called immediately,
  85. // or on a later tick. We set this to true at first, because any
  86. // actions that shouldn't happen until "later" should generally also
  87. // not happen before the first write call.
  88. this.sync = true;
  89. // a flag to know if we're processing previously buffered items, which
  90. // may call the _write() callback in the same tick, so that we don't
  91. // end up in an overlapped onwrite situation.
  92. this.bufferProcessing = false;
  93. // the callback that's passed to _write(chunk,cb)
  94. this.onwrite = function (er) {
  95. onwrite(stream, er);
  96. };
  97. // the callback that the user supplies to write(chunk,encoding,cb)
  98. this.writecb = null;
  99. // the amount that is being written when _write is called.
  100. this.writelen = 0;
  101. this.bufferedRequest = null;
  102. this.lastBufferedRequest = null;
  103. // number of pending user-supplied write callbacks
  104. // this must be 0 before 'finish' can be emitted
  105. this.pendingcb = 0;
  106. // emit prefinish if the only thing we're waiting for is _write cbs
  107. // This is relevant for synchronous Transform streams
  108. this.prefinished = false;
  109. // True if the error was already emitted and should not be thrown again
  110. this.errorEmitted = false;
  111. // count buffered requests
  112. this.bufferedRequestCount = 0;
  113. // allocate the first CorkedRequest, there is always
  114. // one allocated and free to use, and we maintain at most two
  115. this.corkedRequestsFree = new CorkedRequest(this);
  116. }
  117. WritableState.prototype.getBuffer = function writableStateGetBuffer() {
  118. var current = this.bufferedRequest;
  119. var out = [];
  120. while (current) {
  121. out.push(current);
  122. current = current.next;
  123. }
  124. return out;
  125. };
  126. (function () {
  127. try {
  128. Object.defineProperty(WritableState.prototype, 'buffer', {
  129. get: internalUtil.deprecate(function () {
  130. return this.getBuffer();
  131. }, '_writableState.buffer is deprecated. Use _writableState.getBuffer ' + 'instead.')
  132. });
  133. } catch (_) {}
  134. })();
  135. var Duplex;
  136. function Writable(options) {
  137. Duplex = Duplex || require('./_stream_duplex');
  138. // Writable ctor is applied to Duplexes, though they're not
  139. // instanceof Writable, they're instanceof Readable.
  140. if (!(this instanceof Writable) && !(this instanceof Duplex)) return new Writable(options);
  141. this._writableState = new WritableState(options, this);
  142. // legacy.
  143. this.writable = true;
  144. if (options) {
  145. if (typeof options.write === 'function') this._write = options.write;
  146. if (typeof options.writev === 'function') this._writev = options.writev;
  147. }
  148. Stream.call(this);
  149. }
  150. // Otherwise people can pipe Writable streams, which is just wrong.
  151. Writable.prototype.pipe = function () {
  152. this.emit('error', new Error('Cannot pipe, not readable'));
  153. };
  154. function writeAfterEnd(stream, cb) {
  155. var er = new Error('write after end');
  156. // TODO: defer error events consistently everywhere, not just the cb
  157. stream.emit('error', er);
  158. processNextTick(cb, er);
  159. }
  160. // If we get something that is not a buffer, string, null, or undefined,
  161. // and we're not in objectMode, then that's an error.
  162. // Otherwise stream chunks are all considered to be of length=1, and the
  163. // watermarks determine how many objects to keep in the buffer, rather than
  164. // how many bytes or characters.
  165. function validChunk(stream, state, chunk, cb) {
  166. var valid = true;
  167. var er = false;
  168. // Always throw error if a null is written
  169. // if we are not in object mode then throw
  170. // if it is not a buffer, string, or undefined.
  171. if (chunk === null) {
  172. er = new TypeError('May not write null values to stream');
  173. } else if (!Buffer.isBuffer(chunk) && typeof chunk !== 'string' && chunk !== undefined && !state.objectMode) {
  174. er = new TypeError('Invalid non-string/buffer chunk');
  175. }
  176. if (er) {
  177. stream.emit('error', er);
  178. processNextTick(cb, er);
  179. valid = false;
  180. }
  181. return valid;
  182. }
  183. Writable.prototype.write = function (chunk, encoding, cb) {
  184. var state = this._writableState;
  185. var ret = false;
  186. if (typeof encoding === 'function') {
  187. cb = encoding;
  188. encoding = null;
  189. }
  190. if (Buffer.isBuffer(chunk)) encoding = 'buffer';else if (!encoding) encoding = state.defaultEncoding;
  191. if (typeof cb !== 'function') cb = nop;
  192. if (state.ended) writeAfterEnd(this, cb);else if (validChunk(this, state, chunk, cb)) {
  193. state.pendingcb++;
  194. ret = writeOrBuffer(this, state, chunk, encoding, cb);
  195. }
  196. return ret;
  197. };
  198. Writable.prototype.cork = function () {
  199. var state = this._writableState;
  200. state.corked++;
  201. };
  202. Writable.prototype.uncork = function () {
  203. var state = this._writableState;
  204. if (state.corked) {
  205. state.corked--;
  206. if (!state.writing && !state.corked && !state.finished && !state.bufferProcessing && state.bufferedRequest) clearBuffer(this, state);
  207. }
  208. };
  209. Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) {
  210. // node::ParseEncoding() requires lower case.
  211. if (typeof encoding === 'string') encoding = encoding.toLowerCase();
  212. if (!(['hex', 'utf8', 'utf-8', 'ascii', 'binary', 'base64', 'ucs2', 'ucs-2', 'utf16le', 'utf-16le', 'raw'].indexOf((encoding + '').toLowerCase()) > -1)) throw new TypeError('Unknown encoding: ' + encoding);
  213. this._writableState.defaultEncoding = encoding;
  214. return this;
  215. };
  216. function decodeChunk(state, chunk, encoding) {
  217. if (!state.objectMode && state.decodeStrings !== false && typeof chunk === 'string') {
  218. chunk = bufferShim.from(chunk, encoding);
  219. }
  220. return chunk;
  221. }
  222. // if we're already writing something, then just put this
  223. // in the queue, and wait our turn. Otherwise, call _write
  224. // If we return false, then we need a drain event, so set that flag.
  225. function writeOrBuffer(stream, state, chunk, encoding, cb) {
  226. chunk = decodeChunk(state, chunk, encoding);
  227. if (Buffer.isBuffer(chunk)) encoding = 'buffer';
  228. var len = state.objectMode ? 1 : chunk.length;
  229. state.length += len;
  230. var ret = state.length < state.highWaterMark;
  231. // we must ensure that previous needDrain will not be reset to false.
  232. if (!ret) state.needDrain = true;
  233. if (state.writing || state.corked) {
  234. var last = state.lastBufferedRequest;
  235. state.lastBufferedRequest = new WriteReq(chunk, encoding, cb);
  236. if (last) {
  237. last.next = state.lastBufferedRequest;
  238. } else {
  239. state.bufferedRequest = state.lastBufferedRequest;
  240. }
  241. state.bufferedRequestCount += 1;
  242. } else {
  243. doWrite(stream, state, false, len, chunk, encoding, cb);
  244. }
  245. return ret;
  246. }
  247. function doWrite(stream, state, writev, len, chunk, encoding, cb) {
  248. state.writelen = len;
  249. state.writecb = cb;
  250. state.writing = true;
  251. state.sync = true;
  252. if (writev) stream._writev(chunk, state.onwrite);else stream._write(chunk, encoding, state.onwrite);
  253. state.sync = false;
  254. }
  255. function onwriteError(stream, state, sync, er, cb) {
  256. --state.pendingcb;
  257. if (sync) processNextTick(cb, er);else cb(er);
  258. stream._writableState.errorEmitted = true;
  259. stream.emit('error', er);
  260. }
  261. function onwriteStateUpdate(state) {
  262. state.writing = false;
  263. state.writecb = null;
  264. state.length -= state.writelen;
  265. state.writelen = 0;
  266. }
  267. function onwrite(stream, er) {
  268. var state = stream._writableState;
  269. var sync = state.sync;
  270. var cb = state.writecb;
  271. onwriteStateUpdate(state);
  272. if (er) onwriteError(stream, state, sync, er, cb);else {
  273. // Check if we're actually ready to finish, but don't emit yet
  274. var finished = needFinish(state);
  275. if (!finished && !state.corked && !state.bufferProcessing && state.bufferedRequest) {
  276. clearBuffer(stream, state);
  277. }
  278. if (sync) {
  279. /*<replacement>*/
  280. asyncWrite(afterWrite, stream, state, finished, cb);
  281. /*</replacement>*/
  282. } else {
  283. afterWrite(stream, state, finished, cb);
  284. }
  285. }
  286. }
  287. function afterWrite(stream, state, finished, cb) {
  288. if (!finished) onwriteDrain(stream, state);
  289. state.pendingcb--;
  290. cb();
  291. finishMaybe(stream, state);
  292. }
  293. // Must force callback to be called on nextTick, so that we don't
  294. // emit 'drain' before the write() consumer gets the 'false' return
  295. // value, and has a chance to attach a 'drain' listener.
  296. function onwriteDrain(stream, state) {
  297. if (state.length === 0 && state.needDrain) {
  298. state.needDrain = false;
  299. stream.emit('drain');
  300. }
  301. }
  302. // if there's something in the buffer waiting, then process it
  303. function clearBuffer(stream, state) {
  304. state.bufferProcessing = true;
  305. var entry = state.bufferedRequest;
  306. if (stream._writev && entry && entry.next) {
  307. // Fast case, write everything using _writev()
  308. var l = state.bufferedRequestCount;
  309. var buffer = new Array(l);
  310. var holder = state.corkedRequestsFree;
  311. holder.entry = entry;
  312. var count = 0;
  313. while (entry) {
  314. buffer[count] = entry;
  315. entry = entry.next;
  316. count += 1;
  317. }
  318. doWrite(stream, state, true, state.length, buffer, '', holder.finish);
  319. // doWrite is almost always async, defer these to save a bit of time
  320. // as the hot path ends with doWrite
  321. state.pendingcb++;
  322. state.lastBufferedRequest = null;
  323. if (holder.next) {
  324. state.corkedRequestsFree = holder.next;
  325. holder.next = null;
  326. } else {
  327. state.corkedRequestsFree = new CorkedRequest(state);
  328. }
  329. } else {
  330. // Slow case, write chunks one-by-one
  331. while (entry) {
  332. var chunk = entry.chunk;
  333. var encoding = entry.encoding;
  334. var cb = entry.callback;
  335. var len = state.objectMode ? 1 : chunk.length;
  336. doWrite(stream, state, false, len, chunk, encoding, cb);
  337. entry = entry.next;
  338. // if we didn't call the onwrite immediately, then
  339. // it means that we need to wait until it does.
  340. // also, that means that the chunk and cb are currently
  341. // being processed, so move the buffer counter past them.
  342. if (state.writing) {
  343. break;
  344. }
  345. }
  346. if (entry === null) state.lastBufferedRequest = null;
  347. }
  348. state.bufferedRequestCount = 0;
  349. state.bufferedRequest = entry;
  350. state.bufferProcessing = false;
  351. }
  352. Writable.prototype._write = function (chunk, encoding, cb) {
  353. cb(new Error('not implemented'));
  354. };
  355. Writable.prototype._writev = null;
  356. Writable.prototype.end = function (chunk, encoding, cb) {
  357. var state = this._writableState;
  358. if (typeof chunk === 'function') {
  359. cb = chunk;
  360. chunk = null;
  361. encoding = null;
  362. } else if (typeof encoding === 'function') {
  363. cb = encoding;
  364. encoding = null;
  365. }
  366. if (chunk !== null && chunk !== undefined) this.write(chunk, encoding);
  367. // .end() fully uncorks
  368. if (state.corked) {
  369. state.corked = 1;
  370. this.uncork();
  371. }
  372. // ignore unnecessary end() calls.
  373. if (!state.ending && !state.finished) endWritable(this, state, cb);
  374. };
  375. function needFinish(state) {
  376. return state.ending && state.length === 0 && state.bufferedRequest === null && !state.finished && !state.writing;
  377. }
  378. function prefinish(stream, state) {
  379. if (!state.prefinished) {
  380. state.prefinished = true;
  381. stream.emit('prefinish');
  382. }
  383. }
  384. function finishMaybe(stream, state) {
  385. var need = needFinish(state);
  386. if (need) {
  387. if (state.pendingcb === 0) {
  388. prefinish(stream, state);
  389. state.finished = true;
  390. stream.emit('finish');
  391. } else {
  392. prefinish(stream, state);
  393. }
  394. }
  395. return need;
  396. }
  397. function endWritable(stream, state, cb) {
  398. state.ending = true;
  399. finishMaybe(stream, state);
  400. if (cb) {
  401. if (state.finished) processNextTick(cb);else stream.once('finish', cb);
  402. }
  403. state.ended = true;
  404. stream.writable = false;
  405. }
  406. // It seems a linked list but it is not
  407. // there will be only 2 of these for each stream
  408. function CorkedRequest(state) {
  409. var _this = this;
  410. this.next = null;
  411. this.entry = null;
  412. this.finish = function (err) {
  413. var entry = _this.entry;
  414. _this.entry = null;
  415. while (entry) {
  416. var cb = entry.callback;
  417. state.pendingcb--;
  418. cb(err);
  419. entry = entry.next;
  420. }
  421. if (state.corkedRequestsFree) {
  422. state.corkedRequestsFree.next = _this;
  423. } else {
  424. state.corkedRequestsFree = _this;
  425. }
  426. };
  427. }