904118851 8 月之前
父節點
當前提交
c0ef6c2ec4
共有 4 個文件被更改,包括 247 次插入129 次删除
  1. 105 129
      PULL_DATA_FACTORY/PULL_DATA.js
  2. 2 0
      etc/config.json
  3. 124 0
      src/PullDataService.js
  4. 16 0
      src/helper.js

+ 105 - 129
PULL_DATA_FACTORY/PULL_DATA.js

@@ -8,12 +8,15 @@ const DailyRecordManager = require('./daily_records');
 const schedule = require('node-schedule');
 const manager = new DailyRecordManager();
 const rabbitMq = require('../src/mq/rabbit-mq');
-var session = null
+const helper = require('../src/helper');
+const PullDataService = require('../src/PullDataService');
 var cur_page = 0
 var _24HourRanges = []
 var jobs = []
 var cur_day_data = new Map()
 var cur_timeRange = null
+var pullDataService = new PullDataService()
+var detail_task = []
 const messageHandler = async (msg) => {
     let PullDataConfig = await redis_help.getKeyValue("PullDataConfig")
     PullDataConfig = JSON.parse(PullDataConfig)
@@ -21,21 +24,7 @@ const messageHandler = async (msg) => {
     CMD.startScheduler(taskCallback)
 };
 
-function getLocalDate() {
-    const d = new Date();
-    const year = d.getFullYear();
-    const month = String(d.getMonth() + 1).padStart(2, '0');
-    const day = String(d.getDate()).padStart(2, '0');
-    return `${year}-${month}-${day}`;
-}
 
-function getTimeStampByHourMinute(timeStr) {
-    const [hours, minutes] = timeStr.split(':').map(Number);
-    const today = new Date();
-    today.setHours(hours, minutes, 0, 0);
-    // return today.getTime();
-    return today;
-}
 const taskCallback = async (context) => {
     var { timeRange, executionTime } = context;
     
@@ -44,7 +33,7 @@ const taskCallback = async (context) => {
         开始时间: timeRange.start,
         执行时间: executionTime
     });
-    const today = getLocalDate();
+    const today = helper.getLocalDate();
     cur_day_data.forEach((v,k)=>{
         if(k==today){
 
@@ -91,13 +80,61 @@ async function startConsumer() {
     }
 }
 
+async function processDetailTask() {
+    try{
+        if(detail_task.length>0){
+            let task_item = detail_task.shift()
+            let materialId_list = task_item.materialId_list
+            let materialId_data_list = task_item.materialId_data_list
+            let response = await pullDataService.get_detail(materialId_list)
+            if(!response.success){
+                console.log("get_detail:",response)
+                throw {msg:response,timeRange:cur_timeRange,fun:"get_detail"}
+            }
+
+            if(cur_timeRange!=null){
+                cur_timeRange.finish_count+=response.data.list.length
+                const today = helper.getLocalDate();
+                cur_day_data.get(today)[cur_timeRange.start].finish_count = cur_timeRange.finish_count
+                await manager.updateRecord(today,cur_day_data.get(today));
+            }
+            for (let index = 0; index < response.data.list.length; index++) {
+                const element = response.data.list[index];
+                const info = materialId_data_list[element.materialId]
+                if(element.hasAnchorInfo){
+                    await origin_data_controllers.createOriginData(
+                        {
+                            video_id:element.awemeId,
+                            materialId:element.materialId,
+                            video_link:element.awemeUrl,
+                            title:CMD.subTitle(element.title),
+                            publish_time:new Date(element.publishTime),
+                            kepp_num:info.kepp_num,
+                            comment_num:info.comment_num,
+                            like_num:info.like_num,
+                            shared_num:info.shared_num,
+                            is_guajian:1,
+                            guajian_link:element.landingUrl,
+                            status:0
+                        }
+                    )
+                }
+            
+            }
+        }
+    }catch (error) {
+        console.error("processDetailTask:",error)
+    }finally{
+        global.setTimeout(processDetailTask,500)
+    }
+}
+
 async function processTask(timeRange){
     try{
         if(cur_page==-1){
             throw "没有数据了"
         }
-        let rs = await CMD.getToken()
-        await redis_help.setKeyValue("pull_data_token",rs.data.token)
+        await pullDataService.getToken()
         let FilterConfig = await redis_help.getKeyValue("FilterConfig")
         FilterConfig = JSON.parse(FilterConfig)
         let size = 500
@@ -107,7 +144,7 @@ async function processTask(timeRange){
                 size = sy_count
             }
         }
-        let response = await CMD.get_novel_material_list(cur_page,size)
+        let response = await pullDataService.get_novel_material_list(cur_timeRange,cur_page,size)
         // console.log("get_novel_material_list:",response)
         let materialId_list = []
         let materialId_data_list = []
@@ -130,13 +167,15 @@ async function processTask(timeRange){
             throw "没有数据了"
         }
         if(timeRange!=null){
-            const today = getLocalDate();
+            const today = helper.getLocalDate();
             console.log("response.data:",response.data,today)
             if(cur_day_data.get(today)[timeRange.start]){
                 console.log("response.data.count:",response.data.count)
                 console.log("(cur_day_data[today][timeRange.start]:",cur_day_data.get(today)[timeRange.start])
                 cur_timeRange.count = response.data.count
+                cur_timeRange.cur_page = cur_page
                 cur_day_data.get(today)[timeRange.start].count = response.data.count
+                cur_day_data.get(today)[timeRange.start].cur_page = cur_page
             }else{
                 console.log("cur_day_data空")
             }
@@ -148,13 +187,13 @@ async function processTask(timeRange){
             const result = await origin_data_controllers.getOriginData({
                 materialId:origin_element.materialId})
             if(result.success){
-                await origin_data_controllers.updateOriginData({id:result.data.id},
-                    {
-                        kepp_num:origin_element.favoriteCount.count,
-                        comment_num:origin_element.commentCount.count,
-                        like_num:origin_element.likeCount.count,
-                        shared_num:origin_element.shareCount.count
-                    })
+                // await origin_data_controllers.updateOriginData({id:result.data.id},
+                //     {
+                //         kepp_num:origin_element.favoriteCount.count,
+                //         comment_num:origin_element.commentCount.count,
+                //         like_num:origin_element.likeCount.count,
+                //         shared_num:origin_element.shareCount.count
+                //     })
             }else{
                 if(CMD.isRight(FilterConfig,origin_element)){
                     materialId_list.push(origin_element.materialId)
@@ -168,44 +207,43 @@ async function processTask(timeRange){
             }
         }
 
-
-        response = await CMD.get_detail(materialId_list)
-        if(!response.success){
-            console.log("get_detail:",response)
-            throw {msg:response,timeRange:cur_timeRange,fun:"get_detail"}
-        }
-
-        if(cur_timeRange!=null){
-            cur_timeRange.finish_count+=response.data.list.length
-            const today = getLocalDate();
-            cur_day_data.get(today)[cur_timeRange.start].finish_count = cur_timeRange.finish_count
-            await manager.updateRecord(today,cur_day_data.get(today));
-        }
-        for (let index = 0; index < response.data.list.length; index++) {
-            const element = response.data.list[index];
-            const info = materialId_data_list[element.materialId]
-            if(element.hasAnchorInfo){
-                await origin_data_controllers.createOriginData(
-                    {
-                        video_id:element.awemeId,
-                        materialId:element.materialId,
-                        video_link:element.awemeUrl,
-                        title:CMD.subTitle(element.title),
-                        publish_time:new Date(element.publishTime),
-                        kepp_num:info.kepp_num,
-                        comment_num:info.comment_num,
-                        like_num:info.like_num,
-                        shared_num:info.shared_num,
-                        is_guajian:1,
-                        guajian_link:element.landingUrl,
-                        status:0
-                    }
-                )
-            }
+        detail_task.push({materialId_list:materialId_list,materialId_data_list:materialId_data_list})
+        // response = await pullDataService.get_detail(materialId_list)
+        // if(!response.success){
+        //     console.log("get_detail:",response)
+        //     throw {msg:response,timeRange:cur_timeRange,fun:"get_detail"}
+        // }
+
+        // if(cur_timeRange!=null){
+        //     cur_timeRange.finish_count+=response.data.list.length
+        //     const today = helper.getLocalDate();
+        //     cur_day_data.get(today)[cur_timeRange.start].finish_count = cur_timeRange.finish_count
+        //     await manager.updateRecord(today,cur_day_data.get(today));
+        // }
+        // for (let index = 0; index < response.data.list.length; index++) {
+        //     const element = response.data.list[index];
+        //     const info = materialId_data_list[element.materialId]
+        //     if(element.hasAnchorInfo){
+        //         await origin_data_controllers.createOriginData(
+        //             {
+        //                 video_id:element.awemeId,
+        //                 materialId:element.materialId,
+        //                 video_link:element.awemeUrl,
+        //                 title:CMD.subTitle(element.title),
+        //                 publish_time:new Date(element.publishTime),
+        //                 kepp_num:info.kepp_num,
+        //                 comment_num:info.comment_num,
+        //                 like_num:info.like_num,
+        //                 shared_num:info.shared_num,
+        //                 is_guajian:1,
+        //                 guajian_link:element.landingUrl,
+        //                 status:0
+        //             }
+        //         )
+        //     }
         
-        }
-        // console.log("get_detail:",response)
-        // await pull_log_controllers.createData()
+        // }
+
     }catch(e){
         console.error("PULL_DATA: error:",e)
     }finally{
@@ -241,81 +279,19 @@ CMD.isRight = function(FilterConfig,origin_element){
     return true
 }
 
-CMD.getToken = async function(){
-    const response = await session.post(config.pull_data_config.get_token,{
-        userName:config.pull_data_config.userName,
-        verCode:"7061",
-        password:config.pull_data_config.password,
-        loginType:"OPENAPI"
-    });
-    return response.data
-}
-
-CMD.get_novel_material_list = async function(page,size=500) {
-    var now = new Date(); // 当前时间
-
-    if(cur_timeRange!=null){
-        now = getTimeStampByHourMinute(cur_timeRange.start)
-    }
-    let interval_minute = 60;
-
-    if(cur_timeRange!=null){
-        interval_minute = cur_timeRange.interval_minute
-    }
-    // const fiveMinutesAgo = new Date(now - 5 * 60 * 1000); // 5分钟前的时间
-    const fiveMinutesAgo = new Date(now - interval_minute * interval_minute * 1000); // 60分钟前的时间
-    let data = {
-        startTime:fiveMinutesAgo.getTime(),
-        endTime:now.getTime()
-    }
-
-    if(cur_timeRange!=null){
-        cur_timeRange.pull_time = data
-    }
-    let params = `page=${page}&size=${size}&sort=update_time,desc`
-    let url = config.pull_data_config.get_novel_material_list+params
-    console.log(url,data)
-    var response = await session.post(url,data, {
-        headers: {
-            'token':await redis_help.getKeyValue("pull_data_token")
-        }
-    });
-    return response.data
-}
-
-CMD.get_detail = async function(materialId_list) {
-    let data = {
-        list:materialId_list
-    }
-    const response = await session.post(config.pull_data_config.get_detail,data, {
-        headers: {
-            'token':await redis_help.getKeyValue("pull_data_token")
-        }
-    });
-    return response.data
-}
-
 
 CMD.init = async function(){
-    session = axios.create({
-        baseURL: config.isDebug?config.pull_data_config.debug_host:config.pull_data_config.release_host,
-        headers: {
-          'Accept': 'application/json, text/plain, */*',
-        }
-    });
-
     // await startConsumer();
     redis_help.connect(async ()=>{
         startConsumer()
         let PullDataConfig = await redis_help.getKeyValue("PullDataConfig")
         PullDataConfig = JSON.parse(PullDataConfig)
-        let response = await CMD.getToken()
+        let response = await pullDataService.getToken()
         console.log("token:",response)
-        await redis_help.setKeyValue("pull_data_token",response.data.token)
         // processTask()
         _24HourRanges = PullDataConfig
         // 定义任务回调函数
-        let key = getLocalDate();
+        let key = helper.getLocalDate();
         let record_res =  await manager.getRecord(key);
         if(record_res!=null){
             if(!cur_day_data.has(key)){
@@ -324,7 +300,7 @@ CMD.init = async function(){
         }
          // 启动调度器
         CMD.startScheduler(taskCallback);
-
+        processDetailTask()
 
     })
 }

+ 2 - 0
etc/config.json

@@ -77,6 +77,8 @@
         "debug_host":"https://t-playletweb.tarsocial.com/",
         "release_host":"https://t-playletweb.tarsocial.com/",
         "get_token":"/api/login/account",
+        "verCode":"7061",
+        "loginType":"OPENAPI",
         "get_detail":"/api/v4/data/novel/detail",
         "get_novel_material_list":"/api/v4/data/novel/material?",
         "userName":"15975037061",

+ 124 - 0
src/PullDataService.js

@@ -0,0 +1,124 @@
+const axios = require('axios');
+const config = require('../etc/config.json');
+const helper = require('./helper');
+const redis_help = require('../src/use_redis');
+class PullDataService {
+    constructor() {
+        this.session = axios.create({
+            baseURL: config.isDebug?config.pull_data_config.debug_host:config.pull_data_config.release_host,
+            headers: {
+              'Accept': 'application/json, text/plain, */*',
+            }
+        });
+        redis_help.connect(()=>{
+
+        })
+        this.token = ""
+    }
+
+
+    async getToken() {
+        try {
+            let maxRetries = 3
+            let delay = 1000;
+            for (let i = 0; i < maxRetries; i++) {
+                const response = await this.session.post(config.pull_data_config.get_token,{
+                    userName:config.pull_data_config.userName,
+                    verCode:config.pull_data_config.verCode,
+                    password:config.pull_data_config.password,
+                    loginType:config.pull_data_config.loginType
+                });
+
+                if(!response.data.success){
+                    if (i === maxRetries - 1) throw response;
+                    console.log(`Retry getToken ${i + 1} of ${maxRetries}`);
+                    await new Promise(resolve => setTimeout(resolve, delay * (i + 1)));
+                }else{
+                    this.token = response.data.data.token
+                    redis_help.setKeyValue("pull_data_token",this.token)
+                    return response.data;
+                }
+            }
+        } catch (error) {
+            return  {data:null,success:false,msg:error}
+        }
+    }
+
+
+    async get_novel_material_list(cur_timeRange,page,size=500) {
+        try {
+            let maxRetries = 3
+            let delay = 1000;
+            for (let i = 0; i < maxRetries; i++) {
+                var now = new Date(); // 当前时间
+
+                if(cur_timeRange!=null){
+                    now = helper.getTimeStampByHourMinute(cur_timeRange.start)
+                }
+                let interval_minute = 60;
+            
+                if(cur_timeRange!=null){
+                    interval_minute = cur_timeRange.interval_minute
+                }
+                // const fiveMinutesAgo = new Date(now - 5 * 60 * 1000); // 5分钟前的时间
+                const fiveMinutesAgo = new Date(now - interval_minute * interval_minute * 1000); // 60分钟前的时间
+                let data = {
+                    startTime:fiveMinutesAgo.getTime(),
+                    endTime:now.getTime()
+                }
+            
+                if(cur_timeRange!=null){
+                    cur_timeRange.pull_time = data
+                }
+                let params = `page=${page}&size=${size}&sort=update_time,desc`
+                let url = config.pull_data_config.get_novel_material_list+params
+                console.log(url,data)
+                var response = await session.post(url,data, {
+                    headers: {
+                        'token':this.token
+                    }
+                });
+                if(!response.data.success){
+                    if (i === maxRetries - 1) throw response;
+                    console.log(`Retry get_novel_material_list ${i + 1} of ${maxRetries}`);
+                    await new Promise(resolve => setTimeout(resolve, delay * (i + 1)));
+                }else{
+                    return response.data;
+                }
+            }
+        } catch (error) {
+            return {data:null,success:false,msg:error}
+        }
+    }
+
+
+    async get_detail(materialId_list) {
+        try {
+            let maxRetries = 3
+            let delay = 1000;
+            for (let i = 0; i < maxRetries; i++) {
+                let data = {
+                    list:materialId_list
+                }
+                const response = await session.post(config.pull_data_config.get_detail,data, {
+                    headers: {
+                        'token':this.token
+                    }
+                });
+                if(!response.data.success){
+                    if (i === maxRetries - 1) throw response;
+                    console.log(`Retry get_detail ${i + 1} of ${maxRetries}`);
+                    await new Promise(resolve => setTimeout(resolve, delay * (i + 1)));
+                }else{
+                    return response.data;
+                }
+            }
+        } catch (error) {
+            return  {data:null,success:false,msg:error}
+        }
+
+    }
+
+}
+
+module.exports = PullDataService;

+ 16 - 0
src/helper.js

@@ -66,4 +66,20 @@ helper.getHyCreateLinkOpt = function(token){
     options.headers['Authorization'] = `Bearer ${token}`
     return options
 }
+
+helper.getTimeStampByHourMinute = function(timeStr) {
+    const [hours, minutes] = timeStr.split(':').map(Number);
+    const today = new Date();
+    today.setHours(hours, minutes, 0, 0);
+    // return today.getTime();
+    return today;
+}
+
+helper.getLocalDate = function() {
+    const d = new Date();
+    const year = d.getFullYear();
+    const month = String(d.getMonth() + 1).padStart(2, '0');
+    const day = String(d.getDate()).padStart(2, '0');
+    return `${year}-${month}-${day}`;
+}
 module.exports = helper;