123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150 |
// Module object exported at the bottom of this file; `init` is attached to it there.
const CMD = {};

const rabbitMq = require('../src/mq/rabbit-mq');
const helper = require('../src/helper');
const mysql = require('mysql2/promise');
const crypto = require('crypto');
const config = require('../etc/config.json');

// MySQL connection settings for the current environment: `debug_mysql` when
// `config.isDebug` is truthy, otherwise `release_mysql`.
const dbConfig = config[config.isDebug ? 'debug_mysql' : 'release_mysql'];
/**
 * Build the DDL statement that creates a pull-data table if it is missing.
 *
 * @param {string} tableName - Table name. It is interpolated verbatim (not
 *   quoted or escaped), so it must be a trusted, valid MySQL identifier.
 * @returns {string} A `CREATE TABLE IF NOT EXISTS` statement.
 */
function createTableSQL(tableName) {
  const columns = [
    'id BIGINT PRIMARY KEY AUTO_INCREMENT',
    'material_id VARCHAR(64)',
    'aweme_id VARCHAR(255)',
    'title VARCHAR(500)',
    'aweme_url VARCHAR(500)',
    'publish_time BIGINT',
    'has_anchor_info BOOLEAN',
    'landing_url VARCHAR(1024)',
    'create_time BIGINT',
    'update_time BIGINT',
    'created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP',
  ];
  return `\nCREATE TABLE IF NOT EXISTS ${tableName} (\n  ${columns.join(',\n  ')}\n)\n`;
}
/**
 * Hex-encoded MD5 digest of `text`; used below as the `md5_tag` value of
 * title rows (a lookup key, not a security hash).
 *
 * @param {string} text - Input to hash.
 * @returns {string} 32-character lowercase hex digest.
 */
function generateMD5(text) {
  const hasher = crypto.createHash('md5');
  hasher.update(text);
  return hasher.digest('hex');
}
/**
 * Wrap a MySQL identifier in backticks, doubling any embedded backtick, so
 * names that are not plain identifiers (e.g. containing `-`) are accepted.
 *
 * @param {string} name - Raw identifier.
 * @returns {string} The quoted identifier.
 */
function quoteIdentifier(name) {
  const escaped = String(name).replace(/`/g, '``');
  return `\`${escaped}\``;
}

/**
 * Make sure the configured database exists, select it on `connection`, and
 * create `tableName` in it if the table is missing.
 *
 * @param {object} connection - Open mysql2/promise connection (messageHandler
 *   passes one created with `mysql.createConnection`).
 * @param {string} tableName - Table to create if missing; passed verbatim to
 *   createTableSQL.
 * @returns {Promise<void>}
 * @throws {Error} Any query error, after it has been logged.
 */
async function ensureTable(connection, tableName) {
  try {
    // Quote the database name so the statements stay valid for names that
    // need quoting (e.g. `my-db`).
    const database = quoteIdentifier(dbConfig.database);
    await connection.query(`CREATE DATABASE IF NOT EXISTS ${database}`);
    await connection.query(`USE ${database}`);

    // Create the table for the current day if it does not exist yet.
    await connection.query(createTableSQL(tableName));
  } catch (error) {
    console.error(`创建表失败: ${error.message}`);
    throw error;
  }
}
/**
 * Record video titles in `origin_video_titles`, one row per title with its
 * MD5 hex digest in `md5_tag`, using a single multi-row INSERT IGNORE.
 *
 * Entries that are not strings (e.g. items that carry no title) are skipped:
 * generateMD5 cannot hash them, and one bad entry would otherwise abort the
 * whole batch.
 *
 * @param {object} connection - Open mysql2/promise connection.
 * @param {Array<*>} titles - Candidate titles; may contain null/undefined.
 * @returns {Promise<void>}
 * @throws {Error} Database errors, after they have been logged.
 */
async function processVideoTitles(connection, titles) {
  if (!titles || titles.length === 0) return;

  // Rows of [title, md5_tag]. INSERT IGNORE skips rows that collide with an
  // existing unique key (presumably on md5_tag; the table is not created here).
  const values = titles
    .filter((title) => typeof title === 'string')
    .map((title) => [title, generateMD5(title)]);
  if (values.length === 0) return;

  const insertSQL = `
    INSERT IGNORE INTO origin_video_titles
      (title, md5_tag)
    VALUES ?
  `;

  try {
    // `VALUES ?` with a nested array expands to a multi-row VALUES list.
    const [result] = await connection.query(insertSQL, [values]);
    console.log(`处理了 ${titles.length} 个标题,新增 ${result.affectedRows} 条记录`);
  } catch (error) {
    console.error('处理视频标题失败:', error);
    throw error;
  }
}
// Map one pull-data item to a row; the element order must match the column
// list of the INSERT statement in messageHandler.
function toPullDataRow(item) {
  return [
    item.materialId,
    item.awemeId,
    item.title,
    item.awemeUrl,
    item.publishTime,
    item.hasAnchorInfo,
    item.landingUrl,
    item.createTime,
    item.updateTime,
  ];
}

/**
 * Consumer callback for "on_recv_pull_data" messages. It stores every item of
 * `message.data` in the table `pull_data_<helper.getLocalDate()>` and records
 * the item titles via processVideoTitles.
 *
 * Messages with any other `cmd`, and messages whose `data` is an empty array,
 * are ignored without opening a database connection.
 *
 * @param {string|Buffer} msg - JSON-encoded message `{ cmd, data: [...] }`
 *   (presumably the raw body handed over by rabbitMq.consumerDirectMsg; confirm).
 * @returns {Promise<void>}
 * @throws {SyntaxError} If `msg` is not valid JSON.
 * @throws {Error} Any validation or database error, after it has been logged.
 */
const messageHandler = async (msg) => {
  const message = JSON.parse(msg);
  if (message.cmd !== 'on_recv_pull_data') {
    return;
  }

  const data = message.data;
  // `VALUES ?` expanded from an empty list is invalid SQL, so an empty batch
  // could only fail; skip it before connecting.
  if (Array.isArray(data) && data.length === 0) {
    return;
  }

  const tableName = `pull_data_${helper.getLocalDate()}`;
  let connection = null;
  try {
    if (!Array.isArray(data)) {
      throw new TypeError('message.data must be an array');
    }

    // NOTE(review): a new connection per message; a shared pool would avoid
    // the connect/teardown cost. None of the queries issued here needs
    // multipleStatements, which is kept only to preserve the existing config.
    connection = await mysql.createConnection({
      ...dbConfig,
      multipleStatements: true,
    });

    await ensureTable(connection, tableName);
    await processVideoTitles(connection, data.map((item) => item.title));

    const values = data.map(toPullDataRow);
    const insertSQL = `
      INSERT INTO ${tableName}
        (material_id, aweme_id, title, aweme_url, publish_time,
         has_anchor_info, landing_url, create_time, update_time)
      VALUES ?
    `;
    await connection.query(insertSQL, [values]);

    console.log(`成功插入 ${values.length} 条记录到表 ${tableName}`);
  } catch (error) {
    console.error('处理消息失败:', error);
    throw error;
  } finally {
    // Release the connection on every path (the original leaked it on error).
    // A close failure is logged rather than allowed to mask the real outcome.
    if (connection !== null) {
      try {
        await connection.end();
      } catch (closeError) {
        console.error('关闭数据库连接失败:', closeError);
      }
    }
  }
};
/**
 * Register messageHandler with rabbitMq.consumerDirectMsg for the
 * "exchange_pull_data_system" exchange and the "on_recv_pull_data" key.
 *
 * Any failure (synchronous or asynchronous) is logged and swallowed, so the
 * returned promise always resolves to undefined.
 *
 * @returns {Promise<void>}
 */
async function startConsumer() {
  const exchangeName = 'exchange_pull_data_system';
  const routeKey = 'on_recv_pull_data';
  try {
    await rabbitMq.consumerDirectMsg(messageHandler, exchangeName, routeKey);
  } catch (err) {
    console.error('启动消费者失败:', err);
  }
}
/**
 * Public entry point on the exported CMD object: starts the pull-data consumer.
 * startConsumer catches and logs its own errors, so its promise is
 * intentionally not awaited.
 */
CMD.init = function init() {
  void startConsumer();
};

module.exports = CMD;
|