write.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411
  1. // Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Source code and contact info at http://github.com/streadway/amqp
  5. package amqp
  6. import (
  7. "bufio"
  8. "bytes"
  9. "encoding/binary"
  10. "errors"
  11. "io"
  12. "math"
  13. "time"
  14. )
  15. func (me *writer) WriteFrame(frame frame) (err error) {
  16. if err = frame.write(me.w); err != nil {
  17. return
  18. }
  19. if buf, ok := me.w.(*bufio.Writer); ok {
  20. err = buf.Flush()
  21. }
  22. return
  23. }
  24. func (me *methodFrame) write(w io.Writer) (err error) {
  25. var payload bytes.Buffer
  26. if me.Method == nil {
  27. return errors.New("malformed frame: missing method")
  28. }
  29. class, method := me.Method.id()
  30. if err = binary.Write(&payload, binary.BigEndian, class); err != nil {
  31. return
  32. }
  33. if err = binary.Write(&payload, binary.BigEndian, method); err != nil {
  34. return
  35. }
  36. if err = me.Method.write(&payload); err != nil {
  37. return
  38. }
  39. return writeFrame(w, frameMethod, me.ChannelId, payload.Bytes())
  40. }
  41. // Heartbeat
  42. //
  43. // Payload is empty
  44. func (me *heartbeatFrame) write(w io.Writer) (err error) {
  45. return writeFrame(w, frameHeartbeat, me.ChannelId, []byte{})
  46. }
  47. // CONTENT HEADER
  48. // 0 2 4 12 14
  49. // +----------+--------+-----------+----------------+------------- - -
  50. // | class-id | weight | body size | property flags | property list...
  51. // +----------+--------+-----------+----------------+------------- - -
  52. // short short long long short remainder...
  53. //
  54. func (me *headerFrame) write(w io.Writer) (err error) {
  55. var payload bytes.Buffer
  56. var zeroTime time.Time
  57. if err = binary.Write(&payload, binary.BigEndian, me.ClassId); err != nil {
  58. return
  59. }
  60. if err = binary.Write(&payload, binary.BigEndian, me.weight); err != nil {
  61. return
  62. }
  63. if err = binary.Write(&payload, binary.BigEndian, me.Size); err != nil {
  64. return
  65. }
  66. // First pass will build the mask to be serialized, second pass will serialize
  67. // each of the fields that appear in the mask.
  68. var mask uint16
  69. if len(me.Properties.ContentType) > 0 {
  70. mask = mask | flagContentType
  71. }
  72. if len(me.Properties.ContentEncoding) > 0 {
  73. mask = mask | flagContentEncoding
  74. }
  75. if me.Properties.Headers != nil && len(me.Properties.Headers) > 0 {
  76. mask = mask | flagHeaders
  77. }
  78. if me.Properties.DeliveryMode > 0 {
  79. mask = mask | flagDeliveryMode
  80. }
  81. if me.Properties.Priority > 0 {
  82. mask = mask | flagPriority
  83. }
  84. if len(me.Properties.CorrelationId) > 0 {
  85. mask = mask | flagCorrelationId
  86. }
  87. if len(me.Properties.ReplyTo) > 0 {
  88. mask = mask | flagReplyTo
  89. }
  90. if len(me.Properties.Expiration) > 0 {
  91. mask = mask | flagExpiration
  92. }
  93. if len(me.Properties.MessageId) > 0 {
  94. mask = mask | flagMessageId
  95. }
  96. if me.Properties.Timestamp != zeroTime {
  97. mask = mask | flagTimestamp
  98. }
  99. if len(me.Properties.Type) > 0 {
  100. mask = mask | flagType
  101. }
  102. if len(me.Properties.UserId) > 0 {
  103. mask = mask | flagUserId
  104. }
  105. if len(me.Properties.AppId) > 0 {
  106. mask = mask | flagAppId
  107. }
  108. if err = binary.Write(&payload, binary.BigEndian, mask); err != nil {
  109. return
  110. }
  111. if hasProperty(mask, flagContentType) {
  112. if err = writeShortstr(&payload, me.Properties.ContentType); err != nil {
  113. return
  114. }
  115. }
  116. if hasProperty(mask, flagContentEncoding) {
  117. if err = writeShortstr(&payload, me.Properties.ContentEncoding); err != nil {
  118. return
  119. }
  120. }
  121. if hasProperty(mask, flagHeaders) {
  122. if err = writeTable(&payload, me.Properties.Headers); err != nil {
  123. return
  124. }
  125. }
  126. if hasProperty(mask, flagDeliveryMode) {
  127. if err = binary.Write(&payload, binary.BigEndian, me.Properties.DeliveryMode); err != nil {
  128. return
  129. }
  130. }
  131. if hasProperty(mask, flagPriority) {
  132. if err = binary.Write(&payload, binary.BigEndian, me.Properties.Priority); err != nil {
  133. return
  134. }
  135. }
  136. if hasProperty(mask, flagCorrelationId) {
  137. if err = writeShortstr(&payload, me.Properties.CorrelationId); err != nil {
  138. return
  139. }
  140. }
  141. if hasProperty(mask, flagReplyTo) {
  142. if err = writeShortstr(&payload, me.Properties.ReplyTo); err != nil {
  143. return
  144. }
  145. }
  146. if hasProperty(mask, flagExpiration) {
  147. if err = writeShortstr(&payload, me.Properties.Expiration); err != nil {
  148. return
  149. }
  150. }
  151. if hasProperty(mask, flagMessageId) {
  152. if err = writeShortstr(&payload, me.Properties.MessageId); err != nil {
  153. return
  154. }
  155. }
  156. if hasProperty(mask, flagTimestamp) {
  157. if err = binary.Write(&payload, binary.BigEndian, uint64(me.Properties.Timestamp.Unix())); err != nil {
  158. return
  159. }
  160. }
  161. if hasProperty(mask, flagType) {
  162. if err = writeShortstr(&payload, me.Properties.Type); err != nil {
  163. return
  164. }
  165. }
  166. if hasProperty(mask, flagUserId) {
  167. if err = writeShortstr(&payload, me.Properties.UserId); err != nil {
  168. return
  169. }
  170. }
  171. if hasProperty(mask, flagAppId) {
  172. if err = writeShortstr(&payload, me.Properties.AppId); err != nil {
  173. return
  174. }
  175. }
  176. return writeFrame(w, frameHeader, me.ChannelId, payload.Bytes())
  177. }
  178. // Body
  179. //
  180. // Payload is one byterange from the full body who's size is declared in the
  181. // Header frame
  182. func (me *bodyFrame) write(w io.Writer) (err error) {
  183. return writeFrame(w, frameBody, me.ChannelId, me.Body)
  184. }
  185. func writeFrame(w io.Writer, typ uint8, channel uint16, payload []byte) (err error) {
  186. end := []byte{frameEnd}
  187. size := uint(len(payload))
  188. _, err = w.Write([]byte{
  189. byte(typ),
  190. byte((channel & 0xff00) >> 8),
  191. byte((channel & 0x00ff) >> 0),
  192. byte((size & 0xff000000) >> 24),
  193. byte((size & 0x00ff0000) >> 16),
  194. byte((size & 0x0000ff00) >> 8),
  195. byte((size & 0x000000ff) >> 0),
  196. })
  197. if err != nil {
  198. return
  199. }
  200. if _, err = w.Write(payload); err != nil {
  201. return
  202. }
  203. if _, err = w.Write(end); err != nil {
  204. return
  205. }
  206. return
  207. }
  208. func writeShortstr(w io.Writer, s string) (err error) {
  209. b := []byte(s)
  210. var length uint8 = uint8(len(b))
  211. if err = binary.Write(w, binary.BigEndian, length); err != nil {
  212. return
  213. }
  214. if _, err = w.Write(b[:length]); err != nil {
  215. return
  216. }
  217. return
  218. }
  219. func writeLongstr(w io.Writer, s string) (err error) {
  220. b := []byte(s)
  221. var length uint32 = uint32(len(b))
  222. if err = binary.Write(w, binary.BigEndian, length); err != nil {
  223. return
  224. }
  225. if _, err = w.Write(b[:length]); err != nil {
  226. return
  227. }
  228. return
  229. }
  230. /*
  231. 'A': []interface{}
  232. 'D': Decimal
  233. 'F': Table
  234. 'I': int32
  235. 'S': string
  236. 'T': time.Time
  237. 'V': nil
  238. 'b': byte
  239. 'd': float64
  240. 'f': float32
  241. 'l': int64
  242. 's': int16
  243. 't': bool
  244. 'x': []byte
  245. */
  246. func writeField(w io.Writer, value interface{}) (err error) {
  247. var buf [9]byte
  248. var enc []byte
  249. switch v := value.(type) {
  250. case bool:
  251. buf[0] = 't'
  252. if v {
  253. buf[1] = byte(1)
  254. } else {
  255. buf[1] = byte(0)
  256. }
  257. enc = buf[:2]
  258. case byte:
  259. buf[0] = 'b'
  260. buf[1] = byte(v)
  261. enc = buf[:2]
  262. case int16:
  263. buf[0] = 's'
  264. binary.BigEndian.PutUint16(buf[1:3], uint16(v))
  265. enc = buf[:3]
  266. case int32:
  267. buf[0] = 'I'
  268. binary.BigEndian.PutUint32(buf[1:5], uint32(v))
  269. enc = buf[:5]
  270. case int64:
  271. buf[0] = 'l'
  272. binary.BigEndian.PutUint64(buf[1:9], uint64(v))
  273. enc = buf[:9]
  274. case float32:
  275. buf[0] = 'f'
  276. binary.BigEndian.PutUint32(buf[1:5], math.Float32bits(v))
  277. enc = buf[:5]
  278. case float64:
  279. buf[0] = 'd'
  280. binary.BigEndian.PutUint64(buf[1:9], math.Float64bits(v))
  281. enc = buf[:9]
  282. case Decimal:
  283. buf[0] = 'D'
  284. buf[1] = byte(v.Scale)
  285. binary.BigEndian.PutUint32(buf[2:6], uint32(v.Value))
  286. enc = buf[:6]
  287. case string:
  288. buf[0] = 'S'
  289. binary.BigEndian.PutUint32(buf[1:5], uint32(len(v)))
  290. enc = append(buf[:5], []byte(v)...)
  291. case []interface{}: // field-array
  292. buf[0] = 'A'
  293. sec := new(bytes.Buffer)
  294. for _, val := range v {
  295. if err = writeField(sec, val); err != nil {
  296. return
  297. }
  298. }
  299. binary.BigEndian.PutUint32(buf[1:5], uint32(sec.Len()))
  300. if _, err = w.Write(buf[:5]); err != nil {
  301. return
  302. }
  303. if _, err = w.Write(sec.Bytes()); err != nil {
  304. return
  305. }
  306. return
  307. case time.Time:
  308. buf[0] = 'T'
  309. binary.BigEndian.PutUint64(buf[1:9], uint64(v.Unix()))
  310. enc = buf[:9]
  311. case Table:
  312. if _, err = w.Write([]byte{'F'}); err != nil {
  313. return
  314. }
  315. return writeTable(w, v)
  316. case []byte:
  317. buf[0] = 'x'
  318. binary.BigEndian.PutUint32(buf[1:5], uint32(len(v)))
  319. if _, err = w.Write(buf[0:5]); err != nil {
  320. return
  321. }
  322. if _, err = w.Write(v); err != nil {
  323. return
  324. }
  325. return
  326. case nil:
  327. buf[0] = 'V'
  328. enc = buf[:1]
  329. default:
  330. return ErrFieldType
  331. }
  332. _, err = w.Write(enc)
  333. return
  334. }
  335. func writeTable(w io.Writer, table Table) (err error) {
  336. var buf bytes.Buffer
  337. for key, val := range table {
  338. if err = writeShortstr(&buf, key); err != nil {
  339. return
  340. }
  341. if err = writeField(&buf, val); err != nil {
  342. return
  343. }
  344. }
  345. return writeLongstr(w, string(buf.Bytes()))
  346. }