channel.js 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506
  1. //
  2. //
  3. //
  4. // Channel machinery.
  5. 'use strict';
  6. var defs = require('./defs');
  7. var closeMsg = require('./format').closeMessage;
  8. var inspect = require('./format').inspect;
  9. var methodName = require('./format').methodName;
  10. var assert = require('assert');
  11. var EventEmitter = require('events');
  12. var fmt = require('util').format;
  13. var IllegalOperationError = require('./error').IllegalOperationError;
  14. var stackCapture = require('./error').stackCapture;
  15. class Channel extends EventEmitter {
  16. constructor (connection) {
  17. super();
  18. this.connection = connection;
  19. // for the presently outstanding RPC
  20. this.reply = null;
  21. // for the RPCs awaiting action
  22. this.pending = [];
  23. // for unconfirmed messages
  24. this.lwm = 1; // the least, unconfirmed deliveryTag
  25. this.unconfirmed = []; // rolling window of delivery callbacks
  26. this.on('ack', this.handleConfirm.bind(this, function (cb) {
  27. if (cb)
  28. cb(null);
  29. }));
  30. this.on('nack', this.handleConfirm.bind(this, function (cb) {
  31. if (cb)
  32. cb(new Error('message nacked'));
  33. }));
  34. this.on('close', function () {
  35. var cb;
  36. while (cb = this.unconfirmed.shift()) {
  37. if (cb)
  38. cb(new Error('channel closed'));
  39. }
  40. });
  41. // message frame state machine
  42. this.handleMessage = acceptDeliveryOrReturn;
  43. }
  44. allocate () {
  45. this.ch = this.connection.freshChannel(this);
  46. return this;
  47. }
  48. // Incoming frames are either notifications of e.g., message delivery,
  49. // or replies to something we've sent. In general I deal with the
  50. // former by emitting an event, and with the latter by keeping a track
  51. // of what's expecting a reply.
  52. //
  53. // The AMQP specification implies that RPCs can't be pipelined; that
  54. // is, you can have only one outstanding RPC on a channel at a
  55. // time. Certainly that's what RabbitMQ and its clients assume. For
  56. // this reason, I buffer RPCs if the channel is already waiting for a
  57. // reply.
  58. // Just send the damn frame.
  59. sendImmediately (method, fields) {
  60. return this.connection.sendMethod(this.ch, method, fields);
  61. }
  62. // Invariant: !this.reply -> pending.length == 0. That is, whenever we
  63. // clear a reply, we must send another RPC (and thereby fill
  64. // this.reply) if there is one waiting. The invariant relevant here
  65. // and in `accept`.
  66. sendOrEnqueue (method, fields, reply) {
  67. if (!this.reply) { // if no reply waiting, we can go
  68. assert(this.pending.length === 0);
  69. this.reply = reply;
  70. this.sendImmediately(method, fields);
  71. }
  72. else {
  73. this.pending.push({
  74. method: method,
  75. fields: fields,
  76. reply: reply
  77. });
  78. }
  79. }
  80. sendMessage (fields, properties, content) {
  81. return this.connection.sendMessage(
  82. this.ch,
  83. defs.BasicPublish, fields,
  84. defs.BasicProperties, properties,
  85. content);
  86. }
  87. // Internal, synchronously resolved RPC; the return value is resolved
  88. // with the whole frame.
  89. _rpc (method, fields, expect, cb) {
  90. var self = this;
  91. function reply (err, f) {
  92. if (err === null) {
  93. if (f.id === expect) {
  94. return cb(null, f);
  95. }
  96. else {
  97. // We have detected a problem, so it's up to us to close the
  98. // channel
  99. var expectedName = methodName(expect);
  100. var e = new Error(fmt("Expected %s; got %s",
  101. expectedName, inspect(f, false)));
  102. self.closeWithError(f.id, fmt('Expected %s; got %s',
  103. expectedName, methodName(f.id)),
  104. defs.constants.UNEXPECTED_FRAME, e);
  105. return cb(e);
  106. }
  107. }
  108. // An error will be given if, for example, this is waiting to be
  109. // sent and the connection closes
  110. else if (err instanceof Error)
  111. return cb(err);
  112. // A close frame will be given if this is the RPC awaiting reply
  113. // and the channel is closed by the server
  114. else {
  115. // otherwise, it's a close frame
  116. var closeReason = (err.fields.classId << 16) + err.fields.methodId;
  117. var e = (method === closeReason)
  118. ? fmt("Operation failed: %s; %s",
  119. methodName(method), closeMsg(err))
  120. : fmt("Channel closed by server: %s", closeMsg(err));
  121. var closeFrameError = new Error(e);
  122. closeFrameError.code = err.fields.replyCode;
  123. closeFrameError.classId = err.fields.classId;
  124. closeFrameError.methodId = err.fields.methodId;
  125. return cb(closeFrameError);
  126. }
  127. }
  128. this.sendOrEnqueue(method, fields, reply);
  129. }
  130. // Move to entirely closed state.
  131. toClosed (capturedStack) {
  132. this._rejectPending();
  133. invalidateSend(this, 'Channel closed', capturedStack);
  134. this.accept = invalidOp('Channel closed', capturedStack);
  135. this.connection.releaseChannel(this.ch);
  136. this.emit('close');
  137. }
  138. // Stop being able to send and receive methods and content. Used when
  139. // we close the channel. Invokes the continuation once the server has
  140. // acknowledged the close, but before the channel is moved to the
  141. // closed state.
  142. toClosing (capturedStack, k) {
  143. var send = this.sendImmediately.bind(this);
  144. invalidateSend(this, 'Channel closing', capturedStack);
  145. this.accept = function (f) {
  146. if (f.id === defs.ChannelCloseOk) {
  147. if (k)
  148. k();
  149. var s = stackCapture('ChannelCloseOk frame received');
  150. this.toClosed(s);
  151. }
  152. else if (f.id === defs.ChannelClose) {
  153. send(defs.ChannelCloseOk, {});
  154. }
  155. // else ignore frame
  156. };
  157. }
  158. _rejectPending () {
  159. function rej (r) {
  160. r(new Error("Channel ended, no reply will be forthcoming"));
  161. }
  162. if (this.reply !== null)
  163. rej(this.reply);
  164. this.reply = null;
  165. var discard;
  166. while (discard = this.pending.shift())
  167. rej(discard.reply);
  168. this.pending = null; // so pushes will break
  169. }
  170. closeBecause (reason, code, k) {
  171. this.sendImmediately(defs.ChannelClose, {
  172. replyText: reason,
  173. replyCode: code,
  174. methodId: 0, classId: 0
  175. });
  176. var s = stackCapture('closeBecause called: ' + reason);
  177. this.toClosing(s, k);
  178. }
  179. // If we close because there's been an error, we need to distinguish
  180. // between what we tell the server (`reason`) and what we report as
  181. // the cause in the client (`error`).
  182. closeWithError (id, reason, code, error) {
  183. var self = this;
  184. this.closeBecause(reason, code, function () {
  185. error.code = code;
  186. // content frames and consumer errors do not provide a method a class/method ID
  187. if (id) {
  188. error.classId = defs.info(id).classId;
  189. error.methodId = defs.info(id).methodId;
  190. }
  191. self.emit('error', error);
  192. });
  193. }
  194. // A trampolining state machine for message frames on a channel. A
  195. // message arrives in at least two frames: first, a method announcing
  196. // the message (either a BasicDeliver or BasicGetOk); then, a message
  197. // header with the message properties; then, zero or more content
  198. // frames.
  199. // Keep the try/catch localised, in an attempt to avoid disabling
  200. // optimisation
  201. acceptMessageFrame (f) {
  202. try {
  203. this.handleMessage = this.handleMessage(f);
  204. }
  205. catch (msg) {
  206. if (typeof msg === 'string') {
  207. this.closeWithError(f.id, msg, defs.constants.UNEXPECTED_FRAME,
  208. new Error(msg));
  209. }
  210. else if (msg instanceof Error) {
  211. this.closeWithError(f.id, 'Error while processing message',
  212. defs.constants.INTERNAL_ERROR, msg);
  213. }
  214. else {
  215. this.closeWithError(f.id, 'Internal error while processing message',
  216. defs.constants.INTERNAL_ERROR,
  217. new Error(msg.toString()));
  218. }
  219. }
  220. }
  221. handleConfirm (handle, f) {
  222. var tag = f.deliveryTag;
  223. var multi = f.multiple;
  224. if (multi) {
  225. var confirmed = this.unconfirmed.splice(0, tag - this.lwm + 1);
  226. this.lwm = tag + 1;
  227. confirmed.forEach(handle);
  228. }
  229. else {
  230. var c;
  231. if (tag === this.lwm) {
  232. c = this.unconfirmed.shift();
  233. this.lwm++;
  234. // Advance the LWM and the window to the next non-gap, or
  235. // possibly to the end
  236. while (this.unconfirmed[0] === null) {
  237. this.unconfirmed.shift();
  238. this.lwm++;
  239. }
  240. }
  241. else {
  242. c = this.unconfirmed[tag - this.lwm];
  243. this.unconfirmed[tag - this.lwm] = null;
  244. }
  245. // Technically, in the single-deliveryTag case, I should report a
  246. // protocol breach if it's already been confirmed.
  247. handle(c);
  248. }
  249. }
  250. pushConfirmCallback (cb) {
  251. // `null` is used specifically for marking already confirmed slots,
  252. // so I coerce `undefined` and `null` to false; functions are never
  253. // falsey.
  254. this.unconfirmed.push(cb || false);
  255. }
  256. onBufferDrain () {
  257. this.emit('drain');
  258. }
  259. accept(f) {
  260. switch (f.id) {
  261. // Message frames
  262. case undefined: // content frame!
  263. case defs.BasicDeliver:
  264. case defs.BasicReturn:
  265. case defs.BasicProperties:
  266. return this.acceptMessageFrame(f);
  267. // confirmations, need to do confirm.select first
  268. case defs.BasicAck:
  269. return this.emit('ack', f.fields);
  270. case defs.BasicNack:
  271. return this.emit('nack', f.fields);
  272. case defs.BasicCancel:
  273. // The broker can send this if e.g., the queue is deleted.
  274. return this.emit('cancel', f.fields);
  275. case defs.ChannelClose:
  276. // Any remote closure is an error to us. Reject the pending reply
  277. // with the close frame, so it can see whether it was that
  278. // operation that caused it to close.
  279. if (this.reply) {
  280. var reply = this.reply; this.reply = null;
  281. reply(f);
  282. }
  283. var emsg = "Channel closed by server: " + closeMsg(f);
  284. this.sendImmediately(defs.ChannelCloseOk, {});
  285. var error = new Error(emsg);
  286. error.code = f.fields.replyCode;
  287. error.classId = f.fields.classId;
  288. error.methodId = f.fields.methodId;
  289. this.emit('error', error);
  290. var s = stackCapture(emsg);
  291. this.toClosed(s);
  292. return;
  293. case defs.BasicFlow:
  294. // RabbitMQ doesn't send this, it just blocks the TCP socket
  295. return this.closeWithError(f.id, "Flow not implemented",
  296. defs.constants.NOT_IMPLEMENTED,
  297. new Error('Flow not implemented'));
  298. default: // assume all other things are replies
  299. // Resolving the reply may lead to another RPC; to make sure we
  300. // don't hold that up, clear this.reply
  301. var reply = this.reply; this.reply = null;
  302. // however, maybe there's an RPC waiting to go? If so, that'll
  303. // fill this.reply again, restoring the invariant. This does rely
  304. // on any response being recv'ed after resolving the promise,
  305. // below; hence, I use synchronous defer.
  306. if (this.pending.length > 0) {
  307. var send = this.pending.shift();
  308. this.reply = send.reply;
  309. this.sendImmediately(send.method, send.fields);
  310. }
  311. return reply(null, f);
  312. }
  313. }
  314. }
  315. // Shutdown protocol. There are three scenarios:
  316. //
  317. // 1. The application decides to shut the channel
  318. // 2. The server decides to shut the channel, possibly because of
  319. // something the application did
  320. // 3. The connection is closing, so there won't be any more frames
  321. // going back and forth.
  322. //
  323. // 1 and 2 involve an exchange of method frames (Close and CloseOk),
  324. // while 3 doesn't; the connection simply says "shutdown" to the
  325. // channel, which then acts as if it's closing, without going through
  326. // the exchange.
  327. function invalidOp(msg, stack) {
  328. return function() {
  329. throw new IllegalOperationError(msg, stack);
  330. };
  331. }
  332. function invalidateSend(ch, msg, stack) {
  333. ch.sendImmediately = ch.sendOrEnqueue = ch.sendMessage =
  334. invalidOp(msg, stack);
  335. }
  336. // Kick off a message delivery given a BasicDeliver or BasicReturn
  337. // frame (BasicGet uses the RPC mechanism)
  338. function acceptDeliveryOrReturn(f) {
  339. var event;
  340. if (f.id === defs.BasicDeliver) event = 'delivery';
  341. else if (f.id === defs.BasicReturn) event = 'return';
  342. else throw fmt("Expected BasicDeliver or BasicReturn; got %s",
  343. inspect(f));
  344. var self = this;
  345. var fields = f.fields;
  346. return acceptMessage(function(message) {
  347. message.fields = fields;
  348. self.emit(event, message);
  349. });
  350. }
  351. // Move to the state of waiting for message frames (headers, then
  352. // zero or more content frames)
  353. function acceptMessage(continuation) {
  354. var totalSize = 0, remaining = 0;
  355. var buffers = null;
  356. var message = {
  357. fields: null,
  358. properties: null,
  359. content: null
  360. };
  361. return headers;
  362. // expect a headers frame
  363. function headers(f) {
  364. if (f.id === defs.BasicProperties) {
  365. message.properties = f.fields;
  366. totalSize = remaining = f.size;
  367. // for zero-length messages, content frames aren't required.
  368. if (totalSize === 0) {
  369. message.content = Buffer.alloc(0);
  370. continuation(message);
  371. return acceptDeliveryOrReturn;
  372. }
  373. else {
  374. return content;
  375. }
  376. }
  377. else {
  378. throw "Expected headers frame after delivery";
  379. }
  380. }
  381. // expect a content frame
  382. // %%% TODO cancelled messages (sent as zero-length content frame)
  383. function content(f) {
  384. if (f.content) {
  385. var size = f.content.length;
  386. remaining -= size;
  387. if (remaining === 0) {
  388. if (buffers !== null) {
  389. buffers.push(f.content);
  390. message.content = Buffer.concat(buffers);
  391. }
  392. else {
  393. message.content = f.content;
  394. }
  395. continuation(message);
  396. return acceptDeliveryOrReturn;
  397. }
  398. else if (remaining < 0) {
  399. throw fmt("Too much content sent! Expected %d bytes",
  400. totalSize);
  401. }
  402. else {
  403. if (buffers !== null)
  404. buffers.push(f.content);
  405. else
  406. buffers = [f.content];
  407. return content;
  408. }
  409. }
  410. else throw "Expected content frame after headers"
  411. }
  412. }
  413. // This adds just a bit more stuff useful for the APIs, but not
  414. // low-level machinery.
  415. class BaseChannel extends Channel {
  416. constructor (connection) {
  417. super(connection);
  418. this.consumers = new Map();
  419. }
  420. // Not sure I like the ff, it's going to be changing hidden classes
  421. // all over the place. On the other hand, whaddya do.
  422. registerConsumer (tag, callback) {
  423. this.consumers.set(tag, callback);
  424. }
  425. unregisterConsumer (tag) {
  426. this.consumers.delete(tag);
  427. }
  428. dispatchMessage (fields, message) {
  429. var consumerTag = fields.consumerTag;
  430. var consumer = this.consumers.get(consumerTag);
  431. if (consumer) {
  432. return consumer(message);
  433. }
  434. else {
  435. // %%% Surely a race here
  436. throw new Error("Unknown consumer: " + consumerTag);
  437. }
  438. }
  439. handleDelivery (message) {
  440. return this.dispatchMessage(message.fields, message);
  441. }
  442. handleCancel (fields) {
  443. var result = this.dispatchMessage(fields, null);
  444. this.unregisterConsumer(fields.consumerTag);
  445. return result;
  446. }
  447. }
  448. module.exports.acceptMessage = acceptMessage;
  449. module.exports.BaseChannel = BaseChannel;
  450. module.exports.Channel = Channel;