DataHandler.js 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. const Command_1 = require("./Command");
  4. const utils_1 = require("./utils");
  5. const RedisParser = require("redis-parser");
  6. const SubscriptionSet_1 = require("./SubscriptionSet");
  7. const debug = (0, utils_1.Debug)("dataHandler");
  8. class DataHandler {
  9. constructor(redis, parserOptions) {
  10. this.redis = redis;
  11. const parser = new RedisParser({
  12. stringNumbers: parserOptions.stringNumbers,
  13. returnBuffers: true,
  14. returnError: (err) => {
  15. this.returnError(err);
  16. },
  17. returnFatalError: (err) => {
  18. this.returnFatalError(err);
  19. },
  20. returnReply: (reply) => {
  21. this.returnReply(reply);
  22. },
  23. });
  24. redis.stream.on("data", (data) => {
  25. parser.execute(data);
  26. });
  27. }
  28. returnFatalError(err) {
  29. err.message += ". Please report this.";
  30. this.redis.recoverFromFatalError(err, err, { offlineQueue: false });
  31. }
  32. returnError(err) {
  33. const item = this.shiftCommand(err);
  34. if (!item) {
  35. return;
  36. }
  37. err.command = {
  38. name: item.command.name,
  39. args: item.command.args,
  40. };
  41. this.redis.handleReconnection(err, item);
  42. }
  43. returnReply(reply) {
  44. if (this.handleMonitorReply(reply)) {
  45. return;
  46. }
  47. if (this.handleSubscriberReply(reply)) {
  48. return;
  49. }
  50. const item = this.shiftCommand(reply);
  51. if (!item) {
  52. return;
  53. }
  54. if (Command_1.default.checkFlag("ENTER_SUBSCRIBER_MODE", item.command.name)) {
  55. this.redis.condition.subscriber = new SubscriptionSet_1.default();
  56. this.redis.condition.subscriber.add(item.command.name, reply[1].toString());
  57. if (!fillSubCommand(item.command, reply[2])) {
  58. this.redis.commandQueue.unshift(item);
  59. }
  60. }
  61. else if (Command_1.default.checkFlag("EXIT_SUBSCRIBER_MODE", item.command.name)) {
  62. if (!fillUnsubCommand(item.command, reply[2])) {
  63. this.redis.commandQueue.unshift(item);
  64. }
  65. }
  66. else {
  67. item.command.resolve(reply);
  68. }
  69. }
  70. handleSubscriberReply(reply) {
  71. if (!this.redis.condition.subscriber) {
  72. return false;
  73. }
  74. const replyType = Array.isArray(reply) ? reply[0].toString() : null;
  75. debug('receive reply "%s" in subscriber mode', replyType);
  76. switch (replyType) {
  77. case "message":
  78. if (this.redis.listeners("message").length > 0) {
  79. // Check if there're listeners to avoid unnecessary `toString()`.
  80. this.redis.emit("message", reply[1].toString(), reply[2] ? reply[2].toString() : "");
  81. }
  82. this.redis.emit("messageBuffer", reply[1], reply[2]);
  83. break;
  84. case "pmessage": {
  85. const pattern = reply[1].toString();
  86. if (this.redis.listeners("pmessage").length > 0) {
  87. this.redis.emit("pmessage", pattern, reply[2].toString(), reply[3].toString());
  88. }
  89. this.redis.emit("pmessageBuffer", pattern, reply[2], reply[3]);
  90. break;
  91. }
  92. case "smessage": {
  93. if (this.redis.listeners("smessage").length > 0) {
  94. this.redis.emit("smessage", reply[1].toString(), reply[2] ? reply[2].toString() : "");
  95. }
  96. this.redis.emit("smessageBuffer", reply[1], reply[2]);
  97. break;
  98. }
  99. case "ssubscribe":
  100. case "subscribe":
  101. case "psubscribe": {
  102. const channel = reply[1].toString();
  103. this.redis.condition.subscriber.add(replyType, channel);
  104. const item = this.shiftCommand(reply);
  105. if (!item) {
  106. return;
  107. }
  108. if (!fillSubCommand(item.command, reply[2])) {
  109. this.redis.commandQueue.unshift(item);
  110. }
  111. break;
  112. }
  113. case "sunsubscribe":
  114. case "unsubscribe":
  115. case "punsubscribe": {
  116. const channel = reply[1] ? reply[1].toString() : null;
  117. if (channel) {
  118. this.redis.condition.subscriber.del(replyType, channel);
  119. }
  120. const count = reply[2];
  121. if (Number(count) === 0) {
  122. this.redis.condition.subscriber = false;
  123. }
  124. const item = this.shiftCommand(reply);
  125. if (!item) {
  126. return;
  127. }
  128. if (!fillUnsubCommand(item.command, count)) {
  129. this.redis.commandQueue.unshift(item);
  130. }
  131. break;
  132. }
  133. default: {
  134. const item = this.shiftCommand(reply);
  135. if (!item) {
  136. return;
  137. }
  138. item.command.resolve(reply);
  139. }
  140. }
  141. return true;
  142. }
  143. handleMonitorReply(reply) {
  144. if (this.redis.status !== "monitoring") {
  145. return false;
  146. }
  147. const replyStr = reply.toString();
  148. if (replyStr === "OK") {
  149. // Valid commands in the monitoring mode are AUTH and MONITOR,
  150. // both of which always reply with 'OK'.
  151. // So if we got an 'OK', we can make certain that
  152. // the reply is made to AUTH & MONITOR.
  153. return false;
  154. }
  155. // Since commands sent in the monitoring mode will trigger an exception,
  156. // any replies we received in the monitoring mode should consider to be
  157. // realtime monitor data instead of result of commands.
  158. const len = replyStr.indexOf(" ");
  159. const timestamp = replyStr.slice(0, len);
  160. const argIndex = replyStr.indexOf('"');
  161. const args = replyStr
  162. .slice(argIndex + 1, -1)
  163. .split('" "')
  164. .map((elem) => elem.replace(/\\"/g, '"'));
  165. const dbAndSource = replyStr.slice(len + 2, argIndex - 2).split(" ");
  166. this.redis.emit("monitor", timestamp, args, dbAndSource[1], dbAndSource[0]);
  167. return true;
  168. }
  169. shiftCommand(reply) {
  170. const item = this.redis.commandQueue.shift();
  171. if (!item) {
  172. const message = "Command queue state error. If you can reproduce this, please report it.";
  173. const error = new Error(message +
  174. (reply instanceof Error
  175. ? ` Last error: ${reply.message}`
  176. : ` Last reply: ${reply.toString()}`));
  177. this.redis.emit("error", error);
  178. return null;
  179. }
  180. return item;
  181. }
  182. }
  183. exports.default = DataHandler;
  184. const remainingRepliesMap = new WeakMap();
  185. function fillSubCommand(command, count) {
  186. let remainingReplies = remainingRepliesMap.has(command)
  187. ? remainingRepliesMap.get(command)
  188. : command.args.length;
  189. remainingReplies -= 1;
  190. if (remainingReplies <= 0) {
  191. command.resolve(count);
  192. remainingRepliesMap.delete(command);
  193. return true;
  194. }
  195. remainingRepliesMap.set(command, remainingReplies);
  196. return false;
  197. }
  198. function fillUnsubCommand(command, count) {
  199. let remainingReplies = remainingRepliesMap.has(command)
  200. ? remainingRepliesMap.get(command)
  201. : command.args.length;
  202. if (remainingReplies === 0) {
  203. if (Number(count) === 0) {
  204. remainingRepliesMap.delete(command);
  205. command.resolve(count);
  206. return true;
  207. }
  208. return false;
  209. }
  210. remainingReplies -= 1;
  211. if (remainingReplies <= 0) {
  212. command.resolve(count);
  213. return true;
  214. }
  215. remainingRepliesMap.set(command, remainingReplies);
  216. return false;
  217. }