123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165 |
- const { fork } = require('child_process');
- const path = require('path');
/**
 * Supervises a fixed list of worker scripts, each run as a forked child
 * process with an IPC channel.
 *
 * Responsibilities visible in this class:
 *  - fork every script listed in `workerConfigs` (`start` / `createWorker`);
 *  - log and dispatch messages the workers send back (`handleWorkerMessage`);
 *  - optionally re-fork a worker that exits when its config has `restart: true`;
 *  - on SIGTERM/SIGINT, ask every worker to stop, force-kill stragglers after a
 *    grace period, then exit the manager process (`shutdown`).
 */
class ProcessManager {
  constructor() {
    // Live workers keyed by config name. Each entry is
    // { process, config, startTime } and gains `status` once the worker
    // reports one.
    this.workers = new Map();

    // True once shutdown has begun; suppresses automatic restarts.
    this.isShuttingDown = false;

    // Promise of the in-flight shutdown, so repeated signals share one run.
    this.shutdownPromise = null;

    // Per-worker configuration:
    //   name    - registry key and log prefix
    //   script  - entry file, resolved relative to this file's directory
    //   args    - argv passed to the child
    //   restart - whether to re-fork the worker after it exits
    this.workerConfigs = [
      {
        name: '0_RECEIVE_FACTORY', // original note: filter data
        script: './0_RECEIVE_FACTORY/RECEIVE_FACTORY.js',
        args: ['--type=data'],
        restart: false
      },
      {
        name: '1_FILTER_FACTORY', // filter data
        script: './1_FILTER_FACTORY/FILTER_FACTORY.js',
        args: ['--type=data'],
        restart: false
      },
      {
        name: '2_PRODUCT_FACTORY', // book library filtering
        script: './2_PRODUCT_FACTORY/PRODUCT_FACTORY.js',
        args: ['--type=data'],
        restart: false
      },
      {
        name: '5_CREATE_LINK_FACTORY', // create promotion links
        script: './5_CREATE_LINK_FACTORY/CREATE_LINK_FACTORY.js',
        args: ['--type=data'],
        restart: false
      },
      {
        name: '7_CHECK_AND_CREATE_URL_FACTORY', // create check links
        script: './7_CHECK_AND_CREATE_URL_FACTORY/CHECK_AND_CREATE_URL_FACTORY.js',
        args: ['--type=data'],
        restart: false
      },
      {
        name: 'MESSAGE_DISPATCH', // message center
        script: './MESSAGE_DISPATCH/MESSAGE_DISPATCH.js',
        args: ['--type=data'],
        restart: false
      },
    ];
  }

  /**
   * Forks every configured worker and installs SIGTERM/SIGINT handlers that
   * trigger `shutdown()`.
   */
  start() {
    this.workerConfigs.forEach((config) => {
      this.createWorker(config);
    });

    // `on` rather than `once`: a repeated signal is absorbed by the idempotent
    // shutdown() instead of falling through to Node's default handling.
    process.on('SIGTERM', () => this.shutdown());
    process.on('SIGINT', () => this.shutdown());
  }

  /**
   * Forks one worker, registers it under `config.name`, and wires up its
   * message/error/exit listeners.
   *
   * @param {{name: string, script: string, args: string[], restart: boolean}} config
   * @returns {import('child_process').ChildProcess} the forked child.
   */
  createWorker(config) {
    const worker = fork(
      path.join(__dirname, config.script),
      config.args,
      {
        env: { ...process.env } // extra environment variables can be added here
      }
    );

    this.workers.set(config.name, {
      process: worker,
      config: config,
      startTime: Date.now()
    });

    worker.on('message', (message) => {
      this.handleWorkerMessage(config.name, message);
    });

    worker.on('error', (error) => {
      console.error(`[${config.name}] 错误:`, error);
    });

    worker.on('exit', (code, signal) => {
      console.log(`[${config.name}] 退出,代码: ${code}, 信号: ${signal}`);

      // Only drop the registry entry if it still refers to this process, so a
      // newer worker registered under the same name is not evicted.
      const current = this.workers.get(config.name);
      if (current && current.process === worker) {
        this.workers.delete(config.name);
      }

      if (config.restart && !this.isShuttingDown) {
        console.log(`[${config.name}] 正在重启...`);
        setTimeout(() => {
          // Shutdown may have started during the delay; do not fork then.
          if (!this.isShuttingDown) {
            this.createWorker(config);
          }
        }, 1000);
      }
    });

    console.log(`[${config.name}] 已启动,PID: ${worker.pid}`);
    return worker;
  }

  /**
   * Logs a message received from a worker and dispatches it by `message.type`.
   * Messages without a recognised `type` (including null or primitive
   * messages) are logged and otherwise ignored.
   *
   * @param {string} workerName registry key of the sending worker.
   * @param {*} message payload received over IPC.
   */
  handleWorkerMessage(workerName, message) {
    console.log(`[${workerName}] 消息:`, message);

    // Guard against null/undefined payloads before reading `.type`.
    const type = message && message.type;

    switch (type) {
      case 'status':
        this.updateWorkerStatus(workerName, message.data);
        break;
      case 'error':
        this.handleWorkerError(workerName, message.data);
        break;
      default:
        // Other message types are not handled yet.
        break;
    }
  }

  /**
   * Stores the latest reported status on the worker's registry entry.
   * Does nothing when the worker is not registered.
   *
   * @param {string} workerName
   * @param {*} status
   */
  updateWorkerStatus(workerName, status) {
    const entry = this.workers.get(workerName);
    if (entry) {
      entry.status = status;
    }
  }

  /**
   * Logs an error reported by a worker. No recovery logic is implemented.
   *
   * @param {string} workerName
   * @param {*} error
   */
  handleWorkerError(workerName, error) {
    console.error(`[${workerName}] 报告错误:`, error);
  }

  /**
   * Returns the whole registry entry for a worker (process, config, startTime
   * and, once reported, status), not just the status value.
   *
   * @param {string} workerName
   * @returns {object|undefined} the entry, or undefined if not registered.
   */
  getWorkerStatus(workerName) {
    return this.workers.get(workerName);
  }

  /**
   * Gracefully stops all registered workers, then exits the manager process
   * with code 0. Safe to call more than once: later calls return the promise
   * of the shutdown already in progress.
   *
   * @param {number} [timeoutMs=5000] grace period before a worker that has not
   *   exited is sent SIGKILL.
   * @returns {Promise<void>}
   */
  shutdown(timeoutMs = 5000) {
    if (!this.shutdownPromise) {
      this.isShuttingDown = true;
      this.shutdownPromise = this.stopAllWorkers(timeoutMs);
    }
    return this.shutdownPromise;
  }

  /**
   * Helper for `shutdown`: stops every registered worker in parallel and then
   * exits the manager process.
   *
   * @param {number} timeoutMs per-worker grace period in milliseconds.
   * @returns {Promise<void>}
   */
  async stopAllWorkers(timeoutMs) {
    console.log('开始关闭所有工作进程...');

    const pending = Array.from(this.workers.entries()).map(
      ([name, entry]) => this.stopWorker(name, entry.process, timeoutMs)
    );
    await Promise.all(pending);

    console.log('所有工作进程已关闭');
    process.exit(0);
  }

  /**
   * Helper for `shutdown`: asks one worker to stop and resolves once it has
   * exited, sending SIGKILL if it is still running after `timeoutMs`.
   *
   * @param {string} name registry key, used for logging.
   * @param {import('child_process').ChildProcess} child the worker process.
   * @param {number} timeoutMs grace period in milliseconds.
   * @returns {Promise<void>}
   */
  stopWorker(name, child, timeoutMs) {
    console.log(`正在关闭 ${name}...`);

    // Only send while the IPC channel is open; a disconnected worker simply
    // falls through to the force-kill timer.
    if (child.connected) {
      try {
        child.send({ type: 'shutdown' });
      } catch (error) {
        console.error(`[${name}] 错误:`, error);
      }
    }

    return new Promise((resolve) => {
      // The timer firing means the worker has not exited yet. `child.killed`
      // is not consulted because it only records that a signal was sent.
      const killTimer = setTimeout(() => {
        console.log(`${name} 强制终止`);
        child.kill('SIGKILL');
      }, timeoutMs);

      child.once('exit', () => {
        clearTimeout(killTimer);
        resolve();
      });
    });
  }
}
// Export the ProcessManager class (nothing is started here).
- module.exports = ProcessManager;
|