// PULL_TIME_DATA_NEW.js

  1. const CMD = {}
  2. const rabbitMq = require('../src/mq/rabbit-mq');
  3. const helper = require('../src/helper');
  4. const mysql = require('mysql2/promise');
  5. const crypto = require('crypto');
  6. const config = require('../etc/config.json');
  7. const tools = require('../tools');
  8. const dbConfig = config.isDebug?config.debug_mysql:config.release_mysql
  // Convert a millisecond timestamp to MySQL DATETIME text.
  /**
   * Format a millisecond epoch timestamp as a MySQL DATETIME literal
   * ("YYYY-MM-DD HH:MM:SS").
   *
   * The result is in UTC, because Date#toISOString always renders UTC. It is not
   * in the local/server time zone.
   *
   * @param {number|string} timestamp - Milliseconds since the epoch (number or decimal string).
   * @returns {string|null} The formatted datetime, or null when the input is falsy
   *   (undefined, null, '', 0) or cannot be turned into a valid date.
   */
  function timestampToMySQLDatetime(timestamp) {
    if (!timestamp) return null;
    // Always parse as base 10; the radix-less form would read "0x..." strings as hex.
    const date = new Date(Number.parseInt(timestamp, 10));
    // An unparseable or out-of-range value yields an Invalid Date, on which
    // toISOString() throws RangeError; report it as "no timestamp" instead.
    if (Number.isNaN(date.getTime())) return null;
    return date.toISOString().slice(0, 19).replace('T', ' ');
  }
  // DDL for one daily pull_data_YYYYMMDD table.
  /**
   * Build the CREATE TABLE IF NOT EXISTS statement for a daily pull-data table.
   *
   * The table name is emitted as a backtick-quoted identifier, with embedded
   * backticks doubled. It therefore always stays a single identifier.
   *
   * idx_material_id indexes materialId. It used to index awemeId, which only
   * duplicated the leftmost column of idx_aweme_id_like_id and left materialId
   * unindexed. Tables that already exist are not altered, because of IF NOT EXISTS.
   *
   * @param {string} tableName - Name of the table to create.
   * @returns {string} The DDL statement.
   */
  const createTableSQL = (tableName) => {
    const escapedName = String(tableName).split('`').join('``');
    return `
  CREATE TABLE IF NOT EXISTS \`${escapedName}\` (
    id BIGINT NOT NULL AUTO_INCREMENT,
    materialId VARCHAR(64) DEFAULT NULL,
    awemeId VARCHAR(64) DEFAULT NULL,
    title VARCHAR(500) DEFAULT NULL,
    awemeUrl VARCHAR(500) DEFAULT NULL,
    publishTime DATETIME DEFAULT NULL,
    hasAnchorInfo TINYINT(1) DEFAULT NULL,
    landingUrl VARCHAR(2048) DEFAULT NULL,
    createTime DATETIME DEFAULT NULL,
    updateTime DATETIME DEFAULT NULL,
    created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    likeCount INT(11) DEFAULT NULL,
    commentCount INT(11) DEFAULT NULL,
    shareCount INT(11) DEFAULT NULL,
    favoriteCount INT(11) DEFAULT NULL,
    PRIMARY KEY (id),
    KEY idx_material_id (materialId),
    KEY idx_aweme_id_like_id (awemeId, likeCount, id)
  ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci COMMENT='COLUMNAR=1'
  `;
  };
  // MD5 helper (behavior unchanged).
  /**
   * Compute the lowercase hex MD5 digest of a string.
   * Used here to produce the md5_tag stored next to each video title.
   *
   * @param {string} text - Input text (hashed as UTF-8).
   * @returns {string} 32-character hex digest.
   */
  function generateMD5(text) {
    const hasher = crypto.createHash('md5');
    hasher.update(text);
    return hasher.digest('hex');
  }
  // Make sure the database and the given table exist (behavior unchanged).
  /**
   * Run, in order: CREATE DATABASE IF NOT EXISTS, USE, then the table DDL from
   * createTableSQL(tableName). The database name comes from dbConfig.database.
   *
   * @param {object} connection - mysql2/promise connection.
   * @param {string} tableName - Table to create if it is missing.
   * @returns {Promise<void>}
   * @throws Rethrows the first failing query's error after logging its message.
   */
  async function ensureTable(connection, tableName) {
    try {
      const { database } = dbConfig;
      const statements = [
        `CREATE DATABASE IF NOT EXISTS ${database}`,
        `USE ${database}`,
        createTableSQL(tableName),
      ];
      // Sequential on purpose: each statement depends on the previous one.
      for (const statement of statements) {
        await connection.query(statement);
      }
    } catch (error) {
      console.error(`创建表失败: ${error.message}`);
      throw error;
    }
  }
  // Store the batch's video titles (with their MD5 tags) in origin_video_titles.
  /**
   * Insert titles into origin_video_titles, each paired with generateMD5(title) as md5_tag.
   *
   * INSERT IGNORE turns duplicate-key errors into warnings, so titles that collide
   * with an existing unique key are skipped. NOTE(review): presumably title or
   * md5_tag is unique; confirm against the schema.
   *
   * @param {object} connection - mysql2/promise connection.
   * @param {Array<*>} titles - Raw titles from the payload. Non-string entries
   *   (e.g. a missing contentTitle) are skipped, and duplicates are collapsed.
   * @returns {Promise<void>}
   * @throws Rethrows any database error after logging it.
   */
  async function processVideoTitles(connection, titles) {
    if (!titles || titles.length === 0) return;
    // generateMD5 throws TypeError on undefined/null/non-strings. That used to happen
    // outside the try below and aborted the whole message for one missing title.
    const uniqueTitles = [...new Set(titles.filter((title) => typeof title === 'string'))];
    if (uniqueTitles.length === 0) return;
    try {
      const values = uniqueTitles.map((title) => [title, generateMD5(title)]);
      const insertSQL = `
  INSERT IGNORE INTO origin_video_titles
  (title, md5_tag)
  VALUES ?
  `;
      const [result] = await connection.query(insertSQL, [values]);
      console.log(`处理了 ${titles.length} 个标题,新增 ${result.affectedRows} 条记录`);
    } catch (error) {
      console.error('处理视频标题失败:', error);
      throw error;
    }
  }
  /**
   * Bulk-insert pull-data rows into origin_data with a single statement.
   *
   * On a duplicate key, the existing row is kept. Only video_id is re-assigned from
   * the incoming row, so like/comment/share/keep counts are NOT refreshed.
   * NOTE(review): confirm this keep-first behavior is intended. Also, VALUES() inside
   * ON DUPLICATE KEY UPDATE is deprecated as of MySQL 8.0.20.
   *
   * @param {object} connection - mysql2/promise connection.
   * @param {Array<Array>} values - Rows in the column order listed in the statement.
   *   This must be non-empty, because `VALUES ?` with an empty array is invalid SQL.
   * @returns {Promise<void>}
   */
  const insertOriginData = async (connection, values) => {
    const upsertSql = `
  INSERT INTO origin_data
  (video_id, materialId, title, video_link, publish_time,
  is_guajian, guajian_link, createTime, create_time,
  like_num, comment_num, shared_num, kepp_num)
  VALUES ?
  ON DUPLICATE KEY UPDATE
  video_id = VALUES(video_id)
  `;
    // mysql2 expands a nested array bound to `VALUES ?` into a multi-row insert.
    const bindParams = [values];
    await connection.query(upsertSql, bindParams);
  };
  // Main handler for "on_recv_pull_data" messages.

  // The only accepted shape for the daily table suffix: YYYYMMDD.
  const PULL_DATA_DATE_PATTERN = /^\d{8}$/;
  // A string made only of decimal digits (e.g. a millisecond timestamp sent as text).
  const DIGITS_ONLY_PATTERN = /^\d+$/;

  /**
   * Convert a payload time value to a Date for mysql2 to serialize.
   * All-digit strings are read as epoch milliseconds, because
   * new Date('1700000000000') is an Invalid Date. Every other value goes to the Date
   * constructor exactly as before.
   *
   * @param {*} value - Raw time value from the payload.
   * @returns {Date}
   */
  const toPullDataDate = (value) =>
    typeof value === 'string' && DIGITS_ONLY_PATTERN.test(value)
      ? new Date(Number(value))
      : new Date(value);

  /**
   * URL-decode a landing URL without letting one bad value abort the whole batch.
   *
   * @param {*} value - Raw landingUrl from the payload.
   * @returns {string|null} The decoded URL. Returns the raw string when it has
   *   malformed percent-escapes (decodeURIComponent throws URIError), and null when
   *   absent (previously stored as the literal string "undefined").
   */
  const decodeLandingUrl = (value) => {
    if (value === undefined || value === null) return null;
    try {
      return decodeURIComponent(value);
    } catch (error) {
      if (error instanceof URIError) return String(value);
      throw error;
    }
  };

  /**
   * Map one payload item to the 13-value row used for both origin_data and the daily table.
   *
   * @param {object} item - One element of message.data.
   * @returns {Array} Row values in insert-column order.
   */
  const toPullDataRow = (item) => [
    tools.getOriginVideoId(item.contentUrl),
    item.contentId,
    item.contentTitle,
    item.contentUrl,
    toPullDataDate(item.publishTime),
    // Loose comparison kept on purpose so a string "0" is treated like 0.
    item.landingUrlType != 0,
    decodeLandingUrl(item.landingInfo && item.landingInfo.landingUrl),
    // Both the create-time and update-time columns take item.updateTime (unchanged behavior).
    toPullDataDate(item.updateTime),
    toPullDataDate(item.updateTime),
    item.likeNum,
    item.commentNum,
    item.shareNum,
    item.collectNum,
  ];

  /**
   * Handle one raw message from the pull-data exchange.
   *
   * Messages whose cmd is not "on_recv_pull_data" are ignored. Otherwise the handler:
   * - validates message.today;
   * - ensures the daily pull_data_YYYYMMDD table exists;
   * - inserts message.data into origin_video_titles, origin_data and that daily table.
   * Every failure is logged and swallowed, so the returned promise always resolves.
   *
   * @param {string} msg - JSON text of the message, as passed in by rabbitMq.
   * @returns {Promise<void>}
   */
  const messageHandler = async (msg) => {
    let message;
    try {
      message = JSON.parse(msg);
    } catch (error) {
      // Parsing used to sit outside any try, so a malformed body rejected the handler.
      console.error('处理消息失败:', error);
      return;
    }
    if (!message || message.cmd !== 'on_recv_pull_data') {
      return;
    }

    // `today` comes from the message and becomes part of a table name. A table name
    // cannot be a bound parameter, so accept only an 8-digit date
    // (e.g. "2024-01-02" -> "20240102").
    const today = message.today === undefined || message.today === null ? '' : String(message.today);
    const date = today.replace(/-/g, '');
    if (!PULL_DATA_DATE_PATTERN.test(date)) {
      console.error(`处理消息失败: 非法的日期 today=${today}`);
      return;
    }
    const tableName = `pull_data_${date}`;
    // A missing or non-array payload is treated as an empty batch.
    const data = Array.isArray(message.data) ? message.data : [];

    let connection = null;
    try {
      connection = await mysql.createConnection({
        ...dbConfig,
        // Every query below is a single statement. Keeping multi-statement mode off
        // means an injected ";" can never chain extra statements.
        multipleStatements: false,
      });
      await ensureTable(connection, tableName);
      // `VALUES ?` with an empty array renders invalid SQL, so stop once the table exists.
      if (data.length === 0) {
        console.log(`表 ${tableName} 没有需要插入的数据`);
        return;
      }
      await processVideoTitles(connection, data.map((item) => item.contentTitle));
      const values = data.map((item) => toPullDataRow(item));
      // NOTE(review): origin_data receives this row as (video_id, materialId, ...), so
      // materialId <- item.contentId there. The statement below stores the same row as
      // (materialId, awemeId, ...), i.e. materialId <- video id and awemeId <- item.contentId.
      // That looks swapped; confirm with downstream readers before changing it.
      const insertSQL = `
  INSERT INTO \`${tableName}\`
  (materialId, awemeId, title, awemeUrl, publishTime,
  hasAnchorInfo, landingUrl, createTime, updateTime,
  likeCount, commentCount, shareCount, favoriteCount)
  VALUES ?
  `;
      await insertOriginData(connection, values);
      await connection.query(insertSQL, [values]);
      console.log(`成功插入 ${values.length} 条记录到表 ${tableName}`);
    } catch (error) {
      console.error('处理消息失败:', error);
    } finally {
      if (connection !== null) {
        // A failing end() (e.g. on an already-broken connection) must not reject the handler.
        try {
          await connection.end();
        } catch (error) {
          console.error('关闭数据库连接失败:', error);
        }
      }
    }
  };
  /**
   * Register messageHandler with rabbitMq.consumerDirectMsg for the
   * "exchange_pull_data_system_new" exchange.
   * Registration errors are logged and not rethrown, so the returned promise resolves either way.
   *
   * @returns {Promise<void>}
   */
  async function startConsumer() {
    const exchangeName = "exchange_pull_data_system_new";
    // Third argument of consumerDirectMsg. Presumably the routing key; it matches the
    // cmd value messageHandler accepts. Verify against src/mq/rabbit-mq.
    const bindingKey = "on_recv_pull_data";
    try {
      await rabbitMq.consumerDirectMsg(messageHandler, exchangeName, bindingKey);
    } catch (err) {
      console.error('启动消费者失败:', err);
    }
  }
  /**
   * Module entry point: start the RabbitMQ consumer.
   * startConsumer() catches and logs its own failures, so its promise is
   * deliberately not awaited here.
   */
  CMD.init = function () {
    void startConsumer();
  };

  // Requiring this module starts consuming immediately.
  CMD.init();

  module.exports = CMD;