123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308 |
- // package server provides service interfaces and libraries.
- // including:
- // tcp/http server library.
- // rpc service library with addon functionality.
- // service discory and registration Logic.
- // statistic lib.
- package server
- import (
- "context"
- "github.com/opentracing/opentracing-go"
- // "github.com/vharitonsky/iniflags"
- "flag"
- "net/http"
- "net/rpc"
- "sparrow/pkg/tracing"
- "time"
- )
- // server is a singleton
- var serverInstance *Server = nil
- // Server
- type Server struct {
- // required
- name string
- // optional
- rpcsvr *RPCServer // RPC server
- tcpsvr *TCPServer // TCP server
- httpsvr *HTTPServer // HTTP server
- timertask TimerTask // timer task
- udpsvr *UDPServer
- // functions
- svrmgr *ServerManager // service registration&discovery manager
- rpccli *RPCClient // rpc client
- }
- // init the server with specific name.
- func Init(name string) error {
- if serverInstance == nil {
- // read config
- flag.Parse()
- // read network info
- readNetInterfaces()
- // log
- err := InitLog(name, *confLogLevel)
- if err != nil {
- return err
- }
- // server instance
- serverInstance = &Server{
- name: name,
- }
- // init service manager
- serverInstance.svrmgr, err = NewServerManager(name, *confEtcd)
- if err != nil {
- return err
- }
- // create RPC client
- serverInstance.rpccli, err = NewRPCClient()
- if err != nil {
- return err
- }
- Log.Infof("server %s init success.", name)
- }
- return nil
- }
- // RegisterTCPHandler register TCP handler class
- func RegisterTCPHandler(handler TCPHandler) error {
- if serverInstance == nil {
- return errorf(errServerNotInit)
- }
- if serverInstance.tcpsvr == nil {
- if *confTCPHost == "" {
- return errorf(errMissingFlag, FlagTCPHost)
- }
- addr, err := fixHostIp(*confTCPHost)
- if err != nil {
- return errorf(errWrongHostAddr, *confTCPHost)
- }
- serverInstance.tcpsvr = &TCPServer{
- addr: addr,
- handler: handler,
- useTls: *confUseTls,
- }
- }
- return nil
- }
- // RegisterUDPHandler register UDP handler class
- func RegisterUDPHandler(handler UDPHandler) error {
- if serverInstance == nil {
- return errorf(errServerNotInit)
- }
- if serverInstance.udpsvr == nil {
- if *confUDPHost == "" {
- return errorf(errMissingFlag, FlagUDPHost)
- }
- addr, err := fixHostIp(*confUDPHost)
- if err != nil {
- return errorf(errWrongHostAddr, *confUDPHost)
- }
- serverInstance.udpsvr = &UDPServer{
- addr: addr,
- handler: handler,
- }
- }
- return nil
- }
- // RegisterHTTPHandler register HTTP handler class
- func RegisterHTTPHandler(handler http.Handler) error {
- if serverInstance == nil {
- return errorf(errServerNotInit)
- }
- if serverInstance.httpsvr == nil {
- if *confHTTPHost == "" {
- return errorf(errMissingFlag, FlagHTTPHost)
- }
- addr, err := fixHostIp(*confHTTPHost)
- if err != nil {
- return errorf(errWrongHostAddr, FlagHTTPHost)
- }
- serverInstance.httpsvr = &HTTPServer{
- addr: addr,
- handler: handler,
- useHttps: *confUseHttps,
- }
- }
- return nil
- }
- // RegisterRPCHandler register RPC handler class
- func RegisterRPCHandler(rcvr interface{}) error {
- if serverInstance == nil {
- return errorf(errServerNotInit)
- }
- if serverInstance.rpcsvr == nil {
- if *confRPCHost == "" {
- return errorf(errMissingFlag, FlagRPCHost)
- }
- addr, err := fixHostIp(*confRPCHost)
- if err != nil {
- return errorf(errWrongHostAddr, *confRPCHost)
- }
- err = rpc.Register(rcvr)
- if err != nil {
- return errorf("Cannot Resgister RPC service: %s", err)
- }
- handler := rpcHandler{}
- serverInstance.rpcsvr = &RPCServer{
- TCPServer{
- addr: addr,
- handler: &handler,
- useTls: false, // rpc service do not use tls because it's in internal network
- },
- }
- }
- return nil
- }
- // RegisterTimerTask register timer task
- func RegisterTimerTask(task TimerTask) error {
- if serverInstance == nil {
- return errorf(errServerNotInit)
- }
- if serverInstance.timertask == nil {
- serverInstance.timertask = task
- }
- return nil
- }
- // RPCCallByName rpc call by name
- func RPCCallByName(ctx context.Context, serverName string, serverMethod string, args interface{}, reply interface{}) error {
- if serverInstance == nil {
- return errorf(errServerNotInit)
- }
- //var span opentracing.Span
- //
- //if ctx != nil {
- // sp := opentracing.SpanFromContext(ctx)
- // if sp != nil {
- // span = opentracing.StartSpan(serverName, opentracing.ChildOf(sp.Context()))
- // }
- //} else {
- // span = opentracing.StartSpan(serverName)
- //}
- //span.LogFields(
- // log.Object("args", args),
- //)
- //span.SetTag("server.name", serverName)
- return serverInstance.rpccli.Call(serverName, serverMethod, args, reply)
- }
- // RPCCallByHost rpc call by host
- func RPCCallByHost(host string, serverMethod string, args interface{}, reply interface{}) error {
- if serverInstance == nil {
- return errorf(errServerNotInit)
- }
- return serverInstance.rpccli.CallHost(host, serverMethod, args, reply)
- }
- // GetServerHosts get server's hosts by server name and service type
- func GetServerHosts(serverName string, hostType string) ([]string, error) {
- if serverInstance == nil {
- return nil, errorf(errServerNotInit)
- }
- return serverInstance.svrmgr.GetServerHosts(serverName, hostType)
- }
- // GetRPCHost get this server's rpc host
- func GetRPCHost() string {
- if serverInstance == nil || serverInstance.rpcsvr == nil {
- return ""
- }
- return serverInstance.rpcsvr.addr.externalIp
- }
- // GetHTTPHost get this server's http host addr
- func GetHTTPHost() string {
- if serverInstance == nil || serverInstance.httpsvr == nil {
- return ""
- }
- return serverInstance.httpsvr.addr.externalIp
- }
- // Run start service
- func Run() error {
- if serverInstance == nil {
- return errorf(errServerNotInit)
- }
- if serverInstance.tcpsvr != nil {
- err := serverInstance.tcpsvr.Start()
- if err != nil {
- return err
- }
- Log.Info("starting tcp server ... OK")
- }
- if serverInstance.httpsvr != nil {
- err := serverInstance.httpsvr.Start()
- if err != nil {
- return err
- }
- Log.Info("starting http server ... OK")
- }
- if serverInstance.udpsvr != nil {
- err := serverInstance.udpsvr.Start()
- if err != nil {
- return err
- }
- Log.Infof("starting udp server ... OK")
- }
- if serverInstance.rpcsvr != nil {
- err := serverInstance.rpcsvr.Start()
- if err != nil {
- return err
- }
- Log.Info("starting rpc server ... OK")
- }
- // server manager update
- err := serverInstance.svrmgr.RegisterServer()
- if err != nil {
- Log.Warnf("RegisterServer error: %s", err)
- } else {
- Log.Info("RegisterServer Success")
- }
- tracer, closer := tracing.Init(serverInstance.name)
- // opentracing
- defer closer.Close()
- opentracing.InitGlobalTracer(tracer)
- Log.Info("sever launch successfully!")
- // loop to do something
- for {
- err := serverInstance.svrmgr.UpdateServerHosts()
- if err != nil {
- Log.Errorf("UpdateServerHosts error: %s", err)
- } else {
- Log.Info("UpdateServerHosts Success")
- }
- //timer task
- if serverInstance.timertask != nil {
- serverInstance.timertask.DoTask()
- }
- time.Sleep(60 * time.Second)
- }
- }
|