balancer.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. /*
  2. *
  3. * Copyright 2017 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package base
  19. import (
  20. "context"
  21. "errors"
  22. "google.golang.org/grpc/balancer"
  23. "google.golang.org/grpc/connectivity"
  24. "google.golang.org/grpc/grpclog"
  25. "google.golang.org/grpc/resolver"
  26. )
  27. type baseBuilder struct {
  28. name string
  29. pickerBuilder PickerBuilder
  30. v2PickerBuilder V2PickerBuilder
  31. config Config
  32. }
  33. func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
  34. bal := &baseBalancer{
  35. cc: cc,
  36. pickerBuilder: bb.pickerBuilder,
  37. v2PickerBuilder: bb.v2PickerBuilder,
  38. subConns: make(map[resolver.Address]balancer.SubConn),
  39. scStates: make(map[balancer.SubConn]connectivity.State),
  40. csEvltr: &balancer.ConnectivityStateEvaluator{},
  41. config: bb.config,
  42. }
  43. // Initialize picker to a picker that always returns
  44. // ErrNoSubConnAvailable, because when state of a SubConn changes, we
  45. // may call UpdateState with this picker.
  46. if bb.pickerBuilder != nil {
  47. bal.picker = NewErrPicker(balancer.ErrNoSubConnAvailable)
  48. } else {
  49. bal.v2Picker = NewErrPickerV2(balancer.ErrNoSubConnAvailable)
  50. }
  51. return bal
  52. }
  53. func (bb *baseBuilder) Name() string {
  54. return bb.name
  55. }
  56. var _ balancer.V2Balancer = (*baseBalancer)(nil) // Assert that we implement V2Balancer
  57. type baseBalancer struct {
  58. cc balancer.ClientConn
  59. pickerBuilder PickerBuilder
  60. v2PickerBuilder V2PickerBuilder
  61. csEvltr *balancer.ConnectivityStateEvaluator
  62. state connectivity.State
  63. subConns map[resolver.Address]balancer.SubConn
  64. scStates map[balancer.SubConn]connectivity.State
  65. picker balancer.Picker
  66. v2Picker balancer.V2Picker
  67. config Config
  68. }
  69. func (b *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
  70. panic("not implemented")
  71. }
  72. func (b *baseBalancer) ResolverError(err error) {
  73. switch b.state {
  74. case connectivity.TransientFailure, connectivity.Idle, connectivity.Connecting:
  75. if b.picker != nil {
  76. b.picker = NewErrPicker(err)
  77. } else {
  78. b.v2Picker = NewErrPickerV2(err)
  79. }
  80. }
  81. }
  82. func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
  83. // TODO: handle s.ResolverState.Err (log if not nil) once implemented.
  84. // TODO: handle s.ResolverState.ServiceConfig?
  85. if grpclog.V(2) {
  86. grpclog.Infoln("base.baseBalancer: got new ClientConn state: ", s)
  87. }
  88. // addrsSet is the set converted from addrs, it's used for quick lookup of an address.
  89. addrsSet := make(map[resolver.Address]struct{})
  90. for _, a := range s.ResolverState.Addresses {
  91. addrsSet[a] = struct{}{}
  92. if _, ok := b.subConns[a]; !ok {
  93. // a is a new address (not existing in b.subConns).
  94. sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck})
  95. if err != nil {
  96. grpclog.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
  97. continue
  98. }
  99. b.subConns[a] = sc
  100. b.scStates[sc] = connectivity.Idle
  101. sc.Connect()
  102. }
  103. }
  104. for a, sc := range b.subConns {
  105. // a was removed by resolver.
  106. if _, ok := addrsSet[a]; !ok {
  107. b.cc.RemoveSubConn(sc)
  108. delete(b.subConns, a)
  109. // Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
  110. // The entry will be deleted in HandleSubConnStateChange.
  111. }
  112. }
  113. return nil
  114. }
  115. // regeneratePicker takes a snapshot of the balancer, and generates a picker
  116. // from it. The picker is
  117. // - errPicker with ErrTransientFailure if the balancer is in TransientFailure,
  118. // - built by the pickerBuilder with all READY SubConns otherwise.
  119. func (b *baseBalancer) regeneratePicker(err error) {
  120. if b.state == connectivity.TransientFailure {
  121. if b.pickerBuilder != nil {
  122. b.picker = NewErrPicker(balancer.ErrTransientFailure)
  123. } else {
  124. if err != nil {
  125. b.v2Picker = NewErrPickerV2(balancer.TransientFailureError(err))
  126. } else {
  127. // This means the last subchannel transition was not to
  128. // TransientFailure (otherwise err must be set), but the
  129. // aggregate state of the balancer is TransientFailure, meaning
  130. // there are no other addresses.
  131. b.v2Picker = NewErrPickerV2(balancer.TransientFailureError(errors.New("resolver returned no addresses")))
  132. }
  133. }
  134. return
  135. }
  136. if b.pickerBuilder != nil {
  137. readySCs := make(map[resolver.Address]balancer.SubConn)
  138. // Filter out all ready SCs from full subConn map.
  139. for addr, sc := range b.subConns {
  140. if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
  141. readySCs[addr] = sc
  142. }
  143. }
  144. b.picker = b.pickerBuilder.Build(readySCs)
  145. } else {
  146. readySCs := make(map[balancer.SubConn]SubConnInfo)
  147. // Filter out all ready SCs from full subConn map.
  148. for addr, sc := range b.subConns {
  149. if st, ok := b.scStates[sc]; ok && st == connectivity.Ready {
  150. readySCs[sc] = SubConnInfo{Address: addr}
  151. }
  152. }
  153. b.v2Picker = b.v2PickerBuilder.Build(PickerBuildInfo{ReadySCs: readySCs})
  154. }
  155. }
  156. func (b *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
  157. panic("not implemented")
  158. }
  159. func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
  160. s := state.ConnectivityState
  161. if grpclog.V(2) {
  162. grpclog.Infof("base.baseBalancer: handle SubConn state change: %p, %v", sc, s)
  163. }
  164. oldS, ok := b.scStates[sc]
  165. if !ok {
  166. if grpclog.V(2) {
  167. grpclog.Infof("base.baseBalancer: got state changes for an unknown SubConn: %p, %v", sc, s)
  168. }
  169. return
  170. }
  171. b.scStates[sc] = s
  172. switch s {
  173. case connectivity.Idle:
  174. sc.Connect()
  175. case connectivity.Shutdown:
  176. // When an address was removed by resolver, b called RemoveSubConn but
  177. // kept the sc's state in scStates. Remove state for this sc here.
  178. delete(b.scStates, sc)
  179. }
  180. oldAggrState := b.state
  181. b.state = b.csEvltr.RecordTransition(oldS, s)
  182. // Regenerate picker when one of the following happens:
  183. // - this sc became ready from not-ready
  184. // - this sc became not-ready from ready
  185. // - the aggregated state of balancer became TransientFailure from non-TransientFailure
  186. // - the aggregated state of balancer became non-TransientFailure from TransientFailure
  187. if (s == connectivity.Ready) != (oldS == connectivity.Ready) ||
  188. (b.state == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
  189. b.regeneratePicker(state.ConnectionError)
  190. }
  191. if b.picker != nil {
  192. b.cc.UpdateBalancerState(b.state, b.picker)
  193. } else {
  194. b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.v2Picker})
  195. }
  196. }
  197. // Close is a nop because base balancer doesn't have internal state to clean up,
  198. // and it doesn't need to call RemoveSubConn for the SubConns.
  199. func (b *baseBalancer) Close() {
  200. }
  201. // NewErrPicker returns a picker that always returns err on Pick().
  202. func NewErrPicker(err error) balancer.Picker {
  203. return &errPicker{err: err}
  204. }
  205. type errPicker struct {
  206. err error // Pick() always returns this err.
  207. }
  208. func (p *errPicker) Pick(context.Context, balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) {
  209. return nil, nil, p.err
  210. }
  211. // NewErrPickerV2 returns a V2Picker that always returns err on Pick().
  212. func NewErrPickerV2(err error) balancer.V2Picker {
  213. return &errPickerV2{err: err}
  214. }
  215. type errPickerV2 struct {
  216. err error // Pick() always returns this err.
  217. }
  218. func (p *errPickerV2) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
  219. return balancer.PickResult{}, p.err
  220. }