read.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447
  1. // Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Source code and contact info at http://github.com/streadway/amqp
  5. package amqp
  6. import (
  7. "bytes"
  8. "encoding/binary"
  9. "errors"
  10. "io"
  11. "time"
  12. )
  13. /*
  14. Reads a frame from an input stream and returns an interface that can be cast into
  15. one of the following:
  16. methodFrame
  17. PropertiesFrame
  18. bodyFrame
  19. heartbeatFrame
  20. 2.3.5 frame Details
  21. All frames consist of a header (7 octets), a payload of arbitrary size, and a
  22. 'frame-end' octet that detects malformed frames:
  23. 0 1 3 7 size+7 size+8
  24. +------+---------+-------------+ +------------+ +-----------+
  25. | type | channel | size | | payload | | frame-end |
  26. +------+---------+-------------+ +------------+ +-----------+
  27. octet short long size octets octet
  28. To read a frame, we:
  29. 1. Read the header and check the frame type and channel.
  30. 2. Depending on the frame type, we read the payload and process it.
  31. 3. Read the frame end octet.
  32. In realistic implementations where performance is a concern, we would use
  33. “read-ahead buffering” or
  34. “gathering reads” to avoid doing three separate system calls to read a frame.
  35. */
  36. func (me *reader) ReadFrame() (frame frame, err error) {
  37. var scratch [7]byte
  38. if _, err = io.ReadFull(me.r, scratch[:7]); err != nil {
  39. return
  40. }
  41. typ := uint8(scratch[0])
  42. channel := binary.BigEndian.Uint16(scratch[1:3])
  43. size := binary.BigEndian.Uint32(scratch[3:7])
  44. switch typ {
  45. case frameMethod:
  46. if frame, err = me.parseMethodFrame(channel, size); err != nil {
  47. return
  48. }
  49. case frameHeader:
  50. if frame, err = me.parseHeaderFrame(channel, size); err != nil {
  51. return
  52. }
  53. case frameBody:
  54. if frame, err = me.parseBodyFrame(channel, size); err != nil {
  55. return nil, err
  56. }
  57. case frameHeartbeat:
  58. if frame, err = me.parseHeartbeatFrame(channel, size); err != nil {
  59. return
  60. }
  61. default:
  62. return nil, ErrFrame
  63. }
  64. if _, err = io.ReadFull(me.r, scratch[:1]); err != nil {
  65. return nil, err
  66. }
  67. if scratch[0] != frameEnd {
  68. return nil, ErrFrame
  69. }
  70. return
  71. }
  72. func readShortstr(r io.Reader) (v string, err error) {
  73. var length uint8
  74. if err = binary.Read(r, binary.BigEndian, &length); err != nil {
  75. return
  76. }
  77. bytes := make([]byte, length)
  78. if _, err = io.ReadFull(r, bytes); err != nil {
  79. return
  80. }
  81. return string(bytes), nil
  82. }
  83. func readLongstr(r io.Reader) (v string, err error) {
  84. var length uint32
  85. if err = binary.Read(r, binary.BigEndian, &length); err != nil {
  86. return
  87. }
  88. bytes := make([]byte, length)
  89. if _, err = io.ReadFull(r, bytes); err != nil {
  90. return
  91. }
  92. return string(bytes), nil
  93. }
  94. func readDecimal(r io.Reader) (v Decimal, err error) {
  95. if err = binary.Read(r, binary.BigEndian, &v.Scale); err != nil {
  96. return
  97. }
  98. if err = binary.Read(r, binary.BigEndian, &v.Value); err != nil {
  99. return
  100. }
  101. return
  102. }
  103. func readFloat32(r io.Reader) (v float32, err error) {
  104. if err = binary.Read(r, binary.BigEndian, &v); err != nil {
  105. return
  106. }
  107. return
  108. }
  109. func readFloat64(r io.Reader) (v float64, err error) {
  110. if err = binary.Read(r, binary.BigEndian, &v); err != nil {
  111. return
  112. }
  113. return
  114. }
  115. func readTimestamp(r io.Reader) (v time.Time, err error) {
  116. var sec int64
  117. if err = binary.Read(r, binary.BigEndian, &sec); err != nil {
  118. return
  119. }
  120. return time.Unix(sec, 0), nil
  121. }
  122. /*
  123. 'A': []interface{}
  124. 'D': Decimal
  125. 'F': Table
  126. 'I': int32
  127. 'S': string
  128. 'T': time.Time
  129. 'V': nil
  130. 'b': byte
  131. 'd': float64
  132. 'f': float32
  133. 'l': int64
  134. 's': int16
  135. 't': bool
  136. 'x': []byte
  137. */
  138. func readField(r io.Reader) (v interface{}, err error) {
  139. var typ byte
  140. if err = binary.Read(r, binary.BigEndian, &typ); err != nil {
  141. return
  142. }
  143. switch typ {
  144. case 't':
  145. var value uint8
  146. if err = binary.Read(r, binary.BigEndian, &value); err != nil {
  147. return
  148. }
  149. return (value != 0), nil
  150. case 'b':
  151. var value [1]byte
  152. if _, err = io.ReadFull(r, value[0:1]); err != nil {
  153. return
  154. }
  155. return value[0], nil
  156. case 's':
  157. var value int16
  158. if err = binary.Read(r, binary.BigEndian, &value); err != nil {
  159. return
  160. }
  161. return value, nil
  162. case 'I':
  163. var value int32
  164. if err = binary.Read(r, binary.BigEndian, &value); err != nil {
  165. return
  166. }
  167. return value, nil
  168. case 'l':
  169. var value int64
  170. if err = binary.Read(r, binary.BigEndian, &value); err != nil {
  171. return
  172. }
  173. return value, nil
  174. case 'f':
  175. var value float32
  176. if err = binary.Read(r, binary.BigEndian, &value); err != nil {
  177. return
  178. }
  179. return value, nil
  180. case 'd':
  181. var value float64
  182. if err = binary.Read(r, binary.BigEndian, &value); err != nil {
  183. return
  184. }
  185. return value, nil
  186. case 'D':
  187. return readDecimal(r)
  188. case 'S':
  189. return readLongstr(r)
  190. case 'A':
  191. return readArray(r)
  192. case 'T':
  193. return readTimestamp(r)
  194. case 'F':
  195. return readTable(r)
  196. case 'x':
  197. var len int32
  198. if err = binary.Read(r, binary.BigEndian, &len); err != nil {
  199. return nil, err
  200. }
  201. value := make([]byte, len)
  202. if _, err = io.ReadFull(r, value); err != nil {
  203. return nil, err
  204. }
  205. return value, err
  206. case 'V':
  207. return nil, nil
  208. }
  209. return nil, ErrSyntax
  210. }
  211. /*
  212. Field tables are long strings that contain packed name-value pairs. The
  213. name-value pairs are encoded as short string defining the name, and octet
  214. defining the values type and then the value itself. The valid field types for
  215. tables are an extension of the native integer, bit, string, and timestamp
  216. types, and are shown in the grammar. Multi-octet integer fields are always
  217. held in network byte order.
  218. */
  219. func readTable(r io.Reader) (table Table, err error) {
  220. var nested bytes.Buffer
  221. var str string
  222. if str, err = readLongstr(r); err != nil {
  223. return
  224. }
  225. nested.Write([]byte(str))
  226. table = make(Table)
  227. for nested.Len() > 0 {
  228. var key string
  229. var value interface{}
  230. if key, err = readShortstr(&nested); err != nil {
  231. return
  232. }
  233. if value, err = readField(&nested); err != nil {
  234. return
  235. }
  236. table[key] = value
  237. }
  238. return
  239. }
  240. func readArray(r io.Reader) ([]interface{}, error) {
  241. var size uint32
  242. var err error
  243. if err = binary.Read(r, binary.BigEndian, &size); err != nil {
  244. return nil, err
  245. }
  246. lim := &io.LimitedReader{R: r, N: int64(size)}
  247. arr := make([]interface{}, 0)
  248. var field interface{}
  249. for {
  250. if field, err = readField(lim); err != nil {
  251. if err == io.EOF {
  252. break
  253. }
  254. return nil, err
  255. }
  256. arr = append(arr, field)
  257. }
  258. return arr, nil
  259. }
  260. // Checks if this bit mask matches the flags bitset
  261. func hasProperty(mask uint16, prop int) bool {
  262. return int(mask)&prop > 0
  263. }
  264. func (me *reader) parseHeaderFrame(channel uint16, size uint32) (frame frame, err error) {
  265. hf := &headerFrame{
  266. ChannelId: channel,
  267. }
  268. if err = binary.Read(me.r, binary.BigEndian, &hf.ClassId); err != nil {
  269. return
  270. }
  271. if err = binary.Read(me.r, binary.BigEndian, &hf.weight); err != nil {
  272. return
  273. }
  274. if err = binary.Read(me.r, binary.BigEndian, &hf.Size); err != nil {
  275. return
  276. }
  277. var flags uint16
  278. if err = binary.Read(me.r, binary.BigEndian, &flags); err != nil {
  279. return
  280. }
  281. if hasProperty(flags, flagContentType) {
  282. if hf.Properties.ContentType, err = readShortstr(me.r); err != nil {
  283. return
  284. }
  285. }
  286. if hasProperty(flags, flagContentEncoding) {
  287. if hf.Properties.ContentEncoding, err = readShortstr(me.r); err != nil {
  288. return
  289. }
  290. }
  291. if hasProperty(flags, flagHeaders) {
  292. if hf.Properties.Headers, err = readTable(me.r); err != nil {
  293. return
  294. }
  295. }
  296. if hasProperty(flags, flagDeliveryMode) {
  297. if err = binary.Read(me.r, binary.BigEndian, &hf.Properties.DeliveryMode); err != nil {
  298. return
  299. }
  300. }
  301. if hasProperty(flags, flagPriority) {
  302. if err = binary.Read(me.r, binary.BigEndian, &hf.Properties.Priority); err != nil {
  303. return
  304. }
  305. }
  306. if hasProperty(flags, flagCorrelationId) {
  307. if hf.Properties.CorrelationId, err = readShortstr(me.r); err != nil {
  308. return
  309. }
  310. }
  311. if hasProperty(flags, flagReplyTo) {
  312. if hf.Properties.ReplyTo, err = readShortstr(me.r); err != nil {
  313. return
  314. }
  315. }
  316. if hasProperty(flags, flagExpiration) {
  317. if hf.Properties.Expiration, err = readShortstr(me.r); err != nil {
  318. return
  319. }
  320. }
  321. if hasProperty(flags, flagMessageId) {
  322. if hf.Properties.MessageId, err = readShortstr(me.r); err != nil {
  323. return
  324. }
  325. }
  326. if hasProperty(flags, flagTimestamp) {
  327. if hf.Properties.Timestamp, err = readTimestamp(me.r); err != nil {
  328. return
  329. }
  330. }
  331. if hasProperty(flags, flagType) {
  332. if hf.Properties.Type, err = readShortstr(me.r); err != nil {
  333. return
  334. }
  335. }
  336. if hasProperty(flags, flagUserId) {
  337. if hf.Properties.UserId, err = readShortstr(me.r); err != nil {
  338. return
  339. }
  340. }
  341. if hasProperty(flags, flagAppId) {
  342. if hf.Properties.AppId, err = readShortstr(me.r); err != nil {
  343. return
  344. }
  345. }
  346. if hasProperty(flags, flagReserved1) {
  347. if hf.Properties.reserved1, err = readShortstr(me.r); err != nil {
  348. return
  349. }
  350. }
  351. return hf, nil
  352. }
  353. func (me *reader) parseBodyFrame(channel uint16, size uint32) (frame frame, err error) {
  354. bf := &bodyFrame{
  355. ChannelId: channel,
  356. Body: make([]byte, size),
  357. }
  358. if _, err = io.ReadFull(me.r, bf.Body); err != nil {
  359. return nil, err
  360. }
  361. return bf, nil
  362. }
  363. var errHeartbeatPayload = errors.New("Heartbeats should not have a payload")
  364. func (me *reader) parseHeartbeatFrame(channel uint16, size uint32) (frame frame, err error) {
  365. hf := &heartbeatFrame{
  366. ChannelId: channel,
  367. }
  368. if size > 0 {
  369. return nil, errHeartbeatPayload
  370. }
  371. return hf, nil
  372. }