http_util.go 11 KB


  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package transport
  19. import (
  20. "bufio"
  21. "encoding/base64"
  22. "errors"
  23. "fmt"
  24. "io"
  25. "math"
  26. "net"
  27. "net/http"
  28. "net/url"
  29. "strconv"
  30. "strings"
  31. "sync"
  32. "time"
  33. "unicode/utf8"
  34. "golang.org/x/net/http2"
  35. "golang.org/x/net/http2/hpack"
  36. "google.golang.org/grpc/codes"
  37. )
  38. const (
  39. // http2MaxFrameLen specifies the max length of a HTTP2 frame.
  40. http2MaxFrameLen = 16384 // 16KB frame
  41. // https://httpwg.org/specs/rfc7540.html#SettingValues
  42. http2InitHeaderTableSize = 4096
  43. )
  44. var (
  45. clientPreface = []byte(http2.ClientPreface)
  46. http2ErrConvTab = map[http2.ErrCode]codes.Code{
  47. http2.ErrCodeNo: codes.Internal,
  48. http2.ErrCodeProtocol: codes.Internal,
  49. http2.ErrCodeInternal: codes.Internal,
  50. http2.ErrCodeFlowControl: codes.ResourceExhausted,
  51. http2.ErrCodeSettingsTimeout: codes.Internal,
  52. http2.ErrCodeStreamClosed: codes.Internal,
  53. http2.ErrCodeFrameSize: codes.Internal,
  54. http2.ErrCodeRefusedStream: codes.Unavailable,
  55. http2.ErrCodeCancel: codes.Canceled,
  56. http2.ErrCodeCompression: codes.Internal,
  57. http2.ErrCodeConnect: codes.Internal,
  58. http2.ErrCodeEnhanceYourCalm: codes.ResourceExhausted,
  59. http2.ErrCodeInadequateSecurity: codes.PermissionDenied,
  60. http2.ErrCodeHTTP11Required: codes.Internal,
  61. }
  62. // HTTPStatusConvTab is the HTTP status code to gRPC error code conversion table.
  63. HTTPStatusConvTab = map[int]codes.Code{
  64. // 400 Bad Request - INTERNAL.
  65. http.StatusBadRequest: codes.Internal,
  66. // 401 Unauthorized - UNAUTHENTICATED.
  67. http.StatusUnauthorized: codes.Unauthenticated,
  68. // 403 Forbidden - PERMISSION_DENIED.
  69. http.StatusForbidden: codes.PermissionDenied,
  70. // 404 Not Found - UNIMPLEMENTED.
  71. http.StatusNotFound: codes.Unimplemented,
  72. // 429 Too Many Requests - UNAVAILABLE.
  73. http.StatusTooManyRequests: codes.Unavailable,
  74. // 502 Bad Gateway - UNAVAILABLE.
  75. http.StatusBadGateway: codes.Unavailable,
  76. // 503 Service Unavailable - UNAVAILABLE.
  77. http.StatusServiceUnavailable: codes.Unavailable,
  78. // 504 Gateway timeout - UNAVAILABLE.
  79. http.StatusGatewayTimeout: codes.Unavailable,
  80. }
  81. )
  82. var grpcStatusDetailsBinHeader = "grpc-status-details-bin"
  83. // isReservedHeader checks whether hdr belongs to HTTP2 headers
  84. // reserved by gRPC protocol. Any other headers are classified as the
  85. // user-specified metadata.
  86. func isReservedHeader(hdr string) bool {
  87. if hdr != "" && hdr[0] == ':' {
  88. return true
  89. }
  90. switch hdr {
  91. case "content-type",
  92. "user-agent",
  93. "grpc-message-type",
  94. "grpc-encoding",
  95. "grpc-message",
  96. "grpc-status",
  97. "grpc-timeout",
  98. // Intentionally exclude grpc-previous-rpc-attempts and
  99. // grpc-retry-pushback-ms, which are "reserved", but their API
  100. // intentionally works via metadata.
  101. "te":
  102. return true
  103. default:
  104. return false
  105. }
  106. }
  107. // isWhitelistedHeader checks whether hdr should be propagated into metadata
  108. // visible to users, even though it is classified as "reserved", above.
  109. func isWhitelistedHeader(hdr string) bool {
  110. switch hdr {
  111. case ":authority", "user-agent":
  112. return true
  113. default:
  114. return false
  115. }
  116. }
  117. const binHdrSuffix = "-bin"
  118. func encodeBinHeader(v []byte) string {
  119. return base64.RawStdEncoding.EncodeToString(v)
  120. }
  121. func decodeBinHeader(v string) ([]byte, error) {
  122. if len(v)%4 == 0 {
  123. // Input was padded, or padding was not necessary.
  124. return base64.StdEncoding.DecodeString(v)
  125. }
  126. return base64.RawStdEncoding.DecodeString(v)
  127. }
  128. func encodeMetadataHeader(k, v string) string {
  129. if strings.HasSuffix(k, binHdrSuffix) {
  130. return encodeBinHeader(([]byte)(v))
  131. }
  132. return v
  133. }
  134. func decodeMetadataHeader(k, v string) (string, error) {
  135. if strings.HasSuffix(k, binHdrSuffix) {
  136. b, err := decodeBinHeader(v)
  137. return string(b), err
  138. }
  139. return v, nil
  140. }
  141. type timeoutUnit uint8
  142. const (
  143. hour timeoutUnit = 'H'
  144. minute timeoutUnit = 'M'
  145. second timeoutUnit = 'S'
  146. millisecond timeoutUnit = 'm'
  147. microsecond timeoutUnit = 'u'
  148. nanosecond timeoutUnit = 'n'
  149. )
  150. func timeoutUnitToDuration(u timeoutUnit) (d time.Duration, ok bool) {
  151. switch u {
  152. case hour:
  153. return time.Hour, true
  154. case minute:
  155. return time.Minute, true
  156. case second:
  157. return time.Second, true
  158. case millisecond:
  159. return time.Millisecond, true
  160. case microsecond:
  161. return time.Microsecond, true
  162. case nanosecond:
  163. return time.Nanosecond, true
  164. default:
  165. }
  166. return
  167. }
  168. func decodeTimeout(s string) (time.Duration, error) {
  169. size := len(s)
  170. if size < 2 {
  171. return 0, fmt.Errorf("transport: timeout string is too short: %q", s)
  172. }
  173. if size > 9 {
  174. // Spec allows for 8 digits plus the unit.
  175. return 0, fmt.Errorf("transport: timeout string is too long: %q", s)
  176. }
  177. unit := timeoutUnit(s[size-1])
  178. d, ok := timeoutUnitToDuration(unit)
  179. if !ok {
  180. return 0, fmt.Errorf("transport: timeout unit is not recognized: %q", s)
  181. }
  182. t, err := strconv.ParseInt(s[:size-1], 10, 64)
  183. if err != nil {
  184. return 0, err
  185. }
  186. const maxHours = math.MaxInt64 / int64(time.Hour)
  187. if d == time.Hour && t > maxHours {
  188. // This timeout would overflow math.MaxInt64; clamp it.
  189. return time.Duration(math.MaxInt64), nil
  190. }
  191. return d * time.Duration(t), nil
  192. }
  193. const (
  194. spaceByte = ' '
  195. tildeByte = '~'
  196. percentByte = '%'
  197. )
  198. // encodeGrpcMessage is used to encode status code in header field
  199. // "grpc-message". It does percent encoding and also replaces invalid utf-8
  200. // characters with Unicode replacement character.
  201. //
  202. // It checks to see if each individual byte in msg is an allowable byte, and
  203. // then either percent encoding or passing it through. When percent encoding,
  204. // the byte is converted into hexadecimal notation with a '%' prepended.
  205. func encodeGrpcMessage(msg string) string {
  206. if msg == "" {
  207. return ""
  208. }
  209. lenMsg := len(msg)
  210. for i := 0; i < lenMsg; i++ {
  211. c := msg[i]
  212. if !(c >= spaceByte && c <= tildeByte && c != percentByte) {
  213. return encodeGrpcMessageUnchecked(msg)
  214. }
  215. }
  216. return msg
  217. }
  218. func encodeGrpcMessageUnchecked(msg string) string {
  219. var sb strings.Builder
  220. for len(msg) > 0 {
  221. r, size := utf8.DecodeRuneInString(msg)
  222. for _, b := range []byte(string(r)) {
  223. if size > 1 {
  224. // If size > 1, r is not ascii. Always do percent encoding.
  225. fmt.Fprintf(&sb, "%%%02X", b)
  226. continue
  227. }
  228. // The for loop is necessary even if size == 1. r could be
  229. // utf8.RuneError.
  230. //
  231. // fmt.Sprintf("%%%02X", utf8.RuneError) gives "%FFFD".
  232. if b >= spaceByte && b <= tildeByte && b != percentByte {
  233. sb.WriteByte(b)
  234. } else {
  235. fmt.Fprintf(&sb, "%%%02X", b)
  236. }
  237. }
  238. msg = msg[size:]
  239. }
  240. return sb.String()
  241. }
  242. // decodeGrpcMessage decodes the msg encoded by encodeGrpcMessage.
  243. func decodeGrpcMessage(msg string) string {
  244. if msg == "" {
  245. return ""
  246. }
  247. lenMsg := len(msg)
  248. for i := 0; i < lenMsg; i++ {
  249. if msg[i] == percentByte && i+2 < lenMsg {
  250. return decodeGrpcMessageUnchecked(msg)
  251. }
  252. }
  253. return msg
  254. }
  255. func decodeGrpcMessageUnchecked(msg string) string {
  256. var sb strings.Builder
  257. lenMsg := len(msg)
  258. for i := 0; i < lenMsg; i++ {
  259. c := msg[i]
  260. if c == percentByte && i+2 < lenMsg {
  261. parsed, err := strconv.ParseUint(msg[i+1:i+3], 16, 8)
  262. if err != nil {
  263. sb.WriteByte(c)
  264. } else {
  265. sb.WriteByte(byte(parsed))
  266. i += 2
  267. }
  268. } else {
  269. sb.WriteByte(c)
  270. }
  271. }
  272. return sb.String()
  273. }
  274. type bufWriter struct {
  275. pool *sync.Pool
  276. buf []byte
  277. offset int
  278. batchSize int
  279. conn net.Conn
  280. err error
  281. }
  282. func newBufWriter(conn net.Conn, batchSize int, pool *sync.Pool) *bufWriter {
  283. w := &bufWriter{
  284. batchSize: batchSize,
  285. conn: conn,
  286. pool: pool,
  287. }
  288. // this indicates that we should use non shared buf
  289. if pool == nil {
  290. w.buf = make([]byte, batchSize)
  291. }
  292. return w
  293. }
  294. func (w *bufWriter) Write(b []byte) (n int, err error) {
  295. if w.err != nil {
  296. return 0, w.err
  297. }
  298. if w.batchSize == 0 { // Buffer has been disabled.
  299. n, err = w.conn.Write(b)
  300. return n, toIOError(err)
  301. }
  302. if w.buf == nil {
  303. b := w.pool.Get().(*[]byte)
  304. w.buf = *b
  305. }
  306. for len(b) > 0 {
  307. nn := copy(w.buf[w.offset:], b)
  308. b = b[nn:]
  309. w.offset += nn
  310. n += nn
  311. if w.offset >= w.batchSize {
  312. err = w.flushKeepBuffer()
  313. }
  314. }
  315. return n, err
  316. }
  317. func (w *bufWriter) Flush() error {
  318. err := w.flushKeepBuffer()
  319. // Only release the buffer if we are in a "shared" mode
  320. if w.buf != nil && w.pool != nil {
  321. b := w.buf
  322. w.pool.Put(&b)
  323. w.buf = nil
  324. }
  325. return err
  326. }
  327. func (w *bufWriter) flushKeepBuffer() error {
  328. if w.err != nil {
  329. return w.err
  330. }
  331. if w.offset == 0 {
  332. return nil
  333. }
  334. _, w.err = w.conn.Write(w.buf[:w.offset])
  335. w.err = toIOError(w.err)
  336. w.offset = 0
  337. return w.err
  338. }
  339. type ioError struct {
  340. error
  341. }
  342. func (i ioError) Unwrap() error {
  343. return i.error
  344. }
  345. func isIOError(err error) bool {
  346. return errors.As(err, &ioError{})
  347. }
  348. func toIOError(err error) error {
  349. if err == nil {
  350. return nil
  351. }
  352. return ioError{error: err}
  353. }
  354. type framer struct {
  355. writer *bufWriter
  356. fr *http2.Framer
  357. }
  358. var writeBufferPoolMap map[int]*sync.Pool = make(map[int]*sync.Pool)
  359. var writeBufferMutex sync.Mutex
  360. func newFramer(conn net.Conn, writeBufferSize, readBufferSize int, sharedWriteBuffer bool, maxHeaderListSize uint32) *framer {
  361. if writeBufferSize < 0 {
  362. writeBufferSize = 0
  363. }
  364. var r io.Reader = conn
  365. if readBufferSize > 0 {
  366. r = bufio.NewReaderSize(r, readBufferSize)
  367. }
  368. var pool *sync.Pool
  369. if sharedWriteBuffer {
  370. pool = getWriteBufferPool(writeBufferSize)
  371. }
  372. w := newBufWriter(conn, writeBufferSize, pool)
  373. f := &framer{
  374. writer: w,
  375. fr: http2.NewFramer(w, r),
  376. }
  377. f.fr.SetMaxReadFrameSize(http2MaxFrameLen)
  378. // Opt-in to Frame reuse API on framer to reduce garbage.
  379. // Frames aren't safe to read from after a subsequent call to ReadFrame.
  380. f.fr.SetReuseFrames()
  381. f.fr.MaxHeaderListSize = maxHeaderListSize
  382. f.fr.ReadMetaHeaders = hpack.NewDecoder(http2InitHeaderTableSize, nil)
  383. return f
  384. }
  385. func getWriteBufferPool(writeBufferSize int) *sync.Pool {
  386. writeBufferMutex.Lock()
  387. defer writeBufferMutex.Unlock()
  388. size := writeBufferSize * 2
  389. pool, ok := writeBufferPoolMap[size]
  390. if ok {
  391. return pool
  392. }
  393. pool = &sync.Pool{
  394. New: func() any {
  395. b := make([]byte, size)
  396. return &b
  397. },
  398. }
  399. writeBufferPoolMap[size] = pool
  400. return pool
  401. }
  402. // parseDialTarget returns the network and address to pass to dialer.
  403. func parseDialTarget(target string) (string, string) {
  404. net := "tcp"
  405. m1 := strings.Index(target, ":")
  406. m2 := strings.Index(target, ":/")
  407. // handle unix:addr which will fail with url.Parse
  408. if m1 >= 0 && m2 < 0 {
  409. if n := target[0:m1]; n == "unix" {
  410. return n, target[m1+1:]
  411. }
  412. }
  413. if m2 >= 0 {
  414. t, err := url.Parse(target)
  415. if err != nil {
  416. return net, target
  417. }
  418. scheme := t.Scheme
  419. addr := t.Path
  420. if scheme == "unix" {
  421. if addr == "" {
  422. addr = t.Host
  423. }
  424. return scheme, addr
  425. }
  426. }
  427. return net, target
  428. }