http2_client.go 57 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package transport
  19. import (
  20. "context"
  21. "fmt"
  22. "io"
  23. "math"
  24. "net"
  25. "net/http"
  26. "path/filepath"
  27. "strconv"
  28. "strings"
  29. "sync"
  30. "sync/atomic"
  31. "time"
  32. "golang.org/x/net/http2"
  33. "golang.org/x/net/http2/hpack"
  34. "google.golang.org/grpc/codes"
  35. "google.golang.org/grpc/credentials"
  36. "google.golang.org/grpc/internal"
  37. "google.golang.org/grpc/internal/channelz"
  38. icredentials "google.golang.org/grpc/internal/credentials"
  39. "google.golang.org/grpc/internal/grpclog"
  40. "google.golang.org/grpc/internal/grpcsync"
  41. "google.golang.org/grpc/internal/grpcutil"
  42. imetadata "google.golang.org/grpc/internal/metadata"
  43. istatus "google.golang.org/grpc/internal/status"
  44. isyscall "google.golang.org/grpc/internal/syscall"
  45. "google.golang.org/grpc/internal/transport/networktype"
  46. "google.golang.org/grpc/keepalive"
  47. "google.golang.org/grpc/metadata"
  48. "google.golang.org/grpc/peer"
  49. "google.golang.org/grpc/resolver"
  50. "google.golang.org/grpc/stats"
  51. "google.golang.org/grpc/status"
  52. )
  53. // clientConnectionCounter counts the number of connections a client has
  54. // initiated (equal to the number of http2Clients created). Must be accessed
  55. // atomically.
  56. var clientConnectionCounter uint64
  57. // http2Client implements the ClientTransport interface with HTTP2.
  58. type http2Client struct {
  59. lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
  60. ctx context.Context
  61. cancel context.CancelFunc
  62. ctxDone <-chan struct{} // Cache the ctx.Done() chan.
  63. userAgent string
  64. // address contains the resolver returned address for this transport.
  65. // If the `ServerName` field is set, it takes precedence over `CallHdr.Host`
  66. // passed to `NewStream`, when determining the :authority header.
  67. address resolver.Address
  68. md metadata.MD
  69. conn net.Conn // underlying communication channel
  70. loopy *loopyWriter
  71. remoteAddr net.Addr
  72. localAddr net.Addr
  73. authInfo credentials.AuthInfo // auth info about the connection
  74. readerDone chan struct{} // sync point to enable testing.
  75. writerDone chan struct{} // sync point to enable testing.
  76. // goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
  77. // that the server sent GoAway on this transport.
  78. goAway chan struct{}
  79. framer *framer
  80. // controlBuf delivers all the control related tasks (e.g., window
  81. // updates, reset streams, and various settings) to the controller.
  82. // Do not access controlBuf with mu held.
  83. controlBuf *controlBuffer
  84. fc *trInFlow
  85. // The scheme used: https if TLS is on, http otherwise.
  86. scheme string
  87. isSecure bool
  88. perRPCCreds []credentials.PerRPCCredentials
  89. kp keepalive.ClientParameters
  90. keepaliveEnabled bool
  91. statsHandlers []stats.Handler
  92. initialWindowSize int32
  93. // configured by peer through SETTINGS_MAX_HEADER_LIST_SIZE
  94. maxSendHeaderListSize *uint32
  95. bdpEst *bdpEstimator
  96. maxConcurrentStreams uint32
  97. streamQuota int64
  98. streamsQuotaAvailable chan struct{}
  99. waitingStreams uint32
  100. nextID uint32
  101. registeredCompressors string
  102. // Do not access controlBuf with mu held.
  103. mu sync.Mutex // guard the following variables
  104. state transportState
  105. activeStreams map[uint32]*Stream
  106. // prevGoAway ID records the Last-Stream-ID in the previous GOAway frame.
  107. prevGoAwayID uint32
  108. // goAwayReason records the http2.ErrCode and debug data received with the
  109. // GoAway frame.
  110. goAwayReason GoAwayReason
  111. // goAwayDebugMessage contains a detailed human readable string about a
  112. // GoAway frame, useful for error messages.
  113. goAwayDebugMessage string
  114. // A condition variable used to signal when the keepalive goroutine should
  115. // go dormant. The condition for dormancy is based on the number of active
  116. // streams and the `PermitWithoutStream` keepalive client parameter. And
  117. // since the number of active streams is guarded by the above mutex, we use
  118. // the same for this condition variable as well.
  119. kpDormancyCond *sync.Cond
  120. // A boolean to track whether the keepalive goroutine is dormant or not.
  121. // This is checked before attempting to signal the above condition
  122. // variable.
  123. kpDormant bool
  124. // Fields below are for channelz metric collection.
  125. channelzID *channelz.Identifier
  126. czData *channelzData
  127. onClose func(GoAwayReason)
  128. bufferPool *bufferPool
  129. connectionID uint64
  130. logger *grpclog.PrefixLogger
  131. }
  132. func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr resolver.Address, useProxy bool, grpcUA string) (net.Conn, error) {
  133. address := addr.Addr
  134. networkType, ok := networktype.Get(addr)
  135. if fn != nil {
  136. // Special handling for unix scheme with custom dialer. Back in the day,
  137. // we did not have a unix resolver and therefore targets with a unix
  138. // scheme would end up using the passthrough resolver. So, user's used a
  139. // custom dialer in this case and expected the original dial target to
  140. // be passed to the custom dialer. Now, we have a unix resolver. But if
  141. // a custom dialer is specified, we want to retain the old behavior in
  142. // terms of the address being passed to the custom dialer.
  143. if networkType == "unix" && !strings.HasPrefix(address, "\x00") {
  144. // Supported unix targets are either "unix://absolute-path" or
  145. // "unix:relative-path".
  146. if filepath.IsAbs(address) {
  147. return fn(ctx, "unix://"+address)
  148. }
  149. return fn(ctx, "unix:"+address)
  150. }
  151. return fn(ctx, address)
  152. }
  153. if !ok {
  154. networkType, address = parseDialTarget(address)
  155. }
  156. if networkType == "tcp" && useProxy {
  157. return proxyDial(ctx, address, grpcUA)
  158. }
  159. return internal.NetDialerWithTCPKeepalive().DialContext(ctx, networkType, address)
  160. }
  161. func isTemporary(err error) bool {
  162. switch err := err.(type) {
  163. case interface {
  164. Temporary() bool
  165. }:
  166. return err.Temporary()
  167. case interface {
  168. Timeout() bool
  169. }:
  170. // Timeouts may be resolved upon retry, and are thus treated as
  171. // temporary.
  172. return err.Timeout()
  173. }
  174. return true
  175. }
  176. // newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
  177. // and starts to receive messages on it. Non-nil error returns if construction
  178. // fails.
       //
       // connectCtx bounds the work done in this function: it is passed to the
       // dialer and to the credential handshake, and a monitor goroutine closes the
       // connection if connectCtx is done before this function returns. ctx is
       // wrapped with context.WithCancel and becomes the transport's own context
       // (peer information and stats-handler tags are added to it below). onClose
       // is stored on the returned transport.
       //
       // err is a named result so that the deferred functions can inspect it: on
       // failure the transport context is cancelled and the dialed connection is
       // closed, and once the reader goroutine has been started t.Close(err) is
       // called as well.
  179. func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (_ *http2Client, err error) {
  180. scheme := "http"
  181. ctx, cancel := context.WithCancel(ctx)
       // Cancel the transport context if construction ends with an error.
  182. defer func() {
  183. if err != nil {
  184. cancel()
  185. }
  186. }()
  187. // gRPC, resolver, balancer etc. can specify arbitrary data in the
  188. // Attributes field of resolver.Address, which is shoved into connectCtx
  189. // and passed to the dialer and credential handshaker. This makes it possible for
  190. // address specific arbitrary data to reach custom dialers and credential handshakers.
  191. connectCtx = icredentials.NewClientHandshakeInfoContext(connectCtx, credentials.ClientHandshakeInfo{Attributes: addr.Attributes})
  192. conn, err := dial(connectCtx, opts.Dialer, addr, opts.UseProxy, opts.UserAgent)
  193. if err != nil {
       // Only with FailOnNonTempDialError is the dial error classified via
       // isTemporary; otherwise it is always reported as temporary.
  194. if opts.FailOnNonTempDialError {
  195. return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
  196. }
  197. return nil, connectionErrorf(true, err, "transport: Error while dialing: %v", err)
  198. }
  199. // Any further errors will close the underlying connection
       // The originally dialed conn is captured by value here, so this closes
       // that conn even though the variable is reassigned by the handshake below.
  200. defer func(conn net.Conn) {
  201. if err != nil {
  202. conn.Close()
  203. }
  204. }(conn)
  205. // The following defer and goroutine monitor the connectCtx for cancelation
  206. // and deadline. On context expiration, the connection is hard closed and
  207. // this function will naturally fail as a result. Otherwise, the defer
  208. // waits for the goroutine to exit to prevent the context from being
  209. // monitored (and to prevent the connection from ever being closed) after
  210. // returning from this function.
  211. ctxMonitorDone := grpcsync.NewEvent()
  212. newClientCtx, newClientDone := context.WithCancel(connectCtx)
  213. defer func() {
  214. newClientDone() // Awaken the goroutine below if connectCtx hasn't expired.
  215. <-ctxMonitorDone.Done() // Wait for the goroutine below to exit.
  216. }()
  217. go func(conn net.Conn) {
  218. defer ctxMonitorDone.Fire() // Signal this goroutine has exited.
  219. <-newClientCtx.Done() // Block until connectCtx expires or the defer above executes.
  220. if err := connectCtx.Err(); err != nil {
  221. // connectCtx expired before exiting the function. Hard close the connection.
  222. if logger.V(logLevel) {
  223. logger.Infof("Aborting due to connect deadline expiring: %v", err)
  224. }
  225. conn.Close()
  226. }
  227. }(conn)
  228. kp := opts.KeepaliveParams
  229. // Validate keepalive parameters.
       // Zero values are replaced by the package defaults.
  230. if kp.Time == 0 {
  231. kp.Time = defaultClientKeepaliveTime
  232. }
  233. if kp.Timeout == 0 {
  234. kp.Timeout = defaultClientKeepaliveTimeout
  235. }
       // Keepalive is enabled unless Time is infinity; when enabled, the
       // keepalive timeout is also applied to the connection through
       // SetTCPUserTimeout, and a failure to do so aborts construction.
  236. keepaliveEnabled := false
  237. if kp.Time != infinity {
  238. if err = isyscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil {
  239. return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
  240. }
  241. keepaliveEnabled = true
  242. }
  243. var (
  244. isSecure bool
  245. authInfo credentials.AuthInfo
  246. )
       // A credentials bundle, when present, replaces the transport credentials
       // (if it provides any) and contributes one more per-RPC credential.
  247. transportCreds := opts.TransportCredentials
  248. perRPCCreds := opts.PerRPCCredentials
  249. if b := opts.CredsBundle; b != nil {
  250. if t := b.TransportCredentials(); t != nil {
  251. transportCreds = t
  252. }
  253. if t := b.PerRPCCredentials(); t != nil {
  254. perRPCCreds = append(perRPCCreds, t)
  255. }
  256. }
  257. if transportCreds != nil {
       // The handshake may return a different net.Conn; conn is replaced by it.
  258. conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.ServerName, conn)
  259. if err != nil {
  260. return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err)
  261. }
       // Refuse to continue when a per-RPC credential requires transport
       // security but the handshake reports a level below PrivacyAndIntegrity.
       // AuthInfo values that do not expose GetCommonAuthInfo, or that report
       // InvalidSecurityLevel, are let through by this check.
  262. for _, cd := range perRPCCreds {
  263. if cd.RequireTransportSecurity() {
  264. if ci, ok := authInfo.(interface {
  265. GetCommonAuthInfo() credentials.CommonAuthInfo
  266. }); ok {
  267. secLevel := ci.GetCommonAuthInfo().SecurityLevel
  268. if secLevel != credentials.InvalidSecurityLevel && secLevel < credentials.PrivacyAndIntegrity {
  269. return nil, connectionErrorf(true, nil, "transport: cannot send secure credentials on an insecure connection")
  270. }
  271. }
  272. }
  273. }
  274. isSecure = true
  275. if transportCreds.Info().SecurityProtocol == "tls" {
  276. scheme = "https"
  277. }
  278. }
       // dynamicWindow stays true only if neither the connection window (here)
       // nor the stream window (further below) was configured to at least
       // defaultWindowSize; in that case a bdpEstimator is attached.
  279. dynamicWindow := true
  280. icwz := int32(initialWindowSize)
  281. if opts.InitialConnWindowSize >= defaultWindowSize {
  282. icwz = opts.InitialConnWindowSize
  283. dynamicWindow = false
  284. }
  285. writeBufSize := opts.WriteBufferSize
  286. readBufSize := opts.ReadBufferSize
  287. maxHeaderListSize := defaultClientMaxHeaderListSize
  288. if opts.MaxHeaderListSize != nil {
  289. maxHeaderListSize = *opts.MaxHeaderListSize
  290. }
  291. t := &http2Client{
  292. ctx: ctx,
  293. ctxDone: ctx.Done(), // Cache Done chan.
  294. cancel: cancel,
  295. userAgent: opts.UserAgent,
  296. registeredCompressors: grpcutil.RegisteredCompressors(),
  297. address: addr,
  298. conn: conn,
  299. remoteAddr: conn.RemoteAddr(),
  300. localAddr: conn.LocalAddr(),
  301. authInfo: authInfo,
  302. readerDone: make(chan struct{}),
  303. writerDone: make(chan struct{}),
  304. goAway: make(chan struct{}),
  305. framer: newFramer(conn, writeBufSize, readBufSize, opts.SharedWriteBuffer, maxHeaderListSize),
  306. fc: &trInFlow{limit: uint32(icwz)},
  307. scheme: scheme,
  308. activeStreams: make(map[uint32]*Stream),
  309. isSecure: isSecure,
  310. perRPCCreds: perRPCCreds,
  311. kp: kp,
  312. statsHandlers: opts.StatsHandlers,
  313. initialWindowSize: initialWindowSize,
  314. nextID: 1,
  315. maxConcurrentStreams: defaultMaxStreamsClient,
  316. streamQuota: defaultMaxStreamsClient,
  317. streamsQuotaAvailable: make(chan struct{}, 1),
  318. czData: new(channelzData),
  319. keepaliveEnabled: keepaliveEnabled,
  320. bufferPool: newBufferPool(),
  321. onClose: onClose,
  322. }
  323. t.logger = prefixLoggerForClientTransport(t)
  324. // Add peer information to the http2client context.
  325. t.ctx = peer.NewContext(t.ctx, t.getPeer())
       // Per-address metadata comes from the Metadata field when it holds a
       // *metadata.MD, and from imetadata.Get(addr) otherwise.
  326. if md, ok := addr.Metadata.(*metadata.MD); ok {
  327. t.md = *md
  328. } else if md := imetadata.Get(addr); md != nil {
  329. t.md = md
  330. }
       // Note: controlBuf is given the cached t.ctxDone channel.
  331. t.controlBuf = newControlBuffer(t.ctxDone)
  332. if opts.InitialWindowSize >= defaultWindowSize {
  333. t.initialWindowSize = opts.InitialWindowSize
  334. dynamicWindow = false
  335. }
  336. if dynamicWindow {
  337. t.bdpEst = &bdpEstimator{
  338. bdp: initialWindowSize,
  339. updateFlowControl: t.updateFlowControl,
  340. }
  341. }
       // Let every stats handler tag the transport context and observe the
       // beginning of the connection.
  342. for _, sh := range t.statsHandlers {
  343. t.ctx = sh.TagConn(t.ctx, &stats.ConnTagInfo{
  344. RemoteAddr: t.remoteAddr,
  345. LocalAddr: t.localAddr,
  346. })
  347. connBegin := &stats.ConnBegin{
  348. Client: true,
  349. }
  350. sh.HandleConn(t.ctx, connBegin)
  351. }
  352. t.channelzID, err = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, fmt.Sprintf("%s -> %s", t.localAddr, t.remoteAddr))
  353. if err != nil {
  354. return nil, err
  355. }
       // The keepalive goroutine's dormancy condition variable shares t.mu.
  356. if t.keepaliveEnabled {
  357. t.kpDormancyCond = sync.NewCond(&t.mu)
  358. go t.keepalive()
  359. }
  360. // Start the reader goroutine for incoming messages. Each transport has a
  361. // dedicated goroutine which reads HTTP2 frames from the network. Then it
  362. // dispatches the frame to the corresponding stream entity. When the
  363. // server preface is received, readerErrCh is closed. If an error occurs
  364. // first, an error is pushed to the channel. This must be checked before
  365. // returning from this function.
  366. readerErrCh := make(chan error, 1)
  367. go t.reader(readerErrCh)
       // If everything below succeeded, wait for the reader's verdict on the
       // server preface (a closed channel yields nil). Any error, from the
       // reader or from the steps below, closes the transport.
  368. defer func() {
  369. if err == nil {
  370. err = <-readerErrCh
  371. }
  372. if err != nil {
  373. t.Close(err)
  374. }
  375. }()
  376. // Send connection preface to server.
  377. n, err := t.conn.Write(clientPreface)
  378. if err != nil {
  379. err = connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
  380. return nil, err
  381. }
  382. if n != len(clientPreface) {
  383. err = connectionErrorf(true, nil, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
  384. return nil, err
  385. }
       // Only settings that were explicitly configured are put into the initial
       // SETTINGS frame: the initial window size when it differs from
       // defaultWindowSize, and the max header list size when opts sets it.
  386. var ss []http2.Setting
  387. if t.initialWindowSize != defaultWindowSize {
  388. ss = append(ss, http2.Setting{
  389. ID: http2.SettingInitialWindowSize,
  390. Val: uint32(t.initialWindowSize),
  391. })
  392. }
  393. if opts.MaxHeaderListSize != nil {
  394. ss = append(ss, http2.Setting{
  395. ID: http2.SettingMaxHeaderListSize,
  396. Val: *opts.MaxHeaderListSize,
  397. })
  398. }
  399. err = t.framer.fr.WriteSettings(ss...)
  400. if err != nil {
  401. err = connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
  402. return nil, err
  403. }
  404. // Adjust the connection flow control window if needed.
  405. if delta := uint32(icwz - defaultWindowSize); delta > 0 {
  406. if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
  407. err = connectionErrorf(true, err, "transport: failed to write window update: %v", err)
  408. return nil, err
  409. }
  410. }
  411. t.connectionID = atomic.AddUint64(&clientConnectionCounter, 1)
       // Flush the frames written above through the framer's writer. Unlike the
       // other write failures, this error is returned without being wrapped by
       // connectionErrorf.
  412. if err := t.framer.writer.Flush(); err != nil {
  413. return nil, err
  414. }
       // The loopy writer is created and run in its own goroutine; writerDone is
       // closed once run returns.
  415. go func() {
  416. t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger)
  417. t.loopy.run()
  418. close(t.writerDone)
  419. }()
  420. return t, nil
  421. }
  422. func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
  423. // TODO(zhaoq): Handle uint32 overflow of Stream.id.
  424. s := &Stream{
  425. ct: t,
  426. done: make(chan struct{}),
  427. method: callHdr.Method,
  428. sendCompress: callHdr.SendCompress,
  429. buf: newRecvBuffer(),
  430. headerChan: make(chan struct{}),
  431. contentSubtype: callHdr.ContentSubtype,
  432. doneFunc: callHdr.DoneFunc,
  433. }
  434. s.wq = newWriteQuota(defaultWriteQuota, s.done)
  435. s.requestRead = func(n int) {
  436. t.adjustWindow(s, uint32(n))
  437. }
  438. // The client side stream context should have exactly the same life cycle with the user provided context.
  439. // That means, s.ctx should be read-only. And s.ctx is done iff ctx is done.
  440. // So we use the original context here instead of creating a copy.
  441. s.ctx = ctx
  442. s.trReader = &transportReader{
  443. reader: &recvBufferReader{
  444. ctx: s.ctx,
  445. ctxDone: s.ctx.Done(),
  446. recv: s.buf,
  447. closeStream: func(err error) {
  448. t.CloseStream(s, err)
  449. },
  450. freeBuffer: t.bufferPool.put,
  451. },
  452. windowHandler: func(n int) {
  453. t.updateWindow(s, uint32(n))
  454. },
  455. }
  456. return s
  457. }
  458. func (t *http2Client) getPeer() *peer.Peer {
  459. return &peer.Peer{
  460. Addr: t.remoteAddr,
  461. AuthInfo: t.authInfo, // Can be nil
  462. LocalAddr: t.localAddr,
  463. }
  464. }
  465. func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
  466. aud := t.createAudience(callHdr)
  467. ri := credentials.RequestInfo{
  468. Method: callHdr.Method,
  469. AuthInfo: t.authInfo,
  470. }
  471. ctxWithRequestInfo := icredentials.NewRequestInfoContext(ctx, ri)
  472. authData, err := t.getTrAuthData(ctxWithRequestInfo, aud)
  473. if err != nil {
  474. return nil, err
  475. }
  476. callAuthData, err := t.getCallAuthData(ctxWithRequestInfo, aud, callHdr)
  477. if err != nil {
  478. return nil, err
  479. }
  480. // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
  481. // first and create a slice of that exact size.
  482. // Make the slice of certain predictable size to reduce allocations made by append.
  483. hfLen := 7 // :method, :scheme, :path, :authority, content-type, user-agent, te
  484. hfLen += len(authData) + len(callAuthData)
  485. headerFields := make([]hpack.HeaderField, 0, hfLen)
  486. headerFields = append(headerFields, hpack.HeaderField{Name: ":method", Value: "POST"})
  487. headerFields = append(headerFields, hpack.HeaderField{Name: ":scheme", Value: t.scheme})
  488. headerFields = append(headerFields, hpack.HeaderField{Name: ":path", Value: callHdr.Method})
  489. headerFields = append(headerFields, hpack.HeaderField{Name: ":authority", Value: callHdr.Host})
  490. headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(callHdr.ContentSubtype)})
  491. headerFields = append(headerFields, hpack.HeaderField{Name: "user-agent", Value: t.userAgent})
  492. headerFields = append(headerFields, hpack.HeaderField{Name: "te", Value: "trailers"})
  493. if callHdr.PreviousAttempts > 0 {
  494. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-previous-rpc-attempts", Value: strconv.Itoa(callHdr.PreviousAttempts)})
  495. }
  496. registeredCompressors := t.registeredCompressors
  497. if callHdr.SendCompress != "" {
  498. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
  499. // Include the outgoing compressor name when compressor is not registered
  500. // via encoding.RegisterCompressor. This is possible when client uses
  501. // WithCompressor dial option.
  502. if !grpcutil.IsCompressorNameRegistered(callHdr.SendCompress) {
  503. if registeredCompressors != "" {
  504. registeredCompressors += ","
  505. }
  506. registeredCompressors += callHdr.SendCompress
  507. }
  508. }
  509. if registeredCompressors != "" {
  510. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-accept-encoding", Value: registeredCompressors})
  511. }
  512. if dl, ok := ctx.Deadline(); ok {
  513. // Send out timeout regardless its value. The server can detect timeout context by itself.
  514. // TODO(mmukhi): Perhaps this field should be updated when actually writing out to the wire.
  515. timeout := time.Until(dl)
  516. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-timeout", Value: grpcutil.EncodeDuration(timeout)})
  517. }
  518. for k, v := range authData {
  519. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  520. }
  521. for k, v := range callAuthData {
  522. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  523. }
  524. if b := stats.OutgoingTags(ctx); b != nil {
  525. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-tags-bin", Value: encodeBinHeader(b)})
  526. }
  527. if b := stats.OutgoingTrace(ctx); b != nil {
  528. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)})
  529. }
  530. if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {
  531. var k string
  532. for k, vv := range md {
  533. // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
  534. if isReservedHeader(k) {
  535. continue
  536. }
  537. for _, v := range vv {
  538. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  539. }
  540. }
  541. for _, vv := range added {
  542. for i, v := range vv {
  543. if i%2 == 0 {
  544. k = strings.ToLower(v)
  545. continue
  546. }
  547. // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
  548. if isReservedHeader(k) {
  549. continue
  550. }
  551. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  552. }
  553. }
  554. }
  555. for k, vv := range t.md {
  556. if isReservedHeader(k) {
  557. continue
  558. }
  559. for _, v := range vv {
  560. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  561. }
  562. }
  563. return headerFields, nil
  564. }
  565. func (t *http2Client) createAudience(callHdr *CallHdr) string {
  566. // Create an audience string only if needed.
  567. if len(t.perRPCCreds) == 0 && callHdr.Creds == nil {
  568. return ""
  569. }
  570. // Construct URI required to get auth request metadata.
  571. // Omit port if it is the default one.
  572. host := strings.TrimSuffix(callHdr.Host, ":443")
  573. pos := strings.LastIndex(callHdr.Method, "/")
  574. if pos == -1 {
  575. pos = len(callHdr.Method)
  576. }
  577. return "https://" + host + callHdr.Method[:pos]
  578. }
  579. func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[string]string, error) {
  580. if len(t.perRPCCreds) == 0 {
  581. return nil, nil
  582. }
  583. authData := map[string]string{}
  584. for _, c := range t.perRPCCreds {
  585. data, err := c.GetRequestMetadata(ctx, audience)
  586. if err != nil {
  587. if st, ok := status.FromError(err); ok {
  588. // Restrict the code to the list allowed by gRFC A54.
  589. if istatus.IsRestrictedControlPlaneCode(st) {
  590. err = status.Errorf(codes.Internal, "transport: received per-RPC creds error with illegal status: %v", err)
  591. }
  592. return nil, err
  593. }
  594. return nil, status.Errorf(codes.Unauthenticated, "transport: per-RPC creds failed due to error: %v", err)
  595. }
  596. for k, v := range data {
  597. // Capital header names are illegal in HTTP/2.
  598. k = strings.ToLower(k)
  599. authData[k] = v
  600. }
  601. }
  602. return authData, nil
  603. }
  604. func (t *http2Client) getCallAuthData(ctx context.Context, audience string, callHdr *CallHdr) (map[string]string, error) {
  605. var callAuthData map[string]string
  606. // Check if credentials.PerRPCCredentials were provided via call options.
  607. // Note: if these credentials are provided both via dial options and call
  608. // options, then both sets of credentials will be applied.
  609. if callCreds := callHdr.Creds; callCreds != nil {
  610. if callCreds.RequireTransportSecurity() {
  611. ri, _ := credentials.RequestInfoFromContext(ctx)
  612. if !t.isSecure || credentials.CheckSecurityLevel(ri.AuthInfo, credentials.PrivacyAndIntegrity) != nil {
  613. return nil, status.Error(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection")
  614. }
  615. }
  616. data, err := callCreds.GetRequestMetadata(ctx, audience)
  617. if err != nil {
  618. if st, ok := status.FromError(err); ok {
  619. // Restrict the code to the list allowed by gRFC A54.
  620. if istatus.IsRestrictedControlPlaneCode(st) {
  621. err = status.Errorf(codes.Internal, "transport: received per-RPC creds error with illegal status: %v", err)
  622. }
  623. return nil, err
  624. }
  625. return nil, status.Errorf(codes.Internal, "transport: per-RPC creds failed due to error: %v", err)
  626. }
  627. callAuthData = make(map[string]string, len(data))
  628. for k, v := range data {
  629. // Capital header names are illegal in HTTP/2
  630. k = strings.ToLower(k)
  631. callAuthData[k] = v
  632. }
  633. }
  634. return callAuthData, nil
  635. }
  636. // NewStreamError wraps an error and reports additional information. Typically
  637. // NewStream errors result in transparent retry, as they mean nothing went onto
  638. // the wire. However, there are two notable exceptions:
  639. //
  640. // 1. If the stream headers violate the max header list size allowed by the
  641. // server. It's possible this could succeed on another transport, even if
  642. // it's unlikely, but do not transparently retry.
  643. // 2. If the credentials errored when requesting their headers. In this case,
  644. // it's possible a retry can fix the problem, but indefinitely transparently
  645. // retrying is not appropriate as it is likely the credentials, if they can
  646. // eventually succeed, would need I/O to do so.
  647. type NewStreamError struct {
  648. Err error
  649. AllowTransparentRetry bool
  650. }
  651. func (e NewStreamError) Error() string {
  652. return e.Err.Error()
  653. }
// NewStream creates a stream and registers it into the transport as "active"
// streams. All non-nil errors returned will be *NewStreamError.
//
// The call builds the request header fields, then repeatedly asks the control
// buffer to enqueue the HEADERS frame until both the header-list-size check
// and the stream-quota check pass. While no quota is available it blocks until
// the quota channel fires, ctx is done, t.goAway is closed, or the transport's
// own context is done.
//
// The returned error allows transparent retry when the control buffer returns
// an error, when t.goAway is closed, or when the transport's context is done.
// It does not when header creation fails, when the header list exceeds the
// size stored in t.maxSendHeaderListSize, or when ctx is done.
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error) {
	ctx = peer.NewContext(ctx, t.getPeer())

	// ServerName field of the resolver returned address takes precedence over
	// Host field of CallHdr to determine the :authority header. This is because,
	// the ServerName field takes precedence for server authentication during
	// TLS handshake, and the :authority header should match the value used
	// for server authentication.
	if t.address.ServerName != "" {
		// Work on a copy so the caller's CallHdr is left untouched.
		newCallHdr := *callHdr
		newCallHdr.Host = t.address.ServerName
		callHdr = &newCallHdr
	}

	headerFields, err := t.createHeaderFields(ctx, callHdr)
	if err != nil {
		return nil, &NewStreamError{Err: err, AllowTransparentRetry: false}
	}
	s := t.newStream(ctx, callHdr)
	// cleanup finishes the stream with err if it has not been finished
	// already. It is used both when initStream observes a closing transport
	// and as the header frame's onOrphaned callback.
	cleanup := func(err error) {
		if s.swapState(streamDone) == streamDone {
			// If it was already done, return.
			return
		}
		// The stream was unprocessed by the server.
		atomic.StoreUint32(&s.unprocessed, 1)
		s.write(recvMsg{err: err})
		close(s.done)
		// If headerChan isn't closed, then close it.
		if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
			close(s.headerChan)
		}
	}
	hdr := &headerFrame{
		hf:        headerFields,
		endStream: false,
		initStream: func(id uint32) error {
			t.mu.Lock()
			// TODO: handle transport closure in loopy instead and remove this
			// initStream is never called when transport is draining.
			if t.state == closing {
				t.mu.Unlock()
				cleanup(ErrConnClosing)
				return ErrConnClosing
			}
			if channelz.IsOn() {
				atomic.AddInt64(&t.czData.streamsStarted, 1)
				atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
			}
			// If the keepalive goroutine has gone dormant, wake it up.
			if t.kpDormant {
				t.kpDormancyCond.Signal()
			}
			t.mu.Unlock()
			return nil
		},
		onOrphaned: cleanup,
		wq:         s.wq,
	}
	// firstTry, ch and transportDrainRequired are shared between the closures
	// below and the retry loop.
	firstTry := true
	var ch chan struct{}
	transportDrainRequired := false
	// checkForStreamQuota reserves one unit of stream quota and assigns the
	// stream ID; it returns false when no quota is available or the transport
	// can no longer accept streams.
	// NOTE(review): streamQuota, waitingStreams and nextID are accessed here
	// without t.mu; this appears to rely on executeAndPut serializing the
	// callback — confirm against controlBuf.
	checkForStreamQuota := func(it any) bool {
		if t.streamQuota <= 0 { // Can go negative if server decreases it.
			// Count this caller as waiting only once, however many times the
			// check is retried.
			if firstTry {
				t.waitingStreams++
			}
			ch = t.streamsQuotaAvailable
			return false
		}
		if !firstTry {
			t.waitingStreams--
		}
		t.streamQuota--
		h := it.(*headerFrame)
		h.streamID = t.nextID
		t.nextID += 2

		// Drain client transport if nextID > MaxStreamID which signals gRPC that
		// the connection is closed and a new one must be created for subsequent RPCs.
		transportDrainRequired = t.nextID > MaxStreamID

		s.id = h.streamID
		s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
		t.mu.Lock()
		if t.state == draining || t.activeStreams == nil { // Can be niled from Close().
			t.mu.Unlock()
			return false // Don't create a stream if the transport is already closed.
		}
		t.activeStreams[s.id] = s
		t.mu.Unlock()
		// Pass the wake-up along if quota remains and others are waiting. The
		// send is non-blocking, so it is dropped when it cannot proceed.
		if t.streamQuota > 0 && t.waitingStreams > 0 {
			select {
			case t.streamsQuotaAvailable <- struct{}{}:
			default:
			}
		}
		return true
	}
	var hdrListSizeErr error
	// checkForHeaderListSize sums the sizes of the header fields and records
	// an error in hdrListSizeErr once the running total exceeds
	// *t.maxSendHeaderListSize. A nil limit disables the check.
	checkForHeaderListSize := func(it any) bool {
		if t.maxSendHeaderListSize == nil {
			return true
		}
		hdrFrame := it.(*headerFrame)
		var sz int64
		for _, f := range hdrFrame.hf {
			if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
				hdrListSizeErr = status.Errorf(codes.Internal, "header list size to send violates the maximum size (%d bytes) set by server", *t.maxSendHeaderListSize)
				return false
			}
		}
		return true
	}
	for {
		// The size check runs first so that an oversized header list never
		// consumes stream quota.
		success, err := t.controlBuf.executeAndPut(func(it any) bool {
			return checkForHeaderListSize(it) && checkForStreamQuota(it)
		}, hdr)
		if err != nil {
			// Connection closed.
			return nil, &NewStreamError{Err: err, AllowTransparentRetry: true}
		}
		if success {
			break
		}
		if hdrListSizeErr != nil {
			return nil, &NewStreamError{Err: hdrListSizeErr}
		}
		firstTry = false
		// Wait for quota, or give up when the RPC or the transport ends.
		select {
		case <-ch:
		case <-ctx.Done():
			return nil, &NewStreamError{Err: ContextErr(ctx.Err())}
		case <-t.goAway:
			return nil, &NewStreamError{Err: errStreamDrain, AllowTransparentRetry: true}
		case <-t.ctx.Done():
			return nil, &NewStreamError{Err: ErrConnClosing, AllowTransparentRetry: true}
		}
	}
	if len(t.statsHandlers) != 0 {
		// Report the outgoing metadata, with the transport's user-agent set,
		// to the stats handlers.
		header, ok := metadata.FromOutgoingContext(ctx)
		if ok {
			header.Set("user-agent", t.userAgent)
		} else {
			header = metadata.Pairs("user-agent", t.userAgent)
		}
		for _, sh := range t.statsHandlers {
			// Note: The header fields are compressed with hpack after this call returns.
			// No WireLength field is set here.
			// Note: Creating a new stats object to prevent pollution.
			outHeader := &stats.OutHeader{
				Client:      true,
				FullMethod:  callHdr.Method,
				RemoteAddr:  t.remoteAddr,
				LocalAddr:   t.localAddr,
				Compression: callHdr.SendCompress,
				Header:      header,
			}
			sh.HandleRPC(s.ctx, outHeader)
		}
	}
	if transportDrainRequired {
		if t.logger.V(logLevel) {
			t.logger.Infof("Draining transport: t.nextID > MaxStreamID")
		}
		t.GracefulClose()
	}
	return s, nil
}
  821. // CloseStream clears the footprint of a stream when the stream is not needed any more.
  822. // This must not be executed in reader's goroutine.
  823. func (t *http2Client) CloseStream(s *Stream, err error) {
  824. var (
  825. rst bool
  826. rstCode http2.ErrCode
  827. )
  828. if err != nil {
  829. rst = true
  830. rstCode = http2.ErrCodeCancel
  831. }
  832. t.closeStream(s, err, rst, rstCode, status.Convert(err), nil, false)
  833. }
// closeStream moves s to the done state, records its final status and
// trailers, unblocks readers and header waiters, and queues a cleanupStream
// item on the control buffer while returning one unit of stream quota. If the
// stream is already done, it waits for s.done to be closed and returns without
// changing anything.
//
// Parameters:
//   - err: when non-nil, written to the stream's receive side to unblock reads.
//   - rst, rstCode: forwarded unchanged in the cleanupStream item.
//   - st: stored as the stream's status.
//   - mdata: stored as the stream's trailer when non-empty.
//   - eosReceived: selects whether channelz counts the stream as succeeded
//     (true) or failed (false) when the cleanup item's onWrite callback runs.
func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) {
	// Set stream status to done.
	if s.swapState(streamDone) == streamDone {
		// If it was already done, return. If multiple closeStream calls
		// happen simultaneously, wait for the first to finish.
		<-s.done
		return
	}
	// status and trailers can be updated here without any synchronization because the stream goroutine will
	// only read it after it sees an io.EOF error from read or write and we'll write those errors
	// only after updating this.
	s.status = st
	if len(mdata) > 0 {
		s.trailer = mdata
	}
	if err != nil {
		// This will unblock reads eventually.
		s.write(recvMsg{err: err})
	}
	// If headerChan isn't closed, then close it.
	if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
		// Winning the swap here means no header was recorded for this stream
		// before it was closed.
		s.noHeaders = true
		close(s.headerChan)
	}
	cleanup := &cleanupStream{
		streamID: s.id,
		// onWrite removes the stream from the active set and updates the
		// channelz counters.
		onWrite: func() {
			t.mu.Lock()
			// activeStreams is set to nil by Close.
			if t.activeStreams != nil {
				delete(t.activeStreams, s.id)
			}
			t.mu.Unlock()
			if channelz.IsOn() {
				if eosReceived {
					atomic.AddInt64(&t.czData.streamsSucceeded, 1)
				} else {
					atomic.AddInt64(&t.czData.streamsFailed, 1)
				}
			}
		},
		rst:     rst,
		rstCode: rstCode,
	}
	// addBackStreamQuota returns the quota held by this stream and, if
	// streams are waiting, signals the quota channel without blocking.
	addBackStreamQuota := func(any) bool {
		t.streamQuota++
		if t.streamQuota > 0 && t.waitingStreams > 0 {
			select {
			case t.streamsQuotaAvailable <- struct{}{}:
			default:
			}
		}
		return true
	}
	t.controlBuf.executeAndPut(addBackStreamQuota, cleanup)
	// This will unblock write.
	close(s.done)
	if s.doneFunc != nil {
		s.doneFunc()
	}
}
// Close kicks off the shutdown process of the transport. This should be called
// only once on a transport. Once it is called, the transport should not be
// accessed any more.
//
// A call made while the transport is already in the closing state returns
// immediately. Otherwise the transport is marked closing, its control buffer
// is finished, its context is cancelled, the connection is closed, every
// stream that was active is closed with an Unavailable status, and the stats
// handlers are sent a ConnEnd event.
//
// err describes why the transport is being closed. It must be non-nil:
// err.Error() is called when no GOAWAY debug message has been recorded.
func (t *http2Client) Close(err error) {
	t.mu.Lock()
	// Make sure we only close once.
	if t.state == closing {
		t.mu.Unlock()
		return
	}
	if t.logger.V(logLevel) {
		t.logger.Infof("Closing: %v", err)
	}
	// Call t.onClose ASAP to prevent the client from attempting to create new
	// streams.
	if t.state != draining {
		t.onClose(GoAwayInvalid)
	}
	t.state = closing
	// Take ownership of the active streams and nil the map so that other code
	// paths holding t.mu see the transport as closed.
	streams := t.activeStreams
	t.activeStreams = nil
	if t.kpDormant {
		// If the keepalive goroutine is blocked on this condition variable, we
		// should unblock it so that the goroutine eventually exits.
		t.kpDormancyCond.Signal()
	}
	t.mu.Unlock()
	t.controlBuf.finish()
	t.cancel()
	t.conn.Close()
	channelz.RemoveEntry(t.channelzID)
	// Append info about previous goaways if there were any, since this may be important
	// for understanding the root cause for this connection to be closed.
	// GetGoAwayReason acquires t.mu itself, so it is called after the unlock
	// above.
	_, goAwayDebugMessage := t.GetGoAwayReason()

	var st *status.Status
	if len(goAwayDebugMessage) > 0 {
		st = status.Newf(codes.Unavailable, "closing transport due to: %v, received prior goaway: %v", err, goAwayDebugMessage)
		err = st.Err()
	} else {
		st = status.New(codes.Unavailable, err.Error())
	}

	// Notify all active streams.
	for _, s := range streams {
		t.closeStream(s, err, false, http2.ErrCodeNo, st, nil, false)
	}
	for _, sh := range t.statsHandlers {
		connEnd := &stats.ConnEnd{
			Client: true,
		}
		sh.HandleConn(t.ctx, connEnd)
	}
}
  946. // GracefulClose sets the state to draining, which prevents new streams from
  947. // being created and causes the transport to be closed when the last active
  948. // stream is closed. If there are no active streams, the transport is closed
  949. // immediately. This does nothing if the transport is already draining or
  950. // closing.
  951. func (t *http2Client) GracefulClose() {
  952. t.mu.Lock()
  953. // Make sure we move to draining only from active.
  954. if t.state == draining || t.state == closing {
  955. t.mu.Unlock()
  956. return
  957. }
  958. if t.logger.V(logLevel) {
  959. t.logger.Infof("GracefulClose called")
  960. }
  961. t.onClose(GoAwayInvalid)
  962. t.state = draining
  963. active := len(t.activeStreams)
  964. t.mu.Unlock()
  965. if active == 0 {
  966. t.Close(connectionErrorf(true, nil, "no active streams left to process while draining"))
  967. return
  968. }
  969. t.controlBuf.put(&incomingGoAway{})
  970. }
  971. // Write formats the data into HTTP2 data frame(s) and sends it out. The caller
  972. // should proceed only if Write returns nil.
  973. func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
  974. if opts.Last {
  975. // If it's the last message, update stream state.
  976. if !s.compareAndSwapState(streamActive, streamWriteDone) {
  977. return errStreamDone
  978. }
  979. } else if s.getState() != streamActive {
  980. return errStreamDone
  981. }
  982. df := &dataFrame{
  983. streamID: s.id,
  984. endStream: opts.Last,
  985. h: hdr,
  986. d: data,
  987. }
  988. if hdr != nil || data != nil { // If it's not an empty data frame, check quota.
  989. if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
  990. return err
  991. }
  992. }
  993. return t.controlBuf.put(df)
  994. }
  995. func (t *http2Client) getStream(f http2.Frame) *Stream {
  996. t.mu.Lock()
  997. s := t.activeStreams[f.Header().StreamID]
  998. t.mu.Unlock()
  999. return s
  1000. }
  1001. // adjustWindow sends out extra window update over the initial window size
  1002. // of stream if the application is requesting data larger in size than
  1003. // the window.
  1004. func (t *http2Client) adjustWindow(s *Stream, n uint32) {
  1005. if w := s.fc.maybeAdjust(n); w > 0 {
  1006. t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
  1007. }
  1008. }
  1009. // updateWindow adjusts the inbound quota for the stream.
  1010. // Window updates will be sent out when the cumulative quota
  1011. // exceeds the corresponding threshold.
  1012. func (t *http2Client) updateWindow(s *Stream, n uint32) {
  1013. if w := s.fc.onRead(n); w > 0 {
  1014. t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
  1015. }
  1016. }
  1017. // updateFlowControl updates the incoming flow control windows
  1018. // for the transport and the stream based on the current bdp
  1019. // estimation.
  1020. func (t *http2Client) updateFlowControl(n uint32) {
  1021. updateIWS := func(any) bool {
  1022. t.initialWindowSize = int32(n)
  1023. t.mu.Lock()
  1024. for _, s := range t.activeStreams {
  1025. s.fc.newLimit(n)
  1026. }
  1027. t.mu.Unlock()
  1028. return true
  1029. }
  1030. t.controlBuf.executeAndPut(updateIWS, &outgoingWindowUpdate{streamID: 0, increment: t.fc.newLimit(n)})
  1031. t.controlBuf.put(&outgoingSettings{
  1032. ss: []http2.Setting{
  1033. {
  1034. ID: http2.SettingInitialWindowSize,
  1035. Val: n,
  1036. },
  1037. },
  1038. })
  1039. }
// handleData processes a DATA frame received from the server. It first does
// the connection-level accounting (BDP estimation and connection flow
// control), then looks up the target stream, charges the frame against the
// stream's flow control, copies any payload into a pooled buffer for the
// stream's reader, and finally closes the stream if the frame ends it.
//
// Frames for streams that are not active are dropped after the
// connection-level accounting.
func (t *http2Client) handleData(f *http2.DataFrame) {
	// size is the frame length taken from the frame header. When the frame is
	// padded it can exceed len(f.Data()); see the padding handling below.
	size := f.Header().Length
	var sendBDPPing bool
	if t.bdpEst != nil {
		sendBDPPing = t.bdpEst.add(size)
	}
	// Decouple connection's flow control from application's read.
	// An update on connection's flow control should not depend on
	// whether user application has read the data or not. Such a
	// restriction is already imposed on the stream's flow control,
	// and therefore the sender will be blocked anyways.
	// Decoupling the connection flow control will prevent other
	// active(fast) streams from starving in presence of slow or
	// inactive streams.
	//
	if w := t.fc.onData(size); w > 0 {
		t.controlBuf.put(&outgoingWindowUpdate{
			streamID:  0,
			increment: w,
		})
	}
	if sendBDPPing {
		// Avoid excessive ping detection (e.g. in an L7 proxy)
		// by sending a window update prior to the BDP ping.

		if w := t.fc.reset(); w > 0 {
			t.controlBuf.put(&outgoingWindowUpdate{
				streamID:  0,
				increment: w,
			})
		}

		t.controlBuf.put(bdpPing)
	}
	// Select the right stream to dispatch.
	s := t.getStream(f)
	if s == nil {
		return
	}
	if size > 0 {
		if err := s.fc.onData(size); err != nil {
			// The stream's flow control rejected the frame: reset the stream
			// with FLOW_CONTROL_ERROR and report an Internal status.
			t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil, false)
			return
		}
		if f.Header().Flags.Has(http2.FlagDataPadded) {
			// The bytes beyond the payload never reach the application, so
			// they are credited back to the stream's window here.
			if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
				t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
			}
		}
		// TODO(bradfitz, zhaoq): A copy is required here because there is no
		// guarantee f.Data() is consumed before the arrival of next frame.
		// Can this copy be eliminated?
		if len(f.Data()) > 0 {
			buffer := t.bufferPool.get()
			buffer.Reset()
			buffer.Write(f.Data())
			s.write(recvMsg{buffer: buffer})
		}
	}
	// The server has closed the stream without sending trailers. Record that
	// the read direction is closed, and set the status appropriately.
	if f.StreamEnded() {
		t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true)
	}
}
  1103. func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
  1104. s := t.getStream(f)
  1105. if s == nil {
  1106. return
  1107. }
  1108. if f.ErrCode == http2.ErrCodeRefusedStream {
  1109. // The stream was unprocessed by the server.
  1110. atomic.StoreUint32(&s.unprocessed, 1)
  1111. }
  1112. statusCode, ok := http2ErrConvTab[f.ErrCode]
  1113. if !ok {
  1114. if t.logger.V(logLevel) {
  1115. t.logger.Infof("Received a RST_STREAM frame with code %q, but found no mapped gRPC status", f.ErrCode)
  1116. }
  1117. statusCode = codes.Unknown
  1118. }
  1119. if statusCode == codes.Canceled {
  1120. if d, ok := s.ctx.Deadline(); ok && !d.After(time.Now()) {
  1121. // Our deadline was already exceeded, and that was likely the cause
  1122. // of this cancelation. Alter the status code accordingly.
  1123. statusCode = codes.DeadlineExceeded
  1124. }
  1125. }
  1126. t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode), nil, false)
  1127. }
// handleSettings applies a SETTINGS frame received from the server. ACK
// frames are ignored.
//
// MAX_CONCURRENT_STREAMS and MAX_HEADER_LIST_SIZE are not applied directly;
// each is turned into an update function that runs inside the callback passed
// to the control buffer's executeAndPut. All other settings are forwarded in
// the incomingSettings item queued by that same call.
//
// isFirst marks the first SETTINGS frame on the connection: if that frame does
// not carry MAX_CONCURRENT_STREAMS, the limit is set to math.MaxUint32.
func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
	if f.IsAck() {
		return
	}
	// maxStreams stays nil unless the frame sets MAX_CONCURRENT_STREAMS (or
	// the first-frame default below applies).
	var maxStreams *uint32
	var ss []http2.Setting
	var updateFuncs []func()
	f.ForeachSetting(func(s http2.Setting) error {
		switch s.ID {
		case http2.SettingMaxConcurrentStreams:
			maxStreams = new(uint32)
			*maxStreams = s.Val
		case http2.SettingMaxHeaderListSize:
			updateFuncs = append(updateFuncs, func() {
				t.maxSendHeaderListSize = new(uint32)
				*t.maxSendHeaderListSize = s.Val
			})
		default:
			ss = append(ss, s)
		}
		return nil
	})
	if isFirst && maxStreams == nil {
		maxStreams = new(uint32)
		*maxStreams = math.MaxUint32
	}
	sf := &incomingSettings{
		ss: ss,
	}
	if maxStreams != nil {
		updateStreamQuota := func() {
			// Adjust the remaining quota by the change in the limit; the
			// delta is negative when the server lowers the limit.
			delta := int64(*maxStreams) - int64(t.maxConcurrentStreams)
			t.maxConcurrentStreams = *maxStreams
			t.streamQuota += delta
			if delta > 0 && t.waitingStreams > 0 {
				close(t.streamsQuotaAvailable) // wake all of them up.
				// Install a fresh channel for subsequent quota signals.
				t.streamsQuotaAvailable = make(chan struct{}, 1)
			}
		}
		updateFuncs = append(updateFuncs, updateStreamQuota)
	}
	t.controlBuf.executeAndPut(func(any) bool {
		for _, f := range updateFuncs {
			f()
		}
		return true
	}, sf)
}
  1176. func (t *http2Client) handlePing(f *http2.PingFrame) {
  1177. if f.IsAck() {
  1178. // Maybe it's a BDP ping.
  1179. if t.bdpEst != nil {
  1180. t.bdpEst.calculate(f.Data)
  1181. }
  1182. return
  1183. }
  1184. pingAck := &ping{ack: true}
  1185. copy(pingAck.data[:], f.Data[:])
  1186. t.controlBuf.put(pingAck)
  1187. }
  1188. func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
  1189. t.mu.Lock()
  1190. if t.state == closing {
  1191. t.mu.Unlock()
  1192. return
  1193. }
  1194. if f.ErrCode == http2.ErrCodeEnhanceYourCalm && string(f.DebugData()) == "too_many_pings" {
  1195. // When a client receives a GOAWAY with error code ENHANCE_YOUR_CALM and debug
  1196. // data equal to ASCII "too_many_pings", it should log the occurrence at a log level that is
  1197. // enabled by default and double the configure KEEPALIVE_TIME used for new connections
  1198. // on that channel.
  1199. logger.Errorf("Client received GoAway with error code ENHANCE_YOUR_CALM and debug data equal to ASCII \"too_many_pings\".")
  1200. }
  1201. id := f.LastStreamID
  1202. if id > 0 && id%2 == 0 {
  1203. t.mu.Unlock()
  1204. t.Close(connectionErrorf(true, nil, "received goaway with non-zero even-numbered numbered stream id: %v", id))
  1205. return
  1206. }
  1207. // A client can receive multiple GoAways from the server (see
  1208. // https://github.com/grpc/grpc-go/issues/1387). The idea is that the first
  1209. // GoAway will be sent with an ID of MaxInt32 and the second GoAway will be
  1210. // sent after an RTT delay with the ID of the last stream the server will
  1211. // process.
  1212. //
  1213. // Therefore, when we get the first GoAway we don't necessarily close any
  1214. // streams. While in case of second GoAway we close all streams created after
  1215. // the GoAwayId. This way streams that were in-flight while the GoAway from
  1216. // server was being sent don't get killed.
  1217. select {
  1218. case <-t.goAway: // t.goAway has been closed (i.e.,multiple GoAways).
  1219. // If there are multiple GoAways the first one should always have an ID greater than the following ones.
  1220. if id > t.prevGoAwayID {
  1221. t.mu.Unlock()
  1222. t.Close(connectionErrorf(true, nil, "received goaway with stream id: %v, which exceeds stream id of previous goaway: %v", id, t.prevGoAwayID))
  1223. return
  1224. }
  1225. default:
  1226. t.setGoAwayReason(f)
  1227. close(t.goAway)
  1228. defer t.controlBuf.put(&incomingGoAway{}) // Defer as t.mu is currently held.
  1229. // Notify the clientconn about the GOAWAY before we set the state to
  1230. // draining, to allow the client to stop attempting to create streams
  1231. // before disallowing new streams on this connection.
  1232. if t.state != draining {
  1233. t.onClose(t.goAwayReason)
  1234. t.state = draining
  1235. }
  1236. }
  1237. // All streams with IDs greater than the GoAwayId
  1238. // and smaller than the previous GoAway ID should be killed.
  1239. upperLimit := t.prevGoAwayID
  1240. if upperLimit == 0 { // This is the first GoAway Frame.
  1241. upperLimit = math.MaxUint32 // Kill all streams after the GoAway ID.
  1242. }
  1243. t.prevGoAwayID = id
  1244. if len(t.activeStreams) == 0 {
  1245. t.mu.Unlock()
  1246. t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams"))
  1247. return
  1248. }
  1249. streamsToClose := make([]*Stream, 0)
  1250. for streamID, stream := range t.activeStreams {
  1251. if streamID > id && streamID <= upperLimit {
  1252. // The stream was unprocessed by the server.
  1253. if streamID > id && streamID <= upperLimit {
  1254. atomic.StoreUint32(&stream.unprocessed, 1)
  1255. streamsToClose = append(streamsToClose, stream)
  1256. }
  1257. }
  1258. }
  1259. t.mu.Unlock()
  1260. // Called outside t.mu because closeStream can take controlBuf's mu, which
  1261. // could induce deadlock and is not allowed.
  1262. for _, stream := range streamsToClose {
  1263. t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
  1264. }
  1265. }
  1266. // setGoAwayReason sets the value of t.goAwayReason based
  1267. // on the GoAway frame received.
  1268. // It expects a lock on transport's mutex to be held by
  1269. // the caller.
  1270. func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
  1271. t.goAwayReason = GoAwayNoReason
  1272. switch f.ErrCode {
  1273. case http2.ErrCodeEnhanceYourCalm:
  1274. if string(f.DebugData()) == "too_many_pings" {
  1275. t.goAwayReason = GoAwayTooManyPings
  1276. }
  1277. }
  1278. if len(f.DebugData()) == 0 {
  1279. t.goAwayDebugMessage = fmt.Sprintf("code: %s", f.ErrCode)
  1280. } else {
  1281. t.goAwayDebugMessage = fmt.Sprintf("code: %s, debug data: %q", f.ErrCode, string(f.DebugData()))
  1282. }
  1283. }
  1284. func (t *http2Client) GetGoAwayReason() (GoAwayReason, string) {
  1285. t.mu.Lock()
  1286. defer t.mu.Unlock()
  1287. return t.goAwayReason, t.goAwayDebugMessage
  1288. }
  1289. func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
  1290. t.controlBuf.put(&incomingWindowUpdate{
  1291. streamID: f.Header().StreamID,
  1292. increment: f.Increment,
  1293. })
  1294. }
// operateHeaders takes action on the decoded headers.
//
// The frame is treated as the stream's initial header block when the stream's
// headerChan has not been closed yet, and as trailers when it ends the stream.
// The function validates the frame (position in the stream, truncation, HTTP
// status, content-type, grpc-status, metadata encoding) and closes the stream
// with an error status and a RST_STREAM when validation fails. On success it
// publishes the headers (for a non-final frame) or closes the stream with the
// status taken from grpc-status / grpc-message (for a final frame), and
// notifies the stats handlers with an InHeader or InTrailer event.
//
// Frames for streams that are not active are ignored.
func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
	s := t.getStream(frame)
	if s == nil {
		return
	}
	endStream := frame.StreamEnded()
	atomic.StoreUint32(&s.bytesReceived, 1)
	// headerChanClosed is still 0 only if no header block has been accepted
	// for this stream and the stream has not been closed.
	initialHeader := atomic.LoadUint32(&s.headerChanClosed) == 0

	if !initialHeader && !endStream {
		// As specified by gRPC over HTTP2, a HEADERS frame (and associated
		// CONTINUATION frames) can only appear at the start or end of a
		// stream. Therefore, second HEADERS frame must have EOS bit set.
		st := status.New(codes.Internal, "a HEADERS frame cannot appear in the middle of a stream")
		t.closeStream(s, st.Err(), true, http2.ErrCodeProtocol, st, nil, false)
		return
	}

	// frame.Truncated is set to true when framer detects that the current header
	// list size hits MaxHeaderListSize limit.
	if frame.Truncated {
		se := status.New(codes.Internal, "peer header list size exceeded limit")
		t.closeStream(s, se.Err(), true, http2.ErrCodeFrameSize, se, nil, endStream)
		return
	}

	var (
		// If a gRPC Response-Headers has already been received, then it means
		// that the peer is speaking gRPC and we are in gRPC mode.
		isGRPC = !initialHeader
		mdata  = make(map[string][]string)
		// contentTypeErr starts as the "missing" message and is cleared once a
		// valid content-type header is seen.
		contentTypeErr = "malformed header: missing HTTP content-type"
		grpcMessage    string
		recvCompress   string
		httpStatusCode *int
		httpStatusErr  string
		rawStatusCode  = codes.Unknown
		// headerError is set if an error is encountered while parsing the headers
		headerError string
	)

	// Only the initial header block is required to carry :status.
	if initialHeader {
		httpStatusErr = "malformed header: missing HTTP status"
	}

	for _, hf := range frame.Fields {
		switch hf.Name {
		case "content-type":
			if _, validContentType := grpcutil.ContentSubtype(hf.Value); !validContentType {
				contentTypeErr = fmt.Sprintf("transport: received unexpected content-type %q", hf.Value)
				break
			}
			contentTypeErr = ""
			mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
			isGRPC = true
		case "grpc-encoding":
			recvCompress = hf.Value
		case "grpc-status":
			code, err := strconv.ParseInt(hf.Value, 10, 32)
			if err != nil {
				se := status.New(codes.Internal, fmt.Sprintf("transport: malformed grpc-status: %v", err))
				t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
				return
			}
			rawStatusCode = codes.Code(uint32(code))
		case "grpc-message":
			grpcMessage = decodeGrpcMessage(hf.Value)
		case ":status":
			// Fast path for the expected status: no parsing needed.
			if hf.Value == "200" {
				httpStatusErr = ""
				statusCode := 200
				httpStatusCode = &statusCode
				break
			}

			c, err := strconv.ParseInt(hf.Value, 10, 32)
			if err != nil {
				se := status.New(codes.Internal, fmt.Sprintf("transport: malformed http-status: %v", err))
				t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
				return
			}
			statusCode := int(c)
			httpStatusCode = &statusCode

			httpStatusErr = fmt.Sprintf(
				"unexpected HTTP status code received from server: %d (%s)",
				statusCode,
				http.StatusText(statusCode),
			)

		default:
			// Reserved headers are dropped unless explicitly whitelisted.
			if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) {
				break
			}
			v, err := decodeMetadataHeader(hf.Name, hf.Value)
			if err != nil {
				// Remember the failure but keep scanning; it is reported after
				// the HTTP status / content-type checks below.
				headerError = fmt.Sprintf("transport: malformed %s: %v", hf.Name, err)
				logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
				break
			}
			mdata[hf.Name] = append(mdata[hf.Name], v)
		}
	}

	if !isGRPC || httpStatusErr != "" {
		var code = codes.Internal // when header does not include HTTP status, return INTERNAL

		if httpStatusCode != nil {
			var ok bool
			code, ok = HTTPStatusConvTab[*httpStatusCode]
			if !ok {
				code = codes.Unknown
			}
		}
		var errs []string
		if httpStatusErr != "" {
			errs = append(errs, httpStatusErr)
		}
		if contentTypeErr != "" {
			errs = append(errs, contentTypeErr)
		}
		// Verify the HTTP response is a 200.
		se := status.New(code, strings.Join(errs, "; "))
		t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
		return
	}

	if headerError != "" {
		se := status.New(codes.Internal, headerError)
		t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
		return
	}

	// For headers, set them in s.header and close headerChan. For trailers or
	// trailers-only, closeStream will set the trailers and close headerChan as
	// needed.
	if !endStream {
		// If headerChan hasn't been closed yet (expected, given we checked it
		// above, but something else could have potentially closed the whole
		// stream).
		if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
			s.headerValid = true
			// These values can be set without any synchronization because
			// stream goroutine will read it only after seeing a closed
			// headerChan which we'll close after setting this.
			s.recvCompress = recvCompress
			if len(mdata) > 0 {
				s.header = mdata
			}
			close(s.headerChan)
		}
	}

	for _, sh := range t.statsHandlers {
		if !endStream {
			inHeader := &stats.InHeader{
				Client:      true,
				WireLength:  int(frame.Header().Length),
				Header:      metadata.MD(mdata).Copy(),
				Compression: s.recvCompress,
			}
			sh.HandleRPC(s.ctx, inHeader)
		} else {
			inTrailer := &stats.InTrailer{
				Client:     true,
				WireLength: int(frame.Header().Length),
				Trailer:    metadata.MD(mdata).Copy(),
			}
			sh.HandleRPC(s.ctx, inTrailer)
		}
	}

	if !endStream {
		return
	}

	// Note: this local shadows the imported status package for the remainder
	// of the function.
	status := istatus.NewWithProto(rawStatusCode, grpcMessage, mdata[grpcStatusDetailsBinHeader])

	// If client received END_STREAM from server while stream was still active,
	// send RST_STREAM.
	rstStream := s.getState() == streamActive
	t.closeStream(s, io.EOF, rstStream, http2.ErrCodeNo, status, mdata, true)
}
  1461. // readServerPreface reads and handles the initial settings frame from the
  1462. // server.
  1463. func (t *http2Client) readServerPreface() error {
  1464. frame, err := t.framer.fr.ReadFrame()
  1465. if err != nil {
  1466. return connectionErrorf(true, err, "error reading server preface: %v", err)
  1467. }
  1468. sf, ok := frame.(*http2.SettingsFrame)
  1469. if !ok {
  1470. return connectionErrorf(true, nil, "initial http2 frame from server is not a settings frame: %T", frame)
  1471. }
  1472. t.handleSettings(sf, true)
  1473. return nil
  1474. }
  1475. // reader verifies the server preface and reads all subsequent data from
  1476. // network connection. If the server preface is not read successfully, an
  1477. // error is pushed to errCh; otherwise errCh is closed with no error.
  1478. func (t *http2Client) reader(errCh chan<- error) {
  1479. defer close(t.readerDone)
  1480. if err := t.readServerPreface(); err != nil {
  1481. errCh <- err
  1482. return
  1483. }
  1484. close(errCh)
  1485. if t.keepaliveEnabled {
  1486. atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
  1487. }
  1488. // loop to keep reading incoming messages on this transport.
  1489. for {
  1490. t.controlBuf.throttle()
  1491. frame, err := t.framer.fr.ReadFrame()
  1492. if t.keepaliveEnabled {
  1493. atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
  1494. }
  1495. if err != nil {
  1496. // Abort an active stream if the http2.Framer returns a
  1497. // http2.StreamError. This can happen only if the server's response
  1498. // is malformed http2.
  1499. if se, ok := err.(http2.StreamError); ok {
  1500. t.mu.Lock()
  1501. s := t.activeStreams[se.StreamID]
  1502. t.mu.Unlock()
  1503. if s != nil {
  1504. // use error detail to provide better err message
  1505. code := http2ErrConvTab[se.Code]
  1506. errorDetail := t.framer.fr.ErrorDetail()
  1507. var msg string
  1508. if errorDetail != nil {
  1509. msg = errorDetail.Error()
  1510. } else {
  1511. msg = "received invalid frame"
  1512. }
  1513. t.closeStream(s, status.Error(code, msg), true, http2.ErrCodeProtocol, status.New(code, msg), nil, false)
  1514. }
  1515. continue
  1516. } else {
  1517. // Transport error.
  1518. t.Close(connectionErrorf(true, err, "error reading from server: %v", err))
  1519. return
  1520. }
  1521. }
  1522. switch frame := frame.(type) {
  1523. case *http2.MetaHeadersFrame:
  1524. t.operateHeaders(frame)
  1525. case *http2.DataFrame:
  1526. t.handleData(frame)
  1527. case *http2.RSTStreamFrame:
  1528. t.handleRSTStream(frame)
  1529. case *http2.SettingsFrame:
  1530. t.handleSettings(frame, false)
  1531. case *http2.PingFrame:
  1532. t.handlePing(frame)
  1533. case *http2.GoAwayFrame:
  1534. t.handleGoAway(frame)
  1535. case *http2.WindowUpdateFrame:
  1536. t.handleWindowUpdate(frame)
  1537. default:
  1538. if logger.V(logLevel) {
  1539. logger.Errorf("transport: http2Client.reader got unhandled frame type %v.", frame)
  1540. }
  1541. }
  1542. }
  1543. }
  1544. func minTime(a, b time.Duration) time.Duration {
  1545. if a < b {
  1546. return a
  1547. }
  1548. return b
  1549. }
  1550. // keepalive running in a separate goroutine makes sure the connection is alive by sending pings.
  1551. func (t *http2Client) keepalive() {
  1552. p := &ping{data: [8]byte{}}
  1553. // True iff a ping has been sent, and no data has been received since then.
  1554. outstandingPing := false
  1555. // Amount of time remaining before which we should receive an ACK for the
  1556. // last sent ping.
  1557. timeoutLeft := time.Duration(0)
  1558. // Records the last value of t.lastRead before we go block on the timer.
  1559. // This is required to check for read activity since then.
  1560. prevNano := time.Now().UnixNano()
  1561. timer := time.NewTimer(t.kp.Time)
  1562. for {
  1563. select {
  1564. case <-timer.C:
  1565. lastRead := atomic.LoadInt64(&t.lastRead)
  1566. if lastRead > prevNano {
  1567. // There has been read activity since the last time we were here.
  1568. outstandingPing = false
  1569. // Next timer should fire at kp.Time seconds from lastRead time.
  1570. timer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
  1571. prevNano = lastRead
  1572. continue
  1573. }
  1574. if outstandingPing && timeoutLeft <= 0 {
  1575. t.Close(connectionErrorf(true, nil, "keepalive ping failed to receive ACK within timeout"))
  1576. return
  1577. }
  1578. t.mu.Lock()
  1579. if t.state == closing {
  1580. // If the transport is closing, we should exit from the
  1581. // keepalive goroutine here. If not, we could have a race
  1582. // between the call to Signal() from Close() and the call to
  1583. // Wait() here, whereby the keepalive goroutine ends up
  1584. // blocking on the condition variable which will never be
  1585. // signalled again.
  1586. t.mu.Unlock()
  1587. return
  1588. }
  1589. if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
  1590. // If a ping was sent out previously (because there were active
  1591. // streams at that point) which wasn't acked and its timeout
  1592. // hadn't fired, but we got here and are about to go dormant,
  1593. // we should make sure that we unconditionally send a ping once
  1594. // we awaken.
  1595. outstandingPing = false
  1596. t.kpDormant = true
  1597. t.kpDormancyCond.Wait()
  1598. }
  1599. t.kpDormant = false
  1600. t.mu.Unlock()
  1601. // We get here either because we were dormant and a new stream was
  1602. // created which unblocked the Wait() call, or because the
  1603. // keepalive timer expired. In both cases, we need to send a ping.
  1604. if !outstandingPing {
  1605. if channelz.IsOn() {
  1606. atomic.AddInt64(&t.czData.kpCount, 1)
  1607. }
  1608. t.controlBuf.put(p)
  1609. timeoutLeft = t.kp.Timeout
  1610. outstandingPing = true
  1611. }
  1612. // The amount of time to sleep here is the minimum of kp.Time and
  1613. // timeoutLeft. This will ensure that we wait only for kp.Time
  1614. // before sending out the next ping (for cases where the ping is
  1615. // acked).
  1616. sleepDuration := minTime(t.kp.Time, timeoutLeft)
  1617. timeoutLeft -= sleepDuration
  1618. timer.Reset(sleepDuration)
  1619. case <-t.ctx.Done():
  1620. if !timer.Stop() {
  1621. <-timer.C
  1622. }
  1623. return
  1624. }
  1625. }
  1626. }
  1627. func (t *http2Client) Error() <-chan struct{} {
  1628. return t.ctx.Done()
  1629. }
  1630. func (t *http2Client) GoAway() <-chan struct{} {
  1631. return t.goAway
  1632. }
  1633. func (t *http2Client) ChannelzMetric() *channelz.SocketInternalMetric {
  1634. s := channelz.SocketInternalMetric{
  1635. StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted),
  1636. StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded),
  1637. StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed),
  1638. MessagesSent: atomic.LoadInt64(&t.czData.msgSent),
  1639. MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv),
  1640. KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount),
  1641. LastLocalStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
  1642. LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
  1643. LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
  1644. LocalFlowControlWindow: int64(t.fc.getSize()),
  1645. SocketOptions: channelz.GetSocketOption(t.conn),
  1646. LocalAddr: t.localAddr,
  1647. RemoteAddr: t.remoteAddr,
  1648. // RemoteName :
  1649. }
  1650. if au, ok := t.authInfo.(credentials.ChannelzSecurityInfo); ok {
  1651. s.Security = au.GetSecurityValue()
  1652. }
  1653. s.RemoteFlowControlWindow = t.getOutFlowWindow()
  1654. return &s
  1655. }
  1656. func (t *http2Client) RemoteAddr() net.Addr { return t.remoteAddr }
  1657. func (t *http2Client) IncrMsgSent() {
  1658. atomic.AddInt64(&t.czData.msgSent, 1)
  1659. atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
  1660. }
  1661. func (t *http2Client) IncrMsgRecv() {
  1662. atomic.AddInt64(&t.czData.msgRecv, 1)
  1663. atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
  1664. }
  1665. func (t *http2Client) getOutFlowWindow() int64 {
  1666. resp := make(chan uint32, 1)
  1667. timer := time.NewTimer(time.Second)
  1668. defer timer.Stop()
  1669. t.controlBuf.put(&outFlowControlSizeRequest{resp})
  1670. select {
  1671. case sz := <-resp:
  1672. return int64(sz)
  1673. case <-t.ctxDone:
  1674. return -1
  1675. case <-timer.C:
  1676. return -2
  1677. }
  1678. }
  1679. func (t *http2Client) stateForTesting() transportState {
  1680. t.mu.Lock()
  1681. defer t.mu.Unlock()
  1682. return t.state
  1683. }