|
@@ -0,0 +1,169 @@
|
|
|
+const CMD = {}
|
|
|
+const rabbitMq = require('../src/mq/rabbit-mq');
|
|
|
+const helper = require('../src/helper');
|
|
|
+const mysql = require('mysql2/promise');
|
|
|
+const crypto = require('crypto');
|
|
|
+const config = require('../etc/config.json')
|
|
|
+const dbConfig = config.isDebug?config.debug_mysql:config.release_mysql
|
|
|
+
|
|
|
+// 时间戳转MySQL datetime格式
|
|
|
+function timestampToMySQLDatetime(timestamp) {
|
|
|
+ if (!timestamp) return null;
|
|
|
+ // 将毫秒时间戳转换为Date对象
|
|
|
+ const date = new Date(parseInt(timestamp));
|
|
|
+ // 格式化为MySQL datetime格式
|
|
|
+ return date.toISOString().slice(0, 19).replace('T', ' ');
|
|
|
+}
|
|
|
+
|
|
|
+// 创建表的SQL模板(保持不变)
|
|
|
+const createTableSQL = (tableName) => `
|
|
|
+ CREATE TABLE IF NOT EXISTS ${tableName} (
|
|
|
+ id BIGINT NOT NULL AUTO_INCREMENT,
|
|
|
+ materialId VARCHAR(64) DEFAULT NULL,
|
|
|
+ awemeId VARCHAR(64) DEFAULT NULL,
|
|
|
+ title VARCHAR(500) DEFAULT NULL,
|
|
|
+ awemeUrl VARCHAR(500) DEFAULT NULL,
|
|
|
+ publishTime DATETIME DEFAULT NULL,
|
|
|
+ hasAnchorInfo TINYINT(1) DEFAULT NULL,
|
|
|
+ landingUrl VARCHAR(2048) DEFAULT NULL,
|
|
|
+ createTime DATETIME DEFAULT NULL,
|
|
|
+ updateTime DATETIME DEFAULT NULL,
|
|
|
+ created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
|
|
|
+ likeCount INT(11) DEFAULT NULL,
|
|
|
+ commentCount INT(11) DEFAULT NULL,
|
|
|
+ shareCount INT(11) DEFAULT NULL,
|
|
|
+ favoriteCount INT(11) DEFAULT NULL,
|
|
|
+ PRIMARY KEY (id),
|
|
|
+ KEY idx_material_id (awemeId),
|
|
|
+ KEY idx_aweme_id_like_id (awemeId, likeCount, id)
|
|
|
+ ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='COLUMNAR=1'
|
|
|
+`;
|
|
|
+
|
|
|
+// 生成MD5(保持不变)
|
|
|
+function generateMD5(text) {
|
|
|
+ return crypto.createHash('md5').update(text).digest('hex');
|
|
|
+}
|
|
|
+
|
|
|
+// 确保表存在(保持不变)
|
|
|
+async function ensureTable(connection, tableName) {
|
|
|
+ try {
|
|
|
+ await connection.query(`CREATE DATABASE IF NOT EXISTS ${dbConfig.database}`);
|
|
|
+ await connection.query(`USE ${dbConfig.database}`);
|
|
|
+ const sql = createTableSQL(tableName);
|
|
|
+ await connection.query(sql);
|
|
|
+ } catch (error) {
|
|
|
+ console.error(`创建表失败: ${error.message}`);
|
|
|
+ throw error;
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+// 处理视频标题(保持不变)
|
|
|
+async function processVideoTitles(connection, titles) {
|
|
|
+ if (!titles || titles.length === 0) return;
|
|
|
+ const titleData = titles.map(title => ({
|
|
|
+ title,
|
|
|
+ md5_tag: generateMD5(title)
|
|
|
+ }));
|
|
|
+
|
|
|
+ try {
|
|
|
+ const values = titleData.map(data => [data.title, data.md5_tag]);
|
|
|
+ const insertSQL = `
|
|
|
+ INSERT IGNORE INTO origin_video_titles
|
|
|
+ (title, md5_tag)
|
|
|
+ VALUES ?
|
|
|
+ `;
|
|
|
+ const [result] = await connection.query(insertSQL, [values]);
|
|
|
+ console.log(`处理了 ${titles.length} 个标题,新增 ${result.affectedRows} 条记录`);
|
|
|
+ } catch (error) {
|
|
|
+ console.error('处理视频标题失败:', error);
|
|
|
+ throw error;
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+const insertOriginData = async(connection,values) =>{
|
|
|
+ const insertSQL = `
|
|
|
+ INSERT INTO origin_data
|
|
|
+ (video_id, materialId, title, video_link, publish_time,
|
|
|
+ is_guajian, guajian_link, createTime, create_time,
|
|
|
+ like_num, comment_num, shared_num, kepp_num)
|
|
|
+ VALUES ?
|
|
|
+ ON DUPLICATE KEY UPDATE
|
|
|
+ video_id = VALUES(video_id)
|
|
|
+ `;
|
|
|
+
|
|
|
+ await connection.query(insertSQL, [values]);
|
|
|
+};
|
|
|
+// 处理消息的主函数(修改时间戳处理部分)
|
|
|
+const messageHandler = async (msg) => {
|
|
|
+ let message = JSON.parse(msg)
|
|
|
+ if(message.cmd!="on_recv_pull_data"){
|
|
|
+ return
|
|
|
+ }
|
|
|
+ let today = message.today
|
|
|
+ // let date = helper.getLocalDate().replace(/-/g, '');
|
|
|
+ let date = today.replace(/-/g, '');
|
|
|
+ let table_name = `pull_data_${date}`;
|
|
|
+ let connection = null
|
|
|
+ try {
|
|
|
+ connection = await mysql.createConnection({
|
|
|
+ ...dbConfig,
|
|
|
+ multipleStatements: true
|
|
|
+ });
|
|
|
+
|
|
|
+ await ensureTable(connection, table_name);
|
|
|
+ const data = message.data;
|
|
|
+ await processVideoTitles(connection, data.map(item => item.contentTitle));
|
|
|
+
|
|
|
+ // 转换时间戳为MySQL datetime格式
|
|
|
+ const values = data.map(item => [
|
|
|
+ item.contentId,
|
|
|
+ item.contentId,
|
|
|
+ item.contentTitle,
|
|
|
+ item.contentUrl,
|
|
|
+ new Date(item.publishTime), // 转换时间格式
|
|
|
+ item.landingUrlType!=0,
|
|
|
+ decodeURIComponent(item.landingInfo.landingUrl),
|
|
|
+ new Date(item.updateTime), // 转换时间格式
|
|
|
+ new Date(item.updateTime), // 转换时间格式
|
|
|
+ item.likeNum,
|
|
|
+ item.commentNum,
|
|
|
+ item.shareNum,
|
|
|
+ item.collectNum
|
|
|
+ ]);
|
|
|
+
|
|
|
+ const insertSQL = `
|
|
|
+ INSERT INTO ${table_name}
|
|
|
+ (materialId, awemeId, title, awemeUrl, publishTime,
|
|
|
+ hasAnchorInfo, landingUrl, createTime, updateTime,
|
|
|
+ likeCount, commentCount, shareCount, favoriteCount)
|
|
|
+ VALUES ?
|
|
|
+ `;
|
|
|
+
|
|
|
+ await insertOriginData(connection,values)
|
|
|
+ await connection.query(insertSQL, [values]);
|
|
|
+
|
|
|
+
|
|
|
+ console.log(`成功插入 ${values.length} 条记录到表 ${table_name}`);
|
|
|
+ } catch (error) {
|
|
|
+ console.error('处理消息失败:', error);
|
|
|
+ } finally{
|
|
|
+ if(connection!=null){
|
|
|
+ await connection.end();
|
|
|
+ }
|
|
|
+ }
|
|
|
+};
|
|
|
+
|
|
|
+async function startConsumer() {
|
|
|
+ try {
|
|
|
+ await rabbitMq.consumerDirectMsg(messageHandler, "exchange_pull_data_system_new", "on_recv_pull_data");
|
|
|
+ } catch (error) {
|
|
|
+ console.error('启动消费者失败:', error);
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+CMD.init = function() {
|
|
|
+ startConsumer();
|
|
|
+}
|
|
|
+CMD.init()
|
|
|
+module.exports = CMD
|