server.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298
  1. // Package server provides service interfaces and libraries,
  2. // including:
  3. // a TCP/HTTP server library.
  4. // an RPC service library with add-on functionality.
  5. // service discovery and registration logic.
  6. // a statistics library.
  7. package server
  8. import (
  9. "github.com/opentracing/opentracing-go"
  10. // "github.com/vharitonsky/iniflags"
  11. "flag"
  12. "net/http"
  13. "net/rpc"
  14. "sparrow/pkg/tracing"
  15. "time"
  16. )
  17. // server is a singleton
  18. var serverInstance *Server = nil
  19. // Server
  20. type Server struct {
  21. // required
  22. name string
  23. // optional
  24. rpcsvr *RPCServer // RPC server
  25. tcpsvr *TCPServer // TCP server
  26. httpsvr *HTTPServer // HTTP server
  27. timertask TimerTask // timer task
  28. udpsvr *UDPServer
  29. // functions
  30. svrmgr *ServerManager // service registration&discovery manager
  31. rpccli *RPCClient // rpc client
  32. }
  33. // init the server with specific name.
  34. func Init(name string) error {
  35. if serverInstance == nil {
  36. // read config
  37. flag.Parse()
  38. // read network info
  39. readNetInterfaces()
  40. // log
  41. err := initLog(name, *confLogLevel)
  42. if err != nil {
  43. return err
  44. }
  45. // server instance
  46. serverInstance = &Server{
  47. name: name,
  48. }
  49. // init service manager
  50. serverInstance.svrmgr, err = NewServerManager(name, *confEtcd)
  51. if err != nil {
  52. return err
  53. }
  54. // create RPC client
  55. serverInstance.rpccli, err = NewRPCClient()
  56. if err != nil {
  57. return err
  58. }
  59. Log.Infof("server %s init success.", name)
  60. }
  61. return nil
  62. }
  63. // RegisterTCPHandler register TCP handler class
  64. func RegisterTCPHandler(handler TCPHandler) error {
  65. if serverInstance == nil {
  66. return errorf(errServerNotInit)
  67. }
  68. if serverInstance.tcpsvr == nil {
  69. if *confTCPHost == "" {
  70. return errorf(errMissingFlag, FlagTCPHost)
  71. }
  72. addr, err := fixHostIp(*confTCPHost)
  73. if err != nil {
  74. return errorf(errWrongHostAddr, *confTCPHost)
  75. }
  76. serverInstance.tcpsvr = &TCPServer{
  77. addr: addr,
  78. handler: handler,
  79. useTls: *confUseTls,
  80. }
  81. }
  82. return nil
  83. }
  84. // RegisterUDPHandler register UDP handler class
  85. func RegisterUDPHandler(handler UDPHandler) error {
  86. if serverInstance == nil {
  87. return errorf(errServerNotInit)
  88. }
  89. if serverInstance.udpsvr == nil {
  90. if *confUDPHost == "" {
  91. return errorf(errMissingFlag, FlagUDPHost)
  92. }
  93. addr, err := fixHostIp(*confUDPHost)
  94. if err != nil {
  95. return errorf(errWrongHostAddr, *confUDPHost)
  96. }
  97. serverInstance.udpsvr = &UDPServer{
  98. addr: addr,
  99. handler: handler,
  100. }
  101. }
  102. return nil
  103. }
  104. // RegisterHTTPHandler register HTTP handler class
  105. func RegisterHTTPHandler(handler http.Handler) error {
  106. if serverInstance == nil {
  107. return errorf(errServerNotInit)
  108. }
  109. if serverInstance.httpsvr == nil {
  110. if *confHTTPHost == "" {
  111. return errorf(errMissingFlag, FlagHTTPHost)
  112. }
  113. addr, err := fixHostIp(*confHTTPHost)
  114. if err != nil {
  115. return errorf(errWrongHostAddr, FlagHTTPHost)
  116. }
  117. serverInstance.httpsvr = &HTTPServer{
  118. addr: addr,
  119. handler: handler,
  120. useHttps: *confUseHttps,
  121. }
  122. }
  123. return nil
  124. }
  125. // RegisterRPCHandler register RPC handler class
  126. func RegisterRPCHandler(rcvr interface{}) error {
  127. if serverInstance == nil {
  128. return errorf(errServerNotInit)
  129. }
  130. if serverInstance.rpcsvr == nil {
  131. if *confRPCHost == "" {
  132. return errorf(errMissingFlag, FlagRPCHost)
  133. }
  134. addr, err := fixHostIp(*confRPCHost)
  135. if err != nil {
  136. return errorf(errWrongHostAddr, *confRPCHost)
  137. }
  138. err = rpc.Register(rcvr)
  139. if err != nil {
  140. return errorf("Cannot Resgister RPC service: %s", err)
  141. }
  142. handler := rpcHandler{}
  143. serverInstance.rpcsvr = &RPCServer{
  144. TCPServer{
  145. addr: addr,
  146. handler: &handler,
  147. useTls: false, // rpc service do not use tls because it's in internal network
  148. },
  149. }
  150. }
  151. return nil
  152. }
  153. // RegisterTimerTask register timer task
  154. func RegisterTimerTask(task TimerTask) error {
  155. if serverInstance == nil {
  156. return errorf(errServerNotInit)
  157. }
  158. if serverInstance.timertask == nil {
  159. serverInstance.timertask = task
  160. }
  161. return nil
  162. }
  163. // RPCCallByName rpc call by name
  164. func RPCCallByName(serverName string, serverMethod string, args interface{}, reply interface{}) error {
  165. if serverInstance == nil {
  166. return errorf(errServerNotInit)
  167. }
  168. return serverInstance.rpccli.Call(serverName, serverMethod, args, reply)
  169. }
  170. // RPCCallByHost rpc call by host
  171. func RPCCallByHost(host string, serverMethod string, args interface{}, reply interface{}) error {
  172. if serverInstance == nil {
  173. return errorf(errServerNotInit)
  174. }
  175. return serverInstance.rpccli.CallHost(host, serverMethod, args, reply)
  176. }
  177. // GetServerHosts get server's hosts by server name and service type
  178. func GetServerHosts(serverName string, hostType string) ([]string, error) {
  179. if serverInstance == nil {
  180. return nil, errorf(errServerNotInit)
  181. }
  182. return serverInstance.svrmgr.GetServerHosts(serverName, hostType)
  183. }
  184. // GetRPCHost get this server's rpc host
  185. func GetRPCHost() string {
  186. if serverInstance == nil || serverInstance.rpcsvr == nil {
  187. return ""
  188. }
  189. return serverInstance.rpcsvr.addr
  190. }
  191. // GetHTTPHost get this server's http host addr
  192. func GetHTTPHost() string {
  193. if serverInstance == nil || serverInstance.httpsvr == nil {
  194. return ""
  195. }
  196. return serverInstance.httpsvr.addr
  197. }
  198. // Run start service
  199. func Run() error {
  200. if serverInstance == nil {
  201. return errorf(errServerNotInit)
  202. }
  203. if serverInstance.tcpsvr != nil {
  204. err := serverInstance.tcpsvr.Start()
  205. if err != nil {
  206. return err
  207. }
  208. Log.Info("starting tcp server ... OK")
  209. }
  210. if serverInstance.httpsvr != nil {
  211. err := serverInstance.httpsvr.Start()
  212. if err != nil {
  213. return err
  214. }
  215. Log.Info("starting http server ... OK")
  216. }
  217. if serverInstance.udpsvr != nil {
  218. err := serverInstance.udpsvr.Start()
  219. if err != nil {
  220. return err
  221. }
  222. Log.Infof("starting udp server ... OK")
  223. }
  224. if serverInstance.rpcsvr != nil {
  225. err := serverInstance.rpcsvr.Start()
  226. if err != nil {
  227. return err
  228. }
  229. Log.Info("starting rpc server ... OK")
  230. }
  231. tracer, closer := tracing.Init(serverInstance.name)
  232. // opentracing
  233. defer closer.Close()
  234. opentracing.InitGlobalTracer(tracer)
  235. Log.Info("sever launch successfully!")
  236. // loop to do something
  237. for {
  238. // server manager update
  239. err := serverInstance.svrmgr.RegisterServer()
  240. if err != nil {
  241. Log.Warnf("RegisterServer error: %s", err)
  242. } else {
  243. Log.Info("RegisterServer Success")
  244. }
  245. err = serverInstance.svrmgr.UpdateServerHosts()
  246. if err != nil {
  247. Log.Errorf("UpdateServerHosts error: %s", err)
  248. } else {
  249. Log.Info("UpdateServerHosts Success")
  250. }
  251. //timer task
  252. if serverInstance.timertask != nil {
  253. serverInstance.timertask.DoTask()
  254. }
  255. time.Sleep(60 * time.Second)
  256. }
  257. }