server_manager.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. // service registration and discovery
  2. package server
  3. import (
  4. "errors"
  5. "fmt"
  6. "go.etcd.io/etcd/clientv3"
  7. "os"
  8. "strings"
  9. "sync"
  10. "time"
  11. "golang.org/x/net/context"
  12. )
const (
	// EtcdServersPrefix is the etcd key prefix under which every server
	// publishes its addresses. RegisterServer writes keys shaped like
	// "<EtcdServersPrefix><serverName>/<hostType>/<address>".
	EtcdServersPrefix = "/knowo/servers/"

	// EtcdServersPrefixCnt is the number of path segments in
	// EtcdServersPrefix ("knowo" and "servers").
	EtcdServersPrefixCnt = 2

	// EnvTCPProxy, EnvHTTPProxy and EnvUDPProxy name environment variables.
	// When one is set to a non-empty value, RegisterServer publishes that
	// value for the matching listener instead of the external address
	// obtained from fixHostIp.
	EnvTCPProxy  = "TCP_PROXY_ADDR"
	EnvHTTPProxy = "HTTP_PROXY_ADDR"
	EnvUDPProxy  = "UDP_PROXY_ADDR"

	// lease is the TTL handed to the etcd client's Grant call in
	// RegisterServer (the clientv3 API takes this value in seconds).
	lease = 90
)
// addr is the pair of addresses returned by fixHostIp for a configured host.
type addr struct {
	// internalIp is not read anywhere in the visible code.
	// NOTE(review): presumably the address reachable inside the cluster — confirm against fixHostIp.
	internalIp string
	// externalIp is the address RegisterServer publishes to etcd.
	externalIp string
}
// ServerManager registers this server in etcd and keeps a local cache of the
// addresses published by all servers.
type ServerManager struct {
	// serverName is the name under which this server registers itself.
	serverName string
	// mapServers caches the discovered hosts as
	// server name -> host type -> host list, for example
	//   var hosts []string = mapServers["testserver"]["rpchost"]
	// UpdateServerHosts replaces the whole map on every refresh.
	mapServers map[string]map[string][]string
	// etcdHosts are the etcd endpoints the client was created with.
	etcdHosts []string
	// cli is the etcd client used for all reads and writes.
	cli *clientv3.Client
	// leaseId is the lease attached to the keys written by RegisterServer.
	leaseId clientv3.LeaseID
	// keepAliveChan delivers the keep-alive responses for leaseId.
	keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
	// NOTE(review): mu is not locked by any method visible in this part of
	// the file — confirm what it is meant to guard.
	mu sync.Mutex
}
  38. // NewServerManager new server manager
  39. // etcd hosts is config as http://ip1:port1;http://ip2:port2;http://ip3:port3
  40. func NewServerManager(name string, etcd string) (*ServerManager, error) {
  41. if etcd == "" {
  42. return nil, errors.New("no etcd host found!")
  43. }
  44. etcdHosts := strings.Split(etcd, ";")
  45. cli, err := clientv3.New(clientv3.Config{
  46. Endpoints: etcdHosts,
  47. DialTimeout: 5 * time.Second,
  48. })
  49. if err != nil {
  50. return nil, err
  51. }
  52. return &ServerManager{
  53. serverName: name,
  54. cli: cli,
  55. etcdHosts: etcdHosts,
  56. mapServers: make(map[string]map[string][]string),
  57. }, nil
  58. }
  59. // RegisterServer register server to etcd
  60. func (mgr *ServerManager) RegisterServer() error {
  61. if serverInstance == nil {
  62. return errorf(errServerNotInit)
  63. }
  64. ctx := context.Background()
  65. resp, err := mgr.cli.Grant(ctx, lease)
  66. if err != nil {
  67. return err
  68. }
  69. prefix := fmt.Sprintf("%s%s/", EtcdServersPrefix, mgr.serverName)
  70. var (
  71. addr *addr
  72. keys []string
  73. )
  74. if serverInstance.tcpsvr != nil {
  75. addr, _ = fixHostIp(*confTCPHost)
  76. envAddr := os.Getenv(EnvTCPProxy)
  77. if envAddr != "" {
  78. addr.externalIp = envAddr
  79. }
  80. keys = append(keys, fmt.Sprintf("%s%s/%s", prefix, FlagTCPHost, addr.externalIp))
  81. }
  82. if serverInstance.rpcsvr != nil {
  83. addr, _ := fixHostIp(*confRPCHost)
  84. keys = append(keys, fmt.Sprintf("%s%s/%s", prefix, FlagRPCHost, addr.externalIp))
  85. }
  86. if serverInstance.udpsvr != nil {
  87. addr, _ = fixHostIp(*confUDPHost)
  88. envAddr := os.Getenv(EnvUDPProxy)
  89. if envAddr != "" {
  90. addr.externalIp = envAddr
  91. }
  92. keys = append(keys, fmt.Sprintf("%s%s/%s", prefix, FlagUDPHost, addr.externalIp))
  93. }
  94. if serverInstance.httpsvr != nil {
  95. addr, _ = fixHostIp(*confHTTPHost)
  96. envAddr := os.Getenv(EnvHTTPProxy)
  97. if envAddr != "" {
  98. addr.externalIp = envAddr
  99. }
  100. keys = append(keys, fmt.Sprintf("%s%s/%s", prefix, FlagHTTPHost, addr.externalIp))
  101. }
  102. for _, key := range keys {
  103. _, err = mgr.cli.Put(ctx, key, "server", clientv3.WithLease(resp.ID))
  104. if err != nil {
  105. return nil
  106. }
  107. mgr.leaseId = resp.ID
  108. leaseRespChan, err := mgr.cli.KeepAlive(ctx, resp.ID)
  109. if err != nil {
  110. return err
  111. }
  112. mgr.keepAliveChan = leaseRespChan
  113. }
  114. // print common keys info
  115. Log.Infof("RegisterServer is done. leaseId is %v\n", mgr.leaseId)
  116. go func() {
  117. for leaseResp := range mgr.keepAliveChan {
  118. Log.Infof("update lease success:%d", leaseResp.ID)
  119. }
  120. }()
  121. return nil
  122. }
  123. // UpdateServerHosts update server hosts
  124. func (mgr *ServerManager) UpdateServerHosts() error {
  125. if serverInstance == nil {
  126. return errorf(errServerNotInit)
  127. }
  128. prefix := EtcdServersPrefix
  129. response, err := mgr.cli.Get(context.Background(), prefix, clientv3.WithPrefix())
  130. if err != nil {
  131. return err
  132. }
  133. servers := make(map[string]map[string][]string)
  134. for _, kvs := range response.Kvs {
  135. key := string(kvs.Key)
  136. name := strings.Split(key, "/")[EtcdServersPrefixCnt+1]
  137. host := strings.Split(key, "/")[EtcdServersPrefixCnt+2]
  138. addr := strings.Split(key, "/")[EtcdServersPrefixCnt+3]
  139. if _, ok := servers[name]; !ok {
  140. servers[name] = make(map[string][]string)
  141. }
  142. if _, ok := servers[name][host];!ok {
  143. servers[name][host] = []string{}
  144. }
  145. servers[name][host] = append(servers[name][host], addr)
  146. }
  147. mgr.mapServers = servers
  148. Log.Infof("UpdateServerHosts is done: %v", mgr.mapServers)
  149. return nil
  150. }
  151. // GetServerHosts get host ips for the server, now return all hosts
  152. func (mgr *ServerManager) GetServerHosts(serverName string, hostType string) ([]string, error) {
  153. server, ok := mgr.mapServers[serverName]
  154. if !ok {
  155. // try update server hosts mannually
  156. mgr.UpdateServerHosts()
  157. }
  158. server, ok = mgr.mapServers[serverName]
  159. if !ok {
  160. return nil, errorf("no server for %s", serverName)
  161. }
  162. hosts, ok := server[hostType]
  163. if !ok || len(hosts) == 0 {
  164. return nil, errorf("no hosts for %s:%s", serverName, hostType)
  165. }
  166. return hosts, nil
  167. }