http2_server.go 44 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package transport
  19. import (
  20. "bytes"
  21. "context"
  22. "errors"
  23. "fmt"
  24. "io"
  25. "math"
  26. "net"
  27. "net/http"
  28. "strconv"
  29. "sync"
  30. "sync/atomic"
  31. "time"
  32. "github.com/golang/protobuf/proto"
  33. "golang.org/x/net/http2"
  34. "golang.org/x/net/http2/hpack"
  35. "google.golang.org/grpc/internal/grpclog"
  36. "google.golang.org/grpc/internal/grpcutil"
  37. "google.golang.org/grpc/internal/pretty"
  38. "google.golang.org/grpc/internal/syscall"
  39. "google.golang.org/grpc/codes"
  40. "google.golang.org/grpc/credentials"
  41. "google.golang.org/grpc/internal/channelz"
  42. "google.golang.org/grpc/internal/grpcrand"
  43. "google.golang.org/grpc/internal/grpcsync"
  44. "google.golang.org/grpc/keepalive"
  45. "google.golang.org/grpc/metadata"
  46. "google.golang.org/grpc/peer"
  47. "google.golang.org/grpc/stats"
  48. "google.golang.org/grpc/status"
  49. "google.golang.org/grpc/tap"
  50. )
  51. var (
  52. // ErrIllegalHeaderWrite indicates that setting header is illegal because of
  53. // the stream's state.
  54. ErrIllegalHeaderWrite = status.Error(codes.Internal, "transport: SendHeader called multiple times")
  55. // ErrHeaderListSizeLimitViolation indicates that the header list size is larger
  56. // than the limit set by peer.
  57. ErrHeaderListSizeLimitViolation = status.Error(codes.Internal, "transport: trying to send header list size larger than the limit set by peer")
  58. )
  59. // serverConnectionCounter counts the number of connections a server has seen
  60. // (equal to the number of http2Servers created). Must be accessed atomically.
  61. var serverConnectionCounter uint64
  62. // http2Server implements the ServerTransport interface with HTTP2.
  63. type http2Server struct {
  64. lastRead int64 // Keep this field 64-bit aligned. Accessed atomically.
  65. done chan struct{}
  66. conn net.Conn
  67. loopy *loopyWriter
  68. readerDone chan struct{} // sync point to enable testing.
  69. loopyWriterDone chan struct{}
  70. peer peer.Peer
  71. inTapHandle tap.ServerInHandle
  72. framer *framer
  73. // The max number of concurrent streams.
  74. maxStreams uint32
  75. // controlBuf delivers all the control related tasks (e.g., window
  76. // updates, reset streams, and various settings) to the controller.
  77. controlBuf *controlBuffer
  78. fc *trInFlow
  79. stats []stats.Handler
  80. // Keepalive and max-age parameters for the server.
  81. kp keepalive.ServerParameters
  82. // Keepalive enforcement policy.
  83. kep keepalive.EnforcementPolicy
  84. // The time instance last ping was received.
  85. lastPingAt time.Time
  86. // Number of times the client has violated keepalive ping policy so far.
  87. pingStrikes uint8
  88. // Flag to signify that number of ping strikes should be reset to 0.
  89. // This is set whenever data or header frames are sent.
  90. // 1 means yes.
  91. resetPingStrikes uint32 // Accessed atomically.
  92. initialWindowSize int32
  93. bdpEst *bdpEstimator
  94. maxSendHeaderListSize *uint32
  95. mu sync.Mutex // guard the following
  96. // drainEvent is initialized when Drain() is called the first time. After
  97. // which the server writes out the first GoAway(with ID 2^31-1) frame. Then
  98. // an independent goroutine will be launched to later send the second
  99. // GoAway. During this time we don't want to write another first GoAway(with
  100. // ID 2^31 -1) frame. Thus call to Drain() will be a no-op if drainEvent is
  101. // already initialized since draining is already underway.
  102. drainEvent *grpcsync.Event
  103. state transportState
  104. activeStreams map[uint32]*Stream
  105. // idle is the time instant when the connection went idle.
  106. // This is either the beginning of the connection or when the number of
  107. // RPCs go down to 0.
  108. // When the connection is busy, this value is set to 0.
  109. idle time.Time
  110. // Fields below are for channelz metric collection.
  111. channelzID *channelz.Identifier
  112. czData *channelzData
  113. bufferPool *bufferPool
  114. connectionID uint64
  115. // maxStreamMu guards the maximum stream ID
  116. // This lock may not be taken if mu is already held.
  117. maxStreamMu sync.Mutex
  118. maxStreamID uint32 // max stream ID ever seen
  119. logger *grpclog.PrefixLogger
  120. }
  121. // NewServerTransport creates a http2 transport with conn and configuration
  122. // options from config.
  123. //
  124. // It returns a non-nil transport and a nil error on success. On failure, it
  125. // returns a nil transport and a non-nil error. For a special case where the
  126. // underlying conn gets closed before the client preface could be read, it
  127. // returns a nil transport and a nil error.
  128. func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
  129. var authInfo credentials.AuthInfo
  130. rawConn := conn
  131. if config.Credentials != nil {
  132. var err error
  133. conn, authInfo, err = config.Credentials.ServerHandshake(rawConn)
  134. if err != nil {
  135. // ErrConnDispatched means that the connection was dispatched away
  136. // from gRPC; those connections should be left open. io.EOF means
  137. // the connection was closed before handshaking completed, which can
  138. // happen naturally from probers. Return these errors directly.
  139. if err == credentials.ErrConnDispatched || err == io.EOF {
  140. return nil, err
  141. }
  142. return nil, connectionErrorf(false, err, "ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
  143. }
  144. }
  145. writeBufSize := config.WriteBufferSize
  146. readBufSize := config.ReadBufferSize
  147. maxHeaderListSize := defaultServerMaxHeaderListSize
  148. if config.MaxHeaderListSize != nil {
  149. maxHeaderListSize = *config.MaxHeaderListSize
  150. }
  151. framer := newFramer(conn, writeBufSize, readBufSize, config.SharedWriteBuffer, maxHeaderListSize)
  152. // Send initial settings as connection preface to client.
  153. isettings := []http2.Setting{{
  154. ID: http2.SettingMaxFrameSize,
  155. Val: http2MaxFrameLen,
  156. }}
  157. if config.MaxStreams != math.MaxUint32 {
  158. isettings = append(isettings, http2.Setting{
  159. ID: http2.SettingMaxConcurrentStreams,
  160. Val: config.MaxStreams,
  161. })
  162. }
  163. dynamicWindow := true
  164. iwz := int32(initialWindowSize)
  165. if config.InitialWindowSize >= defaultWindowSize {
  166. iwz = config.InitialWindowSize
  167. dynamicWindow = false
  168. }
  169. icwz := int32(initialWindowSize)
  170. if config.InitialConnWindowSize >= defaultWindowSize {
  171. icwz = config.InitialConnWindowSize
  172. dynamicWindow = false
  173. }
  174. if iwz != defaultWindowSize {
  175. isettings = append(isettings, http2.Setting{
  176. ID: http2.SettingInitialWindowSize,
  177. Val: uint32(iwz)})
  178. }
  179. if config.MaxHeaderListSize != nil {
  180. isettings = append(isettings, http2.Setting{
  181. ID: http2.SettingMaxHeaderListSize,
  182. Val: *config.MaxHeaderListSize,
  183. })
  184. }
  185. if config.HeaderTableSize != nil {
  186. isettings = append(isettings, http2.Setting{
  187. ID: http2.SettingHeaderTableSize,
  188. Val: *config.HeaderTableSize,
  189. })
  190. }
  191. if err := framer.fr.WriteSettings(isettings...); err != nil {
  192. return nil, connectionErrorf(false, err, "transport: %v", err)
  193. }
  194. // Adjust the connection flow control window if needed.
  195. if delta := uint32(icwz - defaultWindowSize); delta > 0 {
  196. if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
  197. return nil, connectionErrorf(false, err, "transport: %v", err)
  198. }
  199. }
  200. kp := config.KeepaliveParams
  201. if kp.MaxConnectionIdle == 0 {
  202. kp.MaxConnectionIdle = defaultMaxConnectionIdle
  203. }
  204. if kp.MaxConnectionAge == 0 {
  205. kp.MaxConnectionAge = defaultMaxConnectionAge
  206. }
  207. // Add a jitter to MaxConnectionAge.
  208. kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
  209. if kp.MaxConnectionAgeGrace == 0 {
  210. kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
  211. }
  212. if kp.Time == 0 {
  213. kp.Time = defaultServerKeepaliveTime
  214. }
  215. if kp.Timeout == 0 {
  216. kp.Timeout = defaultServerKeepaliveTimeout
  217. }
  218. if kp.Time != infinity {
  219. if err = syscall.SetTCPUserTimeout(rawConn, kp.Timeout); err != nil {
  220. return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
  221. }
  222. }
  223. kep := config.KeepalivePolicy
  224. if kep.MinTime == 0 {
  225. kep.MinTime = defaultKeepalivePolicyMinTime
  226. }
  227. done := make(chan struct{})
  228. peer := peer.Peer{
  229. Addr: conn.RemoteAddr(),
  230. LocalAddr: conn.LocalAddr(),
  231. AuthInfo: authInfo,
  232. }
  233. t := &http2Server{
  234. done: done,
  235. conn: conn,
  236. peer: peer,
  237. framer: framer,
  238. readerDone: make(chan struct{}),
  239. loopyWriterDone: make(chan struct{}),
  240. maxStreams: config.MaxStreams,
  241. inTapHandle: config.InTapHandle,
  242. fc: &trInFlow{limit: uint32(icwz)},
  243. state: reachable,
  244. activeStreams: make(map[uint32]*Stream),
  245. stats: config.StatsHandlers,
  246. kp: kp,
  247. idle: time.Now(),
  248. kep: kep,
  249. initialWindowSize: iwz,
  250. czData: new(channelzData),
  251. bufferPool: newBufferPool(),
  252. }
  253. t.logger = prefixLoggerForServerTransport(t)
  254. t.controlBuf = newControlBuffer(t.done)
  255. if dynamicWindow {
  256. t.bdpEst = &bdpEstimator{
  257. bdp: initialWindowSize,
  258. updateFlowControl: t.updateFlowControl,
  259. }
  260. }
  261. t.channelzID, err = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.peer.Addr, t.peer.LocalAddr))
  262. if err != nil {
  263. return nil, err
  264. }
  265. t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)
  266. t.framer.writer.Flush()
  267. defer func() {
  268. if err != nil {
  269. t.Close(err)
  270. }
  271. }()
  272. // Check the validity of client preface.
  273. preface := make([]byte, len(clientPreface))
  274. if _, err := io.ReadFull(t.conn, preface); err != nil {
  275. // In deployments where a gRPC server runs behind a cloud load balancer
  276. // which performs regular TCP level health checks, the connection is
  277. // closed immediately by the latter. Returning io.EOF here allows the
  278. // grpc server implementation to recognize this scenario and suppress
  279. // logging to reduce spam.
  280. if err == io.EOF {
  281. return nil, io.EOF
  282. }
  283. return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
  284. }
  285. if !bytes.Equal(preface, clientPreface) {
  286. return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
  287. }
  288. frame, err := t.framer.fr.ReadFrame()
  289. if err == io.EOF || err == io.ErrUnexpectedEOF {
  290. return nil, err
  291. }
  292. if err != nil {
  293. return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
  294. }
  295. atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
  296. sf, ok := frame.(*http2.SettingsFrame)
  297. if !ok {
  298. return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
  299. }
  300. t.handleSettings(sf)
  301. go func() {
  302. t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger)
  303. t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
  304. t.loopy.run()
  305. close(t.loopyWriterDone)
  306. }()
  307. go t.keepalive()
  308. return t, nil
  309. }
  310. // operateHeaders takes action on the decoded headers. Returns an error if fatal
  311. // error encountered and transport needs to close, otherwise returns nil.
  312. func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeadersFrame, handle func(*Stream)) error {
  313. // Acquire max stream ID lock for entire duration
  314. t.maxStreamMu.Lock()
  315. defer t.maxStreamMu.Unlock()
  316. streamID := frame.Header().StreamID
  317. // frame.Truncated is set to true when framer detects that the current header
  318. // list size hits MaxHeaderListSize limit.
  319. if frame.Truncated {
  320. t.controlBuf.put(&cleanupStream{
  321. streamID: streamID,
  322. rst: true,
  323. rstCode: http2.ErrCodeFrameSize,
  324. onWrite: func() {},
  325. })
  326. return nil
  327. }
  328. if streamID%2 != 1 || streamID <= t.maxStreamID {
  329. // illegal gRPC stream id.
  330. return fmt.Errorf("received an illegal stream id: %v. headers frame: %+v", streamID, frame)
  331. }
  332. t.maxStreamID = streamID
  333. buf := newRecvBuffer()
  334. s := &Stream{
  335. id: streamID,
  336. st: t,
  337. buf: buf,
  338. fc: &inFlow{limit: uint32(t.initialWindowSize)},
  339. headerWireLength: int(frame.Header().Length),
  340. }
  341. var (
  342. // if false, content-type was missing or invalid
  343. isGRPC = false
  344. contentType = ""
  345. mdata = make(metadata.MD, len(frame.Fields))
  346. httpMethod string
  347. // these are set if an error is encountered while parsing the headers
  348. protocolError bool
  349. headerError *status.Status
  350. timeoutSet bool
  351. timeout time.Duration
  352. )
  353. for _, hf := range frame.Fields {
  354. switch hf.Name {
  355. case "content-type":
  356. contentSubtype, validContentType := grpcutil.ContentSubtype(hf.Value)
  357. if !validContentType {
  358. contentType = hf.Value
  359. break
  360. }
  361. mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
  362. s.contentSubtype = contentSubtype
  363. isGRPC = true
  364. case "grpc-accept-encoding":
  365. mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
  366. if hf.Value == "" {
  367. continue
  368. }
  369. compressors := hf.Value
  370. if s.clientAdvertisedCompressors != "" {
  371. compressors = s.clientAdvertisedCompressors + "," + compressors
  372. }
  373. s.clientAdvertisedCompressors = compressors
  374. case "grpc-encoding":
  375. s.recvCompress = hf.Value
  376. case ":method":
  377. httpMethod = hf.Value
  378. case ":path":
  379. s.method = hf.Value
  380. case "grpc-timeout":
  381. timeoutSet = true
  382. var err error
  383. if timeout, err = decodeTimeout(hf.Value); err != nil {
  384. headerError = status.Newf(codes.Internal, "malformed grpc-timeout: %v", err)
  385. }
  386. // "Transports must consider requests containing the Connection header
  387. // as malformed." - A41
  388. case "connection":
  389. if t.logger.V(logLevel) {
  390. t.logger.Infof("Received a HEADERS frame with a :connection header which makes the request malformed, as per the HTTP/2 spec")
  391. }
  392. protocolError = true
  393. default:
  394. if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) {
  395. break
  396. }
  397. v, err := decodeMetadataHeader(hf.Name, hf.Value)
  398. if err != nil {
  399. headerError = status.Newf(codes.Internal, "malformed binary metadata %q in header %q: %v", hf.Value, hf.Name, err)
  400. t.logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
  401. break
  402. }
  403. mdata[hf.Name] = append(mdata[hf.Name], v)
  404. }
  405. }
  406. // "If multiple Host headers or multiple :authority headers are present, the
  407. // request must be rejected with an HTTP status code 400 as required by Host
  408. // validation in RFC 7230 §5.4, gRPC status code INTERNAL, or RST_STREAM
  409. // with HTTP/2 error code PROTOCOL_ERROR." - A41. Since this is a HTTP/2
  410. // error, this takes precedence over a client not speaking gRPC.
  411. if len(mdata[":authority"]) > 1 || len(mdata["host"]) > 1 {
  412. errMsg := fmt.Sprintf("num values of :authority: %v, num values of host: %v, both must only have 1 value as per HTTP/2 spec", len(mdata[":authority"]), len(mdata["host"]))
  413. if t.logger.V(logLevel) {
  414. t.logger.Infof("Aborting the stream early: %v", errMsg)
  415. }
  416. t.controlBuf.put(&earlyAbortStream{
  417. httpStatus: http.StatusBadRequest,
  418. streamID: streamID,
  419. contentSubtype: s.contentSubtype,
  420. status: status.New(codes.Internal, errMsg),
  421. rst: !frame.StreamEnded(),
  422. })
  423. return nil
  424. }
  425. if protocolError {
  426. t.controlBuf.put(&cleanupStream{
  427. streamID: streamID,
  428. rst: true,
  429. rstCode: http2.ErrCodeProtocol,
  430. onWrite: func() {},
  431. })
  432. return nil
  433. }
  434. if !isGRPC {
  435. t.controlBuf.put(&earlyAbortStream{
  436. httpStatus: http.StatusUnsupportedMediaType,
  437. streamID: streamID,
  438. contentSubtype: s.contentSubtype,
  439. status: status.Newf(codes.InvalidArgument, "invalid gRPC request content-type %q", contentType),
  440. rst: !frame.StreamEnded(),
  441. })
  442. return nil
  443. }
  444. if headerError != nil {
  445. t.controlBuf.put(&earlyAbortStream{
  446. httpStatus: http.StatusBadRequest,
  447. streamID: streamID,
  448. contentSubtype: s.contentSubtype,
  449. status: headerError,
  450. rst: !frame.StreamEnded(),
  451. })
  452. return nil
  453. }
  454. // "If :authority is missing, Host must be renamed to :authority." - A41
  455. if len(mdata[":authority"]) == 0 {
  456. // No-op if host isn't present, no eventual :authority header is a valid
  457. // RPC.
  458. if host, ok := mdata["host"]; ok {
  459. mdata[":authority"] = host
  460. delete(mdata, "host")
  461. }
  462. } else {
  463. // "If :authority is present, Host must be discarded" - A41
  464. delete(mdata, "host")
  465. }
  466. if frame.StreamEnded() {
  467. // s is just created by the caller. No lock needed.
  468. s.state = streamReadDone
  469. }
  470. if timeoutSet {
  471. s.ctx, s.cancel = context.WithTimeout(ctx, timeout)
  472. } else {
  473. s.ctx, s.cancel = context.WithCancel(ctx)
  474. }
  475. // Attach the received metadata to the context.
  476. if len(mdata) > 0 {
  477. s.ctx = metadata.NewIncomingContext(s.ctx, mdata)
  478. if statsTags := mdata["grpc-tags-bin"]; len(statsTags) > 0 {
  479. s.ctx = stats.SetIncomingTags(s.ctx, []byte(statsTags[len(statsTags)-1]))
  480. }
  481. if statsTrace := mdata["grpc-trace-bin"]; len(statsTrace) > 0 {
  482. s.ctx = stats.SetIncomingTrace(s.ctx, []byte(statsTrace[len(statsTrace)-1]))
  483. }
  484. }
  485. t.mu.Lock()
  486. if t.state != reachable {
  487. t.mu.Unlock()
  488. s.cancel()
  489. return nil
  490. }
  491. if uint32(len(t.activeStreams)) >= t.maxStreams {
  492. t.mu.Unlock()
  493. t.controlBuf.put(&cleanupStream{
  494. streamID: streamID,
  495. rst: true,
  496. rstCode: http2.ErrCodeRefusedStream,
  497. onWrite: func() {},
  498. })
  499. s.cancel()
  500. return nil
  501. }
  502. if httpMethod != http.MethodPost {
  503. t.mu.Unlock()
  504. errMsg := fmt.Sprintf("Received a HEADERS frame with :method %q which should be POST", httpMethod)
  505. if t.logger.V(logLevel) {
  506. t.logger.Infof("Aborting the stream early: %v", errMsg)
  507. }
  508. t.controlBuf.put(&earlyAbortStream{
  509. httpStatus: 405,
  510. streamID: streamID,
  511. contentSubtype: s.contentSubtype,
  512. status: status.New(codes.Internal, errMsg),
  513. rst: !frame.StreamEnded(),
  514. })
  515. s.cancel()
  516. return nil
  517. }
  518. if t.inTapHandle != nil {
  519. var err error
  520. if s.ctx, err = t.inTapHandle(s.ctx, &tap.Info{FullMethodName: s.method, Header: mdata}); err != nil {
  521. t.mu.Unlock()
  522. if t.logger.V(logLevel) {
  523. t.logger.Infof("Aborting the stream early due to InTapHandle failure: %v", err)
  524. }
  525. stat, ok := status.FromError(err)
  526. if !ok {
  527. stat = status.New(codes.PermissionDenied, err.Error())
  528. }
  529. t.controlBuf.put(&earlyAbortStream{
  530. httpStatus: 200,
  531. streamID: s.id,
  532. contentSubtype: s.contentSubtype,
  533. status: stat,
  534. rst: !frame.StreamEnded(),
  535. })
  536. return nil
  537. }
  538. }
  539. t.activeStreams[streamID] = s
  540. if len(t.activeStreams) == 1 {
  541. t.idle = time.Time{}
  542. }
  543. t.mu.Unlock()
  544. if channelz.IsOn() {
  545. atomic.AddInt64(&t.czData.streamsStarted, 1)
  546. atomic.StoreInt64(&t.czData.lastStreamCreatedTime, time.Now().UnixNano())
  547. }
  548. s.requestRead = func(n int) {
  549. t.adjustWindow(s, uint32(n))
  550. }
  551. s.ctxDone = s.ctx.Done()
  552. s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
  553. s.trReader = &transportReader{
  554. reader: &recvBufferReader{
  555. ctx: s.ctx,
  556. ctxDone: s.ctxDone,
  557. recv: s.buf,
  558. freeBuffer: t.bufferPool.put,
  559. },
  560. windowHandler: func(n int) {
  561. t.updateWindow(s, uint32(n))
  562. },
  563. }
  564. // Register the stream with loopy.
  565. t.controlBuf.put(&registerStream{
  566. streamID: s.id,
  567. wq: s.wq,
  568. })
  569. handle(s)
  570. return nil
  571. }
  572. // HandleStreams receives incoming streams using the given handler. This is
  573. // typically run in a separate goroutine.
  574. // traceCtx attaches trace to ctx and returns the new context.
  575. func (t *http2Server) HandleStreams(ctx context.Context, handle func(*Stream)) {
  576. defer func() {
  577. <-t.loopyWriterDone
  578. close(t.readerDone)
  579. }()
  580. for {
  581. t.controlBuf.throttle()
  582. frame, err := t.framer.fr.ReadFrame()
  583. atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
  584. if err != nil {
  585. if se, ok := err.(http2.StreamError); ok {
  586. if t.logger.V(logLevel) {
  587. t.logger.Warningf("Encountered http2.StreamError: %v", se)
  588. }
  589. t.mu.Lock()
  590. s := t.activeStreams[se.StreamID]
  591. t.mu.Unlock()
  592. if s != nil {
  593. t.closeStream(s, true, se.Code, false)
  594. } else {
  595. t.controlBuf.put(&cleanupStream{
  596. streamID: se.StreamID,
  597. rst: true,
  598. rstCode: se.Code,
  599. onWrite: func() {},
  600. })
  601. }
  602. continue
  603. }
  604. if err == io.EOF || err == io.ErrUnexpectedEOF {
  605. t.Close(err)
  606. return
  607. }
  608. t.Close(err)
  609. return
  610. }
  611. switch frame := frame.(type) {
  612. case *http2.MetaHeadersFrame:
  613. if err := t.operateHeaders(ctx, frame, handle); err != nil {
  614. t.Close(err)
  615. break
  616. }
  617. case *http2.DataFrame:
  618. t.handleData(frame)
  619. case *http2.RSTStreamFrame:
  620. t.handleRSTStream(frame)
  621. case *http2.SettingsFrame:
  622. t.handleSettings(frame)
  623. case *http2.PingFrame:
  624. t.handlePing(frame)
  625. case *http2.WindowUpdateFrame:
  626. t.handleWindowUpdate(frame)
  627. case *http2.GoAwayFrame:
  628. // TODO: Handle GoAway from the client appropriately.
  629. default:
  630. if t.logger.V(logLevel) {
  631. t.logger.Infof("Received unsupported frame type %T", frame)
  632. }
  633. }
  634. }
  635. }
  636. func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
  637. t.mu.Lock()
  638. defer t.mu.Unlock()
  639. if t.activeStreams == nil {
  640. // The transport is closing.
  641. return nil, false
  642. }
  643. s, ok := t.activeStreams[f.Header().StreamID]
  644. if !ok {
  645. // The stream is already done.
  646. return nil, false
  647. }
  648. return s, true
  649. }
  650. // adjustWindow sends out extra window update over the initial window size
  651. // of stream if the application is requesting data larger in size than
  652. // the window.
  653. func (t *http2Server) adjustWindow(s *Stream, n uint32) {
  654. if w := s.fc.maybeAdjust(n); w > 0 {
  655. t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
  656. }
  657. }
  658. // updateWindow adjusts the inbound quota for the stream and the transport.
  659. // Window updates will deliver to the controller for sending when
  660. // the cumulative quota exceeds the corresponding threshold.
  661. func (t *http2Server) updateWindow(s *Stream, n uint32) {
  662. if w := s.fc.onRead(n); w > 0 {
  663. t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
  664. increment: w,
  665. })
  666. }
  667. }
  668. // updateFlowControl updates the incoming flow control windows
  669. // for the transport and the stream based on the current bdp
  670. // estimation.
  671. func (t *http2Server) updateFlowControl(n uint32) {
  672. t.mu.Lock()
  673. for _, s := range t.activeStreams {
  674. s.fc.newLimit(n)
  675. }
  676. t.initialWindowSize = int32(n)
  677. t.mu.Unlock()
  678. t.controlBuf.put(&outgoingWindowUpdate{
  679. streamID: 0,
  680. increment: t.fc.newLimit(n),
  681. })
  682. t.controlBuf.put(&outgoingSettings{
  683. ss: []http2.Setting{
  684. {
  685. ID: http2.SettingInitialWindowSize,
  686. Val: n,
  687. },
  688. },
  689. })
  690. }
  691. func (t *http2Server) handleData(f *http2.DataFrame) {
  692. size := f.Header().Length
  693. var sendBDPPing bool
  694. if t.bdpEst != nil {
  695. sendBDPPing = t.bdpEst.add(size)
  696. }
  697. // Decouple connection's flow control from application's read.
  698. // An update on connection's flow control should not depend on
  699. // whether user application has read the data or not. Such a
  700. // restriction is already imposed on the stream's flow control,
  701. // and therefore the sender will be blocked anyways.
  702. // Decoupling the connection flow control will prevent other
  703. // active(fast) streams from starving in presence of slow or
  704. // inactive streams.
  705. if w := t.fc.onData(size); w > 0 {
  706. t.controlBuf.put(&outgoingWindowUpdate{
  707. streamID: 0,
  708. increment: w,
  709. })
  710. }
  711. if sendBDPPing {
  712. // Avoid excessive ping detection (e.g. in an L7 proxy)
  713. // by sending a window update prior to the BDP ping.
  714. if w := t.fc.reset(); w > 0 {
  715. t.controlBuf.put(&outgoingWindowUpdate{
  716. streamID: 0,
  717. increment: w,
  718. })
  719. }
  720. t.controlBuf.put(bdpPing)
  721. }
  722. // Select the right stream to dispatch.
  723. s, ok := t.getStream(f)
  724. if !ok {
  725. return
  726. }
  727. if s.getState() == streamReadDone {
  728. t.closeStream(s, true, http2.ErrCodeStreamClosed, false)
  729. return
  730. }
  731. if size > 0 {
  732. if err := s.fc.onData(size); err != nil {
  733. t.closeStream(s, true, http2.ErrCodeFlowControl, false)
  734. return
  735. }
  736. if f.Header().Flags.Has(http2.FlagDataPadded) {
  737. if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
  738. t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
  739. }
  740. }
  741. // TODO(bradfitz, zhaoq): A copy is required here because there is no
  742. // guarantee f.Data() is consumed before the arrival of next frame.
  743. // Can this copy be eliminated?
  744. if len(f.Data()) > 0 {
  745. buffer := t.bufferPool.get()
  746. buffer.Reset()
  747. buffer.Write(f.Data())
  748. s.write(recvMsg{buffer: buffer})
  749. }
  750. }
  751. if f.StreamEnded() {
  752. // Received the end of stream from the client.
  753. s.compareAndSwapState(streamActive, streamReadDone)
  754. s.write(recvMsg{err: io.EOF})
  755. }
  756. }
  757. func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
  758. // If the stream is not deleted from the transport's active streams map, then do a regular close stream.
  759. if s, ok := t.getStream(f); ok {
  760. t.closeStream(s, false, 0, false)
  761. return
  762. }
  763. // If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map.
  764. t.controlBuf.put(&cleanupStream{
  765. streamID: f.Header().StreamID,
  766. rst: false,
  767. rstCode: 0,
  768. onWrite: func() {},
  769. })
  770. }
  771. func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
  772. if f.IsAck() {
  773. return
  774. }
  775. var ss []http2.Setting
  776. var updateFuncs []func()
  777. f.ForeachSetting(func(s http2.Setting) error {
  778. switch s.ID {
  779. case http2.SettingMaxHeaderListSize:
  780. updateFuncs = append(updateFuncs, func() {
  781. t.maxSendHeaderListSize = new(uint32)
  782. *t.maxSendHeaderListSize = s.Val
  783. })
  784. default:
  785. ss = append(ss, s)
  786. }
  787. return nil
  788. })
  789. t.controlBuf.executeAndPut(func(any) bool {
  790. for _, f := range updateFuncs {
  791. f()
  792. }
  793. return true
  794. }, &incomingSettings{
  795. ss: ss,
  796. })
  797. }
const (
	// maxPingStrikes is the number of ping strikes that is tolerated;
	// handlePing queues a GOAWAY once the strike counter exceeds it.
	maxPingStrikes = 2
	// defaultPingTimeout is the minimum spacing handlePing expects between
	// client pings when there are no active streams and the enforcement
	// policy does not permit pings without streams.
	defaultPingTimeout = 2 * time.Hour
)
  802. func (t *http2Server) handlePing(f *http2.PingFrame) {
  803. if f.IsAck() {
  804. if f.Data == goAwayPing.data && t.drainEvent != nil {
  805. t.drainEvent.Fire()
  806. return
  807. }
  808. // Maybe it's a BDP ping.
  809. if t.bdpEst != nil {
  810. t.bdpEst.calculate(f.Data)
  811. }
  812. return
  813. }
  814. pingAck := &ping{ack: true}
  815. copy(pingAck.data[:], f.Data[:])
  816. t.controlBuf.put(pingAck)
  817. now := time.Now()
  818. defer func() {
  819. t.lastPingAt = now
  820. }()
  821. // A reset ping strikes means that we don't need to check for policy
  822. // violation for this ping and the pingStrikes counter should be set
  823. // to 0.
  824. if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
  825. t.pingStrikes = 0
  826. return
  827. }
  828. t.mu.Lock()
  829. ns := len(t.activeStreams)
  830. t.mu.Unlock()
  831. if ns < 1 && !t.kep.PermitWithoutStream {
  832. // Keepalive shouldn't be active thus, this new ping should
  833. // have come after at least defaultPingTimeout.
  834. if t.lastPingAt.Add(defaultPingTimeout).After(now) {
  835. t.pingStrikes++
  836. }
  837. } else {
  838. // Check if keepalive policy is respected.
  839. if t.lastPingAt.Add(t.kep.MinTime).After(now) {
  840. t.pingStrikes++
  841. }
  842. }
  843. if t.pingStrikes > maxPingStrikes {
  844. // Send goaway and close the connection.
  845. t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: errors.New("got too many pings from the client")})
  846. }
  847. }
  848. func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
  849. t.controlBuf.put(&incomingWindowUpdate{
  850. streamID: f.Header().StreamID,
  851. increment: f.Increment,
  852. })
  853. }
  854. func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
  855. for k, vv := range md {
  856. if isReservedHeader(k) {
  857. // Clients don't tolerate reading restricted headers after some non restricted ones were sent.
  858. continue
  859. }
  860. for _, v := range vv {
  861. headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
  862. }
  863. }
  864. return headerFields
  865. }
  866. func (t *http2Server) checkForHeaderListSize(it any) bool {
  867. if t.maxSendHeaderListSize == nil {
  868. return true
  869. }
  870. hdrFrame := it.(*headerFrame)
  871. var sz int64
  872. for _, f := range hdrFrame.hf {
  873. if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
  874. if t.logger.V(logLevel) {
  875. t.logger.Infof("Header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
  876. }
  877. return false
  878. }
  879. }
  880. return true
  881. }
  882. func (t *http2Server) streamContextErr(s *Stream) error {
  883. select {
  884. case <-t.done:
  885. return ErrConnClosing
  886. default:
  887. }
  888. return ContextErr(s.ctx.Err())
  889. }
  890. // WriteHeader sends the header metadata md back to the client.
  891. func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
  892. s.hdrMu.Lock()
  893. defer s.hdrMu.Unlock()
  894. if s.getState() == streamDone {
  895. return t.streamContextErr(s)
  896. }
  897. if s.updateHeaderSent() {
  898. return ErrIllegalHeaderWrite
  899. }
  900. if md.Len() > 0 {
  901. if s.header.Len() > 0 {
  902. s.header = metadata.Join(s.header, md)
  903. } else {
  904. s.header = md
  905. }
  906. }
  907. if err := t.writeHeaderLocked(s); err != nil {
  908. return status.Convert(err).Err()
  909. }
  910. return nil
  911. }
// setResetPingStrikes atomically sets the resetPingStrikes flag to 1.
// handlePing consumes the flag on the next incoming ping: it clears it, zeroes
// the strike counter and skips the policy check for that ping. This method is
// installed as the onWrite/onEachWrite callback of outgoing header and data
// frames.
func (t *http2Server) setResetPingStrikes() {
	atomic.StoreUint32(&t.resetPingStrikes, 1)
}
  915. func (t *http2Server) writeHeaderLocked(s *Stream) error {
  916. // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
  917. // first and create a slice of that exact size.
  918. headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
  919. headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
  920. headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
  921. if s.sendCompress != "" {
  922. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
  923. }
  924. headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
  925. success, err := t.controlBuf.executeAndPut(t.checkForHeaderListSize, &headerFrame{
  926. streamID: s.id,
  927. hf: headerFields,
  928. endStream: false,
  929. onWrite: t.setResetPingStrikes,
  930. })
  931. if !success {
  932. if err != nil {
  933. return err
  934. }
  935. t.closeStream(s, true, http2.ErrCodeInternal, false)
  936. return ErrHeaderListSizeLimitViolation
  937. }
  938. for _, sh := range t.stats {
  939. // Note: Headers are compressed with hpack after this call returns.
  940. // No WireLength field is set here.
  941. outHeader := &stats.OutHeader{
  942. Header: s.header.Copy(),
  943. Compression: s.sendCompress,
  944. }
  945. sh.HandleRPC(s.Context(), outHeader)
  946. }
  947. return nil
  948. }
  949. // WriteStatus sends stream status to the client and terminates the stream.
  950. // There is no further I/O operations being able to perform on this stream.
  951. // TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
  952. // OK is adopted.
  953. func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
  954. s.hdrMu.Lock()
  955. defer s.hdrMu.Unlock()
  956. if s.getState() == streamDone {
  957. return nil
  958. }
  959. // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
  960. // first and create a slice of that exact size.
  961. headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
  962. if !s.updateHeaderSent() { // No headers have been sent.
  963. if len(s.header) > 0 { // Send a separate header frame.
  964. if err := t.writeHeaderLocked(s); err != nil {
  965. return err
  966. }
  967. } else { // Send a trailer only response.
  968. headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
  969. headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
  970. }
  971. }
  972. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
  973. headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
  974. if p := st.Proto(); p != nil && len(p.Details) > 0 {
  975. // Do not use the user's grpc-status-details-bin (if present) if we are
  976. // even attempting to set our own.
  977. delete(s.trailer, grpcStatusDetailsBinHeader)
  978. stBytes, err := proto.Marshal(p)
  979. if err != nil {
  980. // TODO: return error instead, when callers are able to handle it.
  981. t.logger.Errorf("Failed to marshal rpc status: %s, error: %v", pretty.ToJSON(p), err)
  982. } else {
  983. headerFields = append(headerFields, hpack.HeaderField{Name: grpcStatusDetailsBinHeader, Value: encodeBinHeader(stBytes)})
  984. }
  985. }
  986. // Attach the trailer metadata.
  987. headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
  988. trailingHeader := &headerFrame{
  989. streamID: s.id,
  990. hf: headerFields,
  991. endStream: true,
  992. onWrite: t.setResetPingStrikes,
  993. }
  994. success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
  995. if !success {
  996. if err != nil {
  997. return err
  998. }
  999. t.closeStream(s, true, http2.ErrCodeInternal, false)
  1000. return ErrHeaderListSizeLimitViolation
  1001. }
  1002. // Send a RST_STREAM after the trailers if the client has not already half-closed.
  1003. rst := s.getState() == streamActive
  1004. t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
  1005. for _, sh := range t.stats {
  1006. // Note: The trailer fields are compressed with hpack after this call returns.
  1007. // No WireLength field is set here.
  1008. sh.HandleRPC(s.Context(), &stats.OutTrailer{
  1009. Trailer: s.trailer.Copy(),
  1010. })
  1011. }
  1012. return nil
  1013. }
  1014. // Write converts the data into HTTP2 data frame and sends it out. Non-nil error
  1015. // is returns if it fails (e.g., framing error, transport error).
  1016. func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
  1017. if !s.isHeaderSent() { // Headers haven't been written yet.
  1018. if err := t.WriteHeader(s, nil); err != nil {
  1019. return err
  1020. }
  1021. } else {
  1022. // Writing headers checks for this condition.
  1023. if s.getState() == streamDone {
  1024. return t.streamContextErr(s)
  1025. }
  1026. }
  1027. df := &dataFrame{
  1028. streamID: s.id,
  1029. h: hdr,
  1030. d: data,
  1031. onEachWrite: t.setResetPingStrikes,
  1032. }
  1033. if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
  1034. return t.streamContextErr(s)
  1035. }
  1036. return t.controlBuf.put(df)
  1037. }
  1038. // keepalive running in a separate goroutine does the following:
  1039. // 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
  1040. // 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
  1041. // 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
  1042. // 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
  1043. // after an additional duration of keepalive.Timeout.
  1044. func (t *http2Server) keepalive() {
  1045. p := &ping{}
  1046. // True iff a ping has been sent, and no data has been received since then.
  1047. outstandingPing := false
  1048. // Amount of time remaining before which we should receive an ACK for the
  1049. // last sent ping.
  1050. kpTimeoutLeft := time.Duration(0)
  1051. // Records the last value of t.lastRead before we go block on the timer.
  1052. // This is required to check for read activity since then.
  1053. prevNano := time.Now().UnixNano()
  1054. // Initialize the different timers to their default values.
  1055. idleTimer := time.NewTimer(t.kp.MaxConnectionIdle)
  1056. ageTimer := time.NewTimer(t.kp.MaxConnectionAge)
  1057. kpTimer := time.NewTimer(t.kp.Time)
  1058. defer func() {
  1059. // We need to drain the underlying channel in these timers after a call
  1060. // to Stop(), only if we are interested in resetting them. Clearly we
  1061. // are not interested in resetting them here.
  1062. idleTimer.Stop()
  1063. ageTimer.Stop()
  1064. kpTimer.Stop()
  1065. }()
  1066. for {
  1067. select {
  1068. case <-idleTimer.C:
  1069. t.mu.Lock()
  1070. idle := t.idle
  1071. if idle.IsZero() { // The connection is non-idle.
  1072. t.mu.Unlock()
  1073. idleTimer.Reset(t.kp.MaxConnectionIdle)
  1074. continue
  1075. }
  1076. val := t.kp.MaxConnectionIdle - time.Since(idle)
  1077. t.mu.Unlock()
  1078. if val <= 0 {
  1079. // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
  1080. // Gracefully close the connection.
  1081. t.Drain("max_idle")
  1082. return
  1083. }
  1084. idleTimer.Reset(val)
  1085. case <-ageTimer.C:
  1086. t.Drain("max_age")
  1087. ageTimer.Reset(t.kp.MaxConnectionAgeGrace)
  1088. select {
  1089. case <-ageTimer.C:
  1090. // Close the connection after grace period.
  1091. if t.logger.V(logLevel) {
  1092. t.logger.Infof("Closing server transport due to maximum connection age")
  1093. }
  1094. t.controlBuf.put(closeConnection{})
  1095. case <-t.done:
  1096. }
  1097. return
  1098. case <-kpTimer.C:
  1099. lastRead := atomic.LoadInt64(&t.lastRead)
  1100. if lastRead > prevNano {
  1101. // There has been read activity since the last time we were
  1102. // here. Setup the timer to fire at kp.Time seconds from
  1103. // lastRead time and continue.
  1104. outstandingPing = false
  1105. kpTimer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
  1106. prevNano = lastRead
  1107. continue
  1108. }
  1109. if outstandingPing && kpTimeoutLeft <= 0 {
  1110. t.Close(fmt.Errorf("keepalive ping not acked within timeout %s", t.kp.Time))
  1111. return
  1112. }
  1113. if !outstandingPing {
  1114. if channelz.IsOn() {
  1115. atomic.AddInt64(&t.czData.kpCount, 1)
  1116. }
  1117. t.controlBuf.put(p)
  1118. kpTimeoutLeft = t.kp.Timeout
  1119. outstandingPing = true
  1120. }
  1121. // The amount of time to sleep here is the minimum of kp.Time and
  1122. // timeoutLeft. This will ensure that we wait only for kp.Time
  1123. // before sending out the next ping (for cases where the ping is
  1124. // acked).
  1125. sleepDuration := minTime(t.kp.Time, kpTimeoutLeft)
  1126. kpTimeoutLeft -= sleepDuration
  1127. kpTimer.Reset(sleepDuration)
  1128. case <-t.done:
  1129. return
  1130. }
  1131. }
  1132. }
// Close starts shutting down the http2Server transport.
//
// It is a no-op if the transport is already in the closing state. Otherwise
// it marks the transport as closing, finishes the control buffer, closes
// t.done and the underlying connection, removes the channelz entry and cancels
// every stream that was active when Close was called. err is only used for
// logging the reason.
// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
// could cause some resource issue. Revisit this later.
func (t *http2Server) Close(err error) {
	t.mu.Lock()
	if t.state == closing {
		t.mu.Unlock()
		return
	}
	if t.logger.V(logLevel) {
		t.logger.Infof("Closing: %v", err)
	}
	t.state = closing
	// Take ownership of the active streams while holding the lock; they are
	// cancelled below, after the lock has been released.
	streams := t.activeStreams
	t.activeStreams = nil
	t.mu.Unlock()
	t.controlBuf.finish()
	close(t.done)
	if err := t.conn.Close(); err != nil && t.logger.V(logLevel) {
		t.logger.Infof("Error closing underlying net.Conn during Close: %v", err)
	}
	channelz.RemoveEntry(t.channelzID)
	// Cancel all active streams.
	for _, s := range streams {
		s.cancel()
	}
}
  1160. // deleteStream deletes the stream s from transport's active streams.
  1161. func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
  1162. t.mu.Lock()
  1163. if _, ok := t.activeStreams[s.id]; ok {
  1164. delete(t.activeStreams, s.id)
  1165. if len(t.activeStreams) == 0 {
  1166. t.idle = time.Now()
  1167. }
  1168. }
  1169. t.mu.Unlock()
  1170. if channelz.IsOn() {
  1171. if eosReceived {
  1172. atomic.AddInt64(&t.czData.streamsSucceeded, 1)
  1173. } else {
  1174. atomic.AddInt64(&t.czData.streamsFailed, 1)
  1175. }
  1176. }
  1177. }
  1178. // finishStream closes the stream and puts the trailing headerFrame into controlbuf.
  1179. func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
  1180. // In case stream sending and receiving are invoked in separate
  1181. // goroutines (e.g., bi-directional streaming), cancel needs to be
  1182. // called to interrupt the potential blocking on other goroutines.
  1183. s.cancel()
  1184. oldState := s.swapState(streamDone)
  1185. if oldState == streamDone {
  1186. // If the stream was already done, return.
  1187. return
  1188. }
  1189. hdr.cleanup = &cleanupStream{
  1190. streamID: s.id,
  1191. rst: rst,
  1192. rstCode: rstCode,
  1193. onWrite: func() {
  1194. t.deleteStream(s, eosReceived)
  1195. },
  1196. }
  1197. t.controlBuf.put(hdr)
  1198. }
  1199. // closeStream clears the footprint of a stream when the stream is not needed any more.
  1200. func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
  1201. // In case stream sending and receiving are invoked in separate
  1202. // goroutines (e.g., bi-directional streaming), cancel needs to be
  1203. // called to interrupt the potential blocking on other goroutines.
  1204. s.cancel()
  1205. s.swapState(streamDone)
  1206. t.deleteStream(s, eosReceived)
  1207. t.controlBuf.put(&cleanupStream{
  1208. streamID: s.id,
  1209. rst: rst,
  1210. rstCode: rstCode,
  1211. onWrite: func() {},
  1212. })
  1213. }
  1214. func (t *http2Server) Drain(debugData string) {
  1215. t.mu.Lock()
  1216. defer t.mu.Unlock()
  1217. if t.drainEvent != nil {
  1218. return
  1219. }
  1220. t.drainEvent = grpcsync.NewEvent()
  1221. t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte(debugData), headsUp: true})
  1222. }
// goAwayPing is the ping that outgoingGoAwayHandler writes right after the
// heads-up GOAWAY. handlePing recognizes its ack by this payload and fires the
// drain event.
var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
// Handles outgoing GoAway and returns true if loopy needs to put itself
// in draining mode.
//
// A goAway with headsUp set is the first step of a graceful close: a GOAWAY
// with the maximum stream ID and a ping are written, and a follow-up goAway
// (without headsUp) is queued once the ping is acked, a minute has passed, or
// never if the transport finishes first. A goAway without headsUp moves the
// transport to the draining state and writes the GOAWAY with the current
// maximum stream ID.
func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
	// Both locks are taken, maxStreamMu first, so that the state and
	// maxStreamID are read consistently.
	t.maxStreamMu.Lock()
	t.mu.Lock()
	if t.state == closing { // TODO(mmukhi): This seems unnecessary.
		t.mu.Unlock()
		t.maxStreamMu.Unlock()
		// The transport is closing.
		return false, ErrConnClosing
	}
	if !g.headsUp {
		// Stop accepting more streams now.
		t.state = draining
		sid := t.maxStreamID
		// A non-nil error is returned after the GOAWAY is written either
		// because the goAway item asked for it (closeConn) or because no
		// active streams are left to wait for.
		retErr := g.closeConn
		if len(t.activeStreams) == 0 {
			retErr = errors.New("second GOAWAY written and no active streams left to process")
		}
		t.mu.Unlock()
		t.maxStreamMu.Unlock()
		if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
			return false, err
		}
		if retErr != nil {
			return false, retErr
		}
		return true, nil
	}
	t.mu.Unlock()
	t.maxStreamMu.Unlock()
	// For a graceful close, send out a GoAway with stream ID of MaxUInt32,
	// Follow that with a ping and wait for the ack to come back or a timer
	// to expire. During this time accept new streams since they might have
	// originated before the GoAway reaches the client.
	// After getting the ack or timer expiration send out another GoAway this
	// time with an ID of the max stream server intends to process.
	if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, g.debugData); err != nil {
		return false, err
	}
	if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
		return false, err
	}
	go func() {
		timer := time.NewTimer(time.Minute)
		defer timer.Stop()
		select {
		case <-t.drainEvent.Done():
		case <-timer.C:
		case <-t.done:
			// The transport is done; no second GOAWAY is needed.
			return
		}
		// Queue the second GOAWAY (headsUp is left false).
		t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
	}()
	return false, nil
}
  1280. func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric {
  1281. s := channelz.SocketInternalMetric{
  1282. StreamsStarted: atomic.LoadInt64(&t.czData.streamsStarted),
  1283. StreamsSucceeded: atomic.LoadInt64(&t.czData.streamsSucceeded),
  1284. StreamsFailed: atomic.LoadInt64(&t.czData.streamsFailed),
  1285. MessagesSent: atomic.LoadInt64(&t.czData.msgSent),
  1286. MessagesReceived: atomic.LoadInt64(&t.czData.msgRecv),
  1287. KeepAlivesSent: atomic.LoadInt64(&t.czData.kpCount),
  1288. LastRemoteStreamCreatedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastStreamCreatedTime)),
  1289. LastMessageSentTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgSentTime)),
  1290. LastMessageReceivedTimestamp: time.Unix(0, atomic.LoadInt64(&t.czData.lastMsgRecvTime)),
  1291. LocalFlowControlWindow: int64(t.fc.getSize()),
  1292. SocketOptions: channelz.GetSocketOption(t.conn),
  1293. LocalAddr: t.peer.LocalAddr,
  1294. RemoteAddr: t.peer.Addr,
  1295. // RemoteName :
  1296. }
  1297. if au, ok := t.peer.AuthInfo.(credentials.ChannelzSecurityInfo); ok {
  1298. s.Security = au.GetSecurityValue()
  1299. }
  1300. s.RemoteFlowControlWindow = t.getOutFlowWindow()
  1301. return &s
  1302. }
// IncrMsgSent atomically increments the channelz sent-message counter and
// records the current time as the last-message-sent timestamp.
func (t *http2Server) IncrMsgSent() {
	atomic.AddInt64(&t.czData.msgSent, 1)
	atomic.StoreInt64(&t.czData.lastMsgSentTime, time.Now().UnixNano())
}
// IncrMsgRecv atomically increments the channelz received-message counter and
// records the current time as the last-message-received timestamp.
func (t *http2Server) IncrMsgRecv() {
	atomic.AddInt64(&t.czData.msgRecv, 1)
	atomic.StoreInt64(&t.czData.lastMsgRecvTime, time.Now().UnixNano())
}
  1311. func (t *http2Server) getOutFlowWindow() int64 {
  1312. resp := make(chan uint32, 1)
  1313. timer := time.NewTimer(time.Second)
  1314. defer timer.Stop()
  1315. t.controlBuf.put(&outFlowControlSizeRequest{resp})
  1316. select {
  1317. case sz := <-resp:
  1318. return int64(sz)
  1319. case <-t.done:
  1320. return -1
  1321. case <-timer.C:
  1322. return -2
  1323. }
  1324. }
  1325. // Peer returns the peer of the transport.
  1326. func (t *http2Server) Peer() *peer.Peer {
  1327. return &peer.Peer{
  1328. Addr: t.peer.Addr,
  1329. LocalAddr: t.peer.LocalAddr,
  1330. AuthInfo: t.peer.AuthInfo, // Can be nil
  1331. }
  1332. }
  1333. func getJitter(v time.Duration) time.Duration {
  1334. if v == infinity {
  1335. return 0
  1336. }
  1337. // Generate a jitter between +/- 10% of the value.
  1338. r := int64(v / 10)
  1339. j := grpcrand.Int63n(2*r) - r
  1340. return time.Duration(j)
  1341. }
  1342. type connectionKey struct{}
  1343. // GetConnection gets the connection from the context.
  1344. func GetConnection(ctx context.Context) net.Conn {
  1345. conn, _ := ctx.Value(connectionKey{}).(net.Conn)
  1346. return conn
  1347. }
  1348. // SetConnection adds the connection to the context to be able to get
  1349. // information about the destination ip and port for an incoming RPC. This also
  1350. // allows any unary or streaming interceptors to see the connection.
  1351. func SetConnection(ctx context.Context, conn net.Conn) context.Context {
  1352. return context.WithValue(ctx, connectionKey{}, conn)
  1353. }