const CMD = {} const redis_help = require('../src/use_redis'); const rabbitMq = require('../src/mq/rabbit-mq'); const config = require('../etc/config.json') const axios = require('axios'); const mysql = require('mysql2/promise'); const http = require('http'); const tools = require('../tools'); const dbConfig = config.isDebug?config.debug_task_mysql:config.release_task_mysql async function processTask(tg_platform_id, main_info) { try { console.log(`处理第 ${cur_page} 页任务`); const response = await CMD.syncMain(tg_platform_id, main_info); if (response !== null) { cur_page++; total = response.total; task_que.push(response); // 使用 Promise 延迟执行下一页 await new Promise(resolve => setTimeout(resolve, 1000)); return processTask(tg_platform_id, main_info); } else { console.log("完成所有创建:", main_info, cur_page); cur_page = -1; isStartTask = false; await redis_help.setKeyValue("isPauseTask", "false"); } } catch (e) { console.error("processTask 错误:", e); isStartTask = false; await redis_help.setKeyValue("isPauseTask", "false"); // if (cur_page !== -1) { // console.log("5秒后重试当前页..."); // await new Promise(resolve => setTimeout(resolve, 5000)); // return processTask(tg_platform_id, main_info); // } } } async function processSyncMainTask(){ try{ if(task_que.length>0){ let sync_main_task_item = task_que.shift() let product_list = sync_main_task_item.data; let main_info = sync_main_task_item.main_info; let main_id = main_info.id let material_platform_id = main_info.material_platform_id const connection = await mysql.createConnection({ ...dbConfig, multipleStatements: true }); const values = product_list.map(item => [ item.product_name, item.product_id, item.book_platform, main_id, main_info.app_id, 0, 0, material_platform_id, item.product_parent_id, main_info.butler_id, item.is_top ]); // let table_name = "video_applet_product" // const insertSQL = ` // INSERT INTO ${table_name} // (product_name, product_id, book_platform, main_id, dy_small_applet_app_id,status, // wait_status) // VALUES ? // `; const insertSQL = ` INSERT INTO video_applet_product (product_name, product_id, book_platform, main_id, dy_small_applet_app_id, status, wait_status,material_platform_id,product_parent_id,butler_id,weight) VALUES ? ON DUPLICATE KEY UPDATE product_name = VALUES(product_name), dy_small_applet_app_id = VALUES(dy_small_applet_app_id) `; await connection.query(insertSQL, [values]); await connection.end(); } }catch(e){ console.error("processSyncMainTask:",e) }finally{ setTimeout(processSyncMainTask, 1000); } } const messageHandler = async (msg) => { // CMD.start_task(msg['data']['data']) if(isStartTask){ return } cur_page = 1; total = 0; isStartTask = true let main_info = msg['data']['data'].main_info console.log("main_info:",main_info) if(main_info.parent_platform_id!=0){ tg_platform_id = main_info.parent_platform_id }else{ tg_platform_id = main_info.tg_platform_id } await redis_help.setKeyValue("isPauseTask","true") await processTask(tg_platform_id,main_info) }; // // 启动消费者 // async function startConsumer() { // try { // await rabbitMq.consumerDirectMsg(messageHandler,"exchange_system","addMain"); // } catch (error) { // console.error('启动消费者失败:', error); // } // } // 启动消费者 async function startConsumer() { try { await rabbitMq.consumerDirectMsg(messageHandler,"exchange_system","syncMain"); } catch (error) { console.error('启动消费者失败:', error); } processSyncMainTask() } CMD.init = async function(){ redis_help.connect(()=>{ }) await startConsumer(); } let task_que = [] let cur_page = 1; let total = 0; let isStartTask = false CMD.syncMain = async function(tg_platform_id, main_info) { console.log(`====== 开始第 ${cur_page} 页请求 =====`); await new Promise(resolve => setTimeout(resolve, 500)); try { let min_book_word = main_info.min_book_word let max_book_word = main_info.max_book_word let min_stat_cost = parseFloat(main_info.min_stat_cost) let max_stat_cost = parseFloat(main_info.max_stat_cost) let start_publish_time = main_info.start_publish_time let end_publish_time = main_info.end_publish_time if(start_publish_time.length>0){ const timestampMs = Date.parse(start_publish_time); start_publish_time = timestampMs } if(end_publish_time.length>0){ const timestampMs = Date.parse(end_publish_time); end_publish_time = timestampMs } let postData = { cmd: "video_product", fun: "search_book_data_on_main", data: { "start_publish_time": start_publish_time, "end_publish_time": end_publish_time, "product_name": "", "product_id": "", "tg_platform_id": tg_platform_id, "oce_material_id": "", "page_size": 500, "page_number": cur_page, "is_auto": "", "is_store": "", "genre": "", //main_info.genre "alias_name": "", "stat_cost":"", "min_book_word":min_book_word, "max_book_word":max_book_word, "min_stat_cost":min_stat_cost, "max_stat_cost":max_stat_cost } }; console.log("发送请求数据:", postData); let client = tools.getOneNewClinet() let response = await client.post('http://127.0.0.1:9100/tg/back/api', postData, { headers: { 'Content-Type': 'application/json', 'X-Request-Type': 'API', // 标记 API 请求 } }); response = response.data // 检查响应数据结构 if (!response || !response.data || !Array.isArray(response.data)) { console.log("响应数据无效或格式错误"); return null; } // 检查是否有数据 if (response.data.length <= 0) { console.log("没有更多数据"); return null; } // 返回正确的数据结构 return { data: response.data, main_info: main_info, total: response.total || 0 }; } catch (e) { console.error("请求错误:", { message: e.message, code: e.code, stack: e.stack }); return null; } }; // CMD.start_task = async function(msgBody){ // let PlatformConfig = JSON.parse(await redis_help.getKeyValue("PlatformConfig")) // let main_info = msgBody.main_info // let list = msgBody.list // let PlatformInfo = null // if(main_info.running_status==0){ // console.log("创建了一个关闭的主体:",main_info) // return // } // console.log("main_info:",main_info) // console.log("list:",list) // for (let index = 0; index < PlatformConfig.length; index++) { // const element = PlatformConfig[index]; // if(element.tg_platform_id == main_info.tg_platform_id){ // PlatformInfo = element // break // } // } // if(PlatformInfo==null){ // console.log("平台配置错误:",msgBody) // }else{ // redis_help.setKeyValue("isPauseTask","true") // console.log("list::",list) // for (let index = 0; index < list.length; index++) { // const element = list[index]; // let n_data = {book_id:element.product_id, // book_name:element.product_name, // tg_platform_id:element.book_platform, // app_id:element.dy_small_applet_app_id, // main_id:main_info.id // } // const result = await video_applet_product_controllers.createAppletProductData({ // book_platform:n_data.tg_platform_id, // product_name:n_data.book_name, // product_id:n_data.book_id, // dy_small_applet_app_id:n_data.app_id, // status:0, // main_id:n_data.main_id, // promotion_id:'', // dy_small_program_start:'', // dy_small_program_start_data:'', // wait_status:0, // }) // } // redis_help.setKeyValue("isPauseTask","false") // } // } CMD.init()