server.go 6.8 KB


  1. // package server provides service interfaces and libraries.
  2. // including:
  3. // tcp/http server library.
  4. // rpc service library with addon functionality.
  5. // service discory and registration Logic.
  6. // statistic lib.
  7. package server
  8. import (
  9. "context"
  10. "github.com/opentracing/opentracing-go"
  11. // "github.com/vharitonsky/iniflags"
  12. "flag"
  13. "net/http"
  14. "net/rpc"
  15. "sparrow/pkg/tracing"
  16. "time"
  17. )
  18. // server is a singleton
  19. var serverInstance *Server = nil
  20. // Server
  21. type Server struct {
  22. // required
  23. name string
  24. // optional
  25. rpcsvr *RPCServer // RPC server
  26. tcpsvr *TCPServer // TCP server
  27. httpsvr *HTTPServer // HTTP server
  28. timertask TimerTask // timer task
  29. udpsvr *UDPServer
  30. // functions
  31. svrmgr *ServerManager // service registration&discovery manager
  32. rpccli *RPCClient // rpc client
  33. }
  34. // init the server with specific name.
  35. func Init(name string) error {
  36. if serverInstance == nil {
  37. // read config
  38. flag.Parse()
  39. // read network info
  40. readNetInterfaces()
  41. // log
  42. err := InitLog(name, *confLogLevel)
  43. if err != nil {
  44. return err
  45. }
  46. // server instance
  47. serverInstance = &Server{
  48. name: name,
  49. }
  50. // init service manager
  51. serverInstance.svrmgr, err = NewServerManager(name, *confEtcd)
  52. if err != nil {
  53. return err
  54. }
  55. // create RPC client
  56. serverInstance.rpccli, err = NewRPCClient()
  57. if err != nil {
  58. return err
  59. }
  60. Log.Infof("server %s init success.", name)
  61. }
  62. return nil
  63. }
  64. // RegisterTCPHandler register TCP handler class
  65. func RegisterTCPHandler(handler TCPHandler) error {
  66. if serverInstance == nil {
  67. return errorf(errServerNotInit)
  68. }
  69. if serverInstance.tcpsvr == nil {
  70. if *confTCPHost == "" {
  71. return errorf(errMissingFlag, FlagTCPHost)
  72. }
  73. addr, err := fixHostIp(*confTCPHost)
  74. if err != nil {
  75. return errorf(errWrongHostAddr, *confTCPHost)
  76. }
  77. serverInstance.tcpsvr = &TCPServer{
  78. addr: addr,
  79. handler: handler,
  80. useTls: *confUseTls,
  81. }
  82. }
  83. return nil
  84. }
  85. // RegisterUDPHandler register UDP handler class
  86. func RegisterUDPHandler(handler UDPHandler) error {
  87. if serverInstance == nil {
  88. return errorf(errServerNotInit)
  89. }
  90. if serverInstance.udpsvr == nil {
  91. if *confUDPHost == "" {
  92. return errorf(errMissingFlag, FlagUDPHost)
  93. }
  94. addr, err := fixHostIp(*confUDPHost)
  95. if err != nil {
  96. return errorf(errWrongHostAddr, *confUDPHost)
  97. }
  98. serverInstance.udpsvr = &UDPServer{
  99. addr: addr,
  100. handler: handler,
  101. }
  102. }
  103. return nil
  104. }
  105. // RegisterHTTPHandler register HTTP handler class
  106. func RegisterHTTPHandler(handler http.Handler) error {
  107. if serverInstance == nil {
  108. return errorf(errServerNotInit)
  109. }
  110. if serverInstance.httpsvr == nil {
  111. if *confHTTPHost == "" {
  112. return errorf(errMissingFlag, FlagHTTPHost)
  113. }
  114. addr, err := fixHostIp(*confHTTPHost)
  115. if err != nil {
  116. return errorf(errWrongHostAddr, FlagHTTPHost)
  117. }
  118. serverInstance.httpsvr = &HTTPServer{
  119. addr: addr,
  120. handler: handler,
  121. useHttps: *confUseHttps,
  122. }
  123. }
  124. return nil
  125. }
  126. // RegisterRPCHandler register RPC handler class
  127. func RegisterRPCHandler(rcvr interface{}) error {
  128. if serverInstance == nil {
  129. return errorf(errServerNotInit)
  130. }
  131. if serverInstance.rpcsvr == nil {
  132. if *confRPCHost == "" {
  133. return errorf(errMissingFlag, FlagRPCHost)
  134. }
  135. addr, err := fixHostIp(*confRPCHost)
  136. if err != nil {
  137. return errorf(errWrongHostAddr, *confRPCHost)
  138. }
  139. err = rpc.Register(rcvr)
  140. if err != nil {
  141. return errorf("Cannot Resgister RPC service: %s", err)
  142. }
  143. handler := rpcHandler{}
  144. serverInstance.rpcsvr = &RPCServer{
  145. TCPServer{
  146. addr: addr,
  147. handler: &handler,
  148. useTls: false, // rpc service do not use tls because it's in internal network
  149. },
  150. }
  151. }
  152. return nil
  153. }
  154. // RegisterTimerTask register timer task
  155. func RegisterTimerTask(task TimerTask) error {
  156. if serverInstance == nil {
  157. return errorf(errServerNotInit)
  158. }
  159. if serverInstance.timertask == nil {
  160. serverInstance.timertask = task
  161. }
  162. return nil
  163. }
  164. // RPCCallByName rpc call by name
  165. func RPCCallByName(ctx context.Context, serverName string, serverMethod string, args interface{}, reply interface{}) error {
  166. if serverInstance == nil {
  167. return errorf(errServerNotInit)
  168. }
  169. //var span opentracing.Span
  170. //
  171. //if ctx != nil {
  172. // sp := opentracing.SpanFromContext(ctx)
  173. // if sp != nil {
  174. // span = opentracing.StartSpan(serverName, opentracing.ChildOf(sp.Context()))
  175. // }
  176. //} else {
  177. // span = opentracing.StartSpan(serverName)
  178. //}
  179. //span.LogFields(
  180. // log.Object("args", args),
  181. //)
  182. //span.SetTag("server.name", serverName)
  183. return serverInstance.rpccli.Call(serverName, serverMethod, args, reply)
  184. }
  185. // RPCCallByHost rpc call by host
  186. func RPCCallByHost(host string, serverMethod string, args interface{}, reply interface{}) error {
  187. if serverInstance == nil {
  188. return errorf(errServerNotInit)
  189. }
  190. return serverInstance.rpccli.CallHost(host, serverMethod, args, reply)
  191. }
  192. // GetServerHosts get server's hosts by server name and service type
  193. func GetServerHosts(serverName string, hostType string) ([]string, error) {
  194. if serverInstance == nil {
  195. return nil, errorf(errServerNotInit)
  196. }
  197. return serverInstance.svrmgr.GetServerHosts(serverName, hostType)
  198. }
  199. // GetRPCHost get this server's rpc host
  200. func GetRPCHost() string {
  201. if serverInstance == nil || serverInstance.rpcsvr == nil {
  202. return ""
  203. }
  204. return serverInstance.rpcsvr.addr.externalIp
  205. }
  206. // GetHTTPHost get this server's http host addr
  207. func GetHTTPHost() string {
  208. if serverInstance == nil || serverInstance.httpsvr == nil {
  209. return ""
  210. }
  211. return serverInstance.httpsvr.addr.externalIp
  212. }
  213. // Run start service
  214. func Run() error {
  215. if serverInstance == nil {
  216. return errorf(errServerNotInit)
  217. }
  218. if serverInstance.tcpsvr != nil {
  219. err := serverInstance.tcpsvr.Start()
  220. if err != nil {
  221. return err
  222. }
  223. Log.Info("starting tcp server ... OK")
  224. }
  225. if serverInstance.httpsvr != nil {
  226. err := serverInstance.httpsvr.Start()
  227. if err != nil {
  228. return err
  229. }
  230. Log.Info("starting http server ... OK")
  231. }
  232. if serverInstance.udpsvr != nil {
  233. err := serverInstance.udpsvr.Start()
  234. if err != nil {
  235. return err
  236. }
  237. Log.Infof("starting udp server ... OK")
  238. }
  239. if serverInstance.rpcsvr != nil {
  240. err := serverInstance.rpcsvr.Start()
  241. if err != nil {
  242. return err
  243. }
  244. Log.Info("starting rpc server ... OK")
  245. }
  246. // server manager update
  247. err := serverInstance.svrmgr.RegisterServer()
  248. if err != nil {
  249. Log.Warnf("RegisterServer error: %s", err)
  250. } else {
  251. Log.Info("RegisterServer Success")
  252. }
  253. tracer, closer := tracing.Init(serverInstance.name)
  254. // opentracing
  255. defer closer.Close()
  256. opentracing.InitGlobalTracer(tracer)
  257. Log.Info("sever launch successfully!")
  258. // loop to do something
  259. for {
  260. err := serverInstance.svrmgr.UpdateServerHosts()
  261. if err != nil {
  262. Log.Errorf("UpdateServerHosts error: %s", err)
  263. } else {
  264. Log.Info("UpdateServerHosts Success")
  265. }
  266. //timer task
  267. if serverInstance.timertask != nil {
  268. serverInstance.timertask.DoTask()
  269. }
  270. time.Sleep(60 * time.Second)
  271. }
  272. }