leavegroup.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. package kafka
  2. import (
  3. "bufio"
  4. "context"
  5. "fmt"
  6. "net"
  7. "time"
  8. "github.com/segmentio/kafka-go/protocol/leavegroup"
  9. )
  10. // LeaveGroupRequest is the request structure for the LeaveGroup function.
  11. type LeaveGroupRequest struct {
  12. // Address of the kafka broker to sent he request to.
  13. Addr net.Addr
  14. // GroupID of the group to leave.
  15. GroupID string
  16. // List of leaving member identities.
  17. Members []LeaveGroupRequestMember
  18. }
  19. // LeaveGroupRequestMember represents the indentify of a member leaving a group.
  20. type LeaveGroupRequestMember struct {
  21. // The member ID to remove from the group.
  22. ID string
  23. // The group instance ID to remove from the group.
  24. GroupInstanceID string
  25. }
  26. // LeaveGroupResponse is the response structure for the LeaveGroup function.
  27. type LeaveGroupResponse struct {
  28. // An error that may have occurred when attempting to leave the group.
  29. //
  30. // The errors contain the kafka error code. Programs may use the standard
  31. // errors.Is function to test the error against kafka error codes.
  32. Error error
  33. // The amount of time that the broker throttled the request.
  34. Throttle time.Duration
  35. // List of leaving member responses.
  36. Members []LeaveGroupResponseMember
  37. }
  38. // LeaveGroupResponseMember represents a member leaving the group.
  39. type LeaveGroupResponseMember struct {
  40. // The member ID of the member leaving the group.
  41. ID string
  42. // The group instance ID to remove from the group.
  43. GroupInstanceID string
  44. // An error that may have occured when attempting to remove the member from the group.
  45. //
  46. // The errors contain the kafka error code. Programs may use the standard
  47. // errors.Is function to test the error against kafka error codes.
  48. Error error
  49. }
  50. func (c *Client) LeaveGroup(ctx context.Context, req *LeaveGroupRequest) (*LeaveGroupResponse, error) {
  51. leaveGroup := leavegroup.Request{
  52. GroupID: req.GroupID,
  53. Members: make([]leavegroup.RequestMember, 0, len(req.Members)),
  54. }
  55. for _, member := range req.Members {
  56. leaveGroup.Members = append(leaveGroup.Members, leavegroup.RequestMember{
  57. MemberID: member.ID,
  58. GroupInstanceID: member.GroupInstanceID,
  59. })
  60. }
  61. m, err := c.roundTrip(ctx, req.Addr, &leaveGroup)
  62. if err != nil {
  63. return nil, fmt.Errorf("kafka.(*Client).LeaveGroup: %w", err)
  64. }
  65. r := m.(*leavegroup.Response)
  66. res := &LeaveGroupResponse{
  67. Error: makeError(r.ErrorCode, ""),
  68. Throttle: makeDuration(r.ThrottleTimeMS),
  69. }
  70. if len(r.Members) == 0 {
  71. // If we're using a version of the api without the
  72. // members array in the response, just add a member
  73. // so the api is consistent across versions.
  74. r.Members = []leavegroup.ResponseMember{
  75. {
  76. MemberID: req.Members[0].ID,
  77. GroupInstanceID: req.Members[0].GroupInstanceID,
  78. },
  79. }
  80. }
  81. res.Members = make([]LeaveGroupResponseMember, 0, len(r.Members))
  82. for _, member := range r.Members {
  83. res.Members = append(res.Members, LeaveGroupResponseMember{
  84. ID: member.MemberID,
  85. GroupInstanceID: member.GroupInstanceID,
  86. Error: makeError(member.ErrorCode, ""),
  87. })
  88. }
  89. return res, nil
  90. }
  91. type leaveGroupRequestV0 struct {
  92. // GroupID holds the unique group identifier
  93. GroupID string
  94. // MemberID assigned by the group coordinator or the zero string if joining
  95. // for the first time.
  96. MemberID string
  97. }
  98. func (t leaveGroupRequestV0) size() int32 {
  99. return sizeofString(t.GroupID) + sizeofString(t.MemberID)
  100. }
  101. func (t leaveGroupRequestV0) writeTo(wb *writeBuffer) {
  102. wb.writeString(t.GroupID)
  103. wb.writeString(t.MemberID)
  104. }
  105. type leaveGroupResponseV0 struct {
  106. // ErrorCode holds response error code
  107. ErrorCode int16
  108. }
  109. func (t leaveGroupResponseV0) size() int32 {
  110. return sizeofInt16(t.ErrorCode)
  111. }
  112. func (t leaveGroupResponseV0) writeTo(wb *writeBuffer) {
  113. wb.writeInt16(t.ErrorCode)
  114. }
  115. func (t *leaveGroupResponseV0) readFrom(r *bufio.Reader, size int) (remain int, err error) {
  116. remain, err = readInt16(r, size, &t.ErrorCode)
  117. return
  118. }