resolver_wrapper.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. /*
  2. *
  3. * Copyright 2017 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "context"
  21. "strings"
  22. "sync"
  23. "google.golang.org/grpc/internal/channelz"
  24. "google.golang.org/grpc/internal/grpcsync"
  25. "google.golang.org/grpc/internal/pretty"
  26. "google.golang.org/grpc/resolver"
  27. "google.golang.org/grpc/serviceconfig"
  28. )
  29. // ccResolverWrapper is a wrapper on top of cc for resolvers.
  30. // It implements resolver.ClientConn interface.
  31. type ccResolverWrapper struct {
  32. // The following fields are initialized when the wrapper is created and are
  33. // read-only afterwards, and therefore can be accessed without a mutex.
  34. cc *ClientConn
  35. ignoreServiceConfig bool
  36. serializer *grpcsync.CallbackSerializer
  37. serializerCancel context.CancelFunc
  38. resolver resolver.Resolver // only accessed within the serializer
  39. // The following fields are protected by mu. Caller must take cc.mu before
  40. // taking mu.
  41. mu sync.Mutex
  42. curState resolver.State
  43. closed bool
  44. }
  45. // newCCResolverWrapper initializes the ccResolverWrapper. It can only be used
  46. // after calling start, which builds the resolver.
  47. func newCCResolverWrapper(cc *ClientConn) *ccResolverWrapper {
  48. ctx, cancel := context.WithCancel(cc.ctx)
  49. return &ccResolverWrapper{
  50. cc: cc,
  51. ignoreServiceConfig: cc.dopts.disableServiceConfig,
  52. serializer: grpcsync.NewCallbackSerializer(ctx),
  53. serializerCancel: cancel,
  54. }
  55. }
  56. // start builds the name resolver using the resolver.Builder in cc and returns
  57. // any error encountered. It must always be the first operation performed on
  58. // any newly created ccResolverWrapper, except that close may be called instead.
  59. func (ccr *ccResolverWrapper) start() error {
  60. errCh := make(chan error)
  61. ccr.serializer.Schedule(func(ctx context.Context) {
  62. if ctx.Err() != nil {
  63. return
  64. }
  65. opts := resolver.BuildOptions{
  66. DisableServiceConfig: ccr.cc.dopts.disableServiceConfig,
  67. DialCreds: ccr.cc.dopts.copts.TransportCredentials,
  68. CredsBundle: ccr.cc.dopts.copts.CredsBundle,
  69. Dialer: ccr.cc.dopts.copts.Dialer,
  70. }
  71. var err error
  72. ccr.resolver, err = ccr.cc.resolverBuilder.Build(ccr.cc.parsedTarget, ccr, opts)
  73. errCh <- err
  74. })
  75. return <-errCh
  76. }
  77. func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOptions) {
  78. ccr.serializer.Schedule(func(ctx context.Context) {
  79. if ctx.Err() != nil || ccr.resolver == nil {
  80. return
  81. }
  82. ccr.resolver.ResolveNow(o)
  83. })
  84. }
  85. // close initiates async shutdown of the wrapper. To determine the wrapper has
  86. // finished shutting down, the channel should block on ccr.serializer.Done()
  87. // without cc.mu held.
  88. func (ccr *ccResolverWrapper) close() {
  89. channelz.Info(logger, ccr.cc.channelzID, "Closing the name resolver")
  90. ccr.mu.Lock()
  91. ccr.closed = true
  92. ccr.mu.Unlock()
  93. ccr.serializer.Schedule(func(context.Context) {
  94. if ccr.resolver == nil {
  95. return
  96. }
  97. ccr.resolver.Close()
  98. ccr.resolver = nil
  99. })
  100. ccr.serializerCancel()
  101. }
  102. // UpdateState is called by resolver implementations to report new state to gRPC
  103. // which includes addresses and service config.
  104. func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
  105. ccr.cc.mu.Lock()
  106. ccr.mu.Lock()
  107. if ccr.closed {
  108. ccr.mu.Unlock()
  109. ccr.cc.mu.Unlock()
  110. return nil
  111. }
  112. if s.Endpoints == nil {
  113. s.Endpoints = make([]resolver.Endpoint, 0, len(s.Addresses))
  114. for _, a := range s.Addresses {
  115. ep := resolver.Endpoint{Addresses: []resolver.Address{a}, Attributes: a.BalancerAttributes}
  116. ep.Addresses[0].BalancerAttributes = nil
  117. s.Endpoints = append(s.Endpoints, ep)
  118. }
  119. }
  120. ccr.addChannelzTraceEvent(s)
  121. ccr.curState = s
  122. ccr.mu.Unlock()
  123. return ccr.cc.updateResolverStateAndUnlock(s, nil)
  124. }
  125. // ReportError is called by resolver implementations to report errors
  126. // encountered during name resolution to gRPC.
  127. func (ccr *ccResolverWrapper) ReportError(err error) {
  128. ccr.cc.mu.Lock()
  129. ccr.mu.Lock()
  130. if ccr.closed {
  131. ccr.mu.Unlock()
  132. ccr.cc.mu.Unlock()
  133. return
  134. }
  135. ccr.mu.Unlock()
  136. channelz.Warningf(logger, ccr.cc.channelzID, "ccResolverWrapper: reporting error to cc: %v", err)
  137. ccr.cc.updateResolverStateAndUnlock(resolver.State{}, err)
  138. }
  139. // NewAddress is called by the resolver implementation to send addresses to
  140. // gRPC.
  141. func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
  142. ccr.cc.mu.Lock()
  143. ccr.mu.Lock()
  144. if ccr.closed {
  145. ccr.mu.Unlock()
  146. ccr.cc.mu.Unlock()
  147. return
  148. }
  149. s := resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig}
  150. ccr.addChannelzTraceEvent(s)
  151. ccr.curState = s
  152. ccr.mu.Unlock()
  153. ccr.cc.updateResolverStateAndUnlock(s, nil)
  154. }
  155. // ParseServiceConfig is called by resolver implementations to parse a JSON
  156. // representation of the service config.
  157. func (ccr *ccResolverWrapper) ParseServiceConfig(scJSON string) *serviceconfig.ParseResult {
  158. return parseServiceConfig(scJSON)
  159. }
  160. // addChannelzTraceEvent adds a channelz trace event containing the new
  161. // state received from resolver implementations.
  162. func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) {
  163. var updates []string
  164. var oldSC, newSC *ServiceConfig
  165. var oldOK, newOK bool
  166. if ccr.curState.ServiceConfig != nil {
  167. oldSC, oldOK = ccr.curState.ServiceConfig.Config.(*ServiceConfig)
  168. }
  169. if s.ServiceConfig != nil {
  170. newSC, newOK = s.ServiceConfig.Config.(*ServiceConfig)
  171. }
  172. if oldOK != newOK || (oldOK && newOK && oldSC.rawJSONString != newSC.rawJSONString) {
  173. updates = append(updates, "service config updated")
  174. }
  175. if len(ccr.curState.Addresses) > 0 && len(s.Addresses) == 0 {
  176. updates = append(updates, "resolver returned an empty address list")
  177. } else if len(ccr.curState.Addresses) == 0 && len(s.Addresses) > 0 {
  178. updates = append(updates, "resolver returned new addresses")
  179. }
  180. channelz.Infof(logger, ccr.cc.channelzID, "Resolver state updated: %s (%v)", pretty.ToJSON(s), strings.Join(updates, "; "))
  181. }