connection.js 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675
  1. //
  2. //
  3. //
  4. 'use strict';
  5. var defs = require('./defs');
  6. var constants = defs.constants;
  7. var frame = require('./frame');
  8. var HEARTBEAT = frame.HEARTBEAT;
  9. var Mux = require('./mux').Mux;
  10. var Duplex = require('stream').Duplex;
  11. var EventEmitter = require('events');
  12. var Heart = require('./heartbeat').Heart;
  13. var methodName = require('./format').methodName;
  14. var closeMsg = require('./format').closeMessage;
  15. var inspect = require('./format').inspect;
  16. var BitSet = require('./bitset').BitSet;
  17. var fmt = require('util').format;
  18. var PassThrough = require('stream').PassThrough;
  19. var IllegalOperationError = require('./error').IllegalOperationError;
  20. var stackCapture = require('./error').stackCapture;
  21. // High-water mark for channel write buffers, in 'objects' (which are
  22. // encoded frames as buffers).
  23. var DEFAULT_WRITE_HWM = 1024;
  24. // If all the frames of a message (method, properties, content) total
  25. // to less than this, copy them into a single buffer and write it all
  26. // at once. Note that this is less than the minimum frame size: if it
  27. // was greater, we might have to fragment the content.
  28. var SINGLE_CHUNK_THRESHOLD = 2048;
  29. class Connection extends EventEmitter {
  30. constructor (underlying) {
  31. super();
  32. var stream = this.stream = wrapStream(underlying);
  33. this.muxer = new Mux(stream);
  34. // frames
  35. this.rest = Buffer.alloc(0);
  36. this.frameMax = constants.FRAME_MIN_SIZE;
  37. this.sentSinceLastCheck = false;
  38. this.recvSinceLastCheck = false;
  39. this.expectSocketClose = false;
  40. this.freeChannels = new BitSet();
  41. this.channels = [{
  42. channel: { accept: channel0(this) },
  43. buffer: underlying
  44. }];
  45. }
  46. // This changed between versions, as did the codec, methods, etc. AMQP
  47. // 0-9-1 is fairly similar to 0.8, but better, and nothing implements
  48. // 0.8 that doesn't implement 0-9-1. In other words, it doesn't make
  49. // much sense to generalise here.
  50. sendProtocolHeader () {
  51. this.sendBytes(frame.PROTOCOL_HEADER);
  52. }
  53. /*
  54. The frighteningly complicated opening protocol (spec section 2.2.4):
  55. Client -> Server
  56. protocol header ->
  57. <- start
  58. start-ok ->
  59. .. next two zero or more times ..
  60. <- secure
  61. secure-ok ->
  62. <- tune
  63. tune-ok ->
  64. open ->
  65. <- open-ok
  66. If I'm only supporting SASL's PLAIN mechanism (which I am for the time
  67. being), it gets a bit easier since the server won't in general send
  68. back a `secure`, it'll just send `tune` after the `start-ok`.
  69. (SASL PLAIN: http://tools.ietf.org/html/rfc4616)
  70. */
  71. open (allFields, openCallback0) {
  72. var self = this;
  73. var openCallback = openCallback0 || function () { };
  74. // This is where we'll put our negotiated values
  75. var tunedOptions = Object.create(allFields);
  76. function wait (k) {
  77. self.step(function (err, frame) {
  78. if (err !== null)
  79. bail(err);
  80. else if (frame.channel !== 0) {
  81. bail(new Error(
  82. fmt("Frame on channel != 0 during handshake: %s",
  83. inspect(frame, false))));
  84. }
  85. else
  86. k(frame);
  87. });
  88. }
  89. function expect (Method, k) {
  90. wait(function (frame) {
  91. if (frame.id === Method)
  92. k(frame);
  93. else {
  94. bail(new Error(
  95. fmt("Expected %s; got %s",
  96. methodName(Method), inspect(frame, false))));
  97. }
  98. });
  99. }
  100. function bail (err) {
  101. openCallback(err);
  102. }
  103. function send (Method) {
  104. // This can throw an exception if there's some problem with the
  105. // options; e.g., something is a string instead of a number.
  106. self.sendMethod(0, Method, tunedOptions);
  107. }
  108. function negotiate (server, desired) {
  109. // We get sent values for channelMax, frameMax and heartbeat,
  110. // which we may accept or lower (subject to a minimum for
  111. // frameMax, but we'll leave that to the server to enforce). In
  112. // all cases, `0` really means "no limit", or rather the highest
  113. // value in the encoding, e.g., unsigned short for channelMax.
  114. if (server === 0 || desired === 0) {
  115. // i.e., whichever places a limit, if either
  116. return Math.max(server, desired);
  117. }
  118. else {
  119. return Math.min(server, desired);
  120. }
  121. }
  122. function onStart (start) {
  123. var mechanisms = start.fields.mechanisms.toString().split(' ');
  124. if (mechanisms.indexOf(allFields.mechanism) < 0) {
  125. bail(new Error(fmt('SASL mechanism %s is not provided by the server',
  126. allFields.mechanism)));
  127. return;
  128. }
  129. self.serverProperties = start.fields.serverProperties;
  130. try {
  131. send(defs.ConnectionStartOk);
  132. } catch (err) {
  133. bail(err);
  134. return;
  135. }
  136. wait(afterStartOk);
  137. }
  138. function afterStartOk (reply) {
  139. switch (reply.id) {
  140. case defs.ConnectionSecure:
  141. bail(new Error(
  142. "Wasn't expecting to have to go through secure"));
  143. break;
  144. case defs.ConnectionClose:
  145. bail(new Error(fmt("Handshake terminated by server: %s",
  146. closeMsg(reply))));
  147. break;
  148. case defs.ConnectionTune:
  149. var fields = reply.fields;
  150. tunedOptions.frameMax =
  151. negotiate(fields.frameMax, allFields.frameMax);
  152. tunedOptions.channelMax =
  153. negotiate(fields.channelMax, allFields.channelMax);
  154. tunedOptions.heartbeat =
  155. negotiate(fields.heartbeat, allFields.heartbeat);
  156. try {
  157. send(defs.ConnectionTuneOk);
  158. send(defs.ConnectionOpen);
  159. } catch (err) {
  160. bail(err);
  161. return;
  162. }
  163. expect(defs.ConnectionOpenOk, onOpenOk);
  164. break;
  165. default:
  166. bail(new Error(
  167. fmt("Expected connection.secure, connection.close, " +
  168. "or connection.tune during handshake; got %s",
  169. inspect(reply, false))));
  170. break;
  171. }
  172. }
  173. function onOpenOk (openOk) {
  174. // Impose the maximum of the encoded value, if the negotiated
  175. // value is zero, meaning "no, no limits"
  176. self.channelMax = tunedOptions.channelMax || 0xffff;
  177. self.frameMax = tunedOptions.frameMax || 0xffffffff;
  178. // 0 means "no heartbeat", rather than "maximum period of
  179. // heartbeating"
  180. self.heartbeat = tunedOptions.heartbeat;
  181. self.heartbeater = self.startHeartbeater();
  182. self.accept = mainAccept;
  183. succeed(openOk);
  184. }
  185. // If the server closes the connection, it's probably because of
  186. // something we did
  187. function endWhileOpening (err) {
  188. bail(err || new Error('Socket closed abruptly ' +
  189. 'during opening handshake'));
  190. }
  191. this.stream.on('end', endWhileOpening);
  192. this.stream.on('error', endWhileOpening);
  193. function succeed (ok) {
  194. self.stream.removeListener('end', endWhileOpening);
  195. self.stream.removeListener('error', endWhileOpening);
  196. self.stream.on('error', self.onSocketError.bind(self));
  197. self.stream.on('end', self.onSocketError.bind(
  198. self, new Error('Unexpected close')));
  199. self.on('frameError', self.onSocketError.bind(self));
  200. self.acceptLoop();
  201. openCallback(null, ok);
  202. }
  203. // Now kick off the handshake by prompting the server
  204. this.sendProtocolHeader();
  205. expect(defs.ConnectionStart, onStart);
  206. }
  207. // Closing things: AMQP has a closing handshake that applies to
  208. // closing both connects and channels. As the initiating party, I send
  209. // Close, then ignore all frames until I see either CloseOK --
  210. // which signifies that the other party has seen the Close and shut
  211. // the connection or channel down, so it's fine to free resources; or
  212. // Close, which means the other party also wanted to close the
  213. // whatever, and I should send CloseOk so it can free resources,
  214. // then go back to waiting for the CloseOk. If I receive a Close
  215. // out of the blue, I should throw away any unsent frames (they will
  216. // be ignored anyway) and send CloseOk, then clean up resources. In
  217. // general, Close out of the blue signals an error (or a forced
  218. // closure, which may as well be an error).
  219. //
  220. // RUNNING [1] --- send Close ---> Closing [2] ---> recv Close --+
  221. // | | [3]
  222. // | +------ send CloseOk ------+
  223. // recv Close recv CloseOk
  224. // | |
  225. // V V
  226. // Ended [4] ---- send CloseOk ---> Closed [5]
  227. //
  228. // [1] All frames accepted; getting a Close frame from the server
  229. // moves to Ended; client may initiate a close by sending Close
  230. // itself.
  231. // [2] Client has initiated a close; only CloseOk or (simulataneously
  232. // sent) Close is accepted.
  233. // [3] Simultaneous close
  234. // [4] Server won't send any more frames; accept no more frames, send
  235. // CloseOk.
  236. // [5] Fully closed, client will send no more, server will send no
  237. // more. Signal 'close' or 'error'.
  238. //
  239. // There are two signalling mechanisms used in the API. The first is
  240. // that calling `close` will return a promise, that will either
  241. // resolve once the connection or channel is cleanly shut down, or
  242. // will reject if the shutdown times out.
  243. //
  244. // The second is the 'close' and 'error' events. These are
  245. // emitted as above. The events will fire *before* promises are
  246. // resolved.
  247. // Close the connection without even giving a reason. Typical.
  248. close (closeCallback) {
  249. var k = closeCallback && function () { closeCallback(null); };
  250. this.closeBecause("Cheers, thanks", constants.REPLY_SUCCESS, k);
  251. }
  252. // Close with a reason and a 'code'. I'm pretty sure RabbitMQ totally
  253. // ignores these; maybe it logs them. The continuation will be invoked
  254. // when the CloseOk has been received, and before the 'close' event.
  255. closeBecause (reason, code, k) {
  256. this.sendMethod(0, defs.ConnectionClose, {
  257. replyText: reason,
  258. replyCode: code,
  259. methodId: 0, classId: 0
  260. });
  261. var s = stackCapture('closeBecause called: ' + reason);
  262. this.toClosing(s, k);
  263. }
  264. closeWithError (reason, code, error) {
  265. this.emit('error', error);
  266. this.closeBecause(reason, code);
  267. }
  268. onSocketError (err) {
  269. if (!this.expectSocketClose) {
  270. // forestall any more calls to onSocketError, since we're signed
  271. // up for `'error'` *and* `'end'`
  272. this.expectSocketClose = true;
  273. this.emit('error', err);
  274. var s = stackCapture('Socket error');
  275. this.toClosed(s, err);
  276. }
  277. }
  278. // A close has been initiated. Repeat: a close has been initiated.
  279. // This means we should not send more frames, anyway they will be
  280. // ignored. We also have to shut down all the channels.
  281. toClosing (capturedStack, k) {
  282. var send = this.sendMethod.bind(this);
  283. this.accept = function (f) {
  284. if (f.id === defs.ConnectionCloseOk) {
  285. if (k)
  286. k();
  287. var s = stackCapture('ConnectionCloseOk received');
  288. this.toClosed(s, undefined);
  289. }
  290. else if (f.id === defs.ConnectionClose) {
  291. send(0, defs.ConnectionCloseOk, {});
  292. }
  293. // else ignore frame
  294. };
  295. invalidateSend(this, 'Connection closing', capturedStack);
  296. }
  297. _closeChannels (capturedStack) {
  298. for (var i = 1; i < this.channels.length; i++) {
  299. var ch = this.channels[i];
  300. if (ch !== null) {
  301. ch.channel.toClosed(capturedStack); // %%% or with an error? not clear
  302. }
  303. }
  304. }
  305. // A close has been confirmed. Cease all communication.
  306. toClosed (capturedStack, maybeErr) {
  307. this._closeChannels(capturedStack);
  308. var info = fmt('Connection closed (%s)',
  309. (maybeErr) ? maybeErr.toString() : 'by client');
  310. // Tidy up, invalidate enverything, dynamite the bridges.
  311. invalidateSend(this, info, capturedStack);
  312. this.accept = invalidOp(info, capturedStack);
  313. this.close = function (cb) {
  314. cb && cb(new IllegalOperationError(info, capturedStack));
  315. };
  316. if (this.heartbeater)
  317. this.heartbeater.clear();
  318. // This is certainly true now, if it wasn't before
  319. this.expectSocketClose = true;
  320. this.stream.end();
  321. this.emit('close', maybeErr);
  322. }
  323. _updateSecret(newSecret, reason, cb) {
  324. this.sendMethod(0, defs.ConnectionUpdateSecret, {
  325. newSecret,
  326. reason
  327. });
  328. this.once('update-secret-ok', cb);
  329. }
  330. // ===
  331. startHeartbeater () {
  332. if (this.heartbeat === 0)
  333. return null;
  334. else {
  335. var self = this;
  336. var hb = new Heart(this.heartbeat,
  337. this.checkSend.bind(this),
  338. this.checkRecv.bind(this));
  339. hb.on('timeout', function () {
  340. var hberr = new Error("Heartbeat timeout");
  341. self.emit('error', hberr);
  342. var s = stackCapture('Heartbeat timeout');
  343. self.toClosed(s, hberr);
  344. });
  345. hb.on('beat', function () {
  346. self.sendHeartbeat();
  347. });
  348. return hb;
  349. }
  350. }
  351. // I use an array to keep track of the channels, rather than an
  352. // object. The channel identifiers are numbers, and allocated by the
  353. // connection. If I try to allocate low numbers when they are
  354. // available (which I do, by looking from the start of the bitset),
  355. // this ought to keep the array small, and out of 'sparse array
  356. // storage'. I also set entries to null, rather than deleting them, in
  357. // the expectation that the next channel allocation will fill the slot
  358. // again rather than growing the array. See
  359. // http://www.html5rocks.com/en/tutorials/speed/v8/
  360. freshChannel (channel, options) {
  361. var next = this.freeChannels.nextClearBit(1);
  362. if (next < 0 || next > this.channelMax)
  363. throw new Error("No channels left to allocate");
  364. this.freeChannels.set(next);
  365. var hwm = (options && options.highWaterMark) || DEFAULT_WRITE_HWM;
  366. var writeBuffer = new PassThrough({
  367. objectMode: true, highWaterMark: hwm
  368. });
  369. this.channels[next] = { channel: channel, buffer: writeBuffer };
  370. writeBuffer.on('drain', function () {
  371. channel.onBufferDrain();
  372. });
  373. this.muxer.pipeFrom(writeBuffer);
  374. return next;
  375. }
  376. releaseChannel (channel) {
  377. this.freeChannels.clear(channel);
  378. var buffer = this.channels[channel].buffer;
  379. buffer.end(); // will also cause it to be unpiped
  380. this.channels[channel] = null;
  381. }
  382. acceptLoop () {
  383. var self = this;
  384. function go () {
  385. try {
  386. var f; while (f = self.recvFrame())
  387. self.accept(f);
  388. }
  389. catch (e) {
  390. self.emit('frameError', e);
  391. }
  392. }
  393. self.stream.on('readable', go);
  394. go();
  395. }
  396. step (cb) {
  397. var self = this;
  398. function recv () {
  399. var f;
  400. try {
  401. f = self.recvFrame();
  402. }
  403. catch (e) {
  404. cb(e, null);
  405. return;
  406. }
  407. if (f)
  408. cb(null, f);
  409. else
  410. self.stream.once('readable', recv);
  411. }
  412. recv();
  413. }
  414. checkSend () {
  415. var check = this.sentSinceLastCheck;
  416. this.sentSinceLastCheck = false;
  417. return check;
  418. }
  419. checkRecv () {
  420. var check = this.recvSinceLastCheck;
  421. this.recvSinceLastCheck = false;
  422. return check;
  423. }
  424. sendBytes (bytes) {
  425. this.sentSinceLastCheck = true;
  426. this.stream.write(bytes);
  427. }
  428. sendHeartbeat () {
  429. return this.sendBytes(frame.HEARTBEAT_BUF);
  430. }
  431. sendMethod (channel, Method, fields) {
  432. var frame = encodeMethod(Method, channel, fields);
  433. this.sentSinceLastCheck = true;
  434. var buffer = this.channels[channel].buffer;
  435. return buffer.write(frame);
  436. }
  437. sendMessage (channel, Method, fields, Properties, props, content) {
  438. if (!Buffer.isBuffer(content))
  439. throw new TypeError('content is not a buffer');
  440. var mframe = encodeMethod(Method, channel, fields);
  441. var pframe = encodeProperties(Properties, channel,
  442. content.length, props);
  443. var buffer = this.channels[channel].buffer;
  444. this.sentSinceLastCheck = true;
  445. var methodHeaderLen = mframe.length + pframe.length;
  446. var bodyLen = (content.length > 0) ?
  447. content.length + FRAME_OVERHEAD : 0;
  448. var allLen = methodHeaderLen + bodyLen;
  449. if (allLen < SINGLE_CHUNK_THRESHOLD) {
  450. // Use `allocUnsafe` to avoid excessive allocations and CPU usage
  451. // from zeroing. The returned Buffer is not zeroed and so must be
  452. // completely filled to be used safely.
  453. // See https://github.com/amqp-node/amqplib/pull/695
  454. var all = Buffer.allocUnsafe(allLen);
  455. var offset = mframe.copy(all, 0);
  456. offset += pframe.copy(all, offset);
  457. if (bodyLen > 0)
  458. makeBodyFrame(channel, content).copy(all, offset);
  459. return buffer.write(all);
  460. }
  461. else {
  462. if (methodHeaderLen < SINGLE_CHUNK_THRESHOLD) {
  463. // Use `allocUnsafe` to avoid excessive allocations and CPU usage
  464. // from zeroing. The returned Buffer is not zeroed and so must be
  465. // completely filled to be used safely.
  466. // See https://github.com/amqp-node/amqplib/pull/695
  467. var both = Buffer.allocUnsafe(methodHeaderLen);
  468. var offset = mframe.copy(both, 0);
  469. pframe.copy(both, offset);
  470. buffer.write(both);
  471. }
  472. else {
  473. buffer.write(mframe);
  474. buffer.write(pframe);
  475. }
  476. return this.sendContent(channel, content);
  477. }
  478. }
  479. sendContent (channel, body) {
  480. if (!Buffer.isBuffer(body)) {
  481. throw new TypeError(fmt("Expected buffer; got %s", body));
  482. }
  483. var writeResult = true;
  484. var buffer = this.channels[channel].buffer;
  485. var maxBody = this.frameMax - FRAME_OVERHEAD;
  486. for (var offset = 0; offset < body.length; offset += maxBody) {
  487. var end = offset + maxBody;
  488. var slice = (end > body.length) ? body.subarray(offset) : body.subarray(offset, end);
  489. var bodyFrame = makeBodyFrame(channel, slice);
  490. writeResult = buffer.write(bodyFrame);
  491. }
  492. this.sentSinceLastCheck = true;
  493. return writeResult;
  494. }
  495. recvFrame () {
  496. // %%% identifying invariants might help here?
  497. var frame = parseFrame(this.rest, this.frameMax);
  498. if (!frame) {
  499. var incoming = this.stream.read();
  500. if (incoming === null) {
  501. return false;
  502. }
  503. else {
  504. this.recvSinceLastCheck = true;
  505. this.rest = Buffer.concat([this.rest, incoming]);
  506. return this.recvFrame();
  507. }
  508. }
  509. else {
  510. this.rest = frame.rest;
  511. return decodeFrame(frame);
  512. }
  513. }
  514. }
  515. // Usual frame accept mode
  516. function mainAccept(frame) {
  517. var rec = this.channels[frame.channel];
  518. if (rec) { return rec.channel.accept(frame); }
  519. // NB CHANNEL_ERROR may not be right, but I don't know what is ..
  520. else
  521. this.closeWithError(
  522. fmt('Frame on unknown channel %d', frame.channel),
  523. constants.CHANNEL_ERROR,
  524. new Error(fmt("Frame on unknown channel: %s",
  525. inspect(frame, false))));
  526. }
  527. // Handle anything that comes through on channel 0, that's the
  528. // connection control channel. This is only used once mainAccept is
  529. // installed as the frame handler, after the opening handshake.
  530. function channel0(connection) {
  531. return function(f) {
  532. // Once we get a 'close', we know 1. we'll get no more frames, and
  533. // 2. anything we send except close, or close-ok, will be
  534. // ignored. If we already sent 'close', this won't be invoked since
  535. // we're already in closing mode; if we didn't well we're not going
  536. // to send it now are we.
  537. if (f === HEARTBEAT); // ignore; it's already counted as activity
  538. // on the socket, which is its purpose
  539. else if (f.id === defs.ConnectionClose) {
  540. // Oh. OK. I guess we're done here then.
  541. connection.sendMethod(0, defs.ConnectionCloseOk, {});
  542. var emsg = fmt('Connection closed: %s', closeMsg(f));
  543. var s = stackCapture(emsg);
  544. var e = new Error(emsg);
  545. e.code = f.fields.replyCode;
  546. if (isFatalError(e)) {
  547. connection.emit('error', e);
  548. }
  549. connection.toClosed(s, e);
  550. }
  551. else if (f.id === defs.ConnectionBlocked) {
  552. connection.emit('blocked', f.fields.reason);
  553. }
  554. else if (f.id === defs.ConnectionUnblocked) {
  555. connection.emit('unblocked');
  556. }
  557. else if (f.id === defs.ConnectionUpdateSecretOk) {
  558. connection.emit('update-secret-ok');
  559. }
  560. else {
  561. connection.closeWithError(
  562. fmt("Unexpected frame on channel 0"),
  563. constants.UNEXPECTED_FRAME,
  564. new Error(fmt("Unexpected frame on channel 0: %s",
  565. inspect(f, false))));
  566. }
  567. };
  568. }
  569. function invalidOp(msg, stack) {
  570. return function() {
  571. throw new IllegalOperationError(msg, stack);
  572. };
  573. }
  574. function invalidateSend(conn, msg, stack) {
  575. conn.sendMethod = conn.sendContent = conn.sendMessage =
  576. invalidOp(msg, stack);
  577. }
  578. var encodeMethod = defs.encodeMethod;
  579. var encodeProperties = defs.encodeProperties;
  580. var FRAME_OVERHEAD = defs.FRAME_OVERHEAD;
  581. var makeBodyFrame = frame.makeBodyFrame;
  582. var parseFrame = frame.parseFrame;
  583. var decodeFrame = frame.decodeFrame;
  584. function wrapStream(s) {
  585. if (s instanceof Duplex) return s;
  586. else {
  587. var ws = new Duplex();
  588. ws.wrap(s); //wraps the readable side of things
  589. ws._write = function(chunk, encoding, callback) {
  590. return s.write(chunk, encoding, callback);
  591. };
  592. return ws;
  593. }
  594. }
  595. function isFatalError(error) {
  596. switch (error && error.code) {
  597. case defs.constants.CONNECTION_FORCED:
  598. case defs.constants.REPLY_SUCCESS:
  599. return false;
  600. default:
  601. return true;
  602. }
  603. }
  604. module.exports.Connection = Connection;
  605. module.exports.isFatalError = isFatalError;