frame.js 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. // The river sweeps through
  2. // Silt and twigs, gravel and leaves
  3. // Driving the wheel on
  4. 'use strict';
  5. var defs = require('./defs');
  6. var constants = defs.constants;
  7. var decode = defs.decode;
  8. var Bits = require('@acuminous/bitsyntax');
  // Protocol header string: the ASCII literal "AMQP" followed by the four
  // octets 0, 0, 9, 1 (presumably protocol id 0 and version 0-9-1 -- confirm
  // against the AMQP specification).
  module.exports.PROTOCOL_HEADER = "AMQP".concat(String.fromCharCode(0, 0, 9, 1));
  /*
    Frame format:

    0      1         3             7                size+7 size+8
    +------+---------+-------------+ +------------+ +-----------+
    | type | channel | size        | | payload    | | frame-end |
    +------+---------+-------------+ +------------+ +-----------+
     octet   short     long          size octets    octet

    In general I want to know those first three things straight away, so I
    can discard frames early.
  */
  // Framing constants: frame type codes and the frame-end marker, copied
  // from defs.constants into module-local names.
  var FRAME_METHOD = constants.FRAME_METHOD;
  var FRAME_HEARTBEAT = constants.FRAME_HEARTBEAT;
  var FRAME_HEADER = constants.FRAME_HEADER;
  var FRAME_BODY = constants.FRAME_BODY;
  var FRAME_END = constants.FRAME_END;
  // Builder for a whole body frame: the FRAME_BODY type code, a 16-bit
  // channel, a 32-bit size, `size` bytes of payload, then FRAME_END.
  var bodyCons = Bits.builder(
    FRAME_BODY,
    'channel:16, size:32, payload:size/binary',
    FRAME_END
  );
  // %%% TESTME possibly better to cons the first bit and write the
  // second directly, in the absence of IO lists

  /**
   * Encode a body frame by handing the field bindings to bodyCons.
   *
   * @param channel - value for the frame's 16-bit channel field
   * @param payload - frame content; its `length` is used as the size field
   * @returns the value produced by bodyCons (presumably a Buffer -- confirm
   *   against the bitsyntax builder documentation)
   */
  module.exports.makeBodyFrame = function(channel, payload) {
    var bindings = {
      channel: channel,
      size: payload.length,
      payload: payload
    };
    return bodyCons(bindings);
  };
  // Matcher for the fixed frame prefix: 8-bit type, 16-bit channel and
  // 32-bit size. Whatever follows is captured as the binary `rest`.
  var frameHeaderPattern = Bits.matcher(
    'type:8',
    'channel:16',
    'size:32',
    'rest/binary'
  );
  /**
   * Try to split one complete frame off the front of `bin`.
   *
   * The frame prefix (type, channel, size) is matched first so that an
   * oversized frame can be rejected before its payload has fully arrived.
   *
   * @param {Buffer} bin - bytes to parse, starting at a frame boundary
   * @param {number} max - largest payload size that is accepted
   * @returns {false|{type: number, channel: number, size: number,
   *   payload: Buffer, rest: Buffer}} `false` when `bin` does not yet hold a
   *   whole frame (prefix, payload and frame-end octet); otherwise the frame's
   *   fields, its payload, and the bytes following the frame as `rest`.
   *   `payload` and `rest` are views onto the input, not copies (assumes the
   *   matcher yields Buffers -- confirm against bitsyntax).
   * @throws {Error} 'Frame size exceeds frame max' when the declared size is
   *   greater than `max`; 'Invalid frame' when the octet following the
   *   payload is not FRAME_END.
   */
  function parseFrame(bin, max) {
    var fh = frameHeaderPattern(bin);
    if (!fh) {
      // Not even the fixed-size prefix has arrived yet.
      return false;
    }

    var size = fh.size;
    var rest = fh.rest;

    if (size > max) {
      throw new Error('Frame size exceeds frame max');
    }

    // A whole frame needs `size` payload octets plus the frame-end octet.
    if (!(rest.length > size)) {
      return false;
    }

    if (rest[size] !== FRAME_END) {
      throw new Error('Invalid frame');
    }

    // subarray() replaces the deprecated Buffer#slice(); both return views
    // that share memory with `rest`.
    return {
      type: fh.type,
      channel: fh.channel,
      size: size,
      payload: rest.subarray(0, size),
      rest: rest.subarray(size + 1)
    };
  }
  module.exports.parseFrame = parseFrame;

  // Matcher for a header-frame payload: 16-bit class, 16-bit weight
  // (captured as `_weight`), 64-bit size, then the remaining bytes as
  // `flagsAndfields`.
  var headerPattern = Bits.matcher(
    'class:16',
    '_weight:16',
    'size:64',
    'flagsAndfields/binary'
  );

  // Matcher for a method-frame payload: 32-bit id, then the remaining bytes
  // as `args`.
  var methodPattern = Bits.matcher('id:32, args/binary');

  // Single shared object used as the decoded form of a heartbeat frame.
  var HEARTBEAT = {channel: 0};
  /**
   * Turn a parsed frame into a decoded object, according to its type.
   *
   * - FRAME_METHOD: `{id, channel, fields}`, with fields decoded from the
   *   payload's `args` by `decode`.
   * - FRAME_HEADER: `{id, channel, size, fields}`, where `id` is the class
   *   and fields are decoded from `flagsAndfields`.
   * - FRAME_BODY: `{channel, content}`, content being the raw payload.
   * - FRAME_HEARTBEAT: the shared HEARTBEAT object.
   *
   * NOTE(review): the results of methodPattern/headerPattern are used without
   * checking whether the match succeeded -- confirm callers only pass
   * well-formed payloads.
   *
   * @param frame - object with `type`, `channel` and `payload`, such as the
   *   one returned by parseFrame
   * @throws {Error} 'Unknown frame type <type>' for any other frame type
   */
  module.exports.decodeFrame = function(frame) {
    var payload = frame.payload;
    var type = frame.type;

    if (type === FRAME_METHOD) {
      var method = methodPattern(payload);
      var methodId = method.id;
      var methodFields = decode(methodId, method.args);
      return {id: methodId, channel: frame.channel, fields: methodFields};
    }

    if (type === FRAME_HEADER) {
      var header = headerPattern(payload);
      var classId = header['class'];
      var properties = decode(classId, header.flagsAndfields);
      return {
        id: classId,
        channel: frame.channel,
        size: header.size,
        fields: properties
      };
    }

    if (type === FRAME_BODY) {
      return {channel: frame.channel, content: frame.payload};
    }

    if (type === FRAME_HEARTBEAT) {
      return HEARTBEAT;
    }

    throw new Error('Unknown frame type ' + frame.type);
  };
  // Encoded heartbeat frame, laid out in wire order: type octet, 16-bit
  // channel, 32-bit size, frame-end octet. Channel and size are both zero,
  // so the frame carries no payload.
  module.exports.HEARTBEAT_BUF = Buffer.from([
    constants.FRAME_HEARTBEAT,
    0, 0,        // channel = 0
    0, 0, 0, 0,  // size = 0
    constants.FRAME_END
  ]);

  // Decoded form of a heartbeat frame (the object decodeFrame returns).
  module.exports.HEARTBEAT = HEARTBEAT;