123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249 |
- package radix
- import (
- "bufio"
- "fmt"
- "io"
- "net"
- "sort"
- "strconv"
- "github.com/mediocregopher/radix/v3/resp"
- "github.com/mediocregopher/radix/v3/resp/resp2"
- )
- // ClusterNode describes a single node in the cluster at a moment in time.
- type ClusterNode struct {
- // older versions of redis might not actually send back the id, so it may be
- // blank
- Addr, ID string
- // start is inclusive, end is exclusive
- Slots [][2]uint16
- // address and id this node is the secondary of, if it's a secondary
- SecondaryOfAddr, SecondaryOfID string
- }
- // ClusterTopo describes the cluster topology at a given moment. It will be
- // sorted first by slot number of each node and then by secondary status, so
- // primaries will come before secondaries.
- type ClusterTopo []ClusterNode
- // MarshalRESP implements the resp.Marshaler interface, and will marshal the
- // ClusterTopo in the same format as the return from CLUSTER SLOTS.
- func (tt ClusterTopo) MarshalRESP(w io.Writer) error {
- m := map[[2]uint16]topoSlotSet{}
- for _, t := range tt {
- for _, slots := range t.Slots {
- tss := m[slots]
- tss.slots = slots
- tss.nodes = append(tss.nodes, t)
- m[slots] = tss
- }
- }
- // we sort the topoSlotSets by their slot number so that the order is
- // deterministic, mostly so tests pass consistently, I'm not sure if actual
- // redis has any contract on the order
- allTSS := make([]topoSlotSet, 0, len(m))
- for _, tss := range m {
- allTSS = append(allTSS, tss)
- }
- sort.Slice(allTSS, func(i, j int) bool {
- return allTSS[i].slots[0] < allTSS[j].slots[0]
- })
- if err := (resp2.ArrayHeader{N: len(allTSS)}).MarshalRESP(w); err != nil {
- return err
- }
- for _, tss := range allTSS {
- if err := tss.MarshalRESP(w); err != nil {
- return err
- }
- }
- return nil
- }
- // UnmarshalRESP implements the resp.Unmarshaler interface, but only supports
- // unmarshaling the return from CLUSTER SLOTS. The unmarshaled nodes will be
- // sorted before they are returned.
- func (tt *ClusterTopo) UnmarshalRESP(br *bufio.Reader) error {
- var arrHead resp2.ArrayHeader
- if err := arrHead.UnmarshalRESP(br); err != nil {
- return err
- }
- slotSets := make([]topoSlotSet, arrHead.N)
- for i := range slotSets {
- if err := (&(slotSets[i])).UnmarshalRESP(br); err != nil {
- return err
- }
- }
- nodeAddrM := map[string]ClusterNode{}
- for _, tss := range slotSets {
- for _, n := range tss.nodes {
- if existingN, ok := nodeAddrM[n.Addr]; ok {
- existingN.Slots = append(existingN.Slots, n.Slots...)
- nodeAddrM[n.Addr] = existingN
- } else {
- nodeAddrM[n.Addr] = n
- }
- }
- }
- for _, n := range nodeAddrM {
- *tt = append(*tt, n)
- }
- tt.sort()
- return nil
- }
- func (tt ClusterTopo) sort() {
- // first go through each node and make sure the individual slot sets are
- // sorted
- for _, node := range tt {
- sort.Slice(node.Slots, func(i, j int) bool {
- return node.Slots[i][0] < node.Slots[j][0]
- })
- }
- sort.Slice(tt, func(i, j int) bool {
- if tt[i].Slots[0] != tt[j].Slots[0] {
- return tt[i].Slots[0][0] < tt[j].Slots[0][0]
- }
- // we want secondaries to come after, which actually means they should
- // be sorted as greater
- return tt[i].SecondaryOfAddr == ""
- })
- }
- // Map returns the topology as a mapping of node address to its ClusterNode.
- func (tt ClusterTopo) Map() map[string]ClusterNode {
- m := make(map[string]ClusterNode, len(tt))
- for _, t := range tt {
- m[t.Addr] = t
- }
- return m
- }
- // Primaries returns a ClusterTopo instance containing only the primary nodes
- // from the ClusterTopo being called on.
- func (tt ClusterTopo) Primaries() ClusterTopo {
- mtt := make(ClusterTopo, 0, len(tt))
- for _, node := range tt {
- if node.SecondaryOfAddr == "" {
- mtt = append(mtt, node)
- }
- }
- return mtt
- }
- // we only use this type during unmarshalling, the topo Unmarshal method will
- // convert these into ClusterNodes.
- type topoSlotSet struct {
- slots [2]uint16
- nodes []ClusterNode
- }
- func (tss topoSlotSet) MarshalRESP(w io.Writer) error {
- var err error
- marshal := func(m resp.Marshaler) {
- if err == nil {
- err = m.MarshalRESP(w)
- }
- }
- marshal(resp2.ArrayHeader{N: 2 + len(tss.nodes)})
- marshal(resp2.Any{I: tss.slots[0]})
- marshal(resp2.Any{I: tss.slots[1] - 1})
- for _, n := range tss.nodes {
- host, portStr, _ := net.SplitHostPort(n.Addr)
- port, err := strconv.Atoi(portStr)
- if err != nil {
- return err
- }
- node := []interface{}{host, port}
- if n.ID != "" {
- node = append(node, n.ID)
- }
- marshal(resp2.Any{I: node})
- }
- return err
- }
- func (tss *topoSlotSet) UnmarshalRESP(br *bufio.Reader) error {
- var arrHead resp2.ArrayHeader
- if err := arrHead.UnmarshalRESP(br); err != nil {
- return err
- }
- // first two array elements are the slot numbers. We increment the second to
- // preserve inclusive start/exclusive end, which redis doesn't
- for i := range tss.slots {
- if err := (resp2.Any{I: &tss.slots[i]}).UnmarshalRESP(br); err != nil {
- return err
- }
- }
- tss.slots[1]++
- arrHead.N -= len(tss.slots)
- var primaryNode ClusterNode
- for i := 0; i < arrHead.N; i++ {
- var nodeArrHead resp2.ArrayHeader
- if err := nodeArrHead.UnmarshalRESP(br); err != nil {
- return err
- } else if nodeArrHead.N < 2 {
- return fmt.Errorf("expected at least 2 array elements, got %d", nodeArrHead.N)
- }
- var ip resp2.BulkString
- if err := ip.UnmarshalRESP(br); err != nil {
- return err
- }
- var port resp2.Int
- if err := port.UnmarshalRESP(br); err != nil {
- return err
- }
- nodeArrHead.N -= 2
- var id resp2.BulkString
- if nodeArrHead.N > 0 {
- if err := id.UnmarshalRESP(br); err != nil {
- return err
- }
- nodeArrHead.N--
- }
- // discard anything after
- for i := 0; i < nodeArrHead.N; i++ {
- if err := (resp2.Any{}).UnmarshalRESP(br); err != nil {
- return err
- }
- }
- node := ClusterNode{
- Addr: net.JoinHostPort(ip.S, strconv.FormatInt(port.I, 10)),
- ID: id.S,
- Slots: [][2]uint16{tss.slots},
- }
- if i == 0 {
- primaryNode = node
- } else {
- node.SecondaryOfAddr = primaryNode.Addr
- node.SecondaryOfID = primaryNode.ID
- }
- tss.nodes = append(tss.nodes, node)
- }
- return nil
- }
|