index.js 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. const commands_1 = require("@ioredis/commands");
  4. const events_1 = require("events");
  5. const redis_errors_1 = require("redis-errors");
  6. const standard_as_callback_1 = require("standard-as-callback");
  7. const Command_1 = require("../Command");
  8. const ClusterAllFailedError_1 = require("../errors/ClusterAllFailedError");
  9. const Redis_1 = require("../Redis");
  10. const ScanStream_1 = require("../ScanStream");
  11. const transaction_1 = require("../transaction");
  12. const utils_1 = require("../utils");
  13. const applyMixin_1 = require("../utils/applyMixin");
  14. const Commander_1 = require("../utils/Commander");
  15. const ClusterOptions_1 = require("./ClusterOptions");
  16. const ClusterSubscriber_1 = require("./ClusterSubscriber");
  17. const ConnectionPool_1 = require("./ConnectionPool");
  18. const DelayQueue_1 = require("./DelayQueue");
  19. const util_1 = require("./util");
  20. const Deque = require("denque");
  // Debug logger created through the shared utils `Debug` factory with the "cluster" label.
  const debug = (0, utils_1.Debug)("cluster");
  // Commands whose `reject` has already been wrapped by Cluster#sendCommand.
  // A WeakSet is used so membership does not keep finished commands alive.
  const REJECT_OVERWRITTEN_COMMANDS = new WeakSet();
  23. /**
  24. * Client for the official Redis Cluster
  25. */
  26. class Cluster extends Commander_1.default {
  27. /**
  28. * Creates an instance of Cluster.
  29. */
  30. constructor(startupNodes, options = {}) {
  31. super();
  32. this.slots = [];
  33. /**
  34. * @ignore
  35. */
  36. this._groupsIds = {};
  37. /**
  38. * @ignore
  39. */
  40. this._groupsBySlot = Array(16384);
  41. /**
  42. * @ignore
  43. */
  44. this.isCluster = true;
  45. this.retryAttempts = 0;
  46. this.delayQueue = new DelayQueue_1.default();
  47. this.offlineQueue = new Deque();
  48. this.isRefreshing = false;
  49. this._refreshSlotsCacheCallbacks = [];
  50. this._autoPipelines = new Map();
  51. this._runningAutoPipelines = new Set();
  52. this._readyDelayedCallbacks = [];
  53. /**
  54. * Every time Cluster#connect() is called, this value will be
  55. * auto-incrementing. The purpose of this value is used for
  56. * discarding previous connect attampts when creating a new
  57. * connection.
  58. */
  59. this.connectionEpoch = 0;
  60. events_1.EventEmitter.call(this);
  61. this.startupNodes = startupNodes;
  62. this.options = (0, utils_1.defaults)({}, options, ClusterOptions_1.DEFAULT_CLUSTER_OPTIONS, this.options);
  63. if (this.options.redisOptions &&
  64. this.options.redisOptions.keyPrefix &&
  65. !this.options.keyPrefix) {
  66. this.options.keyPrefix = this.options.redisOptions.keyPrefix;
  67. }
  68. // validate options
  69. if (typeof this.options.scaleReads !== "function" &&
  70. ["all", "master", "slave"].indexOf(this.options.scaleReads) === -1) {
  71. throw new Error('Invalid option scaleReads "' +
  72. this.options.scaleReads +
  73. '". Expected "all", "master", "slave" or a custom function');
  74. }
  75. this.connectionPool = new ConnectionPool_1.default(this.options.redisOptions);
  76. this.connectionPool.on("-node", (redis, key) => {
  77. this.emit("-node", redis);
  78. });
  79. this.connectionPool.on("+node", (redis) => {
  80. this.emit("+node", redis);
  81. });
  82. this.connectionPool.on("drain", () => {
  83. this.setStatus("close");
  84. });
  85. this.connectionPool.on("nodeError", (error, key) => {
  86. this.emit("node error", error, key);
  87. });
  88. this.subscriber = new ClusterSubscriber_1.default(this.connectionPool, this);
  89. if (this.options.scripts) {
  90. Object.entries(this.options.scripts).forEach(([name, definition]) => {
  91. this.defineCommand(name, definition);
  92. });
  93. }
  94. if (this.options.lazyConnect) {
  95. this.setStatus("wait");
  96. }
  97. else {
  98. this.connect().catch((err) => {
  99. debug("connecting failed: %s", err);
  100. });
  101. }
  102. }
  /**
   * Connect to a cluster.
   *
   * Resolves the startup node hostnames, seeds the connection pool with them
   * and refreshes the slot cache. The returned promise resolves once the
   * status becomes "ready" (after the optional ready check).
   *
   * The promise rejects when:
   * - the status is already "connecting", "connect" or "ready";
   * - a newer connect() call bumped `connectionEpoch` while hostnames were
   *   being resolved, or the status changed away from "connecting";
   * - a "close" event is emitted before the first "refresh";
   * - hostname resolution (or anything in the `.then` handler) throws.
   */
  connect() {
    return new Promise((resolve, reject) => {
      if (this.status === "connecting" ||
        this.status === "connect" ||
        this.status === "ready") {
        reject(new Error("Redis is already connecting/connected"));
        return;
      }
      // Remember which attempt this is so stale attempts can be discarded below.
      const epoch = ++this.connectionEpoch;
      this.setStatus("connecting");
      this.resolveStartupNodeHostnames()
        .then((nodes) => {
          if (this.connectionEpoch !== epoch) {
            debug("discard connecting after resolving startup nodes because epoch not match: %d != %d", epoch, this.connectionEpoch);
            reject(new redis_errors_1.RedisError("Connection is discarded because a new connection is made"));
            return;
          }
          if (this.status !== "connecting") {
            debug("discard connecting after resolving startup nodes because the status changed to %s", this.status);
            reject(new redis_errors_1.RedisError("Connection is aborted"));
            return;
          }
          this.connectionPool.reset(nodes);
          // Final step: mark ready, flush queued commands, start periodic slot refresh.
          const readyHandler = () => {
            this.setStatus("ready");
            this.retryAttempts = 0;
            this.executeOfflineCommands();
            this.resetNodesRefreshInterval();
            resolve();
          };
          // Declared before refreshListener because the two listeners remove each other.
          let closeListener = undefined;
          // Runs on the first "refresh" event, i.e. once slot information was obtained.
          const refreshListener = () => {
            this.invokeReadyDelayedCallbacks(undefined);
            this.removeListener("close", closeListener);
            this.manuallyClosing = false;
            this.setStatus("connect");
            if (this.options.enableReadyCheck) {
              this.readyCheck((err, fail) => {
                if (err || fail) {
                  debug("Ready check failed (%s). Reconnecting...", err || fail);
                  // Only force a reconnect if nothing else changed the status meanwhile.
                  if (this.status === "connect") {
                    this.disconnect(true);
                  }
                }
                else {
                  readyHandler();
                }
              });
            }
            else {
              readyHandler();
            }
          };
          // Runs when "close" is emitted before the first "refresh".
          closeListener = () => {
            const error = new Error("None of startup nodes is available");
            this.removeListener("refresh", refreshListener);
            this.invokeReadyDelayedCallbacks(error);
            reject(error);
          };
          this.once("refresh", refreshListener);
          this.once("close", closeListener);
          // Registered separately so it still runs after closeListener has been removed.
          this.once("close", this.handleCloseEvent.bind(this));
          this.refreshSlotsCache((err) => {
            // Every node failed: report the error without throwing on a missing
            // "error" listener (silentEmit), then empty the pool.
            if (err && err.message === ClusterAllFailedError_1.default.defaultMessage) {
              Redis_1.default.prototype.silentEmit.call(this, "error", err);
              this.connectionPool.reset([]);
            }
          });
          this.subscriber.start();
        })
        .catch((err) => {
          this.setStatus("close");
          this.handleCloseEvent(err);
          this.invokeReadyDelayedCallbacks(err);
          reject(err);
        });
    });
  }
  184. /**
  185. * Disconnect from every node in the cluster.
  186. */
  187. disconnect(reconnect = false) {
  188. const status = this.status;
  189. this.setStatus("disconnecting");
  190. if (!reconnect) {
  191. this.manuallyClosing = true;
  192. }
  193. if (this.reconnectTimeout && !reconnect) {
  194. clearTimeout(this.reconnectTimeout);
  195. this.reconnectTimeout = null;
  196. debug("Canceled reconnecting attempts");
  197. }
  198. this.clearNodesRefreshInterval();
  199. this.subscriber.stop();
  200. if (status === "wait") {
  201. this.setStatus("close");
  202. this.handleCloseEvent();
  203. }
  204. else {
  205. this.connectionPool.reset([]);
  206. }
  207. }
  208. /**
  209. * Quit the cluster gracefully.
  210. */
  211. quit(callback) {
  212. const status = this.status;
  213. this.setStatus("disconnecting");
  214. this.manuallyClosing = true;
  215. if (this.reconnectTimeout) {
  216. clearTimeout(this.reconnectTimeout);
  217. this.reconnectTimeout = null;
  218. }
  219. this.clearNodesRefreshInterval();
  220. this.subscriber.stop();
  221. if (status === "wait") {
  222. const ret = (0, standard_as_callback_1.default)(Promise.resolve("OK"), callback);
  223. // use setImmediate to make sure "close" event
  224. // being emitted after quit() is returned
  225. setImmediate(function () {
  226. this.setStatus("close");
  227. this.handleCloseEvent();
  228. }.bind(this));
  229. return ret;
  230. }
  231. return (0, standard_as_callback_1.default)(Promise.all(this.nodes().map((node) => node.quit().catch((err) => {
  232. // Ignore the error caused by disconnecting since
  233. // we're disconnecting...
  234. if (err.message === utils_1.CONNECTION_CLOSED_ERROR_MSG) {
  235. return "OK";
  236. }
  237. throw err;
  238. }))).then(() => "OK"), callback);
  239. }
  240. /**
  241. * Create a new instance with the same startup nodes and options as the current one.
  242. *
  243. * @example
  244. * ```js
  245. * var cluster = new Redis.Cluster([{ host: "127.0.0.1", port: "30001" }]);
  246. * var anotherCluster = cluster.duplicate();
  247. * ```
  248. */
  249. duplicate(overrideStartupNodes = [], overrideOptions = {}) {
  250. const startupNodes = overrideStartupNodes.length > 0
  251. ? overrideStartupNodes
  252. : this.startupNodes.slice(0);
  253. const options = Object.assign({}, this.options, overrideOptions);
  254. return new Cluster(startupNodes, options);
  255. }
  256. /**
  257. * Get nodes with the specified role
  258. */
  259. nodes(role = "all") {
  260. if (role !== "all" && role !== "master" && role !== "slave") {
  261. throw new Error('Invalid role "' + role + '". Expected "all", "master" or "slave"');
  262. }
  263. return this.connectionPool.getNodes(role);
  264. }
  /**
   * Queue a callback to be run by invokeReadyDelayedCallbacks().
   *
   * This is needed in order not to install a listener for each auto pipeline.
   * The callback receives the error passed to invokeReadyDelayedCallbacks(),
   * or undefined when the slot cache was refreshed successfully.
   *
   * @ignore
   */
  delayUntilReady(callback) {
    this._readyDelayedCallbacks.push(callback);
  }
  273. /**
  274. * Get the number of commands queued in automatic pipelines.
  275. *
  276. * This is not available (and returns 0) until the cluster is connected and slots information have been received.
  277. */
  278. get autoPipelineQueueSize() {
  279. let queued = 0;
  280. for (const pipeline of this._autoPipelines.values()) {
  281. queued += pipeline.length;
  282. }
  283. return queued;
  284. }
  285. /**
  286. * Refresh the slot cache
  287. *
  288. * @ignore
  289. */
  290. refreshSlotsCache(callback) {
  291. if (callback) {
  292. this._refreshSlotsCacheCallbacks.push(callback);
  293. }
  294. if (this.isRefreshing) {
  295. return;
  296. }
  297. this.isRefreshing = true;
  298. const _this = this;
  299. const wrapper = (error) => {
  300. this.isRefreshing = false;
  301. for (const callback of this._refreshSlotsCacheCallbacks) {
  302. callback(error);
  303. }
  304. this._refreshSlotsCacheCallbacks = [];
  305. };
  306. const nodes = (0, utils_1.shuffle)(this.connectionPool.getNodes());
  307. let lastNodeError = null;
  308. function tryNode(index) {
  309. if (index === nodes.length) {
  310. const error = new ClusterAllFailedError_1.default(ClusterAllFailedError_1.default.defaultMessage, lastNodeError);
  311. return wrapper(error);
  312. }
  313. const node = nodes[index];
  314. const key = `${node.options.host}:${node.options.port}`;
  315. debug("getting slot cache from %s", key);
  316. _this.getInfoFromNode(node, function (err) {
  317. switch (_this.status) {
  318. case "close":
  319. case "end":
  320. return wrapper(new Error("Cluster is disconnected."));
  321. case "disconnecting":
  322. return wrapper(new Error("Cluster is disconnecting."));
  323. }
  324. if (err) {
  325. _this.emit("node error", err, key);
  326. lastNodeError = err;
  327. tryNode(index + 1);
  328. }
  329. else {
  330. _this.emit("refresh");
  331. wrapper();
  332. }
  333. });
  334. }
  335. tryNode(0);
  336. }
  337. /**
  338. * @ignore
  339. */
  340. sendCommand(command, stream, node) {
  341. if (this.status === "wait") {
  342. this.connect().catch(utils_1.noop);
  343. }
  344. if (this.status === "end") {
  345. command.reject(new Error(utils_1.CONNECTION_CLOSED_ERROR_MSG));
  346. return command.promise;
  347. }
  348. let to = this.options.scaleReads;
  349. if (to !== "master") {
  350. const isCommandReadOnly = command.isReadOnly ||
  351. ((0, commands_1.exists)(command.name) && (0, commands_1.hasFlag)(command.name, "readonly"));
  352. if (!isCommandReadOnly) {
  353. to = "master";
  354. }
  355. }
  356. let targetSlot = node ? node.slot : command.getSlot();
  357. const ttl = {};
  358. const _this = this;
  359. if (!node && !REJECT_OVERWRITTEN_COMMANDS.has(command)) {
  360. REJECT_OVERWRITTEN_COMMANDS.add(command);
  361. const reject = command.reject;
  362. command.reject = function (err) {
  363. const partialTry = tryConnection.bind(null, true);
  364. _this.handleError(err, ttl, {
  365. moved: function (slot, key) {
  366. debug("command %s is moved to %s", command.name, key);
  367. targetSlot = Number(slot);
  368. if (_this.slots[slot]) {
  369. _this.slots[slot][0] = key;
  370. }
  371. else {
  372. _this.slots[slot] = [key];
  373. }
  374. _this._groupsBySlot[slot] =
  375. _this._groupsIds[_this.slots[slot].join(";")];
  376. _this.connectionPool.findOrCreate(_this.natMapper(key));
  377. tryConnection();
  378. debug("refreshing slot caches... (triggered by MOVED error)");
  379. _this.refreshSlotsCache();
  380. },
  381. ask: function (slot, key) {
  382. debug("command %s is required to ask %s:%s", command.name, key);
  383. const mapped = _this.natMapper(key);
  384. _this.connectionPool.findOrCreate(mapped);
  385. tryConnection(false, `${mapped.host}:${mapped.port}`);
  386. },
  387. tryagain: partialTry,
  388. clusterDown: partialTry,
  389. connectionClosed: partialTry,
  390. maxRedirections: function (redirectionError) {
  391. reject.call(command, redirectionError);
  392. },
  393. defaults: function () {
  394. reject.call(command, err);
  395. },
  396. });
  397. };
  398. }
  399. tryConnection();
  400. function tryConnection(random, asking) {
  401. if (_this.status === "end") {
  402. command.reject(new redis_errors_1.AbortError("Cluster is ended."));
  403. return;
  404. }
  405. let redis;
  406. if (_this.status === "ready" || command.name === "cluster") {
  407. if (node && node.redis) {
  408. redis = node.redis;
  409. }
  410. else if (Command_1.default.checkFlag("ENTER_SUBSCRIBER_MODE", command.name) ||
  411. Command_1.default.checkFlag("EXIT_SUBSCRIBER_MODE", command.name)) {
  412. redis = _this.subscriber.getInstance();
  413. if (!redis) {
  414. command.reject(new redis_errors_1.AbortError("No subscriber for the cluster"));
  415. return;
  416. }
  417. }
  418. else {
  419. if (!random) {
  420. if (typeof targetSlot === "number" && _this.slots[targetSlot]) {
  421. const nodeKeys = _this.slots[targetSlot];
  422. if (typeof to === "function") {
  423. const nodes = nodeKeys.map(function (key) {
  424. return _this.connectionPool.getInstanceByKey(key);
  425. });
  426. redis = to(nodes, command);
  427. if (Array.isArray(redis)) {
  428. redis = (0, utils_1.sample)(redis);
  429. }
  430. if (!redis) {
  431. redis = nodes[0];
  432. }
  433. }
  434. else {
  435. let key;
  436. if (to === "all") {
  437. key = (0, utils_1.sample)(nodeKeys);
  438. }
  439. else if (to === "slave" && nodeKeys.length > 1) {
  440. key = (0, utils_1.sample)(nodeKeys, 1);
  441. }
  442. else {
  443. key = nodeKeys[0];
  444. }
  445. redis = _this.connectionPool.getInstanceByKey(key);
  446. }
  447. }
  448. if (asking) {
  449. redis = _this.connectionPool.getInstanceByKey(asking);
  450. redis.asking();
  451. }
  452. }
  453. if (!redis) {
  454. redis =
  455. (typeof to === "function"
  456. ? null
  457. : _this.connectionPool.getSampleInstance(to)) ||
  458. _this.connectionPool.getSampleInstance("all");
  459. }
  460. }
  461. if (node && !node.redis) {
  462. node.redis = redis;
  463. }
  464. }
  465. if (redis) {
  466. redis.sendCommand(command, stream);
  467. }
  468. else if (_this.options.enableOfflineQueue) {
  469. _this.offlineQueue.push({
  470. command: command,
  471. stream: stream,
  472. node: node,
  473. });
  474. }
  475. else {
  476. command.reject(new Error("Cluster isn't ready and enableOfflineQueue options is false"));
  477. }
  478. }
  479. return command.promise;
  480. }
  481. sscanStream(key, options) {
  482. return this.createScanStream("sscan", { key, options });
  483. }
  484. sscanBufferStream(key, options) {
  485. return this.createScanStream("sscanBuffer", { key, options });
  486. }
  487. hscanStream(key, options) {
  488. return this.createScanStream("hscan", { key, options });
  489. }
  490. hscanBufferStream(key, options) {
  491. return this.createScanStream("hscanBuffer", { key, options });
  492. }
  493. zscanStream(key, options) {
  494. return this.createScanStream("zscan", { key, options });
  495. }
  496. zscanBufferStream(key, options) {
  497. return this.createScanStream("zscanBuffer", { key, options });
  498. }
  499. /**
  500. * @ignore
  501. */
  502. handleError(error, ttl, handlers) {
  503. if (typeof ttl.value === "undefined") {
  504. ttl.value = this.options.maxRedirections;
  505. }
  506. else {
  507. ttl.value -= 1;
  508. }
  509. if (ttl.value <= 0) {
  510. handlers.maxRedirections(new Error("Too many Cluster redirections. Last error: " + error));
  511. return;
  512. }
  513. const errv = error.message.split(" ");
  514. if (errv[0] === "MOVED") {
  515. const timeout = this.options.retryDelayOnMoved;
  516. if (timeout && typeof timeout === "number") {
  517. this.delayQueue.push("moved", handlers.moved.bind(null, errv[1], errv[2]), { timeout });
  518. }
  519. else {
  520. handlers.moved(errv[1], errv[2]);
  521. }
  522. }
  523. else if (errv[0] === "ASK") {
  524. handlers.ask(errv[1], errv[2]);
  525. }
  526. else if (errv[0] === "TRYAGAIN") {
  527. this.delayQueue.push("tryagain", handlers.tryagain, {
  528. timeout: this.options.retryDelayOnTryAgain,
  529. });
  530. }
  531. else if (errv[0] === "CLUSTERDOWN" &&
  532. this.options.retryDelayOnClusterDown > 0) {
  533. this.delayQueue.push("clusterdown", handlers.connectionClosed, {
  534. timeout: this.options.retryDelayOnClusterDown,
  535. callback: this.refreshSlotsCache.bind(this),
  536. });
  537. }
  538. else if (error.message === utils_1.CONNECTION_CLOSED_ERROR_MSG &&
  539. this.options.retryDelayOnFailover > 0 &&
  540. this.status === "ready") {
  541. this.delayQueue.push("failover", handlers.connectionClosed, {
  542. timeout: this.options.retryDelayOnFailover,
  543. callback: this.refreshSlotsCache.bind(this),
  544. });
  545. }
  546. else {
  547. handlers.defaults();
  548. }
  549. }
  /**
   * Replace the offline queue with a fresh, empty Deque.
   * The previous queue object is left untouched, so a caller that kept a
   * reference to it can still drain it (see executeOfflineCommands).
   */
  resetOfflineQueue() {
    this.offlineQueue = new Deque();
  }
  553. clearNodesRefreshInterval() {
  554. if (this.slotsTimer) {
  555. clearTimeout(this.slotsTimer);
  556. this.slotsTimer = null;
  557. }
  558. }
  559. resetNodesRefreshInterval() {
  560. if (this.slotsTimer || !this.options.slotsRefreshInterval) {
  561. return;
  562. }
  563. const nextRound = () => {
  564. this.slotsTimer = setTimeout(() => {
  565. debug('refreshing slot caches... (triggered by "slotsRefreshInterval" option)');
  566. this.refreshSlotsCache(() => {
  567. nextRound();
  568. });
  569. }, this.options.slotsRefreshInterval);
  570. };
  571. nextRound();
  572. }
  573. /**
  574. * Change cluster instance's status
  575. */
  576. setStatus(status) {
  577. debug("status: %s -> %s", this.status || "[empty]", status);
  578. this.status = status;
  579. process.nextTick(() => {
  580. this.emit(status);
  581. });
  582. }
  583. /**
  584. * Called when closed to check whether a reconnection should be made
  585. */
  586. handleCloseEvent(reason) {
  587. if (reason) {
  588. debug("closed because %s", reason);
  589. }
  590. let retryDelay;
  591. if (!this.manuallyClosing &&
  592. typeof this.options.clusterRetryStrategy === "function") {
  593. retryDelay = this.options.clusterRetryStrategy.call(this, ++this.retryAttempts, reason);
  594. }
  595. if (typeof retryDelay === "number") {
  596. this.setStatus("reconnecting");
  597. this.reconnectTimeout = setTimeout(() => {
  598. this.reconnectTimeout = null;
  599. debug("Cluster is disconnected. Retrying after %dms", retryDelay);
  600. this.connect().catch(function (err) {
  601. debug("Got error %s when reconnecting. Ignoring...", err);
  602. });
  603. }, retryDelay);
  604. }
  605. else {
  606. this.setStatus("end");
  607. this.flushQueue(new Error("None of startup nodes is available"));
  608. }
  609. }
  610. /**
  611. * Flush offline queue with error.
  612. */
  613. flushQueue(error) {
  614. let item;
  615. while ((item = this.offlineQueue.shift())) {
  616. item.command.reject(error);
  617. }
  618. }
  619. executeOfflineCommands() {
  620. if (this.offlineQueue.length) {
  621. debug("send %d commands in offline queue", this.offlineQueue.length);
  622. const offlineQueue = this.offlineQueue;
  623. this.resetOfflineQueue();
  624. let item;
  625. while ((item = offlineQueue.shift())) {
  626. this.sendCommand(item.command, item.stream, item.node);
  627. }
  628. }
  629. }
  630. natMapper(nodeKey) {
  631. if (this.options.natMap && typeof this.options.natMap === "object") {
  632. const key = typeof nodeKey === "string"
  633. ? nodeKey
  634. : `${nodeKey.host}:${nodeKey.port}`;
  635. const mapped = this.options.natMap[key];
  636. if (mapped) {
  637. debug("NAT mapping %s -> %O", key, mapped);
  638. return Object.assign({}, mapped);
  639. }
  640. }
  641. return typeof nodeKey === "string"
  642. ? (0, util_1.nodeKeyToRedisOptions)(nodeKey)
  643. : nodeKey;
  644. }
  /**
   * Fetch the slot layout from one node with CLUSTER SLOTS and apply it.
   *
   * On success this rewrites `this.slots`, rebuilds `this._groupsIds` /
   * `this._groupsBySlot` and resets the connection pool to the nodes found in
   * the reply.
   *
   * @param redis Node connection to query; a falsy value fails immediately.
   * @param callback Called with an error on failure. Called with no argument
   *   on success, and also when the reply is ignored because the cluster is
   *   disconnecting, closed or ended.
   */
  getInfoFromNode(redis, callback) {
    if (!redis) {
      return callback(new Error("Node is disconnected"));
    }
    // Use a duplication of the connection to avoid
    // timeouts when the connection is in the blocking
    // mode (e.g. waiting for BLPOP).
    const duplicatedConnection = redis.duplicate({
      enableOfflineQueue: true,
      enableReadyCheck: false,
      retryStrategy: null,
      connectionName: (0, util_1.getConnectionName)("refresher", this.options.redisOptions && this.options.redisOptions.connectionName),
    });
    // Ignore error events since we will handle
    // exceptions for the CLUSTER SLOTS command.
    duplicatedConnection.on("error", utils_1.noop);
    // The reply handler is wrapped with utils.timeout using options.slotsRefreshTimeout.
    duplicatedConnection.cluster("SLOTS", (0, utils_1.timeout)((err, result) => {
      // The temporary connection is only needed for this single request.
      duplicatedConnection.disconnect();
      if (err) {
        return callback(err);
      }
      if (this.status === "disconnecting" ||
        this.status === "close" ||
        this.status === "end") {
        debug("ignore CLUSTER.SLOTS results (count: %d) since cluster status is %s", result.length, this.status);
        callback();
        return;
      }
      const nodes = [];
      debug("cluster slots result count: %d", result.length);
      // Each entry is read as [rangeStart, rangeEnd, node, node, ...],
      // where every node is an array starting with [host, port].
      for (let i = 0; i < result.length; ++i) {
        const items = result[i];
        const slotRangeStart = items[0];
        const slotRangeEnd = items[1];
        const keys = [];
        for (let j = 2; j < items.length; j++) {
          // Skip node entries without a host.
          if (!items[j][0]) {
            continue;
          }
          const node = this.natMapper({
            host: items[j][0],
            port: items[j][1],
          });
          // Only the first listed node of a range is treated as writable
          // (presumably the master, per the CLUSTER SLOTS reply format).
          node.readOnly = j !== 2;
          nodes.push(node);
          keys.push(node.host + ":" + node.port);
        }
        debug("cluster slots result [%d]: slots %d~%d served by %s", i, slotRangeStart, slotRangeEnd, keys);
        // Every slot of the range shares the same `keys` array instance.
        for (let slot = slotRangeStart; slot <= slotRangeEnd; slot++) {
          this.slots[slot] = keys;
        }
      }
      // Assign to each node keys a numeric value to make autopipeline comparison faster.
      this._groupsIds = Object.create(null);
      let j = 0;
      for (let i = 0; i < 16384; i++) {
        const target = (this.slots[i] || []).join(";");
        if (!target.length) {
          // Slot has no known node.
          this._groupsBySlot[i] = undefined;
          continue;
        }
        if (!this._groupsIds[target]) {
          // Ids start at 1, so the falsy check above never mistakes an assigned id for a missing one.
          this._groupsIds[target] = ++j;
        }
        this._groupsBySlot[i] = this._groupsIds[target];
      }
      this.connectionPool.reset(nodes);
      callback();
    }, this.options.slotsRefreshTimeout));
  }
  715. invokeReadyDelayedCallbacks(err) {
  716. for (const c of this._readyDelayedCallbacks) {
  717. process.nextTick(c, err);
  718. }
  719. this._readyDelayedCallbacks = [];
  720. }
  721. /**
  722. * Check whether Cluster is able to process commands
  723. */
  724. readyCheck(callback) {
  725. this.cluster("INFO", (err, res) => {
  726. if (err) {
  727. return callback(err);
  728. }
  729. if (typeof res !== "string") {
  730. return callback();
  731. }
  732. let state;
  733. const lines = res.split("\r\n");
  734. for (let i = 0; i < lines.length; ++i) {
  735. const parts = lines[i].split(":");
  736. if (parts[0] === "cluster_state") {
  737. state = parts[1];
  738. break;
  739. }
  740. }
  741. if (state === "fail") {
  742. debug("cluster state not ok (%s)", state);
  743. callback(null, state);
  744. }
  745. else {
  746. callback();
  747. }
  748. });
  749. }
  750. resolveSrv(hostname) {
  751. return new Promise((resolve, reject) => {
  752. this.options.resolveSrv(hostname, (err, records) => {
  753. if (err) {
  754. return reject(err);
  755. }
  756. const self = this, groupedRecords = (0, util_1.groupSrvRecords)(records), sortedKeys = Object.keys(groupedRecords).sort((a, b) => parseInt(a) - parseInt(b));
  757. function tryFirstOne(err) {
  758. if (!sortedKeys.length) {
  759. return reject(err);
  760. }
  761. const key = sortedKeys[0], group = groupedRecords[key], record = (0, util_1.weightSrvRecords)(group);
  762. if (!group.records.length) {
  763. sortedKeys.shift();
  764. }
  765. self.dnsLookup(record.name).then((host) => resolve({
  766. host,
  767. port: record.port,
  768. }), tryFirstOne);
  769. }
  770. tryFirstOne();
  771. });
  772. });
  773. }
  774. dnsLookup(hostname) {
  775. return new Promise((resolve, reject) => {
  776. this.options.dnsLookup(hostname, (err, address) => {
  777. if (err) {
  778. debug("failed to resolve hostname %s to IP: %s", hostname, err.message);
  779. reject(err);
  780. }
  781. else {
  782. debug("resolved hostname %s to IP %s", hostname, address);
  783. resolve(address);
  784. }
  785. });
  786. });
  787. }
  788. /**
  789. * Normalize startup nodes, and resolving hostnames to IPs.
  790. *
  791. * This process happens every time when #connect() is called since
  792. * #startupNodes and DNS records may chanage.
  793. */
  794. async resolveStartupNodeHostnames() {
  795. if (!Array.isArray(this.startupNodes) || this.startupNodes.length === 0) {
  796. throw new Error("`startupNodes` should contain at least one node.");
  797. }
  798. const startupNodes = (0, util_1.normalizeNodeOptions)(this.startupNodes);
  799. const hostnames = (0, util_1.getUniqueHostnamesFromOptions)(startupNodes);
  800. if (hostnames.length === 0) {
  801. return startupNodes;
  802. }
  803. const configs = await Promise.all(hostnames.map((this.options.useSRVRecords ? this.resolveSrv : this.dnsLookup).bind(this)));
  804. const hostnameToConfig = (0, utils_1.zipMap)(hostnames, configs);
  805. return startupNodes.map((node) => {
  806. const config = hostnameToConfig.get(node.host);
  807. if (!config) {
  808. return node;
  809. }
  810. if (this.options.useSRVRecords) {
  811. return Object.assign({}, node, config);
  812. }
  813. return Object.assign({}, node, { host: config });
  814. });
  815. }
  816. createScanStream(command, { key, options = {} }) {
  817. return new ScanStream_1.default({
  818. objectMode: true,
  819. key: key,
  820. redis: this,
  821. command: command,
  822. ...options,
  823. });
  824. }
  825. }
  // Mix EventEmitter into Cluster. The class already extends Commander, and its
  // constructor calls EventEmitter.call(this); presumably applyMixin copies the
  // prototype members — confirm in ../utils/applyMixin.
  (0, applyMixin_1.default)(Cluster, events_1.EventEmitter);
  // Add transaction helpers to the prototype (see ../transaction for what is installed).
  (0, transaction_1.addTransactionSupport)(Cluster.prototype);
  exports.default = Cluster;