server_manager.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. // service registration and discovery
  2. package server
  3. import (
  4. "errors"
  5. "os"
  6. "strings"
  7. "time"
  8. "github.com/coreos/etcd/client"
  9. "golang.org/x/net/context"
  10. )
  11. const (
  12. // EtcdServersPrefix prefix
  13. EtcdServersPrefix = "/knowo/servers/"
  14. EtcdServersPrefixCnt = 2
  15. EnvTCPProxy = "TCP_PROXY_ADDR"
  16. EnvHTTPProxy = "HTTP_PROXY_ADDR"
  17. )
  18. // ServerManager server manager
  19. type ServerManager struct {
  20. serverName string
  21. // servername -> hosttype -> hostlist
  22. // eg. var hosts []string = mapServers["testserver"]["rpchost"]
  23. mapServers map[string](map[string][]string)
  24. etcdHosts []string
  25. }
  26. // NewServerManager new server manager
  27. // etcd hosts is config as http://ip1:port1;http://ip2:port2;http://ip3:port3
  28. func NewServerManager(name string, etcd string) (*ServerManager, error) {
  29. if etcd == "" {
  30. return nil, errors.New("no etcd host found!")
  31. }
  32. return &ServerManager{
  33. serverName: name,
  34. etcdHosts: strings.Split(etcd, ";"),
  35. mapServers: make(map[string](map[string][]string)),
  36. }, nil
  37. }
  38. // RegisterServer register server to etcd
  39. func (mgr *ServerManager) RegisterServer() error {
  40. if serverInstance == nil {
  41. return errorf(errServerNotInit)
  42. }
  43. cfg := client.Config{
  44. Endpoints: mgr.etcdHosts,
  45. Transport: client.DefaultTransport,
  46. // set timeout per request to fail fast when the target endpoint is unavailable
  47. HeaderTimeoutPerRequest: time.Second,
  48. }
  49. c, err := client.New(cfg)
  50. if err != nil {
  51. return err
  52. }
  53. kapi := client.NewKeysAPI(c)
  54. prefix := EtcdServersPrefix + mgr.serverName + "/"
  55. var response *client.Response
  56. opt := &client.SetOptions{TTL: 90 * time.Second}
  57. if serverInstance.tcpsvr != nil {
  58. addr := os.Getenv(EnvTCPProxy)
  59. if addr == "" {
  60. addr, _ = fixHostIp(*confTCPHost)
  61. }
  62. response, err = kapi.Set(context.Background(), prefix+FlagTCPHost+"/"+addr, addr, opt)
  63. }
  64. if serverInstance.rpcsvr != nil {
  65. addr, _ := fixHostIp(*confRPCHost)
  66. response, err = kapi.Set(context.Background(), prefix+FlagRPCHost+"/"+addr, addr, opt)
  67. }
  68. if serverInstance.httpsvr != nil {
  69. addr := os.Getenv(EnvHTTPProxy)
  70. if addr == "" {
  71. addr, _ = fixHostIp(*confHTTPHost)
  72. }
  73. response, err = kapi.Set(context.Background(), prefix+FlagHTTPHost+"/"+addr, addr, opt)
  74. }
  75. if err != nil {
  76. return err
  77. }
  78. // print common key info
  79. Log.Infof("RegisterServer is done. Metadata is %v\n", response)
  80. return nil
  81. }
  82. // UpdateServerHosts update server hosts
  83. func (mgr *ServerManager) UpdateServerHosts() error {
  84. if serverInstance == nil {
  85. return errorf(errServerNotInit)
  86. }
  87. cfg := client.Config{
  88. Endpoints: mgr.etcdHosts,
  89. Transport: client.DefaultTransport,
  90. // set timeout per request to fail fast when the target endpoint is unavailable
  91. HeaderTimeoutPerRequest: time.Second,
  92. }
  93. c, err := client.New(cfg)
  94. if err != nil {
  95. return err
  96. }
  97. kapi := client.NewKeysAPI(c)
  98. prefix := EtcdServersPrefix
  99. opt := &client.GetOptions{Recursive: true}
  100. response, err := kapi.Get(context.Background(), prefix, opt)
  101. if err != nil {
  102. return err
  103. }
  104. servers := make(map[string](map[string][]string))
  105. root := response.Node
  106. if root.Dir != true {
  107. return errorf(errWrongEtcdPath, root.Key)
  108. }
  109. for _, server := range root.Nodes {
  110. if server.Dir != true {
  111. return errorf(errWrongEtcdPath, server.Key)
  112. }
  113. name := strings.Split(server.Key, "/")[EtcdServersPrefixCnt+1]
  114. servers[name] = make(map[string][]string)
  115. for _, hosttype := range server.Nodes {
  116. if hosttype.Dir != true {
  117. return errorf(errWrongEtcdPath, hosttype.Key)
  118. }
  119. host := strings.Split(hosttype.Key, "/")[EtcdServersPrefixCnt+2]
  120. servers[name][host] = []string{}
  121. for _, hostaddr := range hosttype.Nodes {
  122. addr := strings.Split(hostaddr.Key, "/")[EtcdServersPrefixCnt+3]
  123. servers[name][host] = append(servers[name][host], addr)
  124. }
  125. }
  126. }
  127. mgr.mapServers = servers
  128. //Log.Infof("UpdateServerHosts is done: %v", mgr.mapServers)
  129. return nil
  130. }
  131. // GetServerHosts get host ips for the server, now return all hosts
  132. func (mgr *ServerManager) GetServerHosts(serverName string, hostType string) ([]string, error) {
  133. server, ok := mgr.mapServers[serverName]
  134. if !ok {
  135. // try update server hosts mannually
  136. mgr.UpdateServerHosts()
  137. }
  138. server, ok = mgr.mapServers[serverName]
  139. if !ok {
  140. return nil, errorf("no server for %s", serverName)
  141. }
  142. hosts, ok := server[hostType]
  143. if !ok || len(hosts) == 0 {
  144. return nil, errorf("no hosts for %s:%s", serverName, hostType)
  145. }
  146. return hosts, nil
  147. }