// PULL_TIME_DATA.js (4.3 KB)
  // Command table exported by this module; `init` is attached to it below.
  const CMD = {};

  // Project-local modules: message-queue wrapper and shared helpers.
  const rabbitMq = require('../src/mq/rabbit-mq');
  const helper = require('../src/helper');

  // Database driver (promise API) and hashing primitives.
  const mysql = require('mysql2/promise');
  const crypto = require('crypto');

  // Static configuration; `isDebug` selects which MySQL settings are used.
  const config = require('../etc/config.json');
  const dbConfig = config[config.isDebug ? 'debug_mysql' : 'release_mysql'];
  /**
   * Builds the CREATE TABLE statement for one pull-data table.
   *
   * The table name is emitted as a backtick-quoted identifier with any
   * embedded backtick doubled, so names containing characters such as `-`
   * or matching a reserved word still yield valid SQL, and the name cannot
   * terminate the identifier early.
   *
   * @param {string} tableName - Unqualified table name (no `db.` prefix).
   * @returns {string} A single `CREATE TABLE IF NOT EXISTS` statement.
   */
  const createTableSQL = (tableName) => {
    // Quoted-identifier form: wrap in backticks, double any backtick inside.
    const quotedName = '`' + String(tableName).split('`').join('``') + '`';

    return `
      CREATE TABLE IF NOT EXISTS ${quotedName} (
        id BIGINT PRIMARY KEY AUTO_INCREMENT,
        material_id VARCHAR(64),
        aweme_id VARCHAR(255),
        title VARCHAR(500),
        aweme_url VARCHAR(500),
        publish_time BIGINT,
        has_anchor_info BOOLEAN,
        landing_url VARCHAR(1024),
        create_time BIGINT,
        update_time BIGINT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
      )
    `;
  };
  /**
   * Computes the MD5 digest of the given text.
   *
   * @param {string} text - Input to hash; handed to `Hash.update` unchanged.
   * @returns {string} The digest encoded as a hexadecimal string.
   */
  function generateMD5(text) {
    const hasher = crypto.createHash('md5');
    hasher.update(text);
    return hasher.digest('hex');
  }
  /**
   * Makes sure the configured database and the given table both exist, and
   * switches the connection to that database.
   *
   * The database name from `dbConfig.database` is emitted as a
   * backtick-quoted identifier (embedded backticks doubled), so names that
   * contain `-` or match a reserved word do not break the statements.
   *
   * @param {object} connection - Object exposing an async `query(sql)` method.
   * @param {string} tableName - Table to create; passed to `createTableSQL`.
   * @returns {Promise<void>}
   * @throws Rethrows any error raised by `connection.query` after logging it.
   */
  async function ensureTable(connection, tableName) {
    try {
      const quotedDatabase =
        '`' + String(dbConfig.database).split('`').join('``') + '`';

      // The database must exist and be selected before the table DDL runs.
      await connection.query(`CREATE DATABASE IF NOT EXISTS ${quotedDatabase}`);
      await connection.query(`USE ${quotedDatabase}`);

      // Create the table itself (no-op when it already exists).
      await connection.query(createTableSQL(tableName));
    } catch (error) {
      console.error(`创建表失败: ${error.message}`);
      throw error;
    }
  }
  /**
   * Records video titles in `origin_video_titles`, keyed by their MD5 tag.
   *
   * Entries that are not strings (for example `null` or `undefined` titles)
   * are skipped instead of making the hashing step throw, and titles that
   * repeat within the batch are sent only once. Rows whose `md5_tag` is
   * already stored are left to `INSERT IGNORE`.
   *
   * @param {object} connection - Object exposing an async `query(sql, params)`.
   * @param {Array<*>} titles - Candidate titles; non-array input is a no-op.
   * @returns {Promise<void>}
   * @throws Rethrows any error raised by `connection.query` after logging it.
   */
  async function processVideoTitles(connection, titles) {
    if (!Array.isArray(titles) || titles.length === 0) return;

    // md5_tag -> title. A Map keeps the first occurrence and insertion order.
    const titleByTag = new Map();
    for (const title of titles) {
      if (typeof title !== 'string') continue;
      const md5Tag = generateMD5(title);
      if (!titleByTag.has(md5Tag)) {
        titleByTag.set(md5Tag, title);
      }
    }

    // An empty VALUES list is invalid SQL, so stop when nothing is usable.
    if (titleByTag.size === 0) return;

    const rows = Array.from(titleByTag, ([md5Tag, title]) => [title, md5Tag]);

    // INSERT IGNORE skips rows rejected as duplicates instead of failing.
    const insertSQL = `
      INSERT IGNORE INTO origin_video_titles
      (title, md5_tag)
      VALUES ?
    `;

    try {
      // A nested array bound to a single `?` expands into a bulk VALUES list.
      const [result] = await connection.query(insertSQL, [rows]);
      console.log(`处理了 ${titles.length} 个标题,新增 ${result.affectedRows} 条记录`);
    } catch (error) {
      console.error('处理视频标题失败:', error);
      throw error;
    }
  }
  /**
   * Handles one queue message: stores the pulled records in the table for
   * the current local date and records their titles.
   *
   * Messages whose `cmd` is not `on_recv_pull_data` are ignored, and an empty
   * `data` array is a no-op (an empty VALUES list would be invalid SQL).
   * The database connection is always closed, including when a step fails.
   *
   * @param {string} msg - JSON text of the message.
   * @returns {Promise<void>}
   * @throws Rethrows parse, validation and database errors after logging them.
   */
  const messageHandler = async (msg) => {
    let connection;
    try {
      const message = JSON.parse(msg);
      if (!message || message.cmd !== 'on_recv_pull_data') {
        return;
      }

      const data = message.data;
      if (!Array.isArray(data)) {
        throw new TypeError('message.data must be an array');
      }
      if (data.length === 0) {
        return;
      }

      const tableName = `pull_data_${helper.getLocalDate()}`;

      // NOTE(review): none of the queries issued through this connection use
      // multiple statements; consider dropping this flag to reduce risk.
      connection = await mysql.createConnection({
        ...dbConfig,
        multipleStatements: true,
      });

      await ensureTable(connection, tableName);

      // Record the titles in one batch.
      await processVideoTitles(connection, data.map((item) => item.title));

      // One row per record, in the column order of the INSERT below.
      const values = data.map((item) => [
        item.materialId,
        item.awemeId,
        item.title,
        item.awemeUrl,
        item.publishTime,
        item.hasAnchorInfo,
        item.landingUrl,
        item.createTime,
        item.updateTime,
      ]);

      // Quote the identifier so date formats containing `-` stay valid SQL.
      const quotedTable = '`' + tableName.split('`').join('``') + '`';
      const insertSQL = `
        INSERT INTO ${quotedTable}
        (material_id, aweme_id, title, aweme_url, publish_time,
        has_anchor_info, landing_url, create_time, update_time)
        VALUES ?
      `;
      await connection.query(insertSQL, [values]);

      console.log(`成功插入 ${values.length} 条记录到表 ${tableName}`);
    } catch (error) {
      console.error('处理消息失败:', error);
      throw error;
    } finally {
      if (connection) {
        try {
          await connection.end();
        } catch (closeError) {
          // A close failure must not mask the outcome of the work above.
          console.error('关闭数据库连接失败:', closeError);
        }
      }
    }
  };
  /**
   * Registers `messageHandler` through `rabbitMq.consumerDirectMsg`.
   * A failure is logged and not rethrown, so the returned promise always
   * resolves.
   *
   * @returns {Promise<void>}
   */
  async function startConsumer() {
    // Arguments forwarded verbatim to consumerDirectMsg, in call order.
    const consumeArgs = [
      messageHandler,
      'exchange_pull_data_system',
      'on_recv_pull_data',
    ];

    try {
      await rabbitMq.consumerDirectMsg(...consumeArgs);
    } catch (startError) {
      console.error('启动消费者失败:', startError);
    }
  }
  /**
   * Module entry point: starts the consumer.
   * The promise from `startConsumer` is deliberately not awaited; that
   * function logs its own start-up failures.
   */
  CMD.init = function () {
    void startConsumer();
  };

  module.exports = CMD;