config_selector.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. /*
  2. *
  3. * Copyright 2020 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. // Package resolver provides internal resolver-related functionality.
  19. package resolver
  20. import (
  21. "context"
  22. "sync"
  23. "google.golang.org/grpc/internal/serviceconfig"
  24. "google.golang.org/grpc/metadata"
  25. "google.golang.org/grpc/resolver"
  26. )
  27. // ConfigSelector controls what configuration to use for every RPC.
  28. type ConfigSelector interface {
  29. // Selects the configuration for the RPC, or terminates it using the error.
  30. // This error will be converted by the gRPC library to a status error with
  31. // code UNKNOWN if it is not returned as a status error.
  32. SelectConfig(RPCInfo) (*RPCConfig, error)
  33. }
  // RPCInfo carries the per-RPC information a ConfigSelector needs in order to
  // make its decision.
  type RPCInfo struct {
  	// Context is the user's context for the RPC; it contains headers and the
  	// application timeout. It is handed over for interception purposes and
  	// for efficiency reasons. SelectConfig should not block.
  	Context context.Context

  	// Method is the RPC method name, in the form "/Service/Method".
  	Method string
  }
  42. // RPCConfig describes the configuration to use for each RPC.
  43. type RPCConfig struct {
  44. // The context to use for the remainder of the RPC; can pass info to LB
  45. // policy or affect timeout or metadata.
  46. Context context.Context
  47. MethodConfig serviceconfig.MethodConfig // configuration to use for this RPC
  48. OnCommitted func() // Called when the RPC has been committed (retries no longer possible)
  49. Interceptor ClientInterceptor
  50. }
  51. // ClientStream is the same as grpc.ClientStream, but defined here for circular
  52. // dependency reasons.
  53. type ClientStream interface {
  54. // Header returns the header metadata received from the server if there
  55. // is any. It blocks if the metadata is not ready to read.
  56. Header() (metadata.MD, error)
  57. // Trailer returns the trailer metadata from the server, if there is any.
  58. // It must only be called after stream.CloseAndRecv has returned, or
  59. // stream.Recv has returned a non-nil error (including io.EOF).
  60. Trailer() metadata.MD
  61. // CloseSend closes the send direction of the stream. It closes the stream
  62. // when non-nil error is met. It is also not safe to call CloseSend
  63. // concurrently with SendMsg.
  64. CloseSend() error
  65. // Context returns the context for this stream.
  66. //
  67. // It should not be called until after Header or RecvMsg has returned. Once
  68. // called, subsequent client-side retries are disabled.
  69. Context() context.Context
  70. // SendMsg is generally called by generated code. On error, SendMsg aborts
  71. // the stream. If the error was generated by the client, the status is
  72. // returned directly; otherwise, io.EOF is returned and the status of
  73. // the stream may be discovered using RecvMsg.
  74. //
  75. // SendMsg blocks until:
  76. // - There is sufficient flow control to schedule m with the transport, or
  77. // - The stream is done, or
  78. // - The stream breaks.
  79. //
  80. // SendMsg does not wait until the message is received by the server. An
  81. // untimely stream closure may result in lost messages. To ensure delivery,
  82. // users should ensure the RPC completed successfully using RecvMsg.
  83. //
  84. // It is safe to have a goroutine calling SendMsg and another goroutine
  85. // calling RecvMsg on the same stream at the same time, but it is not safe
  86. // to call SendMsg on the same stream in different goroutines. It is also
  87. // not safe to call CloseSend concurrently with SendMsg.
  88. SendMsg(m any) error
  89. // RecvMsg blocks until it receives a message into m or the stream is
  90. // done. It returns io.EOF when the stream completes successfully. On
  91. // any other error, the stream is aborted and the error contains the RPC
  92. // status.
  93. //
  94. // It is safe to have a goroutine calling SendMsg and another goroutine
  95. // calling RecvMsg on the same stream at the same time, but it is not
  96. // safe to call RecvMsg on the same stream in different goroutines.
  97. RecvMsg(m any) error
  98. }
  99. // ClientInterceptor is an interceptor for gRPC client streams.
  100. type ClientInterceptor interface {
  101. // NewStream produces a ClientStream for an RPC which may optionally use
  102. // the provided function to produce a stream for delegation. Note:
  103. // RPCInfo.Context should not be used (will be nil).
  104. //
  105. // done is invoked when the RPC is finished using its connection, or could
  106. // not be assigned a connection. RPC operations may still occur on
  107. // ClientStream after done is called, since the interceptor is invoked by
  108. // application-layer operations. done must never be nil when called.
  109. NewStream(ctx context.Context, ri RPCInfo, done func(), newStream func(ctx context.Context, done func()) (ClientStream, error)) (ClientStream, error)
  110. }
  // ServerInterceptor intercepts incoming RPCs on the gRPC server side.
  type ServerInterceptor interface {
  	// AllowRPC decides whether an incoming RPC may proceed. The decision is
  	// based on information about the connection the RPC was received on and
  	// on its HTTP headers; that information is piped into ctx.
  	//
  	// TODO: Make this a real interceptor for filters such as rate limiting.
  	AllowRPC(ctx context.Context) error
  }
  // csKeyType is a dedicated string type for the attribute key below, so the
  // key is distinct from any plain string key holding the same text.
  type csKeyType string

  // csKey is the attribute key under which SetConfigSelector stores the
  // ConfigSelector and GetConfigSelector looks it up.
  const csKey csKeyType = "grpc.internal.resolver.configSelector"
  120. // SetConfigSelector sets the config selector in state and returns the new
  121. // state.
  122. func SetConfigSelector(state resolver.State, cs ConfigSelector) resolver.State {
  123. state.Attributes = state.Attributes.WithValue(csKey, cs)
  124. return state
  125. }
  126. // GetConfigSelector retrieves the config selector from state, if present, and
  127. // returns it or nil if absent.
  128. func GetConfigSelector(state resolver.State) ConfigSelector {
  129. cs, _ := state.Attributes.Value(csKey).(ConfigSelector)
  130. return cs
  131. }
  132. // SafeConfigSelector allows for safe switching of ConfigSelector
  133. // implementations such that previous values are guaranteed to not be in use
  134. // when UpdateConfigSelector returns.
  135. type SafeConfigSelector struct {
  136. mu sync.RWMutex
  137. cs ConfigSelector
  138. }
  139. // UpdateConfigSelector swaps to the provided ConfigSelector and blocks until
  140. // all uses of the previous ConfigSelector have completed.
  141. func (scs *SafeConfigSelector) UpdateConfigSelector(cs ConfigSelector) {
  142. scs.mu.Lock()
  143. defer scs.mu.Unlock()
  144. scs.cs = cs
  145. }
  146. // SelectConfig defers to the current ConfigSelector in scs.
  147. func (scs *SafeConfigSelector) SelectConfig(r RPCInfo) (*RPCConfig, error) {
  148. scs.mu.RLock()
  149. defer scs.mu.RUnlock()
  150. return scs.cs.SelectConfig(r)
  151. }