904118851 8 달 전
부모
커밋
2fc8116e09
2개의 변경된 파일61개의 추가작업 그리고 8개의 파일을 삭제
  1. 36 6
      MESSAGE_DISPATCH/MESSAGE_DISPATCH.js
  2. 25 2
      PULL_DATA_FACTORY/PULL_DATA.js

+ 36 - 6
MESSAGE_DISPATCH/MESSAGE_DISPATCH.js

@@ -140,6 +140,19 @@ async function sendMQMessage(message,routingKey = null) {
     }
 }
 
+async function sendPullMQMessage(message,routingKey = null) {
+    try {
+        if(routingKey!=null){
+            await rabbitMq.producerDirectMsg( message,"exchange_update_pull_config",routingKey);
+        }else{
+            await rabbitMq.producerDirectMsg( message,"exchange_update_pull_config");
+        }
+        console.log('消息发送成功');
+    } catch (error) {
+        console.error('发送消息失败:', error);
+    }
+}
+
 async function recvMessage(data){
     try {
         // 单个任务
@@ -164,13 +177,30 @@ async function recvMessage(data){
             case "updateBlackBooks":
                 CMD.updateBlackBooks()
             break;
+            case "addMain":
+                {
+                    const postData = {
+                        cmd:"filter_task",
+                        fun:json_msg.cmd,
+                        data:json_msg
+                    };
+                    sendMQMessage(postData,json_msg.cmd)
+                }
+
+            break;
+            case "updatePullConig":
+                {
+                    const postData = {
+                        cmd:"filter_task",
+                        fun:json_msg.cmd,
+                        data:json_msg
+                    };
+                    sendPullMQMessage(postData,json_msg.cmd)
+                }
+     
+            break;
             default:
-                const postData = {
-                    cmd:"filter_task",
-                    fun:json_msg.cmd,
-                    data:json_msg
-                };
-                sendMQMessage(postData,json_msg.cmd)
+     
                 // const client = new HttpClient({
                 //     timeout: 5000,
                 //     // headers: {

+ 25 - 2
PULL_DATA_FACTORY/PULL_DATA.js

@@ -10,17 +10,34 @@ const config = require('../etc/config.json');
 const DailyRecordManager = require('./daily_records');
 const schedule = require('node-schedule');
 const manager = new DailyRecordManager();
+const rabbitMq = require('../src/mq/rabbit-mq');
 var session = null
 var cur_page = 0
 var task_list = []
 var _24HourRanges = []
 var jobs = []
 var cur_day_data = []
+
+const messageHandler = async (msg) => {
+    // CMD.start_task(msg['data']['data'])
+    console.log("msg:",msg)
+};
+
+// 启动消费者
+async function startConsumer() {
+    try {
+        await rabbitMq.consumerDirectMsg(messageHandler,"exchange_update_pull_config","updatePullConig");
+    } catch (error) {
+        console.error('启动消费者失败:', error);
+    }
+}
+
 async function processTask(timeRange){
     try{
         if(cur_page==-1){
             throw "没有数据了"
         }
+        await redis_help.setKeyValue("pull_data_token",await CMD.getToken().result.token)
         let FilterConfig = await redis_help.getKeyValue("FilterConfig")
         FilterConfig = JSON.parse(FilterConfig)
         let response = await CMD.get_novel_material_list(cur_page)
@@ -148,6 +165,7 @@ CMD.getToken = async function(){
         password:config.pull_data_config.password,
         loginType:"OPENAPI"
     });
+    console.log("getToken:",response.data.result.token)
     return response.data
 }
 
@@ -191,9 +209,9 @@ CMD.init = async function(){
         }
     });
 
+    // await startConsumer();
     redis_help.connect(async ()=>{
         let response = await CMD.getToken()
-        console.log("getToken:",response.result.token)
         await redis_help.setKeyValue("pull_data_token",response.result.token)
         // processTask()
         _24HourRanges = CMD.generate24HourRanges()
@@ -458,8 +476,13 @@ CMD.init = async function(){
     return ranges;
 }
 
+CMD.stopScheduler = function() {
+    jobs.forEach(({ job }) => job.cancel());
+    jobs = [];
+    console.log('调度器已停止,所有任务已清除');
+}
 CMD.startScheduler = function(taskCallback) {
- 
+    CMD.stopScheduler()
     // 为每个时间点创建定时任务
     _24HourRanges.forEach(timeRange => {
         // 解析小时