cluster_scanner.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. package radix
  2. import (
  3. "strings"
  4. )
  5. type clusterScanner struct {
  6. cluster *Cluster
  7. opts ScanOpts
  8. addrs []string
  9. currScanner Scanner
  10. lastErr error
  11. }
  12. // NewScanner will return a Scanner which will scan over every node in the
  13. // cluster. This will panic if the ScanOpt's Command isn't "SCAN". For scanning
  14. // operations other than "SCAN" (e.g. "HSCAN", "ZSCAN") use the normal
  15. // NewScanner function.
  16. //
  17. // If the cluster topology changes during a scan the Scanner may or may not
  18. // error out due to it, depending on the nature of the change.
  19. func (c *Cluster) NewScanner(o ScanOpts) Scanner {
  20. if strings.ToUpper(o.Command) != "SCAN" {
  21. panic("Cluster.NewScanner can only perform SCAN operations")
  22. }
  23. var addrs []string
  24. for _, node := range c.Topo().Primaries() {
  25. addrs = append(addrs, node.Addr)
  26. }
  27. cs := &clusterScanner{
  28. cluster: c,
  29. opts: o,
  30. addrs: addrs,
  31. }
  32. cs.nextScanner()
  33. return cs
  34. }
  35. func (cs *clusterScanner) closeCurr() {
  36. if cs.currScanner != nil {
  37. if err := cs.currScanner.Close(); err != nil && cs.lastErr == nil {
  38. cs.lastErr = err
  39. }
  40. cs.currScanner = nil
  41. }
  42. }
  43. func (cs *clusterScanner) scannerForAddr(addr string) bool {
  44. client, _ := cs.cluster.rpool(addr)
  45. if client != nil {
  46. cs.closeCurr()
  47. cs.currScanner = NewScanner(client, cs.opts)
  48. return true
  49. }
  50. return false
  51. }
  52. func (cs *clusterScanner) nextScanner() {
  53. for {
  54. if len(cs.addrs) == 0 {
  55. cs.closeCurr()
  56. return
  57. }
  58. addr := cs.addrs[0]
  59. cs.addrs = cs.addrs[1:]
  60. if cs.scannerForAddr(addr) {
  61. return
  62. }
  63. }
  64. }
  65. func (cs *clusterScanner) Next(res *string) bool {
  66. for {
  67. if cs.currScanner == nil {
  68. return false
  69. } else if out := cs.currScanner.Next(res); out {
  70. return true
  71. }
  72. cs.nextScanner()
  73. }
  74. }
  75. func (cs *clusterScanner) Close() error {
  76. cs.closeCurr()
  77. return cs.lastErr
  78. }