// stream.go (11 KB)

  1. package radix
  2. import (
  3. "bufio"
  4. "bytes"
  5. "fmt"
  6. "io"
  7. "math"
  8. "strconv"
  9. "time"
  10. "errors"
  11. "github.com/mediocregopher/radix/v3/internal/bytesutil"
  12. "github.com/mediocregopher/radix/v3/resp"
  13. "github.com/mediocregopher/radix/v3/resp/resp2"
  14. )
  15. // StreamEntryID represents an ID used in a Redis stream with the format <time>-<seq>.
  16. type StreamEntryID struct {
  17. // Time is the first part of the ID, which is based on the time of the server that Redis runs on.
  18. Time uint64
  19. // Seq is the sequence number of the ID for entries with the same Time value.
  20. Seq uint64
  21. }
  22. // Before returns true if s comes before o in a stream (is less than o).
  23. func (s StreamEntryID) Before(o StreamEntryID) bool {
  24. if s.Time != o.Time {
  25. return s.Time < o.Time
  26. }
  27. return s.Seq < o.Seq
  28. }
  29. // Prev returns the previous stream entry ID or s if there is no prior id (s is 0-0).
  30. func (s StreamEntryID) Prev() StreamEntryID {
  31. if s.Seq > 0 {
  32. s.Seq--
  33. return s
  34. }
  35. if s.Time > 0 {
  36. s.Time--
  37. s.Seq = math.MaxUint64
  38. return s
  39. }
  40. return s
  41. }
  42. // Next returns the next stream entry ID or s if there is no higher id (s is 18446744073709551615-18446744073709551615).
  43. func (s StreamEntryID) Next() StreamEntryID {
  44. if s.Seq < math.MaxUint64 {
  45. s.Seq++
  46. return s
  47. }
  48. if s.Time < math.MaxUint64 {
  49. s.Time++
  50. s.Seq = 0
  51. return s
  52. }
  53. return s
  54. }
  55. var _ resp.Marshaler = (*StreamEntryID)(nil)
  56. var _ resp.Unmarshaler = (*StreamEntryID)(nil)
  57. var maxUint64Len = len(strconv.FormatUint(math.MaxUint64, 10))
  58. func (s *StreamEntryID) bytes() []byte {
  59. b := make([]byte, 0, maxUint64Len*2+1)
  60. b = strconv.AppendUint(b, s.Time, 10)
  61. b = append(b, '-')
  62. b = strconv.AppendUint(b, s.Seq, 10)
  63. return b
  64. }
  65. // MarshalRESP implements the resp.Marshaler interface.
  66. func (s *StreamEntryID) MarshalRESP(w io.Writer) error {
  67. return resp2.BulkStringBytes{B: s.bytes()}.MarshalRESP(w)
  68. }
  69. var errInvalidStreamID = errors.New("invalid stream entry id")
  70. // UnmarshalRESP implements the resp.Unmarshaler interface.
  71. func (s *StreamEntryID) UnmarshalRESP(br *bufio.Reader) error {
  72. buf := bytesutil.GetBytes()
  73. defer bytesutil.PutBytes(buf)
  74. bsb := resp2.BulkStringBytes{B: (*buf)[:0]}
  75. if err := bsb.UnmarshalRESP(br); err != nil {
  76. return err
  77. }
  78. split := bytes.IndexByte(bsb.B, '-')
  79. if split == -1 {
  80. return errInvalidStreamID
  81. }
  82. time, err := bytesutil.ParseUint(bsb.B[:split])
  83. if err != nil {
  84. return errInvalidStreamID
  85. }
  86. seq, err := bytesutil.ParseUint(bsb.B[split+1:])
  87. if err != nil {
  88. return errInvalidStreamID
  89. }
  90. s.Time, s.Seq = time, seq
  91. return nil
  92. }
  93. var _ fmt.Stringer = (*StreamEntryID)(nil)
  94. // String returns the ID in the format <time>-<seq> (the same format used by
  95. // Redis).
  96. //
  97. // String implements the fmt.Stringer interface.
  98. func (s StreamEntryID) String() string {
  99. return string(s.bytes())
  100. }
  101. // StreamEntry is an entry in a stream as returned by XRANGE, XREAD and
  102. // XREADGROUP.
  103. type StreamEntry struct {
  104. // ID is the ID of the entry in a stream.
  105. ID StreamEntryID
  106. // Fields contains the fields and values for the stream entry.
  107. Fields map[string]string
  108. }
  109. var _ resp.Unmarshaler = (*StreamEntry)(nil)
  110. var errInvalidStreamEntry = errors.New("invalid stream entry")
  111. // UnmarshalRESP implements the resp.Unmarshaler interface.
  112. func (s *StreamEntry) UnmarshalRESP(br *bufio.Reader) error {
  113. var ah resp2.ArrayHeader
  114. if err := ah.UnmarshalRESP(br); err != nil {
  115. return err
  116. } else if ah.N != 2 {
  117. return errInvalidStreamEntry
  118. } else if err := s.ID.UnmarshalRESP(br); err != nil {
  119. return err
  120. }
  121. // put this here in case the array has size -1
  122. for k := range s.Fields {
  123. delete(s.Fields, k)
  124. }
  125. // resp2.Any{I: &s.Fields}.UnmarshalRESP(br)
  126. if err := ah.UnmarshalRESP(br); err != nil {
  127. return err
  128. } else if ah.N == -1 {
  129. return nil
  130. } else if ah.N%2 != 0 {
  131. return errInvalidStreamEntry
  132. } else if s.Fields == nil {
  133. s.Fields = make(map[string]string, ah.N/2)
  134. }
  135. var bs resp2.BulkString
  136. for i := 0; i < ah.N; i += 2 {
  137. if err := bs.UnmarshalRESP(br); err != nil {
  138. return err
  139. }
  140. key := bs.S
  141. if err := bs.UnmarshalRESP(br); err != nil {
  142. return err
  143. }
  144. s.Fields[key] = bs.S
  145. }
  146. return nil
  147. }
  148. // StreamEntries is a stream name and set of entries as returned by XREAD and
  149. // XREADGROUP. The results from a call to XREAD(GROUP) can be unmarshaled into a
  150. // []StreamEntries.
  151. type StreamEntries struct {
  152. Stream string
  153. Entries []StreamEntry
  154. }
  155. // UnmarshalRESP implements the resp.Unmarshaler interface.
  156. func (s *StreamEntries) UnmarshalRESP(br *bufio.Reader) error {
  157. var ah resp2.ArrayHeader
  158. if err := ah.UnmarshalRESP(br); err != nil {
  159. return err
  160. } else if ah.N != 2 {
  161. return errors.New("invalid xread[group] response")
  162. }
  163. var stream resp2.BulkString
  164. if err := stream.UnmarshalRESP(br); err != nil {
  165. return err
  166. }
  167. s.Stream = stream.S
  168. if err := ah.UnmarshalRESP(br); err != nil {
  169. return err
  170. }
  171. s.Entries = make([]StreamEntry, ah.N)
  172. for i := range s.Entries {
  173. if err := s.Entries[i].UnmarshalRESP(br); err != nil {
  174. return err
  175. }
  176. }
  177. return nil
  178. }
  179. // StreamReaderOpts contains various options given for NewStreamReader that influence the behaviour.
  180. //
  181. // The only required field is Streams.
  182. type StreamReaderOpts struct {
  183. // Streams must contain one or more stream names that will be read.
  184. //
  185. // The value for each stream can either be nil or an existing ID.
  186. // If a value is non-nil, only newer stream entries will be returned.
  187. Streams map[string]*StreamEntryID
  188. // FallbackToUndelivered will cause any streams in with a non-nil value in Streams to fallback
  189. // to delivering messages not-yet-delivered to other consumers (as if the value in the Streams map was nil),
  190. // once the reader has read all its pending messages in the stream.
  191. // This must be used in conjunction with Group.
  192. FallbackToUndelivered bool
  193. // Group is an optional consumer group name.
  194. //
  195. // If Group is not empty reads will use XREADGROUP with the Group as consumer group instead of XREAD.
  196. Group string
  197. // Consumer is an optional consumer name for use with Group.
  198. Consumer string
  199. // NoAck optionally enables passing the NOACK flag to XREADGROUP.
  200. NoAck bool
  201. // Block specifies the duration in milliseconds that reads will wait for new data before returning.
  202. //
  203. // If Block is negative, reads will block indefinitely until new entries can be read or there is an error.
  204. //
  205. // The default, if Block is 0, is 5 seconds.
  206. //
  207. // If Block is non-negative, the Client used for the StreamReader must not have a timeout for commands or
  208. // the timeout duration must be substantial higher than the Block duration (at least 50% for small Block values,
  209. // but may be less for higher values).
  210. Block time.Duration
  211. // NoBlock disables blocking when no new data is available.
  212. //
  213. // If this is true, setting Block will not have any effect.
  214. NoBlock bool
  215. // Count can be used to limit the number of entries retrieved by each call to Next.
  216. //
  217. // If Count is 0, all available entries will be retrieved.
  218. Count int
  219. }
  220. // StreamReader allows reading from on or more streams, always returning newer
  221. // entries.
  222. type StreamReader interface {
  223. // Err returns any error that happened while calling Next or nil if no error
  224. // happened.
  225. //
  226. // Once Err returns a non-nil error, all successive calls will return the
  227. // same error.
  228. Err() error
  229. // Next returns new entries for any of the configured streams.
  230. //
  231. // The returned slice is only valid until the next call to Next.
  232. //
  233. // If there was an error, ok will be false. Otherwise, even if no entries
  234. // were read, ok will be true.
  235. //
  236. // If there was an error, all future calls to Next will return ok == false.
  237. Next() (stream string, entries []StreamEntry, ok bool)
  238. }
  239. // NewStreamReader returns a new StreamReader for the given client.
  240. //
  241. // Any changes on opts after calling NewStreamReader will have no effect.
  242. func NewStreamReader(c Client, opts StreamReaderOpts) StreamReader {
  243. sr := &streamReader{c: c, opts: opts}
  244. if sr.opts.Group != "" {
  245. sr.cmd = "XREADGROUP"
  246. sr.fixedArgs = []string{"GROUP", sr.opts.Group, sr.opts.Consumer}
  247. sr.fallbackToUndelivered = opts.FallbackToUndelivered
  248. } else {
  249. sr.cmd = "XREAD"
  250. sr.fixedArgs = nil
  251. }
  252. if sr.opts.Count > 0 {
  253. sr.fixedArgs = append(sr.fixedArgs, "COUNT", strconv.Itoa(sr.opts.Count))
  254. }
  255. if !sr.opts.NoBlock {
  256. dur := 5 * time.Second
  257. if sr.opts.Block < 0 {
  258. dur = 0
  259. } else if sr.opts.Block > 0 {
  260. dur = sr.opts.Block
  261. }
  262. msec := int(dur / time.Millisecond)
  263. sr.fixedArgs = append(sr.fixedArgs, "BLOCK", strconv.Itoa(msec))
  264. }
  265. if sr.opts.Group != "" && sr.opts.NoAck {
  266. sr.fixedArgs = append(sr.fixedArgs, "NOACK")
  267. }
  268. sr.streams = make([]string, 0, len(sr.opts.Streams))
  269. sr.ids = make(map[string]string, len(sr.opts.Streams))
  270. for stream, id := range sr.opts.Streams {
  271. sr.streams = append(sr.streams, stream)
  272. if id != nil {
  273. sr.ids[stream] = id.String()
  274. } else if sr.cmd == "XREAD" {
  275. sr.ids[stream] = "$"
  276. } else if sr.cmd == "XREADGROUP" {
  277. sr.ids[stream] = ">"
  278. }
  279. }
  280. // set to nil so we don't accidentally use it later, since the user could have changed
  281. // the map after using the reader.
  282. sr.opts.Streams = nil
  283. sr.fixedArgs = append(sr.fixedArgs, "STREAMS")
  284. sr.fixedArgs = append(sr.fixedArgs, sr.streams...)
  285. // preallocate space for all arguments passed to Cmd
  286. sr.args = make([]string, 0, len(sr.fixedArgs)+len(sr.streams))
  287. return sr
  288. }
  289. // streamReader implements the StreamReader interface.
  290. type streamReader struct {
  291. c Client
  292. opts StreamReaderOpts // copy of the options given to NewStreamReader with Streams == nil
  293. streams []string
  294. ids map[string]string
  295. fallbackToUndelivered bool
  296. cmd string // command. either XREAD or XREADGROUP
  297. fixedArgs []string // fixed arguments that always come directly after the command
  298. args []string // arguments passed to Cmd. reused between calls to Next to avoid allocations.
  299. unread []StreamEntries
  300. err error
  301. }
  302. func (sr *streamReader) backfill() bool {
  303. sr.args = append(sr.args[:0], sr.fixedArgs...)
  304. for _, s := range sr.streams {
  305. sr.args = append(sr.args, sr.ids[s])
  306. }
  307. if sr.err = sr.c.Do(Cmd(&sr.unread, sr.cmd, sr.args...)); sr.err != nil {
  308. return false
  309. }
  310. return true
  311. }
  312. // Err implements the StreamReader interface.
  313. func (sr *streamReader) Err() error {
  314. return sr.err
  315. }
  316. func (sr *streamReader) nextFromBuffer() (stream string, entries []StreamEntry) {
  317. for len(sr.unread) > 0 {
  318. sre := sr.unread[len(sr.unread)-1]
  319. sr.unread = sr.unread[:len(sr.unread)-1]
  320. stream = sre.Stream
  321. // entries can be empty if we are using XREADGROUP and reading unacknowledged entries.
  322. if len(sre.Entries) == 0 {
  323. if sr.fallbackToUndelivered && sr.cmd == "XREADGROUP" && sr.ids[stream] != ">" {
  324. sr.ids[stream] = ">"
  325. }
  326. continue
  327. }
  328. // do not update the ID for XREADGROUP when we are not reading unacknowledged entries.
  329. if sr.cmd == "XREAD" || (sr.cmd == "XREADGROUP" && sr.ids[stream] != ">") {
  330. sr.ids[stream] = sre.Entries[len(sre.Entries)-1].ID.String()
  331. }
  332. return stream, sre.Entries
  333. }
  334. return "", nil
  335. }
  336. // Next implements the StreamReader interface.
  337. func (sr *streamReader) Next() (stream string, entries []StreamEntry, ok bool) {
  338. if sr.err != nil {
  339. return "", nil, false
  340. }
  341. if stream, entries = sr.nextFromBuffer(); stream != "" {
  342. return stream, entries, true
  343. }
  344. if !sr.backfill() {
  345. return "", nil, false
  346. }
  347. if stream, entries = sr.nextFromBuffer(); stream != "" {
  348. return stream, entries, true
  349. }
  350. return "", nil, true
  351. }