const CMD = {};
const time_count = 1000;
const redis_help = require('../src/use_redis');
const origin_data_controllers = require('../src/data_manager/Controllers/origin_data_controllers');
const axios = require('axios');
const config = require('../etc/config.json');
const DailyRecordManager = require('./daily_records');
const schedule = require('node-schedule');
const manager = new DailyRecordManager();
const rabbitMq = require('../src/mq/rabbit-mq');
const helper = require('../src/helper');
const PullDataService = require('../src/PullDataService');
const pull_data_task_item = require('./pull_data_task_item');

// Page size for the first list request and for the pagination plan.
const PAGE_SIZE = 500;
// Delay between consecutive page requests (the old code slept 200 ms and then rescheduled after another 200 ms).
const PAGE_INTERVAL_MS = 400;
// Polling interval of the detail-task worker loop.
const DETAIL_POLL_MS = 500;
// Maximum stored title length, in UTF-16 code units.
const MAX_TITLE_LENGTH = 299;
// Message used when a list page contains no items.
const NO_MORE_DATA = '没有数据了';

// Time ranges to schedule; loaded from the Redis key "PullDataConfig".
let _24HourRanges = [];
// Active node-schedule jobs as { job, timeRange }.
let jobs = [];
// Progress per local date: { [timeRange.start]: { name, interval_minute, finish_count, count?, cur_page? } }.
const cur_day_data = new Map();
// Pool of page-fetch workers; an entry is reused once isRunTask() reports false.
const pull_data_task_map = [];
// Time range currently being pulled (set by taskCallback).
let cur_timeRange = null;
const pullDataService = new PullDataService(redis_help);
// Queue of { materialId_list, materialId_data_list, count } waiting for get_detail.
const detail_task = [];
// Remaining pagination parameters ({ page, limit, ... }) for the current time range.
let task_params = [];

/** Resolve after `ms` milliseconds. */
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

/**
 * Read a Redis key and JSON-parse its value.
 * @param {string} key
 * @returns {Promise<*>} the parsed value (a raw null parses to null)
 * @throws {SyntaxError} when the stored value is not valid JSON
 */
async function readJsonConfig(key) {
  const raw = await redis_help.getKeyValue(key);
  return JSON.parse(raw);
}

/**
 * Build an Error carrying the same fields that used to be thrown as a plain object.
 * @param {string} fun - name of the failing service call
 * @param {*} response - raw service response
 * @param {*} timeRange - time range active when the call failed
 * @returns {Error}
 */
function pullError(fun, response, timeRange) {
  const error = new Error(`${fun} failed`);
  error.msg = response;
  error.timeRange = timeRange;
  error.fun = fun;
  return error;
}

/**
 * Persist the in-memory progress of `day`. Failures are logged, not thrown, so a
 * bookkeeping error never aborts data processing. No-op when the day has no entry.
 * @param {*} day - key produced by helper.getLocalDate()
 */
async function persistDay(day) {
  const dayData = cur_day_data.get(day);
  if (!dayData) {
    return;
  }
  try {
    await manager.updateRecord(day, dayData);
  } catch (error) {
    console.error('updateRecord failed:', day, error);
  }
}

/**
 * Queue a detail batch for processDetailTask; empty batches are dropped because
 * there is nothing to ask get_detail for.
 * @param {{materialId_list: Array}} detail_item
 */
function enqueueDetailTask(detail_item) {
  if (detail_item.materialId_list.length > 0) {
    detail_task.push(detail_item);
  }
}

/**
 * RabbitMQ handler for "exchange_update_pull_config": reloads PullDataConfig from
 * Redis and reschedules every job. The message body itself is not used.
 * If reading/parsing the config fails, the current schedule is left untouched.
 */
const messageHandler = async (msg) => {
  try {
    const PullDataConfig = await readJsonConfig('PullDataConfig');
    _24HourRanges = PullDataConfig;
    CMD.startScheduler(taskCallback);
  } catch (error) {
    console.error('messageHandler:', error);
  }
};

/**
 * Return an idle page-fetch worker from the pool, creating and pooling a new one
 * when every existing worker is busy.
 */
const get_pull_data_task = function () {
  const idle = pull_data_task_map.find((item) => !item.isRunTask());
  if (idle) {
    return idle;
  }
  const item = new pull_data_task_item(redis_help);
  pull_data_task_map.push(item);
  return item;
};

/**
 * Scheduled entry point for one time range: prepares today's progress record,
 * upserts it via the DailyRecordManager, refreshes the token in the background and
 * pulls all pages for the range.
 * @param {{timeRange: object, executionTime: Date}} context
 */
const taskCallback = async (context) => {
  const { timeRange, executionTime } = context;
  console.log('执行任务:', {
    时间区间: timeRange.name,
    开始时间: timeRange.start,
    执行时间: executionTime,
  });

  const today = helper.getLocalDate();
  // Drop previous days entirely (resetting them to {} kept one stale key per day forever).
  for (const day of [...cur_day_data.keys()]) {
    if (day !== today) {
      cur_day_data.delete(day);
    }
  }
  if (!cur_day_data.has(today)) {
    cur_day_data.set(today, {});
  }
  const dayData = cur_day_data.get(today);
  if (!dayData[timeRange.start]) {
    dayData[timeRange.start] = {
      name: timeRange.name,
      interval_minute: timeRange.interval_minute,
      finish_count: 0,
    };
  }
  console.log('cur_day_data[today]:', dayData, today);

  const existingRecord = await manager.getRecord(today);
  if (existingRecord) {
    // A record for today exists: update it.
    const result = await manager.updateRecord(today, dayData);
    console.log('更新记录结果:', result);
  } else {
    // No record for today yet: create it.
    const result = await manager.createRecord(today, dayData);
    console.log('创建新记录结果:', result);
  }

  cur_timeRange = timeRange;
  cur_timeRange.finish_count = 0;
  // Fire-and-forget: process_update_token catches and logs its own errors.
  void process_update_token();
  await processTask(timeRange);
};

/** Start the RabbitMQ consumer that listens for PullDataConfig updates. */
async function startConsumer() {
  try {
    await rabbitMq.consumerDirectMsg(messageHandler, 'exchange_update_pull_config', 'updatePullConig');
  } catch (error) {
    console.error('启动消费者失败:', error);
  }
}

/**
 * Add `processed` to the current range's finish_count and persist today's record.
 * The persistence step is skipped when today has no entry for the current range
 * (e.g. a range that started before midnight is still finishing).
 * @param {number} processed - number of detail rows returned by get_detail
 */
async function recordDetailProgress(processed) {
  if (cur_timeRange == null) {
    return;
  }
  cur_timeRange.finish_count += processed;
  const today = helper.getLocalDate();
  const dayData = cur_day_data.get(today);
  const entry = dayData ? dayData[cur_timeRange.start] : undefined;
  if (!entry) {
    console.log('recordDetailProgress: no progress entry for', today, cur_timeRange.start);
    return;
  }
  entry.finish_count = cur_timeRange.finish_count;
  await persistDay(today);
}

/**
 * Worker loop: takes one queued detail batch, fetches details for it, records
 * progress and stores every item that has anchor info. Reschedules itself every
 * DETAIL_POLL_MS regardless of success.
 */
async function processDetailTask() {
  try {
    if (detail_task.length > 0) {
      console.log('processDetailTask');
      const { materialId_list, materialId_data_list } = detail_task.shift();
      const response = await pullDataService.get_detail(materialId_list);
      if (!response.success) {
        console.log('get_detail:', response);
        throw pullError('get_detail', response, cur_timeRange);
      }
      const details = response.data.list;
      await recordDetailProgress(details.length);

      for (const element of details) {
        if (!element.hasAnchorInfo) {
          continue;
        }
        const info = materialId_data_list[element.materialId];
        if (!info) {
          // get_detail returned an id we did not request; skip it instead of aborting the batch.
          console.warn('processDetailTask: no list data for materialId', element.materialId);
          continue;
        }
        await origin_data_controllers.createOriginData({
          video_id: element.awemeId,
          materialId: element.materialId,
          video_link: element.awemeUrl,
          title: CMD.subTitle(element.title),
          publish_time: new Date(element.publishTime),
          kepp_num: info.kepp_num,
          comment_num: info.comment_num,
          like_num: info.like_num,
          shared_num: info.shared_num,
          is_guajian: 1,
          guajian_link: element.landingUrl,
          status: 0,
          // NOTE(review): process_material_list never sets createTime/updateTime on
          // `info`, so these are undefined here — confirm the DB layer fills them in.
          createTime: info.createTime,
          updateTime: info.updateTime,
        });
      }
      console.log(' finish processDetailTask');
    }
  } catch (error) {
    console.error('processDetailTask:', error);
  } finally {
    setTimeout(processDetailTask, DETAIL_POLL_MS);
  }
}

/**
 * Store the total count and current page of `timeRange` in memory and persist
 * today's record. Logs "cur_day_data空" when today has no entry for the range.
 */
async function recordPageProgress(timeRange, page, data) {
  const today = helper.getLocalDate();
  console.log('response.data:', data, today);
  const dayData = cur_day_data.get(today);
  const entry = dayData ? dayData[timeRange.start] : undefined;
  if (entry) {
    console.log('response.data.count:', data.count);
    console.log('(cur_day_data[today][timeRange.start]:', entry);
    timeRange.count = data.count;
    timeRange.cur_page = page;
    entry.count = data.count;
    entry.cur_page = page;
  } else {
    console.log('cur_day_data空');
  }
  // Awaited (errors logged inside persistDay); the old fire-and-forget call could
  // surface as an unhandled promise rejection.
  await persistDay(today);
}

/**
 * Turn one material-list page into a detail batch: keeps items for which
 * getOriginData does not succeed (presumably: not stored yet) and that pass the
 * FilterConfig thresholds.
 * @param {object} response - list response ({ success, data: { list, count } })
 * @param {number} page - page number the response belongs to
 * @param {object|null} timeRange - when non-null, page progress is recorded for it
 * @returns {Promise<{materialId_list: Array, materialId_data_list: Object, count: number}>}
 * @throws {Error} when the response failed or the page has no items
 */
async function process_material_list(response, page, timeRange) {
  if (!response.success || response.data == null) {
    throw pullError('get_novel_material_list', response, cur_timeRange);
  }
  const { list, count } = response.data;
  if (list == null || list.length <= 0) {
    throw new Error(NO_MORE_DATA);
  }
  const FilterConfig = await readJsonConfig('FilterConfig');

  if (timeRange != null) {
    await recordPageProgress(timeRange, page, response.data);
  }

  const materialId_list = [];
  // Keyed by materialId, so a plain object rather than a (sparse) array.
  const materialId_data_list = {};
  for (const origin_element of list) {
    const result = await origin_data_controllers.getOriginData({ materialId: origin_element.materialId });
    if (result.success) {
      // Already known; refreshing its counters is currently disabled.
      continue;
    }
    if (!CMD.isRight(FilterConfig, origin_element)) {
      continue;
    }
    materialId_list.push(origin_element.materialId);
    materialId_data_list[origin_element.materialId] = {
      kepp_num: origin_element.favoriteCount.count,
      comment_num: origin_element.commentCount.count,
      like_num: origin_element.likeCount.count,
      shared_num: origin_element.shareCount.count,
    };
  }
  return { materialId_list, materialId_data_list, count };
}

/**
 * Callback handed to pull_data_task_item.runTask for every follow-up page.
 * Errors are logged here so they never escape into the worker.
 */
async function finish_material_list(response, page) {
  if (!response.success) {
    console.error('finish_material_list:', response);
    return;
  }
  try {
    enqueueDetailTask(await process_material_list(response, page, null));
  } catch (error) {
    console.error('finish_material_list:', error);
  }
}

/** Fetch a token from the pull service and log it; errors are logged, not thrown. */
async function process_update_token() {
  try {
    const response = await pullDataService.getToken();
    console.log('token:', response.data.token);
  } catch (e) {
    console.error('process_update_token:', e);
  }
}

/**
 * Drain task_params: fetch each remaining page through an idle worker, pausing
 * PAGE_INTERVAL_MS between pages. A failing page is logged and the loop continues.
 */
async function process_novel_material_list() {
  while (task_params.length > 0) {
    const { page, limit } = task_params.shift();
    try {
      const pull_task_item = get_pull_data_task();
      await pull_task_item.runTask(finish_material_list, cur_timeRange, page, limit);
    } catch (error) {
      console.error('process_novel_material_list:', error);
    }
    if (task_params.length > 0) {
      await sleep(PAGE_INTERVAL_MS);
    }
  }
}

/**
 * Pull every page of the material list for `timeRange`: the first page is fetched
 * directly, the remaining pages are planned with helper.getPaginationParams and
 * fetched sequentially. Errors are logged.
 */
async function processTask(timeRange) {
  try {
    const response = await pullDataService.get_novel_material_list(cur_timeRange, 0, PAGE_SIZE);
    const detail_item = await process_material_list(response, 0, timeRange);
    // The first pagination entry (e.g. { page: 1, offset: 0, limit: 500 }) presumably
    // covers the page fetched above, so it is dropped.
    task_params = helper.getPaginationParams(detail_item.count, PAGE_SIZE);
    task_params.shift();
    enqueueDetailTask(detail_item);
    await process_novel_material_list();
    console.log('processTask over!');
  } catch (e) {
    console.error('PULL_DATA: error:', e);
  }
}

/**
 * Truncate a title to MAX_TITLE_LENGTH UTF-16 units without splitting a surrogate
 * pair (emoji). Non-string values are returned unchanged.
 * @param {*} title
 * @returns {*}
 */
CMD.subTitle = function (title) {
  if (typeof title !== 'string' || title.length <= MAX_TITLE_LENGTH) {
    return title;
  }
  let end = MAX_TITLE_LENGTH;
  const lastUnit = title.charCodeAt(end - 1);
  if (lastUnit >= 0xd800 && lastUnit <= 0xdbff) {
    end -= 1; // would leave a lone high surrogate
  }
  return title.substring(0, end);
};

// [metric on the list item, threshold key in FilterConfig], checked in this order.
const FILTER_RULES = [
  ['likeCount', 'like_num'],
  ['favoriteCount', 'kepp_num'],
  ['shareCount', 'shared_num'],
  ['commentCount', 'comment_num'],
];

/**
 * True when no metric of `origin_element` is below its FilterConfig threshold.
 * Written as !(a < b) so an undefined threshold does not reject the item.
 */
CMD.isRight = function (FilterConfig, origin_element) {
  return FILTER_RULES.every(
    ([metric, thresholdKey]) => !(origin_element[metric].count < FilterConfig[thresholdKey]),
  );
};

/**
 * Startup: once Redis is connected, start the config consumer, load the schedule,
 * fetch a token, restore today's progress record and start the scheduler. The
 * detail worker is started even if a step fails, so batches queued after a later
 * config update are still processed.
 */
CMD.init = async function () {
  redis_help.connect(async () => {
    try {
      startConsumer();
      const PullDataConfig = await readJsonConfig('PullDataConfig');
      const response = await pullDataService.getToken();
      console.log('token:', response);
      _24HourRanges = PullDataConfig;

      const key = helper.getLocalDate();
      const record_res = await manager.getRecord(key);
      if (record_res != null && !cur_day_data.has(key)) {
        // A record without content would break later property writes on the day entry.
        cur_day_data.set(key, record_res.content || {});
      }
      CMD.startScheduler(taskCallback);
    } catch (error) {
      console.error('CMD.init:', error);
    }
    processDetailTask();
  });
};

// Upper hour bound (exclusive) of each named period, in ascending order.
const PERIODS = [
  [6, '凌晨区间'],
  [9, '早晨区间'],
  [12, '上午区间'],
  [14, '中午区间'],
  [18, '下午区间'],
  [22, '晚上区间'],
  [24, '深夜区间'],
];

/**
 * Fixed table of 24 hourly ranges anchored on 2024-12-10 (timestamps are those of
 * the former literal table, i.e. local midnight in UTC+8). The 20:00 entry used to
 * read start '20:45' despite hour 20 / 8:00 PM; it is now '20:00' like the others.
 * @returns {Array<{name: string, start: string, timestamp: number, hour: number, timeFormat: string}>}
 */
CMD.generate24HourRanges = function () {
  const BASE_TIMESTAMP = 1733760000000;
  const HOUR_MS = 3600000;
  return Array.from({ length: 24 }, (_, hour) => {
    const hour12 = hour % 12 === 0 ? 12 : hour % 12;
    return {
      name: PERIODS.find(([endHour]) => hour < endHour)[1],
      start: `${String(hour).padStart(2, '0')}:00`,
      timestamp: BASE_TIMESTAMP + hour * HOUR_MS,
      hour,
      timeFormat: `12/10/2024, ${hour12}:00:00 ${hour < 12 ? 'AM' : 'PM'}`,
    };
  });
};

/** Cancel every scheduled job and clear the job list. */
CMD.stopScheduler = function () {
  for (const { job } of jobs) {
    job.cancel();
  }
  jobs = [];
  console.log('调度器已停止,所有任务已清除');
};

/**
 * Parse "HH:MM" into { hour, minute }, or null when out of range / malformed.
 * @param {*} start
 */
function parseStartTime(start) {
  const [hourText, minuteText] = String(start).split(':');
  const hour = Number.parseInt(hourText, 10);
  const minute = Number.parseInt(minuteText, 10);
  if (!(hour >= 0 && hour <= 23 && minute >= 0 && minute <= 59)) {
    return null;
  }
  return { hour, minute };
}

/**
 * Replace all jobs with one daily cron job per entry of _24HourRanges, firing at
 * its start time. Invalid start times and jobs node-schedule refuses are skipped
 * (and logged) so stopScheduler never sees a null job.
 * @param {function({timeRange: object, executionTime: Date}): Promise} taskCallback
 */
CMD.startScheduler = function (taskCallback) {
  CMD.stopScheduler();
  if (!Array.isArray(_24HourRanges)) {
    console.error('startScheduler: PullDataConfig is not an array:', _24HourRanges);
    return;
  }

  for (const timeRange of _24HourRanges) {
    const time = parseStartTime(timeRange.start);
    if (time === null) {
      console.error('startScheduler: invalid start time:', timeRange);
      continue;
    }
    // Runs every day at the configured hour and minute.
    const job = schedule.scheduleJob(`${time.minute} ${time.hour} * * *`, async () => {
      try {
        console.log(`开始执行任务: ${timeRange.name} ${timeRange.start}`);
        await taskCallback({ timeRange, executionTime: new Date() });
      } catch (error) {
        console.error('任务执行错误:', error);
      }
    });
    if (!job) {
      console.error('startScheduler: node-schedule rejected job:', timeRange);
      continue;
    }
    jobs.push({ job, timeRange });
    console.log(`已安排任务: ${timeRange.name} ${timeRange.start}`);
  }
  console.log(`调度器已启动,共设置 ${jobs.length} 个定时任务`);
};

if (!config.isDebug) {
  CMD.init();
}