123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250 |
- const CMD = {}
- const redis_help = require('../src/use_redis');
- const rabbitMq = require('../src/mq/rabbit-mq');
- const config = require('../etc/config.json')
- const axios = require('axios');
- const mysql = require('mysql2/promise');
- const http = require('http');
- const tools = require('../tools');
- const dbConfig = config.isDebug?config.debug_task_mysql:config.release_task_mysql
- async function processTask(tg_platform_id, main_info) {
- try {
- console.log(`处理第 ${cur_page} 页任务`);
-
- const response = await CMD.syncMain(tg_platform_id, main_info);
-
- if (response !== null) {
- cur_page++;
- total = response.total;
- task_que.push(response);
-
- // 使用 Promise 延迟执行下一页
- await new Promise(resolve => setTimeout(resolve, 1000));
- return processTask(tg_platform_id, main_info);
- } else {
- console.log("完成所有创建:", main_info, cur_page);
- cur_page = -1;
- isStartTask = false;
- await redis_help.setKeyValue("isPauseTask", "false");
- }
- } catch (e) {
- console.error("processTask 错误:", e);
- isStartTask = false;
- await redis_help.setKeyValue("isPauseTask", "false");
- // if (cur_page !== -1) {
- // console.log("5秒后重试当前页...");
- // await new Promise(resolve => setTimeout(resolve, 5000));
- // return processTask(tg_platform_id, main_info);
- // }
- }
- }
- async function processSyncMainTask(){
- try{
- if(task_que.length>0){
- let sync_main_task_item = task_que.shift()
- let product_list = sync_main_task_item.data;
- let main_info = sync_main_task_item.main_info;
- let main_id = main_info.id
- const connection = await mysql.createConnection({
- ...dbConfig,
- multipleStatements: true
- });
- const values = product_list.map(item => [
- item.product_name,
- item.product_id,
- item.book_platform,
- main_id,
- main_info.app_id,
- 0,
- 0
- ]);
- // let table_name = "video_applet_product"
- // const insertSQL = `
- // INSERT INTO ${table_name}
- // (product_name, product_id, book_platform, main_id, dy_small_applet_app_id,status,
- // wait_status)
- // VALUES ?
- // `;
- const insertSQL = `
- INSERT INTO video_applet_product
- (product_name, product_id, book_platform, main_id, dy_small_applet_app_id, status, wait_status)
- VALUES ?
- ON DUPLICATE KEY UPDATE
- product_name = VALUES(product_name),
- dy_small_applet_app_id = VALUES(dy_small_applet_app_id),
- status = VALUES(status),
- wait_status = VALUES(wait_status)
- `;
- await connection.query(insertSQL, [values]);
- await connection.end();
- }
- }catch(e){
- console.error("processSyncMainTask:",e)
- }finally{
- setTimeout(processSyncMainTask, 1000);
- }
- }
- const messageHandler = async (msg) => {
- // CMD.start_task(msg['data']['data'])
- if(isStartTask){
- return
- }
- cur_page = 1;
- total = 0;
- isStartTask = true
- let main_info = msg['data']['data'].main_info
- console.log("main_info:",main_info)
- if(main_info.parent_platform_id!=0){
- tg_platform_id = main_info.parent_platform_id
- }else{
- tg_platform_id = main_info.tg_platform_id
- }
- await redis_help.setKeyValue("isPauseTask","true")
- await processTask(tg_platform_id,main_info)
- };
- // // 启动消费者
- // async function startConsumer() {
- // try {
- // await rabbitMq.consumerDirectMsg(messageHandler,"exchange_system","addMain");
- // } catch (error) {
- // console.error('启动消费者失败:', error);
- // }
- // }
- // 启动消费者
- async function startConsumer() {
- try {
- await rabbitMq.consumerDirectMsg(messageHandler,"exchange_system","syncMain");
- } catch (error) {
- console.error('启动消费者失败:', error);
- }
- processSyncMainTask()
- }
- CMD.init = async function(){
- redis_help.connect(()=>{
- })
- await startConsumer();
- }
- let task_que = []
- let cur_page = 1;
- let total = 0;
- let isStartTask = false
- CMD.syncMain = async function(tg_platform_id, main_info) {
- console.log(`====== 开始第 ${cur_page} 页请求 =====`);
- await new Promise(resolve => setTimeout(resolve, 500));
- try {
- let postData = {
- cmd: "video_product",
- fun: "search_book_data",
- data: {
- "product_name": "",
- "product_id": "",
- "tg_platform_id": tg_platform_id,
- "oce_material_id": "",
- "page_size": 500,
- "page_number": cur_page,
- "is_auto": "",
- "is_store": "",
- "genre": "",
- "alias_name": ""
- }
- };
-
- console.log("发送请求数据:", postData);
- let client = tools.getOneNewClinet()
- let response = await client.post('http://127.0.0.1:9100/tg/back/api', postData, {
- headers: {
- 'Content-Type': 'application/json',
- 'X-Request-Type': 'API', // 标记 API 请求
- }
- });
- response = response.data
- // 检查响应数据结构
- if (!response || !response.data || !Array.isArray(response.data)) {
- console.log("响应数据无效或格式错误");
- return null;
- }
- // 检查是否有数据
- if (response.data.length <= 0) {
- console.log("没有更多数据");
- return null;
- }
- // 返回正确的数据结构
- return {
- data: response.data,
- main_info: main_info,
- total: response.total || 0
- };
- } catch (e) {
- console.error("请求错误:", {
- message: e.message,
- code: e.code,
- stack: e.stack
- });
- return null;
- }
- };
- // CMD.start_task = async function(msgBody){
- // let PlatformConfig = JSON.parse(await redis_help.getKeyValue("PlatformConfig"))
- // let main_info = msgBody.main_info
- // let list = msgBody.list
- // let PlatformInfo = null
- // if(main_info.running_status==0){
- // console.log("创建了一个关闭的主体:",main_info)
- // return
- // }
- // console.log("main_info:",main_info)
- // console.log("list:",list)
- // for (let index = 0; index < PlatformConfig.length; index++) {
- // const element = PlatformConfig[index];
- // if(element.tg_platform_id == main_info.tg_platform_id){
- // PlatformInfo = element
- // break
- // }
- // }
- // if(PlatformInfo==null){
- // console.log("平台配置错误:",msgBody)
- // }else{
- // redis_help.setKeyValue("isPauseTask","true")
- // console.log("list::",list)
- // for (let index = 0; index < list.length; index++) {
- // const element = list[index];
- // let n_data = {book_id:element.product_id,
- // book_name:element.product_name,
- // tg_platform_id:element.book_platform,
- // app_id:element.dy_small_applet_app_id,
- // main_id:main_info.id
- // }
- // const result = await video_applet_product_controllers.createAppletProductData({
- // book_platform:n_data.tg_platform_id,
- // product_name:n_data.book_name,
- // product_id:n_data.book_id,
- // dy_small_applet_app_id:n_data.app_id,
- // status:0,
- // main_id:n_data.main_id,
- // promotion_id:'',
- // dy_small_program_start:'',
- // dy_small_program_start_data:'',
- // wait_status:0,
- // })
- // }
- // redis_help.setKeyValue("isPauseTask","false")
- // }
- // }
- CMD.init()
|