123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157 |
- package protocol
- import (
- "crypto/tls"
- "encoding/binary"
- "errors"
- "fmt"
- "io"
- )
- func ReadResponse(r io.Reader, apiKey ApiKey, apiVersion int16) (correlationID int32, msg Message, err error) {
- if i := int(apiKey); i < 0 || i >= len(apiTypes) {
- err = fmt.Errorf("unsupported api key: %d", i)
- return
- }
- t := &apiTypes[apiKey]
- if t == nil {
- err = fmt.Errorf("unsupported api: %s", apiNames[apiKey])
- return
- }
- minVersion := t.minVersion()
- maxVersion := t.maxVersion()
- if apiVersion < minVersion || apiVersion > maxVersion {
- err = fmt.Errorf("unsupported %s version: v%d not in range v%d-v%d", apiKey, apiVersion, minVersion, maxVersion)
- return
- }
- d := &decoder{reader: r, remain: 4}
- size := d.readInt32()
- if err = d.err; err != nil {
- err = dontExpectEOF(err)
- return
- }
- d.remain = int(size)
- correlationID = d.readInt32()
- if err = d.err; err != nil {
- if errors.Is(err, io.ErrUnexpectedEOF) {
- // If a Writer/Reader is configured without TLS and connects
- // to a broker expecting TLS the only message we return to the
- // caller is io.ErrUnexpetedEOF which is opaque. This section
- // tries to determine if that's what has happened.
- // We first deconstruct the initial 4 bytes of the message
- // from the size which was read earlier.
- // Next, we examine those bytes to see if they looks like a TLS
- // error message. If they do we wrap the io.ErrUnexpectedEOF
- // with some context.
- if looksLikeUnexpectedTLS(size) {
- err = fmt.Errorf("%w: broker appears to be expecting TLS", io.ErrUnexpectedEOF)
- }
- return
- }
- err = dontExpectEOF(err)
- return
- }
- res := &t.responses[apiVersion-minVersion]
- if res.flexible {
- // In the flexible case, there's a tag buffer at the end of the response header
- taggedCount := int(d.readUnsignedVarInt())
- for i := 0; i < taggedCount; i++ {
- d.readUnsignedVarInt() // tagID
- size := d.readUnsignedVarInt()
- // Just throw away the values for now
- d.read(int(size))
- }
- }
- msg = res.new()
- res.decode(d, valueOf(msg))
- d.discardAll()
- if err = d.err; err != nil {
- err = dontExpectEOF(err)
- }
- return
- }
- func WriteResponse(w io.Writer, apiVersion int16, correlationID int32, msg Message) error {
- apiKey := msg.ApiKey()
- if i := int(apiKey); i < 0 || i >= len(apiTypes) {
- return fmt.Errorf("unsupported api key: %d", i)
- }
- t := &apiTypes[apiKey]
- if t == nil {
- return fmt.Errorf("unsupported api: %s", apiNames[apiKey])
- }
- if typedMessage, ok := msg.(OverrideTypeMessage); ok {
- typeKey := typedMessage.TypeKey()
- overrideType := overrideApiTypes[apiKey][typeKey]
- t = &overrideType
- }
- minVersion := t.minVersion()
- maxVersion := t.maxVersion()
- if apiVersion < minVersion || apiVersion > maxVersion {
- return fmt.Errorf("unsupported %s version: v%d not in range v%d-v%d", apiKey, apiVersion, minVersion, maxVersion)
- }
- r := &t.responses[apiVersion-minVersion]
- v := valueOf(msg)
- b := newPageBuffer()
- defer b.unref()
- e := &encoder{writer: b}
- e.writeInt32(0) // placeholder for the response size
- e.writeInt32(correlationID)
- if r.flexible {
- // Flexible messages use extra space for a tag buffer,
- // which begins with a size value. Since we're not writing any fields into the
- // latter, we can just write zero for now.
- //
- // See
- // https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields
- // for details.
- e.writeUnsignedVarInt(0)
- }
- r.encode(e, v)
- err := e.err
- if err == nil {
- size := packUint32(uint32(b.Size()) - 4)
- b.WriteAt(size[:], 0)
- _, err = b.WriteTo(w)
- }
- return err
- }
- const (
- tlsAlertByte byte = 0x15
- )
- // looksLikeUnexpectedTLS returns true if the size passed in resemble
- // the TLS alert message that is returned to a client which sends
- // an invalid ClientHello message.
- func looksLikeUnexpectedTLS(size int32) bool {
- var sizeBytes [4]byte
- binary.BigEndian.PutUint32(sizeBytes[:], uint32(size))
- if sizeBytes[0] != tlsAlertByte {
- return false
- }
- version := int(sizeBytes[1])<<8 | int(sizeBytes[2])
- return version <= tls.VersionTLS13 && version >= tls.VersionTLS10
- }
|