// Pull-data scheduler. The schedule ("PullDataConfig") and filter thresholds
// ("FilterConfig") are read from redis. At each scheduled time the matching time
// range is paged through pullDataService.get_novel_material_list. Per-day progress
// is recorded through DailyRecordManager, and items that have anchor info are
// stored through origin_data_controllers.createOriginData.
const CMD = {};
const time_count = 1000;
const redis_help = require('../src/use_redis');
const origin_data_controllers = require('../src/data_manager/Controllers/origin_data_controllers');
const axios = require('axios');
const config = require('../etc/config.json');
const DailyRecordManager = require('./daily_records');
const schedule = require('node-schedule');
const manager = new DailyRecordManager();
const rabbitMq = require('../src/mq/rabbit-mq');
const helper = require('../src/helper');
const PullDataService = require('../src/PullDataService');
const pull_data_task_item = require('./pull_data_task_item');

// Time ranges to schedule (the parsed "PullDataConfig" value); each needs a `start` of "HH:mm".
let _24HourRanges = [];
// Jobs created by CMD.startScheduler, stored as { job, timeRange }.
let jobs = [];
// day key (helper.getLocalDate()) -> { [timeRange.start]: progress entry }.
const cur_day_data = new Map();
// Reusable pull_data_task_item workers handed out by get_pull_data_task.
const pull_data_task_map = [];
// Time range of the run in progress (null before the first run).
let cur_timeRange = null;
// Day key the run in progress started on; progress stays under this key even after midnight.
let cur_run_day = null;
// Incremented by every scheduled run; a processTask chain stops once it is no longer the latest.
let run_generation = 0;
const pullDataService = new PullDataService(redis_help);
const detail_task = [];
let task_params = [];
// Page processTask requests next; -1 means the current run has finished paging.
let cur_page = 0;

/**
 * RabbitMQ handler for "pull config updated" messages: reloads "PullDataConfig"
 * from redis and rebuilds the schedule.
 * @param {*} msg message payload (unused; the config is always re-read from redis).
 */
const messageHandler = async (msg) => {
    const PullDataConfig = await redis_help.getKeyValue("PullDataConfig");
    _24HourRanges = JSON.parse(PullDataConfig);
    CMD.startScheduler(taskCallback);
};

/**
 * Returns an idle pull_data_task_item (one whose isRunTask() is false),
 * creating and registering a new one when every existing worker is busy.
 */
const get_pull_data_task = function () {
    const idle = pull_data_task_map.find((item) => !item.isRunTask());
    if (idle) {
        return idle;
    }
    const item = new pull_data_task_item(redis_help);
    pull_data_task_map.push(item);
    return item;
};

/**
 * Returns the progress entry of the current time range for the run's day,
 * or null when no run is active or the entry does not exist.
 */
function get_cur_range_entry() {
    if (cur_timeRange == null || cur_run_day == null) {
        return null;
    }
    const day_data = cur_day_data.get(cur_run_day);
    if (!day_data || !day_data[cur_timeRange.start]) {
        return null;
    }
    return day_data[cur_timeRange.start];
}

/**
 * Persists the in-memory progress of the run's day via manager.updateRecord.
 * Errors are logged rather than thrown so bookkeeping never aborts data processing.
 */
async function save_day_progress() {
    try {
        await manager.updateRecord(cur_run_day, cur_day_data.get(cur_run_day));
    } catch (error) {
        console.error("save_day_progress:", error);
    }
}

/**
 * Scheduler callback for one time range. It ensures today's progress entry exists
 * and is persisted, resets the per-run counters, and starts paging with processTask.
 * @param {{timeRange: object, executionTime: Date}} context
 */
const taskCallback = async (context) => {
    const { timeRange, executionTime } = context;
    console.log('执行任务:', { 时间区间: timeRange.name, 开始时间: timeRange.start, 执行时间: executionTime });
    const today = helper.getLocalDate();

    // Keep only today's data in memory. Earlier days used to be reset to {},
    // which left one dead key per day in the map forever.
    for (const day of [...cur_day_data.keys()]) {
        if (day !== today) {
            cur_day_data.delete(day);
        }
    }
    if (!cur_day_data.has(today)) {
        cur_day_data.set(today, {});
    }
    const day_data = cur_day_data.get(today);
    if (!day_data[timeRange.start]) {
        day_data[timeRange.start] = { name: timeRange.name, interval_minute: timeRange.interval_minute, finish_count: 0 };
    }
    console.log("cur_day_data[today]:", day_data, today);

    const existingRecord = await manager.getRecord(today);
    if (existingRecord) {
        // A record for today exists: update it.
        const result = await manager.updateRecord(today, day_data);
        console.log("更新记录结果:", result);
    } else {
        // No record for today yet: create one.
        const result = await manager.createRecord(today, day_data);
        console.log("创建新记录结果:", result);
    }

    // Claim the shared paging state; a chain left over from an earlier run stops itself.
    run_generation += 1;
    const generation = run_generation;
    cur_timeRange = timeRange;
    cur_timeRange.finish_count = 0;
    // Previously never initialised, so process_material_list produced NaN.
    cur_timeRange.count = 0;
    cur_run_day = today;
    cur_page = 0;
    await processTask(generation);
};

/**
 * Starts consuming "pull config updated" messages.
 * NOTE: the routing key "updatePullConig" is spelled as-is on purpose; it must match the publisher.
 */
async function startConsumer() {
    try {
        await rabbitMq.consumerDirectMsg(messageHandler, "exchange_update_pull_config", "updatePullConig");
    } catch (error) {
        console.error('启动消费者失败:', error);
    }
}

/**
 * Fetches details for the ids collected by process_material_list and adds them
 * to the run's finish_count. Every item with anchor info is then stored via
 * createOriginData. Errors are logged, never thrown.
 * @param {{materialId_list: Array, materialId_data_list: Object}|null} task_item
 */
async function processDetailTask(task_item) {
    try {
        console.log(" start processDetailTask");
        if (!task_item || !Array.isArray(task_item.materialId_list) || task_item.materialId_list.length === 0) {
            // Nothing to look up: the page failed (null) or no item passed the filter,
            // so skip the get_detail round-trip.
            console.log(" finish processDetailTask");
            return;
        }
        const { materialId_list, materialId_data_list } = task_item;
        const response = await pullDataService.get_detail(materialId_list);
        if (!response || !response.success) {
            console.log("get_detail:", response);
            console.error("processDetailTask:", { msg: response, timeRange: cur_timeRange, fun: "get_detail", materialId_list: materialId_list });
            return;
        }
        const detail_list = response.data && Array.isArray(response.data.list) ? response.data.list : [];

        if (cur_timeRange != null) {
            cur_timeRange.finish_count += detail_list.length;
            const entry = get_cur_range_entry();
            if (entry) {
                entry.finish_count = cur_timeRange.finish_count;
                await save_day_progress();
            }
        }

        for (const element of detail_list) {
            if (!element.hasAnchorInfo) {
                continue;
            }
            const info = materialId_data_list ? materialId_data_list[element.materialId] : undefined;
            if (!info) {
                // Previously a TypeError here aborted the rest of the page.
                console.error("processDetailTask: no list data for materialId", element.materialId);
                continue;
            }
            await origin_data_controllers.createOriginData({
                video_id: element.awemeId,
                materialId: element.materialId,
                video_link: element.awemeUrl,
                title: CMD.subTitle(element.title),
                publish_time: new Date(element.publishTime),
                kepp_num: info.kepp_num,
                comment_num: info.comment_num,
                like_num: info.like_num,
                shared_num: info.shared_num,
                is_guajian: 1,
                guajian_link: element.landingUrl,
                status: 0,
                createTime: info.createTime,
                updateTime: info.updateTime,
            });
        }
        console.log(" finish processDetailTask");
    } catch (error) {
        console.error("processDetailTask:", error);
    }
}

/**
 * Records paging progress for the current time range. Each item of a
 * get_novel_material_list page that is not stored yet is then checked against the
 * "FilterConfig" thresholds (CMD.isRight).
 * @param {object} response get_novel_material_list result ({ success, data: { list, count } }).
 * @param {number} page page number the response belongs to.
 * @returns {Promise<{materialId_list: Array, materialId_data_list: Object, count: number}|null>}
 *   ids to fetch details for, plus their list-level stats keyed by materialId.
 *   Returns null when the page has no usable data or processing failed.
 */
async function process_material_list(response, page) {
    try {
        const FilterConfig = JSON.parse(await redis_help.getKeyValue("FilterConfig"));
        if (!response || !response.success || response.data == null) {
            console.log("materialId_list:", { msg: response, timeRange: cur_timeRange, fun: "get_novel_material_list" });
            return null;
        }
        const list = response.data.list;
        if (!Array.isArray(list) || list.length <= 0) {
            console.log("materialId_list:", "没有数据了");
            return null;
        }

        const materialId_list = [];
        // Keyed by materialId, so a plain object (this used to be an array used as a map).
        const materialId_data_list = {};

        if (cur_timeRange != null) {
            const entry = get_cur_range_entry();
            if (entry) {
                console.log("response.data.count:", response.data.count);
                console.log("(cur_day_data[today][timeRange.start]:", entry);
                cur_timeRange.count = (cur_timeRange.count || 0) + list.length;
                cur_timeRange.cur_page = page;
                entry.count = response.data.count;
                entry.cur_page = page;
                // Awaited now; the previous fire-and-forget call could reject unhandled.
                await save_day_progress();
            } else {
                console.log("cur_day_data空");
            }
        }

        console.log("materialId_list:", list.length);
        for (const origin_element of list) {
            const result = await origin_data_controllers.getOriginData({ materialId: origin_element.materialId });
            if (result.success) {
                // Already stored. Refreshing its counters is currently disabled:
                // await origin_data_controllers.updateOriginData({ id: result.data.id }, {
                //     kepp_num: origin_element.favoriteCount.count,
                //     comment_num: origin_element.commentCount.count,
                //     like_num: origin_element.likeCount.count,
                //     shared_num: origin_element.shareCount.count,
                // });
                continue;
            }
            if (CMD.isRight(FilterConfig, origin_element)) {
                // NOTE(review): queuing the item for processDetailTask is disabled, so
                // materialId_list always comes back empty. To re-enable (processDetailTask
                // also reads createTime/updateTime from this object — confirm their source):
                // materialId_list.push(origin_element.materialId);
                // materialId_data_list[origin_element.materialId] = {
                //     kepp_num: origin_element.favoriteCount.count,
                //     comment_num: origin_element.commentCount.count,
                //     like_num: origin_element.likeCount.count,
                //     shared_num: origin_element.shareCount.count,
                // };
                console.log("materialId:", origin_element.materialId);
            }
        }
        console.log("materialId_list:", materialId_list);
        return { materialId_list: materialId_list, materialId_data_list: materialId_data_list, count: list.length };
    } catch (e) {
        console.log("materialId_list:", e);
        return null;
    }
}

/**
 * Completion callback for pull_data_task_item.runTask: processes a fetched page
 * and queues the resulting detail item (if any) on detail_task.
 */
async function finish_material_list(response, page) {
    if (!response || !response.success) {
        console.error("finish_material_list:", response);
        return;
    }
    const detail_item = await process_material_list(response, page);
    if (detail_item) {
        detail_task.push(detail_item);
    }
}

/** Fetches a token once and logs it; errors are logged. Not called automatically. */
async function process_update_token() {
    try {
        const response = await pullDataService.getToken();
        console.log("token:", response.data.token);
    } catch (e) {
        console.error("process_update_token:", e);
    }
}

/**
 * Drains task_params one entry at a time through an idle pull_data_task_item.
 * It re-arms itself every 200 ms while entries remain.
 */
async function process_novel_material_list() {
    try {
        if (task_params.length > 0) {
            const element = task_params.shift();
            const pull_task_item = get_pull_data_task();
            await pull_task_item.runTask(finish_material_list, cur_timeRange, element.page, element.limit);
            await new Promise((resolve) => setTimeout(resolve, 200));
        }
    } catch (e) {
        // Previously swallowed silently.
        console.error("process_novel_material_list:", e);
    } finally {
        if (task_params.length > 0) {
            setTimeout(process_novel_material_list, 200);
        }
    }
}

/**
 * Fetches page `cur_page` (500 items) of the current time range, filters it and runs
 * the detail step, then reschedules itself 1 s later for the next page. Paging ends
 * (cur_page = -1) when the service fails or returns no items. A chain also stops as
 * soon as a newer scheduled run has started.
 * @param {number} [generation] run this chain belongs to; defaults to the latest run.
 */
async function processTask(generation = run_generation) {
    if (generation !== run_generation) {
        return;
    }
    const page = cur_page;
    let finished = false;
    try {
        const response = await pullDataService.get_novel_material_list(cur_timeRange, page, 500);
        const has_items = Boolean(
            response && response.success && response.data &&
            Array.isArray(response.data.list) && response.data.list.length > 0
        );
        if (!has_items) {
            // Failure or empty page ends the run. A success without data.list used to
            // throw a TypeError here and keep the loop requesting pages forever.
            finished = true;
            console.error("PULL_DATA: error:", response);
            return;
        }
        const detail_item = await process_material_list(response, page, cur_timeRange);
        console.log("detail_item:", detail_item);
        await processDetailTask(detail_item);
        console.log("processTask over!");
    } catch (e) {
        console.error("PULL_DATA: error:", e);
    } finally {
        // Only the latest run may touch the shared paging state.
        if (generation === run_generation) {
            if (finished) {
                cur_page = -1;
            } else {
                cur_page += 1;
                // processTask never rejects (it catches everything), so not awaiting is safe.
                setTimeout(() => processTask(generation), 1000);
            }
        }
    }
}

/**
 * Truncates a title to at most `max_length` UTF-16 code units (default 299,
 * presumably the storage column limit). It never leaves half of a surrogate pair
 * (e.g. an emoji) at the end. Non-string values are returned unchanged.
 * @param {*} title
 * @param {number} [max_length=299]
 */
CMD.subTitle = function (title, max_length = 299) {
    if (typeof title !== 'string' || title.length <= max_length) {
        return title;
    }
    let end = max_length;
    const last = title.charCodeAt(end - 1);
    if (last >= 0xd800 && last <= 0xdbff) {
        // Keeping a lone high surrogate would produce invalid text.
        end -= 1;
    }
    return title.substring(0, end);
};

/**
 * Returns true when an item meets every minimum in FilterConfig: like_num,
 * kepp_num (favorites), shared_num and comment_num. A missing metric counts as 0.
 * A missing threshold (or a missing FilterConfig) imposes no limit.
 * @param {object|null} FilterConfig parsed "FilterConfig" value from redis.
 * @param {object} origin_element list item with likeCount/favoriteCount/shareCount/commentCount.
 */
CMD.isRight = function (FilterConfig, origin_element) {
    const filter = FilterConfig || {};
    const count_of = (metric) => {
        const stat = origin_element[metric];
        return stat && stat.count != null ? stat.count : 0;
    };
    console.log("origin_element.likeCount.count:", count_of('likeCount'), filter.like_num);
    const thresholds = [
        ['likeCount', 'like_num'],
        ['favoriteCount', 'kepp_num'],
        ['shareCount', 'shared_num'],
        ['commentCount', 'comment_num'],
    ];
    // `!(a < b)` rather than `a >= b`: an unset threshold (undefined) must not reject.
    return thresholds.every(([metric, key]) => !(count_of(metric) < filter[key]));
};

/**
 * Connects to redis, then starts the config consumer, loads "PullDataConfig",
 * fetches a token, and restores today's progress record. Finally it starts the
 * scheduler. Each step logs its own failure, so the scheduler is always started
 * (previously any rejection here was unhandled and nothing got scheduled).
 */
CMD.init = async function () {
    redis_help.connect(async () => {
        // startConsumer handles and logs its own errors.
        startConsumer();
        try {
            _24HourRanges = JSON.parse(await redis_help.getKeyValue("PullDataConfig"));
        } catch (error) {
            console.error("init: failed to load PullDataConfig:", error);
        }
        try {
            // A single token fetch; the original fetched it a second time right before scheduling.
            const response = await pullDataService.getToken();
            console.log("token:", response);
        } catch (error) {
            console.error("init: getToken failed:", error);
        }
        try {
            const key = helper.getLocalDate();
            const record_res = await manager.getRecord(key);
            if (record_res != null && !cur_day_data.has(key)) {
                cur_day_data.set(key, record_res.content);
            }
        } catch (error) {
            console.error("init: failed to restore today's record:", error);
        }
        CMD.startScheduler(taskCallback);
    });
};

/**
 * Returns a fixed table of 24 hourly ranges for 2024-12-10 (UTC+8), each with a
 * period-of-day label. Base timestamp 1733760000000 is 2024-12-10 00:00 UTC+8.
 * NOTE(review): not called in this file (the live schedule comes from "PullDataConfig")
 * and its dates are frozen. The former hard-coded table had start '20:45' for hour 20,
 * inconsistent with its 20:00 timestamp; this version uses '20:00'.
 */
CMD.generate24HourRanges = function () {
    const BASE_TIMESTAMP = 1733760000000;
    const HOUR_MS = 60 * 60 * 1000;
    // [first hour of the period, name]; checked top-down, so each period lasts until the next.
    const periods = [
        [22, '深夜区间'],
        [18, '晚上区间'],
        [14, '下午区间'],
        [12, '中午区间'],
        [9, '上午区间'],
        [6, '早晨区间'],
        [0, '凌晨区间'],
    ];
    return Array.from({ length: 24 }, (_, hour) => {
        const hour12 = hour % 12 === 0 ? 12 : hour % 12;
        return {
            name: periods.find(([from]) => hour >= from)[1],
            start: `${String(hour).padStart(2, '0')}:00`,
            timestamp: BASE_TIMESTAMP + hour * HOUR_MS,
            hour: hour,
            timeFormat: `12/10/2024, ${hour12}:00:00 ${hour < 12 ? 'AM' : 'PM'}`,
        };
    });
};

/** Cancels every scheduled job and clears the job list. */
CMD.stopScheduler = function () {
    jobs.forEach(({ job }) => {
        if (job) {
            job.cancel();
        }
    });
    jobs = [];
    console.log('调度器已停止,所有任务已清除');
};

/**
 * Replaces the schedule: for every range in _24HourRanges it creates a daily cron job
 * at its "HH:mm" start time that awaits taskCallback({ timeRange, executionTime }).
 * Ranges with an unparsable start, or that node-schedule rejects, are logged and skipped.
 * @param {function} taskCallback callback invoked for each firing.
 */
CMD.startScheduler = function (taskCallback) {
    CMD.stopScheduler();
    if (!Array.isArray(_24HourRanges)) {
        console.error('startScheduler: PullDataConfig is not an array, nothing scheduled:', _24HourRanges);
        return;
    }
    _24HourRanges.forEach((timeRange) => {
        if (!timeRange) {
            return;
        }
        const [hour_text, minute_text] = String(timeRange.start).split(':');
        const hour = Number.parseInt(hour_text, 10);
        const minute = Number.parseInt(minute_text, 10);
        if (Number.isNaN(hour) || Number.isNaN(minute)) {
            console.error(`startScheduler: invalid start time, skipped: ${timeRange.name} ${timeRange.start}`);
            return;
        }
        // Fires every day at hour:minute.
        const job = schedule.scheduleJob(`${minute} ${hour} * * *`, async () => {
            try {
                console.log(`开始执行任务: ${timeRange.name} ${timeRange.start}`);
                await taskCallback({ timeRange, executionTime: new Date() });
            } catch (error) {
                console.error('任务执行错误:', error);
            }
        });
        if (job === null) {
            // node-schedule returns null when the spec cannot be scheduled; storing it
            // would make stopScheduler crash on null.cancel().
            console.error(`startScheduler: job not scheduled: ${timeRange.name} ${timeRange.start}`);
            return;
        }
        jobs.push({ job, timeRange });
        console.log(`已安排任务: ${timeRange.name} ${timeRange.start}`);
    });
    console.log(`调度器已启动,共设置 ${jobs.length} 个定时任务`);
};

if (!config.isDebug) {
    CMD.init();
}