// ProcessManager.js
  1. const { fork } = require('child_process');
  2. const path = require('path');
  /**
   * Supervises a fixed set of worker scripts, each launched with
   * child_process.fork().
   *
   * Responsibilities visible in this class:
   *   - start every configured worker and keep a registry of the live ones;
   *   - log worker messages / errors and record reported status;
   *   - restart a worker after it exits (when its config asks for it);
   *   - on SIGTERM / SIGINT, ask every worker to shut down, force-kill the
   *     ones that do not exit in time, then exit the manager process.
   */
  class ProcessManager {
    /**
     * @param {object} [options]
     * @param {number} [options.restartDelayMs=1000] Delay between a worker's
     *   exit and the attempt to start its replacement.
     * @param {number} [options.shutdownTimeoutMs=5000] How long shutdown()
     *   waits for a worker to exit before sending SIGKILL.
     */
    constructor({ restartDelayMs = 1000, shutdownTimeoutMs = 5000 } = {}) {
      // Live workers keyed by config name:
      // { process, config, startTime, status? }.
      this.workers = new Map();
      // True once shutdown() has begun; suppresses automatic restarts.
      this.isShuttingDown = false;
      // Promise of the shutdown in progress, so repeated calls share one run.
      this.shutdownPromise = null;
      this.restartDelayMs = restartDelayMs;
      this.shutdownTimeoutMs = shutdownTimeoutMs;
      // Configuration of each worker to launch.
      this.workerConfigs = [
        {
          name: '1_FILTER_FACTORY', // data filtering
          script: './1_FILTER_FACTORY/FILTER_FACTORY.js',
          args: ['--type=data'],
          restart: true, // restart the worker after it exits
        },
        {
          name: '2_PRODUCT_FACTORY', // book library filtering
          script: './2_PRODUCT_FACTORY/PRODUCT_FACTORY.js',
          args: ['--type=data'],
          restart: true,
        },
        {
          name: '5_CREATE_LINK_FACTORY', // create promotion links
          script: './5_CREATE_LINK_FACTORY/CREATE_LINK_FACTORY.js',
          args: ['--type=data'],
          restart: true,
        },
        {
          name: '7_CHECK_AND_CREATE_URL_FACTORY', // create check links
          script: './7_CHECK_AND_CREATE_URL_FACTORY/CHECK_AND_CREATE_URL_FACTORY.js',
          args: ['--type=data'],
          restart: true,
        },
        {
          name: 'MESSAGE_DISPATCH', // message center
          script: './MESSAGE_DISPATCH/MESSAGE_DISPATCH.js',
          args: ['--type=data'],
          restart: true,
        },
      ];
    }

    /**
     * Launches every configured worker and installs SIGTERM / SIGINT handlers
     * that trigger shutdown().
     */
    start() {
      for (const config of this.workerConfigs) {
        this.createWorker(config);
      }
      // Shut the workers down when the manager itself is asked to stop.
      process.on('SIGTERM', () => this.shutdown());
      process.on('SIGINT', () => this.shutdown());
    }

    /**
     * Forks one worker, registers it under config.name and wires up its
     * message / error / exit listeners.
     *
     * @param {{name: string, script: string, args: string[], restart: boolean}} config
     * @returns {import('child_process').ChildProcess} The forked worker.
     */
    createWorker(config) {
      // Scripts are resolved relative to the "workers" directory next to this file.
      const worker = fork(
        path.join(__dirname, 'workers', config.script),
        config.args,
        {
          env: { ...process.env }, // extra environment variables can be added here
        }
      );
      this.workers.set(config.name, {
        process: worker,
        config: config,
        startTime: Date.now(),
      });

      worker.on('message', (message) => {
        this.handleWorkerMessage(config.name, message);
      });

      worker.on('error', (error) => {
        console.error(`[${config.name}] 错误:`, error);
      });

      worker.on('exit', (code, signal) => {
        console.log(`[${config.name}] 退出,代码: ${code}, 信号: ${signal}`);
        // Only drop the registry entry if it still belongs to this child, so a
        // newer worker registered under the same name is never removed.
        const entry = this.workers.get(config.name);
        if (entry && entry.process === worker) {
          this.workers.delete(config.name);
        }
        if (config.restart && !this.isShuttingDown) {
          console.log(`[${config.name}] 正在重启...`);
          setTimeout(() => {
            // Shutdown may have started during the delay; a worker spawned now
            // would not be part of the shutdown wait list.
            if (!this.isShuttingDown) {
              this.createWorker(config);
            }
          }, this.restartDelayMs);
        }
      });

      console.log(`[${config.name}] 已启动,PID: ${worker.pid}`);
      return worker;
    }

    /**
     * Logs a message received from a worker and dispatches it by message.type.
     * Values without a usable `type` (including null and primitives) are logged
     * and otherwise ignored.
     *
     * @param {string} workerName
     * @param {*} message Value sent by the worker over the IPC channel.
     */
    handleWorkerMessage(workerName, message) {
      console.log(`[${workerName}] 消息:`, message);
      // Reading `.type` on null/undefined would throw inside the 'message'
      // listener, so bail out before dispatching.
      if (message === null || typeof message !== 'object') {
        return;
      }
      switch (message.type) {
        case 'status':
          this.updateWorkerStatus(workerName, message.data);
          break;
        case 'error':
          this.handleWorkerError(workerName, message.data);
          break;
        // ... other message types
      }
    }

    /**
     * Stores the latest status reported by a worker on its registry entry.
     * Does nothing when no worker is registered under that name.
     *
     * @param {string} workerName
     * @param {*} status
     */
    updateWorkerStatus(workerName, status) {
      const worker = this.workers.get(workerName);
      if (worker) {
        worker.status = status;
      }
    }

    /**
     * Logs an error reported by a worker through an 'error' message.
     *
     * @param {string} workerName
     * @param {*} error
     */
    handleWorkerError(workerName, error) {
      console.error(`[${workerName}] 报告错误:`, error);
      // Error handling logic goes here.
    }

    /**
     * @param {string} workerName
     * @returns {object|undefined} The registry entry for the worker
     *   ({ process, config, startTime, status? }), or undefined if none is
     *   registered under that name.
     */
    getWorkerStatus(workerName) {
      return this.workers.get(workerName);
    }

    /**
     * Asks every registered worker to stop, waits for all of them to exit, then
     * calls process.exit(0). Calling it again while a shutdown is already in
     * progress returns the same promise instead of signalling the workers twice
     * (both SIGTERM and SIGINT handlers call this method).
     *
     * @returns {Promise<void>}
     */
    shutdown() {
      if (this.shutdownPromise === null) {
        this.isShuttingDown = true;
        this.shutdownPromise = this.stopAllWorkers();
      }
      return this.shutdownPromise;
    }

    /**
     * Internal helper for shutdown(): stops all workers in parallel and exits
     * the manager process once every one of them has exited.
     *
     * @returns {Promise<void>}
     */
    async stopAllWorkers() {
      console.log('开始关闭所有工作进程...');
      const pending = Array.from(this.workers, ([name, entry]) =>
        this.stopWorker(name, entry.process)
      );
      await Promise.all(pending);
      console.log('所有工作进程已关闭');
      process.exit(0);
    }

    /**
     * Internal helper for shutdown(): sends `{ type: 'shutdown' }` to one worker
     * and resolves when it emits 'exit'. If it has not exited after
     * shutdownTimeoutMs it is sent SIGKILL.
     *
     * @param {string} name Worker name, used for logging only.
     * @param {import('child_process').ChildProcess} child
     * @returns {Promise<void>}
     */
    stopWorker(name, child) {
      return new Promise((resolve) => {
        // `child.killed` only records that a signal was sent, not that the
        // process is gone, so it is not consulted here: the timer is cancelled
        // on 'exit', which means it only ever fires for a child still running.
        const killTimer = setTimeout(() => {
          console.log(`${name} 强制终止`);
          child.kill('SIGKILL');
        }, this.shutdownTimeoutMs);

        child.once('exit', () => {
          clearTimeout(killTimer);
          resolve();
        });

        console.log(`正在关闭 ${name}...`);
        child.send({ type: 'shutdown' });
      });
    }
  }
  // Export the ProcessManager class.
  137. module.exports = ProcessManager;