123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274 |
- package radix
- import (
- "bufio"
- "fmt"
- "strings"
- "sync"
- "time"
- "github.com/mediocregopher/radix/v3/resp"
- )
- var blockingCmds = map[string]bool{
- "WAIT": true,
- // taken from https://github.com/joomcode/redispipe#limitations
- "BLPOP": true,
- "BRPOP": true,
- "BRPOPLPUSH": true,
- "BZPOPMIN": true,
- "BZPOPMAX": true,
- "XREAD": true,
- "XREADGROUP": true,
- "SAVE": true,
- }
- type pipeliner struct {
- c Client
- limit int
- window time.Duration
- // reqsBufCh contains buffers for collecting commands and acts as a semaphore
- // to limit the number of concurrent flushes.
- reqsBufCh chan []CmdAction
- reqCh chan *pipelinerCmd
- reqWG sync.WaitGroup
- l sync.RWMutex
- closed bool
- }
- var _ Client = (*pipeliner)(nil)
- func newPipeliner(c Client, concurrency, limit int, window time.Duration) *pipeliner {
- if concurrency < 1 {
- concurrency = 1
- }
- p := &pipeliner{
- c: c,
- limit: limit,
- window: window,
- reqsBufCh: make(chan []CmdAction, concurrency),
- reqCh: make(chan *pipelinerCmd, 32), // https://xkcd.com/221/
- }
- p.reqWG.Add(1)
- go func() {
- defer p.reqWG.Done()
- p.reqLoop()
- }()
- for i := 0; i < cap(p.reqsBufCh); i++ {
- if p.limit > 0 {
- p.reqsBufCh <- make([]CmdAction, 0, limit)
- } else {
- p.reqsBufCh <- nil
- }
- }
- return p
- }
- // CanDo checks if the given Action can be executed / passed to p.Do.
- //
- // If CanDo returns false, the Action must not be given to Do.
- func (p *pipeliner) CanDo(a Action) bool {
- // there is currently no way to get the command for CmdAction implementations
- // from outside the radix package so we can not multiplex those commands. User
- // defined pipelines are not pipelined to let the user better control them.
- if cmdA, ok := a.(*cmdAction); ok {
- return !blockingCmds[strings.ToUpper(cmdA.cmd)]
- }
- return false
- }
- // Do executes the given Action as part of the pipeline.
- //
- // If a is not a CmdAction, Do panics.
- func (p *pipeliner) Do(a Action) error {
- req := getPipelinerCmd(a.(CmdAction)) // get this outside the lock to avoid
- p.l.RLock()
- if p.closed {
- p.l.RUnlock()
- return errClientClosed
- }
- p.reqCh <- req
- p.l.RUnlock()
- err := <-req.resCh
- poolPipelinerCmd(req)
- return err
- }
- // Close closes the pipeliner and makes sure that all background goroutines
- // are stopped before returning.
- //
- // Close does *not* close the underlying Client.
- func (p *pipeliner) Close() error {
- p.l.Lock()
- defer p.l.Unlock()
- if p.closed {
- return nil
- }
- close(p.reqCh)
- p.reqWG.Wait()
- for i := 0; i < cap(p.reqsBufCh); i++ {
- <-p.reqsBufCh
- }
- p.c, p.closed = nil, true
- return nil
- }
- func (p *pipeliner) reqLoop() {
- t := getTimer(time.Hour)
- defer putTimer(t)
- t.Stop()
- reqs := <-p.reqsBufCh
- defer func() {
- p.reqsBufCh <- reqs
- }()
- for {
- select {
- case req, ok := <-p.reqCh:
- if !ok {
- reqs = p.flush(reqs)
- return
- }
- reqs = append(reqs, req)
- if p.limit > 0 && len(reqs) == p.limit {
- // if we reached the pipeline limit, execute now to avoid unnecessary waiting
- t.Stop()
- reqs = p.flush(reqs)
- } else if len(reqs) == 1 {
- t.Reset(p.window)
- }
- case <-t.C:
- reqs = p.flush(reqs)
- }
- }
- }
- func (p *pipeliner) flush(reqs []CmdAction) []CmdAction {
- if len(reqs) == 0 {
- return reqs
- }
- go func() {
- defer func() {
- p.reqsBufCh <- reqs[:0]
- }()
- pp := &pipelinerPipeline{pipeline: pipeline(reqs)}
- defer pp.flush()
- if err := p.c.Do(pp); err != nil {
- pp.doErr = err
- }
- }()
- return <-p.reqsBufCh
- }
- type pipelinerCmd struct {
- CmdAction
- resCh chan error
- unmarshalCalled bool
- unmarshalErr error
- }
- var (
- _ resp.Unmarshaler = (*pipelinerCmd)(nil)
- )
- func (p *pipelinerCmd) sendRes(err error) {
- p.resCh <- err
- }
- func (p *pipelinerCmd) UnmarshalRESP(br *bufio.Reader) error {
- p.unmarshalErr = p.CmdAction.UnmarshalRESP(br)
- p.unmarshalCalled = true // important: we set this after unmarshalErr in case the call to UnmarshalRESP panics
- return p.unmarshalErr
- }
- var pipelinerCmdPool sync.Pool
- func getPipelinerCmd(cmd CmdAction) *pipelinerCmd {
- req, _ := pipelinerCmdPool.Get().(*pipelinerCmd)
- if req != nil {
- *req = pipelinerCmd{
- CmdAction: cmd,
- resCh: req.resCh,
- }
- return req
- }
- return &pipelinerCmd{
- CmdAction: cmd,
- // using a buffer of 1 is faster than no buffer in most cases
- resCh: make(chan error, 1),
- }
- }
- func poolPipelinerCmd(req *pipelinerCmd) {
- req.CmdAction = nil
- pipelinerCmdPool.Put(req)
- }
- type pipelinerPipeline struct {
- pipeline
- doErr error
- }
- func (p *pipelinerPipeline) flush() {
- for _, req := range p.pipeline {
- var err error
- cmd := req.(*pipelinerCmd)
- if cmd.unmarshalCalled {
- err = cmd.unmarshalErr
- } else {
- err = p.doErr
- }
- cmd.sendRes(err)
- }
- }
- func (p *pipelinerPipeline) Run(c Conn) (err error) {
- defer func() {
- if v := recover(); v != nil {
- err = fmt.Errorf("%s", v)
- }
- }()
- if err := c.Encode(p); err != nil {
- return err
- }
- errConn := ioErrConn{Conn: c}
- for _, req := range p.pipeline {
- if _ = errConn.Decode(req); errConn.lastIOErr != nil {
- return errConn.lastIOErr
- }
- }
- return nil
- }
|