123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181 |
- // service registration and discovery
- package server
- import (
- "errors"
- "fmt"
- "go.etcd.io/etcd/clientv3"
- "os"
- "strings"
- "sync"
- "time"
- "golang.org/x/net/context"
- )
// Layout of the etcd keys under which servers are registered.
const (
	// EtcdServersPrefix is the key prefix shared by every registered server.
	EtcdServersPrefix = "/knowo/servers/"
	// EtcdServersPrefixCnt is the number of path segments in EtcdServersPrefix.
	EtcdServersPrefixCnt = 2
)

// Names of the environment variables whose value, when non-empty, replaces
// the external address published for the matching listener.
const (
	EnvTCPProxy  = "TCP_PROXY_ADDR"
	EnvHTTPProxy = "HTTP_PROXY_ADDR"
	EnvUDPProxy  = "UDP_PROXY_ADDR"
)

// lease is the TTL handed to the etcd Grant call when registering
// (seconds, per the etcd lease API).
const lease = 90
// addr carries the internal and the external address of one host endpoint.
type addr struct {
	internalIp, externalIp string
}
- // ServerManager server manager
- type ServerManager struct {
- serverName string
- // servername -> hosttype -> hostlist
- // eg. var hosts []string = mapServers["testserver"]["rpchost"]
- mapServers map[string]map[string][]string
- etcdHosts []string
- cli *clientv3.Client
- leaseId clientv3.LeaseID
- keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
- mu sync.Mutex
- }
- // NewServerManager new server manager
- // etcd hosts is config as http://ip1:port1;http://ip2:port2;http://ip3:port3
- func NewServerManager(name string, etcd string) (*ServerManager, error) {
- if etcd == "" {
- return nil, errors.New("no etcd host found!")
- }
- etcdHosts := strings.Split(etcd, ";")
- cli, err := clientv3.New(clientv3.Config{
- Endpoints: etcdHosts,
- DialTimeout: 5 * time.Second,
- })
- if err != nil {
- return nil, err
- }
- return &ServerManager{
- serverName: name,
- cli: cli,
- etcdHosts: etcdHosts,
- mapServers: make(map[string]map[string][]string),
- }, nil
- }
- // RegisterServer register server to etcd
- func (mgr *ServerManager) RegisterServer() error {
- if serverInstance == nil {
- return errorf(errServerNotInit)
- }
- ctx := context.Background()
- resp, err := mgr.cli.Grant(ctx, lease)
- if err != nil {
- return err
- }
- prefix := fmt.Sprintf("%s%s/", EtcdServersPrefix, mgr.serverName)
- var (
- addr *addr
- keys []string
- )
- if serverInstance.tcpsvr != nil {
- addr, _ = fixHostIp(*confTCPHost)
- envAddr := os.Getenv(EnvTCPProxy)
- if envAddr != "" {
- addr.externalIp = envAddr
- }
- keys = append(keys, fmt.Sprintf("%s%s/%s", prefix, FlagTCPHost, addr.externalIp))
- }
- if serverInstance.rpcsvr != nil {
- addr, _ := fixHostIp(*confRPCHost)
- keys = append(keys, fmt.Sprintf("%s%s/%s", prefix, FlagRPCHost, addr.externalIp))
- }
- if serverInstance.udpsvr != nil {
- addr, _ = fixHostIp(*confUDPHost)
- envAddr := os.Getenv(EnvUDPProxy)
- if envAddr != "" {
- addr.externalIp = envAddr
- }
- keys = append(keys, fmt.Sprintf("%s%s/%s", prefix, FlagUDPHost, addr.externalIp))
- }
- if serverInstance.httpsvr != nil {
- addr, _ = fixHostIp(*confHTTPHost)
- envAddr := os.Getenv(EnvHTTPProxy)
- if envAddr != "" {
- addr.externalIp = envAddr
- }
- keys = append(keys, fmt.Sprintf("%s%s/%s", prefix, FlagHTTPHost, addr.externalIp))
- }
- for _, key := range keys {
- _, err = mgr.cli.Put(ctx, key, "server", clientv3.WithLease(resp.ID))
- if err != nil {
- return nil
- }
- mgr.leaseId = resp.ID
- leaseRespChan, err := mgr.cli.KeepAlive(ctx, resp.ID)
- if err != nil {
- return err
- }
- mgr.keepAliveChan = leaseRespChan
- }
- // print common keys info
- Log.Infof("RegisterServer is done. leaseId is %v\n", mgr.leaseId)
- go func() {
- for leaseResp := range mgr.keepAliveChan {
- Log.Infof("update lease success:%d", leaseResp.ID)
- }
- }()
- return nil
- }
- // UpdateServerHosts update server hosts
- func (mgr *ServerManager) UpdateServerHosts() error {
- if serverInstance == nil {
- return errorf(errServerNotInit)
- }
- prefix := EtcdServersPrefix
- response, err := mgr.cli.Get(context.Background(), prefix, clientv3.WithPrefix())
- if err != nil {
- return err
- }
- servers := make(map[string]map[string][]string)
- for _, kvs := range response.Kvs {
- key := string(kvs.Key)
- name := strings.Split(key, "/")[EtcdServersPrefixCnt+1]
- host := strings.Split(key, "/")[EtcdServersPrefixCnt+2]
- addr := strings.Split(key, "/")[EtcdServersPrefixCnt+3]
- if _, ok := servers[name]; !ok {
- servers[name] = make(map[string][]string)
- }
- if _, ok := servers[name][host];!ok {
- servers[name][host] = []string{}
- }
- servers[name][host] = append(servers[name][host], addr)
- }
- mgr.mapServers = servers
- Log.Infof("UpdateServerHosts is done: %v", mgr.mapServers)
- return nil
- }
- // GetServerHosts get host ips for the server, now return all hosts
- func (mgr *ServerManager) GetServerHosts(serverName string, hostType string) ([]string, error) {
- server, ok := mgr.mapServers[serverName]
- if !ok {
- // try update server hosts mannually
- mgr.UpdateServerHosts()
- }
- server, ok = mgr.mapServers[serverName]
- if !ok {
- return nil, errorf("no server for %s", serverName)
- }
- hosts, ok := server[hostType]
- if !ok || len(hosts) == 0 {
- return nil, errorf("no hosts for %s:%s", serverName, hostType)
- }
- return hosts, nil
- }
|