const CMD = {};
const redis_help = require('../src/use_redis');
const origin_data_controllers = require('../src/data_manager/Controllers/origin_data_controllers');
const config = require('../etc/config.json');
const DailyRecordManager = require('./daily_records');
const schedule = require('node-schedule');

const manager = new DailyRecordManager();
manager.init("daily_records2");

const rabbitMq = require('../src/mq/rabbit-mq');
const helper = require('../src/helper');
const PullDataServiceNew = require('../src/PullDataServiceNew');

// Number of rows requested from the pull service per call.
const PAGE_SIZE = 500;
// Delay between consecutive page requests, in milliseconds.
const PAGE_DELAY_MS = 1000;

// Time-range configuration parsed from redis key "PullDataConfig2".
// Presumably an array of { name, start: "HH:MM", interval_minute, ... } — inferred from usage, confirm.
let _24HourRanges = [];
// Scheduled node-schedule jobs as { job, timeRange } pairs.
let jobs = [];
// Progress per day, keyed by helper.getLocalDate(); each value maps timeRange.start -> progress entry.
const cur_day_data = new Map();
// Time range currently being pulled (the config object, annotated with today/finish_count).
let cur_timeRange = null;
const pullDataService = new PullDataServiceNew(redis_help);
// Paging cursor: 0 for the first page, afterwards the last row's updateTime (becomes ms + 1 after
// the increment in processTask); -1 means the current range is finished.
let cur_page = 0;
// Incremented for every new time-range run; page loops of older runs compare against it and stop,
// so two loops never page the shared cursor at the same time.
let cur_run_id = 0;

/**
 * RabbitMQ handler for config-update messages: reloads the time-range configuration
 * from redis and rebuilds the schedule.
 *
 * @param {*} msg - Incoming message; its content is not used.
 */
const messageHandler = async (msg) => {
  const rawConfig = await redis_help.getKeyValue("PullDataConfig2");
  _24HourRanges = JSON.parse(rawConfig);
  CMD.startScheduler(taskCallback);
};

/**
 * Scheduled-job callback for one time range: prepares today's progress record, persists it,
 * makes `timeRange` the active range and starts the paging loop.
 *
 * @param {{timeRange: object, executionTime: Date}} context
 */
const taskCallback = async (context) => {
  const { timeRange, executionTime } = context;
  // Claim the run first so any loop still paging an earlier range stops at its next check.
  const runId = ++cur_run_id;
  console.log('执行任务:', { 时间区间: timeRange.name, 开始时间: timeRange.start, 执行时间: executionTime });

  const today = helper.getLocalDate();
  // Only today's progress is kept in memory; drop older days so the map does not grow forever.
  for (const day of Array.from(cur_day_data.keys())) {
    if (day !== today) {
      cur_day_data.delete(day);
    }
  }
  if (!cur_day_data.has(today)) {
    cur_day_data.set(today, {});
  }
  const dayRecord = cur_day_data.get(today);
  if (!dayRecord[timeRange.start]) {
    dayRecord[timeRange.start] = {
      name: timeRange.name,
      interval_minute: timeRange.interval_minute,
      finish_count: 0,
    };
  }
  console.log("cur_day_data[today]:", dayRecord, today);

  const existingRecord = await manager.getRecord(today);
  if (existingRecord) {
    // A record for today exists: update it.
    const result = await manager.updateRecord(today, dayRecord);
    console.log("更新记录结果:", result);
  } else {
    // No record for today yet: create it.
    const result = await manager.createRecord(today, dayRecord);
    console.log("创建新记录结果:", result);
  }

  if (runId !== cur_run_id) {
    // Another time range fired while the record was being saved; it owns the shared state now.
    return;
  }
  // NOTE(review): this annotates the config object itself; it is kept because the pull service
  // may read/write fields on it (start_time, pull_day, pull_time are read but never set here).
  cur_timeRange = timeRange;
  cur_timeRange.today = today;
  cur_timeRange.finish_count = 0;
  cur_page = 0;
  await processTask(runId);
};

/** Starts consuming config-update messages; failures are logged, not thrown. */
async function startConsumer() {
  try {
    await rabbitMq.consumerDirectMsg(messageHandler, "exchange_update_pull_config2", "updatePullConig2");
  } catch (error) {
    console.error('启动消费者失败:', error);
  }
}

/**
 * Finds the first entry of `list` whose materialId matches `materialId`.
 *
 * @param {object[]} list
 * @param {*} materialId
 * @returns {object|null} The matching entry, or null when there is none.
 */
function get_detail_by_materialId(list, materialId) {
  // Loose equality kept so numeric and string ids still match each other.
  const found = list.find((element) => element.materialId == materialId);
  return found === undefined ? null : found;
}

/**
 * Joins per-material counters from `material_list` with the detail rows in `list`.
 * It then publishes the joined rows for the active day.
 * Materials without a detail row are skipped.
 *
 * @param {object[]} list - Detail rows (awemeId, materialId, title, ...).
 * @param {object[]} material_list - Rows carrying favoriteCount/commentCount/likeCount/shareCount `{ count }`.
 */
function generate_pull_time_data(list, material_list) {
  const pull_data = [];
  for (const element of material_list) {
    const info = get_detail_by_materialId(list, element.materialId);
    if (info == null) {
      continue;
    }
    pull_data.push({
      awemeId: info.awemeId,
      materialId: info.materialId,
      awemeUrl: info.awemeUrl,
      title: info.title,
      publishTime: info.publishTime,
      hasAnchorInfo: info.hasAnchorInfo,
      landingUrl: info.landingUrl,
      createTime: info.createTime,
      updateTime: info.updateTime,
      favoriteCount: element.favoriteCount.count,
      commentCount: element.commentCount.count,
      likeCount: element.likeCount.count,
      shareCount: element.shareCount.count,
    });
  }
  sendPullDataMQMessage(JSON.stringify({ cmd: "on_recv_pull_data", data: pull_data, today: cur_timeRange.today }));
}

/**
 * Copies paging statistics from a material-list response into today's progress entry.
 * It then persists the day record.
 *
 * @param {{count: number, list: object[]}} data - `response.data` of the material-list call.
 * @param {*} page - Cursor the page was requested with.
 */
async function record_page_stats(data, page) {
  const today = cur_timeRange.today;
  const dayRecord = cur_day_data.get(today);
  const entry = dayRecord[cur_timeRange.start];
  if (entry) {
    console.log("response.data.count:", data.count);
    console.log("(cur_day_data[today][timeRange.start]:", entry);
    // count is never initialised in this file; treat a missing value as 0 instead of producing NaN.
    cur_timeRange.count = (cur_timeRange.count || 0) + data.list.length;
    cur_timeRange.cur_page = page;
    entry.start_time = cur_timeRange.start_time;
    entry.pull_day = cur_timeRange.pull_day;
    entry.pull_time = cur_timeRange.pull_time || {};
    entry.count = data.count;
    entry.cur_page = page;
  } else {
    console.log("cur_day_data空");
  }
  await manager.updateRecord(today, dayRecord);
}

/**
 * Validates a material-list response, records paging stats and looks up which material ids are
 * not stored yet.
 *
 * @param {object} response - Result of the material-list service call.
 * @param {*} page - Cursor the page was requested with.
 * @returns {Promise<object|null>} `{ list, id_list, materialId_list, materialId_data_list, count }`,
 *   or null when the response is unusable or any step fails (the reason is logged).
 */
async function process_material_list(response, page) {
  try {
    if (!response.success || response.data == null) {
      console.log("materialId_list:", { msg: response, timeRange: cur_timeRange, fun: "get_novel_material_list" });
      return null;
    }
    const list = response.data.list;
    if (list == null || list.length <= 0) {
      console.log("materialId_list:", "没有数据了");
      return null;
    }
    if (cur_timeRange != null) {
      await record_page_stats(response.data, page);
    }

    const materialId_list = [];
    // Keyed by materialId, so a plain object rather than an array.
    const materialId_data_list = {};
    for (const origin_element of list) {
      materialId_list.push(origin_element.materialId);
      materialId_data_list[origin_element.materialId] = {
        kepp_num: origin_element.favoriteCount.count,
        comment_num: origin_element.commentCount.count,
        like_num: origin_element.likeCount.count,
        shared_num: origin_element.shareCount.count,
      };
    }
    const result = await origin_data_controllers.findNonExistentMaterialIds(materialId_list);
    return {
      list,
      id_list: materialId_list,
      materialId_list: result.data.nonExistentIds,
      materialId_data_list,
      count: list.length,
    };
  } catch (e) {
    console.log("materialId_list:", e);
    return null;
  }
}

/**
 * Requests one page for the active time range, advances the cursor, records progress and
 * forwards the page's in-range rows over MQ.
 *
 * @param {number} runId - Run token of the calling loop.
 * @returns {Promise<boolean>} true when the loop must stop (range exhausted, error response from
 *   the service, or the run was superseded); false to fetch the next page.
 */
async function pull_next_page(runId) {
  const range = cur_timeRange;
  const response = await pullDataService.get_novel_material_list(range, cur_page, PAGE_SIZE);
  if (runId !== cur_run_id) {
    // A newer time range started while this request was in flight; it owns the shared cursor.
    return true;
  }
  if (response.msg !== "success") {
    cur_page = -1;
    console.error("PULL_DATA: error:", response);
    return true;
  }
  const list = response.data.list;
  if (list.length <= 0) {
    cur_page = -1;
    console.log("PULL_DATA: no more data:", response);
    return true;
  }

  // Rows are assumed ordered by updateTime, so the last row's timestamp is the next cursor.
  const lastUpdateTime = new Date(list[list.length - 1].updateTime);
  cur_page = lastUpdateTime;
  const endTime = new Date(range.pull_time.endTime);
  console.log("endTime:", lastUpdateTime.getTime(), endTime.getTime());
  const reachedEnd = lastUpdateTime.getTime() >= endTime.getTime();
  if (reachedEnd) {
    cur_page = -1;
  }
  // The page that crosses endTime still holds rows inside the range; forward those rather than
  // dropping the whole page. NOTE(review): endTime is treated as exclusive — confirm.
  const rows = reachedEnd
    ? list.filter((row) => new Date(row.updateTime).getTime() < endTime.getTime())
    : list;

  range.finish_count += rows.length;
  const dayRecord = cur_day_data.get(range.today);
  dayRecord[range.start].finish_count = range.finish_count;
  try {
    await manager.updateRecord(range.today, dayRecord);
  } catch (e) {
    // The cursor has already moved past this page, so a failed progress write must not stop
    // the rows below from being forwarded.
    console.error("PULL_DATA: error:", e);
  }
  if (rows.length > 0) {
    sendPullDataMQMessage(JSON.stringify({ cmd: "on_recv_pull_data", data: rows, today: range.today }));
  }
  console.log("processTask over!");
  return reachedEnd;
}

/**
 * Paging loop for the active time range. It pulls one page and, unless the range is done,
 * schedules itself again after PAGE_DELAY_MS. Thrown errors are logged and retried.
 *
 * @param {number} [runId] - Run token this call belongs to (defaults to the current run). A call
 *   whose token is stale stops without touching shared state.
 */
async function processTask(runId = cur_run_id) {
  if (runId !== cur_run_id || cur_timeRange == null) {
    return;
  }
  let stop = false;
  try {
    stop = await pull_next_page(runId);
  } catch (e) {
    // Transient failure (network, malformed response): retry after the delay.
    console.error("PULL_DATA: error:", e);
  }
  if (stop || runId !== cur_run_id || cur_page === -1) {
    return;
  }
  cur_page++;
  global.setTimeout(() => processTask(runId), PAGE_DELAY_MS);
}

/**
 * Truncates a title to at most 299 UTF-16 code units.
 *
 * @param {string} title
 * @returns {string}
 */
CMD.subTitle = function (title) {
  return title.length > 299 ? title.substring(0, 299) : title;
};

/**
 * Checks that a material meets every minimum-engagement threshold in FilterConfig.
 *
 * @param {{like_num: number, kepp_num: number, shared_num: number, comment_num: number}} FilterConfig
 * @param {object} origin_element - Row whose likeCount/favoriteCount/shareCount/commentCount are `{ count }`.
 * @returns {boolean} false as soon as any count is below its threshold, otherwise true.
 */
CMD.isRight = function (FilterConfig, origin_element) {
  if (origin_element.likeCount.count < FilterConfig.like_num) {
    return false;
  }
  if (origin_element.favoriteCount.count < FilterConfig.kepp_num) {
    return false;
  }
  if (origin_element.shareCount.count < FilterConfig.shared_num) {
    return false;
  }
  if (origin_element.commentCount.count < FilterConfig.comment_num) {
    return false;
  }
  return true;
};

/**
 * Connects to redis, then runs the startup steps:
 * - starts the config consumer;
 * - loads the time-range config;
 * - restores today's progress record;
 * - starts the scheduler.
 */
CMD.init = async function () {
  redis_help.connect(async () => {
    // Not awaited: startConsumer logs its own failures.
    startConsumer();
    const rawConfig = await redis_help.getKeyValue("PullDataConfig2");
    _24HourRanges = JSON.parse(rawConfig);

    // Restore today's progress so counts survive a restart.
    const key = helper.getLocalDate();
    const record_res = await manager.getRecord(key);
    if (record_res != null && !cur_day_data.has(key)) {
      // A record without content would later break `get(today)[start]`; start from an empty day.
      cur_day_data.set(key, record_res.content != null ? record_res.content : {});
    }

    CMD.startScheduler(taskCallback);
  });
};

// Placeholder for generating 24 hourly time ranges; currently returns an empty object.
CMD.generate24HourRanges = function () {
  return {};
};

/** Cancels every scheduled job and clears the job list. */
CMD.stopScheduler = function () {
  for (const { job } of jobs) {
    job.cancel();
  }
  jobs = [];
  console.log('调度器已停止,所有任务已清除');
};

/**
 * Rebuilds the schedule: one daily cron job per configured time range. Each job fires at the
 * range's "HH:MM" start and invokes `taskCallback`.
 * Entries whose start cannot be parsed or scheduled are skipped and logged.
 *
 * @param {function({timeRange: object, executionTime: Date}): Promise<void>} taskCallback
 */
CMD.startScheduler = function (taskCallback) {
  CMD.stopScheduler();
  console.log("_24HourRanges:", _24HourRanges);
  if (!Array.isArray(_24HourRanges) || _24HourRanges.length === 0) {
    return;
  }
  _24HourRanges.forEach((timeRange) => {
    // Parse "HH:MM".
    const [hourText, minuteText] = String(timeRange.start).split(':');
    const hour = parseInt(hourText, 10);
    const minute = parseInt(minuteText, 10);
    if (Number.isNaN(hour) || Number.isNaN(minute)) {
      console.error('Invalid time range start, skipped:', timeRange);
      return;
    }
    // Daily job at hour:minute.
    const job = schedule.scheduleJob(`${minute} ${hour} * * *`, async () => {
      try {
        console.log(`开始执行任务: ${timeRange.name} ${timeRange.start}`);
        await taskCallback({ timeRange, executionTime: new Date() });
      } catch (error) {
        console.error('任务执行错误:', error);
      }
    });
    // scheduleJob returns null for a spec it cannot schedule; never store it (cancel() would throw).
    if (job == null) {
      console.error('Time range could not be scheduled, skipped:', timeRange);
      return;
    }
    jobs.push({ job, timeRange });
    console.log(`已安排任务: ${timeRange.name} ${timeRange.start}`);
  });
  console.log(`调度器已启动,共设置 ${jobs.length} 个定时任务`);
};

/**
 * Publishes `message` to the "exchange_pull_data_system_new" exchange.
 * Failures are logged, never thrown.
 *
 * @param {string} message - Serialized payload.
 * @param {string|null} [routingKey="on_recv_pull_data"] - Routing key; pass null to publish without one.
 */
async function sendPullDataMQMessage(message, routingKey = "on_recv_pull_data") {
  try {
    if (routingKey != null) {
      await rabbitMq.producerDirectMsg(message, "exchange_pull_data_system_new", routingKey);
    } else {
      await rabbitMq.producerDirectMsg(message, "exchange_pull_data_system_new");
    }
    console.log('消息发送成功');
  } catch (error) {
    console.error('发送消息失败:', error);
  }
}

// Starts on load; a former `if (config.isDebug)` guard around this call is disabled.
CMD.init();