// receive_stream.js
//
// Consumes messages from a RabbitMQ stream queue over AMQP 0-9-1 using
// amqplib, printing each message body and acknowledging it.

const amqp = require('amqplib');

(async () => {
  let connection;
  try {
    connection = await amqp.connect('amqp://localhost');

    // Call close() through an arrow function so it runs as a method of
    // `connection`; passing `connection.close` directly would detach `this`.
    process.once('SIGINT', () => {
      connection.close().catch((err) => console.error(err));
    });

    const channel = await connection.createChannel();
    const queue = 'my_first_stream';

    // Define the stream queue.
    // Mandatory: exclusive: false, durable: true, autoDelete: false
    await channel.assertQueue(queue, {
      exclusive: false,
      durable: true,
      autoDelete: false,
      arguments: {
        'x-queue-type': 'stream', // Mandatory to define a stream queue
        // Cap retention at 2 GB; without it the stream has no size limit.
        'x-max-length-bytes': 2_000_000_000,
      },
    });

    // A prefetch limit is mandatory when consuming from a stream.
    // Awaited so that a failure is reported by the catch block below.
    await channel.qos(100);

    await channel.consume(
      queue,
      (msg) => {
        // amqplib invokes the callback with null when the broker cancels
        // the consumer; there is nothing to print or acknowledge then.
        if (msg === null) {
          console.warn(' [!] Consumer cancelled by server');
          return;
        }
        console.log(" [x] Received '%s'", msg.content.toString());
        channel.ack(msg); // Mandatory: streams require manual acknowledgements
      },
      {
        noAck: false,
        arguments: {
          /*
            Here you can specify the offset: first, last, next, offset,
            timestamp and interval, i.e.
              'x-stream-offset': 'first'
              'x-stream-offset': 'last'
              'x-stream-offset': 'next'
              'x-stream-offset': 5
              'x-stream-offset': { '!': 'timestamp', value: 1686519750 }
              'x-stream-offset': '1h'
            The timestamp must be the desired number of seconds since
            00:00:00 UTC, 1970-01-01.
            The interval units can be Y, M, D, h, m, s.
          */
          'x-stream-offset': 'first',
        },
      },
    );

    console.log(' [*] Waiting for messages. To exit press CTRL+C');
  } catch (e) {
    // Report the failure on stderr and flag it in the exit status.
    console.error(e);
    process.exitCode = 1;
    if (connection) {
      // Best-effort cleanup so a half-initialised connection does not keep
      // the process alive; a failure to close is only logged.
      await connection.close().catch((closeErr) => console.error(closeErr));
    }
  }
})();