server_manager.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. // service registration and discovery
  2. package server
  3. import (
  4. "errors"
  5. "fmt"
  6. "go.etcd.io/etcd/clientv3"
  7. "os"
  8. "strings"
  9. "sync"
  10. "time"
  11. "golang.org/x/net/context"
  12. )
  13. const (
  14. // EtcdServersPrefix prefix
  15. EtcdServersPrefix = "/knowo/servers/"
  16. EtcdServersPrefixCnt = 2
  17. EnvTCPProxy = "TCP_PROXY_ADDR"
  18. EnvHTTPProxy = "HTTP_PROXY_ADDR"
  19. EnvUDPProxy = "UDP_PROXY_ADDR"
  20. lease = 90
  21. )
  22. // ServerManager server manager
  23. type ServerManager struct {
  24. serverName string
  25. // servername -> hosttype -> hostlist
  26. // eg. var hosts []string = mapServers["testserver"]["rpchost"]
  27. mapServers map[string]map[string][]string
  28. etcdHosts []string
  29. cli *clientv3.Client
  30. leaseId clientv3.LeaseID
  31. keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
  32. mu sync.Mutex
  33. }
  34. // NewServerManager new server manager
  35. // etcd hosts is config as http://ip1:port1;http://ip2:port2;http://ip3:port3
  36. func NewServerManager(name string, etcd string) (*ServerManager, error) {
  37. if etcd == "" {
  38. return nil, errors.New("no etcd host found!")
  39. }
  40. etcdHosts := strings.Split(etcd, ";")
  41. cli, err := clientv3.New(clientv3.Config{
  42. Endpoints: etcdHosts,
  43. DialTimeout: 5 * time.Second,
  44. })
  45. if err != nil {
  46. return nil, err
  47. }
  48. return &ServerManager{
  49. serverName: name,
  50. cli: cli,
  51. etcdHosts: etcdHosts,
  52. mapServers: make(map[string]map[string][]string),
  53. }, nil
  54. }
  55. // RegisterServer register server to etcd
  56. func (mgr *ServerManager) RegisterServer() error {
  57. if serverInstance == nil {
  58. return errorf(errServerNotInit)
  59. }
  60. ctx := context.Background()
  61. resp, err := mgr.cli.Grant(ctx, lease)
  62. if err != nil {
  63. return err
  64. }
  65. prefix := fmt.Sprintf("%s%s/", EtcdServersPrefix, mgr.serverName)
  66. var (
  67. addr string
  68. key string
  69. )
  70. if serverInstance.tcpsvr != nil {
  71. addr := os.Getenv(EnvTCPProxy)
  72. if addr == "" {
  73. addr, _ = fixHostIp(*confTCPHost)
  74. }
  75. key = fmt.Sprintf("%s%s/%s", prefix, FlagTCPHost, addr)
  76. }
  77. if serverInstance.rpcsvr != nil {
  78. addr, _ := fixHostIp(*confRPCHost)
  79. key = fmt.Sprintf("%s%s/%s", prefix, FlagRPCHost, addr)
  80. }
  81. if serverInstance.udpsvr != nil {
  82. addr := os.Getenv(EnvUDPProxy)
  83. if addr == "" {
  84. addr, _ = fixHostIp(*confUDPHost)
  85. }
  86. key = fmt.Sprintf("%s%s/%s", prefix, FlagUDPHost, addr)
  87. }
  88. if serverInstance.httpsvr != nil {
  89. addr := os.Getenv(EnvHTTPProxy)
  90. if addr == "" {
  91. addr, _ = fixHostIp(*confHTTPHost)
  92. }
  93. key = fmt.Sprintf("%s%s/%s", prefix, FlagHTTPHost, addr)
  94. }
  95. _, err = mgr.cli.Put(ctx, key, addr, clientv3.WithLease(resp.ID))
  96. if err != nil {
  97. return nil
  98. }
  99. mgr.leaseId = resp.ID
  100. leaseRespChan, err := mgr.cli.KeepAlive(ctx, resp.ID)
  101. if err != nil {
  102. return err
  103. }
  104. mgr.keepAliveChan = leaseRespChan
  105. // print common key info
  106. Log.Infof("RegisterServer is done. leaseId is %v\n", mgr.leaseId)
  107. go func() {
  108. for leaseResp := range mgr.keepAliveChan {
  109. Log.Infof("update lease success:%d", leaseResp.ID)
  110. }
  111. }()
  112. return nil
  113. }
  114. // UpdateServerHosts update server hosts
  115. func (mgr *ServerManager) UpdateServerHosts() error {
  116. if serverInstance == nil {
  117. return errorf(errServerNotInit)
  118. }
  119. prefix := EtcdServersPrefix
  120. response, err := mgr.cli.Get(context.Background(), prefix, clientv3.WithPrefix())
  121. if err != nil {
  122. return err
  123. }
  124. servers := make(map[string](map[string][]string))
  125. for _, server := range response.Kvs {
  126. name := strings.Split(string(server.Key), "/")[EtcdServersPrefixCnt+1]
  127. servers[name] = make(map[string][]string)
  128. host := strings.Split(string(server.Key), "/")[EtcdServersPrefixCnt+2]
  129. servers[name][host] = []string{}
  130. addr := strings.Split(string(server.Key), "/")[EtcdServersPrefixCnt+3]
  131. servers[name][host] = append(servers[name][host], addr)
  132. }
  133. mgr.mapServers = servers
  134. Log.Infof("UpdateServerHosts is done: %v", mgr.mapServers)
  135. return nil
  136. }
  137. // GetServerHosts get host ips for the server, now return all hosts
  138. func (mgr *ServerManager) GetServerHosts(serverName string, hostType string) ([]string, error) {
  139. server, ok := mgr.mapServers[serverName]
  140. if !ok {
  141. // try update server hosts mannually
  142. mgr.UpdateServerHosts()
  143. }
  144. server, ok = mgr.mapServers[serverName]
  145. if !ok {
  146. return nil, errorf("no server for %s", serverName)
  147. }
  148. hosts, ok := server[hostType]
  149. if !ok || len(hosts) == 0 {
  150. return nil, errorf("no hosts for %s:%s", serverName, hostType)
  151. }
  152. return hosts, nil
  153. }