123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171 |
// Command object exported by this module; its init() is attached further down.
const CMD = {};

// Dependencies (loaded in the original order, since requiring modules may have side effects).
const rabbitMq = require('../src/mq/rabbit-mq');
const helper = require('../src/helper');
const mysql = require('mysql2/promise');
const crypto = require('crypto');
const config = require('../etc/config.json');
const tools = require('../tools');

// MySQL connection settings: the debug profile when config.isDebug is truthy, otherwise release.
const dbConfig = config.isDebug
  ? config.debug_mysql
  : config.release_mysql;
/**
 * Convert a millisecond Unix timestamp to a MySQL DATETIME literal ('YYYY-MM-DD HH:MM:SS').
 *
 * The result is produced with Date.prototype.toISOString(), so it is the UTC wall-clock time.
 *
 * @param {number|string|null|undefined} timestamp - milliseconds since the epoch; numeric
 *   strings are accepted (parsed in base 10, any fractional part is dropped)
 * @returns {string|null} the formatted UTC datetime, or null when the input is falsy, is not
 *   numeric, or falls outside the range that toISOString() renders with a four-digit year
 */
function timestampToMySQLDatetime(timestamp) {
  // Falsy input (null, undefined, '', and also the number 0) has always mapped to null.
  if (!timestamp) return null;

  const date = new Date(Number.parseInt(timestamp, 10));
  // Non-numeric input gives NaN and huge values give an Invalid Date; toISOString() would
  // throw a RangeError on either, so report "no value" instead.
  if (Number.isNaN(date.getTime())) return null;

  // Outside years 0000-9999 toISOString() switches to a signed six-digit year
  // ('+010000-01-01T...'), which the fixed-width slice below would mangle.
  const year = date.getUTCFullYear();
  if (year < 0 || year > 9999) return null;

  return date.toISOString().slice(0, 19).replace('T', ' ');
}
/**
 * Build the CREATE TABLE IF NOT EXISTS statement for one pull_data_* table.
 *
 * Identifiers cannot be bound as query parameters, so the table name is backtick-quoted
 * (with any embedded backtick doubled, per MySQL identifier-quoting rules) rather than being
 * pasted into the DDL verbatim.
 *
 * NOTE(review): idx_material_id indexes awemeId, not materialId, and awemeId is already the
 * leading column of idx_aweme_id_like_id. Confirm whether materialId was intended; the schema
 * is left unchanged here.
 *
 * @param {string} tableName - unquoted table name, e.g. 'pull_data_20240101'
 * @returns {string} the DDL statement
 */
const createTableSQL = (tableName) => {
  const quotedName = `\`${String(tableName).replace(/`/g, '``')}\``;
  return `
CREATE TABLE IF NOT EXISTS ${quotedName} (
  id BIGINT NOT NULL AUTO_INCREMENT,
  materialId VARCHAR(64) DEFAULT NULL,
  awemeId VARCHAR(64) DEFAULT NULL,
  title VARCHAR(500) DEFAULT NULL,
  awemeUrl VARCHAR(500) DEFAULT NULL,
  publishTime DATETIME DEFAULT NULL,
  hasAnchorInfo TINYINT(1) DEFAULT NULL,
  landingUrl VARCHAR(2048) DEFAULT NULL,
  createTime DATETIME DEFAULT NULL,
  updateTime DATETIME DEFAULT NULL,
  created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
  likeCount INT(11) DEFAULT NULL,
  commentCount INT(11) DEFAULT NULL,
  shareCount INT(11) DEFAULT NULL,
  favoriteCount INT(11) DEFAULT NULL,
  PRIMARY KEY (id),
  KEY idx_material_id (awemeId),
  KEY idx_aweme_id_like_id (awemeId, likeCount, id)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='COLUMNAR=1'
`;
};
/**
 * Return the lowercase hex MD5 digest of `text` (processVideoTitles stores it in md5_tag).
 * MD5 is used here only as a content tag, not for any security purpose.
 *
 * @param {string} text - input passed straight to Hash#update
 * @returns {string} 32-character hex digest
 * @throws {TypeError} when `text` is not a string/Buffer/TypedArray/DataView (raised by Hash#update)
 */
function generateMD5(text) {
  const hasher = crypto.createHash('md5');
  hasher.update(text);
  return hasher.digest('hex');
}
/**
 * Make sure the configured database and the given table exist, and select that database.
 *
 * Runs, in order: CREATE DATABASE IF NOT EXISTS, USE, and the DDL from createTableSQL.
 * Any failure is logged and then rethrown to the caller.
 *
 * @param {object} connection - mysql2/promise connection (only .query is used)
 * @param {string} tableName - table to create if missing
 * @returns {Promise<void>}
 */
async function ensureTable(connection, tableName) {
  try {
    const { database } = dbConfig;
    await connection.query(`CREATE DATABASE IF NOT EXISTS ${database}`);
    await connection.query(`USE ${database}`);
    await connection.query(createTableSQL(tableName));
  } catch (err) {
    console.error(`创建表失败: ${err.message}`);
    throw err;
  }
}
/**
 * Bulk-insert video titles, each paired with its MD5 tag, into origin_video_titles.
 *
 * Uses INSERT IGNORE, so rows rejected by a unique key are skipped silently (presumably the
 * key is on md5_tag; verify against the table definition). Entries that are not strings
 * (e.g. an item with no contentTitle) are skipped, because generateMD5 would throw on them
 * and abort the whole batch.
 *
 * @param {object} connection - mysql2/promise connection (only .query is used)
 * @param {Array<*>} titles - candidate titles; non-string entries are ignored
 * @returns {Promise<void>}
 * @throws rethrows any database error after logging it
 */
async function processVideoTitles(connection, titles) {
  if (!titles || titles.length === 0) return;

  const values = titles
    .filter((title) => typeof title === 'string')
    .map((title) => [title, generateMD5(title)]);
  // `VALUES ?` with an empty list would be invalid SQL.
  if (values.length === 0) return;

  try {
    const insertSQL = `
      INSERT IGNORE INTO origin_video_titles
      (title, md5_tag)
      VALUES ?
    `;
    const [result] = await connection.query(insertSQL, [values]);
    console.log(`处理了 ${values.length} 个标题,新增 ${result.affectedRows} 条记录`);
  } catch (error) {
    console.error('处理视频标题失败:', error);
    throw error;
  }
}
/**
 * Bulk-insert pulled rows into origin_data.
 *
 * Each row must hold 13 values in this column order:
 * video_id, materialId, title, video_link, publish_time, is_guajian, guajian_link,
 * createTime, create_time, like_num, comment_num, shared_num, kepp_num.
 *
 * NOTE(review): on a duplicate key only video_id is assigned, so the counter columns
 * (like_num, comment_num, ...) of an existing row are NOT refreshed. Confirm that this is
 * intended. Also, VALUES() in ON DUPLICATE KEY UPDATE is deprecated as of MySQL 8.0.20.
 *
 * @param {object} connection - mysql2/promise connection (only .query is used)
 * @param {Array<Array<*>>} values - rows to insert; an empty or missing list is a no-op
 * @returns {Promise<void>}
 */
const insertOriginData = async (connection, values) => {
  // `VALUES ?` expands to nothing for an empty list, which MySQL rejects as a syntax error.
  if (!values || values.length === 0) return;

  const insertSQL = `
    INSERT INTO origin_data
    (video_id, materialId, title, video_link, publish_time,
     is_guajian, guajian_link, createTime, create_time,
     like_num, comment_num, shared_num, kepp_num)
    VALUES ?
    ON DUPLICATE KEY UPDATE
    video_id = VALUES(video_id)
  `;

  await connection.query(insertSQL, [values]);
};
// The message's `today` ends up inside a table name, so after stripping '-' it must be
// exactly eight digits (YYYYMMDD); anything else is rejected before touching SQL.
const PULL_DATA_DATE_PATTERN = /^\d{8}$/;

/**
 * Percent-decode a landing URL without letting one malformed value abort the whole batch.
 *
 * @param {*} value - raw landing URL from the message (may be missing)
 * @returns {string|null} the decoded URL; the raw value (as a string) when it is not valid
 *   percent-encoding; null when the value is null/undefined (previously the literal string
 *   "undefined" was stored)
 */
function safeDecodeURIComponent(value) {
  if (value === null || value === undefined) {
    return null;
  }
  try {
    return decodeURIComponent(value);
  } catch (error) {
    if (error instanceof URIError) {
      return String(value);
    }
    throw error;
  }
}

/**
 * Turn one pulled item into the 13-value row that is written to origin_data and to the
 * per-day pull_data_YYYYMMDD table.
 *
 * @param {object} item - one element of message.data (field names as used below; the full
 *   schema is defined by the producer)
 * @returns {Array<*>} the row values
 */
function buildPullDataRow(item) {
  // A missing landingInfo used to throw a TypeError and fail the whole message.
  const landingInfo = item.landingInfo || {};
  return [
    tools.getOriginVideoId(item.contentUrl),
    item.contentId,
    item.contentTitle,
    item.contentUrl,
    new Date(item.publishTime),
    // Loose comparison kept deliberately so that both 0 and "0" yield false.
    item.landingUrlType != 0, // eslint-disable-line eqeqeq
    safeDecodeURIComponent(landingInfo.landingUrl),
    // updateTime feeds both createTime and updateTime / create_time.
    new Date(item.updateTime),
    new Date(item.updateTime),
    item.likeNum,
    item.commentNum,
    item.shareNum,
    item.collectNum,
  ];
}

/**
 * Handle one message from the pull-data queue.
 *
 * Ignores messages whose cmd is not "on_recv_pull_data". Otherwise it:
 * 1. ensures the per-day table pull_data_YYYYMMDD exists;
 * 2. records the item titles in origin_video_titles;
 * 3. writes every item to origin_data and to the per-day table.
 * Processing errors (including malformed JSON or an invalid `today`) are logged rather than
 * rethrown, so a bad message does not reject back into the consumer.
 *
 * @param {string|Buffer} msg - raw JSON body; assumed shape
 *   { cmd, today: 'YYYY-MM-DD' | 'YYYYMMDD', data: Array<item> } (inferred from usage, confirm
 *   with the producer)
 * @returns {Promise<void>}
 */
const messageHandler = async (msg) => {
  let message;
  try {
    message = JSON.parse(msg);
  } catch (error) {
    console.error('处理消息失败:', error);
    return;
  }
  if (!message || message.cmd !== 'on_recv_pull_data') {
    return;
  }

  const date = typeof message.today === 'string' ? message.today.replace(/-/g, '') : '';
  if (!PULL_DATA_DATE_PATTERN.test(date)) {
    console.error('处理消息失败: today 字段格式非法', message.today);
    return;
  }
  const tableName = `pull_data_${date}`;

  let connection = null;
  try {
    // multipleStatements is intentionally not enabled: every query below is a single
    // statement, and leaving it off limits the damage of any future injection bug.
    connection = await mysql.createConnection({ ...dbConfig });
    await ensureTable(connection, tableName);

    const { data } = message;
    if (!Array.isArray(data)) {
      throw new TypeError('message.data must be an array');
    }
    if (data.length === 0) {
      // Nothing to insert; `VALUES ?` with an empty list would be invalid SQL.
      console.log(`表 ${tableName} 无新数据`);
      return;
    }

    await processVideoTitles(connection, data.map((item) => item.contentTitle));

    const values = data.map((item) => buildPullDataRow(item));

    await insertOriginData(connection, values);
    // NOTE(review): the same rows feed origin_data as (video_id, materialId, ...) but this
    // table as (materialId, awemeId, ...), so the getOriginVideoId value lands in materialId
    // here and contentId lands in awemeId. Confirm which id each column is meant to hold.
    await connection.query(
      `INSERT INTO \`${tableName}\`
        (materialId, awemeId, title, awemeUrl, publishTime,
         hasAnchorInfo, landingUrl, createTime, updateTime,
         likeCount, commentCount, shareCount, favoriteCount)
      VALUES ?`,
      [values],
    );

    console.log(`成功插入 ${values.length} 条记录到表 ${tableName}`);
  } catch (error) {
    console.error('处理消息失败:', error);
  } finally {
    if (connection !== null) {
      await connection.end();
    }
  }
};
/**
 * Register messageHandler with the RabbitMQ wrapper for the pull-data exchange.
 * A failure to register is logged and not rethrown, so the returned promise resolves either way.
 *
 * @returns {Promise<void>}
 */
async function startConsumer() {
  const exchangeName = 'exchange_pull_data_system_new';
  // Presumably the routing/binding key for the direct exchange; it matches the cmd the handler accepts.
  const routeName = 'on_recv_pull_data';
  try {
    await rabbitMq.consumerDirectMsg(messageHandler, exchangeName, routeName);
  } catch (err) {
    console.error('启动消费者失败:', err);
  }
}
/**
 * Start the pull-data consumer.
 *
 * @returns {Promise<void>} the promise from startConsumer, returned so callers can await
 *   registration (it was previously dropped). startConsumer catches and logs its own errors.
 */
CMD.init = function () {
  return startConsumer();
};

// Start consuming as soon as the module is loaded. The promise is intentionally not awaited;
// startConsumer already logs any registration failure.
void CMD.init();

module.exports = CMD;
|