channel_model.js 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. //
  2. //
  3. //
  4. 'use strict';
  5. const EventEmitter = require('events');
  6. const promisify = require('util').promisify;
  7. const defs = require('./defs');
  8. const {BaseChannel} = require('./channel');
  9. const {acceptMessage} = require('./channel');
  10. const Args = require('./api_args');
  11. const {inspect} = require('./format');
  12. class ChannelModel extends EventEmitter {
  13. constructor(connection) {
  14. super();
  15. this.connection = connection;
  16. ['error', 'close', 'blocked', 'unblocked'].forEach(ev => {
  17. connection.on(ev, this.emit.bind(this, ev));
  18. });
  19. }
  20. close() {
  21. return promisify(this.connection.close.bind(this.connection))();
  22. }
  23. updateSecret(newSecret, reason) {
  24. return promisify(this.connection._updateSecret.bind(this.connection))(newSecret, reason);
  25. }
  26. async createChannel() {
  27. const channel = new Channel(this.connection);
  28. await channel.open();
  29. return channel;
  30. }
  31. async createConfirmChannel() {
  32. const channel = new ConfirmChannel(this.connection);
  33. await channel.open();
  34. await channel.rpc(defs.ConfirmSelect, {nowait: false}, defs.ConfirmSelectOk);
  35. return channel;
  36. }
  37. }
  38. // Channels
  39. class Channel extends BaseChannel {
  40. constructor(connection) {
  41. super(connection);
  42. this.on('delivery', this.handleDelivery.bind(this));
  43. this.on('cancel', this.handleCancel.bind(this));
  44. }
  45. // An RPC that returns a 'proper' promise, which resolves to just the
  46. // response's fields; this is intended to be suitable for implementing
  47. // API procedures.
  48. async rpc(method, fields, expect) {
  49. const f = await promisify(cb => {
  50. return this._rpc(method, fields, expect, cb);
  51. })();
  52. return f.fields;
  53. }
  54. // Do the remarkably simple channel open handshake
  55. async open() {
  56. const ch = await this.allocate.bind(this)();
  57. return ch.rpc(defs.ChannelOpen, {outOfBand: ""},
  58. defs.ChannelOpenOk);
  59. }
  60. close() {
  61. return promisify(cb => {
  62. return this.closeBecause("Goodbye", defs.constants.REPLY_SUCCESS,
  63. cb);
  64. })();
  65. }
  66. // === Public API, declaring queues and stuff ===
  67. assertQueue(queue, options) {
  68. return this.rpc(defs.QueueDeclare,
  69. Args.assertQueue(queue, options),
  70. defs.QueueDeclareOk);
  71. }
  72. checkQueue(queue) {
  73. return this.rpc(defs.QueueDeclare,
  74. Args.checkQueue(queue),
  75. defs.QueueDeclareOk);
  76. }
  77. deleteQueue(queue, options) {
  78. return this.rpc(defs.QueueDelete,
  79. Args.deleteQueue(queue, options),
  80. defs.QueueDeleteOk);
  81. }
  82. purgeQueue(queue) {
  83. return this.rpc(defs.QueuePurge,
  84. Args.purgeQueue(queue),
  85. defs.QueuePurgeOk);
  86. }
  87. bindQueue(queue, source, pattern, argt) {
  88. return this.rpc(defs.QueueBind,
  89. Args.bindQueue(queue, source, pattern, argt),
  90. defs.QueueBindOk);
  91. }
  92. unbindQueue(queue, source, pattern, argt) {
  93. return this.rpc(defs.QueueUnbind,
  94. Args.unbindQueue(queue, source, pattern, argt),
  95. defs.QueueUnbindOk);
  96. }
  97. assertExchange(exchange, type, options) {
  98. // The server reply is an empty set of fields, but it's convenient
  99. // to have the exchange name handed to the continuation.
  100. return this.rpc(defs.ExchangeDeclare,
  101. Args.assertExchange(exchange, type, options),
  102. defs.ExchangeDeclareOk)
  103. .then(_ok => { return { exchange }; });
  104. }
  105. checkExchange(exchange) {
  106. return this.rpc(defs.ExchangeDeclare,
  107. Args.checkExchange(exchange),
  108. defs.ExchangeDeclareOk);
  109. }
  110. deleteExchange(name, options) {
  111. return this.rpc(defs.ExchangeDelete,
  112. Args.deleteExchange(name, options),
  113. defs.ExchangeDeleteOk);
  114. }
  115. bindExchange(dest, source, pattern, argt) {
  116. return this.rpc(defs.ExchangeBind,
  117. Args.bindExchange(dest, source, pattern, argt),
  118. defs.ExchangeBindOk);
  119. }
  120. unbindExchange(dest, source, pattern, argt) {
  121. return this.rpc(defs.ExchangeUnbind,
  122. Args.unbindExchange(dest, source, pattern, argt),
  123. defs.ExchangeUnbindOk);
  124. }
  125. // Working with messages
  126. publish(exchange, routingKey, content, options) {
  127. const fieldsAndProps = Args.publish(exchange, routingKey, options);
  128. return this.sendMessage(fieldsAndProps, fieldsAndProps, content);
  129. }
  130. sendToQueue(queue, content, options) {
  131. return this.publish('', queue, content, options);
  132. }
  133. consume(queue, callback, options) {
  134. // NB we want the callback to be run synchronously, so that we've
  135. // registered the consumerTag before any messages can arrive.
  136. const fields = Args.consume(queue, options);
  137. return new Promise((resolve, reject) => {
  138. this._rpc(defs.BasicConsume, fields, defs.BasicConsumeOk, (err, ok) => {
  139. if (err) return reject(err);
  140. this.registerConsumer(ok.fields.consumerTag, callback);
  141. resolve(ok.fields);
  142. });
  143. });
  144. }
  145. async cancel(consumerTag) {
  146. const ok = await promisify(cb => {
  147. this._rpc(defs.BasicCancel, Args.cancel(consumerTag),
  148. defs.BasicCancelOk,
  149. cb);
  150. })()
  151. .then(ok => {
  152. this.unregisterConsumer(consumerTag);
  153. return ok.fields;
  154. });
  155. }
  156. get(queue, options) {
  157. const fields = Args.get(queue, options);
  158. return new Promise((resolve, reject) => {
  159. this.sendOrEnqueue(defs.BasicGet, fields, (err, f) => {
  160. if (err) return reject(err);
  161. if (f.id === defs.BasicGetEmpty) {
  162. return resolve(false);
  163. }
  164. else if (f.id === defs.BasicGetOk) {
  165. const fields = f.fields;
  166. this.handleMessage = acceptMessage(m => {
  167. m.fields = fields;
  168. resolve(m);
  169. });
  170. }
  171. else {
  172. reject(new Error(`Unexpected response to BasicGet: ${inspect(f)}`));
  173. }
  174. });
  175. });
  176. }
  177. ack(message, allUpTo) {
  178. this.sendImmediately(
  179. defs.BasicAck,
  180. Args.ack(message.fields.deliveryTag, allUpTo));
  181. }
  182. ackAll() {
  183. this.sendImmediately(defs.BasicAck, Args.ack(0, true));
  184. }
  185. nack(message, allUpTo, requeue) {
  186. this.sendImmediately(
  187. defs.BasicNack,
  188. Args.nack(message.fields.deliveryTag, allUpTo, requeue));
  189. }
  190. nackAll(requeue) {
  191. this.sendImmediately(defs.BasicNack,
  192. Args.nack(0, true, requeue));
  193. }
  194. // `Basic.Nack` is not available in older RabbitMQ versions (or in the
  195. // AMQP specification), so you have to use the one-at-a-time
  196. // `Basic.Reject`. This is otherwise synonymous with
  197. // `#nack(message, false, requeue)`.
  198. reject(message, requeue) {
  199. this.sendImmediately(
  200. defs.BasicReject,
  201. Args.reject(message.fields.deliveryTag, requeue));
  202. }
  203. recover() {
  204. return this.rpc(defs.BasicRecover,
  205. Args.recover(),
  206. defs.BasicRecoverOk);
  207. }
  208. qos(count, global) {
  209. return this.rpc(defs.BasicQos,
  210. Args.prefetch(count, global),
  211. defs.BasicQosOk);
  212. }
  213. }
  214. // There are more options in AMQP than exposed here; RabbitMQ only
  215. // implements prefetch based on message count, and only for individual
  216. // channels or consumers. RabbitMQ v3.3.0 and after treat prefetch
  217. // (without `global` set) as per-consumer (for consumers following),
  218. // and prefetch with `global` set as per-channel.
  219. Channel.prototype.prefetch = Channel.prototype.qos
  220. // Confirm channel. This is a channel with confirms 'switched on',
  221. // meaning sent messages will provoke a responding 'ack' or 'nack'
  222. // from the server. The upshot of this is that `publish` and
  223. // `sendToQueue` both take a callback, which will be called either
  224. // with `null` as its argument to signify 'ack', or an exception as
  225. // its argument to signify 'nack'.
  226. class ConfirmChannel extends Channel {
  227. publish(exchange, routingKey, content, options, cb) {
  228. this.pushConfirmCallback(cb);
  229. return super.publish(exchange, routingKey, content, options);
  230. }
  231. sendToQueue(queue, content, options, cb) {
  232. return this.publish('', queue, content, options, cb);
  233. }
  234. waitForConfirms() {
  235. const awaiting = [];
  236. const unconfirmed = this.unconfirmed;
  237. unconfirmed.forEach((val, index) => {
  238. if (val !== null) {
  239. const confirmed = new Promise((resolve, reject) => {
  240. unconfirmed[index] = err => {
  241. if (val) val(err);
  242. if (err === null) resolve();
  243. else reject(err);
  244. };
  245. });
  246. awaiting.push(confirmed);
  247. }
  248. });
  249. // Channel closed
  250. if (!this.pending) {
  251. var cb;
  252. while (cb = this.unconfirmed.shift()) {
  253. if (cb) cb(new Error('channel closed'));
  254. }
  255. }
  256. return Promise.all(awaiting);
  257. }
  258. }
  259. module.exports.ConfirmChannel = ConfirmChannel;
  260. module.exports.Channel = Channel;
  261. module.exports.ChannelModel = ChannelModel;