// channel.js
  1. // Test the channel machinery
  2. 'use strict';
  3. var assert = require('assert');
  4. var promisify = require('util').promisify;
  5. var Channel = require('../lib/channel').Channel;
  6. var Connection = require('../lib/connection').Connection;
  7. var util = require('./util');
  8. var succeed = util.succeed, fail = util.fail, latch = util.latch;
  9. var completes = util.completes;
  10. var defs = require('../lib/defs');
  11. var conn_handshake = require('./connection').connection_handshake;
  12. var OPEN_OPTS = require('./connection').OPEN_OPTS;
  13. var LOG_ERRORS = process.env.LOG_ERRORS;
  /**
   * Build a test body that wires a client Connection to a scripted server
   * over an in-memory socket pair.
   *
   * @param {function(Connection, function)} client - called with the opened
   *   connection and a completion callback.
   * @param {function(function, function, function)} server - called with
   *   `send`, `wait` and a completion callback once the connection handshake
   *   has been performed.
   * @returns {function(function)} a function taking the test runner's `done`.
   */
  function baseChannelTest(client, server) {
    return function(done) {
      // Client and server each report through the same two-count latch.
      var bothDone = latch(2, done);
      var pair = util.socketPair();
      var c = new Connection(pair.client);
      if (LOG_ERRORS) c.on('error', console.warn);
      c.open(OPEN_OPTS, function(err, ok) {
        if (err === null) client(c, bothDone);
        // fail() only builds a handler; it must be invoked with the error,
        // otherwise a failed open is never reported.
        else fail(bothDone)(err);
      });
      pair.server.read(8); // discard the protocol header
      util.runServer(pair.server, function(send, wait) {
        conn_handshake(send, wait)
          .then(function() {
            server(send, wait, bothDone);
          }, fail(bothDone));
      });
    };
  }
  /**
   * Like baseChannelTest, but the client receives a fresh Channel (plus the
   * connection) and the server is invoked only after the channel-open
   * handshake, with the channel number as its fourth argument.
   */
  function channelTest(client, server) {
    const clientSide = (conn, done) => {
      const ch = new Channel(conn);
      if (LOG_ERRORS) ch.on('error', console.warn);
      client(ch, done, conn);
    };

    const serverSide = (send, wait, done) => {
      channel_handshake(send, wait)
        .then((ch) => server(send, wait, done, ch))
        // the server may return a promise, so its errors bubble out here
        .then(null, fail(done));
    };

    return baseChannelTest(clientSide, serverSide);
  }
  /**
   * Server side of opening a channel: wait for ChannelOpen, answer with
   * ChannelOpenOk on the same channel, and resolve with the channel number.
   */
  function channel_handshake(send, wait) {
    const awaitOpen = wait(defs.ChannelOpen);
    return awaitOpen().then((frame) => {
      assert.notEqual(0, frame.channel);
      send(defs.ChannelOpenOk, {channelId: Buffer.from('')}, frame.channel);
      return frame.channel;
    });
  }
  // One set of method fields shared by the tests below; they are sent with
  // both BasicDeliver and BasicReturn.
  const DELIVER_FIELDS = {
    consumerTag: 'fake',
    deliveryTag: 1,
    redelivered: false,
    exchange: 'foo',
    routingKey: 'bar',
    replyCode: defs.constants.NO_ROUTE,
    replyText: 'derp',
  };
  /**
   * Allocate the channel and issue the ChannelOpen RPC.
   * Returns a promise that settles with the outcome of the RPC callback.
   */
  function open(ch) {
    ch.allocate();
    const openRpc = promisify((cb) => {
      ch._rpc(defs.ChannelOpen, {outOfBand: ''}, defs.ChannelOpenOk, cb);
    });
    return openRpc();
  }
  73. suite("channel open and close", function() {
  // Opening a channel succeeds; the server has nothing to do beyond the
  // handshake that channelTest already performs.
  test("open", channelTest(
    (ch, done) => {
      open(ch).then(succeed(done), fail(done));
    },
    (send, wait, done) => {
      done();
    }));
  // The server answers ChannelOpen with the wrong method, so the open must
  // be rejected (note the swapped success/failure handlers on the client).
  test("bad server", baseChannelTest(
    (c, done) => {
      const ch = new Channel(c);
      open(ch).then(fail(done), succeed(done));
    },
    (send, wait, done) => {
      const awaitOpen = wait(defs.ChannelOpen);
      return awaitOpen()
        .then((frame) => {
          send(defs.ChannelCloseOk, {}, frame.channel);
        })
        .then(succeed(done), fail(done));
    }));
  // Open, then close from the client; the close callback fires once the
  // server has replied with ChannelCloseOk.
  test("open, close", channelTest(
    (ch, done) => {
      const closeChannel = () => new Promise((resolve) => {
        ch.closeBecause("Bye", defs.constants.REPLY_SUCCESS, resolve);
      });
      open(ch)
        .then(closeChannel)
        .then(succeed(done), fail(done));
    },
    (send, wait, done, ch) => {
      const awaitClose = wait(defs.ChannelClose);
      return awaitClose()
        .then(() => {
          send(defs.ChannelCloseOk, {}, ch);
        })
        .then(succeed(done), fail(done));
    }));
  // A close initiated by the server surfaces on the channel as an 'error'
  // carrying the reply code and class/method ids from the close frame.
  test("server close", channelTest(
    (ch, done) => {
      ch.on('error', (error) => {
        assert.strictEqual(504, error.code);
        assert.strictEqual(0, error.classId);
        assert.strictEqual(0, error.methodId);
        succeed(done)();
      });
      open(ch);
    },
    (send, wait, done, ch) => {
      const closeFields = {
        replyText: 'Forced close',
        replyCode: defs.constants.CHANNEL_ERROR,
        classId: 0, methodId: 0
      };
      send(defs.ChannelClose, closeFields, ch);
      const awaitCloseOk = wait(defs.ChannelCloseOk);
      awaitCloseOk().then(succeed(done), fail(done));
    }));
  // The client closes the channel, but the server responds by closing the
  // whole connection: expect a connection 'error' and a channel 'close'.
  test("overlapping channel/server close", channelTest(
    (ch, done, conn) => {
      const both = latch(2, done);
      conn.on('error', succeed(both));
      ch.on('close', succeed(both));
      const closeChannel = () => {
        ch.closeBecause("Bye", defs.constants.REPLY_SUCCESS);
      };
      open(ch).then(closeChannel, fail(both));
    },
    (send, wait, done, ch) => {
      const closeConnection = () => {
        send(defs.ConnectionClose, {
          replyText: 'Got there first',
          replyCode: defs.constants.INTERNAL_ERROR,
          classId: 0, methodId: 0
        }, 0);
      };
      wait(defs.ChannelClose)()
        .then(closeConnection)
        .then(wait(defs.ConnectionCloseOk))
        .then(succeed(done), fail(done));
    }));
  // Closing a channel a second time, before the first close has been
  // acknowledged, must throw.
  test("double close", channelTest(
    (ch, done) => {
      const closeTwice = () => {
        ch.closeBecause("First close", defs.constants.REPLY_SUCCESS);
        // NB no synchronisation: the second close is attempted straight away
        assert.throws(() => {
          ch.closeBecause("Second close", defs.constants.REPLY_SUCCESS);
        });
      };
      open(ch)
        .then(closeTwice)
        .then(succeed(done), fail(done));
    },
    (send, wait, done, ch) => {
      const awaitClose = wait(defs.ChannelClose);
      awaitClose()
        .then(() => {
          send(defs.ChannelCloseOk, {}, ch);
        })
        .then(succeed(done), fail(done));
    }));
  166. }); //suite
  167. suite("channel machinery", function() {
  // Three RPCs are issued back to back; each must get its own reply.
  test("RPC", channelTest(
    (ch, done) => {
      const rpcLatch = latch(3, done);
      const onReply = (err) => {
        if (err === null) rpcLatch();
        else rpcLatch(err);
      };
      open(ch).then(() => {
        const fields = {
          prefetchCount: 10,
          prefetchSize: 0,
          global: false
        };
        for (let i = 0; i < 3; i++) {
          ch._rpc(defs.BasicQos, fields, defs.BasicQosOk, onReply);
        }
      }).then(null, fail(rpcLatch));
    },
    (send, wait, done, ch) => {
      const sendOk = () => {
        send(defs.BasicQosOk, {}, ch);
      };
      // first request/reply, then two more rounds of the same
      let rounds = wait(defs.BasicQos)().then(sendOk);
      for (let i = 0; i < 2; i++) {
        rounds = rounds.then(wait(defs.BasicQos)).then(sendOk);
      }
      return rounds.then(succeed(done), fail(done));
    }));
  // The server replies to an RPC with the wrong method. The RPC callback
  // must get an error and the channel must emit an error and be closed.
  test("Bad RPC", channelTest(
    (ch, done) => {
      const errLatch = latch(2, done);
      ch.on('error', (error) => {
        // presumably class/method identify the unexpected reply -- confirm
        // against defs
        assert.strictEqual(505, error.code);
        assert.strictEqual(60, error.classId);
        assert.strictEqual(72, error.methodId);
        succeed(errLatch)();
      });
      const onRecoverReply = (err) => {
        if (err === null) errLatch(new Error('Expected RPC failure'));
        else errLatch();
      };
      open(ch).then(() => {
        ch._rpc(defs.BasicRecover, {requeue: true}, defs.BasicRecoverOk,
                onRecoverReply);
      }, fail(errLatch));
    },
    (send, wait, done, ch) => {
      const sendWrongReply = () => {
        send(defs.BasicGetEmpty, {clusterId: ''}, ch);
      };
      const acknowledgeClose = () => {
        send(defs.ChannelCloseOk, {}, ch);
      };
      return wait()()
        .then(sendWrongReply)
        // that reply was wrong, so the client is expected to close the channel
        .then(wait(defs.ChannelClose))
        .then(acknowledgeClose)
        .then(succeed(done), fail(done));
    }));
  // Two RPCs are issued and the server closes the channel instead of
  // replying: both RPC callbacks must get an error, and the channel must
  // emit an 'error' describing the server's close.
  test("RPC on closed channel", channelTest(
    function(ch, done) {
      open(ch);

      var close = new Promise(function(resolve) {
        ch.on('error', function(error) {
          assert.strictEqual(504, error.code);
          assert.strictEqual(0, error.classId);
          assert.strictEqual(0, error.methodId);
          resolve();
        });
      });

      // Adapt an _rpc callback to a promise where an error is the expected
      // outcome and success is the failure.
      function failureCb(resolve, reject) {
        return function(err) {
          if (err !== null) resolve();
          else reject(new Error('Expected RPC failure'));
        };
      }

      var fail1 = new Promise(function(resolve, reject) {
        return ch._rpc(defs.BasicRecover, {requeue:true}, defs.BasicRecoverOk,
                       failureCb(resolve, reject));
      });

      var fail2 = new Promise(function(resolve, reject) {
        return ch._rpc(defs.BasicRecover, {requeue:true}, defs.BasicRecoverOk,
                       failureCb(resolve, reject));
      });

      Promise.all([close, fail1, fail2])
        .then(succeed(done))
        .catch(fail(done));
    },
    function(send, wait, done, ch) {
      wait(defs.BasicRecover)()
        .then(function() {
          send(defs.ChannelClose, {
            replyText: 'Nuh-uh!',
            replyCode: defs.constants.CHANNEL_ERROR,
            methodId: 0, classId: 0
          }, ch);
          // wait(...) only builds the waiter; call it so the server really
          // waits for the client's ChannelCloseOk before finishing.
          return wait(defs.ChannelCloseOk)();
        })
        .then(succeed(done))
        .catch(fail(done));
    }));
  // A small message: the server expects the publish method, the properties
  // and then a content frame carrying the body.
  test("publish all < single chunk threshold", channelTest(
    (ch, done) => {
      const publish = () => {
        ch.sendMessage({
          exchange: 'foo', routingKey: 'bar',
          mandatory: false, immediate: false, ticket: 0
        }, {}, Buffer.from('foobar'));
      };
      open(ch)
        .then(publish)
        .then(succeed(done), fail(done));
    },
    (send, wait, done, ch) => {
      const checkBody = (f) => {
        assert.equal('foobar', f.content.toString());
      };
      wait(defs.BasicPublish)()
        .then(wait(defs.BasicProperties))
        .then(wait(undefined)) // content frame
        .then(checkBody)
        .then(succeed(done), fail(done));
    }));
  // A 3000-byte body; the server checks the length of the content it gets.
  test("publish content > single chunk threshold", channelTest(
    (ch, done) => {
      open(ch);
      const publish = () => {
        ch.sendMessage({
          exchange: 'foo', routingKey: 'bar',
          mandatory: false, immediate: false, ticket: 0
        }, {}, Buffer.alloc(3000));
      };
      completes(publish, done);
    },
    (send, wait, done, ch) => {
      const checkLength = (f) => {
        assert.equal(3000, f.content.length);
      };
      wait(defs.BasicPublish)()
        .then(wait(defs.BasicProperties))
        .then(wait(undefined)) // content frame
        .then(checkLength)
        .then(succeed(done), fail(done));
    }));
  // Small body but a 3000-byte header value; the body must still arrive
  // intact in the content frame.
  test("publish method & headers > threshold", channelTest(
    (ch, done) => {
      open(ch);
      const publish = () => {
        const properties = {
          headers: {foo: Buffer.alloc(3000)}
        };
        ch.sendMessage({
          exchange: 'foo', routingKey: 'bar',
          mandatory: false, immediate: false, ticket: 0
        }, properties, Buffer.from('foobar'));
      };
      completes(publish, done);
    },
    (send, wait, done, ch) => {
      const checkBody = (f) => {
        assert.equal('foobar', f.content.toString());
      };
      wait(defs.BasicPublish)()
        .then(wait(defs.BasicProperties))
        .then(wait(undefined)) // content frame
        .then(checkBody)
        .then(succeed(done), fail(done));
    }));
  // Two empty messages in a row: the second publish method must directly
  // follow the first message's properties.
  test("publish zero-length message", channelTest(
    (ch, done) => {
      open(ch);
      // a fresh fields object per message, as each send gets its own
      const publishFields = () => ({
        exchange: 'foo', routingKey: 'bar',
        mandatory: false, immediate: false, ticket: 0
      });
      completes(() => {
        ch.sendMessage(publishFields(), {}, Buffer.alloc(0));
        ch.sendMessage(publishFields(), {}, Buffer.alloc(0));
      }, done);
    },
    (send, wait, done, ch) => {
      wait(defs.BasicPublish)()
        .then(wait(defs.BasicProperties))
        // no content frame for a zero-length message
        .then(wait(defs.BasicPublish))
        .then(succeed(done), fail(done));
    }));
  // A delivery sent by the server is emitted as 'delivery' with its content.
  test("delivery", channelTest(
    (ch, done) => {
      open(ch);
      ch.on('delivery', (m) => {
        const checkBody = () => {
          assert.equal('barfoo', m.content.toString());
        };
        completes(checkBody, done);
      });
    },
    (send, wait, done, ch) => {
      const deliver = () => {
        send(defs.BasicDeliver, DELIVER_FIELDS, ch, Buffer.from('barfoo'));
      };
      completes(deliver, done);
    }));
  // A delivery with an empty body yields an empty content buffer.
  test("zero byte msg", channelTest(
    (ch, done) => {
      open(ch);
      ch.on('delivery', (m) => {
        const checkEmpty = () => {
          assert.deepEqual(Buffer.alloc(0), m.content);
        };
        completes(checkEmpty, done);
      });
    },
    (send, wait, done, ch) => {
      const deliver = () => {
        send(defs.BasicDeliver, DELIVER_FIELDS, ch, Buffer.from(''));
      };
      completes(deliver, done);
    }));
  // Two deliver methods in a row, with no content in between, is a protocol
  // violation: the channel must emit 'error' and 'close'.
  test("bad delivery", channelTest(
    (ch, done) => {
      const errorAndClose = latch(2, done);
      ch.on('close', succeed(errorAndClose));
      ch.on('error', (error) => {
        assert.strictEqual(505, error.code);
        assert.strictEqual(60, error.classId);
        assert.strictEqual(60, error.methodId);
        succeed(errorAndClose)();
      });
      open(ch);
    },
    (send, wait, done, ch) => {
      send(defs.BasicDeliver, DELIVER_FIELDS, ch);
      // second deliver, sent without the first one's content
      send(defs.BasicDeliver, DELIVER_FIELDS, ch);
      const awaitClose = wait(defs.ChannelClose);
      return awaitClose()
        .then(() => {
          send(defs.ChannelCloseOk, {}, ch);
        })
        .then(succeed(done), fail(done));
    }));
  // Sending null as message content must throw synchronously.
  test("bad content send", channelTest(
    (ch, done) => {
      const sendNullContent = () => {
        ch.sendMessage({routingKey: 'foo',
                        exchange: 'amq.direct'},
                       {}, null);
      };
      completes(() => {
        open(ch);
        assert.throws(sendNullContent);
      }, done);
    },
    (send, wait, done, ch) => {
      done();
    }));
  // A property of the wrong type (numeric contentEncoding) must make
  // sendMessage throw synchronously.
  test("bad properties send", channelTest(
    (ch, done) => {
      const sendBadProperties = () => {
        ch.sendMessage({routingKey: 'foo',
                        exchange: 'amq.direct'},
                       {contentEncoding: 7},
                       Buffer.from('foobar'));
      };
      completes(() => {
        open(ch);
        assert.throws(sendBadProperties);
      }, done);
    },
    (send, wait, done, ch) => {
      done();
    }));
  // A 'delivery' listener that throws: the channel must emit 'error' and
  // 'close', and the server must see the channel being closed.
  test("bad consumer", channelTest(
    (ch, done) => {
      const errorAndClose = latch(2, done);
      ch.on('delivery', () => {
        throw new Error("I am a bad consumer");
      });
      ch.on('error', (error) => {
        assert.strictEqual(541, error.code);
        assert.strictEqual(undefined, error.classId);
        assert.strictEqual(undefined, error.methodId);
        succeed(errorAndClose)();
      });
      ch.on('close', succeed(errorAndClose));
      open(ch);
    },
    (send, wait, done, ch) => {
      send(defs.BasicDeliver, DELIVER_FIELDS, ch, Buffer.from('barfoo'));
      const awaitClose = wait(defs.ChannelClose);
      return awaitClose()
        .then(() => {
          send(defs.ChannelCloseOk, {}, ch);
        })
        .then(succeed(done), fail(done));
    }));
  // A 'delivery' listener that attempts an invalid send: the channel must
  // emit 'error' and 'close', and the server must see the channel close.
  test("bad send in consumer", channelTest(
    function(ch, done) {
      var errorAndClose = latch(2, done);
      ch.on('close', succeed(errorAndClose));
      ch.on('error', function(error) {
        assert.strictEqual(541, error.code);
        assert.strictEqual(undefined, error.classId);
        assert.strictEqual(undefined, error.methodId);
        succeed(errorAndClose)();
      });

      ch.on('delivery', function() {
        ch.sendMessage({routingKey: 'foo',
                        exchange: 'amq.direct'},
                       {}, null); // can't send null
      });

      open(ch);
    },
    function(send, wait, done, ch) {
      // The server must report completion exactly once, after the close
      // handshake. Signalling `done` right after the send as well would
      // satisfy the shared two-party latch without the client's checks.
      send(defs.BasicDeliver, DELIVER_FIELDS, ch,
           Buffer.from('barfoo'));
      return wait(defs.ChannelClose)()
        .then(function() {
          send(defs.ChannelCloseOk, {}, ch);
        }).then(succeed(done), fail(done));
    }));
  // A returned message is emitted as 'return' with its content.
  test("return", channelTest(
    (ch, done) => {
      ch.on('return', (m) => {
        const checkBody = () => {
          assert.equal('barfoo', m.content.toString());
        };
        completes(checkBody, done);
      });
      open(ch);
    },
    (send, wait, done, ch) => {
      const sendReturn = () => {
        send(defs.BasicReturn, DELIVER_FIELDS, ch, Buffer.from('barfoo'));
      };
      completes(sendReturn, done);
    }));
  // A server-sent BasicCancel is emitted as 'cancel' with its fields.
  test("cancel", channelTest(
    (ch, done) => {
      ch.on('cancel', (f) => {
        const checkTag = () => {
          assert.equal('product of society', f.consumerTag);
        };
        completes(checkTag, done);
      });
      open(ch);
    },
    (send, wait, done, ch) => {
      const cancelFields = {
        consumerTag: 'product of society',
        nowait: false
      };
      completes(() => {
        send(defs.BasicCancel, cancelFields, ch);
      }, done);
    }));
  /**
   * Register a test checking that a confirmation method sent by the server
   * is emitted by the channel as the event named `variety`.
   *
   * @param {string} variety - event name, also used in the test title.
   * @param {*} Method - the method id the server sends.
   */
  function confirmTest(variety, Method) {
    const title = `confirm ${variety}`;
    return test(title, channelTest(
      (ch, done) => {
        ch.on(variety, (f) => {
          const checkTag = () => {
            assert.equal(1, f.deliveryTag);
          };
          completes(checkTag, done);
        });
        open(ch);
      },
      (send, wait, done, ch) => {
        const confirmFields = {
          deliveryTag: 1,
          multiple: false
        };
        completes(() => {
          send(Method, confirmFields, ch);
        }, done);
      }));
  }

  confirmTest("ack", defs.BasicAck);
  confirmTest("nack", defs.BasicNack);
  // Acks arrive as 2, 3, 1. Once all three callbacks have fired nothing
  // should remain unconfirmed and `lwm` should be 4.
  test("out-of-order acks", channelTest(
    (ch, done) => {
      const checkState = () => {
        assert.equal(0, ch.unconfirmed.length);
        assert.equal(4, ch.lwm);
      };
      const allConfirms = latch(3, () => {
        completes(checkState, done);
      });
      for (let i = 0; i < 3; i++) {
        ch.pushConfirmCallback(allConfirms);
      }
      open(ch);
    },
    (send, wait, done, ch) => {
      completes(() => {
        [2, 3, 1].forEach((tag) => {
          send(defs.BasicAck, {deliveryTag: tag, multiple: false}, ch);
        });
      }, done);
    }));
  // Only tags 2 and 1 are acked (in that order); the third callback must
  // never fire, one entry stays unconfirmed and `lwm` should be 3.
  test("not all out-of-order acks", channelTest(
    (ch, done) => {
      const checkState = () => {
        assert.equal(1, ch.unconfirmed.length);
        assert.equal(3, ch.lwm);
      };
      const allConfirms = latch(2, () => {
        completes(checkState, done);
      });
      ch.pushConfirmCallback(allConfirms); // tag = 1
      ch.pushConfirmCallback(allConfirms); // tag = 2
      ch.pushConfirmCallback(() => {
        done(new Error('Confirm callback should not be called'));
      });
      open(ch);
    },
    (send, wait, done, ch) => {
      completes(() => {
        [2, 1].forEach((tag) => {
          send(defs.BasicAck, {deliveryTag: tag, multiple: false}, ch);
        });
      }, done);
    }));
  568. });