// package server provides service interfaces and libraries. // including: // tcp/http server library. // rpc service library with addon functionality. // service discory and registration Logic. // statistic lib. package server import ( "context" "github.com/opentracing/opentracing-go" // "github.com/vharitonsky/iniflags" "flag" "net/http" "net/rpc" "sparrow/pkg/tracing" "time" ) // server is a singleton var serverInstance *Server = nil // Server type Server struct { // required name string // optional rpcsvr *RPCServer // RPC server tcpsvr *TCPServer // TCP server httpsvr *HTTPServer // HTTP server timertask TimerTask // timer task udpsvr *UDPServer // functions svrmgr *ServerManager // service registration&discovery manager rpccli *RPCClient // rpc client } // init the server with specific name. func Init(name string) error { if serverInstance == nil { // read config flag.Parse() // read network info readNetInterfaces() // log err := InitLog(name, *confLogLevel) if err != nil { return err } // server instance serverInstance = &Server{ name: name, } // init service manager serverInstance.svrmgr, err = NewServerManager(name, *confEtcd) if err != nil { return err } // create RPC client serverInstance.rpccli, err = NewRPCClient() if err != nil { return err } Log.Infof("server %s init success.", name) } return nil } // RegisterTCPHandler register TCP handler class func RegisterTCPHandler(handler TCPHandler) error { if serverInstance == nil { return errorf(errServerNotInit) } if serverInstance.tcpsvr == nil { if *confTCPHost == "" { return errorf(errMissingFlag, FlagTCPHost) } addr, err := fixHostIp(*confTCPHost) if err != nil { return errorf(errWrongHostAddr, *confTCPHost) } serverInstance.tcpsvr = &TCPServer{ addr: addr, handler: handler, useTls: *confUseTls, } } return nil } // RegisterUDPHandler register UDP handler class func RegisterUDPHandler(handler UDPHandler) error { if serverInstance == nil { return errorf(errServerNotInit) } if serverInstance.udpsvr == nil { if *confUDPHost == "" { return errorf(errMissingFlag, FlagUDPHost) } addr, err := fixHostIp(*confUDPHost) if err != nil { return errorf(errWrongHostAddr, *confUDPHost) } serverInstance.udpsvr = &UDPServer{ addr: addr, handler: handler, } } return nil } // RegisterHTTPHandler register HTTP handler class func RegisterHTTPHandler(handler http.Handler) error { if serverInstance == nil { return errorf(errServerNotInit) } if serverInstance.httpsvr == nil { if *confHTTPHost == "" { return errorf(errMissingFlag, FlagHTTPHost) } addr, err := fixHostIp(*confHTTPHost) if err != nil { return errorf(errWrongHostAddr, FlagHTTPHost) } serverInstance.httpsvr = &HTTPServer{ addr: addr, handler: handler, useHttps: *confUseHttps, } } return nil } // RegisterRPCHandler register RPC handler class func RegisterRPCHandler(rcvr interface{}) error { if serverInstance == nil { return errorf(errServerNotInit) } if serverInstance.rpcsvr == nil { if *confRPCHost == "" { return errorf(errMissingFlag, FlagRPCHost) } addr, err := fixHostIp(*confRPCHost) if err != nil { return errorf(errWrongHostAddr, *confRPCHost) } err = rpc.Register(rcvr) if err != nil { return errorf("Cannot Resgister RPC service: %s", err) } handler := rpcHandler{} serverInstance.rpcsvr = &RPCServer{ TCPServer{ addr: addr, handler: &handler, useTls: false, // rpc service do not use tls because it's in internal network }, } } return nil } // RegisterTimerTask register timer task func RegisterTimerTask(task TimerTask) error { if serverInstance == nil { return errorf(errServerNotInit) } if serverInstance.timertask == nil { serverInstance.timertask = task } return nil } // RPCCallByName rpc call by name func RPCCallByName(ctx context.Context, serverName string, serverMethod string, args interface{}, reply interface{}) error { if serverInstance == nil { return errorf(errServerNotInit) } //var span opentracing.Span // //if ctx != nil { // sp := opentracing.SpanFromContext(ctx) // if sp != nil { // span = opentracing.StartSpan(serverName, opentracing.ChildOf(sp.Context())) // } //} else { // span = opentracing.StartSpan(serverName) //} //span.LogFields( // log.Object("args", args), //) //span.SetTag("server.name", serverName) return serverInstance.rpccli.Call(serverName, serverMethod, args, reply) } // RPCCallByHost rpc call by host func RPCCallByHost(host string, serverMethod string, args interface{}, reply interface{}) error { if serverInstance == nil { return errorf(errServerNotInit) } return serverInstance.rpccli.CallHost(host, serverMethod, args, reply) } // GetServerHosts get server's hosts by server name and service type func GetServerHosts(serverName string, hostType string) ([]string, error) { if serverInstance == nil { return nil, errorf(errServerNotInit) } return serverInstance.svrmgr.GetServerHosts(serverName, hostType) } // GetRPCHost get this server's rpc host func GetRPCHost() string { if serverInstance == nil || serverInstance.rpcsvr == nil { return "" } return serverInstance.rpcsvr.addr.externalIp } // GetHTTPHost get this server's http host addr func GetHTTPHost() string { if serverInstance == nil || serverInstance.httpsvr == nil { return "" } return serverInstance.httpsvr.addr.externalIp } // Run start service func Run() error { if serverInstance == nil { return errorf(errServerNotInit) } if serverInstance.tcpsvr != nil { err := serverInstance.tcpsvr.Start() if err != nil { return err } Log.Info("starting tcp server ... OK") } if serverInstance.httpsvr != nil { err := serverInstance.httpsvr.Start() if err != nil { return err } Log.Info("starting http server ... OK") } if serverInstance.udpsvr != nil { err := serverInstance.udpsvr.Start() if err != nil { return err } Log.Infof("starting udp server ... OK") } if serverInstance.rpcsvr != nil { err := serverInstance.rpcsvr.Start() if err != nil { return err } Log.Info("starting rpc server ... OK") } // server manager update err := serverInstance.svrmgr.RegisterServer() if err != nil { Log.Warnf("RegisterServer error: %s", err) } else { Log.Info("RegisterServer Success") } tracer, closer := tracing.Init(serverInstance.name) // opentracing defer closer.Close() opentracing.InitGlobalTracer(tracer) Log.Info("sever launch successfully!") // loop to do something for { err := serverInstance.svrmgr.UpdateServerHosts() if err != nil { Log.Errorf("UpdateServerHosts error: %s", err) } else { Log.Info("UpdateServerHosts Success") } //timer task if serverInstance.timertask != nil { serverInstance.timertask.DoTask() } time.Sleep(60 * time.Second) } }