send_stream.js 1.1 KB

1234567891011121314151617181920212223242526272829303132333435363738
  1. const amqp = require('amqplib');
  2. (async () => {
  3. try {
  4. const connection = await amqp.connect('amqp://localhost');
  5. process.once('SIGINT', connection.close);
  6. const channel = await connection.createChannel();
  7. const queue = 'my_first_stream';
  8. const msg = `Hello World! ${Date.now()}`;
  9. // Define the queue stream
  10. // Mandatory: exclusive: false, durable: true autoDelete: false
  11. await channel.assertQueue(queue, {
  12. exclusive: false,
  13. durable: true,
  14. autoDelete: false,
  15. arguments: {
  16. 'x-queue-type': 'stream', // Mandatory to define stream queue
  17. 'x-max-length-bytes': 2_000_000_000 // Set the queue retention to 2GB else the stream doesn't have any limit
  18. }
  19. });
  20. // Send the message to the stream queue
  21. await channel.sendToQueue(queue, Buffer.from(msg));
  22. console.log(" [x] Sent '%s'", msg);
  23. await channel.close();
  24. // Close connection
  25. connection.close();
  26. }
  27. // Catch and display any errors in the console
  28. catch(e) {
  29. console.log(e)
  30. }
  31. })();