resolver.go 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. package kafka
  2. import (
  3. "context"
  4. "net"
  5. )
  6. // The Resolver interface is used as an abstraction to provide service discovery
  7. // of the hosts of a kafka cluster.
  8. type Resolver interface {
  9. // LookupHost looks up the given host using the local resolver.
  10. // It returns a slice of that host's addresses.
  11. LookupHost(ctx context.Context, host string) (addrs []string, err error)
  12. }
  13. // BrokerResolver is an interface implemented by types that translate host
  14. // names into a network address.
  15. //
  16. // This resolver is not intended to be a general purpose interface. Instead,
  17. // it is tailored to the particular needs of the kafka protocol, with the goal
  18. // being to provide a flexible mechanism for extending broker name resolution
  19. // while retaining context that is specific to interacting with a kafka cluster.
  20. //
  21. // Resolvers must be safe to use from multiple goroutines.
  22. type BrokerResolver interface {
  23. // Returns the IP addresses of the broker passed as argument.
  24. LookupBrokerIPAddr(ctx context.Context, broker Broker) ([]net.IPAddr, error)
  25. }
  26. // NewBrokerResolver constructs a Resolver from r.
  27. //
  28. // If r is nil, net.DefaultResolver is used instead.
  29. func NewBrokerResolver(r *net.Resolver) BrokerResolver {
  30. return brokerResolver{r}
  31. }
  32. type brokerResolver struct {
  33. *net.Resolver
  34. }
  35. func (r brokerResolver) LookupBrokerIPAddr(ctx context.Context, broker Broker) ([]net.IPAddr, error) {
  36. ipAddrs, err := r.LookupIPAddr(ctx, broker.Host)
  37. if err != nil {
  38. return nil, err
  39. }
  40. if len(ipAddrs) == 0 {
  41. return nil, &net.DNSError{
  42. Err: "no addresses were returned by the resolver",
  43. Name: broker.Host,
  44. IsTemporary: true,
  45. IsNotFound: true,
  46. }
  47. }
  48. return ipAddrs, nil
  49. }