transport.go 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. // Package transport defines and implements message oriented communication
  19. // channel to complete various transactions (e.g., an RPC). It is meant for
  20. // grpc-internal usage and is not intended to be imported directly by users.
  21. package transport
  22. import (
  23. "bytes"
  24. "context"
  25. "errors"
  26. "fmt"
  27. "io"
  28. "net"
  29. "sync"
  30. "sync/atomic"
  31. "time"
  32. "google.golang.org/grpc/codes"
  33. "google.golang.org/grpc/credentials"
  34. "google.golang.org/grpc/internal/channelz"
  35. "google.golang.org/grpc/keepalive"
  36. "google.golang.org/grpc/metadata"
  37. "google.golang.org/grpc/peer"
  38. "google.golang.org/grpc/resolver"
  39. "google.golang.org/grpc/stats"
  40. "google.golang.org/grpc/status"
  41. "google.golang.org/grpc/tap"
  42. )
  // logLevel is a fixed numeric level shared by this package.
  // NOTE(review): presumably the verbosity threshold used for the package's
  // verbose logging; no use is visible in this part of the file — confirm.
  const logLevel = 2
  // bufferPool is a thin typed wrapper around sync.Pool that hands out
  // *bytes.Buffer values.
  type bufferPool struct {
  	pool sync.Pool
  }

  // newBufferPool returns a pool whose New hook allocates an empty
  // bytes.Buffer whenever the pool has nothing to reuse.
  func newBufferPool() *bufferPool {
  	bp := new(bufferPool)
  	bp.pool.New = func() any {
  		return new(bytes.Buffer)
  	}
  	return bp
  }

  // get takes a buffer out of the pool. The buffer is returned exactly as it
  // was put back; get does not reset it.
  func (p *bufferPool) get() *bytes.Buffer {
  	item := p.pool.Get()
  	return item.(*bytes.Buffer)
  }

  // put hands b back to the pool for later reuse. The buffer is not reset
  // here.
  func (p *bufferPool) put(b *bytes.Buffer) {
  	p.pool.Put(b)
  }
  // recvMsg is one unit handed from the transport to a stream reader, with all
  // transport protocol specific info already removed.
  type recvMsg struct {
  	buffer *bytes.Buffer

  	// err classifies the message:
  	//   nil:                 some data was received
  	//   io.EOF:              the stream is completed; there is no data
  	//   other non-nil error: transport failure; there is no data
  	err error
  }

  // recvBuffer is an unbounded channel of recvMsg structs.
  //
  // Note: recvBuffer differs from buffer.Unbounded only in that it carries
  // concrete recvMsg structs rather than objects implementing the "item"
  // interface. recvBuffer is written to much more often, and using strict
  // recvMsg structs helps avoid allocation in "recvBuffer.put".
  type recvBuffer struct {
  	c       chan recvMsg
  	mu      sync.Mutex
  	backlog []recvMsg
  	err     error
  }

  // newRecvBuffer creates a recvBuffer whose channel holds a single message;
  // anything beyond that waits in the backlog.
  func newRecvBuffer() *recvBuffer {
  	return &recvBuffer{c: make(chan recvMsg, 1)}
  }

  // put enqueues r. Once a message carrying a non-nil error has been accepted,
  // every later put (data or error) is silently dropped.
  func (b *recvBuffer) put(r recvMsg) {
  	b.mu.Lock()
  	defer b.mu.Unlock()

  	if b.err != nil {
  		// An earlier message already carried an error; accept nothing more.
  		return
  	}
  	b.err = r.err

  	// Only try the channel directly when nothing is queued ahead of r,
  	// otherwise r would overtake the backlog.
  	if len(b.backlog) == 0 {
  		select {
  		case b.c <- r:
  			return
  		default:
  		}
  	}
  	b.backlog = append(b.backlog, r)
  }

  // load moves the oldest backlog entry onto the channel if the channel has
  // room; otherwise it does nothing.
  func (b *recvBuffer) load() {
  	b.mu.Lock()
  	defer b.mu.Unlock()

  	if len(b.backlog) == 0 {
  		return
  	}
  	select {
  	case b.c <- b.backlog[0]:
  		// Clear the slot so the backing array no longer references the
  		// message, then drop it from the backlog.
  		b.backlog[0] = recvMsg{}
  		b.backlog = b.backlog[1:]
  	default:
  	}
  }

  // get returns the channel on which buffered recvMsg values are delivered.
  //
  // After receiving a recvMsg from it, the caller should call load so that the
  // next backlog entry, if any, is sent onto the channel.
  func (b *recvBuffer) get() <-chan recvMsg {
  	return b.c
  }
  128. // recvBufferReader implements io.Reader interface to read the data from
  129. // recvBuffer.
  130. type recvBufferReader struct {
  131. closeStream func(error) // Closes the client transport stream with the given error and nil trailer metadata.
  132. ctx context.Context
  133. ctxDone <-chan struct{} // cache of ctx.Done() (for performance).
  134. recv *recvBuffer
  135. last *bytes.Buffer // Stores the remaining data in the previous calls.
  136. err error
  137. freeBuffer func(*bytes.Buffer)
  138. }
  139. // Read reads the next len(p) bytes from last. If last is drained, it tries to
  140. // read additional data from recv. It blocks if there no additional data available
  141. // in recv. If Read returns any non-nil error, it will continue to return that error.
  142. func (r *recvBufferReader) Read(p []byte) (n int, err error) {
  143. if r.err != nil {
  144. return 0, r.err
  145. }
  146. if r.last != nil {
  147. // Read remaining data left in last call.
  148. copied, _ := r.last.Read(p)
  149. if r.last.Len() == 0 {
  150. r.freeBuffer(r.last)
  151. r.last = nil
  152. }
  153. return copied, nil
  154. }
  155. if r.closeStream != nil {
  156. n, r.err = r.readClient(p)
  157. } else {
  158. n, r.err = r.read(p)
  159. }
  160. return n, r.err
  161. }
  162. func (r *recvBufferReader) read(p []byte) (n int, err error) {
  163. select {
  164. case <-r.ctxDone:
  165. return 0, ContextErr(r.ctx.Err())
  166. case m := <-r.recv.get():
  167. return r.readAdditional(m, p)
  168. }
  169. }
  170. func (r *recvBufferReader) readClient(p []byte) (n int, err error) {
  171. // If the context is canceled, then closes the stream with nil metadata.
  172. // closeStream writes its error parameter to r.recv as a recvMsg.
  173. // r.readAdditional acts on that message and returns the necessary error.
  174. select {
  175. case <-r.ctxDone:
  176. // Note that this adds the ctx error to the end of recv buffer, and
  177. // reads from the head. This will delay the error until recv buffer is
  178. // empty, thus will delay ctx cancellation in Recv().
  179. //
  180. // It's done this way to fix a race between ctx cancel and trailer. The
  181. // race was, stream.Recv() may return ctx error if ctxDone wins the
  182. // race, but stream.Trailer() may return a non-nil md because the stream
  183. // was not marked as done when trailer is received. This closeStream
  184. // call will mark stream as done, thus fix the race.
  185. //
  186. // TODO: delaying ctx error seems like a unnecessary side effect. What
  187. // we really want is to mark the stream as done, and return ctx error
  188. // faster.
  189. r.closeStream(ContextErr(r.ctx.Err()))
  190. m := <-r.recv.get()
  191. return r.readAdditional(m, p)
  192. case m := <-r.recv.get():
  193. return r.readAdditional(m, p)
  194. }
  195. }
  196. func (r *recvBufferReader) readAdditional(m recvMsg, p []byte) (n int, err error) {
  197. r.recv.load()
  198. if m.err != nil {
  199. return 0, m.err
  200. }
  201. copied, _ := m.buffer.Read(p)
  202. if m.buffer.Len() == 0 {
  203. r.freeBuffer(m.buffer)
  204. r.last = nil
  205. } else {
  206. r.last = m.buffer
  207. }
  208. return copied, nil
  209. }
  // streamState enumerates the lifecycle states of a Stream.
  type streamState uint32

  const (
  	// streamActive is the zero value.
  	streamActive streamState = iota
  	// streamWriteDone: EndStream sent.
  	streamWriteDone
  	// streamReadDone: EndStream received.
  	streamReadDone
  	// streamDone: the entire stream is finished.
  	streamDone
  )
  217. // Stream represents an RPC in the transport layer.
  218. type Stream struct {
  219. id uint32
  220. st ServerTransport // nil for client side Stream
  221. ct *http2Client // nil for server side Stream
  222. ctx context.Context // the associated context of the stream
  223. cancel context.CancelFunc // always nil for client side Stream
  224. done chan struct{} // closed at the end of stream to unblock writers. On the client side.
  225. doneFunc func() // invoked at the end of stream on client side.
  226. ctxDone <-chan struct{} // same as done chan but for server side. Cache of ctx.Done() (for performance)
  227. method string // the associated RPC method of the stream
  228. recvCompress string
  229. sendCompress string
  230. buf *recvBuffer
  231. trReader io.Reader
  232. fc *inFlow
  233. wq *writeQuota
  234. // Holds compressor names passed in grpc-accept-encoding metadata from the
  235. // client. This is empty for the client side stream.
  236. clientAdvertisedCompressors string
  237. // Callback to state application's intentions to read data. This
  238. // is used to adjust flow control, if needed.
  239. requestRead func(int)
  240. headerChan chan struct{} // closed to indicate the end of header metadata.
  241. headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
  242. // headerValid indicates whether a valid header was received. Only
  243. // meaningful after headerChan is closed (always call waitOnHeader() before
  244. // reading its value). Not valid on server side.
  245. headerValid bool
  246. headerWireLength int // Only set on server side.
  247. // hdrMu protects header and trailer metadata on the server-side.
  248. hdrMu sync.Mutex
  249. // On client side, header keeps the received header metadata.
  250. //
  251. // On server side, header keeps the header set by SetHeader(). The complete
  252. // header will merged into this after t.WriteHeader() is called.
  253. header metadata.MD
  254. trailer metadata.MD // the key-value map of trailer metadata.
  255. noHeaders bool // set if the client never received headers (set only after the stream is done).
  256. // On the server-side, headerSent is atomically set to 1 when the headers are sent out.
  257. headerSent uint32
  258. state streamState
  259. // On client-side it is the status error received from the server.
  260. // On server-side it is unused.
  261. status *status.Status
  262. bytesReceived uint32 // indicates whether any bytes have been received on this stream
  263. unprocessed uint32 // set if the server sends a refused stream or GOAWAY including this stream
  264. // contentSubtype is the content-subtype for requests.
  265. // this must be lowercase or the behavior is undefined.
  266. contentSubtype string
  267. }
  268. // isHeaderSent is only valid on the server-side.
  269. func (s *Stream) isHeaderSent() bool {
  270. return atomic.LoadUint32(&s.headerSent) == 1
  271. }
  272. // updateHeaderSent updates headerSent and returns true
  273. // if it was alreay set. It is valid only on server-side.
  274. func (s *Stream) updateHeaderSent() bool {
  275. return atomic.SwapUint32(&s.headerSent, 1) == 1
  276. }
  277. func (s *Stream) swapState(st streamState) streamState {
  278. return streamState(atomic.SwapUint32((*uint32)(&s.state), uint32(st)))
  279. }
  280. func (s *Stream) compareAndSwapState(oldState, newState streamState) bool {
  281. return atomic.CompareAndSwapUint32((*uint32)(&s.state), uint32(oldState), uint32(newState))
  282. }
  283. func (s *Stream) getState() streamState {
  284. return streamState(atomic.LoadUint32((*uint32)(&s.state)))
  285. }
  286. func (s *Stream) waitOnHeader() {
  287. if s.headerChan == nil {
  288. // On the server headerChan is always nil since a stream originates
  289. // only after having received headers.
  290. return
  291. }
  292. select {
  293. case <-s.ctx.Done():
  294. // Close the stream to prevent headers/trailers from changing after
  295. // this function returns.
  296. s.ct.CloseStream(s, ContextErr(s.ctx.Err()))
  297. // headerChan could possibly not be closed yet if closeStream raced
  298. // with operateHeaders; wait until it is closed explicitly here.
  299. <-s.headerChan
  300. case <-s.headerChan:
  301. }
  302. }
  303. // RecvCompress returns the compression algorithm applied to the inbound
  304. // message. It is empty string if there is no compression applied.
  305. func (s *Stream) RecvCompress() string {
  306. s.waitOnHeader()
  307. return s.recvCompress
  308. }
  309. // SetSendCompress sets the compression algorithm to the stream.
  310. func (s *Stream) SetSendCompress(name string) error {
  311. if s.isHeaderSent() || s.getState() == streamDone {
  312. return errors.New("transport: set send compressor called after headers sent or stream done")
  313. }
  314. s.sendCompress = name
  315. return nil
  316. }
  317. // SendCompress returns the send compressor name.
  318. func (s *Stream) SendCompress() string {
  319. return s.sendCompress
  320. }
  321. // ClientAdvertisedCompressors returns the compressor names advertised by the
  322. // client via grpc-accept-encoding header.
  323. func (s *Stream) ClientAdvertisedCompressors() string {
  324. return s.clientAdvertisedCompressors
  325. }
  326. // Done returns a channel which is closed when it receives the final status
  327. // from the server.
  328. func (s *Stream) Done() <-chan struct{} {
  329. return s.done
  330. }
  331. // Header returns the header metadata of the stream.
  332. //
  333. // On client side, it acquires the key-value pairs of header metadata once it is
  334. // available. It blocks until i) the metadata is ready or ii) there is no header
  335. // metadata or iii) the stream is canceled/expired.
  336. //
  337. // On server side, it returns the out header after t.WriteHeader is called. It
  338. // does not block and must not be called until after WriteHeader.
  339. func (s *Stream) Header() (metadata.MD, error) {
  340. if s.headerChan == nil {
  341. // On server side, return the header in stream. It will be the out
  342. // header after t.WriteHeader is called.
  343. return s.header.Copy(), nil
  344. }
  345. s.waitOnHeader()
  346. if !s.headerValid || s.noHeaders {
  347. return nil, s.status.Err()
  348. }
  349. return s.header.Copy(), nil
  350. }
  351. // TrailersOnly blocks until a header or trailers-only frame is received and
  352. // then returns true if the stream was trailers-only. If the stream ends
  353. // before headers are received, returns true, nil. Client-side only.
  354. func (s *Stream) TrailersOnly() bool {
  355. s.waitOnHeader()
  356. return s.noHeaders
  357. }
  358. // Trailer returns the cached trailer metedata. Note that if it is not called
  359. // after the entire stream is done, it could return an empty MD. Client
  360. // side only.
  361. // It can be safely read only after stream has ended that is either read
  362. // or write have returned io.EOF.
  363. func (s *Stream) Trailer() metadata.MD {
  364. c := s.trailer.Copy()
  365. return c
  366. }
  367. // ContentSubtype returns the content-subtype for a request. For example, a
  368. // content-subtype of "proto" will result in a content-type of
  369. // "application/grpc+proto". This will always be lowercase. See
  370. // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
  371. // more details.
  372. func (s *Stream) ContentSubtype() string {
  373. return s.contentSubtype
  374. }
  375. // Context returns the context of the stream.
  376. func (s *Stream) Context() context.Context {
  377. return s.ctx
  378. }
  379. // SetContext sets the context of the stream. This will be deleted once the
  380. // stats handler callouts all move to gRPC layer.
  381. func (s *Stream) SetContext(ctx context.Context) {
  382. s.ctx = ctx
  383. }
  384. // Method returns the method for the stream.
  385. func (s *Stream) Method() string {
  386. return s.method
  387. }
  388. // Status returns the status received from the server.
  389. // Status can be read safely only after the stream has ended,
  390. // that is, after Done() is closed.
  391. func (s *Stream) Status() *status.Status {
  392. return s.status
  393. }
  394. // HeaderWireLength returns the size of the headers of the stream as received
  395. // from the wire. Valid only on the server.
  396. func (s *Stream) HeaderWireLength() int {
  397. return s.headerWireLength
  398. }
  399. // SetHeader sets the header metadata. This can be called multiple times.
  400. // Server side only.
  401. // This should not be called in parallel to other data writes.
  402. func (s *Stream) SetHeader(md metadata.MD) error {
  403. if md.Len() == 0 {
  404. return nil
  405. }
  406. if s.isHeaderSent() || s.getState() == streamDone {
  407. return ErrIllegalHeaderWrite
  408. }
  409. s.hdrMu.Lock()
  410. s.header = metadata.Join(s.header, md)
  411. s.hdrMu.Unlock()
  412. return nil
  413. }
  414. // SendHeader sends the given header metadata. The given metadata is
  415. // combined with any metadata set by previous calls to SetHeader and
  416. // then written to the transport stream.
  417. func (s *Stream) SendHeader(md metadata.MD) error {
  418. return s.st.WriteHeader(s, md)
  419. }
  420. // SetTrailer sets the trailer metadata which will be sent with the RPC status
  421. // by the server. This can be called multiple times. Server side only.
  422. // This should not be called parallel to other data writes.
  423. func (s *Stream) SetTrailer(md metadata.MD) error {
  424. if md.Len() == 0 {
  425. return nil
  426. }
  427. if s.getState() == streamDone {
  428. return ErrIllegalHeaderWrite
  429. }
  430. s.hdrMu.Lock()
  431. s.trailer = metadata.Join(s.trailer, md)
  432. s.hdrMu.Unlock()
  433. return nil
  434. }
  435. func (s *Stream) write(m recvMsg) {
  436. s.buf.put(m)
  437. }
  438. // Read reads all p bytes from the wire for this stream.
  439. func (s *Stream) Read(p []byte) (n int, err error) {
  440. // Don't request a read if there was an error earlier
  441. if er := s.trReader.(*transportReader).er; er != nil {
  442. return 0, er
  443. }
  444. s.requestRead(len(p))
  445. return io.ReadFull(s.trReader, p)
  446. }
  // transportReader reads all the data available for this Stream from the
  // transport and passes them into the decoder, which converts them into a
  // gRPC message stream. The error is io.EOF when the stream is done or
  // another non-nil error if the stream broke.
  type transportReader struct {
  	reader io.Reader

  	// windowHandler controls the window update procedure for both this
  	// particular stream and the associated transport.
  	windowHandler func(int)

  	// er records the most recent error returned by reader.
  	er error
  }

  // Read forwards to the wrapped reader. On success it reports the number of
  // bytes read to windowHandler; on failure it stores the error in er and
  // skips the window update.
  func (t *transportReader) Read(p []byte) (int, error) {
  	n, err := t.reader.Read(p)
  	if err != nil {
  		t.er = err
  		return n, err
  	}
  	t.windowHandler(n)
  	return n, nil
  }
  467. // BytesReceived indicates whether any bytes have been received on this stream.
  468. func (s *Stream) BytesReceived() bool {
  469. return atomic.LoadUint32(&s.bytesReceived) == 1
  470. }
  471. // Unprocessed indicates whether the server did not process this stream --
  472. // i.e. it sent a refused stream or GOAWAY including this stream ID.
  473. func (s *Stream) Unprocessed() bool {
  474. return atomic.LoadUint32(&s.unprocessed) == 1
  475. }
  476. // GoString is implemented by Stream so context.String() won't
  477. // race when printing %#v.
  478. func (s *Stream) GoString() string {
  479. return fmt.Sprintf("<stream: %p, %v>", s, s.method)
  480. }
  // transportState is the state of a transport.
  type transportState int

  // The transport states, numbered from zero in declaration order.
  const (
  	reachable transportState = iota
  	closing
  	draining
  )
  488. // ServerConfig consists of all the configurations to establish a server transport.
  489. type ServerConfig struct {
  490. MaxStreams uint32
  491. ConnectionTimeout time.Duration
  492. Credentials credentials.TransportCredentials
  493. InTapHandle tap.ServerInHandle
  494. StatsHandlers []stats.Handler
  495. KeepaliveParams keepalive.ServerParameters
  496. KeepalivePolicy keepalive.EnforcementPolicy
  497. InitialWindowSize int32
  498. InitialConnWindowSize int32
  499. WriteBufferSize int
  500. ReadBufferSize int
  501. SharedWriteBuffer bool
  502. ChannelzParentID *channelz.Identifier
  503. MaxHeaderListSize *uint32
  504. HeaderTableSize *uint32
  505. }
  506. // ConnectOptions covers all relevant options for communicating with the server.
  507. type ConnectOptions struct {
  508. // UserAgent is the application user agent.
  509. UserAgent string
  510. // Dialer specifies how to dial a network address.
  511. Dialer func(context.Context, string) (net.Conn, error)
  512. // FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
  513. FailOnNonTempDialError bool
  514. // PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
  515. PerRPCCredentials []credentials.PerRPCCredentials
  516. // TransportCredentials stores the Authenticator required to setup a client
  517. // connection. Only one of TransportCredentials and CredsBundle is non-nil.
  518. TransportCredentials credentials.TransportCredentials
  519. // CredsBundle is the credentials bundle to be used. Only one of
  520. // TransportCredentials and CredsBundle is non-nil.
  521. CredsBundle credentials.Bundle
  522. // KeepaliveParams stores the keepalive parameters.
  523. KeepaliveParams keepalive.ClientParameters
  524. // StatsHandlers stores the handler for stats.
  525. StatsHandlers []stats.Handler
  526. // InitialWindowSize sets the initial window size for a stream.
  527. InitialWindowSize int32
  528. // InitialConnWindowSize sets the initial window size for a connection.
  529. InitialConnWindowSize int32
  530. // WriteBufferSize sets the size of write buffer which in turn determines how much data can be batched before it's written on the wire.
  531. WriteBufferSize int
  532. // ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall.
  533. ReadBufferSize int
  534. // SharedWriteBuffer indicates whether connections should reuse write buffer
  535. SharedWriteBuffer bool
  536. // ChannelzParentID sets the addrConn id which initiate the creation of this client transport.
  537. ChannelzParentID *channelz.Identifier
  538. // MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received.
  539. MaxHeaderListSize *uint32
  540. // UseProxy specifies if a proxy should be used.
  541. UseProxy bool
  542. }
  543. // NewClientTransport establishes the transport with the required ConnectOptions
  544. // and returns it to the caller.
  545. func NewClientTransport(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (ClientTransport, error) {
  546. return newHTTP2Client(connectCtx, ctx, addr, opts, onClose)
  547. }
  // Options provides additional hints and information for message
  // transmission.
  type Options struct {
  	// Last indicates whether this write is the last piece for this stream.
  	Last bool
  }
  555. // CallHdr carries the information of a particular RPC.
  556. type CallHdr struct {
  557. // Host specifies the peer's host.
  558. Host string
  559. // Method specifies the operation to perform.
  560. Method string
  561. // SendCompress specifies the compression algorithm applied on
  562. // outbound message.
  563. SendCompress string
  564. // Creds specifies credentials.PerRPCCredentials for a call.
  565. Creds credentials.PerRPCCredentials
  566. // ContentSubtype specifies the content-subtype for a request. For example, a
  567. // content-subtype of "proto" will result in a content-type of
  568. // "application/grpc+proto". The value of ContentSubtype must be all
  569. // lowercase, otherwise the behavior is undefined. See
  570. // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
  571. // for more details.
  572. ContentSubtype string
  573. PreviousAttempts int // value of grpc-previous-rpc-attempts header to set
  574. DoneFunc func() // called when the stream is finished
  575. }
  576. // ClientTransport is the common interface for all gRPC client-side transport
  577. // implementations.
  578. type ClientTransport interface {
  579. // Close tears down this transport. Once it returns, the transport
  580. // should not be accessed any more. The caller must make sure this
  581. // is called only once.
  582. Close(err error)
  583. // GracefulClose starts to tear down the transport: the transport will stop
  584. // accepting new RPCs and NewStream will return error. Once all streams are
  585. // finished, the transport will close.
  586. //
  587. // It does not block.
  588. GracefulClose()
  589. // Write sends the data for the given stream. A nil stream indicates
  590. // the write is to be performed on the transport as a whole.
  591. Write(s *Stream, hdr []byte, data []byte, opts *Options) error
  592. // NewStream creates a Stream for an RPC.
  593. NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error)
  594. // CloseStream clears the footprint of a stream when the stream is
  595. // not needed any more. The err indicates the error incurred when
  596. // CloseStream is called. Must be called when a stream is finished
  597. // unless the associated transport is closing.
  598. CloseStream(stream *Stream, err error)
  599. // Error returns a channel that is closed when some I/O error
  600. // happens. Typically the caller should have a goroutine to monitor
  601. // this in order to take action (e.g., close the current transport
  602. // and create a new one) in error case. It should not return nil
  603. // once the transport is initiated.
  604. Error() <-chan struct{}
  605. // GoAway returns a channel that is closed when ClientTransport
  606. // receives the draining signal from the server (e.g., GOAWAY frame in
  607. // HTTP/2).
  608. GoAway() <-chan struct{}
  609. // GetGoAwayReason returns the reason why GoAway frame was received, along
  610. // with a human readable string with debug info.
  611. GetGoAwayReason() (GoAwayReason, string)
  612. // RemoteAddr returns the remote network address.
  613. RemoteAddr() net.Addr
  614. // IncrMsgSent increments the number of message sent through this transport.
  615. IncrMsgSent()
  616. // IncrMsgRecv increments the number of message received through this transport.
  617. IncrMsgRecv()
  618. }
  619. // ServerTransport is the common interface for all gRPC server-side transport
  620. // implementations.
  621. //
  622. // Methods may be called concurrently from multiple goroutines, but
  623. // Write methods for a given Stream will be called serially.
  624. type ServerTransport interface {
  625. // HandleStreams receives incoming streams using the given handler.
  626. HandleStreams(context.Context, func(*Stream))
  627. // WriteHeader sends the header metadata for the given stream.
  628. // WriteHeader may not be called on all streams.
  629. WriteHeader(s *Stream, md metadata.MD) error
  630. // Write sends the data for the given stream.
  631. // Write may not be called on all streams.
  632. Write(s *Stream, hdr []byte, data []byte, opts *Options) error
  633. // WriteStatus sends the status of a stream to the client. WriteStatus is
  634. // the final call made on a stream and always occurs.
  635. WriteStatus(s *Stream, st *status.Status) error
  636. // Close tears down the transport. Once it is called, the transport
  637. // should not be accessed any more. All the pending streams and their
  638. // handlers will be terminated asynchronously.
  639. Close(err error)
  640. // Peer returns the peer of the server transport.
  641. Peer() *peer.Peer
  642. // Drain notifies the client this ServerTransport stops accepting new RPCs.
  643. Drain(debugData string)
  644. // IncrMsgSent increments the number of message sent through this transport.
  645. IncrMsgSent()
  646. // IncrMsgRecv increments the number of message received through this transport.
  647. IncrMsgRecv()
  648. }
  // connectionErrorf creates a ConnectionError whose description is built from
  // format and a, recording temp as its temporary flag and e as its original
  // error.
  func connectionErrorf(temp bool, e error, format string, a ...any) ConnectionError {
  	var ce ConnectionError
  	ce.Desc = fmt.Sprintf(format, a...)
  	ce.temp = temp
  	ce.err = e
  	return ce
  }

  // ConnectionError is an error that results in the termination of the entire
  // connection and the retry of all the active streams.
  type ConnectionError struct {
  	Desc string
  	temp bool
  	err  error
  }

  // Error renders the description, quoted, behind a fixed prefix.
  func (e ConnectionError) Error() string {
  	desc := e.Desc
  	return fmt.Sprintf("connection error: desc = %q", desc)
  }

  // Temporary indicates if this connection error is temporary or fatal.
  func (e ConnectionError) Temporary() bool {
  	return e.temp
  }

  // Origin returns the original error of this connection error. It never
  // returns nil: when there is no original error, the ConnectionError itself
  // is returned.
  func (e ConnectionError) Origin() error {
  	if origin := e.err; origin != nil {
  		return origin
  	}
  	return e
  }

  // Unwrap returns the original error of this connection error, or nil when
  // the origin is nil.
  func (e ConnectionError) Unwrap() error {
  	return e.err
  }
  685. var (
  686. // ErrConnClosing indicates that the transport is closing.
  687. ErrConnClosing = connectionErrorf(true, nil, "transport is closing")
  688. // errStreamDrain indicates that the stream is rejected because the
  689. // connection is draining. This could be caused by goaway or balancer
  690. // removing the address.
  691. errStreamDrain = status.Error(codes.Unavailable, "the connection is draining")
  692. // errStreamDone is returned from write at the client side to indiacte application
  693. // layer of an error.
  694. errStreamDone = errors.New("the stream is done")
  695. // StatusGoAway indicates that the server sent a GOAWAY that included this
  696. // stream's ID in unprocessed RPCs.
  697. statusGoAway = status.New(codes.Unavailable, "the stream is rejected because server is draining the connection")
  698. )
  // GoAwayReason contains the reason for the GoAway frame received.
  type GoAwayReason uint8

  const (
  	// GoAwayInvalid indicates that no GoAway frame is received.
  	GoAwayInvalid GoAwayReason = 0

  	// GoAwayNoReason is the default value when a GoAway frame is received.
  	GoAwayNoReason GoAwayReason = 1

  	// GoAwayTooManyPings indicates that a GoAway frame with
  	// ErrCodeEnhanceYourCalm was received and that the debug data said
  	// "too_many_pings".
  	GoAwayTooManyPings GoAwayReason = 2
  )
  // channelzData is used to store channelz related data for http2Client and
  // http2Server.
  //
  // These fields cannot be embedded in the original structs (e.g.
  // http2Client): to do atomic operations on an int64 variable on a 32-bit
  // machine, the user is responsible for enforcing memory alignment. Grouping
  // the int64 fields inside this struct enforces that alignment.
  type channelzData struct {
  	kpCount int64

  	// streamsStarted is the number of streams that have started, including
  	// already finished ones.
  	streamsStarted int64

  	// Client side: the number of streams that have ended successfully by
  	// receiving an EoS bit set frame from the server.
  	// Server side: the number of streams that have ended successfully by
  	// sending a frame with the EoS bit set.
  	streamsSucceeded int64

  	streamsFailed int64

  	// lastStreamCreatedTime stores the timestamp at which the last stream
  	// was created. It is of int64 type instead of time.Time since it's more
  	// costly to atomically update a time.Time variable than an int64
  	// variable. The same goes for lastMsgSentTime and lastMsgRecvTime.
  	lastStreamCreatedTime int64

  	msgSent, msgRecv int64

  	lastMsgSentTime, lastMsgRecvTime int64
  }
  734. // ContextErr converts the error from context package into a status error.
  735. func ContextErr(err error) error {
  736. switch err {
  737. case context.DeadlineExceeded:
  738. return status.Error(codes.DeadlineExceeded, err.Error())
  739. case context.Canceled:
  740. return status.Error(codes.Canceled, err.Error())
  741. }
  742. return status.Errorf(codes.Internal, "Unexpected error from context packet: %v", err)
  743. }