MAIN_SERVE_FACTORY.js 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. const CMD = {}
  2. const redis_help = require('../src/use_redis');
  3. const rabbitMq = require('../src/mq/rabbit-mq');
  4. const config = require('../etc/config.json')
  5. const axios = require('axios');
  6. const mysql = require('mysql2/promise');
  7. const http = require('http');
  8. const tools = require('../tools');
  // MySQL connection settings for the task database: the debug set when
  // config.isDebug is truthy, otherwise the release set.
  const dbConfig = config[config.isDebug ? 'debug_task_mysql' : 'release_task_mysql'];
  /**
   * Fetches result pages for one record through CMD.syncMain and queues each
   * page on `task_que` for processSyncMainTask to write.
   *
   * Paging state lives in the module-level `cur_page` cursor: it is advanced
   * after every page that is queued and set to -1 once CMD.syncMain returns
   * null. The run is implemented as a loop (not self-recursion) so that a long
   * run does not keep one pending promise per page alive.
   *
   * Whatever the outcome, `isStartTask` is cleared and the Redis key
   * "isPauseTask" is set back to "false". A failure while resetting the Redis
   * key is logged instead of being thrown. A failed run is not retried.
   *
   * @param {*} tg_platform_id - Forwarded unchanged to CMD.syncMain.
   * @param {Object} main_info - Forwarded unchanged to CMD.syncMain.
   * @returns {Promise<void>}
   */
  async function processTask(tg_platform_id, main_info) {
    try {
      for (;;) {
        console.log(`处理第 ${cur_page} 页任务`);
        const response = await CMD.syncMain(tg_platform_id, main_info);
        if (response === null) {
          console.log("完成所有创建:", main_info, cur_page);
          cur_page = -1;
          break;
        }
        cur_page++;
        total = response.total;
        task_que.push(response);
        // Pause for one second before requesting the next page.
        await new Promise(resolve => setTimeout(resolve, 1000));
      }
    } catch (e) {
      console.error("processTask 错误:", e);
    } finally {
      isStartTask = false;
      try {
        await redis_help.setKeyValue("isPauseTask", "false");
      } catch (resetError) {
        console.error("processTask 错误:", resetError);
      }
    }
  }
  /**
   * Writes at most one queued page from `task_que` into the
   * `video_applet_product` table, then schedules itself to run again in one
   * second (whether or not anything was written, and whether or not it failed).
   *
   * Each queued entry is expected to look like `{ data, main_info }`, as
   * produced by CMD.syncMain. Every element of `data` becomes one row; rows
   * that hit a duplicate key only have `product_name` and
   * `dy_small_applet_app_id` refreshed.
   *
   * The rows are built before the connection is opened and the connection is
   * closed in a `finally`, so a failing mapping or query cannot leak a MySQL
   * connection. A page whose insert fails is logged and not re-queued.
   *
   * @returns {Promise<void>}
   */
  async function processSyncMainTask() {
    try {
      if (task_que.length > 0) {
        const { data: product_list, main_info } = task_que.shift();
        const main_id = main_info.id;
        const material_platform_id = main_info.material_platform_id;

        // Column order must match the column list in insertSQL below.
        const rows = product_list.map(item => [
          item.product_name,
          item.product_id,
          item.book_platform,
          main_id,
          main_info.app_id,
          0, // status
          0, // wait_status
          material_platform_id,
          item.product_parent_id,
          main_info.butler_id,
          item.is_top // stored in the `weight` column
        ]);

        const insertSQL = `
          INSERT INTO video_applet_product
          (product_name, product_id, book_platform, main_id, dy_small_applet_app_id, status, wait_status,material_platform_id,product_parent_id,butler_id,weight)
          VALUES ?
          ON DUPLICATE KEY UPDATE
          product_name = VALUES(product_name),
          dy_small_applet_app_id = VALUES(dy_small_applet_app_id)
        `;

        const connection = await mysql.createConnection({
          ...dbConfig,
          multipleStatements: true
        });
        try {
          await connection.query(insertSQL, [rows]);
        } finally {
          // Always release the connection, even when the query rejects.
          await connection.end();
        }
      }
    } catch (e) {
      console.error("processSyncMainTask:", e);
    } finally {
      setTimeout(processSyncMainTask, 1000);
    }
  }
  /**
   * Handles one consumed message by starting a paging run for the record in
   * `msg.data.data.main_info`.
   *
   * Only one run may be active: while `isStartTask` is set the message is
   * ignored. Otherwise the paging state is reset, the Redis key "isPauseTask"
   * is set to "true", and processTask is awaited.
   *
   * The platform id passed on is `main_info.parent_platform_id` unless that
   * compares loosely equal to 0, in which case `main_info.tg_platform_id` is
   * used. It is kept in a local constant instead of leaking an implicit global.
   *
   * If anything throws before or during the run (for example a message without
   * `main_info`, or a Redis failure), `isStartTask` is cleared again so later
   * messages are not ignored forever; the error is then rethrown to the caller.
   *
   * @param {Object} msg - Consumed message; `msg.data.data.main_info` is read.
   * @returns {Promise<void>}
   */
  const messageHandler = async (msg) => {
    if (isStartTask) {
      return;
    }
    cur_page = 1;
    total = 0;
    isStartTask = true;
    try {
      const main_info = msg['data']['data'].main_info;
      console.log("main_info:", main_info);
      // Loose comparison kept on purpose: it treats 0, "0" and "" alike.
      const tg_platform_id = main_info.parent_platform_id != 0
        ? main_info.parent_platform_id
        : main_info.tg_platform_id;
      await redis_help.setKeyValue("isPauseTask", "true");
      await processTask(tg_platform_id, main_info);
    } catch (e) {
      isStartTask = false;
      throw e;
    }
  };
  105. // // 启动消费者
  106. // async function startConsumer() {
  107. // try {
  108. // await rabbitMq.consumerDirectMsg(messageHandler,"exchange_system","addMain");
  109. // } catch (error) {
  110. // console.error('启动消费者失败:', error);
  111. // }
  112. // }
  /**
   * Starts the consumer: registers messageHandler through
   * rabbitMq.consumerDirectMsg with "exchange_system" and "syncMain", then
   * kicks off the processSyncMainTask loop.
   *
   * A failure while registering is logged; the loop is started regardless.
   *
   * @returns {Promise<void>}
   */
  async function startConsumer() {
    const consumerArgs = [messageHandler, "exchange_system", "syncMain"];
    try {
      await rabbitMq.consumerDirectMsg(...consumerArgs);
    } catch (startError) {
      console.error('启动消费者失败:', startError);
    }
    processSyncMainTask();
  }
  /**
   * Module start-up: calls redis_help.connect with a no-op callback (the call
   * is not awaited), then waits for startConsumer to finish.
   *
   * @returns {Promise<void>}
   */
  CMD.init = async function () {
    const noop = () => {};
    redis_help.connect(noop);
    await startConsumer();
  };
  // Pages fetched by processTask that are waiting to be written by
  // processSyncMainTask (pushed at the tail, shifted from the head).
  let task_que = [];

  // Page number used for the next CMD.syncMain request; processTask sets it
  // to -1 once a run has finished.
  let cur_page = 1;

  // `total` value reported by the most recently fetched page.
  let total = 0;

  // True while a paging run is active; messageHandler ignores messages then.
  let isStartTask = false;
  /**
   * Requests one page (page number taken from the module-level `cur_page`,
   * 500 items per page) of `search_book_data_on_main` results from the local
   * back-end API, after an initial 500 ms pause.
   *
   * Filters taken from `main_info`: `min_book_word`, `max_book_word`,
   * `min_stat_cost` / `max_stat_cost` (run through parseFloat) and
   * `start_publish_time` / `end_publish_time`. A publish-time value with a
   * non-zero `length` is converted with Date.parse; a missing (null/undefined)
   * value is sent as "" instead of raising a TypeError that would end the
   * whole run early.
   *
   * @param {*} tg_platform_id - Sent as the `tg_platform_id` filter.
   * @param {Object} main_info - Source of the filters; echoed in the result.
   * @returns {Promise<{data: Array, main_info: Object, total: number}|null>}
   *   The page, or null when the response has no array `data`, the array is
   *   empty, or the request failed. Callers cannot tell these cases apart.
   */
  CMD.syncMain = async function (tg_platform_id, main_info) {
    console.log(`====== 开始第 ${cur_page} 页请求 =====`);
    await new Promise(resolve => setTimeout(resolve, 500));
    try {
      // Missing bound -> "", non-empty value -> Date.parse result, else as-is.
      const toPublishTimeFilter = (value) => {
        if (value == null) {
          return "";
        }
        return value.length > 0 ? Date.parse(value) : value;
      };

      const postData = {
        cmd: "video_product",
        fun: "search_book_data_on_main",
        data: {
          start_publish_time: toPublishTimeFilter(main_info.start_publish_time),
          end_publish_time: toPublishTimeFilter(main_info.end_publish_time),
          product_name: "",
          product_id: "",
          tg_platform_id: tg_platform_id,
          oce_material_id: "",
          page_size: 500,
          page_number: cur_page,
          is_auto: "",
          is_store: "",
          genre: "", // main_info.genre is currently not applied
          alias_name: "",
          stat_cost: "",
          min_book_word: main_info.min_book_word,
          max_book_word: main_info.max_book_word,
          min_stat_cost: parseFloat(main_info.min_stat_cost),
          max_stat_cost: parseFloat(main_info.max_stat_cost)
        }
      };
      console.log("发送请求数据:", postData);

      const client = tools.getOneNewClinet();
      const { data: body } = await client.post('http://127.0.0.1:9100/tg/back/api', postData, {
        headers: {
          'Content-Type': 'application/json',
          'X-Request-Type': 'API', // marks the call as an API request
        }
      });

      // The payload must carry an array under `data`.
      if (!body || !Array.isArray(body.data)) {
        console.log("响应数据无效或格式错误");
        return null;
      }
      // An empty page means there is nothing more to fetch.
      if (body.data.length <= 0) {
        console.log("没有更多数据");
        return null;
      }
      return {
        data: body.data,
        main_info: main_info,
        total: body.total || 0
      };
    } catch (e) {
      console.error("请求错误:", {
        message: e.message,
        code: e.code,
        stack: e.stack
      });
      return null;
    }
  };
  206. // CMD.start_task = async function(msgBody){
  207. // let PlatformConfig = JSON.parse(await redis_help.getKeyValue("PlatformConfig"))
  208. // let main_info = msgBody.main_info
  209. // let list = msgBody.list
  210. // let PlatformInfo = null
  211. // if(main_info.running_status==0){
  212. // console.log("创建了一个关闭的主体:",main_info)
  213. // return
  214. // }
  215. // console.log("main_info:",main_info)
  216. // console.log("list:",list)
  217. // for (let index = 0; index < PlatformConfig.length; index++) {
  218. // const element = PlatformConfig[index];
  219. // if(element.tg_platform_id == main_info.tg_platform_id){
  220. // PlatformInfo = element
  221. // break
  222. // }
  223. // }
  224. // if(PlatformInfo==null){
  225. // console.log("平台配置错误:",msgBody)
  226. // }else{
  227. // redis_help.setKeyValue("isPauseTask","true")
  228. // console.log("list::",list)
  229. // for (let index = 0; index < list.length; index++) {
  230. // const element = list[index];
  231. // let n_data = {book_id:element.product_id,
  232. // book_name:element.product_name,
  233. // tg_platform_id:element.book_platform,
  234. // app_id:element.dy_small_applet_app_id,
  235. // main_id:main_info.id
  236. // }
  237. // const result = await video_applet_product_controllers.createAppletProductData({
  238. // book_platform:n_data.tg_platform_id,
  239. // product_name:n_data.book_name,
  240. // product_id:n_data.book_id,
  241. // dy_small_applet_app_id:n_data.app_id,
  242. // status:0,
  243. // main_id:n_data.main_id,
  244. // promotion_id:'',
  245. // dy_small_program_start:'',
  246. // dy_small_program_start_data:'',
  247. // wait_status:0,
  248. // })
  249. // }
  250. // redis_help.setKeyValue("isPauseTask","false")
  251. // }
  252. // }
  // Start the service on load. The returned promise is deliberately not
  // awaited; `void` only makes that explicit.
  void CMD.init();