const CMD = {};
const rabbitMq = require('../src/mq/rabbit-mq');
const helper = require('../src/helper');
const mysql = require('mysql2/promise');
const crypto = require('crypto');
const config = require('../etc/config.json');

// Select the MySQL connection settings that match the configured run mode.
const dbConfig = config.isDebug ? config.debug_mysql : config.release_mysql;

/**
 * Convert a millisecond timestamp to a MySQL DATETIME string.
 *
 * The string is derived from `toISOString()`, so it is expressed in UTC.
 * NOTE(review): nothing in this file calls this helper at the moment.
 *
 * @param {number|string} timestamp - Milliseconds since the Unix epoch.
 * @returns {string|null} `YYYY-MM-DD HH:mm:ss`, or null for a falsy input.
 */
function timestampToMySQLDatetime(timestamp) {
    if (!timestamp) return null;
    // Always pass the radix so the input is never parsed as hex.
    const date = new Date(Number.parseInt(timestamp, 10));
    return date.toISOString().slice(0, 19).replace('T', ' ');
}

/**
 * Normalise an incoming timestamp into a Date suitable for a DATETIME column.
 *
 * `new Date(null)` silently becomes 1970-01-01 and `new Date(undefined)` is an
 * Invalid Date, so missing or unparseable values are mapped to null (SQL NULL).
 * Digit-only strings are treated as millisecond timestamps.
 *
 * @param {number|string|Date|null|undefined} timestamp - Raw value from the message.
 * @returns {Date|null} A valid Date, or null when the value is absent or invalid.
 */
function toDateOrNull(timestamp) {
    if (timestamp == null || timestamp === '') return null;
    const normalized = typeof timestamp === 'string' && /^\d+$/.test(timestamp)
        ? Number(timestamp)
        : timestamp;
    const date = new Date(normalized);
    return Number.isNaN(date.getTime()) ? null : date;
}

/**
 * Build the name of the per-day table from the message's `today` field.
 *
 * The value arrives from the message queue and ends up inside SQL as an
 * identifier, so after stripping dashes it must consist of digits only.
 *
 * @param {string} today - Date string from the message; dashes are removed.
 * @returns {string} Table name of the form `pull_data_<digits>`.
 * @throws {TypeError} If `today` is not a string.
 * @throws {Error} If the dash-stripped value contains anything but digits.
 */
function buildTableName(today) {
    if (typeof today !== 'string') {
        throw new TypeError(`无效的 today 字段: ${String(today)}`);
    }
    const date = today.replace(/-/g, '');
    if (!/^\d+$/.test(date)) {
        throw new Error(`无效的 today 字段: ${today}`);
    }
    return `pull_data_${date}`;
}

/**
 * Build the CREATE TABLE statement for a per-day data table.
 *
 * @param {string} tableName - Table name; quoted with backticks in the statement.
 * @returns {string} The `CREATE TABLE IF NOT EXISTS` SQL text.
 */
const createTableSQL = (tableName) => `
    CREATE TABLE IF NOT EXISTS \`${tableName}\` (
        id BIGINT PRIMARY KEY AUTO_INCREMENT,
        materialId VARCHAR(64),
        awemeId VARCHAR(64),
        title VARCHAR(500),
        awemeUrl VARCHAR(500),
        publishTime datetime DEFAULT NULL,
        hasAnchorInfo BOOLEAN,
        landingUrl VARCHAR(2048),
        createTime datetime DEFAULT NULL,
        updateTime datetime DEFAULT NULL,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        likeCount int,
        commentCount int,
        shareCount int,
        favoriteCount int,
        INDEX idx_aweme_id_like_id (awemeId, likeCount, id)
    )
`;

/**
 * Compute the hexadecimal MD5 digest of a string.
 *
 * @param {string} text - Text to hash.
 * @returns {string} Lower-case hex digest.
 */
function generateMD5(text) {
    return crypto.createHash('md5').update(text).digest('hex');
}

/**
 * Make sure the configured database and the given table exist, and switch the
 * connection to that database.
 *
 * @param {object} connection - Open mysql2 promise connection.
 * @param {string} tableName - Name of the table to create if missing.
 * @returns {Promise<void>}
 * @throws Re-throws any query error after logging it.
 */
async function ensureTable(connection, tableName) {
    try {
        await connection.query(`CREATE DATABASE IF NOT EXISTS ${dbConfig.database}`);
        await connection.query(`USE ${dbConfig.database}`);
        await connection.query(createTableSQL(tableName));
    } catch (error) {
        console.error(`创建表失败: ${error.message}`);
        throw error;
    }
}

/**
 * Insert video titles with their MD5 tag into `origin_video_titles`, ignoring
 * rows the database rejects as duplicates (INSERT IGNORE).
 *
 * Non-string titles are skipped so that a single item without a title does not
 * abort the whole batch when hashing.
 * NOTE(review): `origin_video_titles` is not created in this file; it is
 * presumably provisioned elsewhere — confirm.
 *
 * @param {object} connection - Open mysql2 promise connection.
 * @param {Array<string>} titles - Titles taken from the incoming items.
 * @returns {Promise<void>}
 * @throws Re-throws any query error after logging it.
 */
async function processVideoTitles(connection, titles) {
    if (!titles || titles.length === 0) return;

    const validTitles = titles.filter((title) => typeof title === 'string');
    if (validTitles.length === 0) return;

    try {
        const values = validTitles.map((title) => [title, generateMD5(title)]);
        const insertSQL = `
            INSERT IGNORE INTO origin_video_titles (title, md5_tag)
            VALUES ?
        `;
        // The nested array expands into a multi-row VALUES list.
        const [result] = await connection.query(insertSQL, [values]);
        console.log(`处理了 ${validTitles.length} 个标题,新增 ${result.affectedRows} 条记录`);
    } catch (error) {
        console.error('处理视频标题失败:', error);
        throw error;
    }
}

/**
 * Handle one queue message: store its items in the per-day `pull_data_*` table
 * and record their titles.
 *
 * Messages whose `cmd` is not `on_recv_pull_data` are ignored. The database
 * connection is always closed, including when a query fails.
 *
 * @param {string|Buffer} msg - JSON-encoded message with `cmd`, `today` and `data`.
 * @returns {Promise<void>}
 * @throws {SyntaxError} If `msg` is not valid JSON.
 * @throws Re-throws validation and database errors after logging them.
 */
const messageHandler = async (msg) => {
    const message = JSON.parse(msg);
    if (message.cmd !== 'on_recv_pull_data') {
        return;
    }

    let connection;
    try {
        const tableName = buildTableName(message.today);
        const data = message.data;
        if (!Array.isArray(data)) {
            throw new TypeError('消息 data 字段不是数组');
        }
        if (data.length === 0) {
            // A bulk INSERT with no rows is invalid SQL, so there is nothing to do.
            console.log('消息数据为空,跳过处理');
            return;
        }

        connection = await mysql.createConnection({
            ...dbConfig,
            multipleStatements: true
        });

        await ensureTable(connection, tableName);
        await processVideoTitles(connection, data.map((item) => item.title));

        // Column order must match the column list of the INSERT below.
        const values = data.map((item) => [
            item.materialId,
            item.awemeId,
            item.title,
            item.awemeUrl,
            toDateOrNull(item.publishTime),
            item.hasAnchorInfo,
            item.landingUrl,
            toDateOrNull(item.createTime),
            toDateOrNull(item.updateTime),
            item.likeCount,
            item.commentCount,
            item.shareCount,
            item.favoriteCount
        ]);

        const insertSQL = `
            INSERT INTO \`${tableName}\`
            (materialId, awemeId, title, awemeUrl, publishTime, hasAnchorInfo,
            landingUrl, createTime, updateTime, likeCount, commentCount, shareCount, favoriteCount)
            VALUES ?
        `;

        await connection.query(insertSQL, [values]);
        console.log(`成功插入 ${values.length} 条记录到表 ${tableName}`);
    } catch (error) {
        console.error('处理消息失败:', error);
        throw error;
    } finally {
        if (connection) {
            try {
                await connection.end();
            } catch (closeError) {
                // Do not let a close failure mask the outcome of the message itself.
                console.error('关闭数据库连接失败:', closeError);
            }
        }
    }
};

/**
 * Subscribe `messageHandler` to the `exchange_pull_data_system` exchange with
 * the `on_recv_pull_data` routing key. Start-up failures are logged, not thrown.
 *
 * @returns {Promise<void>}
 */
async function startConsumer() {
    try {
        await rabbitMq.consumerDirectMsg(messageHandler, "exchange_pull_data_system", "on_recv_pull_data");
    } catch (error) {
        console.error('启动消费者失败:', error);
    }
}

/**
 * Start the queue consumer. `startConsumer` handles its own errors, so the
 * returned promise is intentionally not awaited.
 */
CMD.init = function() {
    startConsumer();
};

// The consumer is started as soon as this module is loaded.
CMD.init();

module.exports = CMD;