delivery.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. // Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Source code and contact info at http://github.com/streadway/amqp
  5. package amqp
  6. import (
  7. "errors"
  8. "time"
  9. )
  10. var errDeliveryNotInitialized = errors.New("delivery not initialized")
  11. // Acknowledger notifies the server of successful or failed consumption of
  12. // delivieries via identifier found in the Delivery.DeliveryTag field.
  13. //
  14. // Applications can provide mock implementations in tests of Delivery handlers.
  15. type Acknowledger interface {
  16. Ack(tag uint64, multiple bool) error
  17. Nack(tag uint64, multiple bool, requeue bool) error
  18. Reject(tag uint64, requeue bool) error
  19. }
  20. // Delivery captures the fields for a previously delivered message resident in
  21. // a queue to be delivered by the server to a consumer from Channel.Consume or
  22. // Channel.Get.
  23. type Delivery struct {
  24. Acknowledger Acknowledger // the channel from which this delivery arrived
  25. Headers Table // Application or header exchange table
  26. // Properties
  27. ContentType string // MIME content type
  28. ContentEncoding string // MIME content encoding
  29. DeliveryMode uint8 // queue implemention use - non-persistent (1) or persistent (2)
  30. Priority uint8 // queue implementation use - 0 to 9
  31. CorrelationId string // application use - correlation identifier
  32. ReplyTo string // application use - address to to reply to (ex: RPC)
  33. Expiration string // implementation use - message expiration spec
  34. MessageId string // application use - message identifier
  35. Timestamp time.Time // application use - message timestamp
  36. Type string // application use - message type name
  37. UserId string // application use - creating user - should be authenticated user
  38. AppId string // application use - creating application id
  39. // Valid only with Channel.Consume
  40. ConsumerTag string
  41. // Valid only with Channel.Get
  42. MessageCount uint32
  43. DeliveryTag uint64
  44. Redelivered bool
  45. Exchange string // basic.publish exhange
  46. RoutingKey string // basic.publish routing key
  47. Body []byte
  48. }
  49. func newDelivery(channel *Channel, msg messageWithContent) *Delivery {
  50. props, body := msg.getContent()
  51. delivery := Delivery{
  52. Acknowledger: channel,
  53. Headers: props.Headers,
  54. ContentType: props.ContentType,
  55. ContentEncoding: props.ContentEncoding,
  56. DeliveryMode: props.DeliveryMode,
  57. Priority: props.Priority,
  58. CorrelationId: props.CorrelationId,
  59. ReplyTo: props.ReplyTo,
  60. Expiration: props.Expiration,
  61. MessageId: props.MessageId,
  62. Timestamp: props.Timestamp,
  63. Type: props.Type,
  64. UserId: props.UserId,
  65. AppId: props.AppId,
  66. Body: body,
  67. }
  68. // Properties for the delivery types
  69. switch m := msg.(type) {
  70. case *basicDeliver:
  71. delivery.ConsumerTag = m.ConsumerTag
  72. delivery.DeliveryTag = m.DeliveryTag
  73. delivery.Redelivered = m.Redelivered
  74. delivery.Exchange = m.Exchange
  75. delivery.RoutingKey = m.RoutingKey
  76. case *basicGetOk:
  77. delivery.MessageCount = m.MessageCount
  78. delivery.DeliveryTag = m.DeliveryTag
  79. delivery.Redelivered = m.Redelivered
  80. delivery.Exchange = m.Exchange
  81. delivery.RoutingKey = m.RoutingKey
  82. }
  83. return &delivery
  84. }
  85. /*
  86. Ack delegates an acknowledgement through the Acknowledger interface that the
  87. client or server has finished work on a delivery.
  88. All deliveries in AMQP must be acknowledged. If you called Channel.Consume
  89. with autoAck true then the server will be automatically ack each message and
  90. this method should not be called. Otherwise, you must call Delivery.Ack after
  91. you have successfully processed this delivery.
  92. When multiple is true, this delivery and all prior unacknowledged deliveries
  93. on the same channel will be acknowledged. This is useful for batch processing
  94. of deliveries.
  95. An error will indicate that the acknowledge could not be delivered to the
  96. channel it was sent from.
  97. Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every
  98. delivery that is not automatically acknowledged.
  99. */
  100. func (me Delivery) Ack(multiple bool) error {
  101. if me.Acknowledger == nil {
  102. return errDeliveryNotInitialized
  103. }
  104. return me.Acknowledger.Ack(me.DeliveryTag, multiple)
  105. }
  106. /*
  107. Reject delegates a negatively acknowledgement through the Acknowledger interface.
  108. When requeue is true, queue this message to be delivered to a consumer on a
  109. different channel. When requeue is false or the server is unable to queue this
  110. message, it will be dropped.
  111. If you are batch processing deliveries, and your server supports it, prefer
  112. Delivery.Nack.
  113. Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every
  114. delivery that is not automatically acknowledged.
  115. */
  116. func (me Delivery) Reject(requeue bool) error {
  117. if me.Acknowledger == nil {
  118. return errDeliveryNotInitialized
  119. }
  120. return me.Acknowledger.Reject(me.DeliveryTag, requeue)
  121. }
  122. /*
  123. Nack negatively acknowledge the delivery of message(s) identified by the
  124. delivery tag from either the client or server.
  125. When multiple is true, nack messages up to and including delivered messages up
  126. until the delivery tag delivered on the same channel.
  127. When requeue is true, request the server to deliver this message to a different
  128. consumer. If it is not possible or requeue is false, the message will be
  129. dropped or delivered to a server configured dead-letter queue.
  130. This method must not be used to select or requeue messages the client wishes
  131. not to handle, rather it is to inform the server that the client is incapable
  132. of handling this message at this time.
  133. Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every
  134. delivery that is not automatically acknowledged.
  135. */
  136. func (me Delivery) Nack(multiple, requeue bool) error {
  137. if me.Acknowledger == nil {
  138. return errDeliveryNotInitialized
  139. }
  140. return me.Acknowledger.Nack(me.DeliveryTag, multiple, requeue)
  141. }