123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109 |
- package kafka
- import (
- "bufio"
- "context"
- "fmt"
- "net"
- "time"
- heartbeatAPI "github.com/segmentio/kafka-go/protocol/heartbeat"
- )
- // HeartbeatRequest represents a heartbeat sent to kafka to indicate consume liveness.
- type HeartbeatRequest struct {
- // Address of the kafka broker to send the request to.
- Addr net.Addr
- // GroupID is the ID of the group.
- GroupID string
- // GenerationID is the current generation for the group.
- GenerationID int32
- // MemberID is the ID of the group member.
- MemberID string
- // GroupInstanceID is a unique identifier for the consumer.
- GroupInstanceID string
- }
- // HeartbeatResponse represents a response from a heartbeat request.
- type HeartbeatResponse struct {
- // Error is set to non-nil if an error occurred.
- Error error
- // The amount of time that the broker throttled the request.
- //
- // This field will be zero if the kafka broker did not support the
- // Heartbeat API in version 1 or above.
- Throttle time.Duration
- }
- type heartbeatRequestV0 struct {
- // GroupID holds the unique group identifier
- GroupID string
- // GenerationID holds the generation of the group.
- GenerationID int32
- // MemberID assigned by the group coordinator
- MemberID string
- }
- // Heartbeat sends a heartbeat request to a kafka broker and returns the response.
- func (c *Client) Heartbeat(ctx context.Context, req *HeartbeatRequest) (*HeartbeatResponse, error) {
- m, err := c.roundTrip(ctx, req.Addr, &heartbeatAPI.Request{
- GroupID: req.GroupID,
- GenerationID: req.GenerationID,
- MemberID: req.MemberID,
- GroupInstanceID: req.GroupInstanceID,
- })
- if err != nil {
- return nil, fmt.Errorf("kafka.(*Client).Heartbeat: %w", err)
- }
- res := m.(*heartbeatAPI.Response)
- ret := &HeartbeatResponse{
- Throttle: makeDuration(res.ThrottleTimeMs),
- }
- if res.ErrorCode != 0 {
- ret.Error = Error(res.ErrorCode)
- }
- return ret, nil
- }
- func (t heartbeatRequestV0) size() int32 {
- return sizeofString(t.GroupID) +
- sizeofInt32(t.GenerationID) +
- sizeofString(t.MemberID)
- }
- func (t heartbeatRequestV0) writeTo(wb *writeBuffer) {
- wb.writeString(t.GroupID)
- wb.writeInt32(t.GenerationID)
- wb.writeString(t.MemberID)
- }
- type heartbeatResponseV0 struct {
- // ErrorCode holds response error code
- ErrorCode int16
- }
- func (t heartbeatResponseV0) size() int32 {
- return sizeofInt16(t.ErrorCode)
- }
- func (t heartbeatResponseV0) writeTo(wb *writeBuffer) {
- wb.writeInt16(t.ErrorCode)
- }
- func (t *heartbeatResponseV0) readFrom(r *bufio.Reader, sz int) (remain int, err error) {
- if remain, err = readInt16(r, sz, &t.ErrorCode); err != nil {
- return
- }
- return
- }
|