123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306 |
- //
- //
- //
- 'use strict';
- const EventEmitter = require('events');
- const promisify = require('util').promisify;
- const defs = require('./defs');
- const {BaseChannel} = require('./channel');
- const {acceptMessage} = require('./channel');
- const Args = require('./api_args');
- const {inspect} = require('./format');
- class ChannelModel extends EventEmitter {
- constructor(connection) {
- super();
- this.connection = connection;
- ['error', 'close', 'blocked', 'unblocked'].forEach(ev => {
- connection.on(ev, this.emit.bind(this, ev));
- });
- }
- close() {
- return promisify(this.connection.close.bind(this.connection))();
- }
- updateSecret(newSecret, reason) {
- return promisify(this.connection._updateSecret.bind(this.connection))(newSecret, reason);
- }
- async createChannel() {
- const channel = new Channel(this.connection);
- await channel.open();
- return channel;
- }
- async createConfirmChannel() {
- const channel = new ConfirmChannel(this.connection);
- await channel.open();
- await channel.rpc(defs.ConfirmSelect, {nowait: false}, defs.ConfirmSelectOk);
- return channel;
- }
- }
- // Channels
- class Channel extends BaseChannel {
- constructor(connection) {
- super(connection);
- this.on('delivery', this.handleDelivery.bind(this));
- this.on('cancel', this.handleCancel.bind(this));
- }
- // An RPC that returns a 'proper' promise, which resolves to just the
- // response's fields; this is intended to be suitable for implementing
- // API procedures.
- async rpc(method, fields, expect) {
- const f = await promisify(cb => {
- return this._rpc(method, fields, expect, cb);
- })();
- return f.fields;
- }
- // Do the remarkably simple channel open handshake
- async open() {
- const ch = await this.allocate.bind(this)();
- return ch.rpc(defs.ChannelOpen, {outOfBand: ""},
- defs.ChannelOpenOk);
- }
- close() {
- return promisify(cb => {
- return this.closeBecause("Goodbye", defs.constants.REPLY_SUCCESS,
- cb);
- })();
- }
- // === Public API, declaring queues and stuff ===
- assertQueue(queue, options) {
- return this.rpc(defs.QueueDeclare,
- Args.assertQueue(queue, options),
- defs.QueueDeclareOk);
- }
- checkQueue(queue) {
- return this.rpc(defs.QueueDeclare,
- Args.checkQueue(queue),
- defs.QueueDeclareOk);
- }
- deleteQueue(queue, options) {
- return this.rpc(defs.QueueDelete,
- Args.deleteQueue(queue, options),
- defs.QueueDeleteOk);
- }
- purgeQueue(queue) {
- return this.rpc(defs.QueuePurge,
- Args.purgeQueue(queue),
- defs.QueuePurgeOk);
- }
- bindQueue(queue, source, pattern, argt) {
- return this.rpc(defs.QueueBind,
- Args.bindQueue(queue, source, pattern, argt),
- defs.QueueBindOk);
- }
- unbindQueue(queue, source, pattern, argt) {
- return this.rpc(defs.QueueUnbind,
- Args.unbindQueue(queue, source, pattern, argt),
- defs.QueueUnbindOk);
- }
- assertExchange(exchange, type, options) {
- // The server reply is an empty set of fields, but it's convenient
- // to have the exchange name handed to the continuation.
- return this.rpc(defs.ExchangeDeclare,
- Args.assertExchange(exchange, type, options),
- defs.ExchangeDeclareOk)
- .then(_ok => { return { exchange }; });
- }
- checkExchange(exchange) {
- return this.rpc(defs.ExchangeDeclare,
- Args.checkExchange(exchange),
- defs.ExchangeDeclareOk);
- }
- deleteExchange(name, options) {
- return this.rpc(defs.ExchangeDelete,
- Args.deleteExchange(name, options),
- defs.ExchangeDeleteOk);
- }
- bindExchange(dest, source, pattern, argt) {
- return this.rpc(defs.ExchangeBind,
- Args.bindExchange(dest, source, pattern, argt),
- defs.ExchangeBindOk);
- }
- unbindExchange(dest, source, pattern, argt) {
- return this.rpc(defs.ExchangeUnbind,
- Args.unbindExchange(dest, source, pattern, argt),
- defs.ExchangeUnbindOk);
- }
- // Working with messages
- publish(exchange, routingKey, content, options) {
- const fieldsAndProps = Args.publish(exchange, routingKey, options);
- return this.sendMessage(fieldsAndProps, fieldsAndProps, content);
- }
- sendToQueue(queue, content, options) {
- return this.publish('', queue, content, options);
- }
- consume(queue, callback, options) {
- // NB we want the callback to be run synchronously, so that we've
- // registered the consumerTag before any messages can arrive.
- const fields = Args.consume(queue, options);
- return new Promise((resolve, reject) => {
- this._rpc(defs.BasicConsume, fields, defs.BasicConsumeOk, (err, ok) => {
- if (err) return reject(err);
- this.registerConsumer(ok.fields.consumerTag, callback);
- resolve(ok.fields);
- });
- });
- }
- async cancel(consumerTag) {
- const ok = await promisify(cb => {
- this._rpc(defs.BasicCancel, Args.cancel(consumerTag),
- defs.BasicCancelOk,
- cb);
- })()
- .then(ok => {
- this.unregisterConsumer(consumerTag);
- return ok.fields;
- });
- }
- get(queue, options) {
- const fields = Args.get(queue, options);
- return new Promise((resolve, reject) => {
- this.sendOrEnqueue(defs.BasicGet, fields, (err, f) => {
- if (err) return reject(err);
- if (f.id === defs.BasicGetEmpty) {
- return resolve(false);
- }
- else if (f.id === defs.BasicGetOk) {
- const fields = f.fields;
- this.handleMessage = acceptMessage(m => {
- m.fields = fields;
- resolve(m);
- });
- }
- else {
- reject(new Error(`Unexpected response to BasicGet: ${inspect(f)}`));
- }
- });
- });
- }
- ack(message, allUpTo) {
- this.sendImmediately(
- defs.BasicAck,
- Args.ack(message.fields.deliveryTag, allUpTo));
- }
- ackAll() {
- this.sendImmediately(defs.BasicAck, Args.ack(0, true));
- }
- nack(message, allUpTo, requeue) {
- this.sendImmediately(
- defs.BasicNack,
- Args.nack(message.fields.deliveryTag, allUpTo, requeue));
- }
- nackAll(requeue) {
- this.sendImmediately(defs.BasicNack,
- Args.nack(0, true, requeue));
- }
- // `Basic.Nack` is not available in older RabbitMQ versions (or in the
- // AMQP specification), so you have to use the one-at-a-time
- // `Basic.Reject`. This is otherwise synonymous with
- // `#nack(message, false, requeue)`.
- reject(message, requeue) {
- this.sendImmediately(
- defs.BasicReject,
- Args.reject(message.fields.deliveryTag, requeue));
- }
- recover() {
- return this.rpc(defs.BasicRecover,
- Args.recover(),
- defs.BasicRecoverOk);
- }
- qos(count, global) {
- return this.rpc(defs.BasicQos,
- Args.prefetch(count, global),
- defs.BasicQosOk);
- }
- }
- // There are more options in AMQP than exposed here; RabbitMQ only
- // implements prefetch based on message count, and only for individual
- // channels or consumers. RabbitMQ v3.3.0 and after treat prefetch
- // (without `global` set) as per-consumer (for consumers following),
- // and prefetch with `global` set as per-channel.
- Channel.prototype.prefetch = Channel.prototype.qos
- // Confirm channel. This is a channel with confirms 'switched on',
- // meaning sent messages will provoke a responding 'ack' or 'nack'
- // from the server. The upshot of this is that `publish` and
- // `sendToQueue` both take a callback, which will be called either
- // with `null` as its argument to signify 'ack', or an exception as
- // its argument to signify 'nack'.
- class ConfirmChannel extends Channel {
- publish(exchange, routingKey, content, options, cb) {
- this.pushConfirmCallback(cb);
- return super.publish(exchange, routingKey, content, options);
- }
- sendToQueue(queue, content, options, cb) {
- return this.publish('', queue, content, options, cb);
- }
- waitForConfirms() {
- const awaiting = [];
- const unconfirmed = this.unconfirmed;
- unconfirmed.forEach((val, index) => {
- if (val !== null) {
- const confirmed = new Promise((resolve, reject) => {
- unconfirmed[index] = err => {
- if (val) val(err);
- if (err === null) resolve();
- else reject(err);
- };
- });
- awaiting.push(confirmed);
- }
- });
- // Channel closed
- if (!this.pending) {
- var cb;
- while (cb = this.unconfirmed.shift()) {
- if (cb) cb(new Error('channel closed'));
- }
- }
- return Promise.all(awaiting);
- }
- }
- module.exports.ConfirmChannel = ConfirmChannel;
- module.exports.Channel = Channel;
- module.exports.ChannelModel = ChannelModel;
|