connect.js 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. //
  2. //
  3. //
  4. // General-purpose API for glueing everything together.
  5. 'use strict';
  6. var URL = require('url-parse');
  7. var QS = require('querystring');
  8. var Connection = require('./connection').Connection;
  9. var fmt = require('util').format;
  10. var credentials = require('./credentials');
  // Copy each own enumerable property of `obj` onto `target`, then hand
  // `target` back. Keys are visited last-to-first, so keys that are new
  // to `target` end up in the reverse of `Object.keys(obj)` order.
  function copyInto(obj, target) {
    Object.keys(obj).reverse().forEach(function (name) {
      target[name] = obj[name];
    });
    return target;
  }
  // Shallow copy: a fresh plain object carrying the own enumerable
  // properties of `obj`. Adapted from util._extend, which is too fringe
  // to use.
  function clone(obj) {
    var copy = {};
    return copyInto(obj, copy);
  }
  // What this client reports about itself in the opening handshake.
  // `openFrames` uses this object as the prototype of the client
  // properties it sends, so entries here act as defaults.
  var CLIENT_PROPERTIES = {
    product: 'amqplib',
    version: require('../package.json').version,
    platform: fmt('Node.JS %s', process.version),
    information: 'http://squaremo.github.io/amqp.node',
    // Feature flags listed under the `capabilities` key.
    capabilities: {
      'publisher_confirms': true,
      'exchange_exchange_bindings': true,
      'basic.nack': true,
      'consumer_cancel_notify': true,
      'connection.blocked': true,
      'authentication_failure_close': true
    }
  };
  // Build the field values for the main frames of the opening handshake
  // (start-ok, tune-ok and open) as one flat object.
  //
  // `vhost` is percent-decoded; a missing or empty one means '/'.
  // `query` may supply `locale`, `channelMax`, `frameMax` and
  // `heartbeat`; the numeric ones go through `parseInt` when present.
  // `credentials` must offer a `mechanism` property and a `response()`
  // method. `extraClientProperties` is layered over CLIENT_PROPERTIES.
  function openFrames(vhost, query, credentials, extraClientProperties) {
    vhost = vhost ? QS.unescape(vhost) : '/';
    query = query || {};

    // Numeric tuning option from the query, or `fallback` if it is absent.
    function tuning(name, fallback) {
      var raw = query[name];
      if (raw === undefined) {
        return fallback;
      }
      return parseInt(raw);
    }

    // The defaults stay on the prototype; the extras become own properties.
    var clientProperties = copyInto(extraClientProperties,
                                    Object.create(CLIENT_PROPERTIES));

    return {
      // start-ok
      'clientProperties': clientProperties,
      'mechanism': credentials.mechanism,
      'response': credentials.response(),
      'locale': query.locale || 'en_US',

      // tune-ok
      'channelMax': tuning('channelMax', 0),
      'frameMax': tuning('frameMax', 0x1000),
      'heartbeat': tuning('heartbeat', 0),

      // open
      'virtualHost': vhost,
      'capabilities': '',
      'insist': 0
    };
  }
  // Decide on credentials based on what we're supplied.
  //
  // `parts` is a parsed URL with `username` and `password` fields. If
  // both are empty strings the result is guest/guest; otherwise each
  // one is percent-decoded, with a missing one becoming ''. Returns
  // whatever `credentials.plain(user, passwd)` returns.
  function credentialsFromUrl(parts) {
    // Percent-decode one credential component. Escapes are read as
    // UTF-8, so non-ASCII credentials survive a round trip through the
    // URL; the legacy `unescape()` decodes byte-by-byte and mangles
    // them. Input that is not valid UTF-8 percent-encoding (a bare '%',
    // a Latin-1 escape such as '%E4', or '%uXXXX') makes
    // `decodeURIComponent` throw, so fall back to `unescape()` and keep
    // decoding such input the way it always was.
    function decode(text) {
      try {
        return decodeURIComponent(text);
      } catch (e) {
        return unescape(text);
      }
    }

    var user = 'guest', passwd = 'guest';
    if (parts.username != '' || parts.password != '') {
      user = (parts.username) ? decode(parts.username) : '';
      passwd = (parts.password) ? decode(parts.password) : '';
    }
    return credentials.plain(user, passwd);
  }
  // Open a socket to a broker and run the AMQP opening handshake on it.
  //
  // `url` is a URL string (default 'amqp://localhost') or an object with
  // `protocol`, `hostname`, `port`, `username`, `password`, `vhost`,
  // `locale`, `channelMax`, `frameMax` and `heartbeat` fields.
  // `socketOptions` is copied and passed to `net.connect` or
  // `tls.connect`; its `noDelay`, `timeout`, `keepAlive`,
  // `keepAliveDelay`, `clientProperties` and `credentials` entries are
  // read here as well. `openCallback(err, connection)` is called when
  // the handshake succeeds or fails, when the socket errors before it
  // has connected, and when the socket timeout fires. A protocol other
  // than amqp: or amqps: throws synchronously.
  function connect(url, socketOptions, openCallback) {
    // tls.connect runs `util._extend()` over its options, and that only
    // sees what `Object.keys()` reports; so the options need a real copy
    // instead of an `Object.create()` wrapper.
    var sockopts = clone(socketOptions || {});
    url = url || 'amqp://localhost';

    var timeout = sockopts.timeout;
    var noDelay = !!sockopts.noDelay;
    var keepAlive = !!sockopts.keepAlive;
    var keepAliveDelay = sockopts.keepAliveDelay || 0; // node defaults to 0
    var extraClientProperties = sockopts.clientProperties || {};

    var protocol;
    var fields;
    var sock;
    var sockok = false;

    // Port to use when none is given: 5672 for amqp:, 5671 otherwise.
    function defaultPort() {
      return (protocol === 'amqp:') ? 5672 : 5671;
    }

    if (typeof url !== 'object') {
      var parts = URL(url, true); // true: parse the query string too
      protocol = parts.protocol;
      sockopts.host = parts.hostname;
      sockopts.servername = sockopts.servername || parts.hostname;
      sockopts.port = parseInt(parts.port) || defaultPort();
      // Drop the leading '/' of the path to get the vhost.
      var vhost = parts.pathname ? parts.pathname.substr(1) : null;
      fields = openFrames(vhost, parts.query,
                          sockopts.credentials || credentialsFromUrl(parts),
                          extraClientProperties);
    } else {
      protocol = (url.protocol || 'amqp') + ':';
      sockopts.host = url.hostname;
      sockopts.servername = sockopts.servername || url.hostname;
      sockopts.port = url.port || defaultPort();

      // Fall back to guest/guest only when neither field is given, so
      // the object form behaves like the string form.
      var anonymous = (url.username == undefined && url.password == undefined);
      var user = anonymous ? 'guest' : (url.username || '');
      var pass = anonymous ? 'guest' : (url.password || '');

      var config = {
        locale: url.locale,
        channelMax: url.channelMax,
        frameMax: url.frameMax,
        heartbeat: url.heartbeat
      };
      fields = openFrames(url.vhost, config,
                          sockopts.credentials || credentials.plain(user, pass),
                          extraClientProperties);
    }

    // Runs once the socket has connected: apply the socket settings and
    // start the handshake.
    function startHandshake() {
      sockok = true;
      sock.setNoDelay(noDelay);
      if (keepAlive) {
        sock.setKeepAlive(keepAlive, keepAliveDelay);
      }

      var c = new Connection(sock);
      c.open(fields, function (err, ok) {
        // The timeout has done its job once the open attempt finishes;
        // switch it off so it cannot interfere later.
        if (timeout) {
          sock.setTimeout(0);
        }
        if (err !== null) {
          // The server leaves the socket open on e.g. a wrong password,
          // so close it from this side.
          sock.end();
          sock.destroy();
          openCallback(err);
          return;
        }
        openCallback(null, c);
      });
    }

    switch (protocol) {
    case 'amqp:':
      sock = require('net').connect(sockopts, startHandshake);
      break;
    case 'amqps:':
      sock = require('tls').connect(sockopts, startHandshake);
      break;
    default:
      throw new Error("Expected amqp: or amqps: as the protocol; got " + protocol);
    }

    if (timeout) {
      sock.setTimeout(timeout, function () {
        sock.end();
        sock.destroy();
        openCallback(new Error('connect ETIMEDOUT'));
      });
    }

    // Errors before the socket has connected are reported here; after
    // that this handler does nothing.
    sock.once('error', function (err) {
      if (!sockok) {
        openCallback(err);
      }
    });
  }
  159. module.exports.connect = connect;
  160. module.exports.credentialsFromUrl = credentialsFromUrl;