sender.js 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. 'use strict'
  2. const { WebsocketFrameSend } = require('./frame')
  3. const { opcodes, sendHints } = require('./constants')
  4. const FixedQueue = require('../../dispatcher/fixed-queue')
  5. /**
  6. * @typedef {object} SendQueueNode
  7. * @property {Promise<void> | null} promise
  8. * @property {((...args: any[]) => any)} callback
  9. * @property {Buffer | null} frame
  10. */
  /**
   * Writes outgoing WebSocket messages to a socket in the order they were added.
   *
   * Non-blob payloads are written straight to the socket while nothing is
   * pending. Blob payloads have to be read asynchronously first, so they go
   * through an internal queue drained by `#run`; while that drain is active,
   * every other message is queued behind it instead of being written directly.
   */
  class SendQueue {
    /**
     * Messages waiting for the drain loop to write them.
     * @type {FixedQueue}
     */
    #queue = new FixedQueue()

    /**
     * True while `#run` is draining `#queue`.
     * @type {boolean}
     */
    #running = false

    /** @type {import('node:net').Socket} */
    #socket

    /**
     * @param {import('node:net').Socket} socket socket that receives every frame
     */
    constructor (socket) {
      this.#socket = socket
    }

    /**
     * Sends `item`, or queues it when earlier messages are still pending.
     *
     * @param {*} item payload; for `sendHints.blob` it must expose `arrayBuffer()`
     * @param {((...args: any[]) => any)} cb passed to `socket.write` as the write
     *   callback; if a blob payload cannot be turned into a frame it is invoked
     *   directly with the error instead
     * @param {number} hint one of `sendHints`, describing the kind of `item`
     * @returns {void}
     */
    add (item, cb, hint) {
      if (hint !== sendHints.blob) {
        if (!this.#running) {
          // TODO(@tsctx): support fast-path for string on running
          if (hint === sendHints.text) {
            // special fast-path for string: header and body are written as two
            // chunks, corked so they are flushed together
            const { 0: head, 1: body } = WebsocketFrameSend.createFastTextFrame(item)
            this.#socket.cork()
            this.#socket.write(head)
            this.#socket.write(body, cb)
            this.#socket.uncork()
          } else {
            // direct writing
            this.#socket.write(createFrame(item, hint), cb)
          }
        } else {
          // A drain is in progress: the frame is built now but written later,
          // so it cannot overtake the messages queued before it.
          /** @type {SendQueueNode & { error: unknown }} */
          const node = {
            promise: null,
            callback: cb,
            frame: createFrame(item, hint),
            error: null
          }
          this.#queue.push(node)
        }
        return
      }

      /** @type {SendQueueNode & { error: unknown }} */
      const node = {
        promise: item.arrayBuffer().then((ab) => {
          node.promise = null
          node.frame = createFrame(ab, hint)
        }).catch((err) => {
          // Record the failure instead of letting the promise reject. A node
          // can sit in the queue with nobody awaiting it yet, and a rejection
          // escaping `#run` would leave `#running` stuck at true forever.
          node.promise = null
          node.error = err
        }),
        callback: cb,
        frame: null,
        error: null
      }

      this.#queue.push(node)

      if (!this.#running) {
        this.#run()
      }
    }

    /**
     * Drains the queue in FIFO order, waiting for each node's pending read
     * before writing its frame. A node whose frame could not be produced is
     * skipped and its callback receives the recorded error.
     *
     * @returns {Promise<void>}
     */
    async #run () {
      this.#running = true
      const queue = this.#queue
      while (!queue.isEmpty()) {
        const node = queue.shift()
        // wait pending promise (it never rejects, see `add`)
        if (node.promise !== null) {
          await node.promise
        }
        if (node.frame !== null) {
          // write
          this.#socket.write(node.frame, node.callback)
        } else if (typeof node.callback === 'function') {
          // no frame was produced: report the failure, keep draining
          node.callback(node.error)
        }
        // cleanup
        node.callback = node.frame = node.error = null
      }
      this.#running = false
    }
  }
  /**
   * Builds a frame for `data` through `WebsocketFrameSend`.
   *
   * @param {*} data payload, converted with `toBuffer(data, hint)`
   * @param {number} hint one of `sendHints`; `sendHints.text` selects the TEXT
   *   opcode, every other hint selects BINARY
   * @returns {*} whatever `WebsocketFrameSend#createFrame` returns
   */
  function createFrame (data, hint) {
    const sender = new WebsocketFrameSend(toBuffer(data, hint))

    let opcode = opcodes.BINARY
    if (hint === sendHints.text) {
      opcode = opcodes.TEXT
    }

    return sender.createFrame(opcode)
  }
  /**
   * Wraps `data` in a `Uint8Array` according to `hint`.
   *
   * - `text` / `typedArray`: a view over `data.buffer` limited to
   *   `data.byteOffset` / `data.byteLength` (no copy of the bytes is made).
   * - `arrayBuffer` / `blob`: `new Uint8Array(data)`.
   *
   * @param {*} data value to wrap
   * @param {number} hint one of `sendHints`
   * @returns {Uint8Array | undefined} `undefined` when `hint` is not one of the
   *   four handled values
   */
  function toBuffer (data, hint) {
    if (hint === sendHints.text || hint === sendHints.typedArray) {
      const { buffer, byteOffset, byteLength } = data
      return new Uint8Array(buffer, byteOffset, byteLength)
    }

    if (hint === sendHints.arrayBuffer || hint === sendHints.blob) {
      return new Uint8Array(data)
    }

    return undefined
  }
  95. module.exports = { SendQueue }