123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314 |
- // Package server provides service interfaces and libraries,
- // including:
- // tcp/http server library.
- // rpc service library with addon functionality.
- // service discovery and registration logic.
- // statistics library.
- package server
- import (
- "context"
- "github.com/opentracing/opentracing-go"
- // "github.com/vharitonsky/iniflags"
- "flag"
- "net/http"
- "net/rpc"
- "sparrow/pkg/tracing"
- "time"
- )
- // server is a singleton
- var serverInstance *Server = nil
- // Server server
- type Server struct {
- // required
- name string
- // optional
- rpcServer *RPCServer // RPC server
- tcpServer *TCPServer // TCP server
- httpServer *HTTPServer // HTTP server
- timerTask TimerTask // timer task
- udpServer *UDPServer
- // functions
- serverManager *ServerManager // service registration&discovery manager
- rpcClient *RPCClient // rpc client
- prome *Prometheus
- }
- // Init init the server with specific name.
- func Init(name string) error {
- if serverInstance == nil {
- // read config
- flag.Parse()
- // read network info
- readNetInterfaces()
- // log
- err := InitLog(name, *confLogLevel)
- if err != nil {
- return err
- }
- // server instance
- serverInstance = &Server{
- name: name,
- }
- // init service manager
- serverInstance.serverManager, err = NewServerManager(name, *confEtcd)
- if err != nil {
- return err
- }
- // create RPC client
- serverInstance.rpcClient, err = NewRPCClient()
- if err != nil {
- return err
- }
- if *confProme != "" {
- p := NewPrometheus(name)
- serverInstance.prome = p
- go func() {
- if err = p.Start(*confProme); err != nil {
- Log.Errorf("prometheus start error:%s", err.Error())
- }
- }()
- }
- Log.Infof("server %s init success.", name)
- }
- return nil
- }
- // RegisterTCPHandler register TCP handler class
- func RegisterTCPHandler(handler TCPHandler) error {
- if serverInstance == nil {
- return errorf(errServerNotInit)
- }
- if serverInstance.tcpServer == nil {
- if *confTCPHost == "" {
- return errorf(errMissingFlag, FlagTCPHost)
- }
- addr, err := fixHostIp(*confTCPHost)
- if err != nil {
- return errorf(errWrongHostAddr, *confTCPHost)
- }
- serverInstance.tcpServer = &TCPServer{
- addr: addr,
- handler: handler,
- useTls: *confUseTls,
- }
- }
- return nil
- }
- // RegisterUDPHandler register UDP handler class
- func RegisterUDPHandler(handler UDPHandler) error {
- if serverInstance == nil {
- return errorf(errServerNotInit)
- }
- if serverInstance.udpServer == nil {
- if *confUDPHost == "" {
- return errorf(errMissingFlag, FlagUDPHost)
- }
- addr, err := fixHostIp(*confUDPHost)
- if err != nil {
- return errorf(errWrongHostAddr, *confUDPHost)
- }
- serverInstance.udpServer = &UDPServer{
- addr: addr,
- handler: handler,
- }
- }
- return nil
- }
- // RegisterHTTPHandler register HTTP handler class
- func RegisterHTTPHandler(handler http.Handler) error {
- if serverInstance == nil {
- return errorf(errServerNotInit)
- }
- if serverInstance.httpServer == nil {
- if *confHTTPHost == "" {
- return errorf(errMissingFlag, FlagHTTPHost)
- }
- addr, err := fixHostIp(*confHTTPHost)
- if err != nil {
- return errorf(errWrongHostAddr, FlagHTTPHost)
- }
- serverInstance.httpServer = &HTTPServer{
- addr: addr,
- handler: handler,
- useHttps: *confUseHttps,
- }
- }
- return nil
- }
- // RegisterRPCHandler register RPC handler class
- func RegisterRPCHandler(rcvr interface{}) error {
- if serverInstance == nil {
- return errorf(errServerNotInit)
- }
- if serverInstance.rpcServer == nil {
- if *confRPCHost == "" {
- return errorf(errMissingFlag, FlagRPCHost)
- }
- addr, err := fixHostIp(*confRPCHost)
- if err != nil {
- return errorf(errWrongHostAddr, *confRPCHost)
- }
- err = rpc.Register(rcvr)
- if err != nil {
- return errorf("Cannot Resgister RPC service: %s", err)
- }
- handler := rpcHandler{}
- serverInstance.rpcServer = &RPCServer{
- TCPServer{
- addr: addr,
- handler: &handler,
- useTls: false, // rpc service do not use tls because it's in internal network
- },
- }
- }
- return nil
- }
- // RegisterTimerTask register timer task
- func RegisterTimerTask(task TimerTask) error {
- if serverInstance == nil {
- return errorf(errServerNotInit)
- }
- if serverInstance.timerTask == nil {
- serverInstance.timerTask = task
- }
- return nil
- }
- // RPCCallByName rpc call by name
- func RPCCallByName(ctx context.Context, serverName string, serverMethod string, args interface{}, reply interface{}) error {
- if serverInstance == nil {
- return errorf(errServerNotInit)
- }
- return serverInstance.rpcClient.Call(serverName, serverMethod, args, reply)
- }
- // RPCCallByHost rpc call by host
- func RPCCallByHost(host string, serverMethod string, args interface{}, reply interface{}) error {
- if serverInstance == nil {
- return errorf(errServerNotInit)
- }
- return serverInstance.rpcClient.CallHost(host, serverMethod, args, reply)
- }
- // GetServerHosts get server's hosts by server name and service type
- func GetServerHosts(serverName string, hostType string) ([]string, error) {
- if serverInstance == nil {
- return nil, errorf(errServerNotInit)
- }
- return serverInstance.serverManager.GetServerHosts(serverName, hostType)
- }
- // GetRPCHost get this server's rpc host
- func GetRPCHost() string {
- if serverInstance == nil || serverInstance.rpcServer == nil {
- return ""
- }
- return serverInstance.rpcServer.addr.externalIp
- }
- // GetHTTPHost get this server's http host addr
- func GetHTTPHost() string {
- if serverInstance == nil || serverInstance.httpServer == nil {
- return ""
- }
- return serverInstance.httpServer.addr.externalIp
- }
- // Run start service
- func Run() error {
- if serverInstance == nil {
- return errorf(errServerNotInit)
- }
- if serverInstance.tcpServer != nil {
- err := serverInstance.tcpServer.Start()
- if err != nil {
- return err
- }
- Log.Info("starting tcp server ... OK")
- }
- if serverInstance.httpServer != nil {
- err := serverInstance.httpServer.Start()
- if err != nil {
- return err
- }
- Log.Info("starting http server ... OK")
- }
- if serverInstance.udpServer != nil {
- err := serverInstance.udpServer.Start()
- if err != nil {
- return err
- }
- Log.Infof("starting udp server ... OK")
- }
- if serverInstance.rpcServer != nil {
- err := serverInstance.rpcServer.Start()
- if err != nil {
- return err
- }
- Log.Info("starting rpc server ... OK")
- }
- // server manager update
- err := serverInstance.serverManager.RegisterServer()
- if err != nil {
- Log.Warnf("RegisterServer error: %s", err)
- } else {
- Log.Info("RegisterServer Success")
- }
- tracer, closer := tracing.Init(serverInstance.name)
- // opentracing
- defer closer.Close()
- opentracing.InitGlobalTracer(tracer)
- Log.Info("sever launch successfully!")
- // loop to do something
- for {
- err := serverInstance.serverManager.UpdateServerHosts()
- if err != nil {
- Log.Errorf("UpdateServerHosts error: %s", err)
- } else {
- Log.Info("UpdateServerHosts Success")
- }
- //timer task
- if serverInstance.timerTask != nil {
- serverInstance.timerTask.DoTask()
- }
- time.Sleep(60 * time.Second)
- }
- }
- func addCallCount() {
- if serverInstance.prome != nil {
- serverInstance.prome.CallCnt.WithLabelValues(serverInstance.name).Inc()
- }
- }
- func RegisterMetric(metrics ...*Metric) {
- serverInstance.prome.RegisterMetrics("", metrics...)
- }
|