server_manager.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. // service registration and discovery
  2. package server
  3. import (
  4. "errors"
  5. "os"
  6. "strings"
  7. "time"
  8. "github.com/coreos/etcd/client"
  9. "golang.org/x/net/context"
  10. )
  11. const (
  12. // EtcdServersPrefix prefix
  13. EtcdServersPrefix = "/knowo/servers/"
  14. EtcdServersPrefixCnt = 2
  15. EnvTCPProxy = "TCP_PROXY_ADDR"
  16. EnvHTTPProxy = "HTTP_PROXY_ADDR"
  17. EnvUDPProxy = "UDP_PROXY_ADDR"
  18. )
  19. // ServerManager server manager
  20. type ServerManager struct {
  21. serverName string
  22. // servername -> hosttype -> hostlist
  23. // eg. var hosts []string = mapServers["testserver"]["rpchost"]
  24. mapServers map[string](map[string][]string)
  25. etcdHosts []string
  26. }
  27. // NewServerManager new server manager
  28. // etcd hosts is config as http://ip1:port1;http://ip2:port2;http://ip3:port3
  29. func NewServerManager(name string, etcd string) (*ServerManager, error) {
  30. if etcd == "" {
  31. return nil, errors.New("no etcd host found!")
  32. }
  33. return &ServerManager{
  34. serverName: name,
  35. etcdHosts: strings.Split(etcd, ";"),
  36. mapServers: make(map[string](map[string][]string)),
  37. }, nil
  38. }
  39. // RegisterServer register server to etcd
  40. func (mgr *ServerManager) RegisterServer() error {
  41. if serverInstance == nil {
  42. return errorf(errServerNotInit)
  43. }
  44. cfg := client.Config{
  45. Endpoints: mgr.etcdHosts,
  46. Transport: client.DefaultTransport,
  47. // set timeout per request to fail fast when the target endpoint is unavailable
  48. HeaderTimeoutPerRequest: time.Second,
  49. }
  50. c, err := client.New(cfg)
  51. if err != nil {
  52. return err
  53. }
  54. kapi := client.NewKeysAPI(c)
  55. prefix := EtcdServersPrefix + mgr.serverName + "/"
  56. var response *client.Response
  57. opt := &client.SetOptions{TTL: 90 * time.Second}
  58. if serverInstance.tcpsvr != nil {
  59. addr := os.Getenv(EnvTCPProxy)
  60. if addr == "" {
  61. addr, _ = fixHostIp(*confTCPHost)
  62. }
  63. response, err = kapi.Set(context.Background(), prefix+FlagTCPHost+"/"+addr, addr, opt)
  64. }
  65. if serverInstance.rpcsvr != nil {
  66. addr, _ := fixHostIp(*confRPCHost)
  67. response, err = kapi.Set(context.Background(), prefix+FlagRPCHost+"/"+addr, addr, opt)
  68. }
  69. if serverInstance.udpsvr != nil {
  70. addr := os.Getenv(EnvUDPProxy)
  71. if addr == "" {
  72. addr, _ = fixHostIp(*confUDPHost)
  73. }
  74. response, err = kapi.Set(context.Background(), prefix+FlagUDPHost+"/"+addr, addr, opt)
  75. }
  76. if serverInstance.httpsvr != nil {
  77. addr := os.Getenv(EnvHTTPProxy)
  78. if addr == "" {
  79. addr, _ = fixHostIp(*confHTTPHost)
  80. }
  81. response, err = kapi.Set(context.Background(), prefix+FlagHTTPHost+"/"+addr, addr, opt)
  82. }
  83. if err != nil {
  84. return err
  85. }
  86. // print common key info
  87. Log.Infof("RegisterServer is done. Metadata is %v\n", response)
  88. return nil
  89. }
  90. // UpdateServerHosts update server hosts
  91. func (mgr *ServerManager) UpdateServerHosts() error {
  92. if serverInstance == nil {
  93. return errorf(errServerNotInit)
  94. }
  95. cfg := client.Config{
  96. Endpoints: mgr.etcdHosts,
  97. Transport: client.DefaultTransport,
  98. // set timeout per request to fail fast when the target endpoint is unavailable
  99. HeaderTimeoutPerRequest: time.Second,
  100. }
  101. c, err := client.New(cfg)
  102. if err != nil {
  103. return err
  104. }
  105. kapi := client.NewKeysAPI(c)
  106. prefix := EtcdServersPrefix
  107. opt := &client.GetOptions{Recursive: true}
  108. response, err := kapi.Get(context.Background(), prefix, opt)
  109. if err != nil {
  110. return err
  111. }
  112. servers := make(map[string](map[string][]string))
  113. root := response.Node
  114. if root.Dir != true {
  115. return errorf(errWrongEtcdPath, root.Key)
  116. }
  117. for _, server := range root.Nodes {
  118. if server.Dir != true {
  119. return errorf(errWrongEtcdPath, server.Key)
  120. }
  121. name := strings.Split(server.Key, "/")[EtcdServersPrefixCnt+1]
  122. servers[name] = make(map[string][]string)
  123. for _, hosttype := range server.Nodes {
  124. if hosttype.Dir != true {
  125. return errorf(errWrongEtcdPath, hosttype.Key)
  126. }
  127. host := strings.Split(hosttype.Key, "/")[EtcdServersPrefixCnt+2]
  128. servers[name][host] = []string{}
  129. for _, hostaddr := range hosttype.Nodes {
  130. addr := strings.Split(hostaddr.Key, "/")[EtcdServersPrefixCnt+3]
  131. servers[name][host] = append(servers[name][host], addr)
  132. }
  133. }
  134. }
  135. mgr.mapServers = servers
  136. //Log.Infof("UpdateServerHosts is done: %v", mgr.mapServers)
  137. return nil
  138. }
  139. // GetServerHosts get host ips for the server, now return all hosts
  140. func (mgr *ServerManager) GetServerHosts(serverName string, hostType string) ([]string, error) {
  141. server, ok := mgr.mapServers[serverName]
  142. if !ok {
  143. // try update server hosts mannually
  144. mgr.UpdateServerHosts()
  145. }
  146. server, ok = mgr.mapServers[serverName]
  147. if !ok {
  148. return nil, errorf("no server for %s", serverName)
  149. }
  150. hosts, ok := server[hostType]
  151. if !ok || len(hosts) == 0 {
  152. return nil, errorf("no hosts for %s:%s", serverName, hostType)
  153. }
  154. return hosts, nil
  155. }