callback_model.js 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
  1. //
  2. //
  3. //
  4. 'use strict';
  5. var defs = require('./defs');
  6. var EventEmitter = require('events');
  7. var BaseChannel = require('./channel').BaseChannel;
  8. var acceptMessage = require('./channel').acceptMessage;
  9. var Args = require('./api_args');
  10. class CallbackModel extends EventEmitter {
  11. constructor (connection) {
  12. super();
  13. this.connection = connection;
  14. var self = this;
  15. ['error', 'close', 'blocked', 'unblocked'].forEach(function (ev) {
  16. connection.on(ev, self.emit.bind(self, ev));
  17. });
  18. }
  19. close (cb) {
  20. this.connection.close(cb);
  21. }
  22. updateSecret(newSecret, reason, cb) {
  23. this.connection._updateSecret(newSecret, reason, cb);
  24. }
  25. createChannel (cb) {
  26. var ch = new Channel(this.connection);
  27. ch.open(function (err, ok) {
  28. if (err === null)
  29. cb && cb(null, ch);
  30. else
  31. cb && cb(err);
  32. });
  33. return ch;
  34. }
  35. createConfirmChannel (cb) {
  36. var ch = new ConfirmChannel(this.connection);
  37. ch.open(function (err) {
  38. if (err !== null)
  39. return cb && cb(err);
  40. else {
  41. ch.rpc(defs.ConfirmSelect, { nowait: false },
  42. defs.ConfirmSelectOk, function (err, _ok) {
  43. if (err !== null)
  44. return cb && cb(err);
  45. else
  46. cb && cb(null, ch);
  47. });
  48. }
  49. });
  50. return ch;
  51. }
  52. }
  53. class Channel extends BaseChannel {
  54. constructor (connection) {
  55. super(connection);
  56. this.on('delivery', this.handleDelivery.bind(this));
  57. this.on('cancel', this.handleCancel.bind(this));
  58. }
  59. // This encodes straight-forward RPC: no side-effects and return the
  60. // fields from the server response. It wraps the callback given it, so
  61. // the calling method argument can be passed as-is. For anything that
  62. // needs to have side-effects, or needs to change the server response,
  63. // use `#_rpc(...)` and remember to dereference `.fields` of the
  64. // server response.
  65. rpc (method, fields, expect, cb0) {
  66. var cb = callbackWrapper(this, cb0);
  67. this._rpc(method, fields, expect, function (err, ok) {
  68. cb(err, ok && ok.fields); // in case of an error, ok will be
  69. // undefined
  70. });
  71. return this;
  72. }
  73. // === Public API ===
  74. open (cb) {
  75. try { this.allocate(); }
  76. catch (e) { return cb(e); }
  77. return this.rpc(defs.ChannelOpen, { outOfBand: "" },
  78. defs.ChannelOpenOk, cb);
  79. }
  80. close (cb) {
  81. return this.closeBecause("Goodbye", defs.constants.REPLY_SUCCESS,
  82. function () { cb && cb(null); });
  83. }
  84. assertQueue (queue, options, cb) {
  85. return this.rpc(defs.QueueDeclare,
  86. Args.assertQueue(queue, options),
  87. defs.QueueDeclareOk, cb);
  88. }
  89. checkQueue (queue, cb) {
  90. return this.rpc(defs.QueueDeclare,
  91. Args.checkQueue(queue),
  92. defs.QueueDeclareOk, cb);
  93. }
  94. deleteQueue (queue, options, cb) {
  95. return this.rpc(defs.QueueDelete,
  96. Args.deleteQueue(queue, options),
  97. defs.QueueDeleteOk, cb);
  98. }
  99. purgeQueue (queue, cb) {
  100. return this.rpc(defs.QueuePurge,
  101. Args.purgeQueue(queue),
  102. defs.QueuePurgeOk, cb);
  103. }
  104. bindQueue (queue, source, pattern, argt, cb) {
  105. return this.rpc(defs.QueueBind,
  106. Args.bindQueue(queue, source, pattern, argt),
  107. defs.QueueBindOk, cb);
  108. }
  109. unbindQueue (queue, source, pattern, argt, cb) {
  110. return this.rpc(defs.QueueUnbind,
  111. Args.unbindQueue(queue, source, pattern, argt),
  112. defs.QueueUnbindOk, cb);
  113. }
  114. assertExchange (ex, type, options, cb0) {
  115. var cb = callbackWrapper(this, cb0);
  116. this._rpc(defs.ExchangeDeclare,
  117. Args.assertExchange(ex, type, options),
  118. defs.ExchangeDeclareOk,
  119. function (e, _) { cb(e, { exchange: ex }); });
  120. return this;
  121. }
  122. checkExchange (exchange, cb) {
  123. return this.rpc(defs.ExchangeDeclare,
  124. Args.checkExchange(exchange),
  125. defs.ExchangeDeclareOk, cb);
  126. }
  127. deleteExchange (exchange, options, cb) {
  128. return this.rpc(defs.ExchangeDelete,
  129. Args.deleteExchange(exchange, options),
  130. defs.ExchangeDeleteOk, cb);
  131. }
  132. bindExchange (dest, source, pattern, argt, cb) {
  133. return this.rpc(defs.ExchangeBind,
  134. Args.bindExchange(dest, source, pattern, argt),
  135. defs.ExchangeBindOk, cb);
  136. }
  137. unbindExchange (dest, source, pattern, argt, cb) {
  138. return this.rpc(defs.ExchangeUnbind,
  139. Args.unbindExchange(dest, source, pattern, argt),
  140. defs.ExchangeUnbindOk, cb);
  141. }
  142. publish (exchange, routingKey, content, options) {
  143. var fieldsAndProps = Args.publish(exchange, routingKey, options);
  144. return this.sendMessage(fieldsAndProps, fieldsAndProps, content);
  145. }
  146. sendToQueue (queue, content, options) {
  147. return this.publish('', queue, content, options);
  148. }
  149. consume (queue, callback, options, cb0) {
  150. var cb = callbackWrapper(this, cb0);
  151. var fields = Args.consume(queue, options);
  152. var self = this;
  153. this._rpc(
  154. defs.BasicConsume, fields, defs.BasicConsumeOk,
  155. function (err, ok) {
  156. if (err === null) {
  157. self.registerConsumer(ok.fields.consumerTag, callback);
  158. cb(null, ok.fields);
  159. }
  160. else
  161. cb(err);
  162. });
  163. return this;
  164. }
  165. cancel (consumerTag, cb0) {
  166. var cb = callbackWrapper(this, cb0);
  167. var self = this;
  168. this._rpc(
  169. defs.BasicCancel, Args.cancel(consumerTag), defs.BasicCancelOk,
  170. function (err, ok) {
  171. if (err === null) {
  172. self.unregisterConsumer(consumerTag);
  173. cb(null, ok.fields);
  174. }
  175. else
  176. cb(err);
  177. });
  178. return this;
  179. }
  180. get (queue, options, cb0) {
  181. var self = this;
  182. var fields = Args.get(queue, options);
  183. var cb = callbackWrapper(this, cb0);
  184. this.sendOrEnqueue(defs.BasicGet, fields, function (err, f) {
  185. if (err === null) {
  186. if (f.id === defs.BasicGetEmpty) {
  187. cb(null, false);
  188. }
  189. else if (f.id === defs.BasicGetOk) {
  190. self.handleMessage = acceptMessage(function (m) {
  191. m.fields = f.fields;
  192. cb(null, m);
  193. });
  194. }
  195. else {
  196. cb(new Error("Unexpected response to BasicGet: " +
  197. inspect(f)));
  198. }
  199. }
  200. });
  201. return this;
  202. }
  203. ack (message, allUpTo) {
  204. this.sendImmediately(
  205. defs.BasicAck, Args.ack(message.fields.deliveryTag, allUpTo));
  206. return this;
  207. }
  208. ackAll () {
  209. this.sendImmediately(defs.BasicAck, Args.ack(0, true));
  210. return this;
  211. }
  212. nack (message, allUpTo, requeue) {
  213. this.sendImmediately(
  214. defs.BasicNack,
  215. Args.nack(message.fields.deliveryTag, allUpTo, requeue));
  216. return this;
  217. }
  218. nackAll (requeue) {
  219. this.sendImmediately(
  220. defs.BasicNack, Args.nack(0, true, requeue));
  221. return this;
  222. }
  223. reject (message, requeue) {
  224. this.sendImmediately(
  225. defs.BasicReject,
  226. Args.reject(message.fields.deliveryTag, requeue));
  227. return this;
  228. }
  229. prefetch (count, global, cb) {
  230. return this.rpc(defs.BasicQos,
  231. Args.prefetch(count, global),
  232. defs.BasicQosOk, cb);
  233. }
  234. recover (cb) {
  235. return this.rpc(defs.BasicRecover,
  236. Args.recover(),
  237. defs.BasicRecoverOk, cb);
  238. }
  239. }
  240. // Wrap an RPC callback to make sure the callback is invoked with
  241. // either `(null, value)` or `(error)`, i.e., never two non-null
  242. // values. Also substitutes a stub if the callback is `undefined` or
  243. // otherwise falsey, for convenience in methods for which the callback
  244. // is optional (that is, most of them).
  245. function callbackWrapper(ch, cb) {
  246. return (cb) ? function(err, ok) {
  247. if (err === null) {
  248. cb(null, ok);
  249. }
  250. else cb(err);
  251. } : function() {};
  252. }
  253. class ConfirmChannel extends Channel {
  254. publish (exchange, routingKey,
  255. content, options, cb) {
  256. this.pushConfirmCallback(cb);
  257. return Channel.prototype.publish.call(
  258. this, exchange, routingKey, content, options);
  259. }
  260. sendToQueue (queue, content,
  261. options, cb) {
  262. return this.publish('', queue, content, options, cb);
  263. }
  264. waitForConfirms (k) {
  265. var awaiting = [];
  266. var unconfirmed = this.unconfirmed;
  267. unconfirmed.forEach(function (val, index) {
  268. if (val === null)
  269. ; // already confirmed
  270. else {
  271. var confirmed = new Promise(function (resolve, reject) {
  272. unconfirmed[index] = function (err) {
  273. if (val)
  274. val(err);
  275. if (err === null)
  276. resolve();
  277. else
  278. reject(err);
  279. };
  280. });
  281. awaiting.push(confirmed);
  282. }
  283. });
  284. return Promise.all(awaiting).then(function () { k(); },
  285. function (err) { k(err); });
  286. }
  287. }
  288. module.exports.CallbackModel = CallbackModel;
  289. module.exports.Channel = Channel;
  290. module.exports.ConfirmChannel = ConfirmChannel;