123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390 |
- 'use strict';
- var assert = require('assert');
- var defs = require('../lib/defs');
- var Connection = require('../lib/connection').Connection;
- var HEARTBEAT = require('../lib/frame').HEARTBEAT;
- var HB_BUF = require('../lib/frame').HEARTBEAT_BUF;
- var util = require('./util');
- var succeed = util.succeed, fail = util.fail, latch = util.latch;
- var completes = util.completes;
- var kCallback = util.kCallback;
- var LOG_ERRORS = process.env.LOG_ERRORS;
- var OPEN_OPTS = {
- // start-ok
- 'clientProperties': {},
- 'mechanism': 'PLAIN',
- 'response': Buffer.from(['', 'guest', 'guest'].join(String.fromCharCode(0))),
- 'locale': 'en_US',
- // tune-ok
- 'channelMax': 0,
- 'frameMax': 0,
- 'heartbeat': 0,
- // open
- 'virtualHost': '/',
- 'capabilities': '',
- 'insist': 0
- };
- module.exports.OPEN_OPTS = OPEN_OPTS;
- function happy_open(send, wait) {
- // kick it off
- send(defs.ConnectionStart,
- {versionMajor: 0,
- versionMinor: 9,
- serverProperties: {},
- mechanisms: Buffer.from('PLAIN'),
- locales: Buffer.from('en_US')});
- return wait(defs.ConnectionStartOk)()
- .then(function(f) {
- send(defs.ConnectionTune,
- {channelMax: 0,
- heartbeat: 0,
- frameMax: 0});
- })
- .then(wait(defs.ConnectionTuneOk))
- .then(wait(defs.ConnectionOpen))
- .then(function(f) {
- send(defs.ConnectionOpenOk,
- {knownHosts: ''});
- });
- }
- module.exports.connection_handshake = happy_open;
- function connectionTest(client, server) {
- return function(done) {
- var bothDone = latch(2, done);
- var pair = util.socketPair();
- var c = new Connection(pair.client);
- if (LOG_ERRORS) c.on('error', console.warn);
- client(c, bothDone);
- // NB only not a race here because the writes are synchronous
- var protocolHeader = pair.server.read(8);
- assert.deepEqual(Buffer.from("AMQP" + String.fromCharCode(0,0,9,1)),
- protocolHeader);
- var s = util.runServer(pair.server, function(send, wait) {
- server(send, wait, bothDone, pair.server);
- });
- };
- }
- suite("Connection errors", function() {
- test("socket close during open", function(done) {
- // RabbitMQ itself will take at least 3 seconds to close the socket
- // in the event of a handshake problem. Instead of using a live
- // connection, I'm just going to pretend.
- var pair = util.socketPair();
- var conn = new Connection(pair.client);
- pair.server.on('readable', function() {
- pair.server.end();
- });
- conn.open({}, kCallback(fail(done), succeed(done)));
- });
- test("bad frame during open", function(done) {
- var ss = util.socketPair();
- var conn = new (require('../lib/connection').Connection)(ss.client);
- ss.server.on('readable', function() {
- ss.server.write(Buffer.from([0, 0, 0, 0, 0, 0, 0, 0, 0, 0]));
- });
- conn.open({}, kCallback(fail(done), succeed(done)));
- });
- });
- suite("Connection open", function() {
- test("happy", connectionTest(
- function(c, done) {
- c.open(OPEN_OPTS, kCallback(succeed(done), fail(done)));
- },
- function(send, wait, done) {
- happy_open(send, wait).then(succeed(done), fail(done));
- }));
- test("wrong first frame", connectionTest(
- function(c, done) {
- c.open(OPEN_OPTS, kCallback(fail(done), succeed(done)));
- },
- function(send, wait, done) {
- // bad server! bad! whatever were you thinking?
- completes(function() {
- send(defs.ConnectionTune,
- {channelMax: 0,
- heartbeat: 0,
- frameMax: 0});
- }, done);
- }));
- test("unexpected socket close", connectionTest(
- function(c, done) {
- c.open(OPEN_OPTS, kCallback(fail(done), succeed(done)));
- },
- function(send, wait, done, socket) {
- send(defs.ConnectionStart,
- {versionMajor: 0,
- versionMinor: 9,
- serverProperties: {},
- mechanisms: Buffer.from('PLAIN'),
- locales: Buffer.from('en_US')});
- return wait(defs.ConnectionStartOk)()
- .then(function() {
- socket.end();
- })
- .then(succeed(done), fail(done));
- }));
- });
- suite("Connection running", function() {
- test("wrong frame on channel 0", connectionTest(
- function(c, done) {
- c.on('error', succeed(done));
- c.open(OPEN_OPTS);
- },
- function(send, wait, done) {
- happy_open(send, wait)
- .then(function() {
- // there's actually nothing that would plausibly be sent to a
- // just opened connection, so this is violating more than one
- // rule. Nonetheless.
- send(defs.ChannelOpenOk, {channelId: Buffer.from('')}, 0);
- })
- .then(wait(defs.ConnectionClose))
- .then(function(close) {
- send(defs.ConnectionCloseOk, {}, 0);
- }).then(succeed(done), fail(done));
- }));
- test("unopened channel", connectionTest(
- function(c, done) {
- c.on('error', succeed(done));
- c.open(OPEN_OPTS);
- },
- function(send, wait, done) {
- happy_open(send, wait)
- .then(function() {
- // there's actually nothing that would plausibly be sent to a
- // just opened connection, so this is violating more than one
- // rule. Nonetheless.
- send(defs.ChannelOpenOk, {channelId: Buffer.from('')}, 3);
- })
- .then(wait(defs.ConnectionClose))
- .then(function(close) {
- send(defs.ConnectionCloseOk, {}, 0);
- }).then(succeed(done), fail(done));
- }));
- test("unexpected socket close", connectionTest(
- function(c, done) {
- var errorAndClosed = latch(2, done);
- c.on('error', succeed(errorAndClosed));
- c.on('close', succeed(errorAndClosed));
- c.open(OPEN_OPTS, kCallback(function() {
- c.sendHeartbeat();
- }, fail(errorAndClosed)));
- },
- function(send, wait, done, socket) {
- happy_open(send, wait)
- .then(wait())
- .then(function() {
- socket.end();
- }).then(succeed(done));
- }));
- test("connection.blocked", connectionTest(
- function(c, done) {
- c.on('blocked', succeed(done));
- c.open(OPEN_OPTS);
- },
- function(send, wait, done, socket) {
- happy_open(send, wait)
- .then(function() {
- send(defs.ConnectionBlocked, {reason: 'felt like it'}, 0);
- })
- .then(succeed(done));
- }));
- test("connection.unblocked", connectionTest(
- function(c, done) {
- c.on('unblocked', succeed(done));
- c.open(OPEN_OPTS);
- },
- function(send, wait, done, socket) {
- happy_open(send, wait)
- .then(function() {
- send(defs.ConnectionUnblocked, {}, 0);
- })
- .then(succeed(done));
- }));
- });
- suite("Connection close", function() {
- test("happy", connectionTest(
- function(c, done0) {
- var done = latch(2, done0);
- c.on('close', done);
- c.open(OPEN_OPTS, kCallback(function(_ok) {
- c.close(kCallback(succeed(done), fail(done)));
- }, function() {}));
- },
- function(send, wait, done) {
- happy_open(send, wait)
- .then(wait(defs.ConnectionClose))
- .then(function(close) {
- send(defs.ConnectionCloseOk, {});
- })
- .then(succeed(done), fail(done));
- }));
- test("interleaved close frames", connectionTest(
- function(c, done0) {
- var done = latch(2, done0);
- c.on('close', done);
- c.open(OPEN_OPTS, kCallback(function(_ok) {
- c.close(kCallback(succeed(done), fail(done)));
- }, done));
- },
- function(send, wait, done) {
- happy_open(send, wait)
- .then(wait(defs.ConnectionClose))
- .then(function(f) {
- send(defs.ConnectionClose, {
- replyText: "Ha!",
- replyCode: defs.constants.REPLY_SUCCESS,
- methodId: 0, classId: 0
- });
- })
- .then(wait(defs.ConnectionCloseOk))
- .then(function(f) {
- send(defs.ConnectionCloseOk, {});
- })
- .then(succeed(done), fail(done));
- }));
- test("server error close", connectionTest(
- function(c, done0) {
- var done = latch(2, done0);
- c.on('close', succeed(done));
- c.on('error', succeed(done));
- c.open(OPEN_OPTS);
- },
- function(send, wait, done) {
- happy_open(send, wait)
- .then(function(f) {
- send(defs.ConnectionClose, {
- replyText: "Begone",
- replyCode: defs.constants.INTERNAL_ERROR,
- methodId: 0, classId: 0
- });
- })
- .then(wait(defs.ConnectionCloseOk))
- .then(succeed(done), fail(done));
- }));
- test("operator-intiated close", connectionTest(
- function(c, done) {
- c.on('close', succeed(done));
- c.on('error', fail(done));
- c.open(OPEN_OPTS);
- },
- function(send, wait, done) {
- happy_open(send, wait)
- .then(function(f) {
- send(defs.ConnectionClose, {
- replyText: "Begone",
- replyCode: defs.constants.CONNECTION_FORCED,
- methodId: 0, classId: 0
- });
- })
- .then(wait(defs.ConnectionCloseOk))
- .then(succeed(done), fail(done));
- }));
- test("double close", connectionTest(
- function(c, done) {
- c.open(OPEN_OPTS, kCallback(function() {
- c.close();
- // NB no synchronisation, we do this straight away
- assert.throws(function() {
- c.close();
- });
- done();
- }, done));
- },
- function(send, wait, done) {
- happy_open(send, wait)
- .then(wait(defs.ConnectionClose))
- .then(function() {
- send(defs.ConnectionCloseOk, {});
- })
- .then(succeed(done), fail(done));
- }));
- });
- suite("heartbeats", function() {
- var heartbeat = require('../lib/heartbeat');
- setup(function() {
- heartbeat.UNITS_TO_MS = 20;
- });
- teardown(function() {
- heartbeat.UNITS_TO_MS = 1000;
- });
- test("send heartbeat after open", connectionTest(
- function(c, done) {
- completes(function() {
- var opts = Object.create(OPEN_OPTS);
- opts.heartbeat = 1;
- // Don't leave the error waiting to happen for the next test, this
- // confuses mocha awfully
- c.on('error', function() {});
- c.open(opts);
- }, done);
- },
- function(send, wait, done, socket) {
- var timer;
- happy_open(send, wait)
- .then(function() {
- timer = setInterval(function() {
- socket.write(HB_BUF);
- }, heartbeat.UNITS_TO_MS);
- })
- .then(wait())
- .then(function(hb) {
- if (hb === HEARTBEAT) done();
- else done("Next frame after silence not a heartbeat");
- clearInterval(timer);
- });
- }));
- test("detect lack of heartbeats", connectionTest(
- function(c, done) {
- var opts = Object.create(OPEN_OPTS);
- opts.heartbeat = 1;
- c.on('error', succeed(done));
- c.open(opts);
- },
- function(send, wait, done, socket) {
- happy_open(send, wait)
- .then(succeed(done), fail(done));
- // conspicuously not sending anything ...
- }));
- });
|