// channel_api.js
  1. 'use strict';
  2. var assert = require('assert');
  3. var api = require('../channel_api');
  4. var util = require('./util');
  5. var succeed = util.succeed, fail = util.fail;
  6. var schedule = util.schedule;
  7. var randomString = util.randomString;
  8. var promisify = require('util').promisify;
  // Broker URL used by the tests; taken from the URL environment variable
  // when set, otherwise a broker on localhost.
  var URL = process.env.URL || 'amqp://localhost';

  /**
   * Open a connection to the broker under test.
   *
   * Callers in this file invoke both `connect()` and `connect(URL)`; the
   * argument used to be silently dropped because no parameter was declared.
   *
   * @param {string} [url] - broker URL; anything that is not a string falls
   *   back to the module-level `URL`.
   * @returns the result of `api.connect` (used as a promise of a connection
   *   throughout this file).
   */
  function connect(url) {
    return api.connect(typeof url === 'string' ? url : URL);
  }
  // Expect this promise to fail, and flip the results accordingly.
  //
  // Fulfils with the rejection reason when `promise` rejects. Rejects with an
  // Error when `promise` unexpectedly fulfils, so that a failing test always
  // reports a real Error; the fulfilment value (frequently `undefined`) is
  // kept on `err.value` for debugging.
  function expectFail(promise) {
    return promise.then(
      function(value) {
        var err = new Error(
          'Expected promise to be rejected, but it was fulfilled');
        err.value = value;
        throw err;
      },
      function(reason) {
        return reason;
      });
  }
  // Failure is detected through operations being rejected rather than
  // through the channel close error, so error events can be handed to this
  // do-nothing function.
  function ignore() {
    /* intentionally empty */
  }
  // Register the no-op `ignore` as an 'error' listener on `c`, then hand `c`
  // back so the call can be used as a step in a promise chain.
  function ignoreErrors(c) {
    c.on('error', ignore);
    return c;
  }
  // Register `console.warn` as an 'error' listener on `c`, then hand `c`
  // back so the call can be used as a step in a promise chain.
  function logErrors(c) {
    c.on('error', console.warn);
    return c;
  }
  // Run a test with `name`, given a function that takes an open
  // channel, and returns a promise that is resolved on test success or
  // rejected on test failure.
  //
  // `chmethod` is the name of the connection method used to open the channel
  // (this file binds 'createChannel' and 'createConfirmChannel').
  function channel_test(chmethod, name, chfun) {
    test(name, function(done) {
      connect(URL).then(logErrors).then(function(c) {
        c[chmethod]().then(ignoreErrors).then(chfun)
          .then(succeed(done), fail(done))
          // close the connection regardless of what happens with the test
          .finally(function() { c.close(); });
      })
      // The inner chain is deliberately not returned, so this handler only
      // sees a failed connect or a synchronous throw while opening the
      // channel. Without it those cases were unhandled rejections and the
      // test could only end by timing out.
      .catch(fail(done));
    });
  }
  // Shorthand for tests that run against a channel opened with
  // `createChannel`.
  var chtest = channel_test.bind(null, 'createChannel');

  suite("connect", function() {

    // Opening a connection and closing it again should both succeed.
    test("at all", function(done) {
      connect(URL)
        .then((conn) => conn.close())
        .then(succeed(done), fail(done));
    });

    // i.e., just don't bork
    chtest("create channel", ignore);
  });
  suite("updateSecret", function() {

    // Send a new secret over an open connection and expect the promise
    // returned by `updateSecret` to be fulfilled.
    test("updateSecret", function(done) {
      connect().then(function(c) {
        c.updateSecret(Buffer.from("new secret"), "no reason")
          .then(succeed(done), fail(done))
          // close the connection whether or not the update succeeded
          .finally(function() { c.close(); });
      })
      // Report a failed connect (or a synchronous throw above) to the test
      // runner instead of leaving the test to time out.
      .catch(fail(done));
    });
  });
  // Declaration options shared by the tests below: queues and exchanges are
  // asserted with `durable: false`.
  var QUEUE_OPTS = { durable: false };
  var EX_OPTS = { durable: false };
  suite("assert, check, delete", function() {

    // A queue that has been asserted can be checked afterwards.
    chtest("assert and check queue", function(ch) {
      var q = 'test.check-queue';
      return ch.assertQueue(q, QUEUE_OPTS)
        .then(() => ch.checkQueue(q));
    });

    // The assert reply carries the exchange name; the exchange can then be
    // checked.
    chtest("assert and check exchange", function(ch) {
      var ex = 'test.check-exchange';
      return ch.assertExchange(ex, 'direct', EX_OPTS)
        .then((eok) => {
          assert.equal(ex, eok.exchange);
          return ch.checkExchange(ex);
        });
    });

    // Asserting the same queue a second time with a different `autoDelete`
    // value is expected to be rejected.
    chtest("fail on reasserting queue with different options", function(ch) {
      var q = 'test.reassert-queue';
      var initial = {durable: false, autoDelete: true};
      var conflicting = {durable: false, autoDelete: false};
      return ch.assertQueue(q, initial)
        .then(() => expectFail(ch.assertQueue(q, conflicting)));
    });

    chtest("fail on checking a queue that's not there", function(ch) {
      var missing = 'test.random-' + randomString();
      return expectFail(ch.checkQueue(missing));
    });

    chtest("fail on checking an exchange that's not there", function(ch) {
      var missing = 'test.random-' + randomString();
      return expectFail(ch.checkExchange(missing));
    });

    // Asserting the same exchange a second time with a different type is
    // expected to be rejected.
    chtest("fail on reasserting exchange with different type", function(ch) {
      var ex = 'test.reassert-ex';
      return ch.assertExchange(ex, 'fanout', EX_OPTS)
        .then(() => expectFail(ch.assertExchange(ex, 'direct', EX_OPTS)));
    });

    // The test passes once the channel emits 'error' after publishing to a
    // randomly named exchange.
    chtest("channel break on publishing to non-exchange", function(ch) {
      return new Promise((resolve) => {
        ch.on('error', resolve);
        ch.publish(randomString(), '', Buffer.from('foobar'));
      });
    });

    // Assert and check, delete, then expect a further check to fail.
    chtest("delete queue", function(ch) {
      var q = 'test.delete-queue';
      var declared = [ch.assertQueue(q, QUEUE_OPTS), ch.checkQueue(q)];
      return Promise.all(declared)
        .then(() => ch.deleteQueue(q))
        .then(() => expectFail(ch.checkQueue(q)));
    });

    // Assert and check, delete, then expect a further check to fail.
    chtest("delete exchange", function(ch) {
      var ex = 'test.delete-exchange';
      var declared = [
        ch.assertExchange(ex, 'fanout', EX_OPTS),
        ch.checkExchange(ex)];
      return Promise.all(declared)
        .then(() => ch.deleteExchange(ex))
        .then(() => expectFail(ch.checkExchange(ex)));
    });
  });
  // Wait for the queue to meet the condition; useful for waiting for
  // messages to arrive, for example.
  //
  // Opens its own connection and channel, then polls `checkQueue(q)` until
  // `condition(qok)` returns something truthy. The returned promise is
  // fulfilled with that `qok`, and is rejected if connecting, opening the
  // channel, a check, or `condition` itself fails. The connection is closed
  // once polling has finished, whether it succeeded or not.
  function waitForQueue(q, condition) {
    return connect(URL).then(function(c) {
      return c.createChannel()
        .then(function(ch) {
          // An explicit promise is needed here: the poll is re-armed through
          // `schedule`, so there is no promise chain to return from the
          // "not yet" branch. Previously that branch returned nothing, which
          // fulfilled the wait with `undefined` after the first unmet check.
          return new Promise(function(resolve, reject) {
            function check() {
              ch.checkQueue(q).then(function(qok) {
                if (condition(qok)) resolve(qok);
                else schedule(check);
              }).catch(reject);
            }
            check();
          });
        })
        // close the connection regardless of how the wait ended
        .finally(function() { c.close(); });
    });
  }
  // Return a promise that resolves when the queue has at least `num`
  // messages. If num is not supplied it's assumed to be 1.
  function waitForMessages(q, num) {
    var threshold = num;
    if (threshold === undefined) threshold = 1;

    function hasEnough(qok) {
      return qok.messageCount >= threshold;
    }
    return waitForQueue(q, hasEnough);
  }
  suite("sendMessage", function() {

    // publish different size messages

    // Round-trip a random string through a freshly purged queue.
    chtest("send to queue and get from queue", function(ch) {
      var q = 'test.send-to-q';
      var msg = randomString();
      var ready = Promise.all([ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q)]);
      return ready
        .then(() => {
          ch.sendToQueue(q, Buffer.from(msg));
          return waitForMessages(q);
        })
        .then(() => ch.get(q, {noAck: true}))
        .then((m) => {
          assert(m);
          assert.equal(msg, m.content.toString());
        });
    });

    // Round-trip an empty buffer through the same queue.
    chtest("send (and get) zero content to queue", function(ch) {
      var q = 'test.send-to-q';
      var msg = Buffer.alloc(0);
      var ready = Promise.all([ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q)]);
      return ready
        .then(() => {
          ch.sendToQueue(q, msg);
          return waitForMessages(q);
        })
        .then(() => ch.get(q, {noAck: true}))
        .then((m) => {
          assert(m);
          assert.deepEqual(msg, m.content);
        });
    });
  });
  suite("binding, consuming", function() {

    // bind, publish, get
    chtest("route message", function(ch) {
      var ex = 'test.route-message';
      var q = 'test.route-message-q';
      var msg = randomString();
      return Promise.all([
        ch.assertExchange(ex, 'fanout', EX_OPTS),
        ch.assertQueue(q, QUEUE_OPTS),
        ch.purgeQueue(q),
        ch.bindQueue(q, ex, '', {})])
        .then(function() {
          ch.publish(ex, '', Buffer.from(msg));
          return waitForMessages(q);
        })
        .then(function() {
          return ch.get(q, {noAck: true});
        })
        .then(function(m) {
          assert(m);
          assert.equal(msg, m.content.toString());
        });
    });

    // send to queue, purge, get-empty
    chtest("purge queue", function(ch) {
      var q = 'test.purge-queue';
      return ch.assertQueue(q, {durable: false})
        .then(function() {
          ch.sendToQueue(q, Buffer.from('foobar'));
          return waitForMessages(q);
        })
        .then(function() {
          // Both operations are issued in the same order as before (purge,
          // then get); collecting them means a failed purge fails the test
          // instead of being left as a floating result.
          return Promise.all([ch.purgeQueue(q), ch.get(q, {noAck: true})]);
        })
        .then(function(results) {
          assert(!results[1]); // get-empty
        });
    });

    // bind again, unbind, publish, get-empty
    chtest("unbind queue", function(ch) {
      var ex = 'test.unbind-queue-ex';
      var q = 'test.unbind-queue';
      var viabinding = randomString();
      var direct = randomString();
      return Promise.all([
        ch.assertExchange(ex, 'fanout', EX_OPTS),
        ch.assertQueue(q, QUEUE_OPTS),
        ch.purgeQueue(q),
        ch.bindQueue(q, ex, '', {})])
        .then(function() {
          ch.publish(ex, '', Buffer.from('foobar'));
          return waitForMessages(q);
        })
        .then(function() { // message got through!
          return ch.get(q, {noAck:true})
            .then(function(m) { assert(m); });
        })
        .then(function() {
          return ch.unbindQueue(q, ex, '', {});
        })
        .then(function() {
          // via the no-longer-existing binding
          ch.publish(ex, '', Buffer.from(viabinding));
          // direct to the queue
          ch.sendToQueue(q, Buffer.from(direct));
          return waitForMessages(q);
        })
        .then(function() { return ch.get(q); })
        .then(function(m) {
          // the direct to queue message got through, the via-binding
          // message (sent first) did not
          assert.equal(direct, m.content.toString());
        });
    });

    // To some extent this is now just testing semantics of the server,
    // but we can at least try out a few settings, and consume.
    chtest("consume via exchange-exchange binding", function(ch) {
      var ex1 = 'test.ex-ex-binding1', ex2 = 'test.ex-ex-binding2';
      var q = 'test.ex-ex-binding-q';
      var rk = 'test.routing.key', msg = randomString();
      return Promise.all([
        ch.assertExchange(ex1, 'direct', EX_OPTS),
        ch.assertExchange(ex2, 'fanout',
                          {durable: false, internal: true}),
        ch.assertQueue(q, QUEUE_OPTS),
        ch.purgeQueue(q),
        ch.bindExchange(ex2, ex1, rk, {}),
        ch.bindQueue(q, ex2, '', {})])
        .then(function() {
          return new Promise(function(resolve, reject) {
            function delivery(m) {
              if (m.content.toString() === msg) resolve();
              else reject(new Error("Wrong message"));
            }
            ch.consume(q, delivery, {noAck: true})
              .then(function() {
                ch.publish(ex1, rk, Buffer.from(msg));
              })
              // a failed consume or publish must fail the test, not hang it
              .catch(reject);
          });
        });
    });

    // bind again, unbind, publish, get-empty
    chtest("unbind exchange", function(ch) {
      var source = 'test.unbind-ex-source';
      var dest = 'test.unbind-ex-dest';
      var q = 'test.unbind-ex-queue';
      var viabinding = randomString();
      var direct = randomString();
      return Promise.all([
        ch.assertExchange(source, 'fanout', EX_OPTS),
        ch.assertExchange(dest, 'fanout', EX_OPTS),
        ch.assertQueue(q, QUEUE_OPTS),
        ch.purgeQueue(q),
        ch.bindExchange(dest, source, '', {}),
        ch.bindQueue(q, dest, '', {})])
        .then(function() {
          ch.publish(source, '', Buffer.from('foobar'));
          return waitForMessages(q);
        })
        .then(function() { // message got through!
          return ch.get(q, {noAck:true})
            .then(function(m) { assert(m); });
        })
        .then(function() {
          return ch.unbindExchange(dest, source, '', {});
        })
        .then(function() {
          // via the no-longer-existing binding
          ch.publish(source, '', Buffer.from(viabinding));
          // direct to the queue
          ch.sendToQueue(q, Buffer.from(direct));
          return waitForMessages(q);
        })
        .then(function() { return ch.get(q); })
        .then(function(m) {
          // the direct to queue message got through, the via-binding
          // message (sent first) did not
          assert.equal(direct, m.content.toString());
        });
    });

    // Consume one message, cancel the consumer, send another message and
    // check that it stays in the queue.
    chtest("cancel consumer", function(ch) {
      var q = 'test.consumer-cancel';
      var ctag;
      var recv1 = new Promise(function(resolve, reject) {
        Promise.all([
          ch.assertQueue(q, QUEUE_OPTS),
          ch.purgeQueue(q),
          // The consumer callback is `resolve`, so the first delivery
          // fulfils `recv1`.
          ch.consume(q, resolve, {noAck:true})
            .then(function(ok) {
              ctag = ok.consumerTag;
              ch.sendToQueue(q, Buffer.from('foo'));
            })])
          // a setup failure must fail the test rather than hang it
          .catch(reject);
      });
      // A message should arrive because of the consume
      return recv1
        .then(function() {
          return Promise.all([
            ch.cancel(ctag).then(function() {
              return ch.sendToQueue(q, Buffer.from('bar'));
            }),
            // but check a message did arrive in the queue
            waitForMessages(q)]);
        })
        .then(function() {
          return ch.get(q, {noAck:true});
        })
        .then(function(m) {
          // With the consumer cancelled, 'bar' must still be in the queue.
          // This used to be signalled by throwing and flipping the outcome
          // with expectFail, which also turned unrelated errors (a failed
          // cancel, an empty get) into a pass.
          assert(m);
          assert.equal('bar', m.content.toString());
        });
    });

    // Deleting the queue under a consumer should invoke the consumer
    // callback with `null`.
    chtest("cancelled consumer", function(ch) {
      var q = 'test.cancelled-consumer';
      return new Promise(function(resolve, reject) {
        Promise.all([
          ch.assertQueue(q),
          ch.purgeQueue(q),
          ch.consume(q, function(msg) {
            if (msg === null) resolve();
            else reject(new Error('Message not expected'));
          })])
          .then(function() {
            return ch.deleteQueue(q);
          })
          // a setup or delete failure must fail the test rather than hang it
          .catch(reject);
      });
    });

    // ack, by default, removes a single message from the queue
    chtest("ack", function(ch) {
      var q = 'test.ack';
      var msg1 = randomString(), msg2 = randomString();
      return Promise.all([
        ch.assertQueue(q, QUEUE_OPTS),
        ch.purgeQueue(q)])
        .then(function() {
          ch.sendToQueue(q, Buffer.from(msg1));
          ch.sendToQueue(q, Buffer.from(msg2));
          return waitForMessages(q, 2);
        })
        .then(function() {
          return ch.get(q, {noAck: false});
        })
        .then(function(m) {
          assert.equal(msg1, m.content.toString());
          ch.ack(m);
          // %%% is there a race here? may depend on
          // rabbitmq-specific semantics
          return ch.get(q);
        })
        .then(function(m) {
          assert(m);
          assert.equal(msg2, m.content.toString());
        });
    });

    // Nack, by default, puts a message back on the queue (where in the
    // queue is up to the server)
    chtest("nack", function(ch) {
      var q = 'test.nack';
      var msg1 = randomString();
      return Promise.all([
        ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q)])
        .then(function() {
          ch.sendToQueue(q, Buffer.from(msg1));
          return waitForMessages(q);
        })
        .then(function() {
          return ch.get(q, {noAck: false});
        })
        .then(function(m) {
          assert.equal(msg1, m.content.toString());
          ch.nack(m);
          return waitForMessages(q);
        })
        .then(function() {
          return ch.get(q);
        })
        .then(function(m) {
          assert(m);
          assert.equal(msg1, m.content.toString());
        });
    });

    // reject is a near-synonym for nack, the latter of which is not
    // available in earlier RabbitMQ (or in AMQP proper).
    chtest("reject", function(ch) {
      var q = 'test.reject';
      var msg1 = randomString();
      return Promise.all([
        ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q)])
        .then(function() {
          ch.sendToQueue(q, Buffer.from(msg1));
          return waitForMessages(q);
        })
        .then(function() {
          return ch.get(q, {noAck: false});
        })
        .then(function(m) {
          assert.equal(msg1, m.content.toString());
          ch.reject(m);
          return waitForMessages(q);
        })
        .then(function() {
          return ch.get(q);
        })
        .then(function(m) {
          assert(m);
          assert.equal(msg1, m.content.toString());
        });
    });

    // With prefetch(1) set, both queued messages should still be delivered
    // as long as each one is acked.
    chtest("prefetch", function(ch) {
      var q = 'test.prefetch';
      return Promise.all([
        ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q),
        ch.prefetch(1)])
        .then(function() {
          ch.sendToQueue(q, Buffer.from('foobar'));
          ch.sendToQueue(q, Buffer.from('foobar'));
          return waitForMessages(q, 2);
        })
        .then(function() {
          return new Promise(function(resolve, reject) {
            var messageCount = 0;
            function receive(msg) {
              ch.ack(msg);
              if (++messageCount > 1) {
                resolve(messageCount);
              }
            }
            // a failed consume must fail the test rather than hang it
            ch.consume(q, receive, {noAck: false}).catch(reject);
          });
        })
        .then(function(c) {
          return assert.equal(2, c);
        });
    });

    chtest('close', function(ch) {
      // Resolving promise guarantees
      // channel is closed
      return ch.close();
    });
  });
  // Shorthand for tests that run against a channel opened with
  // `createConfirmChannel`.
  var confirmtest = channel_test.bind(null, 'createConfirmChannel');

  suite("confirms", function() {

    confirmtest('message is confirmed', function(ch) {
      var q = 'test.confirm-message';
      return Promise.all([
        ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q)])
        .then(function() {
          // Pass a confirmation callback (the same form 'multiple confirms'
          // uses) so the test completes on the confirmation itself rather
          // than on whatever sendToQueue happens to return.
          return promisify(function(cb) {
            ch.sendToQueue(q, Buffer.from('bleep'), {}, cb);
          })();
        });
    });

    // Usually one can provoke the server into confirming more than one
    // message in an ack by simply sending a few messages in quick
    // succession; a bit unscientific I know. Luckily we can eavesdrop on
    // the acknowledgements coming through to see if we really did get a
    // multi-ack.
    confirmtest('multiple confirms', function(ch) {
      var q = 'test.multiple-confirms';
      return Promise.all([
        ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q)])
        .then(function() {
          var multipleRainbows = false;
          ch.on('ack', function(a) {
            if (a.multiple) multipleRainbows = true;
          });

          // Send `num` messages, wait for every confirmation callback, and
          // retry with twice as many until an ack with `multiple` set has
          // been seen; give up once a batch of more than 500 has failed.
          function prod(num) {
            var cs = [];

            function sendAndPushPromise() {
              var conf = promisify(function(cb) {
                return ch.sendToQueue(q, Buffer.from('bleep'), {}, cb);
              })();
              cs.push(conf);
            }

            for (var i = 0; i < num; i++) sendAndPushPromise();

            return Promise.all(cs).then(function() {
              if (multipleRainbows) return true;
              else if (num > 500) throw new Error(
                "Couldn't provoke the server" +
                  " into multi-acking with " + num +
                  " messages; giving up");
              else {
                //console.warn("Failed with " + num + "; trying " + num * 2);
                return prod(num * 2);
              }
            });
          }
          return prod(5);
        });
    });

    // Publish a batch without callbacks and wait for all of it to be
    // confirmed.
    confirmtest('wait for confirms', function(ch) {
      for (var i = 0; i < 1000; i++) {
        ch.publish('', '', Buffer.from('foobar'), {});
      }
      return ch.waitForConfirms();
    });

    // After the channel has been closed, waitForConfirms is expected to be
    // rejected with the message 'channel closed'.
    confirmtest('works when channel is closed', function(ch) {
      for (var i = 0; i < 1000; i++) {
        ch.publish('', '', Buffer.from('foobar'), {});
      }
      return ch.close().then(function() {
        return ch.waitForConfirms();
      }).then(function() {
        assert.strictEqual(true, false, 'Wait should have failed.');
      }, function(e) {
        assert.strictEqual(e.message, 'channel closed');
      });
    });
  });