//
//
//
// General-purpose API for glueing everything together.
- 'use strict';
- var URL = require('url-parse');
- var QS = require('querystring');
- var Connection = require('./connection').Connection;
- var fmt = require('util').format;
- var credentials = require('./credentials');
// Shallow-copy every own enumerable property of `obj` onto `target`,
// overwriting same-named properties already there. Keys are visited
// from last to first. Returns `target` (the same object, mutated).
function copyInto(obj, target) {
  var names = Object.keys(obj);
  for (var idx = names.length - 1; idx >= 0; idx--) {
    var name = names[idx];
    target[name] = obj[name];
  }
  return target;
}
// Make a shallow copy of `obj`: a brand-new plain object carrying
// `obj`'s own enumerable properties. Adapted from util._extend, which
// is too fringe to use.
function clone(obj) {
  var fresh = {};
  return copyInto(obj, fresh);
}
// Default properties describing this client. openFrames() uses this
// object as the prototype of the `clientProperties` it puts in the
// start-ok fields, so these values apply unless overridden there.
var CLIENT_PROPERTIES = {
  product: 'amqplib',
  version: require('../package.json').version,
  platform: fmt('Node.JS %s', process.version),
  information: 'http://squaremo.github.io/amqp.node',
  // Flags advertised under `capabilities`
  capabilities: {
    'publisher_confirms': true,
    'exchange_exchange_bindings': true,
    'basic.nack': true,
    'consumer_cancel_notify': true,
    'connection.blocked': true,
    'authentication_failure_close': true
  }
};
// Construct the fields for the main frames used in the opening
// handshake (start-ok, tune-ok and open), gathered into one object.
//
//   vhost       - percent-encoded virtual host name; anything falsy
//                 means the default vhost '/'
//   query       - optional tuning values: locale, channelMax, frameMax
//                 and heartbeat; the numeric ones may be numbers or
//                 numeric strings
//   credentials - object supplying `mechanism` and a `response()`
//                 method
//   extraClientProperties
//               - properties laid over CLIENT_PROPERTIES; may be omitted
//
// Returns a plain object holding the handshake fields.
function openFrames(vhost, query, credentials, extraClientProperties) {
  vhost = vhost ? QS.unescape(vhost) : '/';
  query = query || {};

  // A value that is absent -- undefined, or null as an object-style
  // configuration may well carry -- takes the default rather than
  // being run through parseInt, which would turn null into NaN. The
  // radix is deliberately left implicit so that strings are parsed
  // exactly as they always have been (including a leading '0x').
  function intOrDefault(val, def) {
    return (val === undefined || val === null) ? def : parseInt(val);
  }

  // The defaults stay on the prototype and the extras become own
  // properties, so an extra of the same name shadows a default without
  // the shared CLIENT_PROPERTIES object being modified.
  var clientProperties = Object.create(CLIENT_PROPERTIES);

  return {
    // start-ok
    'clientProperties': copyInto(extraClientProperties || {}, clientProperties),
    'mechanism': credentials.mechanism,
    'response': credentials.response(),
    'locale': query.locale || 'en_US',

    // tune-ok
    'channelMax': intOrDefault(query.channelMax, 0),
    'frameMax': intOrDefault(query.frameMax, 0x1000),
    'heartbeat': intOrDefault(query.heartbeat, 0),

    // open
    'virtualHost': vhost,
    'capabilities': '',
    'insist': 0
  };
}
// Decide on credentials based on what we're supplied.
//
//   parts - parsed URL whose `username` and `password` hold the
//           still percent-encoded userinfo components
//
// If both are empty, guest/guest is used; otherwise each component
// is percent-decoded, with an empty component becoming ''. Returns
// whatever `credentials.plain(user, passwd)` returns.
function credentialsFromUrl(parts) {
  // Percent-decode one userinfo component. Escapes are read as UTF-8
  // so that non-ASCII characters survive; global unescape() on its own
  // would turn each %XX into a separate Latin-1 character. Where the
  // component is not valid UTF-8 percent-encoding (decodeURIComponent
  // throws), fall back to the lenient unescape() decoding.
  function decode(component) {
    if (!component) return '';
    try {
      return decodeURIComponent(component);
    } catch (e) {
      return unescape(component);
    }
  }

  var user = 'guest', passwd = 'guest';
  if (parts.username != '' || parts.password != '') {
    user = decode(parts.username);
    passwd = decode(parts.password);
  }
  return credentials.plain(user, passwd);
}
// Open a socket to an AMQP server and run the opening handshake on it.
//
//   url           - a URL string (default 'amqp://localhost'), or an
//                   object with the fields protocol, hostname, port,
//                   username, password, vhost, locale, channelMax,
//                   frameMax and heartbeat
//   socketOptions - options handed to net.connect / tls.connect; the
//                   keys noDelay, timeout, keepAlive, keepAliveDelay,
//                   clientProperties and credentials are read here too
//   openCallback  - called with (null, connection) once the handshake
//                   has succeeded, or with (err) if the socket errors
//                   before connecting, the timeout fires, or the
//                   handshake fails
//
// Throws synchronously when the protocol is neither amqp: nor amqps:.
function connect(url, socketOptions, openCallback) {
  // tls.connect runs `util._extend()` over the options it is given,
  // and that only sees properties listed by `Object.keys()`. Hence a
  // real copy here rather than an `Object.create()` wrapper.
  var sockopts = clone(socketOptions || {});
  url = url || 'amqp://localhost';

  var noDelay = !!sockopts.noDelay;
  var timeout = sockopts.timeout;
  var keepAlive = !!sockopts.keepAlive;
  var keepAliveDelay = sockopts.keepAliveDelay || 0; // 0 is default for node
  var extraClientProperties = sockopts.clientProperties || {};

  // Port to fall back on when the URL does not name one
  function defaultPort(proto) {
    return (proto === 'amqp:') ? 5672 : 5671;
  }

  var protocol, fields;
  if (typeof url !== 'object') {
    // URL given as a string
    var parts = URL(url, true); // yes, parse the query string
    protocol = parts.protocol;
    sockopts.host = parts.hostname;
    sockopts.servername = sockopts.servername || parts.hostname;
    sockopts.port = parseInt(parts.port) || defaultPort(protocol);
    var vhost = parts.pathname ? parts.pathname.substr(1) : null;
    fields = openFrames(
      vhost,
      parts.query,
      sockopts.credentials || credentialsFromUrl(parts),
      extraClientProperties);
  } else {
    // URL given as an object of already-separated fields
    protocol = (url.protocol || 'amqp') + ':';
    sockopts.host = url.hostname;
    sockopts.servername = sockopts.servername || url.hostname;
    sockopts.port = url.port || defaultPort(protocol);

    // Only default if both are missing, to have the same behaviour as
    // the stringly URL.
    var bothMissing = (url.username == undefined && url.password == undefined);
    var user = bothMissing ? 'guest' : (url.username || '');
    var pass = bothMissing ? 'guest' : (url.password || '');

    fields = openFrames(
      url.vhost,
      {
        locale: url.locale,
        channelMax: url.channelMax,
        frameMax: url.frameMax,
        heartbeat: url.heartbeat
      },
      sockopts.credentials || credentials.plain(user, pass),
      extraClientProperties);
  }

  var sock;
  var connected = false;

  // Shut the socket down, then report the failure
  function abandon(err) {
    sock.end();
    sock.destroy();
    openCallback(err);
  }

  function onConnect() {
    connected = true;
    sock.setNoDelay(noDelay);
    if (keepAlive) sock.setKeepAlive(keepAlive, keepAliveDelay);

    var conn = new Connection(sock);
    conn.open(fields, function(err, ok) {
      // disable timeout once the connection is open, we don't want
      // it fouling things
      if (timeout) sock.setTimeout(0);

      if (err !== null) {
        // The connection isn't closed by the server on e.g. wrong password
        abandon(err);
        return;
      }
      openCallback(null, conn);
    });
  }

  switch (protocol) {
  case 'amqp:':
    sock = require('net').connect(sockopts, onConnect);
    break;
  case 'amqps:':
    sock = require('tls').connect(sockopts, onConnect);
    break;
  default:
    throw new Error("Expected amqp: or amqps: as the protocol; got " + protocol);
  }

  if (timeout) {
    sock.setTimeout(timeout, function() {
      abandon(new Error('connect ETIMEDOUT'));
    });
  }

  // Errors arriving before the socket has connected go straight to the
  // callback; once connected, this handler leaves them alone.
  sock.once('error', function(err) {
    if (connected) return;
    openCallback(err);
  });
}
// What this module exposes: the connect() entry point, and the helper
// that derives credentials from a parsed URL.
module.exports.connect = connect;
module.exports.credentialsFromUrl = credentialsFromUrl;
|