// The river sweeps through
// Silt and twigs, gravel and leaves
// Driving the wheel on
'use strict';

const defs = require('./defs');
const constants = defs.constants;
const decode = defs.decode;

const Bits = require('@acuminous/bitsyntax');

// Protocol header string: the text "AMQP" followed by the four characters
// whose codes are 0, 0, 9 and 1.
module.exports.PROTOCOL_HEADER = 'AMQP' + String.fromCharCode(0, 0, 9, 1);
/*
  Frame format:

  0      1         3             7                size+7 size+8
  +------+---------+-------------+ +------------+ +-----------+
  | type | channel | size        | | payload    | | frame-end |
  +------+---------+-------------+ +------------+ +-----------+
   octet   short     long          size octets     octet

  In general I want to know those first three things straight away, so I
  can discard frames early.
*/
// Framing constants, copied into module-level names so the code below can
// refer to them without going through `constants` each time.
const FRAME_METHOD = constants.FRAME_METHOD;
const FRAME_HEARTBEAT = constants.FRAME_HEARTBEAT;
const FRAME_HEADER = constants.FRAME_HEADER;
const FRAME_BODY = constants.FRAME_BODY;
const FRAME_END = constants.FRAME_END;
// Builder for a whole body frame: the FRAME_BODY type, then the channel,
// size and payload fields described by the pattern string, then FRAME_END.
const bodyCons = Bits.builder(
  FRAME_BODY,
  'channel:16, size:32, payload:size/binary',
  FRAME_END
);

// %%% TESTME possibly better to cons the first bit and write the
// second directly, in the absence of IO lists

/**
 * Encode `payload` as a body frame addressed to `channel`.
 *
 * @param {number} channel - channel number written into the frame
 * @param {Buffer} payload - content bytes; its `length` becomes the
 *   frame's size field
 * @returns {*} whatever the `bodyCons` builder produces for these fields
 *   (presumably a Buffer holding the encoded frame -- confirm against
 *   the bitsyntax builder documentation)
 */
module.exports.makeBodyFrame = function(channel, payload) {
  return bodyCons({channel: channel, size: payload.length, payload: payload});
};
// Matcher for the fixed-size start of a frame (type, channel, size); any
// bytes after those fields are captured as `rest`.
const frameHeaderPattern = Bits.matcher(
  'type:8',
  'channel:16',
  'size:32',
  'rest/binary'
);
/**
 * Try to split one complete frame off the front of `bin`.
 *
 * @param {Buffer} bin - the bytes available so far
 * @param {number} max - largest payload size that will be accepted
 * @returns {Object|boolean} `{type, channel, size, payload, rest}` when a
 *   whole frame is present, where `rest` holds the bytes following the
 *   frame-end octet; `false` when the header does not match or the
 *   payload and frame-end octet have not all arrived yet.
 * @throws {Error} if the declared size is greater than `max`, or the
 *   octet following the payload is not FRAME_END.
 */
function parseFrame(bin, max) {
  const header = frameHeaderPattern(bin);
  // The matcher yields a falsy value when `bin` does not match the header
  // pattern (presumably because too few bytes are available).
  if (!header) {
    return false;
  }

  const size = header.size;
  const rest = header.rest;

  // Reject oversized frames before waiting for the payload to arrive.
  if (size > max) {
    throw new Error('Frame size exceeds frame max');
  }

  // A complete frame needs `size` payload octets plus the frame-end octet.
  if (rest.length <= size) {
    return false;
  }

  if (rest[size] !== FRAME_END) {
    throw new Error('Invalid frame');
  }

  // subarray() returns views over the same memory, as the deprecated
  // Buffer#slice() did.
  return {
    type: header.type,
    channel: header.channel,
    size: size,
    payload: rest.subarray(0, size),
    rest: rest.subarray(size + 1)
  };
}
module.exports.parseFrame = parseFrame;

// Matcher for the payload of a header frame: class, weight and size fields,
// with the remaining bytes captured as `flagsAndfields`.
const headerPattern = Bits.matcher(
  'class:16',
  '_weight:16',
  'size:64',
  'flagsAndfields/binary'
);

// Matcher for the payload of a method frame: a 32-bit id, with the
// remaining bytes captured as `args`.
const methodPattern = Bits.matcher('id:32, args/binary');

// The single object returned for every decoded heartbeat frame.
const HEARTBEAT = {channel: 0};
/**
 * Decode a frame object (with `type`, `channel` and `payload` properties)
 * into a value whose shape depends on the frame type:
 *
 *  - method frame:    `{id, channel, fields}`
 *  - header frame:    `{id, channel, size, fields}`
 *  - body frame:      `{channel, content}`
 *  - heartbeat frame: the shared `HEARTBEAT` object
 *
 * For method and header frames, `fields` is the result of calling
 * `decode` with the id and the remaining payload bytes.
 *
 * @param {Object} frame - a frame with `type`, `channel` and `payload`
 * @returns {Object} the decoded frame, as described above
 * @throws {Error} if the frame type is not recognised, or a method or
 *   header payload does not match its expected layout
 */
module.exports.decodeFrame = function(frame) {
  const payload = frame.payload;
  switch (frame.type) {
  case FRAME_METHOD: {
    const idAndArgs = methodPattern(payload);
    // Fail with a clear message instead of reading properties off a falsy
    // match result and passing undefined values on to decode().
    if (!idAndArgs) {
      throw new Error('Malformed method frame');
    }
    const id = idAndArgs.id;
    const fields = decode(id, idAndArgs.args);
    return {id: id, channel: frame.channel, fields: fields};
  }
  case FRAME_HEADER: {
    const parts = headerPattern(payload);
    if (!parts) {
      throw new Error('Malformed header frame');
    }
    // For header frames the id is the class field.
    const id = parts['class'];
    const fields = decode(id, parts.flagsAndfields);
    return {
      id: id,
      channel: frame.channel,
      size: parts.size,
      fields: fields
    };
  }
  case FRAME_BODY:
    return {channel: frame.channel, content: payload};
  case FRAME_HEARTBEAT:
    return HEARTBEAT;
  default:
    throw new Error('Unknown frame type ' + frame.type);
  }
};
// Pre-encoded heartbeat frame, laid out as type, channel (two octets),
// size (four octets), frame-end.
module.exports.HEARTBEAT_BUF = Buffer.from([
  constants.FRAME_HEARTBEAT,
  0, 0,       // channel = 0
  0, 0, 0, 0, // size = 0
  constants.FRAME_END
]);

module.exports.HEARTBEAT = HEARTBEAT;
|