gclient_discovery.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. // Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
  2. //
  3. // This Source Code Form is subject to the terms of the MIT License.
  4. // If a copy of the MIT was not distributed with this file,
  5. // You can obtain one at https://github.com/gogf/gf.
  6. package gclient
  7. import (
  8. "context"
  9. "net/http"
  10. "github.com/gogf/gf/v2/container/gmap"
  11. "github.com/gogf/gf/v2/internal/intlog"
  12. "github.com/gogf/gf/v2/net/gsel"
  13. "github.com/gogf/gf/v2/net/gsvc"
  14. "github.com/gogf/gf/v2/os/gctx"
  15. )
  16. const (
  17. discoveryMiddlewareHandled gctx.StrKey = `MiddlewareClientDiscoveryHandled`
  18. )
  19. type discoveryNode struct {
  20. service gsvc.Service
  21. address string
  22. }
  23. // Service is the client discovery service.
  24. func (n *discoveryNode) Service() gsvc.Service {
  25. return n.service
  26. }
  27. // Address returns the address of the node.
  28. func (n *discoveryNode) Address() string {
  29. return n.address
  30. }
  31. var clientSelectorMap = gmap.New(true)
  32. // internalMiddlewareDiscovery is a client middleware that enables service discovery feature for client.
  33. func internalMiddlewareDiscovery(c *Client, r *http.Request) (response *Response, err error) {
  34. ctx := r.Context()
  35. // Mark this request is handled by server tracing middleware,
  36. // to avoid repeated handling by the same middleware.
  37. if ctx.Value(discoveryMiddlewareHandled) != nil {
  38. return c.Next(r)
  39. }
  40. if gsvc.GetRegistry() == nil {
  41. return c.Next(r)
  42. }
  43. var service gsvc.Service
  44. service, err = gsvc.GetAndWatch(ctx, r.URL.Host, func(service gsvc.Service) {
  45. intlog.Printf(ctx, `http client watching service "%s" changed`, service.GetPrefix())
  46. if v := clientSelectorMap.Get(service.GetPrefix()); v != nil {
  47. if err = updateSelectorNodesByService(ctx, v.(gsel.Selector), service); err != nil {
  48. intlog.Errorf(context.Background(), `%+v`, err)
  49. }
  50. }
  51. })
  52. if err != nil {
  53. return nil, err
  54. }
  55. if service == nil {
  56. return c.Next(r)
  57. }
  58. // Balancer.
  59. var (
  60. selectorMapKey = service.GetPrefix()
  61. selectorMapValue = clientSelectorMap.GetOrSetFuncLock(selectorMapKey, func() interface{} {
  62. intlog.Printf(ctx, `http client create selector for service "%s"`, selectorMapKey)
  63. selector := gsel.GetBuilder().Build()
  64. // Update selector nodes.
  65. if err = updateSelectorNodesByService(ctx, selector, service); err != nil {
  66. return nil
  67. }
  68. return selector
  69. })
  70. )
  71. if err != nil {
  72. return nil, err
  73. }
  74. selector := selectorMapValue.(gsel.Selector)
  75. // Pick one node from multiple addresses.
  76. node, done, err := selector.Pick(ctx)
  77. if err != nil {
  78. return nil, err
  79. }
  80. if done != nil {
  81. defer done(ctx, gsel.DoneInfo{})
  82. }
  83. r.URL.Host = node.Address()
  84. r.Host = node.Address()
  85. return c.Next(r)
  86. }
  87. func updateSelectorNodesByService(ctx context.Context, selector gsel.Selector, service gsvc.Service) error {
  88. nodes := make(gsel.Nodes, 0)
  89. for _, endpoint := range service.GetEndpoints() {
  90. nodes = append(nodes, &discoveryNode{
  91. service: service,
  92. address: endpoint.String(),
  93. })
  94. }
  95. return selector.Update(ctx, nodes)
  96. }