server_manager.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. // service registration and discovery
  2. package server
  3. import (
  4. "errors"
  5. "fmt"
  6. "go.etcd.io/etcd/clientv3"
  7. "os"
  8. "strings"
  9. "sync"
  10. "time"
  11. "golang.org/x/net/context"
  12. )
  13. const (
  14. // EtcdServersPrefix prefix
  15. EtcdServersPrefix = "/knowo/servers/"
  16. EtcdServersPrefixCnt = 2
  17. EnvTCPProxy = "TCP_PROXY_ADDR"
  18. EnvHTTPProxy = "HTTP_PROXY_ADDR"
  19. EnvUDPProxy = "UDP_PROXY_ADDR"
  20. lease = 90
  21. )
  22. type addr struct {
  23. internalIp string
  24. externalIp string
  25. }
  26. // ServerManager server manager
  27. type ServerManager struct {
  28. serverName string
  29. // servername -> hosttype -> hostlist
  30. // eg. var hosts []string = mapServers["testserver"]["rpchost"]
  31. mapServers map[string]map[string][]string
  32. etcdHosts []string
  33. cli *clientv3.Client
  34. leaseId clientv3.LeaseID
  35. keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
  36. mu sync.Mutex
  37. }
  38. // NewServerManager new server manager
  39. // etcd hosts is config as http://ip1:port1;http://ip2:port2;http://ip3:port3
  40. func NewServerManager(name string, etcd string) (*ServerManager, error) {
  41. if etcd == "" {
  42. return nil, errors.New("no etcd host found!")
  43. }
  44. etcdHosts := strings.Split(etcd, ";")
  45. cli, err := clientv3.New(clientv3.Config{
  46. Endpoints: etcdHosts,
  47. DialTimeout: 5 * time.Second,
  48. })
  49. if err != nil {
  50. return nil, err
  51. }
  52. return &ServerManager{
  53. serverName: name,
  54. cli: cli,
  55. etcdHosts: etcdHosts,
  56. mapServers: make(map[string]map[string][]string),
  57. }, nil
  58. }
  59. // RegisterServer register server to etcd
  60. func (mgr *ServerManager) RegisterServer() error {
  61. if serverInstance == nil {
  62. return errorf(errServerNotInit)
  63. }
  64. ctx := context.Background()
  65. resp, err := mgr.cli.Grant(ctx, lease)
  66. if err != nil {
  67. return err
  68. }
  69. prefix := fmt.Sprintf("%s%s/", EtcdServersPrefix, mgr.serverName)
  70. var (
  71. addr *addr
  72. keys []string
  73. )
  74. if serverInstance.tcpsvr != nil {
  75. addr, _ = fixHostIp(*confTCPHost)
  76. envAddr := os.Getenv(EnvTCPProxy)
  77. if envAddr != "" {
  78. addr.externalIp = envAddr
  79. }
  80. keys = append(keys, fmt.Sprintf("%s%s/%s", prefix, FlagTCPHost, addr.externalIp))
  81. }
  82. if serverInstance.rpcsvr != nil {
  83. addr, _ := fixHostIp(*confRPCHost)
  84. keys = append(keys, fmt.Sprintf("%s%s/%s", prefix, FlagRPCHost, addr.externalIp))
  85. }
  86. if serverInstance.udpsvr != nil {
  87. addr, _ = fixHostIp(*confUDPHost)
  88. envAddr := os.Getenv(EnvUDPProxy)
  89. if envAddr != "" {
  90. addr.externalIp = envAddr
  91. }
  92. keys = append(keys, fmt.Sprintf("%s%s/%s", prefix, FlagUDPHost, addr.externalIp))
  93. }
  94. if serverInstance.httpsvr != nil {
  95. addr, _ = fixHostIp(*confHTTPHost)
  96. envAddr := os.Getenv(EnvHTTPProxy)
  97. if envAddr != "" {
  98. addr.externalIp = envAddr
  99. }
  100. keys = append(keys, fmt.Sprintf("%s%s/%s", prefix, FlagHTTPHost, addr.externalIp))
  101. }
  102. for _, key := range keys {
  103. _, err = mgr.cli.Put(ctx, key, "server", clientv3.WithLease(resp.ID))
  104. if err != nil {
  105. return nil
  106. }
  107. mgr.leaseId = resp.ID
  108. leaseRespChan, err := mgr.cli.KeepAlive(ctx, resp.ID)
  109. if err != nil {
  110. return err
  111. }
  112. mgr.keepAliveChan = leaseRespChan
  113. }
  114. // print common keys info
  115. Log.Infof("RegisterServer is done. leaseId is %v\n", mgr.leaseId)
  116. go func() {
  117. for {
  118. leaseResp := <-mgr.keepAliveChan
  119. Log.Infof("update lease success:%d", leaseResp.TTL)
  120. }
  121. }()
  122. return nil
  123. }
  124. // UpdateServerHosts update server hosts
  125. func (mgr *ServerManager) UpdateServerHosts() error {
  126. if serverInstance == nil {
  127. return errorf(errServerNotInit)
  128. }
  129. prefix := EtcdServersPrefix
  130. response, err := mgr.cli.Get(context.Background(), prefix, clientv3.WithPrefix())
  131. if err != nil {
  132. return err
  133. }
  134. servers := make(map[string]map[string][]string)
  135. for _, kvs := range response.Kvs {
  136. key := string(kvs.Key)
  137. name := strings.Split(key, "/")[EtcdServersPrefixCnt+1]
  138. host := strings.Split(key, "/")[EtcdServersPrefixCnt+2]
  139. addr := strings.Split(key, "/")[EtcdServersPrefixCnt+3]
  140. if _, ok := servers[name]; !ok {
  141. servers[name] = make(map[string][]string)
  142. }
  143. if _, ok := servers[name][host]; !ok {
  144. servers[name][host] = []string{}
  145. }
  146. servers[name][host] = append(servers[name][host], addr)
  147. }
  148. mgr.mapServers = servers
  149. Log.Infof("UpdateServerHosts is done: %v", mgr.mapServers)
  150. return nil
  151. }
  152. // GetServerHosts get host ips for the server, now return all hosts
  153. func (mgr *ServerManager) GetServerHosts(serverName string, hostType string) ([]string, error) {
  154. server, ok := mgr.mapServers[serverName]
  155. if !ok {
  156. // try update server hosts mannually
  157. mgr.UpdateServerHosts()
  158. }
  159. server, ok = mgr.mapServers[serverName]
  160. if !ok {
  161. return nil, errorf("no server for %s", serverName)
  162. }
  163. hosts, ok := server[hostType]
  164. if !ok || len(hosts) == 0 {
  165. return nil, errorf("no hosts for %s:%s", serverName, hostType)
  166. }
  167. return hosts, nil
  168. }