const { fork } = require('child_process'); const path = require('path'); class ProcessManager { constructor() { // 存储所有worker进程 this.workers = new Map(); // 定义不同worker的配置 this.workerConfigs = [ { name: '0_RECEIVE_FACTORY', //筛选数据 script: './0_RECEIVE_FACTORY/RECEIVE_FACTORY.js', args: ['--type=data'], restart: false // 是否在崩溃时重启 }, { name: '1_FILTER_FACTORY', //筛选数据 script: './1_FILTER_FACTORY/FILTER_FACTORY.js', args: ['--type=data'], restart: false // 是否在崩溃时重启 }, { name: '2_PRODUCT_FACTORY',//书库筛选 script: './2_PRODUCT_FACTORY/PRODUCT_FACTORY.js', args: ['--type=data'], restart: false }, { name: '5_CREATE_LINK_FACTORY', //创建推广连接 script: './5_CREATE_LINK_FACTORY/CREATE_LINK_FACTORY.js', args: ['--type=data'], restart: false }, { name: '7_CHECK_AND_CREATE_URL_FACTORY',//创建检测连接 script: './7_CHECK_AND_CREATE_URL_FACTORY/CHECK_AND_CREATE_URL_FACTORY.js', args: ['--type=data'], restart: false }, { name: 'MESSAGE_DISPATCH', //消息中心 script: './MESSAGE_DISPATCH/MESSAGE_DISPATCH.js', args: ['--type=data'], restart: false }, ]; } start() { // 启动所有worker this.workerConfigs.forEach(config => { this.createWorker(config); }); // 处理主进程的退出 process.on('SIGTERM', () => this.shutdown()); process.on('SIGINT', () => this.shutdown()); } createWorker(config) { const worker = fork( path.join(__dirname, '', config.script), config.args, { env: { ...process.env } // 可以添加额外的环境变量 } ); this.workers.set(config.name, { process: worker, config: config, startTime: Date.now() }); // 监听工作进程消息 worker.on('message', (message) => { this.handleWorkerMessage(config.name, message); }); // 监听错误 worker.on('error', (error) => { console.error(`[${config.name}] 错误:`, error); }); // 监听退出 worker.on('exit', (code, signal) => { console.log(`[${config.name}] 退出,代码: ${code}, 信号: ${signal}`); // 从workers中移除 this.workers.delete(config.name); // 如果配置了重启且不是正常关闭,则重新启动 if (config.restart && !this.isShuttingDown) { console.log(`[${config.name}] 正在重启...`); setTimeout(() => this.createWorker(config), 1000); } }); console.log(`[${config.name}] 已启动,PID: ${worker.pid}`); return worker; } handleWorkerMessage(workerName, message) { console.log(`[${workerName}] 消息:`, message); // 处理来自worker的不同类型消息 switch(message.type) { case 'status': this.updateWorkerStatus(workerName, message.data); break; case 'error': this.handleWorkerError(workerName, message.data); break; // ... 其他消息类型 } } updateWorkerStatus(workerName, status) { const worker = this.workers.get(workerName); if (worker) { worker.status = status; } } handleWorkerError(workerName, error) { console.error(`[${workerName}] 报告错误:`, error); // 实现错误处理逻辑 } getWorkerStatus(workerName) { return this.workers.get(workerName); } async shutdown() { this.isShuttingDown = true; console.log('开始关闭所有工作进程...'); // 向所有worker发送关闭信号 const shutdownPromises = Array.from(this.workers.entries()).map( async ([name, {process}]) => { console.log(`正在关闭 ${name}...`); process.send({ type: 'shutdown' }); return new Promise((resolve) => { process.once('exit', resolve); // 设置超时强制结束 setTimeout(() => { if (!process.killed) { console.log(`${name} 强制终止`); process.kill('SIGKILL'); } }, 5000); }); } ); await Promise.all(shutdownPromises); console.log('所有工作进程已关闭'); process.exit(0); } } // 启动进程管理器 module.exports = ProcessManager;