server_manager.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. // service registration and discovery
  2. package server
  3. import (
  4. "errors"
  5. "fmt"
  6. "go.etcd.io/etcd/clientv3"
  7. "os"
  8. "strings"
  9. "sync"
  10. "time"
  11. "golang.org/x/net/context"
  12. )
  13. const (
  14. // EtcdServersPrefix prefix
  15. EtcdServersPrefix = "/knowo/servers/"
  16. EtcdServersPrefixCnt = 2
  17. EnvTCPProxy = "TCP_PROXY_ADDR"
  18. EnvHTTPProxy = "HTTP_PROXY_ADDR"
  19. EnvUDPProxy = "UDP_PROXY_ADDR"
  20. lease = 90
  21. )
  22. // ServerManager server manager
  23. type ServerManager struct {
  24. serverName string
  25. // servername -> hosttype -> hostlist
  26. // eg. var hosts []string = mapServers["testserver"]["rpchost"]
  27. mapServers map[string]map[string][]string
  28. etcdHosts []string
  29. cli *clientv3.Client
  30. leaseId clientv3.LeaseID
  31. keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
  32. mu sync.Mutex
  33. }
  34. // NewServerManager new server manager
  35. // etcd hosts is config as http://ip1:port1;http://ip2:port2;http://ip3:port3
  36. func NewServerManager(name string, etcd string) (*ServerManager, error) {
  37. if etcd == "" {
  38. return nil, errors.New("no etcd host found!")
  39. }
  40. etcdHosts := strings.Split(etcd, ";")
  41. cli, err := clientv3.New(clientv3.Config{
  42. Endpoints: etcdHosts,
  43. DialTimeout: 5 * time.Second,
  44. })
  45. if err != nil {
  46. return nil, err
  47. }
  48. return &ServerManager{
  49. serverName: name,
  50. cli: cli,
  51. etcdHosts: etcdHosts,
  52. mapServers: make(map[string]map[string][]string),
  53. }, nil
  54. }
  55. // RegisterServer register server to etcd
  56. func (mgr *ServerManager) RegisterServer() error {
  57. if serverInstance == nil {
  58. return errorf(errServerNotInit)
  59. }
  60. ctx := context.Background()
  61. resp, err := mgr.cli.Grant(ctx, lease)
  62. if err != nil {
  63. return err
  64. }
  65. prefix := fmt.Sprintf("%s%s/", EtcdServersPrefix, mgr.serverName)
  66. var (
  67. addr string
  68. keys []string
  69. )
  70. if serverInstance.tcpsvr != nil {
  71. addr := os.Getenv(EnvTCPProxy)
  72. if addr == "" {
  73. addr, _ = fixHostIp(*confTCPHost)
  74. }
  75. keys = append(keys, fmt.Sprintf("%s%s/%s", prefix, FlagTCPHost, addr))
  76. }
  77. if serverInstance.rpcsvr != nil {
  78. addr, _ := fixHostIp(*confRPCHost)
  79. keys = append(keys, fmt.Sprintf("%s%s/%s", prefix, FlagRPCHost, addr))
  80. }
  81. if serverInstance.udpsvr != nil {
  82. addr := os.Getenv(EnvUDPProxy)
  83. if addr == "" {
  84. addr, _ = fixHostIp(*confUDPHost)
  85. }
  86. keys = append(keys, fmt.Sprintf("%s%s/%s", prefix, FlagUDPHost, addr))
  87. }
  88. if serverInstance.httpsvr != nil {
  89. addr := os.Getenv(EnvHTTPProxy)
  90. if addr == "" {
  91. addr, _ = fixHostIp(*confHTTPHost)
  92. }
  93. keys = append(keys, fmt.Sprintf("%s%s/%s", prefix, FlagHTTPHost, addr))
  94. }
  95. for _, key := range keys {
  96. _, err = mgr.cli.Put(ctx, key, addr, clientv3.WithLease(resp.ID))
  97. if err != nil {
  98. return nil
  99. }
  100. mgr.leaseId = resp.ID
  101. leaseRespChan, err := mgr.cli.KeepAlive(ctx, resp.ID)
  102. if err != nil {
  103. return err
  104. }
  105. mgr.keepAliveChan = leaseRespChan
  106. }
  107. // print common keys info
  108. Log.Infof("RegisterServer is done. leaseId is %v\n", mgr.leaseId)
  109. go func() {
  110. for leaseResp := range mgr.keepAliveChan {
  111. Log.Infof("update lease success:%d", leaseResp.ID)
  112. }
  113. }()
  114. return nil
  115. }
  116. // UpdateServerHosts update server hosts
  117. func (mgr *ServerManager) UpdateServerHosts() error {
  118. if serverInstance == nil {
  119. return errorf(errServerNotInit)
  120. }
  121. prefix := EtcdServersPrefix
  122. response, err := mgr.cli.Get(context.Background(), prefix, clientv3.WithPrefix())
  123. if err != nil {
  124. return err
  125. }
  126. servers := make(map[string]map[string][]string)
  127. for _, kvs := range response.Kvs {
  128. key := string(kvs.Key)
  129. name := strings.Split(key, "/")[EtcdServersPrefixCnt+1]
  130. host := strings.Split(key, "/")[EtcdServersPrefixCnt+2]
  131. addr := strings.Split(key, "/")[EtcdServersPrefixCnt+3]
  132. if _, ok := servers[name]; !ok {
  133. servers[name] = make(map[string][]string)
  134. }
  135. if _, ok := servers[name][host];!ok {
  136. servers[name][host] = []string{}
  137. }
  138. servers[name][host] = append(servers[name][host], addr)
  139. }
  140. mgr.mapServers = servers
  141. Log.Infof("UpdateServerHosts is done: %v", mgr.mapServers)
  142. return nil
  143. }
  144. // GetServerHosts get host ips for the server, now return all hosts
  145. func (mgr *ServerManager) GetServerHosts(serverName string, hostType string) ([]string, error) {
  146. server, ok := mgr.mapServers[serverName]
  147. if !ok {
  148. // try update server hosts mannually
  149. mgr.UpdateServerHosts()
  150. }
  151. server, ok = mgr.mapServers[serverName]
  152. if !ok {
  153. return nil, errorf("no server for %s", serverName)
  154. }
  155. hosts, ok := server[hostType]
  156. if !ok || len(hosts) == 0 {
  157. return nil, errorf("no hosts for %s:%s", serverName, hostType)
  158. }
  159. return hosts, nil
  160. }