mux.js 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. 'use strict';
  2. var assert = require('assert');
  3. var Mux = require('../lib/mux').Mux;
  4. var PassThrough = require('stream').PassThrough;
  5. var latch = require('./util').latch;
  6. var schedule = require('./util').schedule;
  7. function stream() {
  8. return new PassThrough({objectMode: true});
  9. }
  10. function readAllObjects(s, cb) {
  11. var objs = [];
  12. function read() {
  13. var v = s.read();
  14. while (v !== null) {
  15. objs.push(v);
  16. v = s.read();
  17. }
  18. }
  19. s.on('end', function() { cb(objs); });
  20. s.on('readable', read);
  21. read();
  22. }
  23. test("single input", function(done) {
  24. var input = stream();
  25. var output = stream();
  26. input.on('end', function() { output.end() });
  27. var mux = new Mux(output);
  28. mux.pipeFrom(input);
  29. var data = [1,2,3,4,5,6,7,8,9];
  30. // not 0, it's treated specially by PassThrough for some reason. By
  31. // 'specially' I mean it breaks the stream. See e.g.,
  32. // https://github.com/isaacs/readable-stream/pull/55
  33. data.forEach(function (chunk) { input.write(chunk); });
  34. readAllObjects(output, function(vals) {
  35. assert.deepEqual(data, vals);
  36. done();
  37. });
  38. input.end();
  39. });
  40. test("single input, resuming stream", function(done) {
  41. var input = stream();
  42. var output = stream();
  43. input.on('end', function() { output.end() });
  44. var mux = new Mux(output);
  45. mux.pipeFrom(input);
  46. // Streams might be blocked and become readable again, simulate this
  47. // using a special read function and a marker
  48. var data = [1,2,3,4,'skip',6,7,8,9];
  49. var oldRead = input.read;
  50. input.read = function(size) {
  51. var val = oldRead.call(input, size)
  52. if (val === 'skip') {
  53. input.emit('readable');
  54. return null
  55. }
  56. return val;
  57. }
  58. data.forEach(function (chunk) { input.write(chunk); });
  59. readAllObjects(output, function(vals) {
  60. assert.deepEqual([1,2,3,4,6,7,8,9], vals);
  61. done();
  62. });
  63. input.end();
  64. });
  65. test("two sequential inputs", function(done) {
  66. var input1 = stream();
  67. var input2 = stream();
  68. var output = stream();
  69. var mux = new Mux(output);
  70. mux.pipeFrom(input1);
  71. mux.pipeFrom(input2);
  72. var data = [1,2,3,4,5,6,7,8,9];
  73. data.forEach(function(v) { input1.write(v); });
  74. input1.on('end', function() {
  75. data.forEach(function (v) { input2.write(v); });
  76. input2.end();
  77. });
  78. input2.on('end', function() { output.end(); });
  79. input1.end();
  80. readAllObjects(output, function(vs) {
  81. assert.equal(2 * data.length, vs.length);
  82. done();
  83. });
  84. });
  85. test("two interleaved inputs", function(done) {
  86. var input1 = stream();
  87. var input2 = stream();
  88. var output = stream();
  89. var mux = new Mux(output);
  90. mux.pipeFrom(input1);
  91. mux.pipeFrom(input2);
  92. var endLatch = latch(2, function() { output.end(); });
  93. input1.on('end', endLatch);
  94. input2.on('end', endLatch);
  95. var data = [1,2,3,4,5,6,7,8,9];
  96. data.forEach(function(v) { input1.write(v); });
  97. input1.end();
  98. data.forEach(function(v) { input2.write(v); });
  99. input2.end();
  100. readAllObjects(output, function(vs) {
  101. assert.equal(2 * data.length, vs.length);
  102. done();
  103. });
  104. });
  105. test("unpipe", function(done) {
  106. var input = stream();
  107. var output = stream();
  108. var mux = new Mux(output);
  109. var pipedData = [1,2,3,4,5];
  110. var unpipedData = [6,7,8,9];
  111. mux.pipeFrom(input);
  112. schedule(function() {
  113. pipedData.forEach(function (chunk) { input.write(chunk); });
  114. schedule(function() {
  115. mux.unpipeFrom(input);
  116. schedule(function() {
  117. unpipedData.forEach(function(chunk) { input.write(chunk); });
  118. input.end();
  119. schedule(function() {
  120. // exhaust so that 'end' fires
  121. var v; while (v = input.read());
  122. });
  123. });
  124. });
  125. });
  126. input.on('end', function() {
  127. output.end();
  128. });
  129. readAllObjects(output, function(vals) {
  130. try {
  131. assert.deepEqual(pipedData, vals);
  132. done();
  133. }
  134. catch (e) { done(e); }
  135. });
  136. });
  137. test("roundrobin", function(done) {
  138. var input1 = stream();
  139. var input2 = stream();
  140. var output = stream();
  141. var mux = new Mux(output);
  142. mux.pipeFrom(input1);
  143. mux.pipeFrom(input2);
  144. var endLatch = latch(2, function() { output.end(); });
  145. input1.on('end', endLatch);
  146. input2.on('end', endLatch);
  147. var ones = [1,1,1,1,1];
  148. ones.forEach(function(v) { input1.write(v); });
  149. input1.end();
  150. var twos = [2,2,2,2,2];
  151. twos.forEach(function(v) { input2.write(v); });
  152. input2.end();
  153. readAllObjects(output, function(vs) {
  154. assert.deepEqual([1,2,1,2,1,2,1,2,1,2], vs);
  155. done();
  156. });
  157. });