// service registration and discovery package server import ( "errors" "fmt" "go.etcd.io/etcd/clientv3" "os" "strings" "sync" "time" "golang.org/x/net/context" ) const ( // EtcdServersPrefix prefix EtcdServersPrefix = "/knowo/servers/" EtcdServersPrefixCnt = 2 EnvTCPProxy = "TCP_PROXY_ADDR" EnvHTTPProxy = "HTTP_PROXY_ADDR" EnvUDPProxy = "UDP_PROXY_ADDR" lease = 90 ) type addr struct { internalIp string externalIp string } // ServerManager server manager type ServerManager struct { serverName string // servername -> hosttype -> hostlist // eg. var hosts []string = mapServers["testserver"]["rpchost"] mapServers map[string]map[string][]string etcdHosts []string cli *clientv3.Client leaseId clientv3.LeaseID keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse mu sync.Mutex } // NewServerManager new server manager // etcd hosts is config as http://ip1:port1;http://ip2:port2;http://ip3:port3 func NewServerManager(name string, etcd string) (*ServerManager, error) { if etcd == "" { return nil, errors.New("no etcd host found!") } etcdHosts := strings.Split(etcd, ";") cli, err := clientv3.New(clientv3.Config{ Endpoints: etcdHosts, DialTimeout: 5 * time.Second, }) if err != nil { return nil, err } return &ServerManager{ serverName: name, cli: cli, etcdHosts: etcdHosts, mapServers: make(map[string]map[string][]string), }, nil } // RegisterServer register server to etcd func (mgr *ServerManager) RegisterServer() error { if serverInstance == nil { return errorf(errServerNotInit) } ctx := context.Background() resp, err := mgr.cli.Grant(ctx, lease) if err != nil { return err } prefix := fmt.Sprintf("%s%s/", EtcdServersPrefix, mgr.serverName) var ( addr *addr keys []string ) if serverInstance.tcpsvr != nil { addr, _ = fixHostIp(*confTCPHost) envAddr := os.Getenv(EnvTCPProxy) if envAddr != "" { addr.externalIp = envAddr } keys = append(keys, fmt.Sprintf("%s%s/%s", prefix, FlagTCPHost, addr.externalIp)) } if serverInstance.rpcsvr != nil { addr, _ := fixHostIp(*confRPCHost) keys = append(keys, fmt.Sprintf("%s%s/%s", prefix, FlagRPCHost, addr.externalIp)) } if serverInstance.udpsvr != nil { addr, _ = fixHostIp(*confUDPHost) envAddr := os.Getenv(EnvUDPProxy) if envAddr != "" { addr.externalIp = envAddr } keys = append(keys, fmt.Sprintf("%s%s/%s", prefix, FlagUDPHost, addr.externalIp)) } if serverInstance.httpsvr != nil { addr, _ = fixHostIp(*confHTTPHost) envAddr := os.Getenv(EnvHTTPProxy) if envAddr != "" { addr.externalIp = envAddr } keys = append(keys, fmt.Sprintf("%s%s/%s", prefix, FlagHTTPHost, addr.externalIp)) } for _, key := range keys { _, err = mgr.cli.Put(ctx, key, "server", clientv3.WithLease(resp.ID)) if err != nil { return nil } mgr.leaseId = resp.ID leaseRespChan, err := mgr.cli.KeepAlive(ctx, resp.ID) if err != nil { return err } mgr.keepAliveChan = leaseRespChan } // print common keys info Log.Infof("RegisterServer is done. leaseId is %v\n", mgr.leaseId) go func() { for { <-mgr.keepAliveChan } }() return nil } // UpdateServerHosts update server hosts func (mgr *ServerManager) UpdateServerHosts() error { if serverInstance == nil { return errorf(errServerNotInit) } prefix := EtcdServersPrefix response, err := mgr.cli.Get(context.Background(), prefix, clientv3.WithPrefix()) if err != nil { return err } servers := make(map[string]map[string][]string) for _, kvs := range response.Kvs { key := string(kvs.Key) name := strings.Split(key, "/")[EtcdServersPrefixCnt+1] host := strings.Split(key, "/")[EtcdServersPrefixCnt+2] addr := strings.Split(key, "/")[EtcdServersPrefixCnt+3] if _, ok := servers[name]; !ok { servers[name] = make(map[string][]string) } if _, ok := servers[name][host]; !ok { servers[name][host] = []string{} } servers[name][host] = append(servers[name][host], addr) } mgr.mapServers = servers Log.Infof("UpdateServerHosts is done: %v", mgr.mapServers) return nil } // GetServerHosts get host ips for the server, now return all hosts func (mgr *ServerManager) GetServerHosts(serverName string, hostType string) ([]string, error) { server, ok := mgr.mapServers[serverName] if !ok { // try update server hosts mannually mgr.UpdateServerHosts() } server, ok = mgr.mapServers[serverName] if !ok { return nil, errorf("no server for %s", serverName) } hosts, ok := server[hostType] if !ok || len(hosts) == 0 { return nil, errorf("no hosts for %s:%s", serverName, hostType) } return hosts, nil }