server.go 6.8 KB


  1. // package server provides service interfaces and libraries.
  2. // including:
  3. // tcp/http server library.
  4. // rpc service library with addon functionality.
  5. // service discovery and registration logic.
  6. // statistic lib.
  7. package server
  8. import (
  9. "context"
  10. "github.com/opentracing/opentracing-go"
  11. "github.com/opentracing/opentracing-go/log"
  12. // "github.com/vharitonsky/iniflags"
  13. "flag"
  14. "net/http"
  15. "net/rpc"
  16. "sparrow/pkg/tracing"
  17. "time"
  18. )
  19. // server is a singleton
  20. var serverInstance *Server = nil
  21. // Server
  22. type Server struct {
  23. // required
  24. name string
  25. // optional
  26. rpcsvr *RPCServer // RPC server
  27. tcpsvr *TCPServer // TCP server
  28. httpsvr *HTTPServer // HTTP server
  29. timertask TimerTask // timer task
  30. udpsvr *UDPServer
  31. // functions
  32. svrmgr *ServerManager // service registration&discovery manager
  33. rpccli *RPCClient // rpc client
  34. }
  35. // init the server with specific name.
  36. func Init(name string) error {
  37. if serverInstance == nil {
  38. // read config
  39. flag.Parse()
  40. // read network info
  41. readNetInterfaces()
  42. // log
  43. err := InitLog(name, *confLogLevel)
  44. if err != nil {
  45. return err
  46. }
  47. // server instance
  48. serverInstance = &Server{
  49. name: name,
  50. }
  51. // init service manager
  52. serverInstance.svrmgr, err = NewServerManager(name, *confEtcd)
  53. if err != nil {
  54. return err
  55. }
  56. // create RPC client
  57. serverInstance.rpccli, err = NewRPCClient()
  58. if err != nil {
  59. return err
  60. }
  61. Log.Infof("server %s init success.", name)
  62. }
  63. return nil
  64. }
  65. // RegisterTCPHandler register TCP handler class
  66. func RegisterTCPHandler(handler TCPHandler) error {
  67. if serverInstance == nil {
  68. return errorf(errServerNotInit)
  69. }
  70. if serverInstance.tcpsvr == nil {
  71. if *confTCPHost == "" {
  72. return errorf(errMissingFlag, FlagTCPHost)
  73. }
  74. addr, err := fixHostIp(*confTCPHost)
  75. if err != nil {
  76. return errorf(errWrongHostAddr, *confTCPHost)
  77. }
  78. serverInstance.tcpsvr = &TCPServer{
  79. addr: addr,
  80. handler: handler,
  81. useTls: *confUseTls,
  82. }
  83. }
  84. return nil
  85. }
  86. // RegisterUDPHandler register UDP handler class
  87. func RegisterUDPHandler(handler UDPHandler) error {
  88. if serverInstance == nil {
  89. return errorf(errServerNotInit)
  90. }
  91. if serverInstance.udpsvr == nil {
  92. if *confUDPHost == "" {
  93. return errorf(errMissingFlag, FlagUDPHost)
  94. }
  95. addr, err := fixHostIp(*confUDPHost)
  96. if err != nil {
  97. return errorf(errWrongHostAddr, *confUDPHost)
  98. }
  99. serverInstance.udpsvr = &UDPServer{
  100. addr: addr,
  101. handler: handler,
  102. }
  103. }
  104. return nil
  105. }
  106. // RegisterHTTPHandler register HTTP handler class
  107. func RegisterHTTPHandler(handler http.Handler) error {
  108. if serverInstance == nil {
  109. return errorf(errServerNotInit)
  110. }
  111. if serverInstance.httpsvr == nil {
  112. if *confHTTPHost == "" {
  113. return errorf(errMissingFlag, FlagHTTPHost)
  114. }
  115. addr, err := fixHostIp(*confHTTPHost)
  116. if err != nil {
  117. return errorf(errWrongHostAddr, FlagHTTPHost)
  118. }
  119. serverInstance.httpsvr = &HTTPServer{
  120. addr: addr,
  121. handler: handler,
  122. useHttps: *confUseHttps,
  123. }
  124. }
  125. return nil
  126. }
  127. // RegisterRPCHandler register RPC handler class
  128. func RegisterRPCHandler(rcvr interface{}) error {
  129. if serverInstance == nil {
  130. return errorf(errServerNotInit)
  131. }
  132. if serverInstance.rpcsvr == nil {
  133. if *confRPCHost == "" {
  134. return errorf(errMissingFlag, FlagRPCHost)
  135. }
  136. addr, err := fixHostIp(*confRPCHost)
  137. if err != nil {
  138. return errorf(errWrongHostAddr, *confRPCHost)
  139. }
  140. err = rpc.Register(rcvr)
  141. if err != nil {
  142. return errorf("Cannot Resgister RPC service: %s", err)
  143. }
  144. handler := rpcHandler{}
  145. serverInstance.rpcsvr = &RPCServer{
  146. TCPServer{
  147. addr: addr,
  148. handler: &handler,
  149. useTls: false, // rpc service do not use tls because it's in internal network
  150. },
  151. }
  152. }
  153. return nil
  154. }
  155. // RegisterTimerTask register timer task
  156. func RegisterTimerTask(task TimerTask) error {
  157. if serverInstance == nil {
  158. return errorf(errServerNotInit)
  159. }
  160. if serverInstance.timertask == nil {
  161. serverInstance.timertask = task
  162. }
  163. return nil
  164. }
  165. // RPCCallByName rpc call by name
  166. func RPCCallByName(ctx context.Context, serverName string, serverMethod string, args interface{}, reply interface{}) error {
  167. if serverInstance == nil {
  168. return errorf(errServerNotInit)
  169. }
  170. var span opentracing.Span
  171. if ctx != nil {
  172. sp := opentracing.SpanFromContext(ctx)
  173. span = opentracing.StartSpan(serverName, opentracing.ChildOf(sp.Context()))
  174. } else {
  175. span = opentracing.StartSpan(serverName)
  176. }
  177. span.LogFields(
  178. log.Object("args", args),
  179. )
  180. span.SetTag("server.name", serverName)
  181. return serverInstance.rpccli.Call(span, serverName, serverMethod, args, reply)
  182. }
  183. // RPCCallByHost rpc call by host
  184. func RPCCallByHost(host string, serverMethod string, args interface{}, reply interface{}) error {
  185. if serverInstance == nil {
  186. return errorf(errServerNotInit)
  187. }
  188. return serverInstance.rpccli.CallHost(host, serverMethod, args, reply)
  189. }
  190. // GetServerHosts get server's hosts by server name and service type
  191. func GetServerHosts(serverName string, hostType string) ([]string, error) {
  192. if serverInstance == nil {
  193. return nil, errorf(errServerNotInit)
  194. }
  195. return serverInstance.svrmgr.GetServerHosts(serverName, hostType)
  196. }
  197. // GetRPCHost get this server's rpc host
  198. func GetRPCHost() string {
  199. if serverInstance == nil || serverInstance.rpcsvr == nil {
  200. return ""
  201. }
  202. return serverInstance.rpcsvr.addr.externalIp
  203. }
  204. // GetHTTPHost get this server's http host addr
  205. func GetHTTPHost() string {
  206. if serverInstance == nil || serverInstance.httpsvr == nil {
  207. return ""
  208. }
  209. return serverInstance.httpsvr.addr.externalIp
  210. }
  211. // Run start service
  212. func Run() error {
  213. if serverInstance == nil {
  214. return errorf(errServerNotInit)
  215. }
  216. if serverInstance.tcpsvr != nil {
  217. err := serverInstance.tcpsvr.Start()
  218. if err != nil {
  219. return err
  220. }
  221. Log.Info("starting tcp server ... OK")
  222. }
  223. if serverInstance.httpsvr != nil {
  224. err := serverInstance.httpsvr.Start()
  225. if err != nil {
  226. return err
  227. }
  228. Log.Info("starting http server ... OK")
  229. }
  230. if serverInstance.udpsvr != nil {
  231. err := serverInstance.udpsvr.Start()
  232. if err != nil {
  233. return err
  234. }
  235. Log.Infof("starting udp server ... OK")
  236. }
  237. if serverInstance.rpcsvr != nil {
  238. err := serverInstance.rpcsvr.Start()
  239. if err != nil {
  240. return err
  241. }
  242. Log.Info("starting rpc server ... OK")
  243. }
  244. // server manager update
  245. err := serverInstance.svrmgr.RegisterServer()
  246. if err != nil {
  247. Log.Warnf("RegisterServer error: %s", err)
  248. } else {
  249. Log.Info("RegisterServer Success")
  250. }
  251. tracer, closer := tracing.Init(serverInstance.name)
  252. // opentracing
  253. defer closer.Close()
  254. opentracing.InitGlobalTracer(tracer)
  255. Log.Info("sever launch successfully!")
  256. // loop to do something
  257. for {
  258. err := serverInstance.svrmgr.UpdateServerHosts()
  259. if err != nil {
  260. Log.Errorf("UpdateServerHosts error: %s", err)
  261. } else {
  262. Log.Info("UpdateServerHosts Success")
  263. }
  264. //timer task
  265. if serverInstance.timertask != nil {
  266. serverInstance.timertask.DoTask()
  267. }
  268. time.Sleep(60 * time.Second)
  269. }
  270. }