connection.js 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390
  1. 'use strict';
  2. var assert = require('assert');
  3. var defs = require('../lib/defs');
  4. var Connection = require('../lib/connection').Connection;
  5. var HEARTBEAT = require('../lib/frame').HEARTBEAT;
  6. var HB_BUF = require('../lib/frame').HEARTBEAT_BUF;
  7. var util = require('./util');
  8. var succeed = util.succeed, fail = util.fail, latch = util.latch;
  9. var completes = util.completes;
  10. var kCallback = util.kCallback;
  11. var LOG_ERRORS = process.env.LOG_ERRORS;
  /*
   * Arguments passed to Connection#open by the tests in this file, and
   * exported for reuse. The fields are grouped under the handshake step the
   * original markers associate them with (start-ok, tune-ok, open).
   *
   * Deliberately left mutable/unfrozen: the heartbeat tests derive their own
   * options with Object.create(OPEN_OPTS) and then assign `heartbeat`.
   */
  var OPEN_OPTS = {
    // start-ok
    clientProperties: {},
    mechanism: 'PLAIN',
    // '', 'guest' and 'guest' separated by NUL (char code 0)
    response: Buffer.from(['', 'guest', 'guest'].join('\u0000')),
    locale: 'en_US',

    // tune-ok
    channelMax: 0,
    frameMax: 0,
    heartbeat: 0,

    // open
    virtualHost: '/',
    capabilities: '',
    insist: 0
  };

  module.exports.OPEN_OPTS = OPEN_OPTS;
  /**
   * Plays the server's part of a successful connection handshake.
   *
   * connection.start is sent immediately; after that the function alternates
   * between waiting for a client frame and sending the next server frame:
   * start-ok -> tune -> tune-ok -> open -> open-ok.
   *
   * @param {Function} send - invoked as send(method, fields) to emit a frame.
   * @param {Function} wait - wait(method) returns a function producing a
   *   thenable; it is called directly for the first step and handed to
   *   .then() for the later ones.
   * @returns the thenable chain, which settles after connection.open-ok has
   *   been sent (or when an earlier step rejects).
   */
  function happy_open(send, wait) {
    // Reply to start-ok with a tune frame.
    function sendTune() {
      send(defs.ConnectionTune, {
        channelMax: 0,
        heartbeat: 0,
        frameMax: 0
      });
    }

    // Reply to open with open-ok, completing the handshake.
    function sendOpenOk() {
      send(defs.ConnectionOpenOk, {knownHosts: ''});
    }

    // kick it off
    send(defs.ConnectionStart, {
      versionMajor: 0,
      versionMinor: 9,
      serverProperties: {},
      mechanisms: Buffer.from('PLAIN'),
      locales: Buffer.from('en_US')
    });

    var awaitStartOk = wait(defs.ConnectionStartOk);
    return awaitStartOk()
      .then(sendTune)
      .then(wait(defs.ConnectionTuneOk))
      .then(wait(defs.ConnectionOpen))
      .then(sendOpenOk);
  }

  module.exports.connection_handshake = happy_open;
  /**
   * Builds a mocha test body that connects a client-side Connection to a
   * scripted server over the socket pair returned by util.socketPair().
   *
   * @param {Function} client - invoked as client(connection, done).
   * @param {Function} server - invoked as
   *   server(send, wait, done, serverSocket) from inside util.runServer.
   * @returns {Function} a function(done) suitable for passing to test().
   *   Both sides receive the same `done`, created with latch(2, done), so
   *   each side is expected to report completion once.
   */
  function connectionTest(client, server) {
    return function(done) {
      var bothDone = latch(2, done);
      var pair = util.socketPair();
      var c = new Connection(pair.client);
      if (LOG_ERRORS) c.on('error', console.warn);
      client(c, bothDone);

      // NB only not a race here because the writes are synchronous
      var protocolHeader = pair.server.read(8);
      // Strict comparison with (actual, expected) ordering, so a mismatch is
      // reported the right way round.
      assert.deepStrictEqual(
        protocolHeader,
        Buffer.from("AMQP" + String.fromCharCode(0,0,9,1)));

      util.runServer(pair.server, function(send, wait) {
        server(send, wait, bothDone, pair.server);
      });
    };
  }
  // Failures that happen before the handshake completes. Neither test uses
  // connectionTest; each drives the raw socket pair directly.
  //
  // In both tests the open is expected to fail. Judging by the "happy" open
  // test, kCallback takes the success continuation first and the error
  // continuation second, so here success fails the test and an error passes.
  suite("Connection errors", function() {

    test("socket close during open", function(done) {
      // RabbitMQ itself will take at least 3 seconds to close the socket
      // in the event of a handshake problem. Instead of using a live
      // connection, I'm just going to pretend.
      var pair = util.socketPair();
      var conn = new Connection(pair.client);
      pair.server.on('readable', function() {
        // Hang up as soon as the client has written anything.
        pair.server.end();
      });
      conn.open({}, kCallback(fail(done), succeed(done)));
    });

    test("bad frame during open", function(done) {
      var ss = util.socketPair();
      // Use the Connection imported at the top of the file rather than
      // re-requiring the module inline.
      var conn = new Connection(ss.client);
      ss.server.on('readable', function() {
        // Answer the client with ten zero bytes instead of a real frame.
        ss.server.write(Buffer.from([0, 0, 0, 0, 0, 0, 0, 0, 0, 0]));
      });
      conn.open({}, kCallback(fail(done), succeed(done)));
    });

  });
  // Handshake outcomes as seen through Connection#open, each run against a
  // scripted server via connectionTest.
  suite("Connection open", function() {

    // Client: the test passes only if open reports success.
    function openExpectingSuccess(c, done) {
      c.open(OPEN_OPTS, kCallback(succeed(done), fail(done)));
    }

    // Client: the test passes only if open reports an error.
    function openExpectingFailure(c, done) {
      c.open(OPEN_OPTS, kCallback(fail(done), succeed(done)));
    }

    // Server: performs the complete, well-behaved handshake.
    function wellBehavedServer(send, wait, done) {
      happy_open(send, wait).then(succeed(done), fail(done));
    }

    // Server: opens with connection.tune instead of connection.start.
    function tuneFirstServer(send, wait, done) {
      // bad server! bad! whatever were you thinking?
      completes(function() {
        var tuneFields = {channelMax: 0, heartbeat: 0, frameMax: 0};
        send(defs.ConnectionTune, tuneFields);
      }, done);
    }

    // Server: starts the handshake, then ends the socket once the client
    // has answered with start-ok.
    function hangUpAfterStartOk(send, wait, done, socket) {
      var startFields = {
        versionMajor: 0,
        versionMinor: 9,
        serverProperties: {},
        mechanisms: Buffer.from('PLAIN'),
        locales: Buffer.from('en_US')
      };
      send(defs.ConnectionStart, startFields);

      var awaitStartOk = wait(defs.ConnectionStartOk);
      return awaitStartOk()
        .then(function() {
          socket.end();
        })
        .then(succeed(done), fail(done));
    }

    test("happy",
         connectionTest(openExpectingSuccess, wellBehavedServer));

    test("wrong first frame",
         connectionTest(openExpectingFailure, tuneFirstServer));

    test("unexpected socket close",
         connectionTest(openExpectingFailure, hangUpAfterStartOk));

  });
  // Behaviour of a connection after a successful handshake. Every server
  // script starts with happy_open and ends with
  // .then(succeed(done), fail(done)), so a rejected step is reported as a
  // failure rather than leaving the test to time out.
  suite("Connection running", function() {

    test("wrong frame on channel 0", connectionTest(
      function(c, done) {
        c.on('error', succeed(done));
        c.open(OPEN_OPTS);
      },
      function(send, wait, done) {
        happy_open(send, wait)
          .then(function() {
            // there's actually nothing that would plausibly be sent to a
            // just opened connection, so this is violating more than one
            // rule. Nonetheless.
            send(defs.ChannelOpenOk, {channelId: Buffer.from('')}, 0);
          })
          .then(wait(defs.ConnectionClose))
          .then(function() {
            send(defs.ConnectionCloseOk, {}, 0);
          })
          .then(succeed(done), fail(done));
      }));

    test("unopened channel", connectionTest(
      function(c, done) {
        c.on('error', succeed(done));
        c.open(OPEN_OPTS);
      },
      function(send, wait, done) {
        happy_open(send, wait)
          .then(function() {
            // Same frame as in "wrong frame on channel 0", but addressed to
            // channel 3, which the client never asked to open.
            send(defs.ChannelOpenOk, {channelId: Buffer.from('')}, 3);
          })
          .then(wait(defs.ConnectionClose))
          .then(function() {
            send(defs.ConnectionCloseOk, {}, 0);
          })
          .then(succeed(done), fail(done));
      }));

    test("unexpected socket close", connectionTest(
      function(c, done) {
        // The client side is finished once it has seen both 'error' and
        // 'close'.
        var errorAndClosed = latch(2, done);
        c.on('error', succeed(errorAndClosed));
        c.on('close', succeed(errorAndClosed));
        c.open(OPEN_OPTS, kCallback(function() {
          // Give the server a frame to wait for before it hangs up.
          c.sendHeartbeat();
        }, fail(errorAndClosed)));
      },
      function(send, wait, done, socket) {
        happy_open(send, wait)
          // wait() without a method: presumably resolves on the next frame
          // of any kind (here, the client's heartbeat) -- confirm in util.
          .then(wait())
          .then(function() {
            socket.end();
          })
          .then(succeed(done), fail(done));
      }));

    test("connection.blocked", connectionTest(
      function(c, done) {
        c.on('blocked', succeed(done));
        c.open(OPEN_OPTS);
      },
      function(send, wait, done) {
        happy_open(send, wait)
          .then(function() {
            send(defs.ConnectionBlocked, {reason: 'felt like it'}, 0);
          })
          .then(succeed(done), fail(done));
      }));

    test("connection.unblocked", connectionTest(
      function(c, done) {
        c.on('unblocked', succeed(done));
        c.open(OPEN_OPTS);
      },
      function(send, wait, done) {
        happy_open(send, wait)
          .then(function() {
            send(defs.ConnectionUnblocked, {}, 0);
          })
          .then(succeed(done), fail(done));
      }));

  });
  // Closing an open connection: client-initiated, crossed close frames,
  // server-initiated (error and operator) and a repeated close() call.
  suite("Connection close", function() {

    test("happy", connectionTest(
      function(c, done0) {
        // Client side is finished after both the 'close' event and the
        // close() callback.
        var done = latch(2, done0);
        c.on('close', done);
        c.open(OPEN_OPTS, kCallback(function(_ok) {
          c.close(kCallback(succeed(done), fail(done)));
        // Report a failed open instead of swallowing it; otherwise the
        // latch never completes and the test can only time out.
        }, fail(done)));
      },
      function(send, wait, done) {
        happy_open(send, wait)
          .then(wait(defs.ConnectionClose))
          .then(function() {
            send(defs.ConnectionCloseOk, {});
          })
          .then(succeed(done), fail(done));
      }));

    test("interleaved close frames", connectionTest(
      function(c, done0) {
        var done = latch(2, done0);
        c.on('close', done);
        c.open(OPEN_OPTS, kCallback(function(_ok) {
          c.close(kCallback(succeed(done), fail(done)));
        }, done));
      },
      function(send, wait, done) {
        happy_open(send, wait)
          .then(wait(defs.ConnectionClose))
          .then(function() {
            // Answer the client's close with a close of our own, so the two
            // close handshakes cross.
            send(defs.ConnectionClose, {
              replyText: "Ha!",
              replyCode: defs.constants.REPLY_SUCCESS,
              methodId: 0, classId: 0
            });
          })
          .then(wait(defs.ConnectionCloseOk))
          .then(function() {
            send(defs.ConnectionCloseOk, {});
          })
          .then(succeed(done), fail(done));
      }));

    test("server error close", connectionTest(
      function(c, done0) {
        // Both 'close' and 'error' are expected for an INTERNAL_ERROR close.
        var done = latch(2, done0);
        c.on('close', succeed(done));
        c.on('error', succeed(done));
        c.open(OPEN_OPTS);
      },
      function(send, wait, done) {
        happy_open(send, wait)
          .then(function() {
            send(defs.ConnectionClose, {
              replyText: "Begone",
              replyCode: defs.constants.INTERNAL_ERROR,
              methodId: 0, classId: 0
            });
          })
          .then(wait(defs.ConnectionCloseOk))
          .then(succeed(done), fail(done));
      }));

    test("operator-intiated close", connectionTest(
      function(c, done) {
        // A CONNECTION_FORCED close must produce 'close' but no 'error'.
        c.on('close', succeed(done));
        c.on('error', fail(done));
        c.open(OPEN_OPTS);
      },
      function(send, wait, done) {
        happy_open(send, wait)
          .then(function() {
            send(defs.ConnectionClose, {
              replyText: "Begone",
              replyCode: defs.constants.CONNECTION_FORCED,
              methodId: 0, classId: 0
            });
          })
          .then(wait(defs.ConnectionCloseOk))
          .then(succeed(done), fail(done));
      }));

    test("double close", connectionTest(
      function(c, done) {
        c.open(OPEN_OPTS, kCallback(function() {
          c.close();
          // NB no synchronisation, we do this straight away
          assert.throws(function() {
            c.close();
          });
          done();
        }, done));
      },
      function(send, wait, done) {
        happy_open(send, wait)
          .then(wait(defs.ConnectionClose))
          .then(function() {
            send(defs.ConnectionCloseOk, {});
          })
          .then(succeed(done), fail(done));
      }));

  });
  // Heartbeat behaviour. heartbeat.UNITS_TO_MS is lowered to 20 for the
  // duration of each test so that a heartbeat setting of 1 elapses quickly.
  suite("heartbeats", function() {

    var heartbeat = require('../lib/heartbeat');
    // Value of UNITS_TO_MS found before each test, put back afterwards.
    var savedUnitsToMs;

    setup(function() {
      savedUnitsToMs = heartbeat.UNITS_TO_MS;
      heartbeat.UNITS_TO_MS = 20;
    });

    teardown(function() {
      // Restore whatever was there rather than assuming a particular default.
      heartbeat.UNITS_TO_MS = savedUnitsToMs;
    });

    test("send heartbeat after open", connectionTest(
      function(c, done) {
        completes(function() {
          var opts = Object.create(OPEN_OPTS);
          opts.heartbeat = 1;
          // Don't leave the error waiting to happen for the next test, this
          // confuses mocha awfully
          c.on('error', function() {});
          c.open(opts);
        }, done);
      },
      function(send, wait, done, socket) {
        var timer;
        happy_open(send, wait)
          .then(function() {
            // Keep writing heartbeats to the client while waiting for the
            // client's own first frame.
            timer = setInterval(function() {
              socket.write(HB_BUF);
            }, heartbeat.UNITS_TO_MS);
          })
          .then(wait())
          .then(function(hb) {
            clearInterval(timer);
            if (hb === HEARTBEAT) done();
            // Pass a real Error so the failure cannot be mistaken for an
            // ordinary completion signal.
            else done(new Error("Next frame after silence not a heartbeat"));
          }, function(err) {
            // A failed handshake or wait must still stop the interval and
            // fail the test; clearInterval(undefined) is a harmless no-op.
            clearInterval(timer);
            fail(done)(err);
          });
      }));

    test("detect lack of heartbeats", connectionTest(
      function(c, done) {
        var opts = Object.create(OPEN_OPTS);
        opts.heartbeat = 1;
        // The client is expected to emit 'error' when the server stays
        // silent.
        c.on('error', succeed(done));
        c.open(opts);
      },
      function(send, wait, done) {
        happy_open(send, wait)
          .then(succeed(done), fail(done));
        // conspicuously not sending anything ...
      }));

  });