channel.go 48 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593
  1. // Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Source code and contact info at http://github.com/streadway/amqp
  5. package amqp
  6. import (
  7. "reflect"
  8. "sync"
  9. "sync/atomic"
  10. )
  11. // 0 1 3 7 size+7 size+8
  12. // +------+---------+-------------+ +------------+ +-----------+
  13. // | type | channel | size | | payload | | frame-end |
  14. // +------+---------+-------------+ +------------+ +-----------+
  15. // octet short long size octets octet
  16. const frameHeaderSize = 1 + 2 + 4 + 1
  17. /*
  18. Channel represents an AMQP channel. Used as a context for valid message
  19. exchange. Errors on methods with this Channel as a receiver means this channel
  20. should be discarded and a new channel established.
  21. */
  22. type Channel struct {
  23. destructor sync.Once
  24. m sync.Mutex // struct field mutex
  25. confirmM sync.Mutex // publisher confirms state mutex
  26. notifyM sync.RWMutex
  27. connection *Connection
  28. rpc chan message
  29. consumers *consumers
  30. id uint16
  31. // closed is set to 1 when the channel has been closed - see Channel.send()
  32. closed int32
  33. // true when we will never notify again
  34. noNotify bool
  35. // Channel and Connection exceptions will be broadcast on these listeners.
  36. closes []chan *Error
  37. // Listeners for active=true flow control. When true is sent to a listener,
  38. // publishing should pause until false is sent to listeners.
  39. flows []chan bool
  40. // Listeners for returned publishings for unroutable messages on mandatory
  41. // publishings or undeliverable messages on immediate publishings.
  42. returns []chan Return
  43. // Listeners for when the server notifies the client that
  44. // a consumer has been cancelled.
  45. cancels []chan string
  46. // Allocated when in confirm mode in order to track publish counter and order confirms
  47. confirms *confirms
  48. confirming bool
  49. // Selects on any errors from shutdown during RPC
  50. errors chan *Error
  51. // State machine that manages frame order, must only be mutated by the connection
  52. recv func(*Channel, frame) error
  53. // Current state for frame re-assembly, only mutated from recv
  54. message messageWithContent
  55. header *headerFrame
  56. body []byte
  57. }
  58. // Constructs a new channel with the given framing rules
  59. func newChannel(c *Connection, id uint16) *Channel {
  60. return &Channel{
  61. connection: c,
  62. id: id,
  63. rpc: make(chan message),
  64. consumers: makeConsumers(),
  65. confirms: newConfirms(),
  66. recv: (*Channel).recvMethod,
  67. errors: make(chan *Error, 1),
  68. }
  69. }
  70. // shutdown is called by Connection after the channel has been removed from the
  71. // connection registry.
  72. func (ch *Channel) shutdown(e *Error) {
  73. ch.destructor.Do(func() {
  74. ch.m.Lock()
  75. defer ch.m.Unlock()
  76. // Grab an exclusive lock for the notify channels
  77. ch.notifyM.Lock()
  78. defer ch.notifyM.Unlock()
  79. // Broadcast abnormal shutdown
  80. if e != nil {
  81. for _, c := range ch.closes {
  82. c <- e
  83. }
  84. }
  85. // Signal that from now on, Channel.send() should call
  86. // Channel.sendClosed()
  87. atomic.StoreInt32(&ch.closed, 1)
  88. // Notify RPC if we're selecting
  89. if e != nil {
  90. ch.errors <- e
  91. }
  92. ch.consumers.close()
  93. for _, c := range ch.closes {
  94. close(c)
  95. }
  96. for _, c := range ch.flows {
  97. close(c)
  98. }
  99. for _, c := range ch.returns {
  100. close(c)
  101. }
  102. for _, c := range ch.cancels {
  103. close(c)
  104. }
  105. // Set the slices to nil to prevent the dispatch() range from sending on
  106. // the now closed channels after we release the notifyM mutex
  107. ch.flows = nil
  108. ch.closes = nil
  109. ch.returns = nil
  110. ch.cancels = nil
  111. if ch.confirms != nil {
  112. ch.confirms.Close()
  113. }
  114. close(ch.errors)
  115. ch.noNotify = true
  116. })
  117. }
  118. // send calls Channel.sendOpen() during normal operation.
  119. //
  120. // After the channel has been closed, send calls Channel.sendClosed(), ensuring
  121. // only 'channel.close' is sent to the server.
  122. func (ch *Channel) send(msg message) (err error) {
  123. // If the channel is closed, use Channel.sendClosed()
  124. if atomic.LoadInt32(&ch.closed) == 1 {
  125. return ch.sendClosed(msg)
  126. }
  127. return ch.sendOpen(msg)
  128. }
  129. func (ch *Channel) open() error {
  130. return ch.call(&channelOpen{}, &channelOpenOk{})
  131. }
  132. // Performs a request/response call for when the message is not NoWait and is
  133. // specified as Synchronous.
  134. func (ch *Channel) call(req message, res ...message) error {
  135. if err := ch.send(req); err != nil {
  136. return err
  137. }
  138. if req.wait() {
  139. select {
  140. case e, ok := <-ch.errors:
  141. if ok {
  142. return e
  143. }
  144. return ErrClosed
  145. case msg := <-ch.rpc:
  146. if msg != nil {
  147. for _, try := range res {
  148. if reflect.TypeOf(msg) == reflect.TypeOf(try) {
  149. // *res = *msg
  150. vres := reflect.ValueOf(try).Elem()
  151. vmsg := reflect.ValueOf(msg).Elem()
  152. vres.Set(vmsg)
  153. return nil
  154. }
  155. }
  156. return ErrCommandInvalid
  157. }
  158. // RPC channel has been closed without an error, likely due to a hard
  159. // error on the Connection. This indicates we have already been
  160. // shutdown and if were waiting, will have returned from the errors chan.
  161. return ErrClosed
  162. }
  163. }
  164. return nil
  165. }
  166. func (ch *Channel) sendClosed(msg message) (err error) {
  167. // After a 'channel.close' is sent or received the only valid response is
  168. // channel.close-ok
  169. if _, ok := msg.(*channelCloseOk); ok {
  170. return ch.connection.send(&methodFrame{
  171. ChannelId: ch.id,
  172. Method: msg,
  173. })
  174. }
  175. return ErrClosed
  176. }
  177. func (ch *Channel) sendOpen(msg message) (err error) {
  178. if content, ok := msg.(messageWithContent); ok {
  179. props, body := content.getContent()
  180. class, _ := content.id()
  181. // catch client max frame size==0 and server max frame size==0
  182. // set size to length of what we're trying to publish
  183. var size int
  184. if ch.connection.Config.FrameSize > 0 {
  185. size = ch.connection.Config.FrameSize - frameHeaderSize
  186. } else {
  187. size = len(body)
  188. }
  189. if err = ch.connection.send(&methodFrame{
  190. ChannelId: ch.id,
  191. Method: content,
  192. }); err != nil {
  193. return
  194. }
  195. if err = ch.connection.send(&headerFrame{
  196. ChannelId: ch.id,
  197. ClassId: class,
  198. Size: uint64(len(body)),
  199. Properties: props,
  200. }); err != nil {
  201. return
  202. }
  203. // chunk body into size (max frame size - frame header size)
  204. for i, j := 0, size; i < len(body); i, j = j, j+size {
  205. if j > len(body) {
  206. j = len(body)
  207. }
  208. if err = ch.connection.send(&bodyFrame{
  209. ChannelId: ch.id,
  210. Body: body[i:j],
  211. }); err != nil {
  212. return
  213. }
  214. }
  215. } else {
  216. err = ch.connection.send(&methodFrame{
  217. ChannelId: ch.id,
  218. Method: msg,
  219. })
  220. }
  221. return
  222. }
  223. // Eventually called via the state machine from the connection's reader
  224. // goroutine, so assumes serialized access.
  225. func (ch *Channel) dispatch(msg message) {
  226. switch m := msg.(type) {
  227. case *channelClose:
  228. // lock before sending connection.close-ok
  229. // to avoid unexpected interleaving with basic.publish frames if
  230. // publishing is happening concurrently
  231. ch.m.Lock()
  232. ch.send(&channelCloseOk{})
  233. ch.m.Unlock()
  234. ch.connection.closeChannel(ch, newError(m.ReplyCode, m.ReplyText))
  235. case *channelFlow:
  236. ch.notifyM.RLock()
  237. for _, c := range ch.flows {
  238. c <- m.Active
  239. }
  240. ch.notifyM.RUnlock()
  241. ch.send(&channelFlowOk{Active: m.Active})
  242. case *basicCancel:
  243. ch.notifyM.RLock()
  244. for _, c := range ch.cancels {
  245. c <- m.ConsumerTag
  246. }
  247. ch.notifyM.RUnlock()
  248. ch.consumers.cancel(m.ConsumerTag)
  249. case *basicReturn:
  250. ret := newReturn(*m)
  251. ch.notifyM.RLock()
  252. for _, c := range ch.returns {
  253. c <- *ret
  254. }
  255. ch.notifyM.RUnlock()
  256. case *basicAck:
  257. if ch.confirming {
  258. if m.Multiple {
  259. ch.confirms.Multiple(Confirmation{m.DeliveryTag, true})
  260. } else {
  261. ch.confirms.One(Confirmation{m.DeliveryTag, true})
  262. }
  263. }
  264. case *basicNack:
  265. if ch.confirming {
  266. if m.Multiple {
  267. ch.confirms.Multiple(Confirmation{m.DeliveryTag, false})
  268. } else {
  269. ch.confirms.One(Confirmation{m.DeliveryTag, false})
  270. }
  271. }
  272. case *basicDeliver:
  273. ch.consumers.send(m.ConsumerTag, newDelivery(ch, m))
  274. // TODO log failed consumer and close channel, this can happen when
  275. // deliveries are in flight and a no-wait cancel has happened
  276. default:
  277. ch.rpc <- msg
  278. }
  279. }
  280. func (ch *Channel) transition(f func(*Channel, frame) error) error {
  281. ch.recv = f
  282. return nil
  283. }
  284. func (ch *Channel) recvMethod(f frame) error {
  285. switch frame := f.(type) {
  286. case *methodFrame:
  287. if msg, ok := frame.Method.(messageWithContent); ok {
  288. ch.body = make([]byte, 0)
  289. ch.message = msg
  290. return ch.transition((*Channel).recvHeader)
  291. }
  292. ch.dispatch(frame.Method) // termination state
  293. return ch.transition((*Channel).recvMethod)
  294. case *headerFrame:
  295. // drop
  296. return ch.transition((*Channel).recvMethod)
  297. case *bodyFrame:
  298. // drop
  299. return ch.transition((*Channel).recvMethod)
  300. }
  301. panic("unexpected frame type")
  302. }
  303. func (ch *Channel) recvHeader(f frame) error {
  304. switch frame := f.(type) {
  305. case *methodFrame:
  306. // interrupt content and handle method
  307. return ch.recvMethod(f)
  308. case *headerFrame:
  309. // start collecting if we expect body frames
  310. ch.header = frame
  311. if frame.Size == 0 {
  312. ch.message.setContent(ch.header.Properties, ch.body)
  313. ch.dispatch(ch.message) // termination state
  314. return ch.transition((*Channel).recvMethod)
  315. }
  316. return ch.transition((*Channel).recvContent)
  317. case *bodyFrame:
  318. // drop and reset
  319. return ch.transition((*Channel).recvMethod)
  320. }
  321. panic("unexpected frame type")
  322. }
  323. // state after method + header and before the length
  324. // defined by the header has been reached
  325. func (ch *Channel) recvContent(f frame) error {
  326. switch frame := f.(type) {
  327. case *methodFrame:
  328. // interrupt content and handle method
  329. return ch.recvMethod(f)
  330. case *headerFrame:
  331. // drop and reset
  332. return ch.transition((*Channel).recvMethod)
  333. case *bodyFrame:
  334. if cap(ch.body) == 0 {
  335. ch.body = make([]byte, 0, ch.header.Size)
  336. }
  337. ch.body = append(ch.body, frame.Body...)
  338. if uint64(len(ch.body)) >= ch.header.Size {
  339. ch.message.setContent(ch.header.Properties, ch.body)
  340. ch.dispatch(ch.message) // termination state
  341. return ch.transition((*Channel).recvMethod)
  342. }
  343. return ch.transition((*Channel).recvContent)
  344. }
  345. panic("unexpected frame type")
  346. }
  347. /*
  348. Close initiate a clean channel closure by sending a close message with the error
  349. code set to '200'.
  350. It is safe to call this method multiple times.
  351. */
  352. func (ch *Channel) Close() error {
  353. defer ch.connection.closeChannel(ch, nil)
  354. return ch.call(
  355. &channelClose{ReplyCode: replySuccess},
  356. &channelCloseOk{},
  357. )
  358. }
  359. /*
  360. NotifyClose registers a listener for when the server sends a channel or
  361. connection exception in the form of a Connection.Close or Channel.Close method.
  362. Connection exceptions will be broadcast to all open channels and all channels
  363. will be closed, where channel exceptions will only be broadcast to listeners to
  364. this channel.
  365. The chan provided will be closed when the Channel is closed and on a
  366. graceful close, no error will be sent.
  367. */
  368. func (ch *Channel) NotifyClose(c chan *Error) chan *Error {
  369. ch.notifyM.Lock()
  370. defer ch.notifyM.Unlock()
  371. if ch.noNotify {
  372. close(c)
  373. } else {
  374. ch.closes = append(ch.closes, c)
  375. }
  376. return c
  377. }
  378. /*
  379. NotifyFlow registers a listener for basic.flow methods sent by the server.
  380. When `false` is sent on one of the listener channels, all publishers should
  381. pause until a `true` is sent.
  382. The server may ask the producer to pause or restart the flow of Publishings
  383. sent by on a channel. This is a simple flow-control mechanism that a server can
  384. use to avoid overflowing its queues or otherwise finding itself receiving more
  385. messages than it can process. Note that this method is not intended for window
  386. control. It does not affect contents returned by basic.get-ok methods.
  387. When a new channel is opened, it is active (flow is active). Some
  388. applications assume that channels are inactive until started. To emulate
  389. this behavior a client MAY open the channel, then pause it.
  390. Publishers should respond to a flow messages as rapidly as possible and the
  391. server may disconnect over producing channels that do not respect these
  392. messages.
  393. basic.flow-ok methods will always be returned to the server regardless of
  394. the number of listeners there are.
  395. To control the flow of deliveries from the server, use the Channel.Flow()
  396. method instead.
  397. Note: RabbitMQ will rather use TCP pushback on the network connection instead
  398. of sending basic.flow. This means that if a single channel is producing too
  399. much on the same connection, all channels using that connection will suffer,
  400. including acknowledgments from deliveries. Use different Connections if you
  401. desire to interleave consumers and producers in the same process to avoid your
  402. basic.ack messages from getting rate limited with your basic.publish messages.
  403. */
  404. func (ch *Channel) NotifyFlow(c chan bool) chan bool {
  405. ch.notifyM.Lock()
  406. defer ch.notifyM.Unlock()
  407. if ch.noNotify {
  408. close(c)
  409. } else {
  410. ch.flows = append(ch.flows, c)
  411. }
  412. return c
  413. }
  414. /*
  415. NotifyReturn registers a listener for basic.return methods. These can be sent
  416. from the server when a publish is undeliverable either from the mandatory or
  417. immediate flags.
  418. A return struct has a copy of the Publishing along with some error
  419. information about why the publishing failed.
  420. */
  421. func (ch *Channel) NotifyReturn(c chan Return) chan Return {
  422. ch.notifyM.Lock()
  423. defer ch.notifyM.Unlock()
  424. if ch.noNotify {
  425. close(c)
  426. } else {
  427. ch.returns = append(ch.returns, c)
  428. }
  429. return c
  430. }
  431. /*
  432. NotifyCancel registers a listener for basic.cancel methods. These can be sent
  433. from the server when a queue is deleted or when consuming from a mirrored queue
  434. where the master has just failed (and was moved to another node).
  435. The subscription tag is returned to the listener.
  436. */
  437. func (ch *Channel) NotifyCancel(c chan string) chan string {
  438. ch.notifyM.Lock()
  439. defer ch.notifyM.Unlock()
  440. if ch.noNotify {
  441. close(c)
  442. } else {
  443. ch.cancels = append(ch.cancels, c)
  444. }
  445. return c
  446. }
  447. /*
  448. NotifyConfirm calls NotifyPublish and starts a goroutine sending
  449. ordered Ack and Nack DeliveryTag to the respective channels.
  450. For strict ordering, use NotifyPublish instead.
  451. */
  452. func (ch *Channel) NotifyConfirm(ack, nack chan uint64) (chan uint64, chan uint64) {
  453. confirms := ch.NotifyPublish(make(chan Confirmation, cap(ack)+cap(nack)))
  454. go func() {
  455. for c := range confirms {
  456. if c.Ack {
  457. ack <- c.DeliveryTag
  458. } else {
  459. nack <- c.DeliveryTag
  460. }
  461. }
  462. close(ack)
  463. if nack != ack {
  464. close(nack)
  465. }
  466. }()
  467. return ack, nack
  468. }
  469. /*
  470. NotifyPublish registers a listener for reliable publishing. Receives from this
  471. chan for every publish after Channel.Confirm will be in order starting with
  472. DeliveryTag 1.
  473. There will be one and only one Confirmation Publishing starting with the
  474. delivery tag of 1 and progressing sequentially until the total number of
  475. Publishings have been seen by the server.
  476. Acknowledgments will be received in the order of delivery from the
  477. NotifyPublish channels even if the server acknowledges them out of order.
  478. The listener chan will be closed when the Channel is closed.
  479. The capacity of the chan Confirmation must be at least as large as the
  480. number of outstanding publishings. Not having enough buffered chans will
  481. create a deadlock if you attempt to perform other operations on the Connection
  482. or Channel while confirms are in-flight.
  483. It's advisable to wait for all Confirmations to arrive before calling
  484. Channel.Close() or Connection.Close().
  485. */
  486. func (ch *Channel) NotifyPublish(confirm chan Confirmation) chan Confirmation {
  487. ch.notifyM.Lock()
  488. defer ch.notifyM.Unlock()
  489. if ch.noNotify {
  490. close(confirm)
  491. } else {
  492. ch.confirms.Listen(confirm)
  493. }
  494. return confirm
  495. }
  496. /*
  497. Qos controls how many messages or how many bytes the server will try to keep on
  498. the network for consumers before receiving delivery acks. The intent of Qos is
  499. to make sure the network buffers stay full between the server and client.
  500. With a prefetch count greater than zero, the server will deliver that many
  501. messages to consumers before acknowledgments are received. The server ignores
  502. this option when consumers are started with noAck because no acknowledgments
  503. are expected or sent.
  504. With a prefetch size greater than zero, the server will try to keep at least
  505. that many bytes of deliveries flushed to the network before receiving
  506. acknowledgments from the consumers. This option is ignored when consumers are
  507. started with noAck.
  508. When global is true, these Qos settings apply to all existing and future
  509. consumers on all channels on the same connection. When false, the Channel.Qos
  510. settings will apply to all existing and future consumers on this channel.
  511. Please see the RabbitMQ Consumer Prefetch documentation for an explanation of
  512. how the global flag is implemented in RabbitMQ, as it differs from the
  513. AMQP 0.9.1 specification in that global Qos settings are limited in scope to
  514. channels, not connections (https://www.rabbitmq.com/consumer-prefetch.html).
  515. To get round-robin behavior between consumers consuming from the same queue on
  516. different connections, set the prefetch count to 1, and the next available
  517. message on the server will be delivered to the next available consumer.
  518. If your consumer work time is reasonably consistent and not much greater
  519. than two times your network round trip time, you will see significant
  520. throughput improvements starting with a prefetch count of 2 or slightly
  521. greater as described by benchmarks on RabbitMQ.
  522. http://www.rabbitmq.com/blog/2012/04/25/rabbitmq-performance-measurements-part-2/
  523. */
  524. func (ch *Channel) Qos(prefetchCount, prefetchSize int, global bool) error {
  525. return ch.call(
  526. &basicQos{
  527. PrefetchCount: uint16(prefetchCount),
  528. PrefetchSize: uint32(prefetchSize),
  529. Global: global,
  530. },
  531. &basicQosOk{},
  532. )
  533. }
  534. /*
  535. Cancel stops deliveries to the consumer chan established in Channel.Consume and
  536. identified by consumer.
  537. Only use this method to cleanly stop receiving deliveries from the server and
  538. cleanly shut down the consumer chan identified by this tag. Using this method
  539. and waiting for remaining messages to flush from the consumer chan will ensure
  540. all messages received on the network will be delivered to the receiver of your
  541. consumer chan.
  542. Continue consuming from the chan Delivery provided by Channel.Consume until the
  543. chan closes.
  544. When noWait is true, do not wait for the server to acknowledge the cancel.
  545. Only use this when you are certain there are no deliveries in flight that
  546. require an acknowledgment, otherwise they will arrive and be dropped in the
  547. client without an ack, and will not be redelivered to other consumers.
  548. */
  549. func (ch *Channel) Cancel(consumer string, noWait bool) error {
  550. req := &basicCancel{
  551. ConsumerTag: consumer,
  552. NoWait: noWait,
  553. }
  554. res := &basicCancelOk{}
  555. if err := ch.call(req, res); err != nil {
  556. return err
  557. }
  558. if req.wait() {
  559. ch.consumers.cancel(res.ConsumerTag)
  560. } else {
  561. // Potentially could drop deliveries in flight
  562. ch.consumers.cancel(consumer)
  563. }
  564. return nil
  565. }
  566. /*
  567. QueueDeclare declares a queue to hold messages and deliver to consumers.
  568. Declaring creates a queue if it doesn't already exist, or ensures that an
  569. existing queue matches the same parameters.
  570. Every queue declared gets a default binding to the empty exchange "" which has
  571. the type "direct" with the routing key matching the queue's name. With this
  572. default binding, it is possible to publish messages that route directly to
  573. this queue by publishing to "" with the routing key of the queue name.
  574. QueueDeclare("alerts", true, false, false, false, nil)
  575. Publish("", "alerts", false, false, Publishing{Body: []byte("...")})
  576. Delivery Exchange Key Queue
  577. -----------------------------------------------
  578. key: alerts -> "" -> alerts -> alerts
  579. The queue name may be empty, in which case the server will generate a unique name
  580. which will be returned in the Name field of Queue struct.
  581. Durable and Non-Auto-Deleted queues will survive server restarts and remain
  582. when there are no remaining consumers or bindings. Persistent publishings will
  583. be restored in this queue on server restart. These queues are only able to be
  584. bound to durable exchanges.
  585. Non-Durable and Auto-Deleted queues will not be redeclared on server restart
  586. and will be deleted by the server after a short time when the last consumer is
  587. canceled or the last consumer's channel is closed. Queues with this lifetime
  588. can also be deleted normally with QueueDelete. These durable queues can only
  589. be bound to non-durable exchanges.
  590. Non-Durable and Non-Auto-Deleted queues will remain declared as long as the
  591. server is running regardless of how many consumers. This lifetime is useful
  592. for temporary topologies that may have long delays between consumer activity.
  593. These queues can only be bound to non-durable exchanges.
  594. Durable and Auto-Deleted queues will be restored on server restart, but without
  595. active consumers will not survive and be removed. This Lifetime is unlikely
  596. to be useful.
  597. Exclusive queues are only accessible by the connection that declares them and
  598. will be deleted when the connection closes. Channels on other connections
  599. will receive an error when attempting to declare, bind, consume, purge or
  600. delete a queue with the same name.
  601. When noWait is true, the queue will assume to be declared on the server. A
  602. channel exception will arrive if the conditions are met for existing queues
  603. or attempting to modify an existing queue from a different connection.
  604. When the error return value is not nil, you can assume the queue could not be
  605. declared with these parameters, and the channel will be closed.
  606. */
  607. func (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error) {
  608. if err := args.Validate(); err != nil {
  609. return Queue{}, err
  610. }
  611. req := &queueDeclare{
  612. Queue: name,
  613. Passive: false,
  614. Durable: durable,
  615. AutoDelete: autoDelete,
  616. Exclusive: exclusive,
  617. NoWait: noWait,
  618. Arguments: args,
  619. }
  620. res := &queueDeclareOk{}
  621. if err := ch.call(req, res); err != nil {
  622. return Queue{}, err
  623. }
  624. if req.wait() {
  625. return Queue{
  626. Name: res.Queue,
  627. Messages: int(res.MessageCount),
  628. Consumers: int(res.ConsumerCount),
  629. }, nil
  630. }
  631. return Queue{Name: name}, nil
  632. }
  633. /*
  634. QueueDeclarePassive is functionally and parametrically equivalent to
  635. QueueDeclare, except that it sets the "passive" attribute to true. A passive
  636. queue is assumed by RabbitMQ to already exist, and attempting to connect to a
  637. non-existent queue will cause RabbitMQ to throw an exception. This function
  638. can be used to test for the existence of a queue.
  639. */
  640. func (ch *Channel) QueueDeclarePassive(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error) {
  641. if err := args.Validate(); err != nil {
  642. return Queue{}, err
  643. }
  644. req := &queueDeclare{
  645. Queue: name,
  646. Passive: true,
  647. Durable: durable,
  648. AutoDelete: autoDelete,
  649. Exclusive: exclusive,
  650. NoWait: noWait,
  651. Arguments: args,
  652. }
  653. res := &queueDeclareOk{}
  654. if err := ch.call(req, res); err != nil {
  655. return Queue{}, err
  656. }
  657. if req.wait() {
  658. return Queue{
  659. Name: res.Queue,
  660. Messages: int(res.MessageCount),
  661. Consumers: int(res.ConsumerCount),
  662. }, nil
  663. }
  664. return Queue{Name: name}, nil
  665. }
  666. /*
  667. QueueInspect passively declares a queue by name to inspect the current message
  668. count and consumer count.
  669. Use this method to check how many messages ready for delivery reside in the queue,
  670. how many consumers are receiving deliveries, and whether a queue by this
  671. name already exists.
  672. If the queue by this name exists, use Channel.QueueDeclare check if it is
  673. declared with specific parameters.
  674. If a queue by this name does not exist, an error will be returned and the
  675. channel will be closed.
  676. */
  677. func (ch *Channel) QueueInspect(name string) (Queue, error) {
  678. req := &queueDeclare{
  679. Queue: name,
  680. Passive: true,
  681. }
  682. res := &queueDeclareOk{}
  683. err := ch.call(req, res)
  684. state := Queue{
  685. Name: name,
  686. Messages: int(res.MessageCount),
  687. Consumers: int(res.ConsumerCount),
  688. }
  689. return state, err
  690. }
  691. /*
  692. QueueBind binds an exchange to a queue so that publishings to the exchange will
  693. be routed to the queue when the publishing routing key matches the binding
  694. routing key.
  695. QueueBind("pagers", "alert", "log", false, nil)
  696. QueueBind("emails", "info", "log", false, nil)
  697. Delivery Exchange Key Queue
  698. -----------------------------------------------
  699. key: alert --> log ----> alert --> pagers
  700. key: info ---> log ----> info ---> emails
  701. key: debug --> log (none) (dropped)
  702. If a binding with the same key and arguments already exists between the
  703. exchange and queue, the attempt to rebind will be ignored and the existing
  704. binding will be retained.
  705. In the case that multiple bindings may cause the message to be routed to the
  706. same queue, the server will only route the publishing once. This is possible
  707. with topic exchanges.
  708. QueueBind("pagers", "alert", "amq.topic", false, nil)
  709. QueueBind("emails", "info", "amq.topic", false, nil)
  710. QueueBind("emails", "#", "amq.topic", false, nil) // match everything
  711. Delivery Exchange Key Queue
  712. -----------------------------------------------
  713. key: alert --> amq.topic ----> alert --> pagers
  714. key: info ---> amq.topic ----> # ------> emails
  715. \---> info ---/
  716. key: debug --> amq.topic ----> # ------> emails
  717. It is only possible to bind a durable queue to a durable exchange regardless of
  718. whether the queue or exchange is auto-deleted. Bindings between durable queues
  719. and exchanges will also be restored on server restart.
  720. If the binding could not complete, an error will be returned and the channel
  721. will be closed.
  722. When noWait is false and the queue could not be bound, the channel will be
  723. closed with an error.
  724. */
  725. func (ch *Channel) QueueBind(name, key, exchange string, noWait bool, args Table) error {
  726. if err := args.Validate(); err != nil {
  727. return err
  728. }
  729. return ch.call(
  730. &queueBind{
  731. Queue: name,
  732. Exchange: exchange,
  733. RoutingKey: key,
  734. NoWait: noWait,
  735. Arguments: args,
  736. },
  737. &queueBindOk{},
  738. )
  739. }
  740. /*
  741. QueueUnbind removes a binding between an exchange and queue matching the key and
  742. arguments.
  743. It is possible to send and empty string for the exchange name which means to
  744. unbind the queue from the default exchange.
  745. */
  746. func (ch *Channel) QueueUnbind(name, key, exchange string, args Table) error {
  747. if err := args.Validate(); err != nil {
  748. return err
  749. }
  750. return ch.call(
  751. &queueUnbind{
  752. Queue: name,
  753. Exchange: exchange,
  754. RoutingKey: key,
  755. Arguments: args,
  756. },
  757. &queueUnbindOk{},
  758. )
  759. }
  760. /*
  761. QueuePurge removes all messages from the named queue which are not waiting to
  762. be acknowledged. Messages that have been delivered but have not yet been
  763. acknowledged will not be removed.
  764. When successful, returns the number of messages purged.
  765. If noWait is true, do not wait for the server response and the number of
  766. messages purged will not be meaningful.
  767. */
  768. func (ch *Channel) QueuePurge(name string, noWait bool) (int, error) {
  769. req := &queuePurge{
  770. Queue: name,
  771. NoWait: noWait,
  772. }
  773. res := &queuePurgeOk{}
  774. err := ch.call(req, res)
  775. return int(res.MessageCount), err
  776. }
  777. /*
  778. QueueDelete removes the queue from the server including all bindings then
  779. purges the messages based on server configuration, returning the number of
  780. messages purged.
  781. When ifUnused is true, the queue will not be deleted if there are any
  782. consumers on the queue. If there are consumers, an error will be returned and
  783. the channel will be closed.
  784. When ifEmpty is true, the queue will not be deleted if there are any messages
  785. remaining on the queue. If there are messages, an error will be returned and
  786. the channel will be closed.
  787. When noWait is true, the queue will be deleted without waiting for a response
  788. from the server. The purged message count will not be meaningful. If the queue
  789. could not be deleted, a channel exception will be raised and the channel will
  790. be closed.
  791. */
  792. func (ch *Channel) QueueDelete(name string, ifUnused, ifEmpty, noWait bool) (int, error) {
  793. req := &queueDelete{
  794. Queue: name,
  795. IfUnused: ifUnused,
  796. IfEmpty: ifEmpty,
  797. NoWait: noWait,
  798. }
  799. res := &queueDeleteOk{}
  800. err := ch.call(req, res)
  801. return int(res.MessageCount), err
  802. }
  803. /*
  804. Consume immediately starts delivering queued messages.
  805. Begin receiving on the returned chan Delivery before any other operation on the
  806. Connection or Channel.
  807. Continues deliveries to the returned chan Delivery until Channel.Cancel,
  808. Connection.Close, Channel.Close, or an AMQP exception occurs. Consumers must
  809. range over the chan to ensure all deliveries are received. Unreceived
  810. deliveries will block all methods on the same connection.
  811. All deliveries in AMQP must be acknowledged. It is expected of the consumer to
  812. call Delivery.Ack after it has successfully processed the delivery. If the
  813. consumer is cancelled or the channel or connection is closed any unacknowledged
  814. deliveries will be requeued at the end of the same queue.
  815. The consumer is identified by a string that is unique and scoped for all
  816. consumers on this channel. If you wish to eventually cancel the consumer, use
  817. the same non-empty identifier in Channel.Cancel. An empty string will cause
  818. the library to generate a unique identity. The consumer identity will be
  819. included in every Delivery in the ConsumerTag field
  820. When autoAck (also known as noAck) is true, the server will acknowledge
  821. deliveries to this consumer prior to writing the delivery to the network. When
  822. autoAck is true, the consumer should not call Delivery.Ack. Automatically
  823. acknowledging deliveries means that some deliveries may get lost if the
  824. consumer is unable to process them after the server delivers them.
  825. See http://www.rabbitmq.com/confirms.html for more details.
  826. When exclusive is true, the server will ensure that this is the sole consumer
  827. from this queue. When exclusive is false, the server will fairly distribute
  828. deliveries across multiple consumers.
  829. The noLocal flag is not supported by RabbitMQ.
  830. It's advisable to use separate connections for
  831. Channel.Publish and Channel.Consume so not to have TCP pushback on publishing
  832. affect the ability to consume messages, so this parameter is here mostly for
  833. completeness.
  834. When noWait is true, do not wait for the server to confirm the request and
  835. immediately begin deliveries. If it is not possible to consume, a channel
  836. exception will be raised and the channel will be closed.
  837. Optional arguments can be provided that have specific semantics for the queue
  838. or server.
  839. Inflight messages, limited by Channel.Qos will be buffered until received from
  840. the returned chan.
  841. When the Channel or Connection is closed, all buffered and inflight messages will
  842. be dropped.
  843. When the consumer tag is cancelled, all inflight messages will be delivered until
  844. the returned chan is closed.
  845. */
  846. func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error) {
  847. // When we return from ch.call, there may be a delivery already for the
  848. // consumer that hasn't been added to the consumer hash yet. Because of
  849. // this, we never rely on the server picking a consumer tag for us.
  850. if err := args.Validate(); err != nil {
  851. return nil, err
  852. }
  853. if consumer == "" {
  854. consumer = uniqueConsumerTag()
  855. }
  856. req := &basicConsume{
  857. Queue: queue,
  858. ConsumerTag: consumer,
  859. NoLocal: noLocal,
  860. NoAck: autoAck,
  861. Exclusive: exclusive,
  862. NoWait: noWait,
  863. Arguments: args,
  864. }
  865. res := &basicConsumeOk{}
  866. deliveries := make(chan Delivery)
  867. ch.consumers.add(consumer, deliveries)
  868. if err := ch.call(req, res); err != nil {
  869. ch.consumers.cancel(consumer)
  870. return nil, err
  871. }
  872. return (<-chan Delivery)(deliveries), nil
  873. }
  874. /*
  875. ExchangeDeclare declares an exchange on the server. If the exchange does not
  876. already exist, the server will create it. If the exchange exists, the server
  877. verifies that it is of the provided type, durability and auto-delete flags.
  878. Errors returned from this method will close the channel.
  879. Exchange names starting with "amq." are reserved for pre-declared and
  880. standardized exchanges. The client MAY declare an exchange starting with
  881. "amq." if the passive option is set, or the exchange already exists. Names can
  882. consist of a non-empty sequence of letters, digits, hyphen, underscore,
  883. period, or colon.
  884. Each exchange belongs to one of a set of exchange kinds/types implemented by
  885. the server. The exchange types define the functionality of the exchange - i.e.
  886. how messages are routed through it. Once an exchange is declared, its type
  887. cannot be changed. The common types are "direct", "fanout", "topic" and
  888. "headers".
  889. Durable and Non-Auto-Deleted exchanges will survive server restarts and remain
  890. declared when there are no remaining bindings. This is the best lifetime for
  891. long-lived exchange configurations like stable routes and default exchanges.
  892. Non-Durable and Auto-Deleted exchanges will be deleted when there are no
  893. remaining bindings and not restored on server restart. This lifetime is
  894. useful for temporary topologies that should not pollute the virtual host on
  895. failure or after the consumers have completed.
  896. Non-Durable and Non-Auto-deleted exchanges will remain as long as the server is
  897. running including when there are no remaining bindings. This is useful for
  898. temporary topologies that may have long delays between bindings.
  899. Durable and Auto-Deleted exchanges will survive server restarts and will be
  900. removed before and after server restarts when there are no remaining bindings.
  901. These exchanges are useful for robust temporary topologies or when you require
  902. binding durable queues to auto-deleted exchanges.
  903. Note: RabbitMQ declares the default exchange types like 'amq.fanout' as
  904. durable, so queues that bind to these pre-declared exchanges must also be
  905. durable.
  906. Exchanges declared as `internal` do not accept accept publishings. Internal
  907. exchanges are useful when you wish to implement inter-exchange topologies
  908. that should not be exposed to users of the broker.
  909. When noWait is true, declare without waiting for a confirmation from the server.
  910. The channel may be closed as a result of an error. Add a NotifyClose listener
  911. to respond to any exceptions.
  912. Optional amqp.Table of arguments that are specific to the server's implementation of
  913. the exchange can be sent for exchange types that require extra parameters.
  914. */
  915. func (ch *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error {
  916. if err := args.Validate(); err != nil {
  917. return err
  918. }
  919. return ch.call(
  920. &exchangeDeclare{
  921. Exchange: name,
  922. Type: kind,
  923. Passive: false,
  924. Durable: durable,
  925. AutoDelete: autoDelete,
  926. Internal: internal,
  927. NoWait: noWait,
  928. Arguments: args,
  929. },
  930. &exchangeDeclareOk{},
  931. )
  932. }
  933. /*
  934. ExchangeDeclarePassive is functionally and parametrically equivalent to
  935. ExchangeDeclare, except that it sets the "passive" attribute to true. A passive
  936. exchange is assumed by RabbitMQ to already exist, and attempting to connect to a
  937. non-existent exchange will cause RabbitMQ to throw an exception. This function
  938. can be used to detect the existence of an exchange.
  939. */
  940. func (ch *Channel) ExchangeDeclarePassive(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error {
  941. if err := args.Validate(); err != nil {
  942. return err
  943. }
  944. return ch.call(
  945. &exchangeDeclare{
  946. Exchange: name,
  947. Type: kind,
  948. Passive: true,
  949. Durable: durable,
  950. AutoDelete: autoDelete,
  951. Internal: internal,
  952. NoWait: noWait,
  953. Arguments: args,
  954. },
  955. &exchangeDeclareOk{},
  956. )
  957. }
  958. /*
  959. ExchangeDelete removes the named exchange from the server. When an exchange is
  960. deleted all queue bindings on the exchange are also deleted. If this exchange
  961. does not exist, the channel will be closed with an error.
  962. When ifUnused is true, the server will only delete the exchange if it has no queue
  963. bindings. If the exchange has queue bindings the server does not delete it
  964. but close the channel with an exception instead. Set this to true if you are
  965. not the sole owner of the exchange.
  966. When noWait is true, do not wait for a server confirmation that the exchange has
  967. been deleted. Failing to delete the channel could close the channel. Add a
  968. NotifyClose listener to respond to these channel exceptions.
  969. */
  970. func (ch *Channel) ExchangeDelete(name string, ifUnused, noWait bool) error {
  971. return ch.call(
  972. &exchangeDelete{
  973. Exchange: name,
  974. IfUnused: ifUnused,
  975. NoWait: noWait,
  976. },
  977. &exchangeDeleteOk{},
  978. )
  979. }
  980. /*
  981. ExchangeBind binds an exchange to another exchange to create inter-exchange
  982. routing topologies on the server. This can decouple the private topology and
  983. routing exchanges from exchanges intended solely for publishing endpoints.
  984. Binding two exchanges with identical arguments will not create duplicate
  985. bindings.
  986. Binding one exchange to another with multiple bindings will only deliver a
  987. message once. For example if you bind your exchange to `amq.fanout` with two
  988. different binding keys, only a single message will be delivered to your
  989. exchange even though multiple bindings will match.
  990. Given a message delivered to the source exchange, the message will be forwarded
  991. to the destination exchange when the routing key is matched.
  992. ExchangeBind("sell", "MSFT", "trade", false, nil)
  993. ExchangeBind("buy", "AAPL", "trade", false, nil)
  994. Delivery Source Key Destination
  995. example exchange exchange
  996. -----------------------------------------------
  997. key: AAPL --> trade ----> MSFT sell
  998. \---> AAPL --> buy
  999. When noWait is true, do not wait for the server to confirm the binding. If any
  1000. error occurs the channel will be closed. Add a listener to NotifyClose to
  1001. handle these errors.
  1002. Optional arguments specific to the exchanges bound can also be specified.
  1003. */
  1004. func (ch *Channel) ExchangeBind(destination, key, source string, noWait bool, args Table) error {
  1005. if err := args.Validate(); err != nil {
  1006. return err
  1007. }
  1008. return ch.call(
  1009. &exchangeBind{
  1010. Destination: destination,
  1011. Source: source,
  1012. RoutingKey: key,
  1013. NoWait: noWait,
  1014. Arguments: args,
  1015. },
  1016. &exchangeBindOk{},
  1017. )
  1018. }
  1019. /*
  1020. ExchangeUnbind unbinds the destination exchange from the source exchange on the
  1021. server by removing the routing key between them. This is the inverse of
  1022. ExchangeBind. If the binding does not currently exist, an error will be
  1023. returned.
  1024. When noWait is true, do not wait for the server to confirm the deletion of the
  1025. binding. If any error occurs the channel will be closed. Add a listener to
  1026. NotifyClose to handle these errors.
  1027. Optional arguments that are specific to the type of exchanges bound can also be
  1028. provided. These must match the same arguments specified in ExchangeBind to
  1029. identify the binding.
  1030. */
  1031. func (ch *Channel) ExchangeUnbind(destination, key, source string, noWait bool, args Table) error {
  1032. if err := args.Validate(); err != nil {
  1033. return err
  1034. }
  1035. return ch.call(
  1036. &exchangeUnbind{
  1037. Destination: destination,
  1038. Source: source,
  1039. RoutingKey: key,
  1040. NoWait: noWait,
  1041. Arguments: args,
  1042. },
  1043. &exchangeUnbindOk{},
  1044. )
  1045. }
  1046. /*
  1047. Publish sends a Publishing from the client to an exchange on the server.
  1048. When you want a single message to be delivered to a single queue, you can
  1049. publish to the default exchange with the routingKey of the queue name. This is
  1050. because every declared queue gets an implicit route to the default exchange.
  1051. Since publishings are asynchronous, any undeliverable message will get returned
  1052. by the server. Add a listener with Channel.NotifyReturn to handle any
  1053. undeliverable message when calling publish with either the mandatory or
  1054. immediate parameters as true.
  1055. Publishings can be undeliverable when the mandatory flag is true and no queue is
  1056. bound that matches the routing key, or when the immediate flag is true and no
  1057. consumer on the matched queue is ready to accept the delivery.
  1058. This can return an error when the channel, connection or socket is closed. The
  1059. error or lack of an error does not indicate whether the server has received this
  1060. publishing.
  1061. It is possible for publishing to not reach the broker if the underlying socket
  1062. is shut down without pending publishing packets being flushed from the kernel
  1063. buffers. The easy way of making it probable that all publishings reach the
  1064. server is to always call Connection.Close before terminating your publishing
  1065. application. The way to ensure that all publishings reach the server is to add
  1066. a listener to Channel.NotifyPublish and put the channel in confirm mode with
  1067. Channel.Confirm. Publishing delivery tags and their corresponding
  1068. confirmations start at 1. Exit when all publishings are confirmed.
  1069. When Publish does not return an error and the channel is in confirm mode, the
  1070. internal counter for DeliveryTags with the first confirmation starts at 1.
  1071. */
  1072. func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error {
  1073. if err := msg.Headers.Validate(); err != nil {
  1074. return err
  1075. }
  1076. ch.m.Lock()
  1077. defer ch.m.Unlock()
  1078. if err := ch.send(&basicPublish{
  1079. Exchange: exchange,
  1080. RoutingKey: key,
  1081. Mandatory: mandatory,
  1082. Immediate: immediate,
  1083. Body: msg.Body,
  1084. Properties: properties{
  1085. Headers: msg.Headers,
  1086. ContentType: msg.ContentType,
  1087. ContentEncoding: msg.ContentEncoding,
  1088. DeliveryMode: msg.DeliveryMode,
  1089. Priority: msg.Priority,
  1090. CorrelationId: msg.CorrelationId,
  1091. ReplyTo: msg.ReplyTo,
  1092. Expiration: msg.Expiration,
  1093. MessageId: msg.MessageId,
  1094. Timestamp: msg.Timestamp,
  1095. Type: msg.Type,
  1096. UserId: msg.UserId,
  1097. AppId: msg.AppId,
  1098. },
  1099. }); err != nil {
  1100. return err
  1101. }
  1102. if ch.confirming {
  1103. ch.confirms.Publish()
  1104. }
  1105. return nil
  1106. }
  1107. /*
  1108. Get synchronously receives a single Delivery from the head of a queue from the
  1109. server to the client. In almost all cases, using Channel.Consume will be
  1110. preferred.
  1111. If there was a delivery waiting on the queue and that delivery was received, the
  1112. second return value will be true. If there was no delivery waiting or an error
  1113. occurred, the ok bool will be false.
  1114. All deliveries must be acknowledged including those from Channel.Get. Call
  1115. Delivery.Ack on the returned delivery when you have fully processed this
  1116. delivery.
  1117. When autoAck is true, the server will automatically acknowledge this message so
  1118. you don't have to. But if you are unable to fully process this message before
  1119. the channel or connection is closed, the message will not get requeued.
  1120. */
  1121. func (ch *Channel) Get(queue string, autoAck bool) (msg Delivery, ok bool, err error) {
  1122. req := &basicGet{Queue: queue, NoAck: autoAck}
  1123. res := &basicGetOk{}
  1124. empty := &basicGetEmpty{}
  1125. if err := ch.call(req, res, empty); err != nil {
  1126. return Delivery{}, false, err
  1127. }
  1128. if res.DeliveryTag > 0 {
  1129. return *(newDelivery(ch, res)), true, nil
  1130. }
  1131. return Delivery{}, false, nil
  1132. }
  1133. /*
  1134. Tx puts the channel into transaction mode on the server. All publishings and
  1135. acknowledgments following this method will be atomically committed or rolled
  1136. back for a single queue. Call either Channel.TxCommit or Channel.TxRollback to
  1137. leave a this transaction and immediately start a new transaction.
  1138. The atomicity across multiple queues is not defined as queue declarations and
  1139. bindings are not included in the transaction.
  1140. The behavior of publishings that are delivered as mandatory or immediate while
  1141. the channel is in a transaction is not defined.
  1142. Once a channel has been put into transaction mode, it cannot be taken out of
  1143. transaction mode. Use a different channel for non-transactional semantics.
  1144. */
  1145. func (ch *Channel) Tx() error {
  1146. return ch.call(
  1147. &txSelect{},
  1148. &txSelectOk{},
  1149. )
  1150. }
  1151. /*
  1152. TxCommit atomically commits all publishings and acknowledgments for a single
  1153. queue and immediately start a new transaction.
  1154. Calling this method without having called Channel.Tx is an error.
  1155. */
  1156. func (ch *Channel) TxCommit() error {
  1157. return ch.call(
  1158. &txCommit{},
  1159. &txCommitOk{},
  1160. )
  1161. }
  1162. /*
  1163. TxRollback atomically rolls back all publishings and acknowledgments for a
  1164. single queue and immediately start a new transaction.
  1165. Calling this method without having called Channel.Tx is an error.
  1166. */
  1167. func (ch *Channel) TxRollback() error {
  1168. return ch.call(
  1169. &txRollback{},
  1170. &txRollbackOk{},
  1171. )
  1172. }
  1173. /*
  1174. Flow pauses the delivery of messages to consumers on this channel. Channels
  1175. are opened with flow control active, to open a channel with paused
  1176. deliveries immediately call this method with `false` after calling
  1177. Connection.Channel.
  1178. When active is `false`, this method asks the server to temporarily pause deliveries
  1179. until called again with active as `true`.
  1180. Channel.Get methods will not be affected by flow control.
  1181. This method is not intended to act as window control. Use Channel.Qos to limit
  1182. the number of unacknowledged messages or bytes in flight instead.
  1183. The server may also send us flow methods to throttle our publishings. A well
  1184. behaving publishing client should add a listener with Channel.NotifyFlow and
  1185. pause its publishings when `false` is sent on that channel.
  1186. Note: RabbitMQ prefers to use TCP push back to control flow for all channels on
  1187. a connection, so under high volume scenarios, it's wise to open separate
  1188. Connections for publishings and deliveries.
  1189. */
  1190. func (ch *Channel) Flow(active bool) error {
  1191. return ch.call(
  1192. &channelFlow{Active: active},
  1193. &channelFlowOk{},
  1194. )
  1195. }
  1196. /*
  1197. Confirm puts this channel into confirm mode so that the client can ensure all
  1198. publishings have successfully been received by the server. After entering this
  1199. mode, the server will send a basic.ack or basic.nack message with the deliver
  1200. tag set to a 1 based incremental index corresponding to every publishing
  1201. received after the this method returns.
  1202. Add a listener to Channel.NotifyPublish to respond to the Confirmations. If
  1203. Channel.NotifyPublish is not called, the Confirmations will be silently
  1204. ignored.
  1205. The order of acknowledgments is not bound to the order of deliveries.
  1206. Ack and Nack confirmations will arrive at some point in the future.
  1207. Unroutable mandatory or immediate messages are acknowledged immediately after
  1208. any Channel.NotifyReturn listeners have been notified. Other messages are
  1209. acknowledged when all queues that should have the message routed to them have
  1210. either received acknowledgment of delivery or have enqueued the message,
  1211. persisting the message if necessary.
  1212. When noWait is true, the client will not wait for a response. A channel
  1213. exception could occur if the server does not support this method.
  1214. */
  1215. func (ch *Channel) Confirm(noWait bool) error {
  1216. if err := ch.call(
  1217. &confirmSelect{Nowait: noWait},
  1218. &confirmSelectOk{},
  1219. ); err != nil {
  1220. return err
  1221. }
  1222. ch.confirmM.Lock()
  1223. ch.confirming = true
  1224. ch.confirmM.Unlock()
  1225. return nil
  1226. }
  1227. /*
  1228. Recover redelivers all unacknowledged deliveries on this channel.
  1229. When requeue is false, messages will be redelivered to the original consumer.
  1230. When requeue is true, messages will be redelivered to any available consumer,
  1231. potentially including the original.
  1232. If the deliveries cannot be recovered, an error will be returned and the channel
  1233. will be closed.
  1234. Note: this method is not implemented on RabbitMQ, use Delivery.Nack instead
  1235. */
  1236. func (ch *Channel) Recover(requeue bool) error {
  1237. return ch.call(
  1238. &basicRecover{Requeue: requeue},
  1239. &basicRecoverOk{},
  1240. )
  1241. }
  1242. /*
  1243. Ack acknowledges a delivery by its delivery tag when having been consumed with
  1244. Channel.Consume or Channel.Get.
  1245. Ack acknowledges all message received prior to the delivery tag when multiple
  1246. is true.
  1247. See also Delivery.Ack
  1248. */
  1249. func (ch *Channel) Ack(tag uint64, multiple bool) error {
  1250. ch.m.Lock()
  1251. defer ch.m.Unlock()
  1252. return ch.send(&basicAck{
  1253. DeliveryTag: tag,
  1254. Multiple: multiple,
  1255. })
  1256. }
  1257. /*
  1258. Nack negatively acknowledges a delivery by its delivery tag. Prefer this
  1259. method to notify the server that you were not able to process this delivery and
  1260. it must be redelivered or dropped.
  1261. See also Delivery.Nack
  1262. */
  1263. func (ch *Channel) Nack(tag uint64, multiple bool, requeue bool) error {
  1264. ch.m.Lock()
  1265. defer ch.m.Unlock()
  1266. return ch.send(&basicNack{
  1267. DeliveryTag: tag,
  1268. Multiple: multiple,
  1269. Requeue: requeue,
  1270. })
  1271. }
  1272. /*
  1273. Reject negatively acknowledges a delivery by its delivery tag. Prefer Nack
  1274. over Reject when communicating with a RabbitMQ server because you can Nack
  1275. multiple messages, reducing the amount of protocol messages to exchange.
  1276. See also Delivery.Reject
  1277. */
  1278. func (ch *Channel) Reject(tag uint64, requeue bool) error {
  1279. ch.m.Lock()
  1280. defer ch.m.Unlock()
  1281. return ch.send(&basicReject{
  1282. DeliveryTag: tag,
  1283. Requeue: requeue,
  1284. })
  1285. }