|
@@ -2,10 +2,7 @@ const CMD = {}
|
|
|
const time_count = 5000;
|
|
|
const redis_help = require('../src/use_redis');
|
|
|
const origin_data_controllers = require('../src/data_manager/Controllers/origin_data_controllers');
|
|
|
-const pull_log_controllers = require('../src/data_manager/Controllers/pull_log_controllers');
|
|
|
-const helper = require('../src/helper');
|
|
|
const axios = require('axios')
|
|
|
-const HttpClient = require('../src/HttpClient')
|
|
|
const config = require('../etc/config.json');
|
|
|
const DailyRecordManager = require('./daily_records');
|
|
|
const schedule = require('node-schedule');
|
|
@@ -13,14 +10,60 @@ const manager = new DailyRecordManager();
|
|
|
const rabbitMq = require('../src/mq/rabbit-mq');
|
|
|
var session = null
|
|
|
var cur_page = 0
|
|
|
-var task_list = []
|
|
|
var _24HourRanges = []
|
|
|
var jobs = []
|
|
|
-var cur_day_data = []
|
|
|
-
|
|
|
+var cur_day_data = new Map()
|
|
|
+var cur_timeRange = null
|
|
|
const messageHandler = async (msg) => {
|
|
|
- // CMD.start_task(msg['data']['data'])
|
|
|
- console.log("msg:",msg)
|
|
|
+ let PullDataConfig = await redis_help.getKeyValue("PullDataConfig")
|
|
|
+ PullDataConfig = JSON.parse(PullDataConfig)
|
|
|
+ _24HourRanges = PullDataConfig
|
|
|
+ CMD.startScheduler(taskCallback)
|
|
|
+};
|
|
|
+
|
|
|
+const taskCallback = async (context) => {
|
|
|
+ var { timeRange, executionTime } = context;
|
|
|
+
|
|
|
+ console.log('执行任务:', {
|
|
|
+ 时间区间: timeRange.name,
|
|
|
+ 开始时间: timeRange.start,
|
|
|
+ 执行时间: executionTime
|
|
|
+ });
|
|
|
+ const today = new Date().toISOString().slice(0, 10);
|
|
|
+ cur_day_data.forEach((v,k)=>{
|
|
|
+ if(k==today){
|
|
|
+
|
|
|
+ }else{
|
|
|
+ cur_day_data.set(k, [])
|
|
|
+ }
|
|
|
+ })
|
|
|
+ if(!cur_day_data.has(today)){
|
|
|
+ cur_day_data.set(today,[])
|
|
|
+ }
|
|
|
+
|
|
|
+ if(!cur_day_data.get(today)[timeRange.start]){
|
|
|
+ cur_day_data.get(today)[timeRange.start] = {name:timeRange.name}
|
|
|
+ }
|
|
|
+ console.log("cur_day_data[today]:",cur_day_data.get(today),today)
|
|
|
+ const existingRecord = await manager.getRecord(today);
|
|
|
+
|
|
|
+ if (existingRecord) {
|
|
|
+ // 如果存在今天的记录,则更新
|
|
|
+ const result = await manager.updateRecord(today, cur_day_data.get(today));
|
|
|
+ console.log("更新记录结果:", result);
|
|
|
+ } else {
|
|
|
+ // 如果不存在今天的记录,则创建新记录
|
|
|
+ const result = await manager.createRecord(today, cur_day_data.get(today));
|
|
|
+ console.log("创建新记录结果:", result);
|
|
|
+ }
|
|
|
+ // const result = await manager.createRecord(today,cur_day_data[today]);
|
|
|
+ // if(result.success){
|
|
|
+
|
|
|
+ // }
|
|
|
+ cur_page = 0
|
|
|
+ cur_timeRange = timeRange
|
|
|
+ // 示例:实际任务逻辑
|
|
|
+ await processTask(timeRange);
|
|
|
};
|
|
|
|
|
|
// 启动消费者
|
|
@@ -50,6 +93,10 @@ async function processTask(timeRange){
|
|
|
cur_page = -1
|
|
|
throw "没有数据了"
|
|
|
}
|
|
|
+ if(response.result == undefined){
|
|
|
+ cur_page = -1
|
|
|
+ throw "没有数据了"
|
|
|
+ }
|
|
|
if(response.result.list == undefined){
|
|
|
cur_page = -1
|
|
|
throw "没有数据了"
|
|
@@ -59,16 +106,15 @@ async function processTask(timeRange){
|
|
|
throw "没有数据了"
|
|
|
}
|
|
|
if(timeRange!=null){
|
|
|
- const today = new Date().toISOString().split('T')[0];
|
|
|
- console.log("response.result:",response.result)
|
|
|
- if(cur_day_data[today][timeRange.start]){
|
|
|
+ const today = new Date().toISOString().slice(0, 10);
|
|
|
+ console.log("response.result:",response.result,today)
|
|
|
+ if(cur_day_data.get(today)[timeRange.start]){
|
|
|
console.log("response.result.count:",response.result.count)
|
|
|
- console.log("(cur_day_data[today][timeRange.start]:",cur_day_data[today][timeRange.start])
|
|
|
- cur_day_data[today][timeRange.start].count = response.result.count
|
|
|
+ console.log("(cur_day_data[today][timeRange.start]:",cur_day_data.get(today)[timeRange.start])
|
|
|
+ cur_day_data.get(today)[timeRange.start].count = response.result.count
|
|
|
}else{
|
|
|
console.log("cur_day_data空")
|
|
|
}
|
|
|
-
|
|
|
const updateResult = await manager.updateRecord(today,cur_day_data[today]);
|
|
|
}
|
|
|
|
|
@@ -125,7 +171,7 @@ async function processTask(timeRange){
|
|
|
// console.log("get_detail:",response)
|
|
|
// await pull_log_controllers.createData()
|
|
|
}catch(e){
|
|
|
- console.log("PULL_DATA: error:",e)
|
|
|
+ console.error("PULL_DATA: error:",e)
|
|
|
}finally{
|
|
|
if(cur_page!=-1){
|
|
|
cur_page++;
|
|
@@ -172,8 +218,14 @@ CMD.getToken = async function(){
|
|
|
|
|
|
CMD.get_novel_material_list = async function(page,size=500) {
|
|
|
const now = new Date(); // 当前时间
|
|
|
+
|
|
|
+ let interval_minute = 60;
|
|
|
+
|
|
|
+ if(cur_timeRange!=null){
|
|
|
+ interval_minute = cur_timeRange.interval_minute
|
|
|
+ }
|
|
|
// const fiveMinutesAgo = new Date(now - 5 * 60 * 1000); // 5分钟前的时间
|
|
|
- const fiveMinutesAgo = new Date(now - 60 * 60 * 1000); // 60分钟前的时间
|
|
|
+ const fiveMinutesAgo = new Date(now - interval_minute * interval_minute * 1000); // 60分钟前的时间
|
|
|
let data = {
|
|
|
startTime:fiveMinutesAgo.getTime(),
|
|
|
endTime:now.getTime()
|
|
@@ -212,51 +264,14 @@ CMD.init = async function(){
|
|
|
|
|
|
// await startConsumer();
|
|
|
redis_help.connect(async ()=>{
|
|
|
+ startConsumer()
|
|
|
+ let PullDataConfig = await redis_help.getKeyValue("PullDataConfig")
|
|
|
+ PullDataConfig = JSON.parse(PullDataConfig)
|
|
|
let response = await CMD.getToken()
|
|
|
await redis_help.setKeyValue("pull_data_token",response.result.token)
|
|
|
// processTask()
|
|
|
- _24HourRanges = CMD.generate24HourRanges()
|
|
|
+ _24HourRanges = PullDataConfig
|
|
|
// 定义任务回调函数
|
|
|
- const taskCallback = async (context) => {
|
|
|
- var { timeRange, executionTime } = context;
|
|
|
-
|
|
|
- console.log('执行任务:', {
|
|
|
- 时间区间: timeRange.name,
|
|
|
- 开始时间: timeRange.start,
|
|
|
- 执行时间: executionTime
|
|
|
- });
|
|
|
- const today = new Date().toISOString().split('T')[0];
|
|
|
- for (const key in cur_day_data) {
|
|
|
- if (Object.prototype.hasOwnProperty.call(cur_day_data, key)) {
|
|
|
- if(key==today){
|
|
|
-
|
|
|
- }else{
|
|
|
- cur_day_data[key] = []
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- if(!cur_day_data[today]){
|
|
|
- cur_day_data[today] = {}
|
|
|
- }
|
|
|
-
|
|
|
- if(!cur_day_data[today][timeRange.start]){
|
|
|
- cur_day_data[today][timeRange.start] = {name:timeRange.name}
|
|
|
- }
|
|
|
- console.log("cur_day_data[today]:",cur_day_data[today])
|
|
|
- const result = await manager.createRecord(today,cur_day_data[today]);
|
|
|
- if(result.success){
|
|
|
-
|
|
|
- }
|
|
|
- cur_page = 0
|
|
|
- // 示例:实际任务逻辑
|
|
|
- await processTask(timeRange);
|
|
|
- };
|
|
|
-
|
|
|
- let key = new Date().toISOString().split('T')[0]
|
|
|
- let record_res = await manager.getRecord(key);
|
|
|
- if(record_res!=null){
|
|
|
- cur_day_data[key] = record_res.content
|
|
|
- }
|
|
|
// 启动调度器
|
|
|
CMD.startScheduler(taskCallback);
|
|
|
|