listpartitionreassignments.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. package kafka
  2. import (
  3. "context"
  4. "net"
  5. "time"
  6. "github.com/segmentio/kafka-go/protocol/listpartitionreassignments"
  7. )
  8. // ListPartitionReassignmentsRequest is a request to the ListPartitionReassignments API.
  9. type ListPartitionReassignmentsRequest struct {
  10. // Address of the kafka broker to send the request to.
  11. Addr net.Addr
  12. // Topics we want reassignments for, mapped by their name, or nil to list everything.
  13. Topics map[string]ListPartitionReassignmentsRequestTopic
  14. // Timeout is the amount of time to wait for the request to complete.
  15. Timeout time.Duration
  16. }
  17. // ListPartitionReassignmentsRequestTopic contains the requested partitions for a single
  18. // topic.
  19. type ListPartitionReassignmentsRequestTopic struct {
  20. // The partitions to list partition reassignments for.
  21. PartitionIndexes []int
  22. }
  23. // ListPartitionReassignmentsResponse is a response from the ListPartitionReassignments API.
  24. type ListPartitionReassignmentsResponse struct {
  25. // Error is set to a non-nil value including the code and message if a top-level
  26. // error was encountered.
  27. Error error
  28. // Topics contains results for each topic, mapped by their name.
  29. Topics map[string]ListPartitionReassignmentsResponseTopic
  30. }
  31. // ListPartitionReassignmentsResponseTopic contains the detailed result of
  32. // ongoing reassignments for a topic.
  33. type ListPartitionReassignmentsResponseTopic struct {
  34. // Partitions contains result for topic partitions.
  35. Partitions []ListPartitionReassignmentsResponsePartition
  36. }
  37. // ListPartitionReassignmentsResponsePartition contains the detailed result of
  38. // ongoing reassignments for a single partition.
  39. type ListPartitionReassignmentsResponsePartition struct {
  40. // PartitionIndex contains index of the partition.
  41. PartitionIndex int
  42. // Replicas contains the current replica set.
  43. Replicas []int
  44. // AddingReplicas contains the set of replicas we are currently adding.
  45. AddingReplicas []int
  46. // RemovingReplicas contains the set of replicas we are currently removing.
  47. RemovingReplicas []int
  48. }
  49. func (c *Client) ListPartitionReassignments(
  50. ctx context.Context,
  51. req *ListPartitionReassignmentsRequest,
  52. ) (*ListPartitionReassignmentsResponse, error) {
  53. apiReq := &listpartitionreassignments.Request{
  54. TimeoutMs: int32(req.Timeout.Milliseconds()),
  55. }
  56. for topicName, topicReq := range req.Topics {
  57. apiReq.Topics = append(
  58. apiReq.Topics,
  59. listpartitionreassignments.RequestTopic{
  60. Name: topicName,
  61. PartitionIndexes: intToInt32Array(topicReq.PartitionIndexes),
  62. },
  63. )
  64. }
  65. protoResp, err := c.roundTrip(
  66. ctx,
  67. req.Addr,
  68. apiReq,
  69. )
  70. if err != nil {
  71. return nil, err
  72. }
  73. apiResp := protoResp.(*listpartitionreassignments.Response)
  74. resp := &ListPartitionReassignmentsResponse{
  75. Error: makeError(apiResp.ErrorCode, apiResp.ErrorMessage),
  76. Topics: make(map[string]ListPartitionReassignmentsResponseTopic),
  77. }
  78. for _, topicResult := range apiResp.Topics {
  79. respTopic := ListPartitionReassignmentsResponseTopic{}
  80. for _, partitionResult := range topicResult.Partitions {
  81. respTopic.Partitions = append(
  82. respTopic.Partitions,
  83. ListPartitionReassignmentsResponsePartition{
  84. PartitionIndex: int(partitionResult.PartitionIndex),
  85. Replicas: int32ToIntArray(partitionResult.Replicas),
  86. AddingReplicas: int32ToIntArray(partitionResult.AddingReplicas),
  87. RemovingReplicas: int32ToIntArray(partitionResult.RemovingReplicas),
  88. },
  89. )
  90. }
  91. resp.Topics[topicResult.Name] = respTopic
  92. }
  93. return resp, nil
  94. }
  95. func intToInt32Array(arr []int) []int32 {
  96. if arr == nil {
  97. return nil
  98. }
  99. res := make([]int32, len(arr))
  100. for i := range arr {
  101. res[i] = int32(arr[i])
  102. }
  103. return res
  104. }
  105. func int32ToIntArray(arr []int32) []int {
  106. if arr == nil {
  107. return nil
  108. }
  109. res := make([]int, len(arr))
  110. for i := range arr {
  111. res[i] = int(arr[i])
  112. }
  113. return res
  114. }