resolver_conn_wrapper.go 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. /*
  2. *
  3. * Copyright 2017 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package grpc
  19. import (
  20. "fmt"
  21. "strings"
  22. "sync"
  23. "time"
  24. "google.golang.org/grpc/balancer"
  25. "google.golang.org/grpc/credentials"
  26. "google.golang.org/grpc/grpclog"
  27. "google.golang.org/grpc/internal/channelz"
  28. "google.golang.org/grpc/internal/grpcsync"
  29. "google.golang.org/grpc/resolver"
  30. "google.golang.org/grpc/serviceconfig"
  31. )
  32. // ccResolverWrapper is a wrapper on top of cc for resolvers.
  33. // It implements resolver.ClientConnection interface.
  34. type ccResolverWrapper struct {
  35. cc *ClientConn
  36. resolverMu sync.Mutex
  37. resolver resolver.Resolver
  38. done *grpcsync.Event
  39. curState resolver.State
  40. pollingMu sync.Mutex
  41. polling chan struct{}
  42. }
  43. // split2 returns the values from strings.SplitN(s, sep, 2).
  44. // If sep is not found, it returns ("", "", false) instead.
  45. func split2(s, sep string) (string, string, bool) {
  46. spl := strings.SplitN(s, sep, 2)
  47. if len(spl) < 2 {
  48. return "", "", false
  49. }
  50. return spl[0], spl[1], true
  51. }
  52. // parseTarget splits target into a struct containing scheme, authority and
  53. // endpoint.
  54. //
  55. // If target is not a valid scheme://authority/endpoint, it returns {Endpoint:
  56. // target}.
  57. func parseTarget(target string) (ret resolver.Target) {
  58. var ok bool
  59. ret.Scheme, ret.Endpoint, ok = split2(target, "://")
  60. if !ok {
  61. return resolver.Target{Endpoint: target}
  62. }
  63. ret.Authority, ret.Endpoint, ok = split2(ret.Endpoint, "/")
  64. if !ok {
  65. return resolver.Target{Endpoint: target}
  66. }
  67. return ret
  68. }
  69. // newCCResolverWrapper uses the resolver.Builder stored in the ClientConn to
  70. // build a Resolver and returns a ccResolverWrapper object which wraps the
  71. // newly built resolver.
  72. func newCCResolverWrapper(cc *ClientConn) (*ccResolverWrapper, error) {
  73. rb := cc.dopts.resolverBuilder
  74. if rb == nil {
  75. return nil, fmt.Errorf("could not get resolver for scheme: %q", cc.parsedTarget.Scheme)
  76. }
  77. ccr := &ccResolverWrapper{
  78. cc: cc,
  79. done: grpcsync.NewEvent(),
  80. }
  81. var credsClone credentials.TransportCredentials
  82. if creds := cc.dopts.copts.TransportCredentials; creds != nil {
  83. credsClone = creds.Clone()
  84. }
  85. rbo := resolver.BuildOptions{
  86. DisableServiceConfig: cc.dopts.disableServiceConfig,
  87. DialCreds: credsClone,
  88. CredsBundle: cc.dopts.copts.CredsBundle,
  89. Dialer: cc.dopts.copts.Dialer,
  90. }
  91. var err error
  92. // We need to hold the lock here while we assign to the ccr.resolver field
  93. // to guard against a data race caused by the following code path,
  94. // rb.Build-->ccr.ReportError-->ccr.poll-->ccr.resolveNow, would end up
  95. // accessing ccr.resolver which is being assigned here.
  96. ccr.resolverMu.Lock()
  97. defer ccr.resolverMu.Unlock()
  98. ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)
  99. if err != nil {
  100. return nil, err
  101. }
  102. return ccr, nil
  103. }
  104. func (ccr *ccResolverWrapper) resolveNow(o resolver.ResolveNowOptions) {
  105. ccr.resolverMu.Lock()
  106. if !ccr.done.HasFired() {
  107. ccr.resolver.ResolveNow(o)
  108. }
  109. ccr.resolverMu.Unlock()
  110. }
  111. func (ccr *ccResolverWrapper) close() {
  112. ccr.resolverMu.Lock()
  113. ccr.resolver.Close()
  114. ccr.done.Fire()
  115. ccr.resolverMu.Unlock()
  116. }
  117. // poll begins or ends asynchronous polling of the resolver based on whether
  118. // err is ErrBadResolverState.
  119. func (ccr *ccResolverWrapper) poll(err error) {
  120. ccr.pollingMu.Lock()
  121. defer ccr.pollingMu.Unlock()
  122. if err != balancer.ErrBadResolverState {
  123. // stop polling
  124. if ccr.polling != nil {
  125. close(ccr.polling)
  126. ccr.polling = nil
  127. }
  128. return
  129. }
  130. if ccr.polling != nil {
  131. // already polling
  132. return
  133. }
  134. p := make(chan struct{})
  135. ccr.polling = p
  136. go func() {
  137. for i := 0; ; i++ {
  138. ccr.resolveNow(resolver.ResolveNowOptions{})
  139. t := time.NewTimer(ccr.cc.dopts.resolveNowBackoff(i))
  140. select {
  141. case <-p:
  142. t.Stop()
  143. return
  144. case <-ccr.done.Done():
  145. // Resolver has been closed.
  146. t.Stop()
  147. return
  148. case <-t.C:
  149. select {
  150. case <-p:
  151. return
  152. default:
  153. }
  154. // Timer expired; re-resolve.
  155. }
  156. }
  157. }()
  158. }
  159. func (ccr *ccResolverWrapper) UpdateState(s resolver.State) {
  160. if ccr.done.HasFired() {
  161. return
  162. }
  163. grpclog.Infof("ccResolverWrapper: sending update to cc: %v", s)
  164. if channelz.IsOn() {
  165. ccr.addChannelzTraceEvent(s)
  166. }
  167. ccr.curState = s
  168. ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
  169. }
  170. func (ccr *ccResolverWrapper) ReportError(err error) {
  171. if ccr.done.HasFired() {
  172. return
  173. }
  174. grpclog.Warningf("ccResolverWrapper: reporting error to cc: %v", err)
  175. if channelz.IsOn() {
  176. channelz.AddTraceEvent(ccr.cc.channelzID, &channelz.TraceEventDesc{
  177. Desc: fmt.Sprintf("Resolver reported error: %v", err),
  178. Severity: channelz.CtWarning,
  179. })
  180. }
  181. ccr.poll(ccr.cc.updateResolverState(resolver.State{}, err))
  182. }
  183. // NewAddress is called by the resolver implementation to send addresses to gRPC.
  184. func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
  185. if ccr.done.HasFired() {
  186. return
  187. }
  188. grpclog.Infof("ccResolverWrapper: sending new addresses to cc: %v", addrs)
  189. if channelz.IsOn() {
  190. ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig})
  191. }
  192. ccr.curState.Addresses = addrs
  193. ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
  194. }
  195. // NewServiceConfig is called by the resolver implementation to send service
  196. // configs to gRPC.
  197. func (ccr *ccResolverWrapper) NewServiceConfig(sc string) {
  198. if ccr.done.HasFired() {
  199. return
  200. }
  201. grpclog.Infof("ccResolverWrapper: got new service config: %v", sc)
  202. if ccr.cc.dopts.disableServiceConfig {
  203. grpclog.Infof("Service config lookups disabled; ignoring config")
  204. return
  205. }
  206. scpr := parseServiceConfig(sc)
  207. if scpr.Err != nil {
  208. grpclog.Warningf("ccResolverWrapper: error parsing service config: %v", scpr.Err)
  209. if channelz.IsOn() {
  210. channelz.AddTraceEvent(ccr.cc.channelzID, &channelz.TraceEventDesc{
  211. Desc: fmt.Sprintf("Error parsing service config: %v", scpr.Err),
  212. Severity: channelz.CtWarning,
  213. })
  214. }
  215. ccr.poll(balancer.ErrBadResolverState)
  216. return
  217. }
  218. if channelz.IsOn() {
  219. ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: scpr})
  220. }
  221. ccr.curState.ServiceConfig = scpr
  222. ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil))
  223. }
  224. func (ccr *ccResolverWrapper) ParseServiceConfig(scJSON string) *serviceconfig.ParseResult {
  225. return parseServiceConfig(scJSON)
  226. }
  227. func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) {
  228. var updates []string
  229. var oldSC, newSC *ServiceConfig
  230. var oldOK, newOK bool
  231. if ccr.curState.ServiceConfig != nil {
  232. oldSC, oldOK = ccr.curState.ServiceConfig.Config.(*ServiceConfig)
  233. }
  234. if s.ServiceConfig != nil {
  235. newSC, newOK = s.ServiceConfig.Config.(*ServiceConfig)
  236. }
  237. if oldOK != newOK || (oldOK && newOK && oldSC.rawJSONString != newSC.rawJSONString) {
  238. updates = append(updates, "service config updated")
  239. }
  240. if len(ccr.curState.Addresses) > 0 && len(s.Addresses) == 0 {
  241. updates = append(updates, "resolver returned an empty address list")
  242. } else if len(ccr.curState.Addresses) == 0 && len(s.Addresses) > 0 {
  243. updates = append(updates, "resolver returned new addresses")
  244. }
  245. channelz.AddTraceEvent(ccr.cc.channelzID, &channelz.TraceEventDesc{
  246. Desc: fmt.Sprintf("Resolver state updated: %+v (%v)", s, strings.Join(updates, "; ")),
  247. Severity: channelz.CtINFO,
  248. })
  249. }