gsvc_discovery.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. // Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
  2. //
  3. // This Source Code Form is subject to the terms of the MIT License.
  4. // If a copy of the MIT was not distributed with this file,
  5. // You can obtain one at https://github.com/gogf/gf.
  6. package gsvc
  7. import (
  8. "context"
  9. "time"
  10. "github.com/gogf/gf/v2/container/gmap"
  11. "github.com/gogf/gf/v2/errors/gcode"
  12. "github.com/gogf/gf/v2/errors/gerror"
  13. "github.com/gogf/gf/v2/internal/intlog"
  14. "github.com/gogf/gf/v2/util/gutil"
  15. )
  16. // watchedServiceMap stores used service
  17. var watchedServiceMap = gmap.New(true)
  18. // ServiceWatch is used to watch the service status.
  19. type ServiceWatch func(service Service)
  20. // Get retrieves and returns the service by service name.
  21. func Get(ctx context.Context, name string) (service Service, err error) {
  22. return GetAndWatch(ctx, name, nil)
  23. }
  24. // GetAndWatch is used to getting the service with custom watch callback function.
  25. func GetAndWatch(ctx context.Context, name string, watch ServiceWatch) (service Service, err error) {
  26. v := watchedServiceMap.GetOrSetFuncLock(name, func() interface{} {
  27. var (
  28. services []Service
  29. watcher Watcher
  30. )
  31. services, err = Search(ctx, SearchInput{
  32. Name: name,
  33. })
  34. if err != nil {
  35. return nil
  36. }
  37. if len(services) == 0 {
  38. err = gerror.NewCodef(gcode.CodeNotFound, `service not found with name "%s"`, name)
  39. return nil
  40. }
  41. // Just pick one if multiple.
  42. service = services[0]
  43. // Watch the service changes in goroutine.
  44. if watch != nil {
  45. if watcher, err = Watch(ctx, service.GetPrefix()); err != nil {
  46. return nil
  47. }
  48. go watchAndUpdateService(watcher, service, watch)
  49. }
  50. return service
  51. })
  52. if v != nil {
  53. service = v.(Service)
  54. }
  55. return
  56. }
  57. // watchAndUpdateService watches and updates the service in memory if it is changed.
  58. func watchAndUpdateService(watcher Watcher, service Service, watchFunc ServiceWatch) {
  59. var (
  60. ctx = context.Background()
  61. err error
  62. services []Service
  63. )
  64. for {
  65. time.Sleep(time.Second)
  66. services, err = watcher.Proceed()
  67. if err != nil {
  68. intlog.Errorf(ctx, `%+v`, err)
  69. continue
  70. }
  71. if len(services) > 0 {
  72. watchedServiceMap.Set(service.GetName(), services[0])
  73. if watchFunc != nil {
  74. gutil.TryCatch(ctx, func(ctx context.Context) {
  75. watchFunc(services[0])
  76. }, func(ctx context.Context, exception error) {
  77. intlog.Errorf(ctx, `%+v`, exception)
  78. })
  79. }
  80. }
  81. }
  82. }
  83. // Search searches and returns services with specified condition.
  84. func Search(ctx context.Context, in SearchInput) ([]Service, error) {
  85. if defaultRegistry == nil {
  86. return nil, gerror.NewCodef(gcode.CodeNotImplemented, `no Registry is registered`)
  87. }
  88. ctx, _ = context.WithTimeout(ctx, defaultTimeout)
  89. return defaultRegistry.Search(ctx, in)
  90. }
  91. // Watch watches specified condition changes.
  92. func Watch(ctx context.Context, key string) (Watcher, error) {
  93. if defaultRegistry == nil {
  94. return nil, gerror.NewCodef(gcode.CodeNotImplemented, `no Registry is registered`)
  95. }
  96. return defaultRegistry.Watch(ctx, key)
  97. }