ghttp_server.go 18 KB


  1. // Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
  2. //
  3. // This Source Code Form is subject to the terms of the MIT License.
  4. // If a copy of the MIT was not distributed with this file,
  5. // You can obtain one at https://github.com/gogf/gf.
  6. package ghttp
  7. import (
  8. "bytes"
  9. "context"
  10. "fmt"
  11. "net/http"
  12. "os"
  13. "runtime"
  14. "strings"
  15. "sync"
  16. "time"
  17. "github.com/olekukonko/tablewriter"
  18. "github.com/gogf/gf/v2/container/garray"
  19. "github.com/gogf/gf/v2/container/gset"
  20. "github.com/gogf/gf/v2/container/gtype"
  21. "github.com/gogf/gf/v2/debug/gdebug"
  22. "github.com/gogf/gf/v2/errors/gcode"
  23. "github.com/gogf/gf/v2/errors/gerror"
  24. "github.com/gogf/gf/v2/internal/intlog"
  25. "github.com/gogf/gf/v2/net/ghttp/internal/swaggerui"
  26. "github.com/gogf/gf/v2/net/goai"
  27. "github.com/gogf/gf/v2/net/gsvc"
  28. "github.com/gogf/gf/v2/os/gcache"
  29. "github.com/gogf/gf/v2/os/gctx"
  30. "github.com/gogf/gf/v2/os/genv"
  31. "github.com/gogf/gf/v2/os/gfile"
  32. "github.com/gogf/gf/v2/os/glog"
  33. "github.com/gogf/gf/v2/os/gproc"
  34. "github.com/gogf/gf/v2/os/gsession"
  35. "github.com/gogf/gf/v2/os/gtimer"
  36. "github.com/gogf/gf/v2/text/gregex"
  37. "github.com/gogf/gf/v2/text/gstr"
  38. "github.com/gogf/gf/v2/util/gconv"
  39. )
  40. func init() {
  41. // Initialize the method map.
  42. for _, v := range strings.Split(supportedHttpMethods, ",") {
  43. methodsMap[v] = struct{}{}
  44. }
  45. }
  46. // serverProcessInit initializes some process configurations, which can only be done once.
  47. func serverProcessInit() {
  48. var ctx = context.TODO()
  49. if !serverProcessInitialized.Cas(false, true) {
  50. return
  51. }
  52. // This means it is a restart server. It should kill its parent before starting its listening,
  53. // to avoid duplicated port listening in two processes.
  54. if !genv.Get(adminActionRestartEnvKey).IsEmpty() {
  55. if p, err := os.FindProcess(gproc.PPid()); err == nil {
  56. if err = p.Kill(); err != nil {
  57. intlog.Errorf(ctx, `%+v`, err)
  58. }
  59. if _, err = p.Wait(); err != nil {
  60. intlog.Errorf(ctx, `%+v`, err)
  61. }
  62. } else {
  63. glog.Error(ctx, err)
  64. }
  65. }
  66. // Process message handler.
  67. // It enabled only a graceful feature is enabled.
  68. if gracefulEnabled {
  69. intlog.Printf(ctx, "pid[%d]: graceful reload feature is enabled", gproc.Pid())
  70. go handleProcessMessage()
  71. } else {
  72. intlog.Printf(ctx, "pid[%d]: graceful reload feature is disabled", gproc.Pid())
  73. }
  74. // It's an ugly calling for better initializing the main package path
  75. // in source development environment. It is useful only be used in main goroutine.
  76. // It fails to retrieve the main package path in asynchronous goroutines.
  77. gfile.MainPkgPath()
  78. }
  79. // GetServer creates and returns a server instance using given name and default configurations.
  80. // Note that the parameter `name` should be unique for different servers. It returns an existing
  81. // server instance if given `name` is already existing in the server mapping.
  82. func GetServer(name ...interface{}) *Server {
  83. serverName := DefaultServerName
  84. if len(name) > 0 && name[0] != "" {
  85. serverName = gconv.String(name[0])
  86. }
  87. v := serverMapping.GetOrSetFuncLock(serverName, func() interface{} {
  88. s := &Server{
  89. instance: serverName,
  90. plugins: make([]Plugin, 0),
  91. servers: make([]*gracefulServer, 0),
  92. closeChan: make(chan struct{}, 10000),
  93. serverCount: gtype.NewInt(),
  94. statusHandlerMap: make(map[string][]HandlerFunc),
  95. serveTree: make(map[string]interface{}),
  96. serveCache: gcache.New(),
  97. routesMap: make(map[string][]*HandlerItem),
  98. openapi: goai.New(),
  99. registrar: gsvc.GetRegistry(),
  100. }
  101. // Initialize the server using default configurations.
  102. if err := s.SetConfig(NewConfig()); err != nil {
  103. panic(gerror.WrapCode(gcode.CodeInvalidConfiguration, err, ""))
  104. }
  105. // It enables OpenTelemetry for server in default.
  106. s.Use(internalMiddlewareServerTracing)
  107. return s
  108. })
  109. return v.(*Server)
  110. }
  111. // Start starts listening on configured port.
  112. // This function does not block the process, you can use function Wait blocking the process.
  113. func (s *Server) Start() error {
  114. var ctx = gctx.GetInitCtx()
  115. // Swagger UI.
  116. if s.config.SwaggerPath != "" {
  117. swaggerui.Init()
  118. s.AddStaticPath(s.config.SwaggerPath, swaggerUIPackedPath)
  119. s.BindHookHandler(s.config.SwaggerPath+"/*", HookBeforeServe, s.swaggerUI)
  120. }
  121. // OpenApi specification json producing handler.
  122. if s.config.OpenApiPath != "" {
  123. s.BindHandler(s.config.OpenApiPath, s.openapiSpec)
  124. }
  125. // Register group routes.
  126. s.handlePreBindItems(ctx)
  127. // Server process initialization, which can only be initialized once.
  128. serverProcessInit()
  129. // Server can only be run once.
  130. if s.Status() == ServerStatusRunning {
  131. return gerror.NewCode(gcode.CodeInvalidOperation, "server is already running")
  132. }
  133. // Logging path setting check.
  134. if s.config.LogPath != "" && s.config.LogPath != s.config.Logger.GetPath() {
  135. if err := s.config.Logger.SetPath(s.config.LogPath); err != nil {
  136. return err
  137. }
  138. }
  139. // Default session storage.
  140. if s.config.SessionStorage == nil {
  141. sessionStoragePath := ""
  142. if s.config.SessionPath != "" {
  143. sessionStoragePath = gfile.Join(s.config.SessionPath, s.config.Name)
  144. if !gfile.Exists(sessionStoragePath) {
  145. if err := gfile.Mkdir(sessionStoragePath); err != nil {
  146. return gerror.Wrapf(err, `mkdir failed for "%s"`, sessionStoragePath)
  147. }
  148. }
  149. }
  150. s.config.SessionStorage = gsession.NewStorageFile(sessionStoragePath, s.config.SessionMaxAge)
  151. }
  152. // Initialize session manager when start running.
  153. s.sessionManager = gsession.New(
  154. s.config.SessionMaxAge,
  155. s.config.SessionStorage,
  156. )
  157. // PProf feature.
  158. if s.config.PProfEnabled {
  159. s.EnablePProf(s.config.PProfPattern)
  160. }
  161. // Default HTTP handler.
  162. if s.config.Handler == nil {
  163. s.config.Handler = s.ServeHTTP
  164. }
  165. // Install external plugins.
  166. for _, p := range s.plugins {
  167. if err := p.Install(s); err != nil {
  168. s.Logger().Fatalf(ctx, `%+v`, err)
  169. }
  170. }
  171. // Check the group routes again for internally registered routes.
  172. s.handlePreBindItems(ctx)
  173. // If there's no route registered and no static service enabled,
  174. // it then returns an error of invalid usage of server.
  175. if len(s.routesMap) == 0 && !s.config.FileServerEnabled {
  176. return gerror.NewCode(
  177. gcode.CodeInvalidOperation,
  178. `there's no route set or static feature enabled, did you forget import the router?`,
  179. )
  180. }
  181. // ================================================================================================
  182. // Start the HTTP server.
  183. // ================================================================================================
  184. reloaded := false
  185. fdMapStr := genv.Get(adminActionReloadEnvKey).String()
  186. if len(fdMapStr) > 0 {
  187. sfm := bufferToServerFdMap([]byte(fdMapStr))
  188. if v, ok := sfm[s.config.Name]; ok {
  189. s.startServer(v)
  190. reloaded = true
  191. }
  192. }
  193. if !reloaded {
  194. s.startServer(nil)
  195. }
  196. // Swagger UI info.
  197. if s.config.SwaggerPath != "" {
  198. s.Logger().Infof(
  199. ctx,
  200. `swagger ui is serving at address: %s%s/`,
  201. s.getLocalListenedAddress(),
  202. s.config.SwaggerPath,
  203. )
  204. }
  205. // OpenApi specification info.
  206. if s.config.OpenApiPath != "" {
  207. s.Logger().Infof(
  208. ctx,
  209. `openapi specification is serving at address: %s%s`,
  210. s.getLocalListenedAddress(),
  211. s.config.OpenApiPath,
  212. )
  213. } else {
  214. if s.config.SwaggerPath != "" {
  215. s.Logger().Warning(
  216. ctx,
  217. `openapi specification is disabled but swagger ui is serving, which might make no sense`,
  218. )
  219. } else {
  220. s.Logger().Info(
  221. ctx,
  222. `openapi specification is disabled`,
  223. )
  224. }
  225. }
  226. // If this is a child process, it then notifies its parent exit.
  227. if gproc.IsChild() {
  228. gtimer.SetTimeout(ctx, time.Duration(s.config.GracefulTimeout)*time.Second, func(ctx context.Context) {
  229. if err := gproc.Send(gproc.PPid(), []byte("exit"), adminGProcCommGroup); err != nil {
  230. intlog.Errorf(ctx, `server error in process communication: %+v`, err)
  231. }
  232. })
  233. }
  234. s.initOpenApi()
  235. s.doServiceRegister()
  236. s.doRouterMapDump()
  237. return nil
  238. }
  239. func (s *Server) getLocalListenedAddress() string {
  240. return fmt.Sprintf(`http://127.0.0.1:%d`, s.GetListenedPort())
  241. }
  242. // doRouterMapDump checks and dumps the router map to the log.
  243. func (s *Server) doRouterMapDump() {
  244. if !s.config.DumpRouterMap {
  245. return
  246. }
  247. var (
  248. ctx = context.TODO()
  249. routes = s.GetRoutes()
  250. isJustDefaultServerAndDomain = true
  251. headers = []string{
  252. "SERVER", "DOMAIN", "ADDRESS", "METHOD", "ROUTE", "HANDLER", "MIDDLEWARE",
  253. }
  254. )
  255. for _, item := range routes {
  256. if item.Server != DefaultServerName || item.Domain != DefaultDomainName {
  257. isJustDefaultServerAndDomain = false
  258. break
  259. }
  260. }
  261. if isJustDefaultServerAndDomain {
  262. headers = []string{"ADDRESS", "METHOD", "ROUTE", "HANDLER", "MIDDLEWARE"}
  263. }
  264. if len(routes) > 0 {
  265. buffer := bytes.NewBuffer(nil)
  266. table := tablewriter.NewWriter(buffer)
  267. table.SetHeader(headers)
  268. table.SetRowLine(true)
  269. table.SetBorder(false)
  270. table.SetCenterSeparator("|")
  271. for _, item := range routes {
  272. var (
  273. data = make([]string, 0)
  274. handlerName = gstr.TrimRightStr(item.Handler.Name, "-fm")
  275. middlewares = gstr.SplitAndTrim(item.Middleware, ",")
  276. )
  277. for k, v := range middlewares {
  278. middlewares[k] = gstr.TrimRightStr(v, "-fm")
  279. }
  280. item.Middleware = gstr.Join(middlewares, "\n")
  281. if isJustDefaultServerAndDomain {
  282. data = append(
  283. data,
  284. item.Address,
  285. item.Method,
  286. item.Route,
  287. handlerName,
  288. item.Middleware,
  289. )
  290. } else {
  291. data = append(
  292. data,
  293. item.Server,
  294. item.Domain,
  295. item.Address,
  296. item.Method,
  297. item.Route,
  298. handlerName,
  299. item.Middleware,
  300. )
  301. }
  302. table.Append(data)
  303. }
  304. table.Render()
  305. s.config.Logger.Header(false).Printf(ctx, "\n%s", buffer.String())
  306. }
  307. }
  308. // GetOpenApi returns the OpenApi specification management object of current server.
  309. func (s *Server) GetOpenApi() *goai.OpenApiV3 {
  310. return s.openapi
  311. }
  312. // GetRoutes retrieves and returns the router array.
  313. func (s *Server) GetRoutes() []RouterItem {
  314. var (
  315. m = make(map[string]*garray.SortedArray)
  316. routeFilterSet = gset.NewStrSet()
  317. address = s.GetListenedAddress()
  318. )
  319. if s.config.HTTPSAddr != "" {
  320. if len(address) > 0 {
  321. address += ","
  322. }
  323. address += "tls" + s.config.HTTPSAddr
  324. }
  325. for k, handlerItems := range s.routesMap {
  326. array, _ := gregex.MatchString(`(.*?)%([A-Z]+):(.+)@(.+)`, k)
  327. for index := len(handlerItems) - 1; index >= 0; index-- {
  328. var (
  329. handlerItem = handlerItems[index]
  330. item = RouterItem{
  331. Server: s.config.Name,
  332. Address: address,
  333. Domain: array[4],
  334. Type: handlerItem.Type,
  335. Middleware: array[1],
  336. Method: array[2],
  337. Route: array[3],
  338. Priority: index,
  339. Handler: handlerItem,
  340. }
  341. )
  342. switch item.Handler.Type {
  343. case HandlerTypeObject, HandlerTypeHandler:
  344. item.IsServiceHandler = true
  345. case HandlerTypeMiddleware:
  346. item.Middleware = "GLOBAL MIDDLEWARE"
  347. }
  348. // Repeated route filtering for dump.
  349. var setKey = fmt.Sprintf(
  350. `%s|%s|%s|%s`,
  351. item.Method, item.Route, item.Domain, item.Type,
  352. )
  353. if !routeFilterSet.AddIfNotExist(setKey) {
  354. continue
  355. }
  356. if len(item.Handler.Middleware) > 0 {
  357. for _, v := range item.Handler.Middleware {
  358. if item.Middleware != "" {
  359. item.Middleware += ","
  360. }
  361. item.Middleware += gdebug.FuncName(v)
  362. }
  363. }
  364. // If the domain does not exist in the dump map, it creates the map.
  365. // The value of the map is a custom sorted array.
  366. if _, ok := m[item.Domain]; !ok {
  367. // Sort in ASC order.
  368. m[item.Domain] = garray.NewSortedArray(func(v1, v2 interface{}) int {
  369. item1 := v1.(RouterItem)
  370. item2 := v2.(RouterItem)
  371. r := 0
  372. if r = strings.Compare(item1.Domain, item2.Domain); r == 0 {
  373. if r = strings.Compare(item1.Route, item2.Route); r == 0 {
  374. if r = strings.Compare(item1.Method, item2.Method); r == 0 {
  375. if item1.Handler.Type == HandlerTypeMiddleware && item2.Handler.Type != HandlerTypeMiddleware {
  376. return -1
  377. } else if item1.Handler.Type == HandlerTypeMiddleware && item2.Handler.Type == HandlerTypeMiddleware {
  378. return 1
  379. } else if r = strings.Compare(item1.Middleware, item2.Middleware); r == 0 {
  380. r = item2.Priority - item1.Priority
  381. }
  382. }
  383. }
  384. }
  385. return r
  386. })
  387. }
  388. m[item.Domain].Add(item)
  389. }
  390. }
  391. routerArray := make([]RouterItem, 0, 128)
  392. for _, array := range m {
  393. for _, v := range array.Slice() {
  394. routerArray = append(routerArray, v.(RouterItem))
  395. }
  396. }
  397. return routerArray
  398. }
  399. // Run starts server listening in blocking way.
  400. // It's commonly used for single server situation.
  401. func (s *Server) Run() {
  402. var ctx = context.TODO()
  403. if err := s.Start(); err != nil {
  404. s.Logger().Fatalf(ctx, `%+v`, err)
  405. }
  406. // Signal handler in asynchronous way.
  407. go handleProcessSignal()
  408. // Blocking using channel for graceful restart.
  409. <-s.closeChan
  410. // Remove plugins.
  411. if len(s.plugins) > 0 {
  412. for _, p := range s.plugins {
  413. intlog.Printf(ctx, `remove plugin: %s`, p.Name())
  414. if err := p.Remove(); err != nil {
  415. intlog.Errorf(ctx, "%+v", err)
  416. }
  417. }
  418. }
  419. s.doServiceDeregister()
  420. s.Logger().Infof(ctx, "pid[%d]: all servers shutdown", gproc.Pid())
  421. }
  422. // Wait blocks to wait for all servers done.
  423. // It's commonly used in multiple server situation.
  424. func Wait() {
  425. var ctx = context.TODO()
  426. // Signal handler in asynchronous way.
  427. go handleProcessSignal()
  428. <-allShutdownChan
  429. // Remove plugins.
  430. serverMapping.Iterator(func(k string, v interface{}) bool {
  431. s := v.(*Server)
  432. if len(s.plugins) > 0 {
  433. for _, p := range s.plugins {
  434. intlog.Printf(ctx, `remove plugin: %s`, p.Name())
  435. if err := p.Remove(); err != nil {
  436. intlog.Errorf(ctx, `%+v`, err)
  437. }
  438. }
  439. }
  440. return true
  441. })
  442. glog.Infof(ctx, "pid[%d]: all servers shutdown", gproc.Pid())
  443. }
  444. // startServer starts the underlying server listening.
  445. func (s *Server) startServer(fdMap listenerFdMap) {
  446. var (
  447. ctx = context.TODO()
  448. httpsEnabled bool
  449. )
  450. // HTTPS
  451. if s.config.TLSConfig != nil || (s.config.HTTPSCertPath != "" && s.config.HTTPSKeyPath != "") {
  452. if len(s.config.HTTPSAddr) == 0 {
  453. if len(s.config.Address) > 0 {
  454. s.config.HTTPSAddr = s.config.Address
  455. s.config.Address = ""
  456. } else {
  457. s.config.HTTPSAddr = defaultHttpsAddr
  458. }
  459. }
  460. httpsEnabled = len(s.config.HTTPSAddr) > 0
  461. var array []string
  462. if v, ok := fdMap["https"]; ok && len(v) > 0 {
  463. array = strings.Split(v, ",")
  464. } else {
  465. array = strings.Split(s.config.HTTPSAddr, ",")
  466. }
  467. for _, v := range array {
  468. if len(v) == 0 {
  469. continue
  470. }
  471. var (
  472. fd = 0
  473. itemFunc = v
  474. addrAndFd = strings.Split(v, "#")
  475. )
  476. if len(addrAndFd) > 1 {
  477. itemFunc = addrAndFd[0]
  478. // The Windows OS does not support socket file descriptor passing
  479. // from parent process.
  480. if runtime.GOOS != "windows" {
  481. fd = gconv.Int(addrAndFd[1])
  482. }
  483. }
  484. if fd > 0 {
  485. s.servers = append(s.servers, s.newGracefulServer(itemFunc, fd))
  486. } else {
  487. s.servers = append(s.servers, s.newGracefulServer(itemFunc))
  488. }
  489. s.servers[len(s.servers)-1].isHttps = true
  490. }
  491. }
  492. // HTTP
  493. if !httpsEnabled && len(s.config.Address) == 0 {
  494. s.config.Address = defaultHttpAddr
  495. }
  496. var array []string
  497. if v, ok := fdMap["http"]; ok && len(v) > 0 {
  498. array = gstr.SplitAndTrim(v, ",")
  499. } else {
  500. array = gstr.SplitAndTrim(s.config.Address, ",")
  501. }
  502. for _, v := range array {
  503. if len(v) == 0 {
  504. continue
  505. }
  506. var (
  507. fd = 0
  508. itemFunc = v
  509. addrAndFd = strings.Split(v, "#")
  510. )
  511. if len(addrAndFd) > 1 {
  512. itemFunc = addrAndFd[0]
  513. // The Window OS does not support socket file descriptor passing
  514. // from the parent process.
  515. if runtime.GOOS != "windows" {
  516. fd = gconv.Int(addrAndFd[1])
  517. }
  518. }
  519. if fd > 0 {
  520. s.servers = append(s.servers, s.newGracefulServer(itemFunc, fd))
  521. } else {
  522. s.servers = append(s.servers, s.newGracefulServer(itemFunc))
  523. }
  524. }
  525. // Start listening asynchronously.
  526. serverRunning.Add(1)
  527. var wg = sync.WaitGroup{}
  528. for _, v := range s.servers {
  529. wg.Add(1)
  530. go func(server *gracefulServer) {
  531. s.serverCount.Add(1)
  532. var err error
  533. // Create listener.
  534. if server.isHttps {
  535. err = server.CreateListenerTLS(
  536. s.config.HTTPSCertPath, s.config.HTTPSKeyPath, s.config.TLSConfig,
  537. )
  538. } else {
  539. err = server.CreateListener()
  540. }
  541. if err != nil {
  542. s.Logger().Fatalf(ctx, `%+v`, err)
  543. }
  544. wg.Done()
  545. // Start listening and serving in blocking way.
  546. err = server.Serve(ctx)
  547. // The process exits if the server is closed with none closing error.
  548. if err != nil && !strings.EqualFold(http.ErrServerClosed.Error(), err.Error()) {
  549. s.Logger().Fatalf(ctx, `%+v`, err)
  550. }
  551. // If all the underlying servers' shutdown, the process exits.
  552. if s.serverCount.Add(-1) < 1 {
  553. s.closeChan <- struct{}{}
  554. if serverRunning.Add(-1) < 1 {
  555. serverMapping.Remove(s.instance)
  556. allShutdownChan <- struct{}{}
  557. }
  558. }
  559. }(v)
  560. }
  561. wg.Wait()
  562. }
  563. // Status retrieves and returns the server status.
  564. func (s *Server) Status() ServerStatus {
  565. if serverRunning.Val() == 0 {
  566. return ServerStatusStopped
  567. }
  568. // If any underlying server is running, the server status is running.
  569. for _, v := range s.servers {
  570. if v.status.Val() == ServerStatusRunning {
  571. return ServerStatusRunning
  572. }
  573. }
  574. return ServerStatusStopped
  575. }
  576. // getListenerFdMap retrieves and returns the socket file descriptors.
  577. // The key of the returned map is "http" and "https".
  578. func (s *Server) getListenerFdMap() map[string]string {
  579. m := map[string]string{
  580. "https": "",
  581. "http": "",
  582. }
  583. for _, v := range s.servers {
  584. str := v.address + "#" + gconv.String(v.Fd()) + ","
  585. if v.isHttps {
  586. if len(m["https"]) > 0 {
  587. m["https"] += ","
  588. }
  589. m["https"] += str
  590. } else {
  591. if len(m["http"]) > 0 {
  592. m["http"] += ","
  593. }
  594. m["http"] += str
  595. }
  596. }
  597. return m
  598. }
  599. // GetListenedPort retrieves and returns one port which is listened by current server.
  600. func (s *Server) GetListenedPort() int {
  601. ports := s.GetListenedPorts()
  602. if len(ports) > 0 {
  603. return ports[0]
  604. }
  605. return 0
  606. }
  607. // GetListenedPorts retrieves and returns the ports which are listened by current server.
  608. func (s *Server) GetListenedPorts() []int {
  609. ports := make([]int, 0)
  610. for _, server := range s.servers {
  611. ports = append(ports, server.GetListenedPort())
  612. }
  613. return ports
  614. }
  615. // GetListenedAddress retrieves and returns the address string which are listened by current server.
  616. func (s *Server) GetListenedAddress() string {
  617. if !gstr.Contains(s.config.Address, FreePortAddress) {
  618. return s.config.Address
  619. }
  620. var (
  621. address = s.config.Address
  622. listenedPorts = s.GetListenedPorts()
  623. )
  624. for _, listenedPort := range listenedPorts {
  625. address = gstr.Replace(address, FreePortAddress, fmt.Sprintf(`:%d`, listenedPort), 1)
  626. }
  627. return address
  628. }