123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212 |
- const CMD = {}
- const rabbitMq = require('../src/mq/rabbit-mq');
- const helper = require('../src/helper');
- const mysql = require('mysql2/promise');
- const crypto = require('crypto');
- const config = require('../etc/config.json')
- const dbConfig = config.isDebug?config.debug_mysql:config.release_mysql
/**
 * Convert a millisecond Unix timestamp into a MySQL DATETIME string.
 *
 * The value is formatted from `Date#toISOString()`, so the result is
 * expressed in UTC.
 *
 * @param {number|string} timestamp - Milliseconds since the Unix epoch;
 *   parsed as a base-10 integer.
 * @returns {string|null} `YYYY-MM-DD HH:MM:SS`, or `null` when the input is
 *   falsy or does not produce a valid date.
 */
function timestampToMySQLDatetime(timestamp) {
  if (!timestamp) return null;

  // Explicit radix so string input is never interpreted as hex.
  const millis = Number.parseInt(timestamp, 10);
  const date = new Date(millis);

  // toISOString() throws a RangeError on an Invalid Date; report "no value"
  // instead of crashing on unparseable or out-of-range input.
  if (Number.isNaN(date.getTime())) return null;

  // "2023-11-14T22:13:20.000Z" -> "2023-11-14 22:13:20"
  return date.toISOString().slice(0, 19).replace('T', ' ');
}
/**
 * Build the `CREATE TABLE IF NOT EXISTS` statement for a pull-data table.
 *
 * The table name is interpolated into the statement as-is (no quoting or
 * escaping is applied here), so callers must pass a trusted identifier.
 *
 * @param {string} tableName - Name of the table to create.
 * @returns {string} The DDL statement.
 */
const createTableSQL = (tableName) => {
  // One entry per line of the statement body; empty entries keep the blank
  // separator lines between column groups.
  const bodyLines = [
    'id BIGINT PRIMARY KEY AUTO_INCREMENT,',
    'publishTime datetime DEFAULT NULL,',
    'landingUrl VARCHAR(2048),',
    'videoId VARCHAR(64),',
    'status int,',
    'miniseriesId VARCHAR(64),',
    'uniqueId VARCHAR(64) UNIQUE,',
    'materialId VARCHAR(64),',
    '',
    'miniseriesName VARCHAR(64),',
    'title VARCHAR(500),',
    'width int,',
    'height int,',
    'duration int,',
    'coverUrl VARCHAR(255),',
    'videoUrl VARCHAR(255),',
    'likeCount int,',
    'commentCount int,',
    'shareCount int,',
    'favoriteCount int,',
    'heat VARCHAR(64),',
    'materialType VARCHAR(64),',
    'videoMd5 VARCHAR(64),',
    '',
    'createTime datetime DEFAULT NULL,',
    'updateTime datetime DEFAULT NULL,',
    'created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,',
    'INDEX idx_md5_id_like_id_status (videoMd5, likeCount, id,status)',
  ];

  const statementLines = [
    '',
    `CREATE TABLE IF NOT EXISTS ${tableName} (`,
    ...bodyLines,
    ')',
    '',
  ];
  return statementLines.join('\n');
};
/**
 * Compute the MD5 digest of a value.
 *
 * @param {string|Buffer} text - Data to hash.
 * @returns {string} The digest as a lowercase hexadecimal string.
 */
function generateMD5(text) {
  const hasher = crypto.createHash('md5');
  hasher.update(text);
  return hasher.digest('hex');
}
/**
 * Make sure the configured database and the given table exist.
 *
 * Runs, in order: `CREATE DATABASE IF NOT EXISTS`, `USE`, and the
 * `CREATE TABLE IF NOT EXISTS` statement produced by `createTableSQL`.
 * The database name is read from `dbConfig.database`.
 *
 * @param {object} connection - Connection exposing an async `query(sql)` method.
 * @param {string} tableName - Table to create when missing.
 * @returns {Promise<void>}
 * @throws Rethrows any failure after logging its message.
 */
async function ensureTable(connection, tableName) {
  try {
    const setupStatements = [
      `CREATE DATABASE IF NOT EXISTS ${dbConfig.database}`,
      `USE ${dbConfig.database}`,
    ];
    // Order matters: the database must exist and be selected before the
    // table is created, so the statements run strictly one after another.
    for (const statement of setupStatements) {
      await connection.query(statement);
    }
    await connection.query(createTableSQL(tableName));
  } catch (error) {
    console.error(`创建表失败: ${error.message}`);
    throw error;
  }
}
/**
 * Bulk-insert video titles, each paired with its MD5 tag, into
 * `origin_video_titles`. Rows rejected by `INSERT IGNORE` are skipped.
 *
 * @param {object} connection - Connection exposing an async `query(sql, params)` method.
 * @param {string[]} titles - Titles to store; nothing happens when missing or empty.
 * @returns {Promise<void>}
 * @throws Rethrows any query failure after logging it.
 */
async function processVideoTitles(connection, titles) {
  if (!titles) return;
  if (titles.length === 0) return;

  // One [title, md5_tag] row per title, in input order.
  const rows = titles.map((title) => [title, generateMD5(title)]);

  try {
    const insertSQL = `
      INSERT IGNORE INTO origin_video_titles
      (title, md5_tag)
      VALUES ?
    `;
    // The extra array level makes the driver expand `rows` as a bulk VALUES list.
    const [result] = await connection.query(insertSQL, [rows]);
    console.log(`处理了 ${titles.length} 个标题,新增 ${result.affectedRows} 条记录`);
  } catch (error) {
    console.error('处理视频标题失败:', error);
    throw error;
  }
}
/**
 * Message handler: stores one batch of pulled video records in the per-day
 * table `pull_data_<date>`, where `<date>` is `message.today` with dashes
 * removed. The table is created when missing, and rows that collide with an
 * existing unique key are updated in place (`ON DUPLICATE KEY UPDATE`).
 *
 * Messages whose `cmd` is not `on_recv_pull_data` are ignored.
 *
 * @param {string} msg - JSON text of the message, carrying `cmd`, `today`
 *   and `data` (an array of records).
 * @returns {Promise<void>}
 * @throws {Error} When `today` does not yield a safe table-name suffix.
 * @throws {TypeError} When `data` is not an array.
 * @throws Rethrows any database failure after logging it.
 */
const messageHandler = async (msg) => {
  const message = JSON.parse(msg);
  if (message.cmd !== 'on_recv_pull_data') {
    return;
  }

  // The table name is spliced into SQL text (identifiers cannot be bound as
  // parameters), so only accept word characters to rule out SQL injection
  // through the `today` field of the message.
  const today = message.today;
  const date = typeof today === 'string' ? today.replace(/-/g, '') : '';
  if (!/^\w+$/.test(date)) {
    throw new Error(`无效的 today 字段: ${JSON.stringify(today)}`);
  }
  const table_name = `pull_data_${date}`;

  let connection;
  try {
    // NOTE(review): no query issued by this handler contains multiple
    // statements; `multipleStatements` looks unnecessary — confirm and drop.
    connection = await mysql.createConnection({
      ...dbConfig,
      multipleStatements: true
    });

    await ensureTable(connection, table_name);

    const data = message.data;
    console.log("data:", data);
    if (!Array.isArray(data)) {
      throw new TypeError('message.data must be an array');
    }
    if (data.length === 0) {
      // A bulk `VALUES ?` with no rows is not valid SQL; nothing to store.
      console.log(`消息中没有可插入的记录,跳过表 ${table_name}`);
      return;
    }

    // One row per record, in the same column order as the INSERT below.
    const values = data.map(item => [
      item.miniseriesId,
      item.uniqueId,
      item.materialId,
      item.miniseriesName,
      item.title,
      item.width,
      item.height,
      item.duration,
      item.coverUrl,
      item.videoUrl,
      item.likeCount.count,
      item.commentCount.count,
      item.shareCount.count,
      item.favoriteCount.count,
      item.heat.countStr,
      item.materialType,
      item.videoMd5,
      new Date(item.createTime),
      new Date(item.updateTime),
      0 // status for new rows; existing rows keep theirs (not in the UPDATE list)
    ]);

    const insertSQL = `
      INSERT INTO ${table_name}
      (miniseriesId, uniqueId, materialId, miniseriesName, title,
      width, height, duration, coverUrl,
      videoUrl, likeCount, commentCount, shareCount,
      favoriteCount, heat, materialType, videoMd5,
      createTime, updateTime,status)
      VALUES ?
      ON DUPLICATE KEY UPDATE
      miniseriesId = VALUES(miniseriesId),
      materialId = VALUES(materialId),
      miniseriesName = VALUES(miniseriesName),
      title = VALUES(title),
      width = VALUES(width),
      height = VALUES(height),
      duration = VALUES(duration),
      coverUrl = VALUES(coverUrl),
      videoUrl = VALUES(videoUrl),
      likeCount = VALUES(likeCount),
      commentCount = VALUES(commentCount),
      shareCount = VALUES(shareCount),
      favoriteCount = VALUES(favoriteCount),
      heat = VALUES(heat),
      materialType = VALUES(materialType),
      videoMd5 = VALUES(videoMd5),
      createTime = VALUES(createTime),
      updateTime = VALUES(updateTime)
    `;

    await connection.query(insertSQL, [values]);

    console.log(`成功插入 ${values.length} 条记录到表 ${table_name}`);
  } catch (error) {
    console.error('处理消息失败:', error);
    throw error;
  } finally {
    // Always release the connection, including when table creation or the
    // insert failed; otherwise every failed message leaks a connection.
    if (connection) {
      try {
        await connection.end();
      } catch (closeError) {
        // Log only, so a close failure never masks the original error.
        console.error('关闭数据库连接失败:', closeError);
      }
    }
  }
};
/**
 * Register `messageHandler` with the RabbitMQ helper via `consumerDirectMsg`.
 *
 * Failures are logged and not rethrown, so the returned promise always
 * resolves.
 *
 * @returns {Promise<void>}
 */
async function startConsumer() {
  // NOTE(review): presumably the exchange name and the routing/queue key —
  // confirm against rabbitMq.consumerDirectMsg.
  const exchange = "exchange_pull_data_system";
  const key = "on_recv_pull_data";

  try {
    await rabbitMq.consumerDirectMsg(messageHandler, exchange, key);
  } catch (error) {
    console.error('启动消费者失败:', error);
  }
}
/**
 * Start the message consumer. The promise returned by `startConsumer` is
 * not awaited; `startConsumer` logs its own failures.
 */
CMD.init = function () {
  startConsumer();
};

// The consumer is started as a side effect of loading this module.
CMD.init();

module.exports = CMD;
|