context.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. // Copyright 2016-2022 The NATS Authors
  2. // Licensed under the Apache License, Version 2.0 (the "License");
  3. // you may not use this file except in compliance with the License.
  4. // You may obtain a copy of the License at
  5. //
  6. // http://www.apache.org/licenses/LICENSE-2.0
  7. //
  8. // Unless required by applicable law or agreed to in writing, software
  9. // distributed under the License is distributed on an "AS IS" BASIS,
  10. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. // See the License for the specific language governing permissions and
  12. // limitations under the License.
  13. package nats
  14. import (
  15. "context"
  16. "reflect"
  17. )
  18. // RequestMsgWithContext takes a context, a subject and payload
  19. // in bytes and request expecting a single response.
  20. func (nc *Conn) RequestMsgWithContext(ctx context.Context, msg *Msg) (*Msg, error) {
  21. if msg == nil {
  22. return nil, ErrInvalidMsg
  23. }
  24. hdr, err := msg.headerBytes()
  25. if err != nil {
  26. return nil, err
  27. }
  28. return nc.requestWithContext(ctx, msg.Subject, hdr, msg.Data)
  29. }
  30. // RequestWithContext takes a context, a subject and payload
  31. // in bytes and request expecting a single response.
  32. func (nc *Conn) RequestWithContext(ctx context.Context, subj string, data []byte) (*Msg, error) {
  33. return nc.requestWithContext(ctx, subj, nil, data)
  34. }
  35. func (nc *Conn) requestWithContext(ctx context.Context, subj string, hdr, data []byte) (*Msg, error) {
  36. if ctx == nil {
  37. return nil, ErrInvalidContext
  38. }
  39. if nc == nil {
  40. return nil, ErrInvalidConnection
  41. }
  42. // Check whether the context is done already before making
  43. // the request.
  44. if ctx.Err() != nil {
  45. return nil, ctx.Err()
  46. }
  47. var m *Msg
  48. var err error
  49. // If user wants the old style.
  50. if nc.useOldRequestStyle() {
  51. m, err = nc.oldRequestWithContext(ctx, subj, hdr, data)
  52. } else {
  53. mch, token, err := nc.createNewRequestAndSend(subj, hdr, data)
  54. if err != nil {
  55. return nil, err
  56. }
  57. var ok bool
  58. select {
  59. case m, ok = <-mch:
  60. if !ok {
  61. return nil, ErrConnectionClosed
  62. }
  63. case <-ctx.Done():
  64. nc.mu.Lock()
  65. delete(nc.respMap, token)
  66. nc.mu.Unlock()
  67. return nil, ctx.Err()
  68. }
  69. }
  70. // Check for no responder status.
  71. if err == nil && len(m.Data) == 0 && m.Header.Get(statusHdr) == noResponders {
  72. m, err = nil, ErrNoResponders
  73. }
  74. return m, err
  75. }
  76. // oldRequestWithContext utilizes inbox and subscription per request.
  77. func (nc *Conn) oldRequestWithContext(ctx context.Context, subj string, hdr, data []byte) (*Msg, error) {
  78. inbox := nc.NewInbox()
  79. ch := make(chan *Msg, RequestChanLen)
  80. s, err := nc.subscribe(inbox, _EMPTY_, nil, ch, true, nil)
  81. if err != nil {
  82. return nil, err
  83. }
  84. s.AutoUnsubscribe(1)
  85. defer s.Unsubscribe()
  86. err = nc.publish(subj, inbox, hdr, data)
  87. if err != nil {
  88. return nil, err
  89. }
  90. return s.NextMsgWithContext(ctx)
  91. }
  92. func (s *Subscription) nextMsgWithContext(ctx context.Context, pullSubInternal, waitIfNoMsg bool) (*Msg, error) {
  93. if ctx == nil {
  94. return nil, ErrInvalidContext
  95. }
  96. if s == nil {
  97. return nil, ErrBadSubscription
  98. }
  99. if ctx.Err() != nil {
  100. return nil, ctx.Err()
  101. }
  102. s.mu.Lock()
  103. err := s.validateNextMsgState(pullSubInternal)
  104. if err != nil {
  105. s.mu.Unlock()
  106. return nil, err
  107. }
  108. // snapshot
  109. mch := s.mch
  110. s.mu.Unlock()
  111. var ok bool
  112. var msg *Msg
  113. // If something is available right away, let's optimize that case.
  114. select {
  115. case msg, ok = <-mch:
  116. if !ok {
  117. return nil, s.getNextMsgErr()
  118. }
  119. if err := s.processNextMsgDelivered(msg); err != nil {
  120. return nil, err
  121. }
  122. return msg, nil
  123. default:
  124. // If internal and we don't want to wait, signal that there is no
  125. // message in the internal queue.
  126. if pullSubInternal && !waitIfNoMsg {
  127. return nil, errNoMessages
  128. }
  129. }
  130. select {
  131. case msg, ok = <-mch:
  132. if !ok {
  133. return nil, s.getNextMsgErr()
  134. }
  135. if err := s.processNextMsgDelivered(msg); err != nil {
  136. return nil, err
  137. }
  138. case <-ctx.Done():
  139. return nil, ctx.Err()
  140. }
  141. return msg, nil
  142. }
  143. // NextMsgWithContext takes a context and returns the next message
  144. // available to a synchronous subscriber, blocking until it is delivered
  145. // or context gets canceled.
  146. func (s *Subscription) NextMsgWithContext(ctx context.Context) (*Msg, error) {
  147. return s.nextMsgWithContext(ctx, false, true)
  148. }
  149. // FlushWithContext will allow a context to control the duration
  150. // of a Flush() call. This context should be non-nil and should
  151. // have a deadline set. We will return an error if none is present.
  152. func (nc *Conn) FlushWithContext(ctx context.Context) error {
  153. if nc == nil {
  154. return ErrInvalidConnection
  155. }
  156. if ctx == nil {
  157. return ErrInvalidContext
  158. }
  159. _, ok := ctx.Deadline()
  160. if !ok {
  161. return ErrNoDeadlineContext
  162. }
  163. nc.mu.Lock()
  164. if nc.isClosed() {
  165. nc.mu.Unlock()
  166. return ErrConnectionClosed
  167. }
  168. // Create a buffered channel to prevent chan send to block
  169. // in processPong()
  170. ch := make(chan struct{}, 1)
  171. nc.sendPing(ch)
  172. nc.mu.Unlock()
  173. var err error
  174. select {
  175. case _, ok := <-ch:
  176. if !ok {
  177. err = ErrConnectionClosed
  178. } else {
  179. close(ch)
  180. }
  181. case <-ctx.Done():
  182. err = ctx.Err()
  183. }
  184. if err != nil {
  185. nc.removeFlushEntry(ch)
  186. }
  187. return err
  188. }
  189. // RequestWithContext will create an Inbox and perform a Request
  190. // using the provided cancellation context with the Inbox reply
  191. // for the data v. A response will be decoded into the vPtr last parameter.
  192. func (c *EncodedConn) RequestWithContext(ctx context.Context, subject string, v any, vPtr any) error {
  193. if ctx == nil {
  194. return ErrInvalidContext
  195. }
  196. b, err := c.Enc.Encode(subject, v)
  197. if err != nil {
  198. return err
  199. }
  200. m, err := c.Conn.RequestWithContext(ctx, subject, b)
  201. if err != nil {
  202. return err
  203. }
  204. if reflect.TypeOf(vPtr) == emptyMsgType {
  205. mPtr := vPtr.(*Msg)
  206. *mPtr = *m
  207. } else {
  208. err := c.Enc.Decode(m.Subject, m.Data, vPtr)
  209. if err != nil {
  210. return err
  211. }
  212. }
  213. return nil
  214. }