/**
 * MESSAGE_DISPATCH worker.
 *
 * Listens on a WebSocket for command messages. Depending on the command it
 * either refreshes a configuration snapshot (fetched from the local backend
 * API and stored in redis as JSON) or forwards the message to RabbitMQ.
 * It also refreshes third-party login tokens once at start-up and then on a
 * fixed interval.
 */
const CMD = {};
// Interval between periodic token refreshes: 5 minutes (60000 ms * 5).
const time_count = 60000 * 5;
const config = require('../etc/config.json');
const redis_help = require('../src/use_redis');
const WebSocketClient = require('../src/WebSocketClient');
const rabbitMq = require('../src/mq/rabbit-mq');
const tools = require('../tools');

// Backend endpoint that serves every configuration list used below.
const BACK_API_URL = 'http://127.0.0.1:9100/tg/back/api';

console.log("运行 MESSAGE_DISPATCH");

/**
 * POST a `{cmd, fun, data: {}}` request to the backend API and return the
 * `data.data` field of the response.
 *
 * @param {string} cmd - Backend module name.
 * @param {string} fun - Backend function name.
 * @returns {Promise<*>} The payload found at `response.data.data`.
 * @throws Whatever the HTTP client rejects with, or a TypeError when the
 *   response has no `data` object.
 */
async function fetchBackendData(cmd, fun) {
  const postData = { cmd, fun, data: {} };
  // A fresh client is requested for every call, as the original code did.
  const client = tools.getOneNewClinet();
  const response = await client.post(BACK_API_URL, postData);
  return response.data.data;
}

/**
 * Fetch one backend list and store it in redis as a JSON string.
 * Failures are logged (prefixed with `label`) and never propagate, so callers
 * may safely fire-and-forget.
 *
 * @param {object} options
 * @param {string} options.cmd - Backend module name.
 * @param {string} options.fun - Backend function name.
 * @param {string} options.redisKey - Redis key that receives the JSON string.
 * @param {string} options.label - Prefix used in the error log line.
 * @param {Function} [options.afterFetch] - Invoked after a successful fetch,
 *   before the redis write (used to chain a dependent refresh).
 * @returns {Promise<void>}
 */
async function refreshCache({ cmd, fun, redisKey, label, afterFetch }) {
  try {
    const payload = await fetchBackendData(cmd, fun);
    if (afterFetch) {
      afterFetch();
    }
    // Awaited so that a failing redis write is logged here instead of
    // surfacing as an unhandled rejection.
    await redis_help.setKeyValue(redisKey, JSON.stringify(payload));
  } catch (error) {
    console.error(`${label} error!`, error);
  }
}

/** Refresh the "FilterConfig" redis key. */
CMD.updateFilterConfig = async function () {
  await refreshCache({
    cmd: "db_filter_config",
    fun: "getConfig",
    redisKey: "FilterConfig",
    label: "updateFilterConfig",
  });
};

/** Refresh the "CloseMainConfig" redis key. */
CMD.updateCloseMainConfig = async function () {
  await refreshCache({
    cmd: "tg_main",
    fun: "getCloseCreateLinkMainList",
    redisKey: "CloseMainConfig",
    label: "updateCloseMainConfig",
  });
};

/**
 * Refresh the "MainConfig" redis key and, once the fetch succeeds, also
 * trigger a refresh of "CloseMainConfig".
 */
CMD.updateMainConfig = async function () {
  await refreshCache({
    cmd: "tg_main",
    fun: "getOpenCreateLinkMainList",
    redisKey: "MainConfig",
    label: "updateMainConfig",
    // Not awaited: updateCloseMainConfig logs its own failures.
    afterFetch: () => { CMD.updateCloseMainConfig(); },
  });
};

/** Refresh the "AppConfig" redis key. */
CMD.updateAppConfig = async function () {
  await refreshCache({
    cmd: "tg_app",
    fun: "getAppList",
    redisKey: "AppConfig",
    label: "updateAppConfig",
  });
};

/**
 * Refresh the "PlatformConfig" redis key and, once the fetch succeeds, also
 * trigger a refresh of "PlatformShared".
 */
CMD.updatePlatformConfig = async function () {
  await refreshCache({
    cmd: "tg_platform",
    fun: "getPlatformList",
    redisKey: "PlatformConfig",
    label: "updatePlatformConfig",
    // Not awaited: updatePlatformShared logs its own failures.
    afterFetch: () => { CMD.updatePlatformShared(); },
  });
};

/** Refresh the "PlatformShared" redis key. */
CMD.updatePlatformShared = async function () {
  await refreshCache({
    cmd: "platform_shared",
    // The function name is spelled this way in the request; it must match the backend.
    fun: "gePlatformSharedList",
    redisKey: "PlatformShared",
    label: "PlatformShared",
  });
};

/** Refresh the "all_fq_mf_key" redis key. */
CMD.updateFqMfKeyList = async function () {
  await refreshCache({
    cmd: "fq_mf_book",
    fun: "get_all_fq_mf_key",
    redisKey: "all_fq_mf_key",
    label: "all_fq_mf_key",
  });
};

/**
 * Refresh the "all_yw_key" redis key.
 *
 * Unlike the other refreshers this one lets errors propagate: recvMessage
 * awaits it and must skip the follow-up session refresh when it fails.
 *
 * @returns {Promise<void>}
 * @throws When the backend request or the redis write fails.
 */
CMD.updateYwKeyList = async function () {
  const payload = await fetchBackendData("yw_book", "get_all_yw_key");
  console.log("updateYwKeyList:", payload);
  await redis_help.setKeyValue("all_yw_key", JSON.stringify(payload));
};

/** Refresh the "all_fq_key" redis key. */
CMD.updateFqKeyList = async function () {
  await refreshCache({
    cmd: "fq_book",
    fun: "get_all_fq_key",
    redisKey: "all_fq_key",
    label: "all_fq_key",
  });
};

/** Refresh the "all_black_books" redis key. */
CMD.updateBlackBooks = async function () {
  await refreshCache({
    cmd: "book_black_list",
    fun: "get_all_black_books",
    redisKey: "all_black_books",
    label: "all_black_books",
  });
};

/**
 * Refresh the "PullDataConfig" redis key.
 *
 * @returns {Promise<void>} Resolves once the fetch and the redis write have
 *   finished or failed; failures are logged, not thrown.
 */
CMD.getPullDataConfig = async function () {
  await refreshCache({
    cmd: "pull_data_config",
    fun: "get_all_pull_data_list",
    redisKey: "PullDataConfig",
    label: "getPullDataConfig",
  });
};

// Tracks whether the WebSocket is currently open; sendMessage drops messages while false.
let ws_isOpen = false;

/** Callbacks handed to WebSocketClient. */
CMD.myListener = {
  onOpen: function (webSocket, response) {
    ws_isOpen = true;
    // After the connection opens, send a status message to the server.
    sendMessage("status_task", { work: 1 });
  },
  onMessage: function (webSocket, msg) {
    // msg may be a string or a byte array, depending on what the server sends.
    recvMessage(msg);
  },
  onClosing: function (webSocket, code, response) {
    ws_isOpen = false;
  },
  onClosed: function (webSocket, code, response) {
    ws_isOpen = false;
  },
  // NOTE(review): failures do not change ws_isOpen — confirm that the client
  // always follows a failure with onClosing/onClosed.
  onFailure: function (webSocket, t, response) {
  },
};

/**
 * Send `{cmd, data}` as JSON over the WebSocket. Silently does nothing while
 * the socket is not open or the client has not been created yet.
 *
 * @param {string} cmd - Command (method) name.
 * @param {*} data - Command parameters.
 */
function sendMessage(cmd, data) {
  if (!ws_isOpen || CMD.wsClient == null) {
    return;
  }
  CMD.wsClient.sendMsg(JSON.stringify({ cmd, data }));
}

/**
 * Publish a message to a RabbitMQ exchange, logging success or failure.
 * Never rejects.
 *
 * @param {string} exchange - Exchange name.
 * @param {*} message - Message body handed to the producer.
 * @param {?string} routingKey - Routing key; when null/undefined the producer
 *   is called without one.
 * @returns {Promise<void>}
 */
async function publishDirect(exchange, message, routingKey) {
  try {
    if (routingKey != null) {
      await rabbitMq.producerDirectMsg(message, exchange, routingKey);
    } else {
      await rabbitMq.producerDirectMsg(message, exchange);
    }
    console.log('消息发送成功');
  } catch (error) {
    console.error('发送消息失败:', error);
  }
}

/**
 * Publish a message to the "exchange_system" exchange.
 *
 * @param {*} message - Message body.
 * @param {?string} [routingKey=null] - Optional routing key.
 * @returns {Promise<void>}
 */
async function sendMQMessage(message, routingKey = null) {
  await publishDirect("exchange_system", message, routingKey);
}

/**
 * Publish a message to the "exchange_update_pull_config" exchange.
 *
 * @param {*} message - Message body.
 * @param {?string} [routingKey=null] - Optional routing key.
 * @returns {Promise<void>}
 */
async function sendPullMQMessage(message, routingKey = null) {
  await publishDirect("exchange_update_pull_config", message, routingKey);
}

/**
 * Wrap an incoming WebSocket message as a "filter_task" MQ payload.
 *
 * @param {object} json_msg - Parsed incoming message; its `cmd` becomes `fun`.
 * @returns {{cmd: string, fun: string, data: object}}
 */
function buildFilterTask(json_msg) {
  return {
    cmd: "filter_task",
    fun: json_msg.cmd,
    data: json_msg,
  };
}

/**
 * Parse one WebSocket message and dispatch on its `cmd` field.
 * Unknown commands are ignored; any error is logged and swallowed so that a
 * bad message cannot break the listener.
 *
 * @param {string|Buffer} data - Raw message; must parse as JSON.
 * @returns {Promise<void>}
 */
async function recvMessage(data) {
  try {
    // Each message carries a single task.
    const json_msg = JSON.parse(data);
    console.log("recvMessage:", json_msg.cmd);
    switch (json_msg.cmd) {
      case "updateFilterConfig":
        await CMD.updateFilterConfig();
        break;
      case "updateMainConfig":
        await CMD.updateMainConfig();
        break;
      case "updateAppConfig":
        await CMD.updateAppConfig();
        break;
      case "updatePlatformConfig":
        await CMD.updatePlatformConfig();
        break;
      case "updateFqKeyList":
        await CMD.updateFqKeyList();
        break;
      case "updateFqMfKeyList":
        await CMD.updateFqMfKeyList();
        break;
      case "updateYwKeyList":
        // The session refresh only runs when the key list refresh succeeded.
        await CMD.updateYwKeyList();
        await tools.getYwOPENSESSID();
        break;
      case "updateBlackBooks":
        await CMD.updateBlackBooks();
        break;
      // "addMain" used to be routed here as well; that case label is disabled.
      case "syncMain":
        await sendMQMessage(buildFilterTask(json_msg), json_msg.cmd);
        break;
      case "getPullDataConfig":
        // Fetch the pull-data configuration.
        await CMD.getPullDataConfig();
        break;
      case "updatePullConig":
        // Sync the pull configuration: refresh redis first, pause briefly,
        // then notify consumers through MQ.
        await CMD.getPullDataConfig();
        await new Promise((resolve) => setTimeout(resolve, 100));
        await sendPullMQMessage(buildFilterTask(json_msg), json_msg.cmd);
        break;
      default:
        // Unrecognised commands are intentionally ignored.
        break;
    }
  } catch (e) {
    console.error("recvMessage error1!:", e);
  }
}

/**
 * Start the worker: connect to redis, open the WebSocket once redis reports
 * success, schedule the periodic token refresh and run one refresh now.
 */
CMD.init = function () {
  redis_help.connect((results) => {
    if (results) {
      CMD.wsClient = new WebSocketClient(config.ws_config.host, {
        reconnectDelay: 5000, // 5 seconds
        heartbeatInterval: 30000, // 30 seconds
        heartbeatTimeout: 5000, // 5 seconds
      }, CMD.myListener);
    }
  });
  // NOTE(review): the first token refresh starts without waiting for the
  // redis connect callback — confirm redis_help tolerates early writes.
  CMD.updateConfig();
  CMD.updateTimeConfig();
  // A periodic yw cookie refresh (check_login.update_cookies every 60 s) is
  // currently disabled.
};

/**
 * Log in to each third-party provider and store the resulting tokens in
 * redis. Errors are logged rather than thrown, because this runs from
 * setInterval and from init where nothing handles a rejection.
 *
 * @returns {Promise<void>}
 */
CMD.updateTimeConfig = async function () {
  try {
    const heiyanLogin = require('../src/api/hy/create_heiyan_book_link');
    heiyanLogin.login((data) => {
      console.log("hei_yan_token:", data['token']);
      redis_help.setKeyValue("hei_yan_token", data['token']);
    });

    const qmLogin = require('../src/api/qm/qm_login');
    qmLogin.login((data, headers) => {
      console.log("qi_mao_token:", headers['n-token']);
      redis_help.setKeyValue("qi_mao_token", headers['n-token']);
    });

    const qmMfLogin = require('../src/api/qm_mf/qm_mf_login');
    qmMfLogin.login((data, headers) => {
      console.log("qi_mao_mf_token:", headers['n-token']);
      redis_help.setKeyValue("qi_mao_mf_token", headers['n-token']);
    });

    // NOTE(review): the result of dz_login.login() is ignored — presumably it
    // stores its own token; verify against that module.
    require('../src/api/dz/dz_login').login();

    const zy_token = await require('../src/api/zy/zy_login').zy_login();
    if (zy_token != null) {
      console.log("zy_token:", zy_token);
      await redis_help.setKeyValue("zy_token", zy_token);
    }
  } catch (error) {
    console.error('updateTimeConfig error!', error);
  }
};

/** Schedule the periodic token refresh (every `time_count` milliseconds). */
CMD.updateConfig = function () {
  setInterval(() => {
    CMD.updateTimeConfig();
  }, time_count);
};

CMD.init();