api_args.js 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314
  1. //
  2. //
  3. //
  4. 'use strict';
  5. /*
  6. The channel (promise) and callback APIs have similar signatures, and
  7. in particular, both need AMQP fields prepared from the same arguments
  8. and options. The arguments marshalling is done here. Each of the
  9. procedures below takes arguments and options (the latter in an object)
  10. particular to the operation it represents, and returns an object with
  11. fields for handing to the encoder.
  12. */
  13. // A number of AMQP methods have a table-typed field called
  14. // `arguments`, that is intended to carry extension-specific
  15. // values. RabbitMQ uses this in a number of places; e.g., to specify
  16. // an 'alternate exchange'.
  17. //
  18. // Many of the methods in this API have an `options` argument, from
  19. // which I take both values that have a default in AMQP (e.g.,
  20. // autoDelete in QueueDeclare) *and* values that are specific to
  21. // RabbitMQ (e.g., 'alternate-exchange'), which would normally be
  22. // supplied in `arguments`. So that extensions I don't support yet can
  23. // be used, I include `arguments` itself among the options.
  24. //
  25. // The upshot of this is that I often need to prepare an `arguments`
  26. // value that has any values passed in `options.arguments` as well as
  27. // any I've promoted to being options themselves. Since I don't want
  28. // to mutate anything passed in, the general pattern is to create a
  29. // fresh object with the `arguments` value given as its prototype; all
  30. // fields in the supplied value will be serialised, as well as any I
  31. // set on the fresh object. What I don't want to do, however, is set a
  32. // field to undefined by copying possibly missing field values,
  33. // because that will mask a value in the prototype.
  34. //
  35. // NB the `arguments` field already has a default value of `{}`, so
  36. // there's no need to explicitly default it unless I'm setting values.
  37. function setIfDefined(obj, prop, value) {
  38. if (value != undefined) obj[prop] = value;
  39. }
  40. var EMPTY_OPTIONS = Object.freeze({});
  41. var Args = {};
  42. Args.assertQueue = function(queue, options) {
  43. queue = queue || '';
  44. options = options || EMPTY_OPTIONS;
  45. var argt = Object.create(options.arguments || null);
  46. setIfDefined(argt, 'x-expires', options.expires);
  47. setIfDefined(argt, 'x-message-ttl', options.messageTtl);
  48. setIfDefined(argt, 'x-dead-letter-exchange',
  49. options.deadLetterExchange);
  50. setIfDefined(argt, 'x-dead-letter-routing-key',
  51. options.deadLetterRoutingKey);
  52. setIfDefined(argt, 'x-max-length', options.maxLength);
  53. setIfDefined(argt, 'x-max-priority', options.maxPriority);
  54. setIfDefined(argt, 'x-overflow', options.overflow);
  55. setIfDefined(argt, 'x-queue-mode', options.queueMode);
  56. return {
  57. queue: queue,
  58. exclusive: !!options.exclusive,
  59. durable: (options.durable === undefined) ? true : options.durable,
  60. autoDelete: !!options.autoDelete,
  61. arguments: argt,
  62. passive: false,
  63. // deprecated but we have to include it
  64. ticket: 0,
  65. nowait: false
  66. };
  67. };
  68. Args.checkQueue = function(queue) {
  69. return {
  70. queue: queue,
  71. passive: true, // switch to "completely different" mode
  72. nowait: false,
  73. durable: true, autoDelete: false, exclusive: false, // ignored
  74. ticket: 0,
  75. };
  76. };
  77. Args.deleteQueue = function(queue, options) {
  78. options = options || EMPTY_OPTIONS;
  79. return {
  80. queue: queue,
  81. ifUnused: !!options.ifUnused,
  82. ifEmpty: !!options.ifEmpty,
  83. ticket: 0, nowait: false
  84. };
  85. };
  86. Args.purgeQueue = function(queue) {
  87. return {
  88. queue: queue,
  89. ticket: 0, nowait: false
  90. };
  91. };
  92. Args.bindQueue = function(queue, source, pattern, argt) {
  93. return {
  94. queue: queue,
  95. exchange: source,
  96. routingKey: pattern,
  97. arguments: argt,
  98. ticket: 0, nowait: false
  99. };
  100. };
  101. Args.unbindQueue = function(queue, source, pattern, argt) {
  102. return {
  103. queue: queue,
  104. exchange: source,
  105. routingKey: pattern,
  106. arguments: argt,
  107. ticket: 0, nowait: false
  108. };
  109. };
  110. Args.assertExchange = function(exchange, type, options) {
  111. options = options || EMPTY_OPTIONS;
  112. var argt = Object.create(options.arguments || null);
  113. setIfDefined(argt, 'alternate-exchange', options.alternateExchange);
  114. return {
  115. exchange: exchange,
  116. ticket: 0,
  117. type: type,
  118. passive: false,
  119. durable: (options.durable === undefined) ? true : options.durable,
  120. autoDelete: !!options.autoDelete,
  121. internal: !!options.internal,
  122. nowait: false,
  123. arguments: argt
  124. };
  125. };
  126. Args.checkExchange = function(exchange) {
  127. return {
  128. exchange: exchange,
  129. passive: true, // switch to 'may as well be another method' mode
  130. nowait: false,
  131. // ff are ignored
  132. durable: true, internal: false, type: '', autoDelete: false,
  133. ticket: 0
  134. };
  135. };
  136. Args.deleteExchange = function(exchange, options) {
  137. options = options || EMPTY_OPTIONS;
  138. return {
  139. exchange: exchange,
  140. ifUnused: !!options.ifUnused,
  141. ticket: 0, nowait: false
  142. };
  143. };
  144. Args.bindExchange = function(dest, source, pattern, argt) {
  145. return {
  146. source: source,
  147. destination: dest,
  148. routingKey: pattern,
  149. arguments: argt,
  150. ticket: 0, nowait: false
  151. };
  152. };
  153. Args.unbindExchange = function(dest, source, pattern, argt) {
  154. return {
  155. source: source,
  156. destination: dest,
  157. routingKey: pattern,
  158. arguments: argt,
  159. ticket: 0, nowait: false
  160. };
  161. };
  162. // It's convenient to construct the properties and the method fields
  163. // at the same time, since in the APIs, values for both can appear in
  164. // `options`. Since the property or mthod field names don't overlap, I
  165. // just return one big object that can be used for both purposes, and
  166. // the encoder will pick out what it wants.
  167. Args.publish = function(exchange, routingKey, options) {
  168. options = options || EMPTY_OPTIONS;
  169. // The CC and BCC fields expect an array of "longstr", which would
  170. // normally be buffer values in JavaScript; however, since a field
  171. // array (or table) cannot have shortstr values, the codec will
  172. // encode all strings as longstrs anyway.
  173. function convertCC(cc) {
  174. if (cc === undefined) {
  175. return undefined;
  176. }
  177. else if (Array.isArray(cc)) {
  178. return cc.map(String);
  179. }
  180. else return [String(cc)];
  181. }
  182. var headers = Object.create(options.headers || null);
  183. setIfDefined(headers, 'CC', convertCC(options.CC));
  184. setIfDefined(headers, 'BCC', convertCC(options.BCC));
  185. var deliveryMode; // undefined will default to 1 (non-persistent)
  186. // Previously I overloaded deliveryMode be a boolean meaning
  187. // 'persistent or not'; better is to name this option for what it
  188. // is, but I need to have backwards compatibility for applications
  189. // that either supply a numeric or boolean value.
  190. if (options.persistent !== undefined)
  191. deliveryMode = (options.persistent) ? 2 : 1;
  192. else if (typeof options.deliveryMode === 'number')
  193. deliveryMode = options.deliveryMode;
  194. else if (options.deliveryMode) // is supplied and truthy
  195. deliveryMode = 2;
  196. var expiration = options.expiration;
  197. if (expiration !== undefined) expiration = expiration.toString();
  198. return {
  199. // method fields
  200. exchange: exchange,
  201. routingKey: routingKey,
  202. mandatory: !!options.mandatory,
  203. immediate: false, // RabbitMQ doesn't implement this any more
  204. ticket: undefined,
  205. // properties
  206. contentType: options.contentType,
  207. contentEncoding: options.contentEncoding,
  208. headers: headers,
  209. deliveryMode: deliveryMode,
  210. priority: options.priority,
  211. correlationId: options.correlationId,
  212. replyTo: options.replyTo,
  213. expiration: expiration,
  214. messageId: options.messageId,
  215. timestamp: options.timestamp,
  216. type: options.type,
  217. userId: options.userId,
  218. appId: options.appId,
  219. clusterId: undefined
  220. };
  221. };
  222. Args.consume = function(queue, options) {
  223. options = options || EMPTY_OPTIONS;
  224. var argt = Object.create(options.arguments || null);
  225. setIfDefined(argt, 'x-priority', options.priority);
  226. return {
  227. ticket: 0,
  228. queue: queue,
  229. consumerTag: options.consumerTag || '',
  230. noLocal: !!options.noLocal,
  231. noAck: !!options.noAck,
  232. exclusive: !!options.exclusive,
  233. nowait: false,
  234. arguments: argt
  235. };
  236. };
  237. Args.cancel = function(consumerTag) {
  238. return {
  239. consumerTag: consumerTag,
  240. nowait: false
  241. };
  242. };
  243. Args.get = function(queue, options) {
  244. options = options || EMPTY_OPTIONS;
  245. return {
  246. ticket: 0,
  247. queue: queue,
  248. noAck: !!options.noAck
  249. };
  250. };
  251. Args.ack = function(tag, allUpTo) {
  252. return {
  253. deliveryTag: tag,
  254. multiple: !!allUpTo
  255. };
  256. };
  257. Args.nack = function(tag, allUpTo, requeue) {
  258. return {
  259. deliveryTag: tag,
  260. multiple: !!allUpTo,
  261. requeue: (requeue === undefined) ? true : requeue
  262. };
  263. };
  264. Args.reject = function(tag, requeue) {
  265. return {
  266. deliveryTag: tag,
  267. requeue: (requeue === undefined) ? true : requeue
  268. };
  269. };
  270. Args.prefetch = function(count, global) {
  271. return {
  272. prefetchCount: count || 0,
  273. prefetchSize: 0,
  274. global: !!global
  275. };
  276. };
  277. Args.recover = function() {
  278. return {requeue: true};
  279. };
  280. module.exports = Object.freeze(Args);