123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623 |
- package radix
- import (
- "bufio"
- "bytes"
- "crypto/sha1"
- "encoding/hex"
- "fmt"
- "io"
- "strconv"
- "strings"
- "sync"
- "github.com/mediocregopher/radix/v3/resp"
- "github.com/mediocregopher/radix/v3/resp/resp2"
- "golang.org/x/xerrors"
- )
- // Action performs a task using a Conn.
- type Action interface {
- // Keys returns the keys which will be acted on. Empty slice or nil may be
- // returned if no keys are being acted on. The returned slice must not be
- // modified.
- Keys() []string
- // Run actually performs the Action using the given Conn.
- Run(c Conn) error
- }
- // CmdAction is a sub-class of Action which can be used in two different ways.
- // The first is as a normal Action, where Run is called with a Conn and returns
- // once the Action has been completed.
- //
- // The second way is as a Pipeline-able command, where one or more commands are
- // written in one step (via the MarshalRESP method) and their results are read
- // later (via the UnmarshalRESP method).
- //
- // When used directly with Do then MarshalRESP/UnmarshalRESP are not called, and
- // when used in a Pipeline the Run method is not called.
- type CmdAction interface {
- Action
- resp.Marshaler
- resp.Unmarshaler
- }
// noKeyCmds is the set of (upper-cased) command names for which
// cmdAction.Keys reports no keys, regardless of the arguments given. Entries
// are kept in alphabetical order.
var noKeyCmds = map[string]bool{
	"ASKING":       true,
	"AUTH":         true,
	"BGREWRITEAOF": true,
	"BGSAVE":       true,
	"CLIENT":       true,
	"CLUSTER":      true,
	"COMMAND":      true,
	"CONFIG":       true,
	"DBSIZE":       true,
	"DEBUG":        true,
	"DISCARD":      true,
	"ECHO":         true,
	"EVAL":         true,
	"EVALSHA":      true,
	"EXEC":         true,
	"FLUSHALL":     true,
	"FLUSHDB":      true,
	"INFO":         true,
	"KEYS":         true,
	"LASTSAVE":     true,
	"MIGRATE":      true,
	"MONITOR":      true,
	"MULTI":        true,
	"OBJECT":       true,
	"PING":         true,
	"QUIT":         true,
	"RANDOMKEY":    true,
	"READONLY":     true,
	"READWRITE":    true,
	"ROLE":         true,
	"SAVE":         true,
	"SCAN":         true,
	"SCRIPT":       true,
	"SELECT":       true,
	"SENTINEL":     true,
	"SHUTDOWN":     true,
	"SLAVEOF":      true,
	"SLOWLOG":      true,
	"SWAPDB":       true,
	"SYNC":         true,
	"TIME":         true,
	"UNWATCH":      true,
	"WAIT":         true,
	"WATCH":        true,
}
- func cmdString(m resp.Marshaler) string {
- // we go way out of the way here to display the command as it would be sent
- // to redis. This is pretty similar logic to what the stub does as well
- buf := new(bytes.Buffer)
- if err := m.MarshalRESP(buf); err != nil {
- return fmt.Sprintf("error creating string: %q", err.Error())
- }
- var ss []string
- err := resp2.RawMessage(buf.Bytes()).UnmarshalInto(resp2.Any{I: &ss})
- if err != nil {
- return fmt.Sprintf("error creating string: %q", err.Error())
- }
- for i := range ss {
- ss[i] = strconv.QuoteToASCII(ss[i])
- }
- return "[" + strings.Join(ss, " ") + "]"
- }
- func marshalBulkString(prevErr error, w io.Writer, str string) error {
- if prevErr != nil {
- return prevErr
- }
- return resp2.BulkString{S: str}.MarshalRESP(w)
- }
- func marshalBulkStringBytes(prevErr error, w io.Writer, b []byte) error {
- if prevErr != nil {
- return prevErr
- }
- return resp2.BulkStringBytes{B: b}.MarshalRESP(w)
- }
- ////////////////////////////////////////////////////////////////////////////////
// cmdAction is the CmdAction implementation returned by Cmd and FlatCmd.
type cmdAction struct {
	// rcv is the receiver which the command's response is unmarshaled into.
	rcv interface{}

	// cmd is the name of the command.
	cmd string

	// args holds the command's arguments when created via Cmd.
	args []string

	// flat is true when created via FlatCmd, in which case flatKey and
	// flatArgs are used in place of args.
	flat bool

	// flatKey is an array, rather than a plain string, so that Keys can
	// return a slice of it without allocating.
	flatKey [1]string

	// flatArgs holds the arguments which get flattened when marshaling.
	flatArgs []interface{}
}
- // BREAM: Benchmarks Rule Everything Around Me.
- var cmdActionPool sync.Pool
- func getCmdAction() *cmdAction {
- if ci := cmdActionPool.Get(); ci != nil {
- return ci.(*cmdAction)
- }
- return new(cmdAction)
- }
- // Cmd is used to perform a redis command and retrieve a result. It should not
- // be passed into Do more than once.
- //
- // If the receiver value of Cmd is a primitive, a slice/map, or a struct then a
- // pointer must be passed in. It may also be an io.Writer, an
- // encoding.Text/BinaryUnmarshaler, or a resp.Unmarshaler. See the package docs
- // for more on how results are unmarshaled into the receiver.
- func Cmd(rcv interface{}, cmd string, args ...string) CmdAction {
- c := getCmdAction()
- *c = cmdAction{
- rcv: rcv,
- cmd: cmd,
- args: args,
- }
- return c
- }
- // FlatCmd is like Cmd, but the arguments can be of almost any type, and FlatCmd
- // will automatically flatten them into a single array of strings. Like Cmd, a
- // FlatCmd should not be passed into Do more than once.
- //
- // FlatCmd does _not_ work for commands whose first parameter isn't a key, or
- // (generally) for MSET. Use Cmd for those.
- //
- // FlatCmd supports using a resp.LenReader (an io.Reader with a Len() method) as
- // an argument. *bytes.Buffer is an example of a LenReader, and the resp package
- // has a NewLenReader function which can wrap an existing io.Reader.
- //
- // FlatCmd also supports encoding.Text/BinaryMarshalers. It does _not_ currently
- // support resp.Marshaler.
- //
- // The receiver to FlatCmd follows the same rules as for Cmd.
- func FlatCmd(rcv interface{}, cmd, key string, args ...interface{}) CmdAction {
- c := getCmdAction()
- *c = cmdAction{
- rcv: rcv,
- cmd: cmd,
- flat: true,
- flatKey: [1]string{key},
- flatArgs: args,
- }
- return c
- }
// findStreamsKeys returns the stream keys from the arguments of an
// XREAD/XREADGROUP command, or nil if args contains no STREAMS keyword. The
// keyword is matched case-insensitively. The returned slice aliases args.
func findStreamsKeys(args []string) []string {
	for i := range args {
		// EqualFold compares in place, whereas upper-casing every argument
		// would allocate a copy of each one.
		if !strings.EqualFold(args[i], "STREAMS") {
			continue
		}

		// After STREAMS only stream keys and IDs can be given, and since
		// there must be the same number of keys and IDs the first half of
		// the remaining arguments are the keys. If the number of IDs does
		// not match the number of keys the command will fail later when
		// sent to Redis, so that case doesn't need handling here.
		rest := args[i+1:]
		ids := len(rest) / 2
		return rest[:len(rest)-ids]
	}
	return nil
}
- func (c *cmdAction) Keys() []string {
- if c.flat {
- return c.flatKey[:]
- }
- cmd := strings.ToUpper(c.cmd)
- if cmd == "BITOP" && len(c.args) > 1 { // antirez why you do this
- return c.args[1:]
- } else if cmd == "XINFO" {
- if len(c.args) < 2 {
- return nil
- }
- return c.args[1:2]
- } else if cmd == "XGROUP" && len(c.args) > 1 {
- return c.args[1:2]
- } else if cmd == "XREAD" || cmd == "XREADGROUP" { // antirez why you still do this
- return findStreamsKeys(c.args)
- } else if noKeyCmds[cmd] || len(c.args) == 0 {
- return nil
- }
- return c.args[:1]
- }
- func (c *cmdAction) flatMarshalRESP(w io.Writer) error {
- var err error
- a := resp2.Any{
- I: c.flatArgs,
- MarshalBulkString: true,
- MarshalNoArrayHeaders: true,
- }
- arrL := 2 + a.NumElems()
- err = resp2.ArrayHeader{N: arrL}.MarshalRESP(w)
- err = marshalBulkString(err, w, c.cmd)
- err = marshalBulkString(err, w, c.flatKey[0])
- if err != nil {
- return err
- }
- return a.MarshalRESP(w)
- }
- func (c *cmdAction) MarshalRESP(w io.Writer) error {
- if c.flat {
- return c.flatMarshalRESP(w)
- }
- err := resp2.ArrayHeader{N: len(c.args) + 1}.MarshalRESP(w)
- err = marshalBulkString(err, w, c.cmd)
- for i := range c.args {
- err = marshalBulkString(err, w, c.args[i])
- }
- return err
- }
- func (c *cmdAction) UnmarshalRESP(br *bufio.Reader) error {
- if err := (resp2.Any{I: c.rcv}).UnmarshalRESP(br); err != nil {
- return err
- }
- cmdActionPool.Put(c)
- return nil
- }
- func (c *cmdAction) Run(conn Conn) error {
- if err := conn.Encode(c); err != nil {
- return err
- }
- return conn.Decode(c)
- }
- func (c *cmdAction) String() string {
- return cmdString(c)
- }
- func (c *cmdAction) ClusterCanRetry() bool {
- return true
- }
- ////////////////////////////////////////////////////////////////////////////////
// MaybeNil wraps a receiver in order to detect nil responses. When
// unmarshaling it first checks whether the response is a nil RESP type
// (either bulk string or array); if so Nil is set to true and Rcv is left
// alone. Otherwise the response is unmarshaled into Rcv as normal. If the
// response is an empty array then EmptyArray is set to true, and the response
// is still unmarshaled into Rcv as normal.
type MaybeNil struct {
	// Nil is set to whether or not the response was a nil RESP type.
	Nil bool

	// EmptyArray is set to whether or not the response was an empty array.
	EmptyArray bool

	// Rcv is the receiver which non-nil responses are unmarshaled into.
	Rcv interface{}
}
- // UnmarshalRESP implements the method for the resp.Unmarshaler interface.
- func (mn *MaybeNil) UnmarshalRESP(br *bufio.Reader) error {
- var rm resp2.RawMessage
- err := rm.UnmarshalRESP(br)
- mn.Nil = false
- mn.EmptyArray = false
- switch {
- case err != nil:
- return err
- case rm.IsNil():
- mn.Nil = true
- return nil
- case rm.IsEmptyArray():
- mn.EmptyArray = true
- fallthrough // to not break backwards compatibility
- default:
- return rm.UnmarshalInto(resp2.Any{I: mn.Rcv})
- }
- }
- ////////////////////////////////////////////////////////////////////////////////
// Tuple is a helper type for unmarshaling a RESP array element by element.
// Each element of the Tuple should be a pointer receiver, into which the
// corresponding element of the RESP array is unmarshaled, or nil to skip that
// element. The Tuple's length must match the length of the RESP array being
// unmarshaled.
//
// Tuple is useful for unmarshaling the results of commands like EXEC and
// EVAL.
type Tuple []interface{}
- // UnmarshalRESP implements the method for the resp.Unmarshaler interface.
- func (t Tuple) UnmarshalRESP(br *bufio.Reader) error {
- var ah resp2.ArrayHeader
- if err := ah.UnmarshalRESP(br); err != nil {
- return err
- } else if ah.N != len(t) {
- for i := 0; i < ah.N; i++ {
- if err := (resp2.Any{}).UnmarshalRESP(br); err != nil {
- return err
- }
- }
- return resp.ErrDiscarded{
- Err: fmt.Errorf("expected array of size %d but got array of size %d", len(t), ah.N),
- }
- }
- var retErr error
- for i := 0; i < ah.N; i++ {
- if err := (resp2.Any{I: t[i]}).UnmarshalRESP(br); err != nil {
- // if the message was discarded then we can just continue, this
- // method will return the first error it sees
- if !xerrors.As(err, new(resp.ErrDiscarded)) {
- return err
- } else if retErr == nil {
- retErr = err
- }
- }
- }
- return retErr
- }
- ////////////////////////////////////////////////////////////////////////////////
// EvalScript holds the body of a script which is to be used with redis' EVAL
// functionality, along with the script's SHA-1 sum and number of keys. Call
// Cmd or FlatCmd on an EvalScript to create an Action which can actually be
// run.
type EvalScript struct {
	script, sum string
	numKeys     int
}

// NewEvalScript initializes an EvalScript instance, computing the
// hex-encoded SHA-1 sum of script up-front. numKeys is the number of
// arguments which will be keys when Cmd is called.
func NewEvalScript(numKeys int, script string) EvalScript {
	digest := sha1.Sum([]byte(script))
	return EvalScript{
		script:  script,
		sum:     hex.EncodeToString(digest[:]),
		numKeys: numKeys,
	}
}
// Command names used when marshaling an evalAction, kept as byte slices so
// they can be written without being converted on every call.
var (
	eval    = []byte("EVAL")
	evalsha = []byte("EVALSHA")
)
- type evalAction struct {
- EvalScript
- keys, args []string
- rcv interface{}
- flat bool
- flatArgs []interface{}
- eval bool
- }
- // Cmd is like the top-level Cmd but it uses the the EvalScript to perform an
- // EVALSHA command (and will automatically fallback to EVAL as necessary).
- // keysAndArgs must be at least as long as the numKeys argument of
- // NewEvalScript.
- func (es EvalScript) Cmd(rcv interface{}, keysAndArgs ...string) Action {
- if len(keysAndArgs) < es.numKeys {
- panic("not enough arguments passed into EvalScript.Cmd")
- }
- return &evalAction{
- EvalScript: es,
- keys: keysAndArgs[:es.numKeys],
- args: keysAndArgs[es.numKeys:],
- rcv: rcv,
- }
- }
- // FlatCmd is like the top level FlatCmd except it uses the EvalScript to
- // perform an EVALSHA command (and will automatically fallback to EVAL as
- // necessary). keys must be as long as the numKeys argument of NewEvalScript.
- func (es EvalScript) FlatCmd(rcv interface{}, keys []string, args ...interface{}) Action {
- if len(keys) != es.numKeys {
- panic("incorrect number of keys passed into EvalScript.FlatCmd")
- }
- return &evalAction{
- EvalScript: es,
- keys: keys,
- flatArgs: args,
- flat: true,
- rcv: rcv,
- }
- }
- func (ec *evalAction) Keys() []string {
- return ec.keys
- }
- func (ec *evalAction) MarshalRESP(w io.Writer) error {
- // EVAL(SHA) script/sum numkeys keys... args...
- ah := resp2.ArrayHeader{N: 3 + len(ec.keys)}
- if ec.flat {
- ah.N += (resp2.Any{I: ec.flatArgs}).NumElems()
- } else {
- ah.N += len(ec.args)
- }
- if err := ah.MarshalRESP(w); err != nil {
- return err
- }
- var err error
- if ec.eval {
- err = marshalBulkStringBytes(err, w, eval)
- err = marshalBulkString(err, w, ec.script)
- } else {
- err = marshalBulkStringBytes(err, w, evalsha)
- err = marshalBulkString(err, w, ec.sum)
- }
- err = marshalBulkString(err, w, strconv.Itoa(ec.numKeys))
- for i := range ec.keys {
- err = marshalBulkString(err, w, ec.keys[i])
- }
- if err != nil {
- return err
- }
- if ec.flat {
- err = (resp2.Any{
- I: ec.flatArgs,
- MarshalBulkString: true,
- MarshalNoArrayHeaders: true,
- }).MarshalRESP(w)
- } else {
- for i := range ec.args {
- err = marshalBulkString(err, w, ec.args[i])
- }
- }
- return err
- }
- func (ec *evalAction) Run(conn Conn) error {
- run := func(eval bool) error {
- ec.eval = eval
- if err := conn.Encode(ec); err != nil {
- return err
- }
- return conn.Decode(resp2.Any{I: ec.rcv})
- }
- err := run(false)
- if err != nil && strings.HasPrefix(err.Error(), "NOSCRIPT") {
- err = run(true)
- }
- return err
- }
- func (ec *evalAction) ClusterCanRetry() bool {
- return true
- }
- ////////////////////////////////////////////////////////////////////////////////
- type pipeline []CmdAction
- // Pipeline returns an Action which first writes multiple commands to a Conn in
- // a single write, then reads their responses in a single read. This reduces
- // network delay into a single round-trip.
- //
- // Run will not be called on any of the passed in CmdActions.
- //
- // NOTE that, while a Pipeline performs all commands on a single Conn, it
- // shouldn't be used by itself for MULTI/EXEC transactions, because if there's
- // an error it won't discard the incomplete transaction. Use WithConn or
- // EvalScript for transactional functionality instead.
- func Pipeline(cmds ...CmdAction) Action {
- return pipeline(cmds)
- }
- func (p pipeline) Keys() []string {
- m := map[string]bool{}
- for _, rc := range p {
- for _, k := range rc.Keys() {
- m[k] = true
- }
- }
- keys := make([]string, 0, len(m))
- for k := range m {
- keys = append(keys, k)
- }
- return keys
- }
- func (p pipeline) Run(c Conn) error {
- if err := c.Encode(p); err != nil {
- return err
- }
- for i, cmd := range p {
- if err := c.Decode(cmd); err != nil {
- p.drain(c, len(p)-i-1)
- return decodeErr(cmd, err)
- }
- }
- return nil
- }
- func (p pipeline) drain(c Conn, n int) {
- rcv := resp2.Any{I: nil}
- for i := 0; i < n; i++ {
- _ = c.Decode(&rcv)
- }
- }
- func decodeErr(cmd CmdAction, err error) error {
- c, ok := cmd.(*cmdAction)
- if ok {
- return fmt.Errorf(
- "failed to decode pipeline CmdAction '%v' with keys %v: %w",
- c.cmd,
- c.Keys(),
- err)
- }
- return fmt.Errorf(
- "failed to decode pipeline CmdAction '%v': %w",
- cmd,
- err)
- }
- // MarshalRESP implements the resp.Marshaler interface, so that the pipeline can
- // pass itself to the Conn.Encode method instead of calling Conn.Encode for each
- // CmdAction in the pipeline.
- //
- // This helps with Conn implementations that flush their underlying buffers
- // after each call to Encode, like the default default Conn implementation
- // (connWrap) does, making better use of internal buffering and automatic
- // flushing as well as reducing the number of syscalls that both the client and
- // Redis need to do.
- //
- // Without this, using the default Conn implementation, big pipelines can easily
- // spend much of their time just in flushing (in one case measured, up to 40%).
- func (p pipeline) MarshalRESP(w io.Writer) error {
- for _, cmd := range p {
- if err := cmd.MarshalRESP(w); err != nil {
- return err
- }
- }
- return nil
- }
- ////////////////////////////////////////////////////////////////////////////////
- type withConn struct {
- key [1]string // use array to avoid allocation in Keys
- fn func(Conn) error
- }
- // WithConn is used to perform a set of independent Actions on the same Conn.
- //
- // key should be a key which one or more of the inner Actions is going to act
- // on, or "" if no keys are being acted on or the keys aren't yet known. key is
- // generally only necessary when using Cluster.
- //
- // The callback function is what should actually carry out the inner actions,
- // and the error it returns will be passed back up immediately.
- //
- // NOTE that WithConn only ensures all inner Actions are performed on the same
- // Conn, it doesn't make them transactional. Use MULTI/WATCH/EXEC within a
- // WithConn for transactions, or use EvalScript.
- func WithConn(key string, fn func(Conn) error) Action {
- return &withConn{[1]string{key}, fn}
- }
- func (wc *withConn) Keys() []string {
- return wc.key[:]
- }
- func (wc *withConn) Run(c Conn) error {
- return wc.fn(c)
- }
|