event_handler.js 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.readyHandler = exports.errorHandler = exports.closeHandler = exports.connectHandler = void 0;
  4. const redis_errors_1 = require("redis-errors");
  5. const Command_1 = require("../Command");
  6. const errors_1 = require("../errors");
  7. const utils_1 = require("../utils");
  8. const DataHandler_1 = require("../DataHandler");
  9. const debug = (0, utils_1.Debug)("connection");
  10. function connectHandler(self) {
  11. return function () {
  12. self.setStatus("connect");
  13. self.resetCommandQueue();
  14. // AUTH command should be processed before any other commands
  15. let flushed = false;
  16. const { connectionEpoch } = self;
  17. if (self.condition.auth) {
  18. self.auth(self.condition.auth, function (err) {
  19. if (connectionEpoch !== self.connectionEpoch) {
  20. return;
  21. }
  22. if (err) {
  23. if (err.message.indexOf("no password is set") !== -1) {
  24. console.warn("[WARN] Redis server does not require a password, but a password was supplied.");
  25. }
  26. else if (err.message.indexOf("without any password configured for the default user") !== -1) {
  27. console.warn("[WARN] This Redis server's `default` user does not require a password, but a password was supplied");
  28. }
  29. else if (err.message.indexOf("wrong number of arguments for 'auth' command") !== -1) {
  30. console.warn(`[ERROR] The server returned "wrong number of arguments for 'auth' command". You are probably passing both username and password to Redis version 5 or below. You should only pass the 'password' option for Redis version 5 and under.`);
  31. }
  32. else {
  33. flushed = true;
  34. self.recoverFromFatalError(err, err);
  35. }
  36. }
  37. });
  38. }
  39. if (self.condition.select) {
  40. self.select(self.condition.select).catch((err) => {
  41. // If the node is in cluster mode, select is disallowed.
  42. // In this case, reconnect won't help.
  43. self.silentEmit("error", err);
  44. });
  45. }
  46. if (!self.options.enableReadyCheck) {
  47. exports.readyHandler(self)();
  48. }
  49. /*
  50. No need to keep the reference of DataHandler here
  51. because we don't need to do the cleanup.
  52. `Stream#end()` will remove all listeners for us.
  53. */
  54. new DataHandler_1.default(self, {
  55. stringNumbers: self.options.stringNumbers,
  56. });
  57. if (self.options.enableReadyCheck) {
  58. self._readyCheck(function (err, info) {
  59. if (connectionEpoch !== self.connectionEpoch) {
  60. return;
  61. }
  62. if (err) {
  63. if (!flushed) {
  64. self.recoverFromFatalError(new Error("Ready check failed: " + err.message), err);
  65. }
  66. }
  67. else {
  68. if (self.connector.check(info)) {
  69. exports.readyHandler(self)();
  70. }
  71. else {
  72. self.disconnect(true);
  73. }
  74. }
  75. });
  76. }
  77. };
  78. }
  79. exports.connectHandler = connectHandler;
  80. function abortError(command) {
  81. const err = new redis_errors_1.AbortError("Command aborted due to connection close");
  82. err.command = {
  83. name: command.name,
  84. args: command.args,
  85. };
  86. return err;
  87. }
  88. // If a contiguous set of pipeline commands starts from index zero then they
  89. // can be safely reattempted. If however we have a chain of pipelined commands
  90. // starting at index 1 or more it means we received a partial response before
  91. // the connection close and those pipelined commands must be aborted. For
  92. // example, if the queue looks like this: [2, 3, 4, 0, 1, 2] then after
  93. // aborting and purging we'll have a queue that looks like this: [0, 1, 2]
  94. function abortIncompletePipelines(commandQueue) {
  95. var _a;
  96. let expectedIndex = 0;
  97. for (let i = 0; i < commandQueue.length;) {
  98. const command = (_a = commandQueue.peekAt(i)) === null || _a === void 0 ? void 0 : _a.command;
  99. const pipelineIndex = command.pipelineIndex;
  100. if (pipelineIndex === undefined || pipelineIndex === 0) {
  101. expectedIndex = 0;
  102. }
  103. if (pipelineIndex !== undefined && pipelineIndex !== expectedIndex++) {
  104. commandQueue.remove(i, 1);
  105. command.reject(abortError(command));
  106. continue;
  107. }
  108. i++;
  109. }
  110. }
  111. // If only a partial transaction result was received before connection close,
  112. // we have to abort any transaction fragments that may have ended up in the
  113. // offline queue
  114. function abortTransactionFragments(commandQueue) {
  115. var _a;
  116. for (let i = 0; i < commandQueue.length;) {
  117. const command = (_a = commandQueue.peekAt(i)) === null || _a === void 0 ? void 0 : _a.command;
  118. if (command.name === "multi") {
  119. break;
  120. }
  121. if (command.name === "exec") {
  122. commandQueue.remove(i, 1);
  123. command.reject(abortError(command));
  124. break;
  125. }
  126. if (command.inTransaction) {
  127. commandQueue.remove(i, 1);
  128. command.reject(abortError(command));
  129. }
  130. else {
  131. i++;
  132. }
  133. }
  134. }
  135. function closeHandler(self) {
  136. return function () {
  137. const prevStatus = self.status;
  138. self.setStatus("close");
  139. if (self.commandQueue.length) {
  140. abortIncompletePipelines(self.commandQueue);
  141. }
  142. if (self.offlineQueue.length) {
  143. abortTransactionFragments(self.offlineQueue);
  144. }
  145. if (prevStatus === "ready") {
  146. if (!self.prevCondition) {
  147. self.prevCondition = self.condition;
  148. }
  149. if (self.commandQueue.length) {
  150. self.prevCommandQueue = self.commandQueue;
  151. }
  152. }
  153. if (self.manuallyClosing) {
  154. self.manuallyClosing = false;
  155. debug("skip reconnecting since the connection is manually closed.");
  156. return close();
  157. }
  158. if (typeof self.options.retryStrategy !== "function") {
  159. debug("skip reconnecting because `retryStrategy` is not a function");
  160. return close();
  161. }
  162. const retryDelay = self.options.retryStrategy(++self.retryAttempts);
  163. if (typeof retryDelay !== "number") {
  164. debug("skip reconnecting because `retryStrategy` doesn't return a number");
  165. return close();
  166. }
  167. debug("reconnect in %sms", retryDelay);
  168. self.setStatus("reconnecting", retryDelay);
  169. self.reconnectTimeout = setTimeout(function () {
  170. self.reconnectTimeout = null;
  171. self.connect().catch(utils_1.noop);
  172. }, retryDelay);
  173. const { maxRetriesPerRequest } = self.options;
  174. if (typeof maxRetriesPerRequest === "number") {
  175. if (maxRetriesPerRequest < 0) {
  176. debug("maxRetriesPerRequest is negative, ignoring...");
  177. }
  178. else {
  179. const remainder = self.retryAttempts % (maxRetriesPerRequest + 1);
  180. if (remainder === 0) {
  181. debug("reach maxRetriesPerRequest limitation, flushing command queue...");
  182. self.flushQueue(new errors_1.MaxRetriesPerRequestError(maxRetriesPerRequest));
  183. }
  184. }
  185. }
  186. };
  187. function close() {
  188. self.setStatus("end");
  189. self.flushQueue(new Error(utils_1.CONNECTION_CLOSED_ERROR_MSG));
  190. }
  191. }
  192. exports.closeHandler = closeHandler;
  193. function errorHandler(self) {
  194. return function (error) {
  195. debug("error: %s", error);
  196. self.silentEmit("error", error);
  197. };
  198. }
  199. exports.errorHandler = errorHandler;
  200. function readyHandler(self) {
  201. return function () {
  202. self.setStatus("ready");
  203. self.retryAttempts = 0;
  204. if (self.options.monitor) {
  205. self.call("monitor").then(() => self.setStatus("monitoring"), (error) => self.emit("error", error));
  206. const { sendCommand } = self;
  207. self.sendCommand = function (command) {
  208. if (Command_1.default.checkFlag("VALID_IN_MONITOR_MODE", command.name)) {
  209. return sendCommand.call(self, command);
  210. }
  211. command.reject(new Error("Connection is in monitoring mode, can't process commands."));
  212. return command.promise;
  213. };
  214. self.once("close", function () {
  215. delete self.sendCommand;
  216. });
  217. return;
  218. }
  219. const finalSelect = self.prevCondition
  220. ? self.prevCondition.select
  221. : self.condition.select;
  222. if (self.options.connectionName) {
  223. debug("set the connection name [%s]", self.options.connectionName);
  224. self.client("setname", self.options.connectionName).catch(utils_1.noop);
  225. }
  226. if (self.options.readOnly) {
  227. debug("set the connection to readonly mode");
  228. self.readonly().catch(utils_1.noop);
  229. }
  230. if (self.prevCondition) {
  231. const condition = self.prevCondition;
  232. self.prevCondition = null;
  233. if (condition.subscriber && self.options.autoResubscribe) {
  234. // We re-select the previous db first since
  235. // `SELECT` command is not valid in sub mode.
  236. if (self.condition.select !== finalSelect) {
  237. debug("connect to db [%d]", finalSelect);
  238. self.select(finalSelect);
  239. }
  240. const subscribeChannels = condition.subscriber.channels("subscribe");
  241. if (subscribeChannels.length) {
  242. debug("subscribe %d channels", subscribeChannels.length);
  243. self.subscribe(subscribeChannels);
  244. }
  245. const psubscribeChannels = condition.subscriber.channels("psubscribe");
  246. if (psubscribeChannels.length) {
  247. debug("psubscribe %d channels", psubscribeChannels.length);
  248. self.psubscribe(psubscribeChannels);
  249. }
  250. const ssubscribeChannels = condition.subscriber.channels("ssubscribe");
  251. if (ssubscribeChannels.length) {
  252. debug("ssubscribe %d channels", ssubscribeChannels.length);
  253. self.ssubscribe(ssubscribeChannels);
  254. }
  255. }
  256. }
  257. if (self.prevCommandQueue) {
  258. if (self.options.autoResendUnfulfilledCommands) {
  259. debug("resend %d unfulfilled commands", self.prevCommandQueue.length);
  260. while (self.prevCommandQueue.length > 0) {
  261. const item = self.prevCommandQueue.shift();
  262. if (item.select !== self.condition.select &&
  263. item.command.name !== "select") {
  264. self.select(item.select);
  265. }
  266. self.sendCommand(item.command, item.stream);
  267. }
  268. }
  269. else {
  270. self.prevCommandQueue = null;
  271. }
  272. }
  273. if (self.offlineQueue.length) {
  274. debug("send %d commands in offline queue", self.offlineQueue.length);
  275. const offlineQueue = self.offlineQueue;
  276. self.resetOfflineQueue();
  277. while (offlineQueue.length > 0) {
  278. const item = offlineQueue.shift();
  279. if (item.select !== self.condition.select &&
  280. item.command.name !== "select") {
  281. self.select(item.select);
  282. }
  283. self.sendCommand(item.command, item.stream);
  284. }
  285. }
  286. if (self.condition.select !== finalSelect) {
  287. debug("connect to db [%d]", finalSelect);
  288. self.select(finalSelect);
  289. }
  290. };
  291. }
  292. exports.readyHandler = readyHandler;