ClusterSubscriber.js 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. const util_1 = require("./util");
  4. const utils_1 = require("../utils");
  5. const Redis_1 = require("../Redis");
  6. const debug = (0, utils_1.Debug)("cluster:subscriber");
  7. class ClusterSubscriber {
  8. constructor(connectionPool, emitter) {
  9. this.connectionPool = connectionPool;
  10. this.emitter = emitter;
  11. this.started = false;
  12. this.subscriber = null;
  13. this.onSubscriberEnd = () => {
  14. if (!this.started) {
  15. debug("subscriber has disconnected, but ClusterSubscriber is not started, so not reconnecting.");
  16. return;
  17. }
  18. // If the subscriber closes whilst it's still the active connection,
  19. // we might as well try to connecting to a new node if possible to
  20. // minimise the number of missed publishes.
  21. debug("subscriber has disconnected, selecting a new one...");
  22. this.selectSubscriber();
  23. };
  24. // If the current node we're using as the subscriber disappears
  25. // from the node pool for some reason, we will select a new one
  26. // to connect to.
  27. // Note that this event is only triggered if the connection to
  28. // the node has been used; cluster subscriptions are setup with
  29. // lazyConnect = true. It's possible for the subscriber node to
  30. // disappear without this method being called!
  31. // See https://github.com/luin/ioredis/pull/1589
  32. this.connectionPool.on("-node", (_, key) => {
  33. if (!this.started || !this.subscriber) {
  34. return;
  35. }
  36. if ((0, util_1.getNodeKey)(this.subscriber.options) === key) {
  37. debug("subscriber has left, selecting a new one...");
  38. this.selectSubscriber();
  39. }
  40. });
  41. this.connectionPool.on("+node", () => {
  42. if (!this.started || this.subscriber) {
  43. return;
  44. }
  45. debug("a new node is discovered and there is no subscriber, selecting a new one...");
  46. this.selectSubscriber();
  47. });
  48. }
  49. getInstance() {
  50. return this.subscriber;
  51. }
  52. start() {
  53. this.started = true;
  54. this.selectSubscriber();
  55. debug("started");
  56. }
  57. stop() {
  58. this.started = false;
  59. if (this.subscriber) {
  60. this.subscriber.disconnect();
  61. this.subscriber = null;
  62. }
  63. debug("stopped");
  64. }
  65. selectSubscriber() {
  66. const lastActiveSubscriber = this.lastActiveSubscriber;
  67. // Disconnect the previous subscriber even if there
  68. // will not be a new one.
  69. if (lastActiveSubscriber) {
  70. lastActiveSubscriber.off("end", this.onSubscriberEnd);
  71. lastActiveSubscriber.disconnect();
  72. }
  73. if (this.subscriber) {
  74. this.subscriber.off("end", this.onSubscriberEnd);
  75. this.subscriber.disconnect();
  76. }
  77. const sampleNode = (0, utils_1.sample)(this.connectionPool.getNodes());
  78. if (!sampleNode) {
  79. debug("selecting subscriber failed since there is no node discovered in the cluster yet");
  80. this.subscriber = null;
  81. return;
  82. }
  83. const { options } = sampleNode;
  84. debug("selected a subscriber %s:%s", options.host, options.port);
  85. /*
  86. * Create a specialized Redis connection for the subscription.
  87. * Note that auto reconnection is enabled here.
  88. *
  89. * `enableReadyCheck` is also enabled because although subscription is allowed
  90. * while redis is loading data from the disk, we can check if the password
  91. * provided for the subscriber is correct, and if not, the current subscriber
  92. * will be disconnected and a new subscriber will be selected.
  93. */
  94. this.subscriber = new Redis_1.default({
  95. port: options.port,
  96. host: options.host,
  97. username: options.username,
  98. password: options.password,
  99. enableReadyCheck: true,
  100. connectionName: (0, util_1.getConnectionName)("subscriber", options.connectionName),
  101. lazyConnect: true,
  102. tls: options.tls,
  103. // Don't try to reconnect the subscriber connection. If the connection fails
  104. // we will get an end event (handled below), at which point we'll pick a new
  105. // node from the pool and try to connect to that as the subscriber connection.
  106. retryStrategy: null,
  107. });
  108. // Ignore the errors since they're handled in the connection pool.
  109. this.subscriber.on("error", utils_1.noop);
  110. // The node we lost connection to may not come back up in a
  111. // reasonable amount of time (e.g. a slave that's taken down
  112. // for maintainence), we could potentially miss many published
  113. // messages so we should reconnect as quickly as possible, to
  114. // a different node if needed.
  115. this.subscriber.once("end", this.onSubscriberEnd);
  116. // Re-subscribe previous channels
  117. const previousChannels = { subscribe: [], psubscribe: [], ssubscribe: [] };
  118. if (lastActiveSubscriber) {
  119. const condition = lastActiveSubscriber.condition || lastActiveSubscriber.prevCondition;
  120. if (condition && condition.subscriber) {
  121. previousChannels.subscribe = condition.subscriber.channels("subscribe");
  122. previousChannels.psubscribe =
  123. condition.subscriber.channels("psubscribe");
  124. previousChannels.ssubscribe =
  125. condition.subscriber.channels("ssubscribe");
  126. }
  127. }
  128. if (previousChannels.subscribe.length ||
  129. previousChannels.psubscribe.length ||
  130. previousChannels.ssubscribe.length) {
  131. let pending = 0;
  132. for (const type of ["subscribe", "psubscribe", "ssubscribe"]) {
  133. const channels = previousChannels[type];
  134. if (channels.length) {
  135. pending += 1;
  136. debug("%s %d channels", type, channels.length);
  137. this.subscriber[type](channels)
  138. .then(() => {
  139. if (!--pending) {
  140. this.lastActiveSubscriber = this.subscriber;
  141. }
  142. })
  143. .catch(() => {
  144. // TODO: should probably disconnect the subscriber and try again.
  145. debug("failed to %s %d channels", type, channels.length);
  146. });
  147. }
  148. }
  149. }
  150. else {
  151. this.lastActiveSubscriber = this.subscriber;
  152. }
  153. for (const event of [
  154. "message",
  155. "messageBuffer",
  156. "smessage",
  157. "smessageBuffer",
  158. ]) {
  159. this.subscriber.on(event, (arg1, arg2) => {
  160. this.emitter.emit(event, arg1, arg2);
  161. });
  162. }
  163. for (const event of ["pmessage", "pmessageBuffer"]) {
  164. this.subscriber.on(event, (arg1, arg2, arg3) => {
  165. this.emitter.emit(event, arg1, arg2, arg3);
  166. });
  167. }
  168. }
  169. }
  170. exports.default = ClusterSubscriber;