transport.go 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. // Package transport defines and implements message oriented communication
  19. // channel to complete various transactions (e.g., an RPC). It is meant for
  20. // grpc-internal usage and is not intended to be imported directly by users.
  21. package transport
  22. import (
  23. "bytes"
  24. "context"
  25. "errors"
  26. "fmt"
  27. "io"
  28. "net"
  29. "sync"
  30. "sync/atomic"
  31. "google.golang.org/grpc/codes"
  32. "google.golang.org/grpc/credentials"
  33. "google.golang.org/grpc/keepalive"
  34. "google.golang.org/grpc/metadata"
  35. "google.golang.org/grpc/stats"
  36. "google.golang.org/grpc/status"
  37. "google.golang.org/grpc/tap"
  38. )
  39. type bufferPool struct {
  40. pool sync.Pool
  41. }
  42. func newBufferPool() *bufferPool {
  43. return &bufferPool{
  44. pool: sync.Pool{
  45. New: func() interface{} {
  46. return new(bytes.Buffer)
  47. },
  48. },
  49. }
  50. }
  51. func (p *bufferPool) get() *bytes.Buffer {
  52. return p.pool.Get().(*bytes.Buffer)
  53. }
  54. func (p *bufferPool) put(b *bytes.Buffer) {
  55. p.pool.Put(b)
  56. }
  57. // recvMsg represents the received msg from the transport. All transport
  58. // protocol specific info has been removed.
  59. type recvMsg struct {
  60. buffer *bytes.Buffer
  61. // nil: received some data
  62. // io.EOF: stream is completed. data is nil.
  63. // other non-nil error: transport failure. data is nil.
  64. err error
  65. }
  66. // recvBuffer is an unbounded channel of recvMsg structs.
  67. //
  68. // Note: recvBuffer differs from buffer.Unbounded only in the fact that it
  69. // holds a channel of recvMsg structs instead of objects implementing "item"
  70. // interface. recvBuffer is written to much more often and using strict recvMsg
  71. // structs helps avoid allocation in "recvBuffer.put"
  72. type recvBuffer struct {
  73. c chan recvMsg
  74. mu sync.Mutex
  75. backlog []recvMsg
  76. err error
  77. }
  78. func newRecvBuffer() *recvBuffer {
  79. b := &recvBuffer{
  80. c: make(chan recvMsg, 1),
  81. }
  82. return b
  83. }
  84. func (b *recvBuffer) put(r recvMsg) {
  85. b.mu.Lock()
  86. if b.err != nil {
  87. b.mu.Unlock()
  88. // An error had occurred earlier, don't accept more
  89. // data or errors.
  90. return
  91. }
  92. b.err = r.err
  93. if len(b.backlog) == 0 {
  94. select {
  95. case b.c <- r:
  96. b.mu.Unlock()
  97. return
  98. default:
  99. }
  100. }
  101. b.backlog = append(b.backlog, r)
  102. b.mu.Unlock()
  103. }
  104. func (b *recvBuffer) load() {
  105. b.mu.Lock()
  106. if len(b.backlog) > 0 {
  107. select {
  108. case b.c <- b.backlog[0]:
  109. b.backlog[0] = recvMsg{}
  110. b.backlog = b.backlog[1:]
  111. default:
  112. }
  113. }
  114. b.mu.Unlock()
  115. }
  116. // get returns the channel that receives a recvMsg in the buffer.
  117. //
  118. // Upon receipt of a recvMsg, the caller should call load to send another
  119. // recvMsg onto the channel if there is any.
  120. func (b *recvBuffer) get() <-chan recvMsg {
  121. return b.c
  122. }
  123. // recvBufferReader implements io.Reader interface to read the data from
  124. // recvBuffer.
  125. type recvBufferReader struct {
  126. closeStream func(error) // Closes the client transport stream with the given error and nil trailer metadata.
  127. ctx context.Context
  128. ctxDone <-chan struct{} // cache of ctx.Done() (for performance).
  129. recv *recvBuffer
  130. last *bytes.Buffer // Stores the remaining data in the previous calls.
  131. err error
  132. freeBuffer func(*bytes.Buffer)
  133. }
  134. // Read reads the next len(p) bytes from last. If last is drained, it tries to
  135. // read additional data from recv. It blocks if there no additional data available
  136. // in recv. If Read returns any non-nil error, it will continue to return that error.
  137. func (r *recvBufferReader) Read(p []byte) (n int, err error) {
  138. if r.err != nil {
  139. return 0, r.err
  140. }
  141. if r.last != nil {
  142. // Read remaining data left in last call.
  143. copied, _ := r.last.Read(p)
  144. if r.last.Len() == 0 {
  145. r.freeBuffer(r.last)
  146. r.last = nil
  147. }
  148. return copied, nil
  149. }
  150. if r.closeStream != nil {
  151. n, r.err = r.readClient(p)
  152. } else {
  153. n, r.err = r.read(p)
  154. }
  155. return n, r.err
  156. }
  157. func (r *recvBufferReader) read(p []byte) (n int, err error) {
  158. select {
  159. case <-r.ctxDone:
  160. return 0, ContextErr(r.ctx.Err())
  161. case m := <-r.recv.get():
  162. return r.readAdditional(m, p)
  163. }
  164. }
  165. func (r *recvBufferReader) readClient(p []byte) (n int, err error) {
  166. // If the context is canceled, then closes the stream with nil metadata.
  167. // closeStream writes its error parameter to r.recv as a recvMsg.
  168. // r.readAdditional acts on that message and returns the necessary error.
  169. select {
  170. case <-r.ctxDone:
  171. // Note that this adds the ctx error to the end of recv buffer, and
  172. // reads from the head. This will delay the error until recv buffer is
  173. // empty, thus will delay ctx cancellation in Recv().
  174. //
  175. // It's done this way to fix a race between ctx cancel and trailer. The
  176. // race was, stream.Recv() may return ctx error if ctxDone wins the
  177. // race, but stream.Trailer() may return a non-nil md because the stream
  178. // was not marked as done when trailer is received. This closeStream
  179. // call will mark stream as done, thus fix the race.
  180. //
  181. // TODO: delaying ctx error seems like a unnecessary side effect. What
  182. // we really want is to mark the stream as done, and return ctx error
  183. // faster.
  184. r.closeStream(ContextErr(r.ctx.Err()))
  185. m := <-r.recv.get()
  186. return r.readAdditional(m, p)
  187. case m := <-r.recv.get():
  188. return r.readAdditional(m, p)
  189. }
  190. }
  191. func (r *recvBufferReader) readAdditional(m recvMsg, p []byte) (n int, err error) {
  192. r.recv.load()
  193. if m.err != nil {
  194. return 0, m.err
  195. }
  196. copied, _ := m.buffer.Read(p)
  197. if m.buffer.Len() == 0 {
  198. r.freeBuffer(m.buffer)
  199. r.last = nil
  200. } else {
  201. r.last = m.buffer
  202. }
  203. return copied, nil
  204. }
  205. type streamState uint32
  206. const (
  207. streamActive streamState = iota
  208. streamWriteDone // EndStream sent
  209. streamReadDone // EndStream received
  210. streamDone // the entire stream is finished.
  211. )
  212. // Stream represents an RPC in the transport layer.
  213. type Stream struct {
  214. id uint32
  215. st ServerTransport // nil for client side Stream
  216. ct *http2Client // nil for server side Stream
  217. ctx context.Context // the associated context of the stream
  218. cancel context.CancelFunc // always nil for client side Stream
  219. done chan struct{} // closed at the end of stream to unblock writers. On the client side.
  220. ctxDone <-chan struct{} // same as done chan but for server side. Cache of ctx.Done() (for performance)
  221. method string // the associated RPC method of the stream
  222. recvCompress string
  223. sendCompress string
  224. buf *recvBuffer
  225. trReader io.Reader
  226. fc *inFlow
  227. wq *writeQuota
  228. // Callback to state application's intentions to read data. This
  229. // is used to adjust flow control, if needed.
  230. requestRead func(int)
  231. headerChan chan struct{} // closed to indicate the end of header metadata.
  232. headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
  233. // headerValid indicates whether a valid header was received. Only
  234. // meaningful after headerChan is closed (always call waitOnHeader() before
  235. // reading its value). Not valid on server side.
  236. headerValid bool
  237. // hdrMu protects header and trailer metadata on the server-side.
  238. hdrMu sync.Mutex
  239. // On client side, header keeps the received header metadata.
  240. //
  241. // On server side, header keeps the header set by SetHeader(). The complete
  242. // header will merged into this after t.WriteHeader() is called.
  243. header metadata.MD
  244. trailer metadata.MD // the key-value map of trailer metadata.
  245. noHeaders bool // set if the client never received headers (set only after the stream is done).
  246. // On the server-side, headerSent is atomically set to 1 when the headers are sent out.
  247. headerSent uint32
  248. state streamState
  249. // On client-side it is the status error received from the server.
  250. // On server-side it is unused.
  251. status *status.Status
  252. bytesReceived uint32 // indicates whether any bytes have been received on this stream
  253. unprocessed uint32 // set if the server sends a refused stream or GOAWAY including this stream
  254. // contentSubtype is the content-subtype for requests.
  255. // this must be lowercase or the behavior is undefined.
  256. contentSubtype string
  257. }
  258. // isHeaderSent is only valid on the server-side.
  259. func (s *Stream) isHeaderSent() bool {
  260. return atomic.LoadUint32(&s.headerSent) == 1
  261. }
  262. // updateHeaderSent updates headerSent and returns true
  263. // if it was alreay set. It is valid only on server-side.
  264. func (s *Stream) updateHeaderSent() bool {
  265. return atomic.SwapUint32(&s.headerSent, 1) == 1
  266. }
  267. func (s *Stream) swapState(st streamState) streamState {
  268. return streamState(atomic.SwapUint32((*uint32)(&s.state), uint32(st)))
  269. }
  270. func (s *Stream) compareAndSwapState(oldState, newState streamState) bool {
  271. return atomic.CompareAndSwapUint32((*uint32)(&s.state), uint32(oldState), uint32(newState))
  272. }
  273. func (s *Stream) getState() streamState {
  274. return streamState(atomic.LoadUint32((*uint32)(&s.state)))
  275. }
  276. func (s *Stream) waitOnHeader() {
  277. if s.headerChan == nil {
  278. // On the server headerChan is always nil since a stream originates
  279. // only after having received headers.
  280. return
  281. }
  282. select {
  283. case <-s.ctx.Done():
  284. // Close the stream to prevent headers/trailers from changing after
  285. // this function returns.
  286. s.ct.CloseStream(s, ContextErr(s.ctx.Err()))
  287. // headerChan could possibly not be closed yet if closeStream raced
  288. // with operateHeaders; wait until it is closed explicitly here.
  289. <-s.headerChan
  290. case <-s.headerChan:
  291. }
  292. }
  293. // RecvCompress returns the compression algorithm applied to the inbound
  294. // message. It is empty string if there is no compression applied.
  295. func (s *Stream) RecvCompress() string {
  296. s.waitOnHeader()
  297. return s.recvCompress
  298. }
  299. // SetSendCompress sets the compression algorithm to the stream.
  300. func (s *Stream) SetSendCompress(str string) {
  301. s.sendCompress = str
  302. }
  303. // Done returns a channel which is closed when it receives the final status
  304. // from the server.
  305. func (s *Stream) Done() <-chan struct{} {
  306. return s.done
  307. }
  308. // Header returns the header metadata of the stream.
  309. //
  310. // On client side, it acquires the key-value pairs of header metadata once it is
  311. // available. It blocks until i) the metadata is ready or ii) there is no header
  312. // metadata or iii) the stream is canceled/expired.
  313. //
  314. // On server side, it returns the out header after t.WriteHeader is called. It
  315. // does not block and must not be called until after WriteHeader.
  316. func (s *Stream) Header() (metadata.MD, error) {
  317. if s.headerChan == nil {
  318. // On server side, return the header in stream. It will be the out
  319. // header after t.WriteHeader is called.
  320. return s.header.Copy(), nil
  321. }
  322. s.waitOnHeader()
  323. if !s.headerValid {
  324. return nil, s.status.Err()
  325. }
  326. return s.header.Copy(), nil
  327. }
  328. // TrailersOnly blocks until a header or trailers-only frame is received and
  329. // then returns true if the stream was trailers-only. If the stream ends
  330. // before headers are received, returns true, nil. Client-side only.
  331. func (s *Stream) TrailersOnly() bool {
  332. s.waitOnHeader()
  333. return s.noHeaders
  334. }
  335. // Trailer returns the cached trailer metedata. Note that if it is not called
  336. // after the entire stream is done, it could return an empty MD. Client
  337. // side only.
  338. // It can be safely read only after stream has ended that is either read
  339. // or write have returned io.EOF.
  340. func (s *Stream) Trailer() metadata.MD {
  341. c := s.trailer.Copy()
  342. return c
  343. }
  344. // ContentSubtype returns the content-subtype for a request. For example, a
  345. // content-subtype of "proto" will result in a content-type of
  346. // "application/grpc+proto". This will always be lowercase. See
  347. // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
  348. // more details.
  349. func (s *Stream) ContentSubtype() string {
  350. return s.contentSubtype
  351. }
  352. // Context returns the context of the stream.
  353. func (s *Stream) Context() context.Context {
  354. return s.ctx
  355. }
  356. // Method returns the method for the stream.
  357. func (s *Stream) Method() string {
  358. return s.method
  359. }
  360. // Status returns the status received from the server.
  361. // Status can be read safely only after the stream has ended,
  362. // that is, after Done() is closed.
  363. func (s *Stream) Status() *status.Status {
  364. return s.status
  365. }
  366. // SetHeader sets the header metadata. This can be called multiple times.
  367. // Server side only.
  368. // This should not be called in parallel to other data writes.
  369. func (s *Stream) SetHeader(md metadata.MD) error {
  370. if md.Len() == 0 {
  371. return nil
  372. }
  373. if s.isHeaderSent() || s.getState() == streamDone {
  374. return ErrIllegalHeaderWrite
  375. }
  376. s.hdrMu.Lock()
  377. s.header = metadata.Join(s.header, md)
  378. s.hdrMu.Unlock()
  379. return nil
  380. }
  381. // SendHeader sends the given header metadata. The given metadata is
  382. // combined with any metadata set by previous calls to SetHeader and
  383. // then written to the transport stream.
  384. func (s *Stream) SendHeader(md metadata.MD) error {
  385. return s.st.WriteHeader(s, md)
  386. }
  387. // SetTrailer sets the trailer metadata which will be sent with the RPC status
  388. // by the server. This can be called multiple times. Server side only.
  389. // This should not be called parallel to other data writes.
  390. func (s *Stream) SetTrailer(md metadata.MD) error {
  391. if md.Len() == 0 {
  392. return nil
  393. }
  394. if s.getState() == streamDone {
  395. return ErrIllegalHeaderWrite
  396. }
  397. s.hdrMu.Lock()
  398. s.trailer = metadata.Join(s.trailer, md)
  399. s.hdrMu.Unlock()
  400. return nil
  401. }
  402. func (s *Stream) write(m recvMsg) {
  403. s.buf.put(m)
  404. }
  405. // Read reads all p bytes from the wire for this stream.
  406. func (s *Stream) Read(p []byte) (n int, err error) {
  407. // Don't request a read if there was an error earlier
  408. if er := s.trReader.(*transportReader).er; er != nil {
  409. return 0, er
  410. }
  411. s.requestRead(len(p))
  412. return io.ReadFull(s.trReader, p)
  413. }
  414. // tranportReader reads all the data available for this Stream from the transport and
  415. // passes them into the decoder, which converts them into a gRPC message stream.
  416. // The error is io.EOF when the stream is done or another non-nil error if
  417. // the stream broke.
  418. type transportReader struct {
  419. reader io.Reader
  420. // The handler to control the window update procedure for both this
  421. // particular stream and the associated transport.
  422. windowHandler func(int)
  423. er error
  424. }
  425. func (t *transportReader) Read(p []byte) (n int, err error) {
  426. n, err = t.reader.Read(p)
  427. if err != nil {
  428. t.er = err
  429. return
  430. }
  431. t.windowHandler(n)
  432. return
  433. }
  434. // BytesReceived indicates whether any bytes have been received on this stream.
  435. func (s *Stream) BytesReceived() bool {
  436. return atomic.LoadUint32(&s.bytesReceived) == 1
  437. }
  438. // Unprocessed indicates whether the server did not process this stream --
  439. // i.e. it sent a refused stream or GOAWAY including this stream ID.
  440. func (s *Stream) Unprocessed() bool {
  441. return atomic.LoadUint32(&s.unprocessed) == 1
  442. }
  443. // GoString is implemented by Stream so context.String() won't
  444. // race when printing %#v.
  445. func (s *Stream) GoString() string {
  446. return fmt.Sprintf("<stream: %p, %v>", s, s.method)
  447. }
  448. // state of transport
  449. type transportState int
  450. const (
  451. reachable transportState = iota
  452. closing
  453. draining
  454. )
  455. // ServerConfig consists of all the configurations to establish a server transport.
  456. type ServerConfig struct {
  457. MaxStreams uint32
  458. AuthInfo credentials.AuthInfo
  459. InTapHandle tap.ServerInHandle
  460. StatsHandler stats.Handler
  461. KeepaliveParams keepalive.ServerParameters
  462. KeepalivePolicy keepalive.EnforcementPolicy
  463. InitialWindowSize int32
  464. InitialConnWindowSize int32
  465. WriteBufferSize int
  466. ReadBufferSize int
  467. ChannelzParentID int64
  468. MaxHeaderListSize *uint32
  469. HeaderTableSize *uint32
  470. }
  471. // NewServerTransport creates a ServerTransport with conn or non-nil error
  472. // if it fails.
  473. func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (ServerTransport, error) {
  474. return newHTTP2Server(conn, config)
  475. }
  476. // ConnectOptions covers all relevant options for communicating with the server.
  477. type ConnectOptions struct {
  478. // UserAgent is the application user agent.
  479. UserAgent string
  480. // Dialer specifies how to dial a network address.
  481. Dialer func(context.Context, string) (net.Conn, error)
  482. // FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
  483. FailOnNonTempDialError bool
  484. // PerRPCCredentials stores the PerRPCCredentials required to issue RPCs.
  485. PerRPCCredentials []credentials.PerRPCCredentials
  486. // TransportCredentials stores the Authenticator required to setup a client
  487. // connection. Only one of TransportCredentials and CredsBundle is non-nil.
  488. TransportCredentials credentials.TransportCredentials
  489. // CredsBundle is the credentials bundle to be used. Only one of
  490. // TransportCredentials and CredsBundle is non-nil.
  491. CredsBundle credentials.Bundle
  492. // KeepaliveParams stores the keepalive parameters.
  493. KeepaliveParams keepalive.ClientParameters
  494. // StatsHandler stores the handler for stats.
  495. StatsHandler stats.Handler
  496. // InitialWindowSize sets the initial window size for a stream.
  497. InitialWindowSize int32
  498. // InitialConnWindowSize sets the initial window size for a connection.
  499. InitialConnWindowSize int32
  500. // WriteBufferSize sets the size of write buffer which in turn determines how much data can be batched before it's written on the wire.
  501. WriteBufferSize int
  502. // ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall.
  503. ReadBufferSize int
  504. // ChannelzParentID sets the addrConn id which initiate the creation of this client transport.
  505. ChannelzParentID int64
  506. // MaxHeaderListSize sets the max (uncompressed) size of header list that is prepared to be received.
  507. MaxHeaderListSize *uint32
  508. }
  509. // TargetInfo contains the information of the target such as network address and metadata.
  510. type TargetInfo struct {
  511. Addr string
  512. Metadata interface{}
  513. Authority string
  514. }
  515. // NewClientTransport establishes the transport with the required ConnectOptions
  516. // and returns it to the caller.
  517. func NewClientTransport(connectCtx, ctx context.Context, target TargetInfo, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) {
  518. return newHTTP2Client(connectCtx, ctx, target, opts, onPrefaceReceipt, onGoAway, onClose)
  519. }
  520. // Options provides additional hints and information for message
  521. // transmission.
  522. type Options struct {
  523. // Last indicates whether this write is the last piece for
  524. // this stream.
  525. Last bool
  526. }
  527. // CallHdr carries the information of a particular RPC.
  528. type CallHdr struct {
  529. // Host specifies the peer's host.
  530. Host string
  531. // Method specifies the operation to perform.
  532. Method string
  533. // SendCompress specifies the compression algorithm applied on
  534. // outbound message.
  535. SendCompress string
  536. // Creds specifies credentials.PerRPCCredentials for a call.
  537. Creds credentials.PerRPCCredentials
  538. // ContentSubtype specifies the content-subtype for a request. For example, a
  539. // content-subtype of "proto" will result in a content-type of
  540. // "application/grpc+proto". The value of ContentSubtype must be all
  541. // lowercase, otherwise the behavior is undefined. See
  542. // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests
  543. // for more details.
  544. ContentSubtype string
  545. PreviousAttempts int // value of grpc-previous-rpc-attempts header to set
  546. }
  547. // ClientTransport is the common interface for all gRPC client-side transport
  548. // implementations.
  549. type ClientTransport interface {
  550. // Close tears down this transport. Once it returns, the transport
  551. // should not be accessed any more. The caller must make sure this
  552. // is called only once.
  553. Close() error
  554. // GracefulClose starts to tear down the transport: the transport will stop
  555. // accepting new RPCs and NewStream will return error. Once all streams are
  556. // finished, the transport will close.
  557. //
  558. // It does not block.
  559. GracefulClose()
  560. // Write sends the data for the given stream. A nil stream indicates
  561. // the write is to be performed on the transport as a whole.
  562. Write(s *Stream, hdr []byte, data []byte, opts *Options) error
  563. // NewStream creates a Stream for an RPC.
  564. NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error)
  565. // CloseStream clears the footprint of a stream when the stream is
  566. // not needed any more. The err indicates the error incurred when
  567. // CloseStream is called. Must be called when a stream is finished
  568. // unless the associated transport is closing.
  569. CloseStream(stream *Stream, err error)
  570. // Error returns a channel that is closed when some I/O error
  571. // happens. Typically the caller should have a goroutine to monitor
  572. // this in order to take action (e.g., close the current transport
  573. // and create a new one) in error case. It should not return nil
  574. // once the transport is initiated.
  575. Error() <-chan struct{}
  576. // GoAway returns a channel that is closed when ClientTransport
  577. // receives the draining signal from the server (e.g., GOAWAY frame in
  578. // HTTP/2).
  579. GoAway() <-chan struct{}
  580. // GetGoAwayReason returns the reason why GoAway frame was received.
  581. GetGoAwayReason() GoAwayReason
  582. // RemoteAddr returns the remote network address.
  583. RemoteAddr() net.Addr
  584. // IncrMsgSent increments the number of message sent through this transport.
  585. IncrMsgSent()
  586. // IncrMsgRecv increments the number of message received through this transport.
  587. IncrMsgRecv()
  588. }
  589. // ServerTransport is the common interface for all gRPC server-side transport
  590. // implementations.
  591. //
  592. // Methods may be called concurrently from multiple goroutines, but
  593. // Write methods for a given Stream will be called serially.
  594. type ServerTransport interface {
  595. // HandleStreams receives incoming streams using the given handler.
  596. HandleStreams(func(*Stream), func(context.Context, string) context.Context)
  597. // WriteHeader sends the header metadata for the given stream.
  598. // WriteHeader may not be called on all streams.
  599. WriteHeader(s *Stream, md metadata.MD) error
  600. // Write sends the data for the given stream.
  601. // Write may not be called on all streams.
  602. Write(s *Stream, hdr []byte, data []byte, opts *Options) error
  603. // WriteStatus sends the status of a stream to the client. WriteStatus is
  604. // the final call made on a stream and always occurs.
  605. WriteStatus(s *Stream, st *status.Status) error
  606. // Close tears down the transport. Once it is called, the transport
  607. // should not be accessed any more. All the pending streams and their
  608. // handlers will be terminated asynchronously.
  609. Close() error
  610. // RemoteAddr returns the remote network address.
  611. RemoteAddr() net.Addr
  612. // Drain notifies the client this ServerTransport stops accepting new RPCs.
  613. Drain()
  614. // IncrMsgSent increments the number of message sent through this transport.
  615. IncrMsgSent()
  616. // IncrMsgRecv increments the number of message received through this transport.
  617. IncrMsgRecv()
  618. }
  619. // connectionErrorf creates an ConnectionError with the specified error description.
  620. func connectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError {
  621. return ConnectionError{
  622. Desc: fmt.Sprintf(format, a...),
  623. temp: temp,
  624. err: e,
  625. }
  626. }
  627. // ConnectionError is an error that results in the termination of the
  628. // entire connection and the retry of all the active streams.
  629. type ConnectionError struct {
  630. Desc string
  631. temp bool
  632. err error
  633. }
  634. func (e ConnectionError) Error() string {
  635. return fmt.Sprintf("connection error: desc = %q", e.Desc)
  636. }
  637. // Temporary indicates if this connection error is temporary or fatal.
  638. func (e ConnectionError) Temporary() bool {
  639. return e.temp
  640. }
  641. // Origin returns the original error of this connection error.
  642. func (e ConnectionError) Origin() error {
  643. // Never return nil error here.
  644. // If the original error is nil, return itself.
  645. if e.err == nil {
  646. return e
  647. }
  648. return e.err
  649. }
  650. var (
  651. // ErrConnClosing indicates that the transport is closing.
  652. ErrConnClosing = connectionErrorf(true, nil, "transport is closing")
  653. // errStreamDrain indicates that the stream is rejected because the
  654. // connection is draining. This could be caused by goaway or balancer
  655. // removing the address.
  656. errStreamDrain = status.Error(codes.Unavailable, "the connection is draining")
  657. // errStreamDone is returned from write at the client side to indiacte application
  658. // layer of an error.
  659. errStreamDone = errors.New("the stream is done")
  660. // StatusGoAway indicates that the server sent a GOAWAY that included this
  661. // stream's ID in unprocessed RPCs.
  662. statusGoAway = status.New(codes.Unavailable, "the stream is rejected because server is draining the connection")
  663. )
  664. // GoAwayReason contains the reason for the GoAway frame received.
  665. type GoAwayReason uint8
  666. const (
  667. // GoAwayInvalid indicates that no GoAway frame is received.
  668. GoAwayInvalid GoAwayReason = 0
  669. // GoAwayNoReason is the default value when GoAway frame is received.
  670. GoAwayNoReason GoAwayReason = 1
  671. // GoAwayTooManyPings indicates that a GoAway frame with
  672. // ErrCodeEnhanceYourCalm was received and that the debug data said
  673. // "too_many_pings".
  674. GoAwayTooManyPings GoAwayReason = 2
  675. )
  676. // channelzData is used to store channelz related data for http2Client and http2Server.
  677. // These fields cannot be embedded in the original structs (e.g. http2Client), since to do atomic
  678. // operation on int64 variable on 32-bit machine, user is responsible to enforce memory alignment.
  679. // Here, by grouping those int64 fields inside a struct, we are enforcing the alignment.
  680. type channelzData struct {
  681. kpCount int64
  682. // The number of streams that have started, including already finished ones.
  683. streamsStarted int64
  684. // Client side: The number of streams that have ended successfully by receiving
  685. // EoS bit set frame from server.
  686. // Server side: The number of streams that have ended successfully by sending
  687. // frame with EoS bit set.
  688. streamsSucceeded int64
  689. streamsFailed int64
  690. // lastStreamCreatedTime stores the timestamp that the last stream gets created. It is of int64 type
  691. // instead of time.Time since it's more costly to atomically update time.Time variable than int64
  692. // variable. The same goes for lastMsgSentTime and lastMsgRecvTime.
  693. lastStreamCreatedTime int64
  694. msgSent int64
  695. msgRecv int64
  696. lastMsgSentTime int64
  697. lastMsgRecvTime int64
  698. }
  699. // ContextErr converts the error from context package into a status error.
  700. func ContextErr(err error) error {
  701. switch err {
  702. case context.DeadlineExceeded:
  703. return status.Error(codes.DeadlineExceeded, err.Error())
  704. case context.Canceled:
  705. return status.Error(codes.Canceled, err.Error())
  706. }
  707. return status.Errorf(codes.Internal, "Unexpected error from context packet: %v", err)
  708. }