pipeline.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. package redis
  2. import (
  3. "sync"
  4. "github.com/go-redis/redis/internal/pool"
  5. )
  6. type pipelineExecer func([]Cmder) error
  7. // Pipeliner is an mechanism to realise Redis Pipeline technique.
  8. //
  9. // Pipelining is a technique to extremely speed up processing by packing
  10. // operations to batches, send them at once to Redis and read a replies in a
  11. // singe step.
  12. // See https://redis.io/topics/pipelining
  13. //
  14. // Pay attention, that Pipeline is not a transaction, so you can get unexpected
  15. // results in case of big pipelines and small read/write timeouts.
  16. // Redis client has retransmission logic in case of timeouts, pipeline
  17. // can be retransmitted and commands can be executed more then once.
  18. // To avoid this: it is good idea to use reasonable bigger read/write timeouts
  19. // depends of your batch size and/or use TxPipeline.
  20. type Pipeliner interface {
  21. StatefulCmdable
  22. Do(args ...interface{}) *Cmd
  23. Process(cmd Cmder) error
  24. Close() error
  25. Discard() error
  26. Exec() ([]Cmder, error)
  27. }
  28. var _ Pipeliner = (*Pipeline)(nil)
  29. // Pipeline implements pipelining as described in
  30. // http://redis.io/topics/pipelining. It's safe for concurrent use
  31. // by multiple goroutines.
  32. type Pipeline struct {
  33. statefulCmdable
  34. exec pipelineExecer
  35. mu sync.Mutex
  36. cmds []Cmder
  37. closed bool
  38. }
  39. func (c *Pipeline) Do(args ...interface{}) *Cmd {
  40. cmd := NewCmd(args...)
  41. _ = c.Process(cmd)
  42. return cmd
  43. }
  44. // Process queues the cmd for later execution.
  45. func (c *Pipeline) Process(cmd Cmder) error {
  46. c.mu.Lock()
  47. c.cmds = append(c.cmds, cmd)
  48. c.mu.Unlock()
  49. return nil
  50. }
  51. // Close closes the pipeline, releasing any open resources.
  52. func (c *Pipeline) Close() error {
  53. c.mu.Lock()
  54. c.discard()
  55. c.closed = true
  56. c.mu.Unlock()
  57. return nil
  58. }
  59. // Discard resets the pipeline and discards queued commands.
  60. func (c *Pipeline) Discard() error {
  61. c.mu.Lock()
  62. err := c.discard()
  63. c.mu.Unlock()
  64. return err
  65. }
  66. func (c *Pipeline) discard() error {
  67. if c.closed {
  68. return pool.ErrClosed
  69. }
  70. c.cmds = c.cmds[:0]
  71. return nil
  72. }
  73. // Exec executes all previously queued commands using one
  74. // client-server roundtrip.
  75. //
  76. // Exec always returns list of commands and error of the first failed
  77. // command if any.
  78. func (c *Pipeline) Exec() ([]Cmder, error) {
  79. c.mu.Lock()
  80. defer c.mu.Unlock()
  81. if c.closed {
  82. return nil, pool.ErrClosed
  83. }
  84. if len(c.cmds) == 0 {
  85. return nil, nil
  86. }
  87. cmds := c.cmds
  88. c.cmds = nil
  89. return cmds, c.exec(cmds)
  90. }
  91. func (c *Pipeline) pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
  92. if err := fn(c); err != nil {
  93. return nil, err
  94. }
  95. cmds, err := c.Exec()
  96. _ = c.Close()
  97. return cmds, err
  98. }
  99. func (c *Pipeline) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
  100. return c.pipelined(fn)
  101. }
  102. func (c *Pipeline) Pipeline() Pipeliner {
  103. return c
  104. }
  105. func (c *Pipeline) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
  106. return c.pipelined(fn)
  107. }
  108. func (c *Pipeline) TxPipeline() Pipeliner {
  109. return c
  110. }