|
@@ -6,7 +6,16 @@ const crypto = require('crypto');
|
|
|
const config = require('../etc/config.json')
|
|
|
const dbConfig = config.isDebug?config.debug_mysql:config.release_mysql
|
|
|
|
|
|
-// 创建表的SQL模板
|
|
|
+// 时间戳转MySQL datetime格式
|
|
|
+function timestampToMySQLDatetime(timestamp) {
|
|
|
+ if (!timestamp) return null;
|
|
|
+ // 将毫秒时间戳转换为Date对象
|
|
|
+ const date = new Date(parseInt(timestamp));
|
|
|
+ // 格式化为MySQL datetime格式
|
|
|
+ return date.toISOString().slice(0, 19).replace('T', ' ');
|
|
|
+}
|
|
|
+
|
|
|
+// 创建表的SQL模板(保持不变)
|
|
|
const createTableSQL = (tableName) => `
|
|
|
CREATE TABLE IF NOT EXISTS ${tableName} (
|
|
|
id BIGINT PRIMARY KEY AUTO_INCREMENT,
|
|
@@ -23,60 +32,48 @@ const createTableSQL = (tableName) => `
|
|
|
)
|
|
|
`;
|
|
|
|
|
|
-// 生成MD5
|
|
|
+// 生成MD5(保持不变)
|
|
|
function generateMD5(text) {
|
|
|
return crypto.createHash('md5').update(text).digest('hex');
|
|
|
}
|
|
|
|
|
|
-// 确保表存在
|
|
|
+// 确保表存在(保持不变)
|
|
|
async function ensureTable(connection, tableName) {
|
|
|
try {
|
|
|
- // 首先确保数据库存在
|
|
|
await connection.query(`CREATE DATABASE IF NOT EXISTS ${dbConfig.database}`);
|
|
|
await connection.query(`USE ${dbConfig.database}`);
|
|
|
-
|
|
|
- // 创建当天的表
|
|
|
const sql = createTableSQL(tableName);
|
|
|
await connection.query(sql);
|
|
|
-
|
|
|
} catch (error) {
|
|
|
console.error(`创建表失败: ${error.message}`);
|
|
|
throw error;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-// 优化后的视频标题处理函数
|
|
|
+// 处理视频标题(保持不变)
|
|
|
async function processVideoTitles(connection, titles) {
|
|
|
if (!titles || titles.length === 0) return;
|
|
|
-
|
|
|
- // 批量生成MD5
|
|
|
const titleData = titles.map(title => ({
|
|
|
title,
|
|
|
md5_tag: generateMD5(title)
|
|
|
}));
|
|
|
|
|
|
try {
|
|
|
- // 准备批量插入数据
|
|
|
const values = titleData.map(data => [data.title, data.md5_tag]);
|
|
|
-
|
|
|
- // 使用INSERT IGNORE语法,忽略重复的md5_tag
|
|
|
const insertSQL = `
|
|
|
INSERT IGNORE INTO origin_video_titles
|
|
|
(title, md5_tag)
|
|
|
VALUES ?
|
|
|
`;
|
|
|
-
|
|
|
- // 执行批量插入
|
|
|
const [result] = await connection.query(insertSQL, [values]);
|
|
|
console.log(`处理了 ${titles.length} 个标题,新增 ${result.affectedRows} 条记录`);
|
|
|
-
|
|
|
} catch (error) {
|
|
|
console.error('处理视频标题失败:', error);
|
|
|
throw error;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-// 处理消息的主函数
|
|
|
+// 处理消息的主函数(修改时间戳处理部分)
|
|
|
const messageHandler = async (msg) => {
|
|
|
let message = JSON.parse(msg)
|
|
|
if(message.cmd!="on_recv_pull_data"){
|
|
@@ -86,35 +83,28 @@ const messageHandler = async (msg) => {
|
|
|
let table_name = `pull_data_${date}`;
|
|
|
|
|
|
try {
|
|
|
- // 创建数据库连接
|
|
|
const connection = await mysql.createConnection({
|
|
|
...dbConfig,
|
|
|
- multipleStatements: true // 启用多语句查询
|
|
|
+ multipleStatements: true
|
|
|
});
|
|
|
|
|
|
- // 确保表存在
|
|
|
await ensureTable(connection, table_name);
|
|
|
-
|
|
|
- // 解析消息数据
|
|
|
const data = message.data;
|
|
|
-
|
|
|
- // 处理视频标题(批量处理)
|
|
|
await processVideoTitles(connection, data.map(item => item.title));
|
|
|
|
|
|
- // 准备批量插入的数据
|
|
|
+ // 转换时间戳为MySQL datetime格式
|
|
|
const values = data.map(item => [
|
|
|
item.materialId,
|
|
|
item.awemeId,
|
|
|
item.title,
|
|
|
item.awemeUrl,
|
|
|
- item.publishTime,
|
|
|
+ new Date(item.publishTime), // 转换时间格式
|
|
|
item.hasAnchorInfo,
|
|
|
item.landingUrl,
|
|
|
- item.createTime,
|
|
|
- item.updateTime
|
|
|
+ new Date(item.createTime), // 转换时间格式
|
|
|
+ new Date(item.updateTime) // 转换时间格式
|
|
|
]);
|
|
|
|
|
|
- // 执行批量插入
|
|
|
const insertSQL = `
|
|
|
INSERT INTO ${table_name}
|
|
|
(materialId, awemeId, title, awemeUrl, publishTime,
|
|
@@ -123,8 +113,6 @@ const messageHandler = async (msg) => {
|
|
|
`;
|
|
|
|
|
|
await connection.query(insertSQL, [values]);
|
|
|
-
|
|
|
- // 关闭连接
|
|
|
await connection.end();
|
|
|
|
|
|
console.log(`成功插入 ${values.length} 条记录到表 ${table_name}`);
|
|
@@ -142,7 +130,6 @@ async function startConsumer() {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-
|
|
|
CMD.init = function() {
|
|
|
startConsumer();
|
|
|
}
|