/**
 * Scheduled pull-data worker.
 *
 * - Reads the trigger list from the Redis key `PullDataConfig` and schedules
 *   one node-schedule cron job per entry (`CMD.startScheduler`).
 * - On each trigger (`taskCallback`) it records today's progress through
 *   `DailyRecordManager` and starts a paging run (`processTask`) against
 *   `PullDataServiceNew#get_novel_material_list`, forwarding every page to the
 *   RabbitMQ exchange `exchange_pull_data_system_new`.
 * - A message on exchange `exchange_update_pull_config` (routing key
 *   `updatePullConig`) reloads the config and reschedules every job.
 */
const CMD = {};
const redis_help = require('../src/use_redis');
const origin_data_controllers = require('../src/data_manager/Controllers/origin_data_controllers');
const config = require('../etc/config.json');
const DailyRecordManager = require('./daily_records');
const schedule = require('node-schedule');
const manager = new DailyRecordManager();
const rabbitMq = require('../src/mq/rabbit-mq');
const helper = require('../src/helper');
const PullDataServiceNew = require('../src/PullDataServiceNew');

// Page size requested from get_novel_material_list.
const PAGE_SIZE = 500;
// Delay between two consecutive page requests of the same run.
const PAGE_DELAY_MS = 1000;
// Value of cur_page meaning "the current run is finished".
const CURSOR_DONE = -1;
// CMD.subTitle truncates titles to this many UTF-16 code units.
const TITLE_MAX_LENGTH = 299;

// Trigger entries loaded from Redis `PullDataConfig`. Fields read in this
// file: name, start ("HH:mm"), interval_minute, pull_day.
let _24HourRanges = [];
// Scheduled jobs as { job, timeRange } pairs so they can be cancelled.
let jobs = [];
// In-memory progress keyed by helper.getLocalDate(); each value maps
// timeRange.start -> progress stats for that trigger.
const cur_day_data = new Map();
// Trigger whose pull run is currently active (null before the first trigger).
let cur_timeRange = null;
const pullDataService = new PullDataServiceNew(redis_help);
// Paging cursor passed to get_novel_material_list: 0 for the first request,
// afterwards the last seen updateTime in epoch ms plus one; CURSOR_DONE when
// the run is finished.
let cur_page = 0;
// Incremented for every new run. A paging loop whose id no longer matches
// stops, so a new trigger never leaves two loops sharing cur_page.
let cur_run_id = 0;

/**
 * Read and parse the `PullDataConfig` Redis key.
 * @returns {Promise<Array>} the trigger entries; [] when the key is missing
 *   or does not hold a JSON array (instead of crashing the scheduler later).
 * @throws {SyntaxError} when the stored value is not valid JSON.
 */
async function loadPullDataConfig() {
    const raw = await redis_help.getKeyValue("PullDataConfig");
    const parsed = JSON.parse(raw);
    if (!Array.isArray(parsed)) {
        console.error("PullDataConfig is not an array, no jobs will be scheduled:", parsed);
        return [];
    }
    return parsed;
}

/**
 * RabbitMQ handler for config updates: reloads `PullDataConfig` and
 * reschedules every job. The message body itself is not used.
 * Errors propagate to the consumer wrapper, as before.
 */
const messageHandler = async (msg) => {
    _24HourRanges = await loadPullDataConfig();
    CMD.startScheduler(taskCallback);
};

/**
 * Cron callback for one trigger: ensures today's progress entry exists,
 * persists it, then starts a fresh paging run for `context.timeRange`.
 * @param {{timeRange: Object, executionTime: Date}} context
 */
const taskCallback = async (context) => {
    const { timeRange, executionTime } = context;
    console.log('执行任务:', { 时间区间: timeRange.name, 开始时间: timeRange.start, 执行时间: executionTime });
    const today = helper.getLocalDate();

    // Only today's progress is kept in memory; forget earlier days so the map
    // does not grow by one entry per day forever.
    for (const day of Array.from(cur_day_data.keys())) {
        if (day !== today) {
            cur_day_data.delete(day);
        }
    }
    // A record restored in CMD.init may have had no content; treat it as empty.
    let dayData = cur_day_data.get(today);
    if (!dayData) {
        dayData = {};
        cur_day_data.set(today, dayData);
    }
    if (!dayData[timeRange.start]) {
        dayData[timeRange.start] = { name: timeRange.name, interval_minute: timeRange.interval_minute, finish_count: 0 };
    }
    console.log("cur_day_data[today]:", dayData, today);

    const existingRecord = await manager.getRecord(today);
    if (existingRecord) {
        // Today's record already exists: update it.
        const result = await manager.updateRecord(today, dayData);
        console.log("更新记录结果:", result);
    } else {
        // No record for today yet: create it.
        const result = await manager.createRecord(today, dayData);
        console.log("创建新记录结果:", result);
    }

    // Start a new run. Bumping the id makes any loop left over from the
    // previous trigger stop at its next step instead of sharing cur_page.
    cur_run_id += 1;
    // NOTE(review): timeRange is the shared config entry and is mutated here
    // (and possibly by pullDataService); kept as-is because other code may
    // rely on state stored on it between runs.
    cur_timeRange = timeRange;
    cur_timeRange.today = today;
    cur_timeRange.finish_count = 0;
    cur_page = 0;
    await processTask();
};

/** Start consuming config-update messages; failures are logged, not thrown. */
async function startConsumer() {
    try {
        await rabbitMq.consumerDirectMsg(messageHandler, "exchange_update_pull_config", "updatePullConig");
    } catch (error) {
        console.error('启动消费者失败:', error);
    }
}

/**
 * Find the first entry of `list` whose materialId matches.
 * NOTE(review): loose `==` preserved from the original; ids may arrive as
 * number or string — confirm before tightening.
 * @returns {Object|null} the matching entry, or null.
 */
function get_detail_by_materialId(list, materialId) {
    const found = list.find((item) => item.materialId == materialId);
    return found === undefined ? null : found;
}

/**
 * Join material details (`list`) with fresh counters (`material_list`) by
 * materialId and publish the merged rows as an `on_recv_pull_data` message.
 * Materials without a matching detail entry are skipped.
 */
function generate_pull_time_data(list, material_list) {
    // Index details once instead of scanning `list` for every material.
    // Keys are stringified so number/string ids still match.
    const detailsById = new Map();
    for (const item of list) {
        const key = String(item.materialId);
        if (!detailsById.has(key)) {
            detailsById.set(key, item); // first match wins, like a linear scan
        }
    }
    const pull_data = [];
    for (const element of material_list) {
        const info = detailsById.get(String(element.materialId));
        if (info === undefined) {
            continue;
        }
        pull_data.push({
            awemeId: info.awemeId,
            materialId: info.materialId,
            awemeUrl: info.awemeUrl,
            title: info.title,
            publishTime: info.publishTime,
            hasAnchorInfo: info.hasAnchorInfo,
            landingUrl: info.landingUrl,
            createTime: info.createTime,
            updateTime: info.updateTime,
            favoriteCount: element.favoriteCount.count,
            commentCount: element.commentCount.count,
            likeCount: element.likeCount.count,
            shareCount: element.shareCount.count,
        });
    }
    // Not awaited: sendPullDataMQMessage catches and logs its own failures.
    sendPullDataMQMessage(JSON.stringify({ cmd: "on_recv_pull_data", data: pull_data, today: cur_timeRange.today }));
}

/**
 * Mirror paging progress of the current run into cur_day_data and persist it.
 * The persistence call is fire-and-forget; a rejection is logged instead of
 * becoming an unhandled promise rejection.
 */
function recordPageProgress(data, page) {
    if (cur_timeRange == null) {
        return;
    }
    const today = cur_timeRange.today;
    const dayData = cur_day_data.get(today);
    const stats = dayData ? dayData[cur_timeRange.start] : undefined;
    if (stats) {
        console.log("response.data.count:", data.count);
        console.log("(cur_day_data[today][timeRange.start]:", stats);
        // count may not be initialised yet; avoid NaN.
        cur_timeRange.count = (cur_timeRange.count || 0) + data.list.length;
        cur_timeRange.cur_page = page;
        stats.start_time = cur_timeRange.start_time;
        stats.pull_day = cur_timeRange.pull_day;
        stats.pull_time = cur_timeRange.pull_time || {};
        stats.count = data.count;
        stats.cur_page = page;
    } else {
        console.log("cur_day_data空");
    }
    if (dayData) {
        Promise.resolve(manager.updateRecord(today, dayData)).catch((error) => {
            console.error("updateRecord failed:", error);
        });
    }
}

/**
 * Validate one material-list response, record paging progress, and look up
 * which material ids are not stored yet.
 * @param {Object} response - service response ({ success, data: { list, count } }).
 * @param {*} page - page/cursor the response belongs to.
 * @returns {Promise<Object|null>} { list, id_list, materialId_list (ids not yet
 *   stored), materialId_data_list (counters keyed by materialId), count }, or
 *   null when the response is unusable or the lookup fails.
 */
async function process_material_list(response, page) {
    try {
        if (!response || !response.success || response.data == null) {
            console.log("materialId_list:", { msg: response, timeRange: cur_timeRange, fun: "get_novel_material_list" });
            return null;
        }
        const list = response.data.list;
        if (!Array.isArray(list) || list.length === 0) {
            console.log("materialId_list:", "没有数据了");
            return null;
        }

        recordPageProgress(response.data, page);

        const materialId_list = [];
        // Plain object, not an array: keys are material ids, which would make
        // an array sparse and be dropped by JSON.stringify.
        const materialId_data_list = {};
        for (const origin_element of list) {
            materialId_list.push(origin_element.materialId);
            materialId_data_list[origin_element.materialId] = {
                kepp_num: origin_element.favoriteCount.count,
                comment_num: origin_element.commentCount.count,
                like_num: origin_element.likeCount.count,
                shared_num: origin_element.shareCount.count,
            };
        }
        const result = await origin_data_controllers.findNonExistentMaterialIds(materialId_list);
        return {
            list,
            id_list: materialId_list,
            materialId_list: result.data.nonExistentIds,
            materialId_data_list,
            count: list.length,
        };
    } catch (e) {
        console.log("materialId_list:", e);
        return null;
    }
}

/**
 * Fetch one page for cur_timeRange at cursor cur_page, forward it to MQ, and
 * schedule the next page until the data runs out or passes `pull_day`.
 * @param {number} [runId] - run this loop belongs to; defaults to the current
 *   run. A loop whose run has been superseded stops without touching state.
 */
async function processTask(runId = cur_run_id) {
    if (runId !== cur_run_id) {
        // A newer trigger replaced this run while we were waiting.
        return;
    }
    try {
        const response = await pullDataService.get_novel_material_list(cur_timeRange, cur_page, PAGE_SIZE);
        if (runId !== cur_run_id) {
            // Superseded while the request was in flight; drop the result.
            return;
        }
        const list = response && response.msg === "success" && response.data ? response.data.list : undefined;
        if (!Array.isArray(list)) {
            cur_page = CURSOR_DONE;
            console.error("PULL_DATA: error:", response);
            return;
        }
        if (list.length === 0) {
            cur_page = CURSOR_DONE;
            console.log("PULL_DATA: finished, no more data:", cur_timeRange.start);
            return;
        }

        const lastUpdateTime = new Date(list[list.length - 1].updateTime).getTime();
        // NaN when pull_day is missing/invalid; comparisons with NaN are false,
        // so the run then only ends on an empty page.
        const endTime = new Date(cur_timeRange.pull_day).getTime();
        let pageItems = list;
        if (lastUpdateTime >= endTime) {
            // The page crosses the window end: forward the items still inside
            // the window (previously the whole page was dropped) and stop.
            cur_page = CURSOR_DONE;
            pageItems = list.filter((item) => new Date(item.updateTime).getTime() < endTime);
        } else if (Number.isNaN(lastUpdateTime)) {
            // Without a valid updateTime the cursor cannot advance; stop after this page.
            cur_page = CURSOR_DONE;
        } else {
            cur_page = lastUpdateTime;
        }

        if (pageItems.length > 0) {
            await sendPullDataMQMessage(JSON.stringify({ cmd: "on_recv_pull_data", data: pageItems, today: cur_timeRange.today }));
        }
        console.log("processTask over!");
    } catch (e) {
        // NOTE(review): a thrown request error retries below with the cursor
        // advanced by one, as the original did.
        console.error("PULL_DATA: error:", e);
    } finally {
        if (runId === cur_run_id && cur_page !== CURSOR_DONE) {
            // Continue strictly after the last seen updateTime.
            cur_page++;
            setTimeout(() => processTask(runId), PAGE_DELAY_MS);
        }
    }
}

/**
 * Truncate a title to TITLE_MAX_LENGTH UTF-16 code units.
 * @param {string} title
 * @returns {string}
 */
CMD.subTitle = function (title) {
    return title.length > TITLE_MAX_LENGTH ? title.substring(0, TITLE_MAX_LENGTH) : title;
};

/**
 * Check a material's counters against the configured minimums.
 * Returns false as soon as one counter is below its threshold, checking likes,
 * favorites, shares, then comments.
 * @param {{like_num:number, kepp_num:number, shared_num:number, comment_num:number}} FilterConfig
 * @param {Object} origin_element - item with likeCount/favoriteCount/shareCount/commentCount `.count`.
 * @returns {boolean}
 */
CMD.isRight = function (FilterConfig, origin_element) {
    if (origin_element.likeCount.count < FilterConfig.like_num) {
        return false;
    }
    if (origin_element.favoriteCount.count < FilterConfig.kepp_num) {
        return false;
    }
    if (origin_element.shareCount.count < FilterConfig.shared_num) {
        return false;
    }
    if (origin_element.commentCount.count < FilterConfig.comment_num) {
        return false;
    }
    return true;
};

/**
 * Connect to Redis, start the config-update consumer, load the trigger list,
 * restore today's persisted progress, and start the scheduler.
 * Failures inside the connect callback are logged instead of surfacing as
 * unhandled promise rejections.
 */
CMD.init = async function () {
    redis_help.connect(async () => {
        try {
            // Not awaited: startConsumer logs its own failures.
            startConsumer();
            _24HourRanges = await loadPullDataConfig();
            const key = helper.getLocalDate();
            const record_res = await manager.getRecord(key);
            if (record_res != null && !cur_day_data.has(key)) {
                // Missing content would later be indexed as an object; default to {}.
                cur_day_data.set(key, record_res.content || {});
            }
            CMD.startScheduler(taskCallback);
        } catch (error) {
            console.error('CMD.init failed:', error);
        }
    });
};

/**
 * Build the fixed 24-entry hourly timetable for the reference day
 * 2024-12-10 (base timestamp 1733760000000 ms = 2024-12-09T16:00:00Z,
 * i.e. midnight at UTC+8). Each entry: { name, start "HH:00", timestamp,
 * hour, timeFormat } with an en-US style timeFormat string.
 * @returns {Array<Object>}
 */
CMD.generate24HourRanges = function () {
    const BASE_TIMESTAMP = 1733760000000;
    const HOUR_MS = 60 * 60 * 1000;
    const periodName = (hour) => {
        if (hour < 6) return '凌晨区间';
        if (hour < 9) return '早晨区间';
        if (hour < 12) return '上午区间';
        if (hour < 14) return '中午区间';
        if (hour < 18) return '下午区间';
        if (hour < 22) return '晚上区间';
        return '深夜区间';
    };
    return Array.from({ length: 24 }, (_, hour) => {
        const hour12 = hour % 12 === 0 ? 12 : hour % 12;
        const meridiem = hour < 12 ? 'AM' : 'PM';
        return {
            name: periodName(hour),
            start: `${String(hour).padStart(2, '0')}:00`,
            timestamp: BASE_TIMESTAMP + hour * HOUR_MS,
            hour,
            timeFormat: `12/10/2024, ${hour12}:00:00 ${meridiem}`,
        };
    });
};

/** Cancel every scheduled job and clear the job list. */
CMD.stopScheduler = function () {
    jobs.forEach(({ job }) => {
        if (job) {
            job.cancel();
        }
    });
    jobs = [];
    console.log('调度器已停止,所有任务已清除');
};

/**
 * Replace all scheduled jobs with one daily cron job per entry of
 * _24HourRanges, firing at the entry's `start` hour and minute.
 * Entries with an unparsable `start`, or that node-schedule refuses
 * (scheduleJob returns null), are logged and skipped.
 * @param {Function} taskCallback - invoked as taskCallback({ timeRange, executionTime }).
 */
CMD.startScheduler = function (taskCallback) {
    CMD.stopScheduler();
    const ranges = Array.isArray(_24HourRanges) ? _24HourRanges : [];
    ranges.forEach((timeRange) => {
        const [hourText, minuteText] = String(timeRange.start).split(':');
        const hour = Number.parseInt(hourText, 10);
        const minute = Number.parseInt(minuteText, 10);
        if (Number.isNaN(hour) || Number.isNaN(minute)) {
            console.error('Invalid start time, job skipped:', timeRange);
            return;
        }
        // Cron fields: minute hour day-of-month month day-of-week -> daily at hour:minute.
        const job = schedule.scheduleJob(`${minute} ${hour} * * *`, async () => {
            try {
                console.log(`开始执行任务: ${timeRange.name} ${timeRange.start}`);
                await taskCallback({ timeRange, executionTime: new Date() });
            } catch (error) {
                console.error('任务执行错误:', error);
            }
        });
        if (!job) {
            console.error('node-schedule rejected the job, skipped:', timeRange);
            return;
        }
        jobs.push({ job, timeRange });
        console.log(`已安排任务: ${timeRange.name} ${timeRange.start}`);
    });
    console.log(`调度器已启动,共设置 ${jobs.length} 个定时任务`);
};

/**
 * Publish `message` to exchange `exchange_pull_data_system_new`.
 * Passing `routingKey` as null publishes without a routing key.
 * Never rejects: failures are logged.
 * @param {string} message
 * @param {string|null} [routingKey="on_recv_pull_data"]
 */
async function sendPullDataMQMessage(message, routingKey = "on_recv_pull_data") {
    try {
        if (routingKey != null) {
            await rabbitMq.producerDirectMsg(message, "exchange_pull_data_system_new", routingKey);
        } else {
            await rabbitMq.producerDirectMsg(message, "exchange_pull_data_system_new");
        }
        console.log('消息发送成功');
    } catch (error) {
        console.error('发送消息失败:', error);
    }
}

if (config.isDebug) {
    // Debug hook: uncomment to start the worker when this file is loaded.
    // CMD.init();
}