gclient_discovery.go 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. // Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
  2. //
  3. // This Source Code Form is subject to the terms of the MIT License.
  4. // If a copy of the MIT was not distributed with this file,
  5. // You can obtain one at https://github.com/gogf/gf.
  6. package gclient
  7. import (
  8. "context"
  9. "net/http"
  10. "github.com/gogf/gf/v2/container/gmap"
  11. "github.com/gogf/gf/v2/internal/intlog"
  12. "github.com/gogf/gf/v2/net/gsel"
  13. "github.com/gogf/gf/v2/net/gsvc"
  14. )
  15. type discoveryNode struct {
  16. service gsvc.Service
  17. address string
  18. }
  19. // Service is the client discovery service.
  20. func (n *discoveryNode) Service() gsvc.Service {
  21. return n.service
  22. }
  23. // Address returns the address of the node.
  24. func (n *discoveryNode) Address() string {
  25. return n.address
  26. }
  27. var clientSelectorMap = gmap.New(true)
  28. // internalMiddlewareDiscovery is a client middleware that enables service discovery feature for client.
  29. func internalMiddlewareDiscovery(c *Client, r *http.Request) (response *Response, err error) {
  30. if c.discovery == nil {
  31. return c.Next(r)
  32. }
  33. var (
  34. ctx = r.Context()
  35. service gsvc.Service
  36. )
  37. service, err = gsvc.GetAndWatchWithDiscovery(ctx, c.discovery, r.URL.Host, func(service gsvc.Service) {
  38. intlog.Printf(ctx, `http client watching service "%s" changed`, service.GetPrefix())
  39. if v := clientSelectorMap.Get(service.GetPrefix()); v != nil {
  40. if err = updateSelectorNodesByService(ctx, v.(gsel.Selector), service); err != nil {
  41. intlog.Errorf(ctx, `%+v`, err)
  42. }
  43. }
  44. })
  45. if err != nil {
  46. return nil, err
  47. }
  48. if service == nil {
  49. return c.Next(r)
  50. }
  51. // Balancer.
  52. var (
  53. selectorMapKey = service.GetPrefix()
  54. selectorMapValue = clientSelectorMap.GetOrSetFuncLock(selectorMapKey, func() interface{} {
  55. intlog.Printf(ctx, `http client create selector for service "%s"`, selectorMapKey)
  56. selector := c.builder.Build()
  57. // Update selector nodes.
  58. if err = updateSelectorNodesByService(ctx, selector, service); err != nil {
  59. return nil
  60. }
  61. return selector
  62. })
  63. )
  64. if err != nil {
  65. return nil, err
  66. }
  67. selector := selectorMapValue.(gsel.Selector)
  68. // Pick one node from multiple addresses.
  69. node, done, err := selector.Pick(ctx)
  70. if err != nil {
  71. return nil, err
  72. }
  73. if done != nil {
  74. defer done(ctx, gsel.DoneInfo{})
  75. }
  76. r.Host = node.Address()
  77. r.URL.Host = node.Address()
  78. return c.Next(r)
  79. }
  80. func updateSelectorNodesByService(ctx context.Context, selector gsel.Selector, service gsvc.Service) error {
  81. nodes := make(gsel.Nodes, 0)
  82. for _, endpoint := range service.GetEndpoints() {
  83. nodes = append(nodes, &discoveryNode{
  84. service: service,
  85. address: endpoint.String(),
  86. })
  87. }
  88. return selector.Update(ctx, nodes)
  89. }