123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161 |
- // service registration and discovery
- package server
- import (
- "errors"
- "github.com/coreos/etcd/client"
- "golang.org/x/net/context"
- "os"
- "strings"
- "time"
- )
- const (
- EtcdServersPrefix = "/pando/servers/"
- EtcdServersPrefixCnt = 2
- EnvTCPProxy = "TCP_PROXY_ADDR"
- EnvHTTPProxy = "HTTP_PROXY_ADDR"
- )
- type ServerManager struct {
- serverName string
- // servername -> hosttype -> hostlist
- // eg. var hosts []string = mapServers["testserver"]["rpchost"]
- mapServers map[string](map[string][]string)
- etcdHosts []string
- }
- // etcd hosts is config as http://ip1:port1;http://ip2:port2;http://ip3:port3
- func NewServerManager(name string, etcd string) (*ServerManager, error) {
- if etcd == "" {
- return nil, errors.New("no etcd host found!")
- }
- return &ServerManager{
- serverName: name,
- etcdHosts: strings.Split(etcd, ";"),
- mapServers: make(map[string](map[string][]string)),
- }, nil
- }
- // register server to etcd
- func (mgr *ServerManager) RegisterServer() error {
- if serverInstance == nil {
- return errorf(errServerNotInit)
- }
- cfg := client.Config{
- Endpoints: mgr.etcdHosts,
- Transport: client.DefaultTransport,
- // set timeout per request to fail fast when the target endpoint is unavailable
- HeaderTimeoutPerRequest: time.Second,
- }
- c, err := client.New(cfg)
- if err != nil {
- return err
- }
- kapi := client.NewKeysAPI(c)
- prefix := EtcdServersPrefix + mgr.serverName + "/"
- var response *client.Response
- opt := &client.SetOptions{TTL: 90 * time.Second}
- if serverInstance.tcpsvr != nil {
- addr := os.Getenv(EnvTCPProxy)
- if addr == "" {
- addr, _ = fixHostIp(*confTCPHost)
- }
- response, err = kapi.Set(context.Background(), prefix+FlagTCPHost+"/"+addr, addr, opt)
- }
- if serverInstance.rpcsvr != nil {
- addr, _ := fixHostIp(*confRPCHost)
- response, err = kapi.Set(context.Background(), prefix+FlagRPCHost+"/"+addr, addr, opt)
- }
- if serverInstance.httpsvr != nil {
- addr := os.Getenv(EnvHTTPProxy)
- if addr == "" {
- addr, _ = fixHostIp(*confHTTPHost)
- }
- response, err = kapi.Set(context.Background(), prefix+FlagHTTPHost+"/"+addr, addr, opt)
- }
- if err != nil {
- return err
- }
- // print common key info
- Log.Infof("RegisterServer is done. Metadata is %q\n", response)
- return nil
- }
- // update server hosts
- func (mgr *ServerManager) UpdateServerHosts() error {
- if serverInstance == nil {
- return errorf(errServerNotInit)
- }
- cfg := client.Config{
- Endpoints: mgr.etcdHosts,
- Transport: client.DefaultTransport,
- // set timeout per request to fail fast when the target endpoint is unavailable
- HeaderTimeoutPerRequest: time.Second,
- }
- c, err := client.New(cfg)
- if err != nil {
- return err
- }
- kapi := client.NewKeysAPI(c)
- prefix := EtcdServersPrefix
- opt := &client.GetOptions{Recursive: true}
- response, err := kapi.Get(context.Background(), prefix, opt)
- if err != nil {
- return err
- }
- servers := make(map[string](map[string][]string))
- root := response.Node
- if root.Dir != true {
- return errorf(errWrongEtcdPath, root.Key)
- }
- for _, server := range root.Nodes {
- if server.Dir != true {
- return errorf(errWrongEtcdPath, server.Key)
- }
- name := strings.Split(server.Key, "/")[EtcdServersPrefixCnt+1]
- servers[name] = make(map[string][]string)
- for _, hosttype := range server.Nodes {
- if hosttype.Dir != true {
- return errorf(errWrongEtcdPath, hosttype.Key)
- }
- host := strings.Split(hosttype.Key, "/")[EtcdServersPrefixCnt+2]
- servers[name][host] = []string{}
- for _, hostaddr := range hosttype.Nodes {
- addr := strings.Split(hostaddr.Key, "/")[EtcdServersPrefixCnt+3]
- servers[name][host] = append(servers[name][host], addr)
- }
- }
- }
- mgr.mapServers = servers
- Log.Infof("UpdateServerHosts is done: %v", mgr.mapServers)
- return nil
- }
- // get host ips for the server, now return all hosts
- func (mgr *ServerManager) GetServerHosts(serverName string, hostType string) ([]string, error) {
- server, ok := mgr.mapServers[serverName]
- if !ok {
- // try update server hosts mannually
- mgr.UpdateServerHosts()
- }
- server, ok = mgr.mapServers[serverName]
- if !ok {
- return nil, errorf("no server for %s", serverName)
- }
- hosts, ok := server[hostType]
- if !ok || len(hosts) == 0 {
- return nil, errorf("no hosts for %s:%s", serverName, hostType)
- }
- return hosts, nil
- }
|