// Package server package server provides service interfaces and libraries. // including: // tcp/http server library. // rpc service library with addon functionality. // service discory and registration Logic. // statistic lib. package server import ( "context" "flag" "github.com/opentracing/opentracing-go" "net/http" "net/rpc" "sparrow/pkg/tracing" "time" ) // server is a singleton var serverInstance *Server = nil // Server server type Server struct { // required name string // optional rpcServer *RPCServer // RPC server tcpServer *TCPServer // TCP server httpServer *HTTPServer // HTTP server timerTask TimerTask // timer task udpServer *UDPServer // functions serverManager *ServerManager // service registration&discovery manager rpcClient *RPCClient // rpc client prome *Prometheus } // Init init the server with specific name. func Init(name string) error { if serverInstance == nil { // read config flag.Parse() // read network info readNetInterfaces() // log err := InitLog(name, *confLogLevel) if err != nil { return err } // server instance serverInstance = &Server{ name: name, } // init service manager serverInstance.serverManager, err = NewServerManager(name, *confEtcd) if err != nil { return err } // create RPC client serverInstance.rpcClient, err = NewRPCClient() if err != nil { return err } if *confProme != "" { p := NewPrometheus(name) serverInstance.prome = p go func() { if err = p.Start(*confProme); err != nil { Log.Errorf("prometheus start error:%s", err.Error()) } }() } Log.Infof("server %s init success.", name) } return nil } // RegisterTCPHandler register TCP handler class func RegisterTCPHandler(handler TCPHandler) error { if serverInstance == nil { return errorf(errServerNotInit) } if serverInstance.tcpServer == nil { if *confTCPHost == "" { return errorf(errMissingFlag, FlagTCPHost) } addr, err := fixHostIp(*confTCPHost) if err != nil { return errorf(errWrongHostAddr, *confTCPHost) } serverInstance.tcpServer = &TCPServer{ addr: addr, handler: handler, useTls: *confUseTls, } } return nil } // RegisterUDPHandler register UDP handler class func RegisterUDPHandler(handler UDPHandler) error { if serverInstance == nil { return errorf(errServerNotInit) } if serverInstance.udpServer == nil { if *confUDPHost == "" { return errorf(errMissingFlag, FlagUDPHost) } addr, err := fixHostIp(*confUDPHost) if err != nil { return errorf(errWrongHostAddr, *confUDPHost) } serverInstance.udpServer = &UDPServer{ addr: addr, handler: handler, } } return nil } // RegisterHTTPHandler register HTTP handler class func RegisterHTTPHandler(handler http.Handler) error { if serverInstance == nil { return errorf(errServerNotInit) } if serverInstance.httpServer == nil { if *confHTTPHost == "" { return errorf(errMissingFlag, FlagHTTPHost) } addr, err := fixHostIp(*confHTTPHost) if err != nil { return errorf(errWrongHostAddr, FlagHTTPHost) } serverInstance.httpServer = &HTTPServer{ addr: addr, handler: handler, useHttps: *confUseHttps, } } return nil } // RegisterRPCHandler register RPC handler class func RegisterRPCHandler(rcvr interface{}) error { if serverInstance == nil { return errorf(errServerNotInit) } if serverInstance.rpcServer == nil { if *confRPCHost == "" { return errorf(errMissingFlag, FlagRPCHost) } addr, err := fixHostIp(*confRPCHost) if err != nil { return errorf(errWrongHostAddr, *confRPCHost) } err = rpc.Register(rcvr) if err != nil { return errorf("Cannot Resgister RPC service: %s", err) } handler := rpcHandler{} serverInstance.rpcServer = &RPCServer{ TCPServer{ addr: addr, handler: &handler, useTls: false, // rpc service do not use tls because it's in internal network }, } } return nil } // RegisterTimerTask register timer task func RegisterTimerTask(task TimerTask) error { if serverInstance == nil { return errorf(errServerNotInit) } if serverInstance.timerTask == nil { serverInstance.timerTask = task } return nil } // RPCCallByName rpc call by name func RPCCallByName(ctx context.Context, serverName string, serverMethod string, args interface{}, reply interface{}) error { if serverInstance == nil { return errorf(errServerNotInit) } return serverInstance.rpcClient.Call(serverName, serverMethod, args, reply) } // RPCCallByHost rpc call by host func RPCCallByHost(host string, serverMethod string, args interface{}, reply interface{}) error { if serverInstance == nil { return errorf(errServerNotInit) } return serverInstance.rpcClient.CallHost(host, serverMethod, args, reply) } // GetServerHosts get server's hosts by server name and service type func GetServerHosts(serverName string, hostType string) ([]string, error) { if serverInstance == nil { return nil, errorf(errServerNotInit) } return serverInstance.serverManager.GetServerHosts(serverName, hostType) } // GetRPCHost get this server's rpc host func GetRPCHost() string { if serverInstance == nil || serverInstance.rpcServer == nil { return "" } return serverInstance.rpcServer.addr.externalIp } // GetHTTPHost get this server's http host addr func GetHTTPHost() string { if serverInstance == nil || serverInstance.httpServer == nil { return "" } return serverInstance.httpServer.addr.externalIp } // Run start service func Run() error { if serverInstance == nil { return errorf(errServerNotInit) } if serverInstance.tcpServer != nil { err := serverInstance.tcpServer.Start() if err != nil { return err } Log.Info("starting tcp server ... OK") } if serverInstance.httpServer != nil { err := serverInstance.httpServer.Start() if err != nil { return err } Log.Info("starting http server ... OK") } if serverInstance.udpServer != nil { err := serverInstance.udpServer.Start() if err != nil { return err } Log.Infof("starting udp server ... OK") } if serverInstance.rpcServer != nil { err := serverInstance.rpcServer.Start() if err != nil { return err } Log.Info("starting rpc server ... OK") } // server manager update err := serverInstance.serverManager.RegisterServer() if err != nil { Log.Warnf("RegisterServer error: %s", err) } else { Log.Info("RegisterServer Success") } tracer, closer := tracing.Init(serverInstance.name) // opentracing defer closer.Close() opentracing.InitGlobalTracer(tracer) Log.Info("sever launch successfully!") // loop to do something for { err := serverInstance.serverManager.UpdateServerHosts() if err != nil { Log.Errorf("UpdateServerHosts error: %s", err) } else { Log.Info("UpdateServerHosts Success") } //timer task if serverInstance.timerTask != nil { serverInstance.timerTask.DoTask() } time.Sleep(60 * time.Second) } } func addCallCount() { if serverInstance.prome != nil { serverInstance.prome.CallCnt.WithLabelValues(serverInstance.name).Inc() } } func RegisterMetric(metrics ...*Metric) { serverInstance.prome.RegisterMetrics("", metrics...) }