// cluster_topo.go (5.9 KB)

  1. package radix
  2. import (
  3. "bufio"
  4. "fmt"
  5. "io"
  6. "net"
  7. "sort"
  8. "strconv"
  9. "github.com/mediocregopher/radix/v3/resp"
  10. "github.com/mediocregopher/radix/v3/resp/resp2"
  11. )
  // ClusterNode describes a single node in the cluster at a moment in time.
type ClusterNode struct {
	// Addr is the node's "host:port" network address.
	Addr string
	// ID is the node's cluster ID. Older versions of redis might not send the
	// id back, so it may be blank.
	ID string

	// Slots lists the slot ranges served by this node. In each range the start
	// (index 0) is inclusive and the end (index 1) is exclusive.
	Slots [][2]uint16

	// SecondaryOfAddr and SecondaryOfID hold the address and ID of the primary
	// this node is a secondary of. They are blank when the node is a primary.
	SecondaryOfAddr string
	SecondaryOfID   string
}

// ClusterTopo describes the cluster topology at a given moment. A ClusterTopo
// produced by UnmarshalRESP is sorted first by the slot number of each node and
// then by secondary status, so primaries come before secondaries.
type ClusterTopo []ClusterNode
  26. // MarshalRESP implements the resp.Marshaler interface, and will marshal the
  27. // ClusterTopo in the same format as the return from CLUSTER SLOTS.
  28. func (tt ClusterTopo) MarshalRESP(w io.Writer) error {
  29. m := map[[2]uint16]topoSlotSet{}
  30. for _, t := range tt {
  31. for _, slots := range t.Slots {
  32. tss := m[slots]
  33. tss.slots = slots
  34. tss.nodes = append(tss.nodes, t)
  35. m[slots] = tss
  36. }
  37. }
  38. // we sort the topoSlotSets by their slot number so that the order is
  39. // deterministic, mostly so tests pass consistently, I'm not sure if actual
  40. // redis has any contract on the order
  41. allTSS := make([]topoSlotSet, 0, len(m))
  42. for _, tss := range m {
  43. allTSS = append(allTSS, tss)
  44. }
  45. sort.Slice(allTSS, func(i, j int) bool {
  46. return allTSS[i].slots[0] < allTSS[j].slots[0]
  47. })
  48. if err := (resp2.ArrayHeader{N: len(allTSS)}).MarshalRESP(w); err != nil {
  49. return err
  50. }
  51. for _, tss := range allTSS {
  52. if err := tss.MarshalRESP(w); err != nil {
  53. return err
  54. }
  55. }
  56. return nil
  57. }
  58. // UnmarshalRESP implements the resp.Unmarshaler interface, but only supports
  59. // unmarshaling the return from CLUSTER SLOTS. The unmarshaled nodes will be
  60. // sorted before they are returned.
  61. func (tt *ClusterTopo) UnmarshalRESP(br *bufio.Reader) error {
  62. var arrHead resp2.ArrayHeader
  63. if err := arrHead.UnmarshalRESP(br); err != nil {
  64. return err
  65. }
  66. slotSets := make([]topoSlotSet, arrHead.N)
  67. for i := range slotSets {
  68. if err := (&(slotSets[i])).UnmarshalRESP(br); err != nil {
  69. return err
  70. }
  71. }
  72. nodeAddrM := map[string]ClusterNode{}
  73. for _, tss := range slotSets {
  74. for _, n := range tss.nodes {
  75. if existingN, ok := nodeAddrM[n.Addr]; ok {
  76. existingN.Slots = append(existingN.Slots, n.Slots...)
  77. nodeAddrM[n.Addr] = existingN
  78. } else {
  79. nodeAddrM[n.Addr] = n
  80. }
  81. }
  82. }
  83. for _, n := range nodeAddrM {
  84. *tt = append(*tt, n)
  85. }
  86. tt.sort()
  87. return nil
  88. }
  89. func (tt ClusterTopo) sort() {
  90. // first go through each node and make sure the individual slot sets are
  91. // sorted
  92. for _, node := range tt {
  93. sort.Slice(node.Slots, func(i, j int) bool {
  94. return node.Slots[i][0] < node.Slots[j][0]
  95. })
  96. }
  97. sort.Slice(tt, func(i, j int) bool {
  98. if tt[i].Slots[0] != tt[j].Slots[0] {
  99. return tt[i].Slots[0][0] < tt[j].Slots[0][0]
  100. }
  101. // we want secondaries to come after, which actually means they should
  102. // be sorted as greater
  103. return tt[i].SecondaryOfAddr == ""
  104. })
  105. }
  106. // Map returns the topology as a mapping of node address to its ClusterNode.
  107. func (tt ClusterTopo) Map() map[string]ClusterNode {
  108. m := make(map[string]ClusterNode, len(tt))
  109. for _, t := range tt {
  110. m[t.Addr] = t
  111. }
  112. return m
  113. }
  114. // Primaries returns a ClusterTopo instance containing only the primary nodes
  115. // from the ClusterTopo being called on.
  116. func (tt ClusterTopo) Primaries() ClusterTopo {
  117. mtt := make(ClusterTopo, 0, len(tt))
  118. for _, node := range tt {
  119. if node.SecondaryOfAddr == "" {
  120. mtt = append(mtt, node)
  121. }
  122. }
  123. return mtt
  124. }
  125. // we only use this type during unmarshalling, the topo Unmarshal method will
  126. // convert these into ClusterNodes.
  127. type topoSlotSet struct {
  128. slots [2]uint16
  129. nodes []ClusterNode
  130. }
  131. func (tss topoSlotSet) MarshalRESP(w io.Writer) error {
  132. var err error
  133. marshal := func(m resp.Marshaler) {
  134. if err == nil {
  135. err = m.MarshalRESP(w)
  136. }
  137. }
  138. marshal(resp2.ArrayHeader{N: 2 + len(tss.nodes)})
  139. marshal(resp2.Any{I: tss.slots[0]})
  140. marshal(resp2.Any{I: tss.slots[1] - 1})
  141. for _, n := range tss.nodes {
  142. host, portStr, _ := net.SplitHostPort(n.Addr)
  143. port, err := strconv.Atoi(portStr)
  144. if err != nil {
  145. return err
  146. }
  147. node := []interface{}{host, port}
  148. if n.ID != "" {
  149. node = append(node, n.ID)
  150. }
  151. marshal(resp2.Any{I: node})
  152. }
  153. return err
  154. }
  155. func (tss *topoSlotSet) UnmarshalRESP(br *bufio.Reader) error {
  156. var arrHead resp2.ArrayHeader
  157. if err := arrHead.UnmarshalRESP(br); err != nil {
  158. return err
  159. }
  160. // first two array elements are the slot numbers. We increment the second to
  161. // preserve inclusive start/exclusive end, which redis doesn't
  162. for i := range tss.slots {
  163. if err := (resp2.Any{I: &tss.slots[i]}).UnmarshalRESP(br); err != nil {
  164. return err
  165. }
  166. }
  167. tss.slots[1]++
  168. arrHead.N -= len(tss.slots)
  169. var primaryNode ClusterNode
  170. for i := 0; i < arrHead.N; i++ {
  171. var nodeArrHead resp2.ArrayHeader
  172. if err := nodeArrHead.UnmarshalRESP(br); err != nil {
  173. return err
  174. } else if nodeArrHead.N < 2 {
  175. return fmt.Errorf("expected at least 2 array elements, got %d", nodeArrHead.N)
  176. }
  177. var ip resp2.BulkString
  178. if err := ip.UnmarshalRESP(br); err != nil {
  179. return err
  180. }
  181. var port resp2.Int
  182. if err := port.UnmarshalRESP(br); err != nil {
  183. return err
  184. }
  185. nodeArrHead.N -= 2
  186. var id resp2.BulkString
  187. if nodeArrHead.N > 0 {
  188. if err := id.UnmarshalRESP(br); err != nil {
  189. return err
  190. }
  191. nodeArrHead.N--
  192. }
  193. // discard anything after
  194. for i := 0; i < nodeArrHead.N; i++ {
  195. if err := (resp2.Any{}).UnmarshalRESP(br); err != nil {
  196. return err
  197. }
  198. }
  199. node := ClusterNode{
  200. Addr: net.JoinHostPort(ip.S, strconv.FormatInt(port.I, 10)),
  201. ID: id.S,
  202. Slots: [][2]uint16{tss.slots},
  203. }
  204. if i == 0 {
  205. primaryNode = node
  206. } else {
  207. node.SecondaryOfAddr = primaryNode.Addr
  208. node.SecondaryOfID = primaryNode.ID
  209. }
  210. tss.nodes = append(tss.nodes, node)
  211. }
  212. return nil
  213. }