123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990 |
- package radix
- import (
- "strings"
- )
- type clusterScanner struct {
- cluster *Cluster
- opts ScanOpts
- addrs []string
- currScanner Scanner
- lastErr error
- }
- // NewScanner will return a Scanner which will scan over every node in the
- // cluster. This will panic if the ScanOpt's Command isn't "SCAN". For scanning
- // operations other than "SCAN" (e.g. "HSCAN", "ZSCAN") use the normal
- // NewScanner function.
- //
- // If the cluster topology changes during a scan the Scanner may or may not
- // error out due to it, depending on the nature of the change.
- func (c *Cluster) NewScanner(o ScanOpts) Scanner {
- if strings.ToUpper(o.Command) != "SCAN" {
- panic("Cluster.NewScanner can only perform SCAN operations")
- }
- var addrs []string
- for _, node := range c.Topo().Primaries() {
- addrs = append(addrs, node.Addr)
- }
- cs := &clusterScanner{
- cluster: c,
- opts: o,
- addrs: addrs,
- }
- cs.nextScanner()
- return cs
- }
- func (cs *clusterScanner) closeCurr() {
- if cs.currScanner != nil {
- if err := cs.currScanner.Close(); err != nil && cs.lastErr == nil {
- cs.lastErr = err
- }
- cs.currScanner = nil
- }
- }
- func (cs *clusterScanner) scannerForAddr(addr string) bool {
- client, _ := cs.cluster.rpool(addr)
- if client != nil {
- cs.closeCurr()
- cs.currScanner = NewScanner(client, cs.opts)
- return true
- }
- return false
- }
- func (cs *clusterScanner) nextScanner() {
- for {
- if len(cs.addrs) == 0 {
- cs.closeCurr()
- return
- }
- addr := cs.addrs[0]
- cs.addrs = cs.addrs[1:]
- if cs.scannerForAddr(addr) {
- return
- }
- }
- }
- func (cs *clusterScanner) Next(res *string) bool {
- for {
- if cs.currScanner == nil {
- return false
- } else if out := cs.currScanner.Next(res); out {
- return true
- }
- cs.nextScanner()
- }
- }
- func (cs *clusterScanner) Close() error {
- cs.closeCurr()
- return cs.lastErr
- }
|