123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447 |
- // Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
- // Use of this source code is governed by a BSD-style
- // license that can be found in the LICENSE file.
- // Source code and contact info at http://github.com/streadway/amqp
- package amqp
- import (
- "bytes"
- "encoding/binary"
- "errors"
- "io"
- "time"
- )
- /*
- Reads a frame from an input stream and returns an interface that can be cast into
- one of the following:
- methodFrame
- PropertiesFrame
- bodyFrame
- heartbeatFrame
- 2.3.5 frame Details
- All frames consist of a header (7 octets), a payload of arbitrary size, and a
- 'frame-end' octet that detects malformed frames:
- 0 1 3 7 size+7 size+8
- +------+---------+-------------+ +------------+ +-----------+
- | type | channel | size | | payload | | frame-end |
- +------+---------+-------------+ +------------+ +-----------+
- octet short long size octets octet
- To read a frame, we:
- 1. Read the header and check the frame type and channel.
- 2. Depending on the frame type, we read the payload and process it.
- 3. Read the frame end octet.
- In realistic implementations where performance is a concern, we would use
- “read-ahead buffering” or
- “gathering reads” to avoid doing three separate system calls to read a frame.
- */
- func (me *reader) ReadFrame() (frame frame, err error) {
- var scratch [7]byte
- if _, err = io.ReadFull(me.r, scratch[:7]); err != nil {
- return
- }
- typ := uint8(scratch[0])
- channel := binary.BigEndian.Uint16(scratch[1:3])
- size := binary.BigEndian.Uint32(scratch[3:7])
- switch typ {
- case frameMethod:
- if frame, err = me.parseMethodFrame(channel, size); err != nil {
- return
- }
- case frameHeader:
- if frame, err = me.parseHeaderFrame(channel, size); err != nil {
- return
- }
- case frameBody:
- if frame, err = me.parseBodyFrame(channel, size); err != nil {
- return nil, err
- }
- case frameHeartbeat:
- if frame, err = me.parseHeartbeatFrame(channel, size); err != nil {
- return
- }
- default:
- return nil, ErrFrame
- }
- if _, err = io.ReadFull(me.r, scratch[:1]); err != nil {
- return nil, err
- }
- if scratch[0] != frameEnd {
- return nil, ErrFrame
- }
- return
- }
- func readShortstr(r io.Reader) (v string, err error) {
- var length uint8
- if err = binary.Read(r, binary.BigEndian, &length); err != nil {
- return
- }
- bytes := make([]byte, length)
- if _, err = io.ReadFull(r, bytes); err != nil {
- return
- }
- return string(bytes), nil
- }
- func readLongstr(r io.Reader) (v string, err error) {
- var length uint32
- if err = binary.Read(r, binary.BigEndian, &length); err != nil {
- return
- }
- bytes := make([]byte, length)
- if _, err = io.ReadFull(r, bytes); err != nil {
- return
- }
- return string(bytes), nil
- }
- func readDecimal(r io.Reader) (v Decimal, err error) {
- if err = binary.Read(r, binary.BigEndian, &v.Scale); err != nil {
- return
- }
- if err = binary.Read(r, binary.BigEndian, &v.Value); err != nil {
- return
- }
- return
- }
- func readFloat32(r io.Reader) (v float32, err error) {
- if err = binary.Read(r, binary.BigEndian, &v); err != nil {
- return
- }
- return
- }
- func readFloat64(r io.Reader) (v float64, err error) {
- if err = binary.Read(r, binary.BigEndian, &v); err != nil {
- return
- }
- return
- }
- func readTimestamp(r io.Reader) (v time.Time, err error) {
- var sec int64
- if err = binary.Read(r, binary.BigEndian, &sec); err != nil {
- return
- }
- return time.Unix(sec, 0), nil
- }
- /*
- 'A': []interface{}
- 'D': Decimal
- 'F': Table
- 'I': int32
- 'S': string
- 'T': time.Time
- 'V': nil
- 'b': byte
- 'd': float64
- 'f': float32
- 'l': int64
- 's': int16
- 't': bool
- 'x': []byte
- */
- func readField(r io.Reader) (v interface{}, err error) {
- var typ byte
- if err = binary.Read(r, binary.BigEndian, &typ); err != nil {
- return
- }
- switch typ {
- case 't':
- var value uint8
- if err = binary.Read(r, binary.BigEndian, &value); err != nil {
- return
- }
- return (value != 0), nil
- case 'b':
- var value [1]byte
- if _, err = io.ReadFull(r, value[0:1]); err != nil {
- return
- }
- return value[0], nil
- case 's':
- var value int16
- if err = binary.Read(r, binary.BigEndian, &value); err != nil {
- return
- }
- return value, nil
- case 'I':
- var value int32
- if err = binary.Read(r, binary.BigEndian, &value); err != nil {
- return
- }
- return value, nil
- case 'l':
- var value int64
- if err = binary.Read(r, binary.BigEndian, &value); err != nil {
- return
- }
- return value, nil
- case 'f':
- var value float32
- if err = binary.Read(r, binary.BigEndian, &value); err != nil {
- return
- }
- return value, nil
- case 'd':
- var value float64
- if err = binary.Read(r, binary.BigEndian, &value); err != nil {
- return
- }
- return value, nil
- case 'D':
- return readDecimal(r)
- case 'S':
- return readLongstr(r)
- case 'A':
- return readArray(r)
- case 'T':
- return readTimestamp(r)
- case 'F':
- return readTable(r)
- case 'x':
- var len int32
- if err = binary.Read(r, binary.BigEndian, &len); err != nil {
- return nil, err
- }
- value := make([]byte, len)
- if _, err = io.ReadFull(r, value); err != nil {
- return nil, err
- }
- return value, err
- case 'V':
- return nil, nil
- }
- return nil, ErrSyntax
- }
- /*
- Field tables are long strings that contain packed name-value pairs. The
- name-value pairs are encoded as short string defining the name, and octet
- defining the values type and then the value itself. The valid field types for
- tables are an extension of the native integer, bit, string, and timestamp
- types, and are shown in the grammar. Multi-octet integer fields are always
- held in network byte order.
- */
- func readTable(r io.Reader) (table Table, err error) {
- var nested bytes.Buffer
- var str string
- if str, err = readLongstr(r); err != nil {
- return
- }
- nested.Write([]byte(str))
- table = make(Table)
- for nested.Len() > 0 {
- var key string
- var value interface{}
- if key, err = readShortstr(&nested); err != nil {
- return
- }
- if value, err = readField(&nested); err != nil {
- return
- }
- table[key] = value
- }
- return
- }
- func readArray(r io.Reader) ([]interface{}, error) {
- var size uint32
- var err error
- if err = binary.Read(r, binary.BigEndian, &size); err != nil {
- return nil, err
- }
- lim := &io.LimitedReader{R: r, N: int64(size)}
- arr := make([]interface{}, 0)
- var field interface{}
- for {
- if field, err = readField(lim); err != nil {
- if err == io.EOF {
- break
- }
- return nil, err
- }
- arr = append(arr, field)
- }
- return arr, nil
- }
- // Checks if this bit mask matches the flags bitset
- func hasProperty(mask uint16, prop int) bool {
- return int(mask)&prop > 0
- }
- func (me *reader) parseHeaderFrame(channel uint16, size uint32) (frame frame, err error) {
- hf := &headerFrame{
- ChannelId: channel,
- }
- if err = binary.Read(me.r, binary.BigEndian, &hf.ClassId); err != nil {
- return
- }
- if err = binary.Read(me.r, binary.BigEndian, &hf.weight); err != nil {
- return
- }
- if err = binary.Read(me.r, binary.BigEndian, &hf.Size); err != nil {
- return
- }
- var flags uint16
- if err = binary.Read(me.r, binary.BigEndian, &flags); err != nil {
- return
- }
- if hasProperty(flags, flagContentType) {
- if hf.Properties.ContentType, err = readShortstr(me.r); err != nil {
- return
- }
- }
- if hasProperty(flags, flagContentEncoding) {
- if hf.Properties.ContentEncoding, err = readShortstr(me.r); err != nil {
- return
- }
- }
- if hasProperty(flags, flagHeaders) {
- if hf.Properties.Headers, err = readTable(me.r); err != nil {
- return
- }
- }
- if hasProperty(flags, flagDeliveryMode) {
- if err = binary.Read(me.r, binary.BigEndian, &hf.Properties.DeliveryMode); err != nil {
- return
- }
- }
- if hasProperty(flags, flagPriority) {
- if err = binary.Read(me.r, binary.BigEndian, &hf.Properties.Priority); err != nil {
- return
- }
- }
- if hasProperty(flags, flagCorrelationId) {
- if hf.Properties.CorrelationId, err = readShortstr(me.r); err != nil {
- return
- }
- }
- if hasProperty(flags, flagReplyTo) {
- if hf.Properties.ReplyTo, err = readShortstr(me.r); err != nil {
- return
- }
- }
- if hasProperty(flags, flagExpiration) {
- if hf.Properties.Expiration, err = readShortstr(me.r); err != nil {
- return
- }
- }
- if hasProperty(flags, flagMessageId) {
- if hf.Properties.MessageId, err = readShortstr(me.r); err != nil {
- return
- }
- }
- if hasProperty(flags, flagTimestamp) {
- if hf.Properties.Timestamp, err = readTimestamp(me.r); err != nil {
- return
- }
- }
- if hasProperty(flags, flagType) {
- if hf.Properties.Type, err = readShortstr(me.r); err != nil {
- return
- }
- }
- if hasProperty(flags, flagUserId) {
- if hf.Properties.UserId, err = readShortstr(me.r); err != nil {
- return
- }
- }
- if hasProperty(flags, flagAppId) {
- if hf.Properties.AppId, err = readShortstr(me.r); err != nil {
- return
- }
- }
- if hasProperty(flags, flagReserved1) {
- if hf.Properties.reserved1, err = readShortstr(me.r); err != nil {
- return
- }
- }
- return hf, nil
- }
- func (me *reader) parseBodyFrame(channel uint16, size uint32) (frame frame, err error) {
- bf := &bodyFrame{
- ChannelId: channel,
- Body: make([]byte, size),
- }
- if _, err = io.ReadFull(me.r, bf.Body); err != nil {
- return nil, err
- }
- return bf, nil
- }
- var errHeartbeatPayload = errors.New("Heartbeats should not have a payload")
- func (me *reader) parseHeartbeatFrame(channel uint16, size uint32) (frame frame, err error) {
- hf := &heartbeatFrame{
- ChannelId: channel,
- }
- if size > 0 {
- return nil, errHeartbeatPayload
- }
- return hf, nil
- }
|