heartbeat.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. package kafka
  2. import (
  3. "bufio"
  4. "context"
  5. "fmt"
  6. "net"
  7. "time"
  8. heartbeatAPI "github.com/segmentio/kafka-go/protocol/heartbeat"
  9. )
// HeartbeatRequest represents a heartbeat sent to kafka to indicate consumer
// liveness.
//
// Client.Heartbeat sends the request to Addr and copies the remaining fields
// into the protocol-level heartbeat request.
type HeartbeatRequest struct {
	// Addr is the address of the kafka broker to send the request to.
	Addr net.Addr
	// GroupID is the ID of the group.
	GroupID string
	// GenerationID is the current generation for the group.
	GenerationID int32
	// MemberID is the ID of the group member.
	MemberID string
	// GroupInstanceID is a unique identifier for the consumer.
	GroupInstanceID string
}
// HeartbeatResponse represents a response from a heartbeat request.
type HeartbeatResponse struct {
	// Error is set to non-nil if an error occurred.
	//
	// Client.Heartbeat populates it when the response carries a non-zero
	// error code; failures of the round trip itself are reported through
	// the error returned by Client.Heartbeat instead.
	Error error
	// Throttle is the amount of time that the broker throttled the request.
	//
	// This field will be zero if the kafka broker did not support the
	// Heartbeat API in version 1 or above.
	Throttle time.Duration
}
// heartbeatRequestV0 is a heartbeat request carrying a group ID, a
// generation ID and a member ID.
//
// NOTE(review): the V0 suffix presumably refers to version 0 of the kafka
// Heartbeat API — confirm against the protocol definition.
type heartbeatRequestV0 struct {
	// GroupID holds the unique group identifier.
	GroupID string
	// GenerationID holds the generation of the group.
	GenerationID int32
	// MemberID is the member ID assigned by the group coordinator.
	MemberID string
}
  41. // Heartbeat sends a heartbeat request to a kafka broker and returns the response.
  42. func (c *Client) Heartbeat(ctx context.Context, req *HeartbeatRequest) (*HeartbeatResponse, error) {
  43. m, err := c.roundTrip(ctx, req.Addr, &heartbeatAPI.Request{
  44. GroupID: req.GroupID,
  45. GenerationID: req.GenerationID,
  46. MemberID: req.MemberID,
  47. GroupInstanceID: req.GroupInstanceID,
  48. })
  49. if err != nil {
  50. return nil, fmt.Errorf("kafka.(*Client).Heartbeat: %w", err)
  51. }
  52. res := m.(*heartbeatAPI.Response)
  53. ret := &HeartbeatResponse{
  54. Throttle: makeDuration(res.ThrottleTimeMs),
  55. }
  56. if res.ErrorCode != 0 {
  57. ret.Error = Error(res.ErrorCode)
  58. }
  59. return ret, nil
  60. }
  61. func (t heartbeatRequestV0) size() int32 {
  62. return sizeofString(t.GroupID) +
  63. sizeofInt32(t.GenerationID) +
  64. sizeofString(t.MemberID)
  65. }
// writeTo writes the request to wb: GroupID first, then GenerationID, then
// MemberID. The order of these calls is the order in which the fields are
// written, so it must be kept in sync with whatever reads the request.
func (t heartbeatRequestV0) writeTo(wb *writeBuffer) {
	wb.writeString(t.GroupID)
	wb.writeInt32(t.GenerationID)
	wb.writeString(t.MemberID)
}
// heartbeatResponseV0 is a heartbeat response consisting solely of an error
// code.
//
// NOTE(review): the V0 suffix presumably refers to version 0 of the kafka
// Heartbeat API — confirm against the protocol definition.
type heartbeatResponseV0 struct {
	// ErrorCode holds the response error code.
	ErrorCode int16
}
// size returns the size of the response, which is the size sizeofInt16
// reports for its only field, ErrorCode.
func (t heartbeatResponseV0) size() int32 {
	return sizeofInt16(t.ErrorCode)
}
// writeTo writes the response to wb; ErrorCode is the only field written.
func (t heartbeatResponseV0) writeTo(wb *writeBuffer) {
	wb.writeInt16(t.ErrorCode)
}
  81. func (t *heartbeatResponseV0) readFrom(r *bufio.Reader, sz int) (remain int, err error) {
  82. if remain, err = readInt16(r, sz, &t.ErrorCode); err != nil {
  83. return
  84. }
  85. return
  86. }