/*
 *
 * Copyright 2014 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
  18. package transport
  19. import (
  20. "context"
  21. "fmt"
  22. "io"
  23. "math"
  24. "net"
  25. "strconv"
  26. "strings"
  27. "sync"
  28. "sync/atomic"
  29. "time"
  30. "golang.org/x/net/http2"
  31. "golang.org/x/net/http2/hpack"
  32. "google.golang.org/grpc/codes"
  33. "google.golang.org/grpc/credentials"
  34. "google.golang.org/grpc/internal"
  35. "google.golang.org/grpc/internal/channelz"
  36. "google.golang.org/grpc/internal/syscall"
  37. "google.golang.org/grpc/keepalive"
  38. "google.golang.org/grpc/metadata"
  39. "google.golang.org/grpc/peer"
  40. "google.golang.org/grpc/stats"
  41. "google.golang.org/grpc/status"
  42. )
// clientConnectionCounter counts the number of connections a client has
// initiated (equal to the number of http2Clients created). Must be accessed
// atomically; newHTTP2Client bumps it with atomic.AddUint64 and records the
// resulting value as the new transport's connectionID.
var clientConnectionCounter uint64
// http2Client implements the ClientTransport interface with HTTP2.
//
// Instances are built by newHTTP2Client, which also starts the reader,
// writer (loopy) and, when enabled, keepalive goroutines.
type http2Client struct {
	lastRead   int64 // Keep this field 64-bit aligned. Accessed atomically.
	ctx        context.Context
	cancel     context.CancelFunc
	ctxDone    <-chan struct{} // Cache the ctx.Done() chan.
	userAgent  string
	md         interface{}
	conn       net.Conn // underlying communication channel
	loopy      *loopyWriter
	remoteAddr net.Addr
	localAddr  net.Addr
	authInfo   credentials.AuthInfo // auth info about the connection

	readerDone chan struct{} // sync point to enable testing.
	writerDone chan struct{} // sync point to enable testing.
	// goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
	// that the server sent GoAway on this transport.
	goAway chan struct{}

	framer *framer
	// controlBuf delivers all the control related tasks (e.g., window
	// updates, reset streams, and various settings) to the controller.
	controlBuf *controlBuffer
	fc         *trInFlow
	// The scheme used: https if TLS is on, http otherwise.
	// newHTTP2Client sets it to "https" whenever transport credentials are
	// configured.
	scheme string

	isSecure bool

	perRPCCreds []credentials.PerRPCCredentials

	kp               keepalive.ClientParameters
	keepaliveEnabled bool

	statsHandler stats.Handler

	initialWindowSize int32

	// configured by peer through SETTINGS_MAX_HEADER_LIST_SIZE
	maxSendHeaderListSize *uint32

	bdpEst *bdpEstimator
	// onPrefaceReceipt is a callback that client transport calls upon
	// receiving server preface to signal that a successful HTTP2
	// connection was established.
	onPrefaceReceipt func()

	// The stream accounting fields below are read and modified inside the
	// callbacks that NewStream and closeStream hand to
	// controlBuf.executeAndPut.
	//
	// streamQuota is decremented for each admitted stream and incremented
	// when a stream is closed; it can go negative if the server lowers its
	// limit. streamsQuotaAvailable (buffer size 1) is signalled when quota is
	// positive and waitingStreams is non-zero. nextID starts at 1 and advances
	// by 2 for every stream created.
	maxConcurrentStreams  uint32
	streamQuota           int64
	streamsQuotaAvailable chan struct{}
	waitingStreams        uint32
	nextID                uint32

	mu            sync.Mutex // guard the following variables
	state         transportState
	activeStreams map[uint32]*Stream
	// prevGoAway ID records the Last-Stream-ID in the previous GOAway frame.
	prevGoAwayID uint32
	// goAwayReason records the http2.ErrCode and debug data received with the
	// GoAway frame.
	goAwayReason GoAwayReason
	// A condition variable used to signal when the keepalive goroutine should
	// go dormant. The condition for dormancy is based on the number of active
	// streams and the `PermitWithoutStream` keepalive client parameter. And
	// since the number of active streams is guarded by the above mutex, we use
	// the same for this condition variable as well.
	kpDormancyCond *sync.Cond
	// A boolean to track whether the keepalive goroutine is dormant or not.
	// This is checked before attempting to signal the above condition
	// variable.
	kpDormant bool

	// Fields below are for channelz metric collection.
	channelzID int64 // channelz unique identification number
	czData     *channelzData

	onGoAway func(GoAwayReason)
	onClose  func()

	bufferPool *bufferPool

	// connectionID is the value of clientConnectionCounter after this
	// transport was counted in newHTTP2Client.
	connectionID uint64
}
  116. func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
  117. if fn != nil {
  118. return fn(ctx, addr)
  119. }
  120. return (&net.Dialer{}).DialContext(ctx, "tcp", addr)
  121. }
  122. func isTemporary(err error) bool {
  123. switch err := err.(type) {
  124. case interface {
  125. Temporary() bool
  126. }:
  127. return err.Temporary()
  128. case interface {
  129. Timeout() bool
  130. }:
  131. // Timeouts may be resolved upon retry, and are thus treated as
  132. // temporary.
  133. return err.Timeout()
  134. }
  135. return true
  136. }
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
// and starts to receive messages on it. Non-nil error returns if construction
// fails.
//
// connectCtx bounds dialing and the credentials handshake; ctx is the parent
// of the transport's own cancellable context. onPrefaceReceipt, onGoAway and
// onClose are stored on the transport for later use.
//
// The named result err is inspected by the deferred cleanups below, so every
// failure path must leave it non-nil for the context to be cancelled and the
// connection closed.
func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
	scheme := "http"
	ctx, cancel := context.WithCancel(ctx)
	// Release the transport context if construction fails.
	defer func() {
		if err != nil {
			cancel()
		}
	}()

	conn, err := dial(connectCtx, opts.Dialer, addr.Addr)
	if err != nil {
		if opts.FailOnNonTempDialError {
			return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
		}
		return nil, connectionErrorf(true, err, "transport: Error while dialing %v", err)
	}
	// Any further errors will close the underlying connection
	// (conn is passed as an argument, so the deferred call closes the value
	// it had at this point, i.e. before any credentials handshake reassigns
	// the local variable).
	defer func(conn net.Conn) {
		if err != nil {
			conn.Close()
		}
	}(conn)
	kp := opts.KeepaliveParams
	// Validate keepalive parameters.
	// Zero values are replaced with the package defaults.
	if kp.Time == 0 {
		kp.Time = defaultClientKeepaliveTime
	}
	if kp.Timeout == 0 {
		kp.Timeout = defaultClientKeepaliveTimeout
	}
	keepaliveEnabled := false
	// Keepalive is enabled unless the interval is the `infinity` sentinel;
	// only then is TCP_USER_TIMEOUT set on the connection.
	if kp.Time != infinity {
		if err = syscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil {
			return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
		}
		keepaliveEnabled = true
	}
	var (
		isSecure bool
		authInfo credentials.AuthInfo
	)
	transportCreds := opts.TransportCredentials
	perRPCCreds := opts.PerRPCCredentials

	// A credentials bundle, when present, overrides the transport credentials
	// and contributes an additional per-RPC credential.
	if b := opts.CredsBundle; b != nil {
		if t := b.TransportCredentials(); t != nil {
			transportCreds = t
		}
		if t := b.PerRPCCredentials(); t != nil {
			perRPCCreds = append(perRPCCreds, t)
		}
	}
	if transportCreds != nil {
		scheme = "https"
		// The handshake returns the connection to use from here on.
		conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.Authority, conn)
		if err != nil {
			return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err)
		}
		isSecure = true
	}
	// dynamicWindow stays true (enabling the BDP estimator below) only when
	// neither the connection window nor the stream window was configured to
	// at least defaultWindowSize.
	dynamicWindow := true
	icwz := int32(initialWindowSize)
	if opts.InitialConnWindowSize >= defaultWindowSize {
		icwz = opts.InitialConnWindowSize
		dynamicWindow = false
	}
	writeBufSize := opts.WriteBufferSize
	readBufSize := opts.ReadBufferSize
	maxHeaderListSize := defaultClientMaxHeaderListSize
	if opts.MaxHeaderListSize != nil {
		maxHeaderListSize = *opts.MaxHeaderListSize
	}
	t := &http2Client{
		ctx:                   ctx,
		ctxDone:               ctx.Done(), // Cache Done chan.
		cancel:                cancel,
		userAgent:             opts.UserAgent,
		md:                    addr.Metadata,
		conn:                  conn,
		remoteAddr:            conn.RemoteAddr(),
		localAddr:             conn.LocalAddr(),
		authInfo:              authInfo,
		readerDone:            make(chan struct{}),
		writerDone:            make(chan struct{}),
		goAway:                make(chan struct{}),
		framer:                newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize),
		fc:                    &trInFlow{limit: uint32(icwz)},
		scheme:                scheme,
		activeStreams:         make(map[uint32]*Stream),
		isSecure:              isSecure,
		perRPCCreds:           perRPCCreds,
		kp:                    kp,
		statsHandler:          opts.StatsHandler,
		initialWindowSize:     initialWindowSize,
		onPrefaceReceipt:      onPrefaceReceipt,
		nextID:                1,
		maxConcurrentStreams:  defaultMaxStreamsClient,
		streamQuota:           defaultMaxStreamsClient,
		streamsQuotaAvailable: make(chan struct{}, 1),
		czData:                new(channelzData),
		onGoAway:              onGoAway,
		onClose:               onClose,
		keepaliveEnabled:      keepaliveEnabled,
		bufferPool:            newBufferPool(),
	}
	t.controlBuf = newControlBuffer(t.ctxDone)
	if opts.InitialWindowSize >= defaultWindowSize {
		t.initialWindowSize = opts.InitialWindowSize
		dynamicWindow = false
	}
	if dynamicWindow {
		t.bdpEst = &bdpEstimator{
			bdp:               initialWindowSize,
			updateFlowControl: t.updateFlowControl,
		}
	}
	if t.statsHandler != nil {
		// The stats handler may decorate the transport context; the tagged
		// context replaces t.ctx (ctxDone was cached from the untagged one).
		t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{
			RemoteAddr: t.remoteAddr,
			LocalAddr:  t.localAddr,
		})
		connBegin := &stats.ConnBegin{
			Client: true,
		}
		t.statsHandler.HandleConn(t.ctx, connBegin)
	}
	if channelz.IsOn() {
		t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
	}
	if t.keepaliveEnabled {
		// The dormancy condition variable shares t.mu with the stream map.
		t.kpDormancyCond = sync.NewCond(&t.mu)
		go t.keepalive()
	}
	// Start the reader goroutine for incoming message. Each transport has
	// a dedicated goroutine which reads HTTP2 frame from network. Then it
	// dispatches the frame to the corresponding stream entity.
	go t.reader()

	// Send connection preface to server.
	n, err := t.conn.Write(clientPreface)
	if err != nil {
		t.Close()
		return nil, connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
	}
	if n != len(clientPreface) {
		t.Close()
		// NOTE(review): err is nil on this path (the Write above succeeded),
		// so the connection error is built without an underlying cause.
		return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
	}
	// Only settings that differ from the defaults are advertised.
	var ss []http2.Setting

	if t.initialWindowSize != defaultWindowSize {
		ss = append(ss, http2.Setting{
			ID:  http2.SettingInitialWindowSize,
			Val: uint32(t.initialWindowSize),
		})
	}
	if opts.MaxHeaderListSize != nil {
		ss = append(ss, http2.Setting{
			ID:  http2.SettingMaxHeaderListSize,
			Val: *opts.MaxHeaderListSize,
		})
	}
	err = t.framer.fr.WriteSettings(ss...)
	if err != nil {
		t.Close()
		return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
	}
	// Adjust the connection flow control window if needed.
	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
		if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
			t.Close()
			return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err)
		}
	}

	t.connectionID = atomic.AddUint64(&clientConnectionCounter, 1)

	// Unlike the failure paths above, a flush error returns without calling
	// t.Close(); the deferred cleanups still cancel the context and close the
	// originally dialed connection because the named err is set by the return.
	if err := t.framer.writer.Flush(); err != nil {
		return nil, err
	}
	// The writer goroutine: creates the loopy writer, runs it until it
	// returns, and then closes writerDone.
	go func() {
		t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
		err := t.loopy.run()
		if err != nil {
			errorf("transport: loopyWriter.run returning. Err: %v", err)
		}
		// If it's a connection error, let reader goroutine handle it
		// since there might be data in the buffers.
		if _, ok := err.(net.Error); !ok {
			t.conn.Close()
		}
		close(t.writerDone)
	}()
	return t, nil
}
  329. func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
  330. // TODO(zhaoq): Handle uint32 overflow of Stream.id.
  331. s := &Stream{
  332. ct: t,
  333. done: make(chan struct{}),
  334. method: callHdr.Method,
  335. sendCompress: callHdr.SendCompress,
  336. buf: newRecvBuffer(),
  337. headerChan: make(chan struct{}),
  338. contentSubtype: callHdr.ContentSubtype,
  339. }
  340. s.wq = newWriteQuota(defaultWriteQuota, s.done)
  341. s.requestRead = func(n int) {
  342. t.adjustWindow(s, uint32(n))
  343. }
  344. // The client side stream context should have exactly the same life cycle with the user provided context.
  345. // That means, s.ctx should be read-only. And s.ctx is done iff ctx is done.
  346. // So we use the original context here instead of creating a copy.
  347. s.ctx = ctx
  348. s.trReader = &transportReader{
  349. reader: &recvBufferReader{
  350. ctx: s.ctx,
  351. ctxDone: s.ctx.Done(),
  352. recv: s.buf,
  353. closeStream: func(err error) {
  354. t.CloseStream(s, err)
  355. },
  356. freeBuffer: t.bufferPool.put,
  357. },
  358. windowHandler: func(n int) {
  359. t.updateWindow(s, uint32(n))
  360. },
  361. }
  362. return s
  363. }
  364. func (t *http2Client) getPeer() *peer.Peer {
  365. return &peer.Peer{
  366. Addr: t.remoteAddr,
  367. AuthInfo: t.authInfo,
  368. }
  369. }
  370. func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
  371. aud := t.createAudience(callHdr)
  372. ri := credentials.RequestInfo{
  373. Method: callHdr.Method,
  374. }
  375. ctxWithRequestInfo := internal.NewRequestInfoContext.(func(context.Context, credentials.RequestInfo) context.Context)(ctx, ri)
  376. authData, err := t.getTrAuthData(ctxWithRequestInfo, aud)
  377. if err != nil {
  378. return nil, err
  379. }
  380. callAuthData, err := t.getCallAuthData(ctxWithRequestInfo, aud, callHdr)
  381. if err != nil {
  382. return nil, err
  383. }
  384. // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
  385. // first and create a slice of that exact size.
  386. // Make the slice of certain predictable size to reduce allocations made by append.
  387. hfLen := 7 // :method, :scheme, :path, :authority, content-type, user-agent, te
  388. hfLen += len(authData) + len(callAuthData)
  389. headerFields := make([]hpack.HeaderField, 0, hfLen)
  390. headerFields = append(headerFields, hpack.HeaderField{Name: ":method", Value: "POST"})
  391. headerFields = append(headerFields, hpack.HeaderField{Name: ":scheme", Value: t.scheme})
  392. headerFields = append(headerFields, hpack.HeaderField{Name: ":path", Value: callHdr.Method})
  393. headerFields = append(headerFields, hpack.HeaderField{Name: ":authority", Value: callHdr.Host})
  394. headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(callHdr.ContentSubtype)})
  395. headerFields = append(headerFields, hpack.HeaderField{Name: "user-agent", Value: t.userAgent})
  396. headerFields = append(headerFields, hpack.HeaderField{Name: "te", Value: "trailers"})
  397. if callHdr.PreviousAttempts > 0 {
  398. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-previous-rpc-attempts", Value: strconv.Itoa(callHdr.PreviousAttempts)})
  399. }
  400. if callHdr.SendCompress != "" {
  401. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
  402. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-accept-encoding", Value: callHdr.SendCompress})
  403. }
  404. if dl, ok := ctx.Deadline(); ok {
  405. // Send out timeout regardless its value. The server can detect timeout context by itself.
  406. // TODO(mmukhi): Perhaps this field should be updated when actually writing out to the wire.
  407. timeout := time.Until(dl)
  408. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-timeout", Value: encodeTimeout(timeout)})
  409. }
  410. for k, v := range authData {
  411. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  412. }
  413. for k, v := range callAuthData {
  414. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  415. }
  416. if b := stats.OutgoingTags(ctx); b != nil {
  417. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-tags-bin", Value: encodeBinHeader(b)})
  418. }
  419. if b := stats.OutgoingTrace(ctx); b != nil {
  420. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)})
  421. }
  422. if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
  423. var k string
  424. for k, vv := range md {
  425. // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
  426. if isReservedHeader(k) {
  427. continue
  428. }
  429. for _, v := range vv {
  430. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  431. }
  432. }
  433. for _, vv := range added {
  434. for i, v := range vv {
  435. if i%2 == 0 {
  436. k = v
  437. continue
  438. }
  439. // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
  440. if isReservedHeader(k) {
  441. continue
  442. }
  443. headerFields = append(headerFields, hpack.HeaderField{Name: strings.ToLower(k), Value: encodeMetadataHeader(k, v)})
  444. }
  445. }
  446. }
  447. if md, ok := t.md.(*metadata.MD); ok {
  448. for k, vv := range *md {
  449. if isReservedHeader(k) {
  450. continue
  451. }
  452. for _, v := range vv {
  453. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  454. }
  455. }
  456. }
  457. return headerFields, nil
  458. }
  459. func (t *http2Client) createAudience(callHdr *CallHdr) string {
  460. // Create an audience string only if needed.
  461. if len(t.perRPCCreds) == 0 && callHdr.Creds == nil {
  462. return ""
  463. }
  464. // Construct URI required to get auth request metadata.
  465. // Omit port if it is the default one.
  466. host := strings.TrimSuffix(callHdr.Host, ":443")
  467. pos := strings.LastIndex(callHdr.Method, "/")
  468. if pos == -1 {
  469. pos = len(callHdr.Method)
  470. }
  471. return "https://" + host + callHdr.Method[:pos]
  472. }
  473. func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[string]string, error) {
  474. if len(t.perRPCCreds) == 0 {
  475. return nil, nil
  476. }
  477. authData := map[string]string{}
  478. for _, c := range t.perRPCCreds {
  479. data, err := c.GetRequestMetadata(ctx, audience)
  480. if err != nil {
  481. if _, ok := status.FromError(err); ok {
  482. return nil, err
  483. }
  484. return nil, status.Errorf(codes.Unauthenticated, "transport: %v", err)
  485. }
  486. for k, v := range data {
  487. // Capital header names are illegal in HTTP/2.
  488. k = strings.ToLower(k)
  489. authData[k] = v
  490. }
  491. }
  492. return authData, nil
  493. }
  494. func (t *http2Client) getCallAuthData(ctx context.Context, audience string, callHdr *CallHdr) (map[string]string, error) {
  495. var callAuthData map[string]string
  496. // Check if credentials.PerRPCCredentials were provided via call options.
  497. // Note: if these credentials are provided both via dial options and call
  498. // options, then both sets of credentials will be applied.
  499. if callCreds := callHdr.Creds; callCreds != nil {
  500. if !t.isSecure && callCreds.RequireTransportSecurity() {
  501. return nil, status.Error(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection")
  502. }
  503. data, err := callCreds.GetRequestMetadata(ctx, audience)
  504. if err != nil {
  505. return nil, status.Errorf(codes.Internal, "transport: %v", err)
  506. }
  507. callAuthData = make(map[string]string, len(data))
  508. for k, v := range data {
  509. // Capital header names are illegal in HTTP/2
  510. k = strings.ToLower(k)
  511. callAuthData[k] = v
  512. }
  513. }
  514. return callAuthData, nil
  515. }
  516. // NewStream creates a stream and registers it into the transport as "active"
  517. // streams.
  518. func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Stream, err error) {
  519. ctx = peer.NewContext(ctx, t.getPeer())
  520. headerFields, err := t.createHeaderFields(ctx, callHdr)
  521. if err != nil {
  522. return nil, err
  523. }
  524. s := t.newStream(ctx, callHdr)
  525. cleanup := func(err error) {
  526. if s.swapState(streamDone) == streamDone {
  527. // If it was already done, return.
  528. return
  529. }
  530. // The stream was unprocessed by the server.
  531. atomic.StoreUint32(&s.unprocessed, 1)
  532. s.write(recvMsg{err: err})
  533. close(s.done)
  534. // If headerChan isn't closed, then close it.
  535. if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
  536. close(s.headerChan)
  537. }
  538. }
  539. hdr := &headerFrame{
  540. hf: headerFields,
  541. endStream: false,
  542. initStream: func(id uint32) error {
  543. t.mu.Lock()
  544. if state := t.state; state != reachable {
  545. t.mu.Unlock()
  546. // Do a quick cleanup.
  547. err := error(errStreamDrain)
  548. if state == closing {
  549. err = ErrConnClosing
  550. }
  551. cleanup(err)
  552. return err
  553. }
  554. t.activeStreams[id] = s
  555. if channelz.IsOn() {
  556. atomic.AddInt64(&t.czData.streamsStarted, 1)
  557. atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
  558. }
  559. // If the keepalive goroutine has gone dormant, wake it up.
  560. if t.kpDormant {
  561. t.kpDormancyCond.Signal()
  562. }
  563. t.mu.Unlock()
  564. return nil
  565. },
  566. onOrphaned: cleanup,
  567. wq: s.wq,
  568. }
  569. firstTry := true
  570. var ch chan struct{}
  571. checkForStreamQuota := func(it interface{}) bool {
  572. if t.streamQuota <= 0 { // Can go negative if server decreases it.
  573. if firstTry {
  574. t.waitingStreams++
  575. }
  576. ch = t.streamsQuotaAvailable
  577. return false
  578. }
  579. if !firstTry {
  580. t.waitingStreams--
  581. }
  582. t.streamQuota--
  583. h := it.(*headerFrame)
  584. h.streamID = t.nextID
  585. t.nextID += 2
  586. s.id = h.streamID
  587. s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
  588. if t.streamQuota > 0 && t.waitingStreams > 0 {
  589. select {
  590. case t.streamsQuotaAvailable <- struct{}{}:
  591. default:
  592. }
  593. }
  594. return true
  595. }
  596. var hdrListSizeErr error
  597. checkForHeaderListSize := func(it interface{}) bool {
  598. if t.maxSendHeaderListSize == nil {
  599. return true
  600. }
  601. hdrFrame := it.(*headerFrame)
  602. var sz int64
  603. for _, f := range hdrFrame.hf {
  604. if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
  605. hdrListSizeErr = status.Errorf(codes.Internal, "header list size to send violates the maximum size (%d bytes) set by server", *t.maxSendHeaderListSize)
  606. return false
  607. }
  608. }
  609. return true
  610. }
  611. for {
  612. success, err := t.controlBuf.executeAndPut(func(it interface{}) bool {
  613. if !checkForStreamQuota(it) {
  614. return false
  615. }
  616. if !checkForHeaderListSize(it) {
  617. return false
  618. }
  619. return true
  620. }, hdr)
  621. if err != nil {
  622. return nil, err
  623. }
  624. if success {
  625. break
  626. }
  627. if hdrListSizeErr != nil {
  628. return nil, hdrListSizeErr
  629. }
  630. firstTry = false
  631. select {
  632. case <-ch:
  633. case <-s.ctx.Done():
  634. return nil, ContextErr(s.ctx.Err())
  635. case <-t.goAway:
  636. return nil, errStreamDrain
  637. case <-t.ctx.Done():
  638. return nil, ErrConnClosing
  639. }
  640. }
  641. if t.statsHandler != nil {
  642. header, _, _ := metadata.FromOutgoingContextRaw(ctx)
  643. outHeader := &stats.OutHeader{
  644. Client: true,
  645. FullMethod: callHdr.Method,
  646. RemoteAddr: t.remoteAddr,
  647. LocalAddr: t.localAddr,
  648. Compression: callHdr.SendCompress,
  649. Header: header.Copy(),
  650. }
  651. t.statsHandler.HandleRPC(s.ctx, outHeader)
  652. }
  653. return s, nil
  654. }
  655. // CloseStream clears the footprint of a stream when the stream is not needed any more.
  656. // This must not be executed in reader's goroutine.
  657. func (t *http2Client) CloseStream(s *Stream, err error) {
  658. var (
  659. rst bool
  660. rstCode http2.ErrCode
  661. )
  662. if err != nil {
  663. rst = true
  664. rstCode = http2.ErrCodeCancel
  665. }
  666. t.closeStream(s, err, rst, rstCode, status.Convert(err), nil, false)
  667. }
// closeStream is the common teardown path for a stream.
//
// It marks the stream done, records its final status st (and trailers mdata,
// if any), delivers err to the receive buffer when non-nil, closes headerChan
// if still open, returns one unit of stream quota, queues a cleanupStream item
// on the control buffer (optionally carrying an RST_STREAM with rstCode when
// rst is true) and finally closes s.done. eosReceived only selects which
// channelz counter (succeeded/failed) is incremented when the cleanup item is
// written.
//
// If the stream is already done, the call just waits for the first closer to
// finish (s.done closed) and returns.
func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) {
	// Set stream status to done.
	if s.swapState(streamDone) == streamDone {
		// If it was already done, return. If multiple closeStream calls
		// happen simultaneously, wait for the first to finish.
		<-s.done
		return
	}
	// status and trailers can be updated here without any synchronization because the stream goroutine will
	// only read it after it sees an io.EOF error from read or write and we'll write those errors
	// only after updating this.
	s.status = st
	if len(mdata) > 0 {
		s.trailer = mdata
	}
	if err != nil {
		// This will unblock reads eventually.
		s.write(recvMsg{err: err})
	}
	// If headerChan isn't closed, then close it.
	if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
		// Winning the CAS here means headerChan had not been closed before.
		s.noHeaders = true
		close(s.headerChan)
	}
	cleanup := &cleanupStream{
		streamID: s.id,
		// onWrite removes the stream from the active set (the map is nil
		// once Close has run) and updates the channelz counters.
		onWrite: func() {
			t.mu.Lock()
			if t.activeStreams != nil {
				delete(t.activeStreams, s.id)
			}
			t.mu.Unlock()
			if channelz.IsOn() {
				if eosReceived {
					atomic.AddInt64(&t.czData.streamsSucceeded, 1)
				} else {
					atomic.AddInt64(&t.czData.streamsFailed, 1)
				}
			}
		},
		rst:     rst,
		rstCode: rstCode,
	}
	// addBackStreamQuota returns this stream's quota and, if streams are
	// waiting, makes a non-blocking attempt to signal streamsQuotaAvailable.
	addBackStreamQuota := func(interface{}) bool {
		t.streamQuota++
		if t.streamQuota > 0 && t.waitingStreams > 0 {
			select {
			case t.streamsQuotaAvailable <- struct{}{}:
			default:
			}
		}
		return true
	}
	// The result of executeAndPut is intentionally not inspected here.
	t.controlBuf.executeAndPut(addBackStreamQuota, cleanup)
	// This will unblock write.
	close(s.done)
}
// Close kicks off the shutdown process of the transport. This should be called
// only once on a transport. Once it is called, the transport should not be
// accessed any more.
//
// This method blocks until the addrConn that initiated this transport is
// re-connected. This happens because t.onClose() begins reconnect logic at the
// addrConn level and blocks until the addrConn is successfully connected.
//
// Calling Close on a transport that is already closing is a no-op returning
// nil. Otherwise the returned error is the result of closing the underlying
// connection.
func (t *http2Client) Close() error {
	t.mu.Lock()
	// Make sure we only Close once.
	if t.state == closing {
		t.mu.Unlock()
		return nil
	}
	// Call t.onClose before setting the state to closing to prevent the client
	// from attempting to create new streams ASAP.
	// Note that the callback runs while t.mu is held.
	t.onClose()
	t.state = closing
	// Detach the active stream set; closeStream's cleanup treats a nil map
	// as "nothing to delete".
	streams := t.activeStreams
	t.activeStreams = nil
	if t.kpDormant {
		// If the keepalive goroutine is blocked on this condition variable, we
		// should unblock it so that the goroutine eventually exits.
		t.kpDormancyCond.Signal()
	}
	t.mu.Unlock()
	t.controlBuf.finish()
	t.cancel()
	err := t.conn.Close()
	if channelz.IsOn() {
		channelz.RemoveEntry(t.channelzID)
	}
	// Notify all active streams.
	// Each is finished with ErrConnClosing / codes.Unavailable and no
	// RST_STREAM is requested.
	for _, s := range streams {
		t.closeStream(s, ErrConnClosing, false, http2.ErrCodeNo, status.New(codes.Unavailable, ErrConnClosing.Desc), nil, false)
	}
	if t.statsHandler != nil {
		connEnd := &stats.ConnEnd{
			Client: true,
		}
		t.statsHandler.HandleConn(t.ctx, connEnd)
	}
	return err
}
  769. // GracefulClose sets the state to draining, which prevents new streams from
  770. // being created and causes the transport to be closed when the last active
  771. // stream is closed. If there are no active streams, the transport is closed
  772. // immediately. This does nothing if the transport is already draining or
  773. // closing.
  774. func (t *http2Client) GracefulClose() {
  775. t.mu.Lock()
  776. // Make sure we move to draining only from active.
  777. if t.state == draining || t.state == closing {
  778. t.mu.Unlock()
  779. return
  780. }
  781. t.state = draining
  782. active := len(t.activeStreams)
  783. t.mu.Unlock()
  784. if active == 0 {
  785. t.Close()
  786. return
  787. }
  788. t.controlBuf.put(&incomingGoAway{})
  789. }
  790. // Write formats the data into HTTP2 data frame(s) and sends it out. The caller
  791. // should proceed only if Write returns nil.
  792. func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
  793. if opts.Last {
  794. // If it's the last message, update stream state.
  795. if !s.compareAndSwapState(streamActive, streamWriteDone) {
  796. return errStreamDone
  797. }
  798. } else if s.getState() != streamActive {
  799. return errStreamDone
  800. }
  801. df := &dataFrame{
  802. streamID: s.id,
  803. endStream: opts.Last,
  804. }
  805. if hdr != nil || data != nil { // If it's not an empty data frame.
  806. // Add some data to grpc message header so that we can equally
  807. // distribute bytes across frames.
  808. emptyLen := http2MaxFrameLen - len(hdr)
  809. if emptyLen > len(data) {
  810. emptyLen = len(data)
  811. }
  812. hdr = append(hdr, data[:emptyLen]...)
  813. data = data[emptyLen:]
  814. df.h, df.d = hdr, data
  815. // TODO(mmukhi): The above logic in this if can be moved to loopyWriter's data handler.
  816. if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
  817. return err
  818. }
  819. }
  820. return t.controlBuf.put(df)
  821. }
  822. func (t *http2Client) getStream(f http2.Frame) *Stream {
  823. t.mu.Lock()
  824. s := t.activeStreams[f.Header().StreamID]
  825. t.mu.Unlock()
  826. return s
  827. }
  828. // adjustWindow sends out extra window update over the initial window size
  829. // of stream if the application is requesting data larger in size than
  830. // the window.
  831. func (t *http2Client) adjustWindow(s *Stream, n uint32) {
  832. if w := s.fc.maybeAdjust(n); w > 0 {
  833. t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
  834. }
  835. }
  836. // updateWindow adjusts the inbound quota for the stream.
  837. // Window updates will be sent out when the cumulative quota
  838. // exceeds the corresponding threshold.
  839. func (t *http2Client) updateWindow(s *Stream, n uint32) {
  840. if w := s.fc.onRead(n); w > 0 {
  841. t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
  842. }
  843. }
  844. // updateFlowControl updates the incoming flow control windows
  845. // for the transport and the stream based on the current bdp
  846. // estimation.
  847. func (t *http2Client) updateFlowControl(n uint32) {
  848. t.mu.Lock()
  849. for _, s := range t.activeStreams {
  850. s.fc.newLimit(n)
  851. }
  852. t.mu.Unlock()
  853. updateIWS := func(interface{}) bool {
  854. t.initialWindowSize = int32(n)
  855. return true
  856. }
  857. t.controlBuf.executeAndPut(updateIWS, &outgoingWindowUpdate{streamID: 0, increment: t.fc.newLimit(n)})
  858. t.controlBuf.put(&outgoingSettings{
  859. ss: []http2.Setting{
  860. {
  861. ID: http2.SettingInitialWindowSize,
  862. Val: n,
  863. },
  864. },
  865. })
  866. }
  867. func (t *http2Client) handleData(f *http2.DataFrame) {
  868. size := f.Header().Length
  869. var sendBDPPing bool
  870. if t.bdpEst != nil {
  871. sendBDPPing = t.bdpEst.add(size)
  872. }
  873. // Decouple connection's flow control from application's read.
  874. // An update on connection's flow control should not depend on
  875. // whether user application has read the data or not. Such a
  876. // restriction is already imposed on the stream's flow control,
  877. // and therefore the sender will be blocked anyways.
  878. // Decoupling the connection flow control will prevent other
  879. // active(fast) streams from starving in presence of slow or
  880. // inactive streams.
  881. //
  882. if w := t.fc.onData(size); w > 0 {
  883. t.controlBuf.put(&outgoingWindowUpdate{
  884. streamID: 0,
  885. increment: w,
  886. })
  887. }
  888. if sendBDPPing {
  889. // Avoid excessive ping detection (e.g. in an L7 proxy)
  890. // by sending a window update prior to the BDP ping.
  891. if w := t.fc.reset(); w > 0 {
  892. t.controlBuf.put(&outgoingWindowUpdate{
  893. streamID: 0,
  894. increment: w,
  895. })
  896. }
  897. t.controlBuf.put(bdpPing)
  898. }
  899. // Select the right stream to dispatch.
  900. s := t.getStream(f)
  901. if s == nil {
  902. return
  903. }
  904. if size > 0 {
  905. if err := s.fc.onData(size); err != nil {
  906. t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil, false)
  907. return
  908. }
  909. if f.Header().Flags.Has(http2.FlagDataPadded) {
  910. if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
  911. t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
  912. }
  913. }
  914. // TODO(bradfitz, zhaoq): A copy is required here because there is no
  915. // guarantee f.Data() is consumed before the arrival of next frame.
  916. // Can this copy be eliminated?
  917. if len(f.Data()) > 0 {
  918. buffer := t.bufferPool.get()
  919. buffer.Reset()
  920. buffer.Write(f.Data())
  921. s.write(recvMsg{buffer: buffer})
  922. }
  923. }
  924. // The server has closed the stream without sending trailers. Record that
  925. // the read direction is closed, and set the status appropriately.
  926. if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) {
  927. t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true)
  928. }
  929. }
  930. func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
  931. s := t.getStream(f)
  932. if s == nil {
  933. return
  934. }
  935. if f.ErrCode == http2.ErrCodeRefusedStream {
  936. // The stream was unprocessed by the server.
  937. atomic.StoreUint32(&s.unprocessed, 1)
  938. }
  939. statusCode, ok := http2ErrConvTab[f.ErrCode]
  940. if !ok {
  941. warningf("transport: http2Client.handleRSTStream found no mapped gRPC status for the received http2 error %v", f.ErrCode)
  942. statusCode = codes.Unknown
  943. }
  944. if statusCode == codes.Canceled {
  945. if d, ok := s.ctx.Deadline(); ok && !d.After(time.Now()) {
  946. // Our deadline was already exceeded, and that was likely the cause
  947. // of this cancelation. Alter the status code accordingly.
  948. statusCode = codes.DeadlineExceeded
  949. }
  950. }
  951. t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode), nil, false)
  952. }
  953. func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
  954. if f.IsAck() {
  955. return
  956. }
  957. var maxStreams *uint32
  958. var ss []http2.Setting
  959. var updateFuncs []func()
  960. f.ForeachSetting(func(s http2.Setting) error {
  961. switch s.ID {
  962. case http2.SettingMaxConcurrentStreams:
  963. maxStreams = new(uint32)
  964. *maxStreams = s.Val
  965. case http2.SettingMaxHeaderListSize:
  966. updateFuncs = append(updateFuncs, func() {
  967. t.maxSendHeaderListSize = new(uint32)
  968. *t.maxSendHeaderListSize = s.Val
  969. })
  970. default:
  971. ss = append(ss, s)
  972. }
  973. return nil
  974. })
  975. if isFirst && maxStreams == nil {
  976. maxStreams = new(uint32)
  977. *maxStreams = math.MaxUint32
  978. }
  979. sf := &incomingSettings{
  980. ss: ss,
  981. }
  982. if maxStreams != nil {
  983. updateStreamQuota := func() {
  984. delta := int64(*maxStreams) - int64(t.maxConcurrentStreams)
  985. t.maxConcurrentStreams = *maxStreams
  986. t.streamQuota += delta
  987. if delta > 0 && t.waitingStreams > 0 {
  988. close(t.streamsQuotaAvailable) // wake all of them up.
  989. t.streamsQuotaAvailable = make(chan struct{}, 1)
  990. }
  991. }
  992. updateFuncs = append(updateFuncs, updateStreamQuota)
  993. }
  994. t.controlBuf.executeAndPut(func(interface{}) bool {
  995. for _, f := range updateFuncs {
  996. f()
  997. }
  998. return true
  999. }, sf)
  1000. }
  1001. func (t *http2Client) handlePing(f *http2.PingFrame) {
  1002. if f.IsAck() {
  1003. // Maybe it's a BDP ping.
  1004. if t.bdpEst != nil {
  1005. t.bdpEst.calculate(f.Data)
  1006. }
  1007. return
  1008. }
  1009. pingAck := &ping{ack: true}
  1010. copy(pingAck.data[:], f.Data[:])
  1011. t.controlBuf.put(pingAck)
  1012. }
  1013. func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
  1014. t.mu.Lock()
  1015. if t.state == closing {
  1016. t.mu.Unlock()
  1017. return
  1018. }
  1019. if f.ErrCode == http2.ErrCodeEnhanceYourCalm {
  1020. infof("Client received GoAway with http2.ErrCodeEnhanceYourCalm.")
  1021. }
  1022. id := f.LastStreamID
  1023. if id > 0 && id%2 != 1 {
  1024. t.mu.Unlock()
  1025. t.Close()
  1026. return
  1027. }
  1028. // A client can receive multiple GoAways from the server (see
  1029. // https://github.com/grpc/grpc-go/issues/1387). The idea is that the first
  1030. // GoAway will be sent with an ID of MaxInt32 and the second GoAway will be
  1031. // sent after an RTT delay with the ID of the last stream the server will
  1032. // process.
  1033. //
  1034. // Therefore, when we get the first GoAway we don't necessarily close any
  1035. // streams. While in case of second GoAway we close all streams created after
  1036. // the GoAwayId. This way streams that were in-flight while the GoAway from
  1037. // server was being sent don't get killed.
  1038. select {
  1039. case <-t.goAway: // t.goAway has been closed (i.e.,multiple GoAways).
  1040. // If there are multiple GoAways the first one should always have an ID greater than the following ones.
  1041. if id > t.prevGoAwayID {
  1042. t.mu.Unlock()
  1043. t.Close()
  1044. return
  1045. }
  1046. default:
  1047. t.setGoAwayReason(f)
  1048. close(t.goAway)
  1049. t.controlBuf.put(&incomingGoAway{})
  1050. // Notify the clientconn about the GOAWAY before we set the state to
  1051. // draining, to allow the client to stop attempting to create streams
  1052. // before disallowing new streams on this connection.
  1053. t.onGoAway(t.goAwayReason)
  1054. t.state = draining
  1055. }
  1056. // All streams with IDs greater than the GoAwayId
  1057. // and smaller than the previous GoAway ID should be killed.
  1058. upperLimit := t.prevGoAwayID
  1059. if upperLimit == 0 { // This is the first GoAway Frame.
  1060. upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID.
  1061. }
  1062. for streamID, stream := range t.activeStreams {
  1063. if streamID > id && streamID <= upperLimit {
  1064. // The stream was unprocessed by the server.
  1065. atomic.StoreUint32(&stream.unprocessed, 1)
  1066. t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
  1067. }
  1068. }
  1069. t.prevGoAwayID = id
  1070. active := len(t.activeStreams)
  1071. t.mu.Unlock()
  1072. if active == 0 {
  1073. t.Close()
  1074. }
  1075. }
  1076. // setGoAwayReason sets the value of t.goAwayReason based
  1077. // on the GoAway frame received.
  1078. // It expects a lock on transport's mutext to be held by
  1079. // the caller.
  1080. func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
  1081. t.goAwayReason = GoAwayNoReason
  1082. switch f.ErrCode {
  1083. case http2.ErrCodeEnhanceYourCalm:
  1084. if string(f.DebugData()) == "too_many_pings" {
  1085. t.goAwayReason = GoAwayTooManyPings
  1086. }
  1087. }
  1088. }
  1089. func (t *http2Client) GetGoAwayReason() GoAwayReason {
  1090. t.mu.Lock()
  1091. defer t.mu.Unlock()
  1092. return t.goAwayReason
  1093. }
  1094. func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
  1095. t.controlBuf.put(&incomingWindowUpdate{
  1096. streamID: f.Header().StreamID,
  1097. increment: f.Increment,
  1098. })
  1099. }
  1100. // operateHeaders takes action on the decoded headers.
  1101. func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
  1102. s := t.getStream(frame)
  1103. if s == nil {
  1104. return
  1105. }
  1106. endStream := frame.StreamEnded()
  1107. atomic.StoreUint32(&s.bytesReceived, 1)
  1108. initialHeader := atomic.LoadUint32(&s.headerChanClosed) == 0
  1109. if !initialHeader && !endStream {
  1110. // As specified by gRPC over HTTP2, a HEADERS frame (and associated CONTINUATION frames) can only appear at the start or end of a stream. Therefore, second HEADERS frame must have EOS bit set.
  1111. st := status.New(codes.Internal, "a HEADERS frame cannot appear in the middle of a stream")
  1112. t.closeStream(s, st.Err(), true, http2.ErrCodeProtocol, st, nil, false)
  1113. return
  1114. }
  1115. state := &decodeState{}
  1116. // Initialize isGRPC value to be !initialHeader, since if a gRPC Response-Headers has already been received, then it means that the peer is speaking gRPC and we are in gRPC mode.
  1117. state.data.isGRPC = !initialHeader
  1118. if err := state.decodeHeader(frame); err != nil {
  1119. t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream)
  1120. return
  1121. }
  1122. isHeader := false
  1123. defer func() {
  1124. if t.statsHandler != nil {
  1125. if isHeader {
  1126. inHeader := &stats.InHeader{
  1127. Client: true,
  1128. WireLength: int(frame.Header().Length),
  1129. Header: s.header.Copy(),
  1130. }
  1131. t.statsHandler.HandleRPC(s.ctx, inHeader)
  1132. } else {
  1133. inTrailer := &stats.InTrailer{
  1134. Client: true,
  1135. WireLength: int(frame.Header().Length),
  1136. Trailer: s.trailer.Copy(),
  1137. }
  1138. t.statsHandler.HandleRPC(s.ctx, inTrailer)
  1139. }
  1140. }
  1141. }()
  1142. // If headerChan hasn't been closed yet
  1143. if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
  1144. s.headerValid = true
  1145. if !endStream {
  1146. // HEADERS frame block carries a Response-Headers.
  1147. isHeader = true
  1148. // These values can be set without any synchronization because
  1149. // stream goroutine will read it only after seeing a closed
  1150. // headerChan which we'll close after setting this.
  1151. s.recvCompress = state.data.encoding
  1152. if len(state.data.mdata) > 0 {
  1153. s.header = state.data.mdata
  1154. }
  1155. } else {
  1156. // HEADERS frame block carries a Trailers-Only.
  1157. s.noHeaders = true
  1158. }
  1159. close(s.headerChan)
  1160. }
  1161. if !endStream {
  1162. return
  1163. }
  1164. // if client received END_STREAM from server while stream was still active, send RST_STREAM
  1165. rst := s.getState() == streamActive
  1166. t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, state.status(), state.data.mdata, true)
  1167. }
  1168. // reader runs as a separate goroutine in charge of reading data from network
  1169. // connection.
  1170. //
  1171. // TODO(zhaoq): currently one reader per transport. Investigate whether this is
  1172. // optimal.
  1173. // TODO(zhaoq): Check the validity of the incoming frame sequence.
  1174. func (t *http2Client) reader() {
  1175. defer close(t.readerDone)
  1176. // Check the validity of server preface.
  1177. frame, err := t.framer.fr.ReadFrame()
  1178. if err != nil {
  1179. t.Close() // this kicks off resetTransport, so must be last before return
  1180. return
  1181. }
  1182. t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!)
  1183. if t.keepaliveEnabled {
  1184. atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
  1185. }
  1186. sf, ok := frame.(*http2.SettingsFrame)
  1187. if !ok {
  1188. t.Close() // this kicks off resetTransport, so must be last before return
  1189. return
  1190. }
  1191. t.onPrefaceReceipt()
  1192. t.handleSettings(sf, true)
  1193. // loop to keep reading incoming messages on this transport.
  1194. for {
  1195. t.controlBuf.throttle()
  1196. frame, err := t.framer.fr.ReadFrame()
  1197. if t.keepaliveEnabled {
  1198. atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
  1199. }
  1200. if err != nil {
  1201. // Abort an active stream if the http2.Framer returns a
  1202. // http2.StreamError. This can happen only if the server's response
  1203. // is malformed http2.
  1204. if se, ok := err.(http2.StreamError); ok {
  1205. t.mu.Lock()
  1206. s := t.activeStreams[se.StreamID]
  1207. t.mu.Unlock()
  1208. if s != nil {
  1209. // use error detail to provide better err message
  1210. code := http2ErrConvTab[se.Code]
  1211. msg := t.framer.fr.ErrorDetail().Error()
  1212. t.closeStream(s, status.Error(code, msg), true, http2.ErrCodeProtocol, status.New(code, msg), nil, false)
  1213. }
  1214. continue
  1215. } else {
  1216. // Transport error.
  1217. t.Close()
  1218. return
  1219. }
  1220. }
  1221. switch frame := frame.(type) {
  1222. case *http2.MetaHeadersFrame:
  1223. t.operateHeaders(frame)
  1224. case *http2.DataFrame:
  1225. t.handleData(frame)
  1226. case *http2.RSTStreamFrame:
  1227. t.handleRSTStream(frame)
  1228. case *http2.SettingsFrame:
  1229. t.handleSettings(frame, false)
  1230. case *http2.PingFrame:
  1231. t.handlePing(frame)
  1232. case *http2.GoAwayFrame:
  1233. t.handleGoAway(frame)
  1234. case *http2.WindowUpdateFrame:
  1235. t.handleWindowUpdate(frame)
  1236. default:
  1237. errorf("transport: http2Client.reader got unhandled frame type %v.", frame)
  1238. }
  1239. }
  1240. }
  1241. func minTime(a, b time.Duration) time.Duration {
  1242. if a < b {
  1243. return a
  1244. }
  1245. return b
  1246. }
  1247. // keepalive running in a separate goroutune makes sure the connection is alive by sending pings.
  1248. func (t *http2Client) keepalive() {
  1249. p := &ping{data: [8]byte{}}
  1250. // True iff a ping has been sent, and no data has been received since then.
  1251. outstandingPing := false
  1252. // Amount of time remaining before which we should receive an ACK for the
  1253. // last sent ping.
  1254. timeoutLeft := time.Duration(0)
  1255. // Records the last value of t.lastRead before we go block on the timer.
  1256. // This is required to check for read activity since then.
  1257. prevNano := time.Now().UnixNano()
  1258. timer := time.NewTimer(t.kp.Time)
  1259. for {
  1260. select {
  1261. case <-timer.C:
  1262. lastRead := atomic.LoadInt64(&t.lastRead)
  1263. if lastRead > prevNano {
  1264. // There has been read activity since the last time we were here.
  1265. outstandingPing = false
  1266. // Next timer should fire at kp.Time seconds from lastRead time.
  1267. timer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
  1268. prevNano = lastRead
  1269. continue
  1270. }
  1271. if outstandingPing && timeoutLeft <= 0 {
  1272. t.Close()
  1273. return
  1274. }
  1275. t.mu.Lock()
  1276. if t.state == closing {
  1277. // If the transport is closing, we should exit from the
  1278. // keepalive goroutine here. If not, we could have a race
  1279. // between the call to Signal() from Close() and the call to
  1280. // Wait() here, whereby the keepalive goroutine ends up
  1281. // blocking on the condition variable which will never be
  1282. // signalled again.
  1283. t.mu.Unlock()
  1284. return
  1285. }
  1286. if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
  1287. // If a ping was sent out previously (because there were active
  1288. // streams at that point) which wasn't acked and its timeout
  1289. // hadn't fired, but we got here and are about to go dormant,
  1290. // we should make sure that we unconditionally send a ping once
  1291. // we awaken.
  1292. outstandingPing = false
  1293. t.kpDormant = true
  1294. t.kpDormancyCond.Wait()
  1295. }
  1296. t.kpDormant = false
  1297. t.mu.Unlock()
  1298. // We get here either because we were dormant and a new stream was
  1299. // created which unblocked the Wait() call, or because the
  1300. // keepalive timer expired. In both cases, we need to send a ping.
  1301. if !outstandingPing {
  1302. if channelz.IsOn() {
  1303. atomic.AddInt64(&t.czData.kpCount, 1)
  1304. }
  1305. t.controlBuf.put(p)
  1306. timeoutLeft = t.kp.Timeout
  1307. outstandingPing = true
  1308. }
  1309. // The amount of time to sleep here is the minimum of kp.Time and
  1310. // timeoutLeft. This will ensure that we wait only for kp.Time
  1311. // before sending out the next ping (for cases where the ping is
  1312. // acked).
  1313. sleepDuration := minTime(t.kp.Time, timeoutLeft)
  1314. timeoutLeft -= sleepDuration
  1315. timer.Reset(sleepDuration)
  1316. case <-t.ctx.Done():
  1317. if !timer.Stop() {
  1318. <-timer.C
  1319. }
  1320. return
  1321. }
  1322. }
  1323. }
  1324. func (t *http2Client) Error() <-chan struct{} {
  1325. return t.ctx.Done()
  1326. }
  1327. func (t *http2Client) GoAway() <-chan struct{} {
  1328. return t.goAway
  1329. }
  1330. func (t *http2Client) ChannelzMetric() *channelz.SocketInternalMetric {
  1331. s := channelz.SocketInternalMetric{
  1332. StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted),
  1333. StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded),
  1334. StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed),
  1335. MessagesSent: atomic.LoadInt64(&t.czData.msgSent),
  1336. MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv),
  1337. KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount),
  1338. LastLocalStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
  1339. LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
  1340. LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
  1341. LocalFlowControlWindow: int64(t.fc.getSize()),
  1342. SocketOptions: channelz.GetSocketOption(t.conn),
  1343. LocalAddr: t.localAddr,
  1344. RemoteAddr: t.remoteAddr,
  1345. // RemoteName :
  1346. }
  1347. if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
  1348. s.Security = au.GetSecurityValue()
  1349. }
  1350. s.RemoteFlowControlWindow = t.getOutFlowWindow()
  1351. return &s
  1352. }
  1353. func (t *http2Client) RemoteAddr() net.Addr { return t.remoteAddr }
  1354. func (t *http2Client) IncrMsgSent() {
  1355. atomic.AddInt64(&t.czData.msgSent, 1)
  1356. atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
  1357. }
  1358. func (t *http2Client) IncrMsgRecv() {
  1359. atomic.AddInt64(&t.czData.msgRecv, 1)
  1360. atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
  1361. }
  1362. func (t *http2Client) getOutFlowWindow() int64 {
  1363. resp := make(chan uint32, 1)
  1364. timer := time.NewTimer(time.Second)
  1365. defer timer.Stop()
  1366. t.controlBuf.put(&outFlowControlSizeRequest{resp})
  1367. select {
  1368. case sz := <-resp:
  1369. return int64(sz)
  1370. case <-t.ctxDone:
  1371. return -1
  1372. case <-timer.C:
  1373. return -2
  1374. }
  1375. }