// service registration and discovery package server import ( "errors" "fmt" "go.etcd.io/etcd/clientv3" "os" "strings" "sync" "time" "golang.org/x/net/context" ) const ( // EtcdServersPrefix prefix EtcdServersPrefix = "/knowo/servers/" EtcdServersPrefixCnt = 2 EnvTCPProxy = "TCP_PROXY_ADDR" EnvHTTPProxy = "HTTP_PROXY_ADDR" EnvUDPProxy = "UDP_PROXY_ADDR" lease = 90 ) // ServerManager server manager type ServerManager struct { serverName string // servername -> hosttype -> hostlist // eg. var hosts []string = mapServers["testserver"]["rpchost"] mapServers map[string]map[string][]string etcdHosts []string cli *clientv3.Client leaseId clientv3.LeaseID keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse mu sync.Mutex } // NewServerManager new server manager // etcd hosts is config as http://ip1:port1;http://ip2:port2;http://ip3:port3 func NewServerManager(name string, etcd string) (*ServerManager, error) { if etcd == "" { return nil, errors.New("no etcd host found!") } etcdHosts := strings.Split(etcd, ";") cli, err := clientv3.New(clientv3.Config{ Endpoints: etcdHosts, DialTimeout: 5 * time.Second, }) if err != nil { return nil, err } return &ServerManager{ serverName: name, cli:cli, etcdHosts: etcdHosts, mapServers: make(map[string]map[string][]string), }, nil } // RegisterServer register server to etcd func (mgr *ServerManager) RegisterServer() error { if serverInstance == nil { return errorf(errServerNotInit) } ctx := context.Background() resp, err := mgr.cli.Grant(ctx, lease) if err != nil { return err } prefix := fmt.Sprintf("%s%s/",EtcdServersPrefix, mgr.serverName ) var ( addr string key string ) if serverInstance.tcpsvr != nil { addr := os.Getenv(EnvTCPProxy) if addr == "" { addr, _ = fixHostIp(*confTCPHost) } key = fmt.Sprintf("%s%s/%s", prefix, FlagTCPHost, addr) } if serverInstance.rpcsvr != nil { addr, _ := fixHostIp(*confRPCHost) key = fmt.Sprintf("%s%s/%s", prefix, FlagTCPHost, addr) } if serverInstance.udpsvr != nil { addr := os.Getenv(EnvUDPProxy) if addr == "" { addr, _ = fixHostIp(*confUDPHost) } key = fmt.Sprintf("%s%s/%s", prefix, FlagUDPHost, addr) } if serverInstance.httpsvr != nil { addr := os.Getenv(EnvHTTPProxy) if addr == "" { addr, _ = fixHostIp(*confHTTPHost) } key = fmt.Sprintf("%s%s/%s", prefix, FlagHTTPHost, addr) } _, err = mgr.cli.Put(ctx, key, addr, clientv3.WithLease(resp.ID)) if err != nil { return nil } mgr.leaseId = resp.ID leaseRespChan, err := mgr.cli.KeepAlive(ctx, resp.ID) if err != nil { return err } mgr.keepAliveChan = leaseRespChan // print common key info Log.Infof("RegisterServer is done. leaseId is %v\n", mgr.leaseId) go func() { for leaseResp := range mgr.keepAliveChan { Log.Infof("update lease success:%d", leaseResp.ID) } }() return nil } // UpdateServerHosts update server hosts func (mgr *ServerManager) UpdateServerHosts() error { if serverInstance == nil { return errorf(errServerNotInit) } prefix := EtcdServersPrefix response, err := mgr.cli.Get(context.Background(), prefix, clientv3.WithPrefix()) if err != nil { return err } servers := make(map[string](map[string][]string)) for _, server := range response.Kvs { name := strings.Split(string(server.Key), "/")[EtcdServersPrefixCnt + 1] servers[name] = make(map[string][]string) host := strings.Split(string(server.Key), "/")[EtcdServersPrefixCnt + 2] servers[name][host] = []string{} addr := strings.Split(string(server.Key), "/")[EtcdServersPrefixCnt+3] servers[name][host] = append(servers[name][host], addr) } mgr.mapServers = servers Log.Infof("UpdateServerHosts is done: %v", mgr.mapServers) return nil } // GetServerHosts get host ips for the server, now return all hosts func (mgr *ServerManager) GetServerHosts(serverName string, hostType string) ([]string, error) { server, ok := mgr.mapServers[serverName] if !ok { // try update server hosts mannually mgr.UpdateServerHosts() } server, ok = mgr.mapServers[serverName] if !ok { return nil, errorf("no server for %s", serverName) } hosts, ok := server[hostType] if !ok || len(hosts) == 0 { return nil, errorf("no hosts for %s:%s", serverName, hostType) } return hosts, nil }