gsvc_discovery.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. // Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
  2. //
  3. // This Source Code Form is subject to the terms of the MIT License.
  4. // If a copy of the MIT was not distributed with this file,
  5. // You can obtain one at https://github.com/gogf/gf.
  6. package gsvc
  7. import (
  8. "context"
  9. "time"
  10. "github.com/gogf/gf/v2/container/gmap"
  11. "github.com/gogf/gf/v2/errors/gcode"
  12. "github.com/gogf/gf/v2/errors/gerror"
  13. "github.com/gogf/gf/v2/internal/intlog"
  14. "github.com/gogf/gf/v2/util/gutil"
  15. )
  16. // watchedMap stores discovery object and its watched service mapping.
  17. var watchedMap = gmap.New(true)
  18. // ServiceWatch is used to watch the service status.
  19. type ServiceWatch func(service Service)
  20. // Get retrieves and returns the service by service name.
  21. func Get(ctx context.Context, name string) (service Service, err error) {
  22. return GetAndWatchWithDiscovery(ctx, defaultRegistry, name, nil)
  23. }
  24. // GetWithDiscovery retrieves and returns the service by service name in `discovery`.
  25. func GetWithDiscovery(ctx context.Context, discovery Discovery, name string) (service Service, err error) {
  26. return GetAndWatchWithDiscovery(ctx, discovery, name, nil)
  27. }
  28. // GetAndWatch is used to getting the service with custom watch callback function.
  29. func GetAndWatch(ctx context.Context, name string, watch ServiceWatch) (service Service, err error) {
  30. return GetAndWatchWithDiscovery(ctx, defaultRegistry, name, watch)
  31. }
  32. // GetAndWatchWithDiscovery is used to getting the service with custom watch callback function in `discovery`.
  33. func GetAndWatchWithDiscovery(ctx context.Context, discovery Discovery, name string, watch ServiceWatch) (service Service, err error) {
  34. if discovery == nil {
  35. return nil, gerror.NewCodef(gcode.CodeInvalidParameter, `discovery cannot be nil`)
  36. }
  37. // Retrieve service map by discovery object.
  38. watchedServiceMap := watchedMap.GetOrSetFunc(discovery, func() interface{} {
  39. return gmap.NewStrAnyMap(true)
  40. }).(*gmap.StrAnyMap)
  41. // Retrieve service by name.
  42. storedService := watchedServiceMap.GetOrSetFuncLock(name, func() interface{} {
  43. var (
  44. services []Service
  45. watcher Watcher
  46. )
  47. services, err = discovery.Search(ctx, SearchInput{
  48. Name: name,
  49. })
  50. if err != nil {
  51. return nil
  52. }
  53. if len(services) == 0 {
  54. err = gerror.NewCodef(gcode.CodeNotFound, `service not found with name "%s"`, name)
  55. return nil
  56. }
  57. // Just pick one if multiple.
  58. service = services[0]
  59. // Watch the service changes in goroutine.
  60. if watch != nil {
  61. if watcher, err = discovery.Watch(ctx, service.GetPrefix()); err != nil {
  62. return nil
  63. }
  64. go watchAndUpdateService(watchedServiceMap, watcher, service, watch)
  65. }
  66. return service
  67. })
  68. if storedService != nil {
  69. service = storedService.(Service)
  70. }
  71. return
  72. }
  73. // watchAndUpdateService watches and updates the service in memory if it is changed.
  74. func watchAndUpdateService(watchedServiceMap *gmap.StrAnyMap, watcher Watcher, service Service, watchFunc ServiceWatch) {
  75. var (
  76. ctx = context.Background()
  77. err error
  78. services []Service
  79. )
  80. for {
  81. time.Sleep(time.Second)
  82. if services, err = watcher.Proceed(); err != nil {
  83. intlog.Errorf(ctx, `%+v`, err)
  84. continue
  85. }
  86. if len(services) > 0 {
  87. watchedServiceMap.Set(service.GetName(), services[0])
  88. if watchFunc != nil {
  89. gutil.TryCatch(ctx, func(ctx context.Context) {
  90. watchFunc(services[0])
  91. }, func(ctx context.Context, exception error) {
  92. intlog.Errorf(ctx, `%+v`, exception)
  93. })
  94. }
  95. }
  96. }
  97. }
  98. // Search searches and returns services with specified condition.
  99. func Search(ctx context.Context, in SearchInput) ([]Service, error) {
  100. if defaultRegistry == nil {
  101. return nil, gerror.NewCodef(gcode.CodeNotImplemented, `no Registry is registered`)
  102. }
  103. ctx, _ = context.WithTimeout(ctx, defaultTimeout)
  104. return defaultRegistry.Search(ctx, in)
  105. }
  106. // Watch watches specified condition changes.
  107. func Watch(ctx context.Context, key string) (Watcher, error) {
  108. if defaultRegistry == nil {
  109. return nil, gerror.NewCodef(gcode.CodeNotImplemented, `no Registry is registered`)
  110. }
  111. return defaultRegistry.Watch(ctx, key)
  112. }