response.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. package protocol
  2. import (
  3. "crypto/tls"
  4. "encoding/binary"
  5. "errors"
  6. "fmt"
  7. "io"
  8. )
  9. func ReadResponse(r io.Reader, apiKey ApiKey, apiVersion int16) (correlationID int32, msg Message, err error) {
  10. if i := int(apiKey); i < 0 || i >= len(apiTypes) {
  11. err = fmt.Errorf("unsupported api key: %d", i)
  12. return
  13. }
  14. t := &apiTypes[apiKey]
  15. if t == nil {
  16. err = fmt.Errorf("unsupported api: %s", apiNames[apiKey])
  17. return
  18. }
  19. minVersion := t.minVersion()
  20. maxVersion := t.maxVersion()
  21. if apiVersion < minVersion || apiVersion > maxVersion {
  22. err = fmt.Errorf("unsupported %s version: v%d not in range v%d-v%d", apiKey, apiVersion, minVersion, maxVersion)
  23. return
  24. }
  25. d := &decoder{reader: r, remain: 4}
  26. size := d.readInt32()
  27. if err = d.err; err != nil {
  28. err = dontExpectEOF(err)
  29. return
  30. }
  31. d.remain = int(size)
  32. correlationID = d.readInt32()
  33. if err = d.err; err != nil {
  34. if errors.Is(err, io.ErrUnexpectedEOF) {
  35. // If a Writer/Reader is configured without TLS and connects
  36. // to a broker expecting TLS the only message we return to the
  37. // caller is io.ErrUnexpetedEOF which is opaque. This section
  38. // tries to determine if that's what has happened.
  39. // We first deconstruct the initial 4 bytes of the message
  40. // from the size which was read earlier.
  41. // Next, we examine those bytes to see if they looks like a TLS
  42. // error message. If they do we wrap the io.ErrUnexpectedEOF
  43. // with some context.
  44. if looksLikeUnexpectedTLS(size) {
  45. err = fmt.Errorf("%w: broker appears to be expecting TLS", io.ErrUnexpectedEOF)
  46. }
  47. return
  48. }
  49. err = dontExpectEOF(err)
  50. return
  51. }
  52. res := &t.responses[apiVersion-minVersion]
  53. if res.flexible {
  54. // In the flexible case, there's a tag buffer at the end of the response header
  55. taggedCount := int(d.readUnsignedVarInt())
  56. for i := 0; i < taggedCount; i++ {
  57. d.readUnsignedVarInt() // tagID
  58. size := d.readUnsignedVarInt()
  59. // Just throw away the values for now
  60. d.read(int(size))
  61. }
  62. }
  63. msg = res.new()
  64. res.decode(d, valueOf(msg))
  65. d.discardAll()
  66. if err = d.err; err != nil {
  67. err = dontExpectEOF(err)
  68. }
  69. return
  70. }
  71. func WriteResponse(w io.Writer, apiVersion int16, correlationID int32, msg Message) error {
  72. apiKey := msg.ApiKey()
  73. if i := int(apiKey); i < 0 || i >= len(apiTypes) {
  74. return fmt.Errorf("unsupported api key: %d", i)
  75. }
  76. t := &apiTypes[apiKey]
  77. if t == nil {
  78. return fmt.Errorf("unsupported api: %s", apiNames[apiKey])
  79. }
  80. if typedMessage, ok := msg.(OverrideTypeMessage); ok {
  81. typeKey := typedMessage.TypeKey()
  82. overrideType := overrideApiTypes[apiKey][typeKey]
  83. t = &overrideType
  84. }
  85. minVersion := t.minVersion()
  86. maxVersion := t.maxVersion()
  87. if apiVersion < minVersion || apiVersion > maxVersion {
  88. return fmt.Errorf("unsupported %s version: v%d not in range v%d-v%d", apiKey, apiVersion, minVersion, maxVersion)
  89. }
  90. r := &t.responses[apiVersion-minVersion]
  91. v := valueOf(msg)
  92. b := newPageBuffer()
  93. defer b.unref()
  94. e := &encoder{writer: b}
  95. e.writeInt32(0) // placeholder for the response size
  96. e.writeInt32(correlationID)
  97. if r.flexible {
  98. // Flexible messages use extra space for a tag buffer,
  99. // which begins with a size value. Since we're not writing any fields into the
  100. // latter, we can just write zero for now.
  101. //
  102. // See
  103. // https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields
  104. // for details.
  105. e.writeUnsignedVarInt(0)
  106. }
  107. r.encode(e, v)
  108. err := e.err
  109. if err == nil {
  110. size := packUint32(uint32(b.Size()) - 4)
  111. b.WriteAt(size[:], 0)
  112. _, err = b.WriteTo(w)
  113. }
  114. return err
  115. }
  116. const (
  117. tlsAlertByte byte = 0x15
  118. )
  119. // looksLikeUnexpectedTLS returns true if the size passed in resemble
  120. // the TLS alert message that is returned to a client which sends
  121. // an invalid ClientHello message.
  122. func looksLikeUnexpectedTLS(size int32) bool {
  123. var sizeBytes [4]byte
  124. binary.BigEndian.PutUint32(sizeBytes[:], uint32(size))
  125. if sizeBytes[0] != tlsAlertByte {
  126. return false
  127. }
  128. version := int(sizeBytes[1])<<8 | int(sizeBytes[2])
  129. return version <= tls.VersionTLS13 && version >= tls.VersionTLS10
  130. }