action.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623
  1. package radix
  2. import (
  3. "bufio"
  4. "bytes"
  5. "crypto/sha1"
  6. "encoding/hex"
  7. "fmt"
  8. "io"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "github.com/mediocregopher/radix/v3/resp"
  13. "github.com/mediocregopher/radix/v3/resp/resp2"
  14. "golang.org/x/xerrors"
  15. )
  16. // Action performs a task using a Conn.
  17. type Action interface {
  18. // Keys returns the keys which will be acted on. Empty slice or nil may be
  19. // returned if no keys are being acted on. The returned slice must not be
  20. // modified.
  21. Keys() []string
  22. // Run actually performs the Action using the given Conn.
  23. Run(c Conn) error
  24. }
  25. // CmdAction is a sub-class of Action which can be used in two different ways.
  26. // The first is as a normal Action, where Run is called with a Conn and returns
  27. // once the Action has been completed.
  28. //
  29. // The second way is as a Pipeline-able command, where one or more commands are
  30. // written in one step (via the MarshalRESP method) and their results are read
  31. // later (via the UnmarshalRESP method).
  32. //
  33. // When used directly with Do then MarshalRESP/UnmarshalRESP are not called, and
  34. // when used in a Pipeline the Run method is not called.
  35. type CmdAction interface {
  36. Action
  37. resp.Marshaler
  38. resp.Unmarshaler
  39. }
  40. var noKeyCmds = map[string]bool{
  41. "SENTINEL": true,
  42. "CLUSTER": true,
  43. "READONLY": true,
  44. "READWRITE": true,
  45. "ASKING": true,
  46. "AUTH": true,
  47. "ECHO": true,
  48. "PING": true,
  49. "QUIT": true,
  50. "SELECT": true,
  51. "SWAPDB": true,
  52. "KEYS": true,
  53. "MIGRATE": true,
  54. "OBJECT": true,
  55. "RANDOMKEY": true,
  56. "WAIT": true,
  57. "SCAN": true,
  58. "EVAL": true,
  59. "EVALSHA": true,
  60. "SCRIPT": true,
  61. "BGREWRITEAOF": true,
  62. "BGSAVE": true,
  63. "CLIENT": true,
  64. "COMMAND": true,
  65. "CONFIG": true,
  66. "DBSIZE": true,
  67. "DEBUG": true,
  68. "FLUSHALL": true,
  69. "FLUSHDB": true,
  70. "INFO": true,
  71. "LASTSAVE": true,
  72. "MONITOR": true,
  73. "ROLE": true,
  74. "SAVE": true,
  75. "SHUTDOWN": true,
  76. "SLAVEOF": true,
  77. "SLOWLOG": true,
  78. "SYNC": true,
  79. "TIME": true,
  80. "DISCARD": true,
  81. "EXEC": true,
  82. "MULTI": true,
  83. "UNWATCH": true,
  84. "WATCH": true,
  85. }
  86. func cmdString(m resp.Marshaler) string {
  87. // we go way out of the way here to display the command as it would be sent
  88. // to redis. This is pretty similar logic to what the stub does as well
  89. buf := new(bytes.Buffer)
  90. if err := m.MarshalRESP(buf); err != nil {
  91. return fmt.Sprintf("error creating string: %q", err.Error())
  92. }
  93. var ss []string
  94. err := resp2.RawMessage(buf.Bytes()).UnmarshalInto(resp2.Any{I: &ss})
  95. if err != nil {
  96. return fmt.Sprintf("error creating string: %q", err.Error())
  97. }
  98. for i := range ss {
  99. ss[i] = strconv.QuoteToASCII(ss[i])
  100. }
  101. return "[" + strings.Join(ss, " ") + "]"
  102. }
  103. func marshalBulkString(prevErr error, w io.Writer, str string) error {
  104. if prevErr != nil {
  105. return prevErr
  106. }
  107. return resp2.BulkString{S: str}.MarshalRESP(w)
  108. }
  109. func marshalBulkStringBytes(prevErr error, w io.Writer, b []byte) error {
  110. if prevErr != nil {
  111. return prevErr
  112. }
  113. return resp2.BulkStringBytes{B: b}.MarshalRESP(w)
  114. }
  115. ////////////////////////////////////////////////////////////////////////////////
  116. type cmdAction struct {
  117. rcv interface{}
  118. cmd string
  119. args []string
  120. flat bool
  121. flatKey [1]string // use array to avoid allocation in Keys
  122. flatArgs []interface{}
  123. }
  124. // BREAM: Benchmarks Rule Everything Around Me.
  125. var cmdActionPool sync.Pool
  126. func getCmdAction() *cmdAction {
  127. if ci := cmdActionPool.Get(); ci != nil {
  128. return ci.(*cmdAction)
  129. }
  130. return new(cmdAction)
  131. }
  132. // Cmd is used to perform a redis command and retrieve a result. It should not
  133. // be passed into Do more than once.
  134. //
  135. // If the receiver value of Cmd is a primitive, a slice/map, or a struct then a
  136. // pointer must be passed in. It may also be an io.Writer, an
  137. // encoding.Text/BinaryUnmarshaler, or a resp.Unmarshaler. See the package docs
  138. // for more on how results are unmarshaled into the receiver.
  139. func Cmd(rcv interface{}, cmd string, args ...string) CmdAction {
  140. c := getCmdAction()
  141. *c = cmdAction{
  142. rcv: rcv,
  143. cmd: cmd,
  144. args: args,
  145. }
  146. return c
  147. }
  148. // FlatCmd is like Cmd, but the arguments can be of almost any type, and FlatCmd
  149. // will automatically flatten them into a single array of strings. Like Cmd, a
  150. // FlatCmd should not be passed into Do more than once.
  151. //
  152. // FlatCmd does _not_ work for commands whose first parameter isn't a key, or
  153. // (generally) for MSET. Use Cmd for those.
  154. //
  155. // FlatCmd supports using a resp.LenReader (an io.Reader with a Len() method) as
  156. // an argument. *bytes.Buffer is an example of a LenReader, and the resp package
  157. // has a NewLenReader function which can wrap an existing io.Reader.
  158. //
  159. // FlatCmd also supports encoding.Text/BinaryMarshalers. It does _not_ currently
  160. // support resp.Marshaler.
  161. //
  162. // The receiver to FlatCmd follows the same rules as for Cmd.
  163. func FlatCmd(rcv interface{}, cmd, key string, args ...interface{}) CmdAction {
  164. c := getCmdAction()
  165. *c = cmdAction{
  166. rcv: rcv,
  167. cmd: cmd,
  168. flat: true,
  169. flatKey: [1]string{key},
  170. flatArgs: args,
  171. }
  172. return c
  173. }
  174. func findStreamsKeys(args []string) []string {
  175. for i, arg := range args {
  176. if strings.ToUpper(arg) != "STREAMS" {
  177. continue
  178. }
  179. // after STREAMS only stream keys and IDs can be given and since there must be the same number of keys and ids
  180. // we can just take half of remaining arguments as keys. If the number of IDs does not match the number of
  181. // keys the command will fail later when send to Redis so no need for us to handle that case.
  182. ids := len(args[i+1:]) / 2
  183. return args[i+1 : len(args)-ids]
  184. }
  185. return nil
  186. }
  187. func (c *cmdAction) Keys() []string {
  188. if c.flat {
  189. return c.flatKey[:]
  190. }
  191. cmd := strings.ToUpper(c.cmd)
  192. if cmd == "BITOP" && len(c.args) > 1 { // antirez why you do this
  193. return c.args[1:]
  194. } else if cmd == "XINFO" {
  195. if len(c.args) < 2 {
  196. return nil
  197. }
  198. return c.args[1:2]
  199. } else if cmd == "XGROUP" && len(c.args) > 1 {
  200. return c.args[1:2]
  201. } else if cmd == "XREAD" || cmd == "XREADGROUP" { // antirez why you still do this
  202. return findStreamsKeys(c.args)
  203. } else if noKeyCmds[cmd] || len(c.args) == 0 {
  204. return nil
  205. }
  206. return c.args[:1]
  207. }
  208. func (c *cmdAction) flatMarshalRESP(w io.Writer) error {
  209. var err error
  210. a := resp2.Any{
  211. I: c.flatArgs,
  212. MarshalBulkString: true,
  213. MarshalNoArrayHeaders: true,
  214. }
  215. arrL := 2 + a.NumElems()
  216. err = resp2.ArrayHeader{N: arrL}.MarshalRESP(w)
  217. err = marshalBulkString(err, w, c.cmd)
  218. err = marshalBulkString(err, w, c.flatKey[0])
  219. if err != nil {
  220. return err
  221. }
  222. return a.MarshalRESP(w)
  223. }
  224. func (c *cmdAction) MarshalRESP(w io.Writer) error {
  225. if c.flat {
  226. return c.flatMarshalRESP(w)
  227. }
  228. err := resp2.ArrayHeader{N: len(c.args) + 1}.MarshalRESP(w)
  229. err = marshalBulkString(err, w, c.cmd)
  230. for i := range c.args {
  231. err = marshalBulkString(err, w, c.args[i])
  232. }
  233. return err
  234. }
  235. func (c *cmdAction) UnmarshalRESP(br *bufio.Reader) error {
  236. if err := (resp2.Any{I: c.rcv}).UnmarshalRESP(br); err != nil {
  237. return err
  238. }
  239. cmdActionPool.Put(c)
  240. return nil
  241. }
  242. func (c *cmdAction) Run(conn Conn) error {
  243. if err := conn.Encode(c); err != nil {
  244. return err
  245. }
  246. return conn.Decode(c)
  247. }
  248. func (c *cmdAction) String() string {
  249. return cmdString(c)
  250. }
  251. func (c *cmdAction) ClusterCanRetry() bool {
  252. return true
  253. }
  254. ////////////////////////////////////////////////////////////////////////////////
  255. // MaybeNil is a type which wraps a receiver. It will first detect if what's
  256. // being received is a nil RESP type (either bulk string or array), and if so
  257. // set Nil to true. If not the return value will be unmarshalled into Rcv
  258. // normally. If the response being received is an empty array then the EmptyArray
  259. // field will be set and Rcv unmarshalled into normally.
  260. type MaybeNil struct {
  261. Nil bool
  262. EmptyArray bool
  263. Rcv interface{}
  264. }
  265. // UnmarshalRESP implements the method for the resp.Unmarshaler interface.
  266. func (mn *MaybeNil) UnmarshalRESP(br *bufio.Reader) error {
  267. var rm resp2.RawMessage
  268. err := rm.UnmarshalRESP(br)
  269. mn.Nil = false
  270. mn.EmptyArray = false
  271. switch {
  272. case err != nil:
  273. return err
  274. case rm.IsNil():
  275. mn.Nil = true
  276. return nil
  277. case rm.IsEmptyArray():
  278. mn.EmptyArray = true
  279. fallthrough // to not break backwards compatibility
  280. default:
  281. return rm.UnmarshalInto(resp2.Any{I: mn.Rcv})
  282. }
  283. }
  284. ////////////////////////////////////////////////////////////////////////////////
  285. // Tuple is a helper type which can be used when unmarshaling a RESP array.
  286. // Each element of Tuple should be a pointer receiver which the corresponding
  287. // element of the RESP array will be unmarshaled into, or nil to skip that
  288. // element. The length of Tuple must match the length of the RESP array being
  289. // unmarshaled.
  290. //
  291. // Tuple is useful when unmarshaling the results from commands like EXEC and
  292. // EVAL.
  293. type Tuple []interface{}
  294. // UnmarshalRESP implements the method for the resp.Unmarshaler interface.
  295. func (t Tuple) UnmarshalRESP(br *bufio.Reader) error {
  296. var ah resp2.ArrayHeader
  297. if err := ah.UnmarshalRESP(br); err != nil {
  298. return err
  299. } else if ah.N != len(t) {
  300. for i := 0; i < ah.N; i++ {
  301. if err := (resp2.Any{}).UnmarshalRESP(br); err != nil {
  302. return err
  303. }
  304. }
  305. return resp.ErrDiscarded{
  306. Err: fmt.Errorf("expected array of size %d but got array of size %d", len(t), ah.N),
  307. }
  308. }
  309. var retErr error
  310. for i := 0; i < ah.N; i++ {
  311. if err := (resp2.Any{I: t[i]}).UnmarshalRESP(br); err != nil {
  312. // if the message was discarded then we can just continue, this
  313. // method will return the first error it sees
  314. if !xerrors.As(err, new(resp.ErrDiscarded)) {
  315. return err
  316. } else if retErr == nil {
  317. retErr = err
  318. }
  319. }
  320. }
  321. return retErr
  322. }
  323. ////////////////////////////////////////////////////////////////////////////////
  324. // EvalScript contains the body of a script to be used with redis' EVAL
  325. // functionality. Call Cmd on a EvalScript to actually create an Action which
  326. // can be run.
  327. type EvalScript struct {
  328. script, sum string
  329. numKeys int
  330. }
  331. // NewEvalScript initializes a EvalScript instance. numKeys corresponds to the
  332. // number of arguments which will be keys when Cmd is called.
  333. func NewEvalScript(numKeys int, script string) EvalScript {
  334. sumRaw := sha1.Sum([]byte(script))
  335. sum := hex.EncodeToString(sumRaw[:])
  336. return EvalScript{
  337. script: script,
  338. sum: sum,
  339. numKeys: numKeys,
  340. }
  341. }
  342. var (
  343. evalsha = []byte("EVALSHA")
  344. eval = []byte("EVAL")
  345. )
  346. type evalAction struct {
  347. EvalScript
  348. keys, args []string
  349. rcv interface{}
  350. flat bool
  351. flatArgs []interface{}
  352. eval bool
  353. }
  354. // Cmd is like the top-level Cmd but it uses the the EvalScript to perform an
  355. // EVALSHA command (and will automatically fallback to EVAL as necessary).
  356. // keysAndArgs must be at least as long as the numKeys argument of
  357. // NewEvalScript.
  358. func (es EvalScript) Cmd(rcv interface{}, keysAndArgs ...string) Action {
  359. if len(keysAndArgs) < es.numKeys {
  360. panic("not enough arguments passed into EvalScript.Cmd")
  361. }
  362. return &evalAction{
  363. EvalScript: es,
  364. keys: keysAndArgs[:es.numKeys],
  365. args: keysAndArgs[es.numKeys:],
  366. rcv: rcv,
  367. }
  368. }
  369. // FlatCmd is like the top level FlatCmd except it uses the EvalScript to
  370. // perform an EVALSHA command (and will automatically fallback to EVAL as
  371. // necessary). keys must be as long as the numKeys argument of NewEvalScript.
  372. func (es EvalScript) FlatCmd(rcv interface{}, keys []string, args ...interface{}) Action {
  373. if len(keys) != es.numKeys {
  374. panic("incorrect number of keys passed into EvalScript.FlatCmd")
  375. }
  376. return &evalAction{
  377. EvalScript: es,
  378. keys: keys,
  379. flatArgs: args,
  380. flat: true,
  381. rcv: rcv,
  382. }
  383. }
  384. func (ec *evalAction) Keys() []string {
  385. return ec.keys
  386. }
  387. func (ec *evalAction) MarshalRESP(w io.Writer) error {
  388. // EVAL(SHA) script/sum numkeys keys... args...
  389. ah := resp2.ArrayHeader{N: 3 + len(ec.keys)}
  390. if ec.flat {
  391. ah.N += (resp2.Any{I: ec.flatArgs}).NumElems()
  392. } else {
  393. ah.N += len(ec.args)
  394. }
  395. if err := ah.MarshalRESP(w); err != nil {
  396. return err
  397. }
  398. var err error
  399. if ec.eval {
  400. err = marshalBulkStringBytes(err, w, eval)
  401. err = marshalBulkString(err, w, ec.script)
  402. } else {
  403. err = marshalBulkStringBytes(err, w, evalsha)
  404. err = marshalBulkString(err, w, ec.sum)
  405. }
  406. err = marshalBulkString(err, w, strconv.Itoa(ec.numKeys))
  407. for i := range ec.keys {
  408. err = marshalBulkString(err, w, ec.keys[i])
  409. }
  410. if err != nil {
  411. return err
  412. }
  413. if ec.flat {
  414. err = (resp2.Any{
  415. I: ec.flatArgs,
  416. MarshalBulkString: true,
  417. MarshalNoArrayHeaders: true,
  418. }).MarshalRESP(w)
  419. } else {
  420. for i := range ec.args {
  421. err = marshalBulkString(err, w, ec.args[i])
  422. }
  423. }
  424. return err
  425. }
  426. func (ec *evalAction) Run(conn Conn) error {
  427. run := func(eval bool) error {
  428. ec.eval = eval
  429. if err := conn.Encode(ec); err != nil {
  430. return err
  431. }
  432. return conn.Decode(resp2.Any{I: ec.rcv})
  433. }
  434. err := run(false)
  435. if err != nil && strings.HasPrefix(err.Error(), "NOSCRIPT") {
  436. err = run(true)
  437. }
  438. return err
  439. }
  440. func (ec *evalAction) ClusterCanRetry() bool {
  441. return true
  442. }
  443. ////////////////////////////////////////////////////////////////////////////////
  444. type pipeline []CmdAction
  445. // Pipeline returns an Action which first writes multiple commands to a Conn in
  446. // a single write, then reads their responses in a single read. This reduces
  447. // network delay into a single round-trip.
  448. //
  449. // Run will not be called on any of the passed in CmdActions.
  450. //
  451. // NOTE that, while a Pipeline performs all commands on a single Conn, it
  452. // shouldn't be used by itself for MULTI/EXEC transactions, because if there's
  453. // an error it won't discard the incomplete transaction. Use WithConn or
  454. // EvalScript for transactional functionality instead.
  455. func Pipeline(cmds ...CmdAction) Action {
  456. return pipeline(cmds)
  457. }
  458. func (p pipeline) Keys() []string {
  459. m := map[string]bool{}
  460. for _, rc := range p {
  461. for _, k := range rc.Keys() {
  462. m[k] = true
  463. }
  464. }
  465. keys := make([]string, 0, len(m))
  466. for k := range m {
  467. keys = append(keys, k)
  468. }
  469. return keys
  470. }
  471. func (p pipeline) Run(c Conn) error {
  472. if err := c.Encode(p); err != nil {
  473. return err
  474. }
  475. for i, cmd := range p {
  476. if err := c.Decode(cmd); err != nil {
  477. p.drain(c, len(p)-i-1)
  478. return decodeErr(cmd, err)
  479. }
  480. }
  481. return nil
  482. }
  483. func (p pipeline) drain(c Conn, n int) {
  484. rcv := resp2.Any{I: nil}
  485. for i := 0; i < n; i++ {
  486. _ = c.Decode(&rcv)
  487. }
  488. }
  489. func decodeErr(cmd CmdAction, err error) error {
  490. c, ok := cmd.(*cmdAction)
  491. if ok {
  492. return fmt.Errorf(
  493. "failed to decode pipeline CmdAction '%v' with keys %v: %w",
  494. c.cmd,
  495. c.Keys(),
  496. err)
  497. }
  498. return fmt.Errorf(
  499. "failed to decode pipeline CmdAction '%v': %w",
  500. cmd,
  501. err)
  502. }
  503. // MarshalRESP implements the resp.Marshaler interface, so that the pipeline can
  504. // pass itself to the Conn.Encode method instead of calling Conn.Encode for each
  505. // CmdAction in the pipeline.
  506. //
  507. // This helps with Conn implementations that flush their underlying buffers
  508. // after each call to Encode, like the default default Conn implementation
  509. // (connWrap) does, making better use of internal buffering and automatic
  510. // flushing as well as reducing the number of syscalls that both the client and
  511. // Redis need to do.
  512. //
  513. // Without this, using the default Conn implementation, big pipelines can easily
  514. // spend much of their time just in flushing (in one case measured, up to 40%).
  515. func (p pipeline) MarshalRESP(w io.Writer) error {
  516. for _, cmd := range p {
  517. if err := cmd.MarshalRESP(w); err != nil {
  518. return err
  519. }
  520. }
  521. return nil
  522. }
  523. ////////////////////////////////////////////////////////////////////////////////
  524. type withConn struct {
  525. key [1]string // use array to avoid allocation in Keys
  526. fn func(Conn) error
  527. }
  528. // WithConn is used to perform a set of independent Actions on the same Conn.
  529. //
  530. // key should be a key which one or more of the inner Actions is going to act
  531. // on, or "" if no keys are being acted on or the keys aren't yet known. key is
  532. // generally only necessary when using Cluster.
  533. //
  534. // The callback function is what should actually carry out the inner actions,
  535. // and the error it returns will be passed back up immediately.
  536. //
  537. // NOTE that WithConn only ensures all inner Actions are performed on the same
  538. // Conn, it doesn't make them transactional. Use MULTI/WATCH/EXEC within a
  539. // WithConn for transactions, or use EvalScript.
  540. func WithConn(key string, fn func(Conn) error) Action {
  541. return &withConn{[1]string{key}, fn}
  542. }
  543. func (wc *withConn) Keys() []string {
  544. return wc.key[:]
  545. }
  546. func (wc *withConn) Run(c Conn) error {
  547. return wc.fn(c)
  548. }