// PULL_TIME_DATA.js (6.1 KB)

  // Command table exported by this module; `init` is attached near the end of the file.
  const CMD = {};
  const rabbitMq = require('../src/mq/rabbit-mq');
  const helper = require('../src/helper');
  const mysql = require('mysql2/promise');
  const crypto = require('crypto');
  const config = require('../etc/config.json');
  // Pick the MySQL settings block that matches the `isDebug` flag in the config file.
  const dbConfig = config.isDebug ? config.debug_mysql : config.release_mysql;
  /**
   * Convert a millisecond timestamp into a MySQL DATETIME string.
   *
   * The text is taken from `Date#toISOString()`, so it represents the instant
   * in UTC, e.g. `2023-11-14 22:13:20`.
   *
   * @param {number|string} timestamp - Epoch milliseconds, either as a number
   *   or as a string that starts with a base-10 integer.
   * @returns {string|null} `YYYY-MM-DD HH:mm:ss`, or `null` when the input is
   *   falsy or cannot be turned into a valid date.
   */
  function timestampToMySQLDatetime(timestamp) {
    if (!timestamp) return null;

    // Always parse strings as base 10; numbers are used as they are.
    const millis =
      typeof timestamp === 'number' ? timestamp : Number.parseInt(timestamp, 10);
    const date = new Date(millis);

    // toISOString() throws a RangeError on an invalid date, so bail out first.
    if (Number.isNaN(date.getTime())) return null;

    return date.toISOString().slice(0, 19).replace('T', ' ');
  }
  /**
   * Build the `CREATE TABLE IF NOT EXISTS` statement for a pull-data table.
   *
   * @param {string} tableName - Name of the table to create.
   * @returns {string} A single DDL statement; `uniqueId` carries a UNIQUE
   *   constraint, which is what upserts against this table key on.
   */
  const createTableSQL = (tableName) => {
    // Identifiers cannot be sent as bound parameters, so quote the name with
    // backticks and double any embedded backtick to keep it a single identifier.
    const quotedName = '`' + String(tableName).replace(/`/g, '``') + '`';

    return `
      CREATE TABLE IF NOT EXISTS ${quotedName} (
        id BIGINT PRIMARY KEY AUTO_INCREMENT,
        publishTime datetime DEFAULT NULL,
        landingUrl VARCHAR(2048),
        videoId VARCHAR(64),
        status int,
        miniseriesId VARCHAR(64),
        uniqueId VARCHAR(64) UNIQUE,
        materialId VARCHAR(64),
        miniseriesName VARCHAR(64),
        title VARCHAR(500),
        width int,
        height int,
        duration int,
        coverUrl VARCHAR(255),
        videoUrl VARCHAR(255),
        likeCount int,
        commentCount int,
        shareCount int,
        favoriteCount int,
        heat VARCHAR(64),
        materialType VARCHAR(64),
        videoMd5 VARCHAR(64),
        createTime datetime DEFAULT NULL,
        updateTime datetime DEFAULT NULL,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        INDEX idx_md5_id_like_id_status (videoMd5, likeCount, id, status)
      )
    `;
  };
  /**
   * Compute the MD5 digest of a piece of text.
   *
   * @param {string|Buffer} text - Data to hash.
   * @returns {string} The digest as lowercase hexadecimal (32 characters).
   */
  function generateMD5(text) {
    return crypto.createHash('md5').update(text).digest('hex');
  }
  /**
   * Make sure the configured database and the given table exist, and switch
   * the connection to that database.
   *
   * @param {object} connection - Open connection exposing an async `query(sql)`.
   * @param {string} tableName - Table to create when it does not exist yet.
   * @returns {Promise<void>}
   * @throws Rethrows any failure after logging it, including a missing
   *   `dbConfig.database` setting.
   */
  async function ensureTable(connection, tableName) {
    try {
      if (!dbConfig.database) {
        throw new Error('dbConfig.database is not configured');
      }
      // Database names cannot be bound as parameters: backtick-quote the name
      // and double embedded backticks so it is always read as one identifier.
      const quotedDatabase =
        '`' + String(dbConfig.database).replace(/`/g, '``') + '`';

      await connection.query(`CREATE DATABASE IF NOT EXISTS ${quotedDatabase}`);
      await connection.query(`USE ${quotedDatabase}`);
      await connection.query(createTableSQL(tableName));
    } catch (error) {
      console.error(`创建表失败: ${error.message}`);
      throw error;
    }
  }
  /**
   * Store video titles, each with its MD5 tag, in `origin_video_titles`.
   *
   * Uses `INSERT IGNORE`, so rows rejected by the table's constraints are
   * skipped by the server instead of failing the statement.
   *
   * @param {object} connection - Open connection exposing an async `query(sql, params)`.
   * @param {Array<string>} titles - Titles to store; non-string entries are skipped.
   * @returns {Promise<void>}
   * @throws Rethrows any query failure after logging it.
   */
  async function processVideoTitles(connection, titles) {
    if (!titles || titles.length === 0) return;

    // Hashing a non-string would throw, so keep strings only; the Set drops
    // repeated titles so each one is hashed and sent once.
    const uniqueTitles = [
      ...new Set(titles.filter((title) => typeof title === 'string')),
    ];
    if (uniqueTitles.length === 0) return;

    const values = uniqueTitles.map((title) => [title, generateMD5(title)]);
    const insertSQL = `
      INSERT IGNORE INTO origin_video_titles
      (title, md5_tag)
      VALUES ?
    `;

    try {
      const [result] = await connection.query(insertSQL, [values]);
      console.log(`处理了 ${titles.length} 个标题,新增 ${result.affectedRows} 条记录`);
    } catch (error) {
      console.error('处理视频标题失败:', error);
      throw error;
    }
  }
  /**
   * Turn a timestamp-like value into a `Date`, or `null` when it is missing
   * or unparseable, so a DATETIME column receives NULL instead of bad data.
   *
   * @param {*} value - Epoch milliseconds (number or all-digit string), a
   *   date string, or a `Date`.
   * @returns {Date|null}
   */
  function toNullableDate(value) {
    if (value === null || value === undefined || value === '') return null;
    // `new Date("1700000000000")` is invalid; all-digit strings are epoch millis.
    const normalized =
      typeof value === 'string' && /^\d+$/.test(value) ? Number(value) : value;
    const date = new Date(normalized);
    return Number.isNaN(date.getTime()) ? null : date;
  }

  /**
   * Read `container[key]`, returning `null` when the container or key is absent.
   *
   * @param {object|null|undefined} container
   * @param {string} key
   * @returns {*}
   */
  function nestedValue(container, key) {
    if (container === null || container === undefined) return null;
    const value = container[key];
    return value === undefined ? null : value;
  }

  /**
   * Map one pulled item to a row, in the column order of the upsert statement.
   *
   * @param {object} item - One element of `message.data`.
   * @returns {Array} 20 values; the last one is the initial `status` (0).
   */
  function buildPullDataRow(item) {
    return [
      item.miniseriesId,
      item.uniqueId,
      item.materialId,
      item.miniseriesName,
      item.title,
      item.width,
      item.height,
      item.duration,
      item.coverUrl,
      item.videoUrl,
      nestedValue(item.likeCount, 'count'),
      nestedValue(item.commentCount, 'count'),
      nestedValue(item.shareCount, 'count'),
      nestedValue(item.favoriteCount, 'count'),
      nestedValue(item.heat, 'countStr'),
      item.materialType,
      item.videoMd5,
      toNullableDate(item.createTime),
      toNullableDate(item.updateTime),
      0,
    ];
  }

  /**
   * Derive the per-day table name from the message's `today` field.
   *
   * @param {*} today - Expected to be a date string such as `2024-01-01`.
   * @returns {string} `pull_data_<today without dashes>`.
   * @throws {Error} When `today` is not a string or contains characters other
   *   than letters, digits and underscores once the dashes are removed.
   */
  function resolvePullDataTableName(today) {
    const suffix = typeof today === 'string' ? today.replace(/-/g, '') : '';
    // The name is spliced into SQL text, so only identifier-safe characters pass.
    if (!/^[0-9A-Za-z_]+$/.test(suffix)) {
      throw new Error(`Invalid "today" value in message: ${JSON.stringify(today)}`);
    }
    return `pull_data_${suffix}`;
  }

  /**
   * Build the bulk upsert statement for a pull-data table.
   * On a duplicate key every listed column except `uniqueId` and `status`
   * is overwritten with the incoming value.
   *
   * @param {string} tableName - Already validated table name.
   * @returns {string}
   */
  function buildPullDataUpsertSQL(tableName) {
    return `
      INSERT INTO ${tableName}
      (miniseriesId, uniqueId, materialId, miniseriesName, title,
      width, height, duration, coverUrl,
      videoUrl, likeCount, commentCount, shareCount,
      favoriteCount, heat, materialType, videoMd5,
      createTime, updateTime, status)
      VALUES ?
      ON DUPLICATE KEY UPDATE
      miniseriesId = VALUES(miniseriesId),
      materialId = VALUES(materialId),
      miniseriesName = VALUES(miniseriesName),
      title = VALUES(title),
      width = VALUES(width),
      height = VALUES(height),
      duration = VALUES(duration),
      coverUrl = VALUES(coverUrl),
      videoUrl = VALUES(videoUrl),
      likeCount = VALUES(likeCount),
      commentCount = VALUES(commentCount),
      shareCount = VALUES(shareCount),
      favoriteCount = VALUES(favoriteCount),
      heat = VALUES(heat),
      materialType = VALUES(materialType),
      videoMd5 = VALUES(videoMd5),
      createTime = VALUES(createTime),
      updateTime = VALUES(updateTime)
    `;
  }

  /**
   * Handle one queue message: upsert its `data` items into the table for the
   * day named by `today`.
   *
   * Messages whose `cmd` is not `on_recv_pull_data` are ignored, and an empty
   * or non-array `data` is skipped without opening a connection.
   *
   * @param {string} msg - JSON text with `cmd`, `today` and `data` fields.
   * @returns {Promise<void>}
   * @throws {SyntaxError} When `msg` is not valid JSON.
   * @throws Rethrows validation and database failures after logging them.
   */
  const messageHandler = async (msg) => {
    const message = JSON.parse(msg);
    if (!message || message.cmd !== "on_recv_pull_data") {
      return;
    }

    let connection;
    try {
      const tableName = resolvePullDataTableName(message.today);

      const data = message.data;
      if (!Array.isArray(data) || data.length === 0) {
        // `VALUES ?` with no rows is invalid SQL, so there is nothing to send.
        console.log(`No records to insert into table ${tableName}`);
        return;
      }

      // Every query below is a single statement, so multi-statement mode is not enabled.
      connection = await mysql.createConnection({ ...dbConfig });
      await ensureTable(connection, tableName);

      console.log("data:", data);
      const values = data.map((item) => buildPullDataRow(item));
      await connection.query(buildPullDataUpsertSQL(tableName), [values]);
      console.log(`成功插入 ${values.length} 条记录到表 ${tableName}`);
    } catch (error) {
      console.error('处理消息失败:', error);
      throw error;
    } finally {
      // Close the connection on every path; a close failure is only logged so
      // it cannot hide the error that actually aborted the work.
      if (connection) {
        try {
          await connection.end();
        } catch (closeError) {
          console.error('Failed to close MySQL connection:', closeError);
        }
      }
    }
  };
  /**
   * Register `messageHandler` with the RabbitMQ helper.
   *
   * The two string arguments are passed through to `consumerDirectMsg`
   * unchanged; presumably they are the exchange name and the routing key
   * (confirm against the helper's signature).
   *
   * A start-up failure is logged and not rethrown, so this never rejects.
   *
   * @returns {Promise<void>}
   */
  async function startConsumer() {
    try {
      await rabbitMq.consumerDirectMsg(
        messageHandler,
        "exchange_pull_data_system",
        "on_recv_pull_data"
      );
    } catch (error) {
      console.error('启动消费者失败:', error);
    }
  }
  /**
   * Start the queue consumer.
   * The promise from `startConsumer` is intentionally not awaited: that
   * function logs its own failures instead of rejecting.
   */
  CMD.init = function () {
    startConsumer();
  };

  // Begin consuming as soon as this module is loaded.
  CMD.init();

  module.exports = CMD;