//
//
//
- 'use strict';
/*
The channel (promise) and callback APIs have similar signatures, and
in particular, both need AMQP fields prepared from the same arguments
and options. The argument marshalling is done here. Each of the
procedures below takes arguments and options (the latter in an object)
particular to the operation it represents, and returns an object with
fields for handing to the encoder.
*/
// A number of AMQP methods have a table-typed field called
// `arguments`, that is intended to carry extension-specific
// values. RabbitMQ uses this in a number of places; e.g., to specify
// an 'alternate exchange'.
//
// Many of the methods in this API have an `options` argument, from
// which I take both values that have a default in AMQP (e.g.,
// autoDelete in QueueDeclare) *and* values that are specific to
// RabbitMQ (e.g., 'alternate-exchange'), which would normally be
// supplied in `arguments`. So that extensions I don't support yet can
// be used, I include `arguments` itself among the options.
//
// The upshot of this is that I often need to prepare an `arguments`
// value that has any values passed in `options.arguments` as well as
// any I've promoted to being options themselves. Since I don't want
// to mutate anything passed in, the general pattern is to create a
// fresh object with the `arguments` value given as its prototype; all
// fields in the supplied value will be serialised, as well as any I
// set on the fresh object. What I don't want to do, however, is set a
// field to undefined by copying possibly missing field values,
// because that will mask a value in the prototype.
//
// NB the `arguments` field already has a default value of `{}`, so
// there's no need to explicitly default it unless I'm setting values.
// Assign `value` to `obj[prop]`, unless `value` is undefined or null.
// Skipping the assignment (rather than storing undefined) means a
// value inherited from the prototype of `obj` is not masked.
function setIfDefined(obj, prop, value) {
  if (value == null) {
    return;
  }
  obj[prop] = value;
}
// Namespace object; the marshalling procedures are attached to it
// below and it is what the module exports.
var Args = {};

// Shared stand-in used when a procedure is called without `options`.
// It is frozen, so nothing can accidentally add defaults to it.
var EMPTY_OPTIONS = Object.freeze({});
// Prepare the encoder fields for asserting a queue.
//
// A falsy `queue` becomes ''. The options expires, messageTtl,
// deadLetterExchange, deadLetterRoutingKey, maxLength, maxPriority,
// overflow and queueMode are promoted into a fresh table that has
// `options.arguments` as its prototype, so the supplied table is not
// mutated. `durable` is true unless supplied; `exclusive` and
// `autoDelete` are coerced to booleans.
Args.assertQueue = function(queue, options) {
  var opts = options || EMPTY_OPTIONS;

  // [option name, key in the arguments table], in the order applied.
  var promoted = [
    ['expires', 'x-expires'],
    ['messageTtl', 'x-message-ttl'],
    ['deadLetterExchange', 'x-dead-letter-exchange'],
    ['deadLetterRoutingKey', 'x-dead-letter-routing-key'],
    ['maxLength', 'x-max-length'],
    ['maxPriority', 'x-max-priority'],
    ['overflow', 'x-overflow'],
    ['queueMode', 'x-queue-mode']
  ];

  var table = Object.create(opts.arguments || null);
  promoted.forEach(function(pair) {
    setIfDefined(table, pair[1], opts[pair[0]]);
  });

  var fields = {
    queue: queue || '',
    exclusive: Boolean(opts.exclusive)
  };
  if (opts.durable === undefined) {
    fields.durable = true;
  }
  else {
    fields.durable = opts.durable;
  }
  fields.autoDelete = Boolean(opts.autoDelete);
  fields.arguments = table;
  fields.passive = false;
  // deprecated but we have to include it
  fields.ticket = 0;
  fields.nowait = false;
  return fields;
};
// Prepare the encoder fields for checking a queue: the same field set
// as a declaration, but with `passive` switched on.
Args.checkQueue = function(queue) {
  var fields = {queue: queue};
  // switch to "completely different" mode
  fields.passive = true;
  fields.nowait = false;
  // the next three are ignored
  fields.durable = true;
  fields.autoDelete = false;
  fields.exclusive = false;
  fields.ticket = 0;
  return fields;
};
// Prepare the encoder fields for deleting a queue. `ifUnused` and
// `ifEmpty` are taken from `options` and coerced to booleans.
Args.deleteQueue = function(queue, options) {
  var opts = options || EMPTY_OPTIONS;
  var fields = {queue: queue};
  fields.ifUnused = Boolean(opts.ifUnused);
  fields.ifEmpty = Boolean(opts.ifEmpty);
  fields.ticket = 0;
  fields.nowait = false;
  return fields;
};
// Prepare the encoder fields for purging a queue; only the queue name
// varies.
Args.purgeQueue = function(queue) {
  var fields = {};
  fields.queue = queue;
  fields.ticket = 0;
  fields.nowait = false;
  return fields;
};
// Prepare the encoder fields for binding a queue to an exchange.
// `source` becomes the `exchange` field and `pattern` the
// `routingKey`; `argt` is passed through untouched as `arguments`.
Args.bindQueue = function(queue, source, pattern, argt) {
  var fields = {
    queue: queue,
    exchange: source,
    routingKey: pattern,
    arguments: argt
  };
  fields.ticket = 0;
  fields.nowait = false;
  return fields;
};
// Prepare the encoder fields for removing a queue binding. `source`
// becomes the `exchange` field and `pattern` the `routingKey`; `argt`
// is passed through untouched as `arguments`.
Args.unbindQueue = function(queue, source, pattern, argt) {
  var fields = {};
  fields.queue = queue;
  fields.exchange = source;
  fields.routingKey = pattern;
  fields.arguments = argt;
  fields.ticket = 0;
  fields.nowait = false;
  return fields;
};
// Prepare the encoder fields for asserting an exchange.
//
// `options.alternateExchange` is promoted to 'alternate-exchange' in a
// fresh table that has `options.arguments` as its prototype, so the
// supplied table is not mutated. `durable` is true unless supplied;
// `autoDelete` and `internal` are coerced to booleans.
Args.assertExchange = function(exchange, type, options) {
  var opts = options || EMPTY_OPTIONS;

  var table = Object.create(opts.arguments || null);
  setIfDefined(table, 'alternate-exchange', opts.alternateExchange);

  var fields = {
    exchange: exchange,
    ticket: 0,
    type: type,
    passive: false
  };
  if (opts.durable === undefined) {
    fields.durable = true;
  }
  else {
    fields.durable = opts.durable;
  }
  fields.autoDelete = Boolean(opts.autoDelete);
  fields.internal = Boolean(opts.internal);
  fields.nowait = false;
  fields.arguments = table;
  return fields;
};
// Prepare the encoder fields for checking an exchange: the same field
// set as a declaration, but with `passive` switched on.
Args.checkExchange = function(exchange) {
  var fields = {exchange: exchange};
  // switch to 'may as well be another method' mode
  fields.passive = true;
  fields.nowait = false;
  // the following are ignored
  fields.durable = true;
  fields.internal = false;
  fields.type = '';
  fields.autoDelete = false;
  fields.ticket = 0;
  return fields;
};
// Prepare the encoder fields for deleting an exchange. `ifUnused` is
// taken from `options` and coerced to a boolean.
Args.deleteExchange = function(exchange, options) {
  var opts = options || EMPTY_OPTIONS;
  var onlyIfUnused = Boolean(opts.ifUnused);
  return {
    exchange: exchange,
    ifUnused: onlyIfUnused,
    ticket: 0,
    nowait: false
  };
};
// Prepare the encoder fields for binding one exchange to another.
// Note the parameter order is (dest, source) whereas the fields are
// emitted source first; `argt` is passed through as `arguments`.
Args.bindExchange = function(dest, source, pattern, argt) {
  var fields = {
    source: source,
    destination: dest,
    routingKey: pattern,
    arguments: argt
  };
  fields.ticket = 0;
  fields.nowait = false;
  return fields;
};
// Prepare the encoder fields for removing an exchange-to-exchange
// binding. Note the parameter order is (dest, source) whereas the
// fields are emitted source first; `argt` is passed through as
// `arguments`.
Args.unbindExchange = function(dest, source, pattern, argt) {
  var fields = {};
  fields.source = source;
  fields.destination = dest;
  fields.routingKey = pattern;
  fields.arguments = argt;
  fields.ticket = 0;
  fields.nowait = false;
  return fields;
};
// It's convenient to construct the properties and the method fields
// at the same time, since in the APIs, values for both can appear in
// `options`. Since the property and method field names don't overlap,
// I just return one big object that can be used for both purposes,
// and the encoder will pick out what it wants.
//
// `exchange` and `routingKey` are passed through as given. `options`
// may supply `headers`, `CC`, `BCC`, `persistent`, `deliveryMode`,
// `expiration`, `mandatory`, and the message properties copied
// verbatim below. A null `CC`, `BCC` or `expiration` is treated the
// same as a missing one, rather than being turned into the string
// 'null' (CC, BCC) or throwing a TypeError (expiration).
Args.publish = function(exchange, routingKey, options) {
  options = options || EMPTY_OPTIONS;

  // The CC and BCC fields expect an array of "longstr", which would
  // normally be buffer values in JavaScript; however, since a field
  // array (or table) cannot have shortstr values, the codec will
  // encode all strings as longstrs anyway.
  //
  // Returns undefined for a missing (undefined or null) value, so
  // that setIfDefined leaves the header unset; a single value is
  // wrapped in an array, and every entry is converted to a string.
  function convertCC(cc) {
    if (cc === undefined || cc === null) {
      return undefined;
    }
    else if (Array.isArray(cc)) {
      return cc.map(String);
    }
    else return [String(cc)];
  }

  // A fresh object with the supplied headers as its prototype, so
  // CC and BCC can be added without mutating the caller's object.
  var headers = Object.create(options.headers || null);
  setIfDefined(headers, 'CC', convertCC(options.CC));
  setIfDefined(headers, 'BCC', convertCC(options.BCC));

  var deliveryMode; // undefined will default to 1 (non-persistent)

  // Previously I overloaded deliveryMode be a boolean meaning
  // 'persistent or not'; better is to name this option for what it
  // is, but I need to have backwards compatibility for applications
  // that either supply a numeric or boolean value.
  // `persistent`, when supplied, takes precedence over `deliveryMode`.
  if (options.persistent !== undefined)
    deliveryMode = (options.persistent) ? 2 : 1;
  else if (typeof options.deliveryMode === 'number')
    deliveryMode = options.deliveryMode;
  else if (options.deliveryMode) // is supplied and truthy
    deliveryMode = 2;

  // The expiration property is handed on as a string; a missing
  // (undefined or null) value is left undefined.
  var expiration = options.expiration;
  if (expiration === undefined || expiration === null) {
    expiration = undefined;
  }
  else {
    expiration = expiration.toString();
  }

  return {
    // method fields
    exchange: exchange,
    routingKey: routingKey,
    mandatory: !!options.mandatory,
    immediate: false, // RabbitMQ doesn't implement this any more
    ticket: undefined,
    // properties
    contentType: options.contentType,
    contentEncoding: options.contentEncoding,
    headers: headers,
    deliveryMode: deliveryMode,
    priority: options.priority,
    correlationId: options.correlationId,
    replyTo: options.replyTo,
    expiration: expiration,
    messageId: options.messageId,
    timestamp: options.timestamp,
    type: options.type,
    userId: options.userId,
    appId: options.appId,
    clusterId: undefined
  };
};
// Prepare the encoder fields for starting a consumer.
//
// `options.priority` is promoted to 'x-priority' in a fresh table that
// has `options.arguments` as its prototype, so the supplied table is
// not mutated. A falsy `consumerTag` becomes ''; `noLocal`, `noAck`
// and `exclusive` are coerced to booleans.
Args.consume = function(queue, options) {
  var opts = options || EMPTY_OPTIONS;

  var table = Object.create(opts.arguments || null);
  setIfDefined(table, 'x-priority', opts.priority);

  var fields = {ticket: 0, queue: queue};
  fields.consumerTag = opts.consumerTag ? opts.consumerTag : '';
  fields.noLocal = Boolean(opts.noLocal);
  fields.noAck = Boolean(opts.noAck);
  fields.exclusive = Boolean(opts.exclusive);
  fields.nowait = false;
  fields.arguments = table;
  return fields;
};
// Prepare the encoder fields for cancelling the consumer identified
// by `consumerTag`.
Args.cancel = function(consumerTag) {
  var fields = {};
  fields.consumerTag = consumerTag;
  fields.nowait = false;
  return fields;
};
// Prepare the encoder fields for fetching a single message from
// `queue`. `options.noAck` is coerced to a boolean.
Args.get = function(queue, options) {
  var opts = options || EMPTY_OPTIONS;
  var fields = {ticket: 0};
  fields.queue = queue;
  fields.noAck = Boolean(opts.noAck);
  return fields;
};
// Prepare the encoder fields for acknowledging the delivery `tag`.
// `allUpTo` is coerced to a boolean and sent as `multiple`.
Args.ack = function(tag, allUpTo) {
  var fields = {deliveryTag: tag};
  fields.multiple = Boolean(allUpTo);
  return fields;
};
// Prepare the encoder fields for negatively acknowledging the
// delivery `tag`. `allUpTo` is coerced to a boolean and sent as
// `multiple`; `requeue` is passed through as given, except that
// undefined becomes true.
Args.nack = function(tag, allUpTo, requeue) {
  var shouldRequeue = requeue;
  if (shouldRequeue === undefined) {
    shouldRequeue = true;
  }
  return {
    deliveryTag: tag,
    multiple: Boolean(allUpTo),
    requeue: shouldRequeue
  };
};
// Prepare the encoder fields for rejecting the delivery `tag`.
// `requeue` is passed through as given, except that undefined becomes
// true.
Args.reject = function(tag, requeue) {
  var fields = {deliveryTag: tag};
  if (requeue === undefined) {
    fields.requeue = true;
  }
  else {
    fields.requeue = requeue;
  }
  return fields;
};
// Prepare the encoder fields for setting the prefetch limit. A falsy
// `count` becomes 0, `prefetchSize` is always 0, and `global` is
// coerced to a boolean.
Args.prefetch = function(count, global) {
  var fields = {};
  fields.prefetchCount = count ? count : 0;
  fields.prefetchSize = 0;
  fields.global = Boolean(global);
  return fields;
};
// Prepare the encoder fields for a recover; takes no arguments and
// always asks for requeueing. A new object is returned on each call.
Args.recover = function() {
  var fields = {};
  fields.requeue = true;
  return fields;
};
- module.exports = Object.freeze(Args);
|