mux.js 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. //
  2. //
  3. //
  4. 'use strict';
  5. // A Mux is an object into which other readable streams may be piped;
  6. // it then writes 'packets' from the upstreams to the given
  7. // downstream.
  8. var assert = require('assert');
  // Pick the function used to defer work to a later turn: `setImmediate`
  // when the runtime provides it, `process.nextTick` otherwise. The fallback
  // is only looked up when it is actually needed.
  var schedule;
  if (typeof setImmediate === 'function') {
    schedule = setImmediate;
  } else {
    schedule = process.nextTick;
  }
  /**
   * Funnels chunks from any number of upstream readables into a single
   * downstream writable.
   *
   * Upstreams are registered with `pipeFrom`. Whenever one of them emits
   * 'readable' it is queued, and a read pass is scheduled. A read pass takes
   * one chunk from each queued stream in turn and writes it downstream until
   * either every queued stream is empty or the downstream reports it is full.
   */
  class Mux {
    /**
     * @param {object} downstream - Destination for all chunks. It must
     *   provide `write(chunk)`, returning false once it cannot accept more,
     *   and emit 'drain' when it can accept chunks again.
     */
    constructor (downstream) {
      // Streams that announced 'readable' and have not been visited yet.
      this.newStreams = [];
      // Streams left over from a pass that ran out of downstream capacity.
      this.oldStreams = [];
      // True while we are waiting for the downstream to emit 'drain'.
      this.blocked = false;
      // True while a deferred call to `_readIncoming` is pending.
      this.scheduledRead = false;
      this.out = downstream;
      downstream.on('drain', () => {
        this.blocked = false;
        this._readIncoming();
      });
    }

    // At any time we are waiting for one of two things:
    // - outbound capacity, which will be signalled by a 'drain' event on
    //   the downstream; or,
    // - packets to send, which will be signalled by a 'readable' event on
    //   one of the inbound streams.
    // If we write every available packet whenever there is outbound
    // capacity, we either run out of capacity (`write` returns false) or
    // run out of packets (every `read()` has returned null).

    /**
     * Run one read pass: move chunks from the queued streams to the
     * downstream. Does nothing while the downstream is blocked, so it is
     * safe to call speculatively.
     */
    _readIncoming () {
      if (this.blocked) return;

      const out = this.out;
      let accepting = true;

      // Take one chunk from each stream in turn. A stream that yields a
      // chunk goes to the back of the queue; one that yields null is
      // dropped. Stops when the queue is empty or the downstream is full.
      const roundrobin = (streams) => {
        let stream;
        while (accepting && (stream = streams.shift())) {
          const chunk = stream.read();
          if (chunk !== null) {
            accepting = out.write(chunk);
            streams.push(stream);
          }
        }
      };

      roundrobin(this.newStreams);

      if (accepting) {
        // Every new stream has been emptied; spend the remaining capacity
        // on the streams parked by earlier passes.
        assert.strictEqual(this.newStreams.length, 0);
        roundrobin(this.oldStreams);
      } else {
        // Ran out of room. The stream whose chunk filled the downstream was
        // pushed back, so the queue cannot be empty here. Whatever remains
        // is parked behind the existing old streams, which prioritises
        // streams that keep their buffers close to empty over those that
        // are constantly near full.
        assert(this.newStreams.length > 0, "Expect some new streams to remain");
        this.oldStreams.push(...this.newStreams);
        this.newStreams = [];
      }

      // Record whether we still have capacity, so that speculative calls
      // know whether to proceed.
      this.blocked = !accepting;
    }

    /**
     * Arrange for `_readIncoming` to run via `schedule`, unless such a call
     * is already pending.
     */
    _scheduleRead () {
      if (this.scheduledRead) return;
      this.scheduledRead = true;
      schedule(() => {
        this.scheduledRead = false;
        this._readIncoming();
      });
    }

    /**
     * Remove every occurrence of `readable` from the pending queues. The
     * arrays are edited in place so that a read pass holding a reference to
     * one of them sees the removal.
     *
     * @param {object} readable - The stream to forget.
     */
    _dropQueued (readable) {
      for (const queue of [this.newStreams, this.oldStreams]) {
        for (let i = queue.length - 1; i >= 0; i--) {
          if (queue[i] === readable) queue.splice(i, 1);
        }
      }
    }

    /**
     * Start forwarding chunks from `readable` to the downstream.
     *
     * The stream is queued each time it emits 'readable'. The listeners
     * installed here are removed when the stream emits 'end' or 'error', or
     * when it is detached with `unpipeFrom`.
     *
     * @param {object} readable - Event emitter with a `read()` method that
     *   returns null when it has nothing to give.
     */
    pipeFrom (readable) {
      const enqueue = () => {
        this.newStreams.push(readable);
        this._scheduleRead();
      };

      const cleanup = () => {
        readable.removeListener('readable', enqueue);
        readable.removeListener('error', cleanup);
        readable.removeListener('end', cleanup);
        readable.removeListener('unpipeFrom', cleanupIfMe);
      };

      // 'unpipeFrom' carries the mux being detached; ignore the event when
      // it is addressed to a different mux.
      const cleanupIfMe = (dest) => {
        if (dest !== this) return;
        cleanup();
        // Without this, a stream already sitting in a queue would still be
        // read on the next pass even though it has been detached.
        this._dropQueued(readable);
      };

      readable.on('unpipeFrom', cleanupIfMe);
      readable.on('end', cleanup);
      readable.on('error', cleanup);
      readable.on('readable', enqueue);
    }

    /**
     * Stop forwarding chunks from `readable`. This is done by emitting
     * 'unpipeFrom' on the stream with this mux as the argument; the handler
     * installed by `pipeFrom` removes the listeners and drops the stream
     * from the pending queues.
     *
     * @param {object} readable - A stream previously passed to `pipeFrom`.
     */
    unpipeFrom (readable) {
      readable.emit('unpipeFrom', this);
    }
  }
  106. module.exports.Mux = Mux;