read.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456
  1. // Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Source code and contact info at http://github.com/streadway/amqp
  5. package amqp
  6. import (
  7. "bytes"
  8. "encoding/binary"
  9. "errors"
  10. "io"
  11. "time"
  12. )
  13. /*
  14. Reads a frame from an input stream and returns an interface that can be cast into
  15. one of the following:
  16. methodFrame
  17. PropertiesFrame
  18. bodyFrame
  19. heartbeatFrame
  20. 2.3.5 frame Details
  21. All frames consist of a header (7 octets), a payload of arbitrary size, and a
  22. 'frame-end' octet that detects malformed frames:
  23. 0 1 3 7 size+7 size+8
  24. +------+---------+-------------+ +------------+ +-----------+
  25. | type | channel | size | | payload | | frame-end |
  26. +------+---------+-------------+ +------------+ +-----------+
  27. octet short long size octets octet
  28. To read a frame, we:
  29. 1. Read the header and check the frame type and channel.
  30. 2. Depending on the frame type, we read the payload and process it.
  31. 3. Read the frame end octet.
  32. In realistic implementations where performance is a concern, we would use
  33. “read-ahead buffering” or
  34. “gathering reads” to avoid doing three separate system calls to read a frame.
  35. */
  36. func (r *reader) ReadFrame() (frame frame, err error) {
  37. var scratch [7]byte
  38. if _, err = io.ReadFull(r.r, scratch[:7]); err != nil {
  39. return
  40. }
  41. typ := uint8(scratch[0])
  42. channel := binary.BigEndian.Uint16(scratch[1:3])
  43. size := binary.BigEndian.Uint32(scratch[3:7])
  44. switch typ {
  45. case frameMethod:
  46. if frame, err = r.parseMethodFrame(channel, size); err != nil {
  47. return
  48. }
  49. case frameHeader:
  50. if frame, err = r.parseHeaderFrame(channel, size); err != nil {
  51. return
  52. }
  53. case frameBody:
  54. if frame, err = r.parseBodyFrame(channel, size); err != nil {
  55. return nil, err
  56. }
  57. case frameHeartbeat:
  58. if frame, err = r.parseHeartbeatFrame(channel, size); err != nil {
  59. return
  60. }
  61. default:
  62. return nil, ErrFrame
  63. }
  64. if _, err = io.ReadFull(r.r, scratch[:1]); err != nil {
  65. return nil, err
  66. }
  67. if scratch[0] != frameEnd {
  68. return nil, ErrFrame
  69. }
  70. return
  71. }
  72. func readShortstr(r io.Reader) (v string, err error) {
  73. var length uint8
  74. if err = binary.Read(r, binary.BigEndian, &length); err != nil {
  75. return
  76. }
  77. bytes := make([]byte, length)
  78. if _, err = io.ReadFull(r, bytes); err != nil {
  79. return
  80. }
  81. return string(bytes), nil
  82. }
  83. func readLongstr(r io.Reader) (v string, err error) {
  84. var length uint32
  85. if err = binary.Read(r, binary.BigEndian, &length); err != nil {
  86. return
  87. }
  88. // slices can't be longer than max int32 value
  89. if length > (^uint32(0) >> 1) {
  90. return
  91. }
  92. bytes := make([]byte, length)
  93. if _, err = io.ReadFull(r, bytes); err != nil {
  94. return
  95. }
  96. return string(bytes), nil
  97. }
  98. func readDecimal(r io.Reader) (v Decimal, err error) {
  99. if err = binary.Read(r, binary.BigEndian, &v.Scale); err != nil {
  100. return
  101. }
  102. if err = binary.Read(r, binary.BigEndian, &v.Value); err != nil {
  103. return
  104. }
  105. return
  106. }
  107. func readFloat32(r io.Reader) (v float32, err error) {
  108. if err = binary.Read(r, binary.BigEndian, &v); err != nil {
  109. return
  110. }
  111. return
  112. }
  113. func readFloat64(r io.Reader) (v float64, err error) {
  114. if err = binary.Read(r, binary.BigEndian, &v); err != nil {
  115. return
  116. }
  117. return
  118. }
  119. func readTimestamp(r io.Reader) (v time.Time, err error) {
  120. var sec int64
  121. if err = binary.Read(r, binary.BigEndian, &sec); err != nil {
  122. return
  123. }
  124. return time.Unix(sec, 0), nil
  125. }
  126. /*
  127. 'A': []interface{}
  128. 'D': Decimal
  129. 'F': Table
  130. 'I': int32
  131. 'S': string
  132. 'T': time.Time
  133. 'V': nil
  134. 'b': byte
  135. 'd': float64
  136. 'f': float32
  137. 'l': int64
  138. 's': int16
  139. 't': bool
  140. 'x': []byte
  141. */
  142. func readField(r io.Reader) (v interface{}, err error) {
  143. var typ byte
  144. if err = binary.Read(r, binary.BigEndian, &typ); err != nil {
  145. return
  146. }
  147. switch typ {
  148. case 't':
  149. var value uint8
  150. if err = binary.Read(r, binary.BigEndian, &value); err != nil {
  151. return
  152. }
  153. return (value != 0), nil
  154. case 'b':
  155. var value [1]byte
  156. if _, err = io.ReadFull(r, value[0:1]); err != nil {
  157. return
  158. }
  159. return value[0], nil
  160. case 's':
  161. var value int16
  162. if err = binary.Read(r, binary.BigEndian, &value); err != nil {
  163. return
  164. }
  165. return value, nil
  166. case 'I':
  167. var value int32
  168. if err = binary.Read(r, binary.BigEndian, &value); err != nil {
  169. return
  170. }
  171. return value, nil
  172. case 'l':
  173. var value int64
  174. if err = binary.Read(r, binary.BigEndian, &value); err != nil {
  175. return
  176. }
  177. return value, nil
  178. case 'f':
  179. var value float32
  180. if err = binary.Read(r, binary.BigEndian, &value); err != nil {
  181. return
  182. }
  183. return value, nil
  184. case 'd':
  185. var value float64
  186. if err = binary.Read(r, binary.BigEndian, &value); err != nil {
  187. return
  188. }
  189. return value, nil
  190. case 'D':
  191. return readDecimal(r)
  192. case 'S':
  193. return readLongstr(r)
  194. case 'A':
  195. return readArray(r)
  196. case 'T':
  197. return readTimestamp(r)
  198. case 'F':
  199. return readTable(r)
  200. case 'x':
  201. var len int32
  202. if err = binary.Read(r, binary.BigEndian, &len); err != nil {
  203. return nil, err
  204. }
  205. value := make([]byte, len)
  206. if _, err = io.ReadFull(r, value); err != nil {
  207. return nil, err
  208. }
  209. return value, err
  210. case 'V':
  211. return nil, nil
  212. }
  213. return nil, ErrSyntax
  214. }
  215. /*
  216. Field tables are long strings that contain packed name-value pairs. The
  217. name-value pairs are encoded as short string defining the name, and octet
  218. defining the values type and then the value itself. The valid field types for
  219. tables are an extension of the native integer, bit, string, and timestamp
  220. types, and are shown in the grammar. Multi-octet integer fields are always
  221. held in network byte order.
  222. */
  223. func readTable(r io.Reader) (table Table, err error) {
  224. var nested bytes.Buffer
  225. var str string
  226. if str, err = readLongstr(r); err != nil {
  227. return
  228. }
  229. nested.Write([]byte(str))
  230. table = make(Table)
  231. for nested.Len() > 0 {
  232. var key string
  233. var value interface{}
  234. if key, err = readShortstr(&nested); err != nil {
  235. return
  236. }
  237. if value, err = readField(&nested); err != nil {
  238. return
  239. }
  240. table[key] = value
  241. }
  242. return
  243. }
  244. func readArray(r io.Reader) ([]interface{}, error) {
  245. var (
  246. size uint32
  247. err error
  248. )
  249. if err = binary.Read(r, binary.BigEndian, &size); err != nil {
  250. return nil, err
  251. }
  252. var (
  253. lim = &io.LimitedReader{R: r, N: int64(size)}
  254. arr = []interface{}{}
  255. field interface{}
  256. )
  257. for {
  258. if field, err = readField(lim); err != nil {
  259. if err == io.EOF {
  260. break
  261. }
  262. return nil, err
  263. }
  264. arr = append(arr, field)
  265. }
  266. return arr, nil
  267. }
  268. // Checks if this bit mask matches the flags bitset
  269. func hasProperty(mask uint16, prop int) bool {
  270. return int(mask)&prop > 0
  271. }
  272. func (r *reader) parseHeaderFrame(channel uint16, size uint32) (frame frame, err error) {
  273. hf := &headerFrame{
  274. ChannelId: channel,
  275. }
  276. if err = binary.Read(r.r, binary.BigEndian, &hf.ClassId); err != nil {
  277. return
  278. }
  279. if err = binary.Read(r.r, binary.BigEndian, &hf.weight); err != nil {
  280. return
  281. }
  282. if err = binary.Read(r.r, binary.BigEndian, &hf.Size); err != nil {
  283. return
  284. }
  285. var flags uint16
  286. if err = binary.Read(r.r, binary.BigEndian, &flags); err != nil {
  287. return
  288. }
  289. if hasProperty(flags, flagContentType) {
  290. if hf.Properties.ContentType, err = readShortstr(r.r); err != nil {
  291. return
  292. }
  293. }
  294. if hasProperty(flags, flagContentEncoding) {
  295. if hf.Properties.ContentEncoding, err = readShortstr(r.r); err != nil {
  296. return
  297. }
  298. }
  299. if hasProperty(flags, flagHeaders) {
  300. if hf.Properties.Headers, err = readTable(r.r); err != nil {
  301. return
  302. }
  303. }
  304. if hasProperty(flags, flagDeliveryMode) {
  305. if err = binary.Read(r.r, binary.BigEndian, &hf.Properties.DeliveryMode); err != nil {
  306. return
  307. }
  308. }
  309. if hasProperty(flags, flagPriority) {
  310. if err = binary.Read(r.r, binary.BigEndian, &hf.Properties.Priority); err != nil {
  311. return
  312. }
  313. }
  314. if hasProperty(flags, flagCorrelationId) {
  315. if hf.Properties.CorrelationId, err = readShortstr(r.r); err != nil {
  316. return
  317. }
  318. }
  319. if hasProperty(flags, flagReplyTo) {
  320. if hf.Properties.ReplyTo, err = readShortstr(r.r); err != nil {
  321. return
  322. }
  323. }
  324. if hasProperty(flags, flagExpiration) {
  325. if hf.Properties.Expiration, err = readShortstr(r.r); err != nil {
  326. return
  327. }
  328. }
  329. if hasProperty(flags, flagMessageId) {
  330. if hf.Properties.MessageId, err = readShortstr(r.r); err != nil {
  331. return
  332. }
  333. }
  334. if hasProperty(flags, flagTimestamp) {
  335. if hf.Properties.Timestamp, err = readTimestamp(r.r); err != nil {
  336. return
  337. }
  338. }
  339. if hasProperty(flags, flagType) {
  340. if hf.Properties.Type, err = readShortstr(r.r); err != nil {
  341. return
  342. }
  343. }
  344. if hasProperty(flags, flagUserId) {
  345. if hf.Properties.UserId, err = readShortstr(r.r); err != nil {
  346. return
  347. }
  348. }
  349. if hasProperty(flags, flagAppId) {
  350. if hf.Properties.AppId, err = readShortstr(r.r); err != nil {
  351. return
  352. }
  353. }
  354. if hasProperty(flags, flagReserved1) {
  355. if hf.Properties.reserved1, err = readShortstr(r.r); err != nil {
  356. return
  357. }
  358. }
  359. return hf, nil
  360. }
  361. func (r *reader) parseBodyFrame(channel uint16, size uint32) (frame frame, err error) {
  362. bf := &bodyFrame{
  363. ChannelId: channel,
  364. Body: make([]byte, size),
  365. }
  366. if _, err = io.ReadFull(r.r, bf.Body); err != nil {
  367. return nil, err
  368. }
  369. return bf, nil
  370. }
  371. var errHeartbeatPayload = errors.New("Heartbeats should not have a payload")
  372. func (r *reader) parseHeartbeatFrame(channel uint16, size uint32) (frame frame, err error) {
  373. hf := &heartbeatFrame{
  374. ChannelId: channel,
  375. }
  376. if size > 0 {
  377. return nil, errHeartbeatPayload
  378. }
  379. return hf, nil
  380. }