123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170 |
- package kafka
- import (
- "bufio"
- "context"
- "fmt"
- "net"
- "time"
- "github.com/segmentio/kafka-go/protocol/findcoordinator"
- )
- // CoordinatorKeyType is used to specify the type of coordinator to look for
- type CoordinatorKeyType int8
- const (
- // CoordinatorKeyTypeConsumer type is used when looking for a Group coordinator
- CoordinatorKeyTypeConsumer CoordinatorKeyType = 0
- // CoordinatorKeyTypeTransaction type is used when looking for a Transaction coordinator
- CoordinatorKeyTypeTransaction CoordinatorKeyType = 1
- )
- // FindCoordinatorRequest is the request structure for the FindCoordinator function
- type FindCoordinatorRequest struct {
- // Address of the kafka broker to send the request to.
- Addr net.Addr
- // The coordinator key.
- Key string
- // The coordinator key type. (Group, transaction, etc.)
- KeyType CoordinatorKeyType
- }
- // FindCoordinatorResponseCoordinator contains details about the found coordinator
- type FindCoordinatorResponseCoordinator struct {
- // NodeID holds the broker id.
- NodeID int
- // Host of the broker
- Host string
- // Port on which broker accepts requests
- Port int
- }
- // FindCoordinatorResponse is the response structure for the FindCoordinator function
- type FindCoordinatorResponse struct {
- // The Transaction/Group Coordinator details
- Coordinator *FindCoordinatorResponseCoordinator
- // The amount of time that the broker throttled the request.
- Throttle time.Duration
- // An error that may have occurred while attempting to retrieve Coordinator
- //
- // The error contains both the kafka error code, and an error message
- // returned by the kafka broker.
- Error error
- }
- // FindCoordinator sends a findCoordinator request to a kafka broker and returns the
- // response.
- func (c *Client) FindCoordinator(ctx context.Context, req *FindCoordinatorRequest) (*FindCoordinatorResponse, error) {
- m, err := c.roundTrip(ctx, req.Addr, &findcoordinator.Request{
- Key: req.Key,
- KeyType: int8(req.KeyType),
- })
- if err != nil {
- return nil, fmt.Errorf("kafka.(*Client).FindCoordinator: %w", err)
- }
- res := m.(*findcoordinator.Response)
- coordinator := &FindCoordinatorResponseCoordinator{
- NodeID: int(res.NodeID),
- Host: res.Host,
- Port: int(res.Port),
- }
- ret := &FindCoordinatorResponse{
- Throttle: makeDuration(res.ThrottleTimeMs),
- Error: makeError(res.ErrorCode, res.ErrorMessage),
- Coordinator: coordinator,
- }
- return ret, nil
- }
- // FindCoordinatorRequestV0 requests the coordinator for the specified group or transaction
- //
- // See http://kafka.apache.org/protocol.html#The_Messages_FindCoordinator
- type findCoordinatorRequestV0 struct {
- // CoordinatorKey holds id to use for finding the coordinator (for groups, this is
- // the groupId, for transactional producers, this is the transactional id)
- CoordinatorKey string
- }
- func (t findCoordinatorRequestV0) size() int32 {
- return sizeofString(t.CoordinatorKey)
- }
- func (t findCoordinatorRequestV0) writeTo(wb *writeBuffer) {
- wb.writeString(t.CoordinatorKey)
- }
- type findCoordinatorResponseCoordinatorV0 struct {
- // NodeID holds the broker id.
- NodeID int32
- // Host of the broker
- Host string
- // Port on which broker accepts requests
- Port int32
- }
- func (t findCoordinatorResponseCoordinatorV0) size() int32 {
- return sizeofInt32(t.NodeID) +
- sizeofString(t.Host) +
- sizeofInt32(t.Port)
- }
- func (t findCoordinatorResponseCoordinatorV0) writeTo(wb *writeBuffer) {
- wb.writeInt32(t.NodeID)
- wb.writeString(t.Host)
- wb.writeInt32(t.Port)
- }
- func (t *findCoordinatorResponseCoordinatorV0) readFrom(r *bufio.Reader, size int) (remain int, err error) {
- if remain, err = readInt32(r, size, &t.NodeID); err != nil {
- return
- }
- if remain, err = readString(r, remain, &t.Host); err != nil {
- return
- }
- if remain, err = readInt32(r, remain, &t.Port); err != nil {
- return
- }
- return
- }
- type findCoordinatorResponseV0 struct {
- // ErrorCode holds response error code
- ErrorCode int16
- // Coordinator holds host and port information for the coordinator
- Coordinator findCoordinatorResponseCoordinatorV0
- }
- func (t findCoordinatorResponseV0) size() int32 {
- return sizeofInt16(t.ErrorCode) +
- t.Coordinator.size()
- }
- func (t findCoordinatorResponseV0) writeTo(wb *writeBuffer) {
- wb.writeInt16(t.ErrorCode)
- t.Coordinator.writeTo(wb)
- }
- func (t *findCoordinatorResponseV0) readFrom(r *bufio.Reader, size int) (remain int, err error) {
- if remain, err = readInt16(r, size, &t.ErrorCode); err != nil {
- return
- }
- if remain, err = (&t.Coordinator).readFrom(r, remain); err != nil {
- return
- }
- return
- }
|