server.go 7.1 KB


// Package server provides service interfaces and libraries, including:
//   - a TCP/HTTP server library;
//   - an RPC service library with add-on functionality;
//   - service discovery and registration logic;
//   - a statistics library.
  7. package server
  8. import (
  9. "context"
  10. "github.com/opentracing/opentracing-go"
  11. // "github.com/vharitonsky/iniflags"
  12. "flag"
  13. "net/http"
  14. "net/rpc"
  15. "sparrow/pkg/tracing"
  16. "time"
  17. )
  18. // server is a singleton
  19. var serverInstance *Server = nil
  20. // Server server
  21. type Server struct {
  22. // required
  23. name string
  24. // optional
  25. rpcServer *RPCServer // RPC server
  26. tcpServer *TCPServer // TCP server
  27. httpServer *HTTPServer // HTTP server
  28. timerTask TimerTask // timer task
  29. udpServer *UDPServer
  30. // functions
  31. serverManager *ServerManager // service registration&discovery manager
  32. rpcClient *RPCClient // rpc client
  33. prome *Prometheus
  34. }
  35. // Init init the server with specific name.
  36. func Init(name string) error {
  37. if serverInstance == nil {
  38. // read config
  39. flag.Parse()
  40. // read network info
  41. readNetInterfaces()
  42. // log
  43. err := InitLog(name, *confLogLevel)
  44. if err != nil {
  45. return err
  46. }
  47. // server instance
  48. serverInstance = &Server{
  49. name: name,
  50. }
  51. // init service manager
  52. serverInstance.serverManager, err = NewServerManager(name, *confEtcd)
  53. if err != nil {
  54. return err
  55. }
  56. // create RPC client
  57. serverInstance.rpcClient, err = NewRPCClient()
  58. if err != nil {
  59. return err
  60. }
  61. if *confProme != "" {
  62. p := NewPrometheus(name)
  63. serverInstance.prome = p
  64. go func() {
  65. if err = p.Start(*confProme); err != nil {
  66. Log.Errorf("prometheus start error:%s", err.Error())
  67. }
  68. }()
  69. }
  70. Log.Infof("server %s init success.", name)
  71. }
  72. return nil
  73. }
  74. // RegisterTCPHandler register TCP handler class
  75. func RegisterTCPHandler(handler TCPHandler) error {
  76. if serverInstance == nil {
  77. return errorf(errServerNotInit)
  78. }
  79. if serverInstance.tcpServer == nil {
  80. if *confTCPHost == "" {
  81. return errorf(errMissingFlag, FlagTCPHost)
  82. }
  83. addr, err := fixHostIp(*confTCPHost)
  84. if err != nil {
  85. return errorf(errWrongHostAddr, *confTCPHost)
  86. }
  87. serverInstance.tcpServer = &TCPServer{
  88. addr: addr,
  89. handler: handler,
  90. useTls: *confUseTls,
  91. }
  92. }
  93. return nil
  94. }
  95. // RegisterUDPHandler register UDP handler class
  96. func RegisterUDPHandler(handler UDPHandler) error {
  97. if serverInstance == nil {
  98. return errorf(errServerNotInit)
  99. }
  100. if serverInstance.udpServer == nil {
  101. if *confUDPHost == "" {
  102. return errorf(errMissingFlag, FlagUDPHost)
  103. }
  104. addr, err := fixHostIp(*confUDPHost)
  105. if err != nil {
  106. return errorf(errWrongHostAddr, *confUDPHost)
  107. }
  108. serverInstance.udpServer = &UDPServer{
  109. addr: addr,
  110. handler: handler,
  111. }
  112. }
  113. return nil
  114. }
  115. // RegisterHTTPHandler register HTTP handler class
  116. func RegisterHTTPHandler(handler http.Handler) error {
  117. if serverInstance == nil {
  118. return errorf(errServerNotInit)
  119. }
  120. if serverInstance.httpServer == nil {
  121. if *confHTTPHost == "" {
  122. return errorf(errMissingFlag, FlagHTTPHost)
  123. }
  124. addr, err := fixHostIp(*confHTTPHost)
  125. if err != nil {
  126. return errorf(errWrongHostAddr, FlagHTTPHost)
  127. }
  128. serverInstance.httpServer = &HTTPServer{
  129. addr: addr,
  130. handler: handler,
  131. useHttps: *confUseHttps,
  132. }
  133. }
  134. return nil
  135. }
  136. // RegisterRPCHandler register RPC handler class
  137. func RegisterRPCHandler(rcvr interface{}) error {
  138. if serverInstance == nil {
  139. return errorf(errServerNotInit)
  140. }
  141. if serverInstance.rpcServer == nil {
  142. if *confRPCHost == "" {
  143. return errorf(errMissingFlag, FlagRPCHost)
  144. }
  145. addr, err := fixHostIp(*confRPCHost)
  146. if err != nil {
  147. return errorf(errWrongHostAddr, *confRPCHost)
  148. }
  149. err = rpc.Register(rcvr)
  150. if err != nil {
  151. return errorf("Cannot Resgister RPC service: %s", err)
  152. }
  153. handler := rpcHandler{}
  154. serverInstance.rpcServer = &RPCServer{
  155. TCPServer{
  156. addr: addr,
  157. handler: &handler,
  158. useTls: false, // rpc service do not use tls because it's in internal network
  159. },
  160. }
  161. }
  162. return nil
  163. }
  164. // RegisterTimerTask register timer task
  165. func RegisterTimerTask(task TimerTask) error {
  166. if serverInstance == nil {
  167. return errorf(errServerNotInit)
  168. }
  169. if serverInstance.timerTask == nil {
  170. serverInstance.timerTask = task
  171. }
  172. return nil
  173. }
  174. // RPCCallByName rpc call by name
  175. func RPCCallByName(ctx context.Context, serverName string, serverMethod string, args interface{}, reply interface{}) error {
  176. if serverInstance == nil {
  177. return errorf(errServerNotInit)
  178. }
  179. return serverInstance.rpcClient.Call(serverName, serverMethod, args, reply)
  180. }
  181. // RPCCallByHost rpc call by host
  182. func RPCCallByHost(host string, serverMethod string, args interface{}, reply interface{}) error {
  183. if serverInstance == nil {
  184. return errorf(errServerNotInit)
  185. }
  186. return serverInstance.rpcClient.CallHost(host, serverMethod, args, reply)
  187. }
  188. // GetServerHosts get server's hosts by server name and service type
  189. func GetServerHosts(serverName string, hostType string) ([]string, error) {
  190. if serverInstance == nil {
  191. return nil, errorf(errServerNotInit)
  192. }
  193. return serverInstance.serverManager.GetServerHosts(serverName, hostType)
  194. }
  195. // GetRPCHost get this server's rpc host
  196. func GetRPCHost() string {
  197. if serverInstance == nil || serverInstance.rpcServer == nil {
  198. return ""
  199. }
  200. return serverInstance.rpcServer.addr.externalIp
  201. }
  202. // GetHTTPHost get this server's http host addr
  203. func GetHTTPHost() string {
  204. if serverInstance == nil || serverInstance.httpServer == nil {
  205. return ""
  206. }
  207. return serverInstance.httpServer.addr.externalIp
  208. }
  209. // Run start service
  210. func Run() error {
  211. if serverInstance == nil {
  212. return errorf(errServerNotInit)
  213. }
  214. if serverInstance.tcpServer != nil {
  215. err := serverInstance.tcpServer.Start()
  216. if err != nil {
  217. return err
  218. }
  219. Log.Info("starting tcp server ... OK")
  220. }
  221. if serverInstance.httpServer != nil {
  222. err := serverInstance.httpServer.Start()
  223. if err != nil {
  224. return err
  225. }
  226. Log.Info("starting http server ... OK")
  227. }
  228. if serverInstance.udpServer != nil {
  229. err := serverInstance.udpServer.Start()
  230. if err != nil {
  231. return err
  232. }
  233. Log.Infof("starting udp server ... OK")
  234. }
  235. if serverInstance.rpcServer != nil {
  236. err := serverInstance.rpcServer.Start()
  237. if err != nil {
  238. return err
  239. }
  240. Log.Info("starting rpc server ... OK")
  241. }
  242. // server manager update
  243. err := serverInstance.serverManager.RegisterServer()
  244. if err != nil {
  245. Log.Warnf("RegisterServer error: %s", err)
  246. } else {
  247. Log.Info("RegisterServer Success")
  248. }
  249. tracer, closer := tracing.Init(serverInstance.name)
  250. // opentracing
  251. defer closer.Close()
  252. opentracing.InitGlobalTracer(tracer)
  253. Log.Info("sever launch successfully!")
  254. // loop to do something
  255. for {
  256. err := serverInstance.serverManager.UpdateServerHosts()
  257. if err != nil {
  258. Log.Errorf("UpdateServerHosts error: %s", err)
  259. } else {
  260. Log.Info("UpdateServerHosts Success")
  261. }
  262. //timer task
  263. if serverInstance.timerTask != nil {
  264. serverInstance.timerTask.DoTask()
  265. }
  266. time.Sleep(60 * time.Second)
  267. }
  268. }
  269. func addCallCount() {
  270. if serverInstance.prome != nil {
  271. serverInstance.prome.CallCnt.WithLabelValues(serverInstance.name).Inc()
  272. }
  273. }
  274. func RegisterMetric(metrics ...*Metric) {
  275. serverInstance.prome.RegisterMetrics("", metrics...)
  276. }