123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 |
- package kafka
- import (
- "context"
- "net"
- "time"
- "github.com/segmentio/kafka-go/protocol/alterpartitionreassignments"
- )
- // AlterPartitionReassignmentsRequest is a request to the AlterPartitionReassignments API.
- type AlterPartitionReassignmentsRequest struct {
- // Address of the kafka broker to send the request to.
- Addr net.Addr
- // Topic is the name of the topic to alter partitions in. Keep this field empty and use Topic in AlterPartitionReassignmentsRequestAssignment to
- // reassign to multiple topics.
- Topic string
- // Assignments is the list of partition reassignments to submit to the API.
- Assignments []AlterPartitionReassignmentsRequestAssignment
- // Timeout is the amount of time to wait for the request to complete.
- Timeout time.Duration
- }
- // AlterPartitionReassignmentsRequestAssignment contains the requested reassignments for a single
- // partition.
- type AlterPartitionReassignmentsRequestAssignment struct {
- // Topic is the name of the topic to alter partitions in. If empty, the value of Topic in AlterPartitionReassignmentsRequest is used.
- Topic string
- // PartitionID is the ID of the partition to make the reassignments in.
- PartitionID int
- // BrokerIDs is a slice of brokers to set the partition replicas to, or null to cancel a pending reassignment for this partition.
- BrokerIDs []int
- }
- // AlterPartitionReassignmentsResponse is a response from the AlterPartitionReassignments API.
- type AlterPartitionReassignmentsResponse struct {
- // Error is set to a non-nil value including the code and message if a top-level
- // error was encountered when doing the update.
- Error error
- // PartitionResults contains the specific results for each partition.
- PartitionResults []AlterPartitionReassignmentsResponsePartitionResult
- }
- // AlterPartitionReassignmentsResponsePartitionResult contains the detailed result of
- // doing reassignments for a single partition.
- type AlterPartitionReassignmentsResponsePartitionResult struct {
- // Topic is the topic name.
- Topic string
- // PartitionID is the ID of the partition that was altered.
- PartitionID int
- // Error is set to a non-nil value including the code and message if an error was encountered
- // during the update for this partition.
- Error error
- }
- func (c *Client) AlterPartitionReassignments(
- ctx context.Context,
- req *AlterPartitionReassignmentsRequest,
- ) (*AlterPartitionReassignmentsResponse, error) {
- apiTopicMap := make(map[string]*alterpartitionreassignments.RequestTopic)
- for _, assignment := range req.Assignments {
- topic := assignment.Topic
- if topic == "" {
- topic = req.Topic
- }
- apiTopic := apiTopicMap[topic]
- if apiTopic == nil {
- apiTopic = &alterpartitionreassignments.RequestTopic{
- Name: topic,
- }
- apiTopicMap[topic] = apiTopic
- }
- replicas := []int32{}
- for _, brokerID := range assignment.BrokerIDs {
- replicas = append(replicas, int32(brokerID))
- }
- apiTopic.Partitions = append(
- apiTopic.Partitions,
- alterpartitionreassignments.RequestPartition{
- PartitionIndex: int32(assignment.PartitionID),
- Replicas: replicas,
- },
- )
- }
- apiReq := &alterpartitionreassignments.Request{
- TimeoutMs: int32(req.Timeout.Milliseconds()),
- }
- for _, apiTopic := range apiTopicMap {
- apiReq.Topics = append(apiReq.Topics, *apiTopic)
- }
- protoResp, err := c.roundTrip(
- ctx,
- req.Addr,
- apiReq,
- )
- if err != nil {
- return nil, err
- }
- apiResp := protoResp.(*alterpartitionreassignments.Response)
- resp := &AlterPartitionReassignmentsResponse{
- Error: makeError(apiResp.ErrorCode, apiResp.ErrorMessage),
- }
- for _, topicResult := range apiResp.Results {
- for _, partitionResult := range topicResult.Partitions {
- resp.PartitionResults = append(
- resp.PartitionResults,
- AlterPartitionReassignmentsResponsePartitionResult{
- Topic: topicResult.Name,
- PartitionID: int(partitionResult.PartitionIndex),
- Error: makeError(partitionResult.ErrorCode, partitionResult.ErrorMessage),
- },
- )
- }
- }
- return resp, nil
- }
|