server_manager.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. // service registration and discovery
  2. package server
  3. import (
  4. "errors"
  5. "github.com/coreos/etcd/client"
  6. "golang.org/x/net/context"
  7. "os"
  8. "strings"
  9. "time"
  10. )
  // Layout of the service registry inside etcd.
  const (
  	// EtcdServersPrefix is the etcd directory under which every server
  	// publishes its hosts, as <prefix><servername>/<hosttype>/<address>.
  	EtcdServersPrefix = "/pando/servers/"

  	// EtcdServersPrefixCnt is the number of path segments that make up
  	// EtcdServersPrefix. It is used as the base offset when a registry key
  	// is split on "/" to pick out the server name, host type and address.
  	EtcdServersPrefixCnt = 2
  )

  // Environment variables that, when set to a non-empty value, replace the
  // locally resolved address that is published during registration.
  const (
  	// EnvTCPProxy names the variable that overrides the published TCP address.
  	EnvTCPProxy = "TCP_PROXY_ADDR"

  	// EnvHTTPProxy names the variable that overrides the published HTTP address.
  	EnvHTTPProxy = "HTTP_PROXY_ADDR"
  )
  // ServerManager publishes the local server's addresses to etcd and keeps an
  // in-memory copy of the addresses found in the registry.
  type ServerManager struct {
  	// serverName is the name under which this server registers itself.
  	serverName string
  	// mapServers is the cached registry: servername -> hosttype -> hostlist,
  	// e.g. hosts := mapServers["testserver"]["rpchost"].
  	mapServers map[string]map[string][]string
  	// etcdHosts lists the etcd endpoints used for every registry request.
  	etcdHosts []string
  }

  // NewServerManager creates a manager for the server called name.
  //
  // etcd is a semicolon-separated list of etcd endpoints, for example
  // "http://ip1:port1;http://ip2:port2;http://ip3:port3". Whitespace around an
  // entry is trimmed and blank entries (such as the one produced by a trailing
  // ";") are dropped, so that no empty endpoint is handed to the etcd client.
  //
  // An error is returned when etcd contains no usable endpoint.
  func NewServerManager(name string, etcd string) (*ServerManager, error) {
  	var hosts []string
  	for _, host := range strings.Split(etcd, ";") {
  		if host = strings.TrimSpace(host); host != "" {
  			hosts = append(hosts, host)
  		}
  	}
  	if len(hosts) == 0 {
  		return nil, errors.New("no etcd host found!")
  	}
  	return &ServerManager{
  		serverName: name,
  		etcdHosts:  hosts,
  		mapServers: make(map[string]map[string][]string),
  	}, nil
  }
  35. // register server to etcd
  36. func (mgr *ServerManager) RegisterServer() error {
  37. if serverInstance == nil {
  38. return errorf(errServerNotInit)
  39. }
  40. cfg := client.Config{
  41. Endpoints: mgr.etcdHosts,
  42. Transport: client.DefaultTransport,
  43. // set timeout per request to fail fast when the target endpoint is unavailable
  44. HeaderTimeoutPerRequest: time.Second,
  45. }
  46. c, err := client.New(cfg)
  47. if err != nil {
  48. return err
  49. }
  50. kapi := client.NewKeysAPI(c)
  51. prefix := EtcdServersPrefix + mgr.serverName + "/"
  52. var response *client.Response
  53. opt := &client.SetOptions{TTL: 90 * time.Second}
  54. if serverInstance.tcpsvr != nil {
  55. addr := os.Getenv(EnvTCPProxy)
  56. if addr == "" {
  57. addr, _ = fixHostIp(*confTCPHost)
  58. }
  59. response, err = kapi.Set(context.Background(), prefix+FlagTCPHost+"/"+addr, addr, opt)
  60. }
  61. if serverInstance.rpcsvr != nil {
  62. addr, _ := fixHostIp(*confRPCHost)
  63. response, err = kapi.Set(context.Background(), prefix+FlagRPCHost+"/"+addr, addr, opt)
  64. }
  65. if serverInstance.httpsvr != nil {
  66. addr := os.Getenv(EnvHTTPProxy)
  67. if addr == "" {
  68. addr, _ = fixHostIp(*confHTTPHost)
  69. }
  70. response, err = kapi.Set(context.Background(), prefix+FlagHTTPHost+"/"+addr, addr, opt)
  71. }
  72. if err != nil {
  73. return err
  74. }
  75. // print common key info
  76. Log.Infof("RegisterServer is done. Metadata is %q\n", response)
  77. return nil
  78. }
  79. // update server hosts
  80. func (mgr *ServerManager) UpdateServerHosts() error {
  81. if serverInstance == nil {
  82. return errorf(errServerNotInit)
  83. }
  84. cfg := client.Config{
  85. Endpoints: mgr.etcdHosts,
  86. Transport: client.DefaultTransport,
  87. // set timeout per request to fail fast when the target endpoint is unavailable
  88. HeaderTimeoutPerRequest: time.Second,
  89. }
  90. c, err := client.New(cfg)
  91. if err != nil {
  92. return err
  93. }
  94. kapi := client.NewKeysAPI(c)
  95. prefix := EtcdServersPrefix
  96. opt := &client.GetOptions{Recursive: true}
  97. response, err := kapi.Get(context.Background(), prefix, opt)
  98. if err != nil {
  99. return err
  100. }
  101. servers := make(map[string](map[string][]string))
  102. root := response.Node
  103. if root.Dir != true {
  104. return errorf(errWrongEtcdPath, root.Key)
  105. }
  106. for _, server := range root.Nodes {
  107. if server.Dir != true {
  108. return errorf(errWrongEtcdPath, server.Key)
  109. }
  110. name := strings.Split(server.Key, "/")[EtcdServersPrefixCnt+1]
  111. servers[name] = make(map[string][]string)
  112. for _, hosttype := range server.Nodes {
  113. if hosttype.Dir != true {
  114. return errorf(errWrongEtcdPath, hosttype.Key)
  115. }
  116. host := strings.Split(hosttype.Key, "/")[EtcdServersPrefixCnt+2]
  117. servers[name][host] = []string{}
  118. for _, hostaddr := range hosttype.Nodes {
  119. addr := strings.Split(hostaddr.Key, "/")[EtcdServersPrefixCnt+3]
  120. servers[name][host] = append(servers[name][host], addr)
  121. }
  122. }
  123. }
  124. mgr.mapServers = servers
  125. Log.Infof("UpdateServerHosts is done: %v", mgr.mapServers)
  126. return nil
  127. }
  128. // get host ips for the server, now return all hosts
  129. func (mgr *ServerManager) GetServerHosts(serverName string, hostType string) ([]string, error) {
  130. server, ok := mgr.mapServers[serverName]
  131. if !ok {
  132. // try update server hosts mannually
  133. mgr.UpdateServerHosts()
  134. }
  135. server, ok = mgr.mapServers[serverName]
  136. if !ok {
  137. return nil, errorf("no server for %s", serverName)
  138. }
  139. hosts, ok := server[hostType]
  140. if !ok || len(hosts) == 0 {
  141. return nil, errorf("no hosts for %s:%s", serverName, hostType)
  142. }
  143. return hosts, nil
  144. }