saslauthenticate.go 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  // Package saslauthenticate defines the request and response messages for the
  // Kafka SaslAuthenticate API and registers them with the protocol package.
  package saslauthenticate
  import (
  	"encoding/binary"
  	"fmt"
  	"io"

  	"github.com/segmentio/kafka-go/protocol"
  )
  7. func init() {
  8. protocol.Register(&Request{}, &Response{})
  9. }
  // Request is the SaslAuthenticate request message.
  type Request struct {
  	// AuthBytes is the opaque authentication payload sent to the peer. The
  	// struct tag declares it for message versions v0 through v1.
  	AuthBytes []byte `kafka:"min=v0,max=v1"`
  }
  13. func (r *Request) RawExchange(rw io.ReadWriter) (protocol.Message, error) {
  14. if err := r.writeTo(rw); err != nil {
  15. return nil, err
  16. }
  17. return r.readResp(rw)
  18. }
  19. func (*Request) Required(versions map[protocol.ApiKey]int16) bool {
  20. const v0 = 0
  21. return versions[protocol.SaslHandshake] == v0
  22. }
  23. func (r *Request) writeTo(w io.Writer) error {
  24. size := len(r.AuthBytes) + 4
  25. buf := make([]byte, size)
  26. binary.BigEndian.PutUint32(buf[:4], uint32(len(r.AuthBytes)))
  27. copy(buf[4:], r.AuthBytes)
  28. _, err := w.Write(buf)
  29. return err
  30. }
  31. func (r *Request) readResp(read io.Reader) (protocol.Message, error) {
  32. var lenBuf [4]byte
  33. if _, err := io.ReadFull(read, lenBuf[:]); err != nil {
  34. return nil, err
  35. }
  36. respLen := int32(binary.BigEndian.Uint32(lenBuf[:]))
  37. data := make([]byte, respLen)
  38. if _, err := io.ReadFull(read, data[:]); err != nil {
  39. return nil, err
  40. }
  41. return &Response{
  42. AuthBytes: data,
  43. }, nil
  44. }
  45. func (r *Request) ApiKey() protocol.ApiKey { return protocol.SaslAuthenticate }
  // Response is the SaslAuthenticate response message. The struct tags give the
  // message versions in which each field is present.
  type Response struct {
  	// ErrorCode is the numeric error code of the response (v0-v1).
  	ErrorCode int16 `kafka:"min=v0,max=v1"`
  	// ErrorMessage is the error text of the response; tagged nullable (v0-v1).
  	ErrorMessage string `kafka:"min=v0,max=v1,nullable"`
  	// AuthBytes is the opaque authentication payload of the response (v0-v1).
  	AuthBytes []byte `kafka:"min=v0,max=v1"`
  	// SessionLifetimeMs is present only in v1; presumably the session
  	// lifetime in milliseconds, as the name suggests.
  	SessionLifetimeMs int64 `kafka:"min=v1,max=v1"`
  }
  52. func (r *Response) ApiKey() protocol.ApiKey { return protocol.SaslAuthenticate }
  // Compile-time check that *Request implements protocol.RawExchanger.
  var _ protocol.RawExchanger = (*Request)(nil)