alterpartitionreassignments.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. package kafka
  2. import (
  3. "context"
  4. "net"
  5. "time"
  6. "github.com/segmentio/kafka-go/protocol/alterpartitionreassignments"
  7. )
  8. // AlterPartitionReassignmentsRequest is a request to the AlterPartitionReassignments API.
  9. type AlterPartitionReassignmentsRequest struct {
  10. // Address of the kafka broker to send the request to.
  11. Addr net.Addr
  12. // Topic is the name of the topic to alter partitions in. Keep this field empty and use Topic in AlterPartitionReassignmentsRequestAssignment to
  13. // reassign to multiple topics.
  14. Topic string
  15. // Assignments is the list of partition reassignments to submit to the API.
  16. Assignments []AlterPartitionReassignmentsRequestAssignment
  17. // Timeout is the amount of time to wait for the request to complete.
  18. Timeout time.Duration
  19. }
  // AlterPartitionReassignmentsRequestAssignment is the reassignment requested
  // for one partition.
  type AlterPartitionReassignmentsRequestAssignment struct {
  	// Topic names the topic that owns the partition. When empty, the Topic
  	// of the enclosing AlterPartitionReassignmentsRequest is used instead.
  	Topic string

  	// PartitionID identifies the partition whose replicas are reassigned.
  	PartitionID int

  	// BrokerIDs lists the brokers the partition replicas should be placed
  	// on, or is null to cancel a pending reassignment for this partition.
  	BrokerIDs []int
  }
  30. // AlterPartitionReassignmentsResponse is a response from the AlterPartitionReassignments API.
  31. type AlterPartitionReassignmentsResponse struct {
  32. // Error is set to a non-nil value including the code and message if a top-level
  33. // error was encountered when doing the update.
  34. Error error
  35. // PartitionResults contains the specific results for each partition.
  36. PartitionResults []AlterPartitionReassignmentsResponsePartitionResult
  37. }
  // AlterPartitionReassignmentsResponsePartitionResult is the detailed outcome
  // of the reassignment of one partition.
  type AlterPartitionReassignmentsResponsePartitionResult struct {
  	// Topic is the name of the topic the partition belongs to.
  	Topic string

  	// PartitionID identifies the partition that was altered.
  	PartitionID int

  	// Error is non-nil, carrying the error code and message, when the
  	// update of this partition failed.
  	Error error
  }
  49. func (c *Client) AlterPartitionReassignments(
  50. ctx context.Context,
  51. req *AlterPartitionReassignmentsRequest,
  52. ) (*AlterPartitionReassignmentsResponse, error) {
  53. apiTopicMap := make(map[string]*alterpartitionreassignments.RequestTopic)
  54. for _, assignment := range req.Assignments {
  55. topic := assignment.Topic
  56. if topic == "" {
  57. topic = req.Topic
  58. }
  59. apiTopic := apiTopicMap[topic]
  60. if apiTopic == nil {
  61. apiTopic = &alterpartitionreassignments.RequestTopic{
  62. Name: topic,
  63. }
  64. apiTopicMap[topic] = apiTopic
  65. }
  66. replicas := []int32{}
  67. for _, brokerID := range assignment.BrokerIDs {
  68. replicas = append(replicas, int32(brokerID))
  69. }
  70. apiTopic.Partitions = append(
  71. apiTopic.Partitions,
  72. alterpartitionreassignments.RequestPartition{
  73. PartitionIndex: int32(assignment.PartitionID),
  74. Replicas: replicas,
  75. },
  76. )
  77. }
  78. apiReq := &alterpartitionreassignments.Request{
  79. TimeoutMs: int32(req.Timeout.Milliseconds()),
  80. }
  81. for _, apiTopic := range apiTopicMap {
  82. apiReq.Topics = append(apiReq.Topics, *apiTopic)
  83. }
  84. protoResp, err := c.roundTrip(
  85. ctx,
  86. req.Addr,
  87. apiReq,
  88. )
  89. if err != nil {
  90. return nil, err
  91. }
  92. apiResp := protoResp.(*alterpartitionreassignments.Response)
  93. resp := &AlterPartitionReassignmentsResponse{
  94. Error: makeError(apiResp.ErrorCode, apiResp.ErrorMessage),
  95. }
  96. for _, topicResult := range apiResp.Results {
  97. for _, partitionResult := range topicResult.Partitions {
  98. resp.PartitionResults = append(
  99. resp.PartitionResults,
  100. AlterPartitionReassignmentsResponsePartitionResult{
  101. Topic: topicResult.Name,
  102. PartitionID: int(partitionResult.PartitionIndex),
  103. Error: makeError(partitionResult.ErrorCode, partitionResult.ErrorMessage),
  104. },
  105. )
  106. }
  107. }
  108. return resp, nil
  109. }