findcoordinator.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. package kafka
  2. import (
  3. "bufio"
  4. "context"
  5. "fmt"
  6. "net"
  7. "time"
  8. "github.com/segmentio/kafka-go/protocol/findcoordinator"
  9. )
  10. // CoordinatorKeyType is used to specify the type of coordinator to look for
  11. type CoordinatorKeyType int8
  12. const (
  13. // CoordinatorKeyTypeConsumer type is used when looking for a Group coordinator
  14. CoordinatorKeyTypeConsumer CoordinatorKeyType = 0
  15. // CoordinatorKeyTypeTransaction type is used when looking for a Transaction coordinator
  16. CoordinatorKeyTypeTransaction CoordinatorKeyType = 1
  17. )
  18. // FindCoordinatorRequest is the request structure for the FindCoordinator function
  19. type FindCoordinatorRequest struct {
  20. // Address of the kafka broker to send the request to.
  21. Addr net.Addr
  22. // The coordinator key.
  23. Key string
  24. // The coordinator key type. (Group, transaction, etc.)
  25. KeyType CoordinatorKeyType
  26. }
  27. // FindCoordinatorResponseCoordinator contains details about the found coordinator
  28. type FindCoordinatorResponseCoordinator struct {
  29. // NodeID holds the broker id.
  30. NodeID int
  31. // Host of the broker
  32. Host string
  33. // Port on which broker accepts requests
  34. Port int
  35. }
  36. // FindCoordinatorResponse is the response structure for the FindCoordinator function
  37. type FindCoordinatorResponse struct {
  38. // The Transaction/Group Coordinator details
  39. Coordinator *FindCoordinatorResponseCoordinator
  40. // The amount of time that the broker throttled the request.
  41. Throttle time.Duration
  42. // An error that may have occurred while attempting to retrieve Coordinator
  43. //
  44. // The error contains both the kafka error code, and an error message
  45. // returned by the kafka broker.
  46. Error error
  47. }
  48. // FindCoordinator sends a findCoordinator request to a kafka broker and returns the
  49. // response.
  50. func (c *Client) FindCoordinator(ctx context.Context, req *FindCoordinatorRequest) (*FindCoordinatorResponse, error) {
  51. m, err := c.roundTrip(ctx, req.Addr, &findcoordinator.Request{
  52. Key: req.Key,
  53. KeyType: int8(req.KeyType),
  54. })
  55. if err != nil {
  56. return nil, fmt.Errorf("kafka.(*Client).FindCoordinator: %w", err)
  57. }
  58. res := m.(*findcoordinator.Response)
  59. coordinator := &FindCoordinatorResponseCoordinator{
  60. NodeID: int(res.NodeID),
  61. Host: res.Host,
  62. Port: int(res.Port),
  63. }
  64. ret := &FindCoordinatorResponse{
  65. Throttle: makeDuration(res.ThrottleTimeMs),
  66. Error: makeError(res.ErrorCode, res.ErrorMessage),
  67. Coordinator: coordinator,
  68. }
  69. return ret, nil
  70. }
  71. // FindCoordinatorRequestV0 requests the coordinator for the specified group or transaction
  72. //
  73. // See http://kafka.apache.org/protocol.html#The_Messages_FindCoordinator
  74. type findCoordinatorRequestV0 struct {
  75. // CoordinatorKey holds id to use for finding the coordinator (for groups, this is
  76. // the groupId, for transactional producers, this is the transactional id)
  77. CoordinatorKey string
  78. }
  79. func (t findCoordinatorRequestV0) size() int32 {
  80. return sizeofString(t.CoordinatorKey)
  81. }
  82. func (t findCoordinatorRequestV0) writeTo(wb *writeBuffer) {
  83. wb.writeString(t.CoordinatorKey)
  84. }
  85. type findCoordinatorResponseCoordinatorV0 struct {
  86. // NodeID holds the broker id.
  87. NodeID int32
  88. // Host of the broker
  89. Host string
  90. // Port on which broker accepts requests
  91. Port int32
  92. }
  93. func (t findCoordinatorResponseCoordinatorV0) size() int32 {
  94. return sizeofInt32(t.NodeID) +
  95. sizeofString(t.Host) +
  96. sizeofInt32(t.Port)
  97. }
  98. func (t findCoordinatorResponseCoordinatorV0) writeTo(wb *writeBuffer) {
  99. wb.writeInt32(t.NodeID)
  100. wb.writeString(t.Host)
  101. wb.writeInt32(t.Port)
  102. }
  103. func (t *findCoordinatorResponseCoordinatorV0) readFrom(r *bufio.Reader, size int) (remain int, err error) {
  104. if remain, err = readInt32(r, size, &t.NodeID); err != nil {
  105. return
  106. }
  107. if remain, err = readString(r, remain, &t.Host); err != nil {
  108. return
  109. }
  110. if remain, err = readInt32(r, remain, &t.Port); err != nil {
  111. return
  112. }
  113. return
  114. }
  115. type findCoordinatorResponseV0 struct {
  116. // ErrorCode holds response error code
  117. ErrorCode int16
  118. // Coordinator holds host and port information for the coordinator
  119. Coordinator findCoordinatorResponseCoordinatorV0
  120. }
  121. func (t findCoordinatorResponseV0) size() int32 {
  122. return sizeofInt16(t.ErrorCode) +
  123. t.Coordinator.size()
  124. }
  125. func (t findCoordinatorResponseV0) writeTo(wb *writeBuffer) {
  126. wb.writeInt16(t.ErrorCode)
  127. t.Coordinator.writeTo(wb)
  128. }
  129. func (t *findCoordinatorResponseV0) readFrom(r *bufio.Reader, size int) (remain int, err error) {
  130. if remain, err = readInt16(r, size, &t.ErrorCode); err != nil {
  131. return
  132. }
  133. if remain, err = (&t.Coordinator).readFrom(r, remain); err != nil {
  134. return
  135. }
  136. return
  137. }