server.go 7.0 KB


  1. // Package server package server provides service interfaces and libraries.
  2. // including:
  3. // tcp/http server library.
  4. // rpc service library with addon functionality.
  5. // service discory and registration Logic.
  6. // statistic lib.
  7. package server
  8. import (
  9. "context"
  10. "flag"
  11. "github.com/opentracing/opentracing-go"
  12. "net/http"
  13. "net/rpc"
  14. "sparrow/pkg/tracing"
  15. "time"
  16. )
  17. // server is a singleton
  18. var serverInstance *Server = nil
  19. // Server server
  20. type Server struct {
  21. // required
  22. name string
  23. // optional
  24. rpcServer *RPCServer // RPC server
  25. tcpServer *TCPServer // TCP server
  26. httpServer *HTTPServer // HTTP server
  27. timerTask TimerTask // timer task
  28. udpServer *UDPServer
  29. // functions
  30. serverManager *ServerManager // service registration&discovery manager
  31. rpcClient *RPCClient // rpc client
  32. prome *Prometheus
  33. }
  34. // Init init the server with specific name.
  35. func Init(name string) error {
  36. if serverInstance == nil {
  37. // read config
  38. flag.Parse()
  39. // read network info
  40. readNetInterfaces()
  41. // log
  42. err := InitLog(name, *confLogLevel)
  43. if err != nil {
  44. return err
  45. }
  46. // server instance
  47. serverInstance = &Server{
  48. name: name,
  49. }
  50. // init service manager
  51. serverInstance.serverManager, err = NewServerManager(name, *confEtcd)
  52. if err != nil {
  53. return err
  54. }
  55. // create RPC client
  56. serverInstance.rpcClient, err = NewRPCClient()
  57. if err != nil {
  58. return err
  59. }
  60. if *confProme != "" {
  61. p := NewPrometheus(name)
  62. serverInstance.prome = p
  63. go func() {
  64. if err = p.Start(*confProme); err != nil {
  65. Log.Errorf("prometheus start error:%s", err.Error())
  66. }
  67. }()
  68. }
  69. Log.Infof("server %s init success.", name)
  70. }
  71. return nil
  72. }
  73. // RegisterTCPHandler register TCP handler class
  74. func RegisterTCPHandler(handler TCPHandler) error {
  75. if serverInstance == nil {
  76. return errorf(errServerNotInit)
  77. }
  78. if serverInstance.tcpServer == nil {
  79. if *confTCPHost == "" {
  80. return errorf(errMissingFlag, FlagTCPHost)
  81. }
  82. addr, err := fixHostIp(*confTCPHost)
  83. if err != nil {
  84. return errorf(errWrongHostAddr, *confTCPHost)
  85. }
  86. serverInstance.tcpServer = &TCPServer{
  87. addr: addr,
  88. handler: handler,
  89. useTls: *confUseTls,
  90. }
  91. }
  92. return nil
  93. }
  94. // RegisterUDPHandler register UDP handler class
  95. func RegisterUDPHandler(handler UDPHandler) error {
  96. if serverInstance == nil {
  97. return errorf(errServerNotInit)
  98. }
  99. if serverInstance.udpServer == nil {
  100. if *confUDPHost == "" {
  101. return errorf(errMissingFlag, FlagUDPHost)
  102. }
  103. addr, err := fixHostIp(*confUDPHost)
  104. if err != nil {
  105. return errorf(errWrongHostAddr, *confUDPHost)
  106. }
  107. serverInstance.udpServer = &UDPServer{
  108. addr: addr,
  109. handler: handler,
  110. }
  111. }
  112. return nil
  113. }
  114. // RegisterHTTPHandler register HTTP handler class
  115. func RegisterHTTPHandler(handler http.Handler) error {
  116. if serverInstance == nil {
  117. return errorf(errServerNotInit)
  118. }
  119. if serverInstance.httpServer == nil {
  120. if *confHTTPHost == "" {
  121. return errorf(errMissingFlag, FlagHTTPHost)
  122. }
  123. addr, err := fixHostIp(*confHTTPHost)
  124. if err != nil {
  125. return errorf(errWrongHostAddr, FlagHTTPHost)
  126. }
  127. serverInstance.httpServer = &HTTPServer{
  128. addr: addr,
  129. handler: handler,
  130. useHttps: *confUseHttps,
  131. }
  132. }
  133. return nil
  134. }
  135. // RegisterRPCHandler register RPC handler class
  136. func RegisterRPCHandler(rcvr interface{}) error {
  137. if serverInstance == nil {
  138. return errorf(errServerNotInit)
  139. }
  140. if serverInstance.rpcServer == nil {
  141. if *confRPCHost == "" {
  142. return errorf(errMissingFlag, FlagRPCHost)
  143. }
  144. addr, err := fixHostIp(*confRPCHost)
  145. if err != nil {
  146. return errorf(errWrongHostAddr, *confRPCHost)
  147. }
  148. err = rpc.Register(rcvr)
  149. if err != nil {
  150. return errorf("Cannot Resgister RPC service: %s", err)
  151. }
  152. handler := rpcHandler{}
  153. serverInstance.rpcServer = &RPCServer{
  154. TCPServer{
  155. addr: addr,
  156. handler: &handler,
  157. useTls: false, // rpc service do not use tls because it's in internal network
  158. },
  159. }
  160. }
  161. return nil
  162. }
  163. // RegisterTimerTask register timer task
  164. func RegisterTimerTask(task TimerTask) error {
  165. if serverInstance == nil {
  166. return errorf(errServerNotInit)
  167. }
  168. if serverInstance.timerTask == nil {
  169. serverInstance.timerTask = task
  170. }
  171. return nil
  172. }
  173. // RPCCallByName rpc call by name
  174. func RPCCallByName(ctx context.Context, serverName string, serverMethod string, args interface{}, reply interface{}) error {
  175. if serverInstance == nil {
  176. return errorf(errServerNotInit)
  177. }
  178. return serverInstance.rpcClient.Call(serverName, serverMethod, args, reply)
  179. }
  180. // RPCCallByHost rpc call by host
  181. func RPCCallByHost(host string, serverMethod string, args interface{}, reply interface{}) error {
  182. if serverInstance == nil {
  183. return errorf(errServerNotInit)
  184. }
  185. return serverInstance.rpcClient.CallHost(host, serverMethod, args, reply)
  186. }
  187. // GetServerHosts get server's hosts by server name and service type
  188. func GetServerHosts(serverName string, hostType string) ([]string, error) {
  189. if serverInstance == nil {
  190. return nil, errorf(errServerNotInit)
  191. }
  192. return serverInstance.serverManager.GetServerHosts(serverName, hostType)
  193. }
  194. // GetRPCHost get this server's rpc host
  195. func GetRPCHost() string {
  196. if serverInstance == nil || serverInstance.rpcServer == nil {
  197. return ""
  198. }
  199. return serverInstance.rpcServer.addr.externalIp
  200. }
  201. // GetHTTPHost get this server's http host addr
  202. func GetHTTPHost() string {
  203. if serverInstance == nil || serverInstance.httpServer == nil {
  204. return ""
  205. }
  206. return serverInstance.httpServer.addr.externalIp
  207. }
  208. // Run start service
  209. func Run() error {
  210. if serverInstance == nil {
  211. return errorf(errServerNotInit)
  212. }
  213. if serverInstance.tcpServer != nil {
  214. err := serverInstance.tcpServer.Start()
  215. if err != nil {
  216. return err
  217. }
  218. Log.Info("starting tcp server ... OK")
  219. }
  220. if serverInstance.httpServer != nil {
  221. err := serverInstance.httpServer.Start()
  222. if err != nil {
  223. return err
  224. }
  225. Log.Info("starting http server ... OK")
  226. }
  227. if serverInstance.udpServer != nil {
  228. err := serverInstance.udpServer.Start()
  229. if err != nil {
  230. return err
  231. }
  232. Log.Infof("starting udp server ... OK")
  233. }
  234. if serverInstance.rpcServer != nil {
  235. err := serverInstance.rpcServer.Start()
  236. if err != nil {
  237. return err
  238. }
  239. Log.Info("starting rpc server ... OK")
  240. }
  241. // server manager update
  242. err := serverInstance.serverManager.RegisterServer()
  243. if err != nil {
  244. Log.Warnf("RegisterServer error: %s", err)
  245. } else {
  246. Log.Info("RegisterServer Success")
  247. }
  248. tracer, closer := tracing.Init(serverInstance.name)
  249. // opentracing
  250. defer closer.Close()
  251. opentracing.InitGlobalTracer(tracer)
  252. Log.Info("sever launch successfully!")
  253. // loop to do something
  254. for {
  255. err := serverInstance.serverManager.UpdateServerHosts()
  256. if err != nil {
  257. Log.Errorf("UpdateServerHosts error: %s", err)
  258. } else {
  259. Log.Info("UpdateServerHosts Success")
  260. }
  261. //timer task
  262. if serverInstance.timerTask != nil {
  263. serverInstance.timerTask.DoTask()
  264. }
  265. time.Sleep(60 * time.Second)
  266. }
  267. }
  268. func addCallCount() {
  269. if serverInstance.prome != nil {
  270. serverInstance.prome.CallCnt.WithLabelValues(serverInstance.name).Inc()
  271. }
  272. }
  273. func RegisterMetric(metrics ...*Metric) {
  274. serverInstance.prome.RegisterMetrics("", metrics...)
  275. }