package kafka

import (
    "bufio"
    "errors"
    "fmt"
    "io"
    "math"
    "net"
    "os"
    "path/filepath"
    "runtime"
    "sync"
    "sync/atomic"
    "time"
)

var (
    errInvalidWriteTopic     = errors.New("writes must NOT set Topic on kafka.Message")
    errInvalidWritePartition = errors.New("writes must NOT set Partition on kafka.Message")
)

// Conn represents a connection to a kafka broker.
//
// Instances of Conn are safe to use concurrently from multiple goroutines.
type Conn struct {
    // base network connection
    conn net.Conn

    // number of inflight requests on the connection.
    inflight int32

    // offset management (synchronized on the mutex field)
    mutex  sync.Mutex
    offset int64

    // read buffer (synchronized on rlock)
    rlock sync.Mutex
    rbuf  bufio.Reader

    // write buffer (synchronized on wlock)
    wlock sync.Mutex
    wbuf  bufio.Writer
    wb    writeBuffer

    // deadline management
    wdeadline connDeadline
    rdeadline connDeadline

    // immutable values of the connection object
    clientID      string
    topic         string
    partition     int32
    fetchMaxBytes int32
    fetchMinSize  int32

    // correlation ID generator (synchronized on wlock)
    correlationID int32

    // number of replica acks required when publishing to a partition
    requiredAcks int32

    // lazily loaded API versions used by this connection
    apiVersions atomic.Value // apiVersionMap

    transactionalID *string
}

type apiVersionMap map[apiKey]ApiVersion

func (v apiVersionMap) negotiate(key apiKey, sortedSupportedVersions ...apiVersion) apiVersion {
    x := v[key]

    for i := len(sortedSupportedVersions) - 1; i >= 0; i-- {
        s := sortedSupportedVersions[i]

        if apiVersion(x.MaxVersion) >= s {
            return s
        }
    }

    return -1
}

// ConnConfig is a configuration object used to create new instances of Conn.
type ConnConfig struct {
    ClientID  string
    Topic     string
    Partition int

    // The transactional id to use for transactional delivery. Idempotent
    // delivery should be enabled if a transactional id is configured.
    // For more details, see the transactional.id description here:
    // http://kafka.apache.org/documentation.html#producerconfigs
    // An empty string means that this connection can't be transactional.
    TransactionalID string
}

// ReadBatchConfig is a configuration object used for reading batches of messages.
type ReadBatchConfig struct {
    // MinBytes indicates to the broker the minimum batch size that the consumer
    // will accept. Setting a high minimum when consuming from a low-volume topic
    // may result in delayed delivery when the broker does not have enough data to
    // satisfy the defined minimum.
    MinBytes int

    // MaxBytes indicates to the broker the maximum batch size that the consumer
    // will accept. The broker will truncate a message to satisfy this maximum, so
    // choose a value that is high enough for your largest message size.
    MaxBytes int

    // IsolationLevel controls the visibility of transactional records.
    // ReadUncommitted makes all records visible. With ReadCommitted only
    // non-transactional and committed records are visible.
    IsolationLevel IsolationLevel

    // MaxWait is the maximum amount of time the broker will wait while trying
    // to hit the min/max byte targets. This setting is independent of any
    // network-level timeouts or deadlines.
    //
    // For backward compatibility, when this field is left zero, kafka-go will
    // infer the max wait from the connection's read deadline.
    MaxWait time.Duration
}

type IsolationLevel int8

const (
    ReadUncommitted IsolationLevel = 0
    ReadCommitted   IsolationLevel = 1
)

var (
    // DefaultClientID is the default value used as ClientID of kafka
    // connections.
    DefaultClientID string
)

func init() {
    progname := filepath.Base(os.Args[0])
    hostname, _ := os.Hostname()
    DefaultClientID = fmt.Sprintf("%s@%s (github.com/segmentio/kafka-go)", progname, hostname)
}

// NewConn returns a new kafka connection for the given topic and partition.
func NewConn(conn net.Conn, topic string, partition int) *Conn {
    return NewConnWith(conn, ConnConfig{
        Topic:     topic,
        Partition: partition,
    })
}

func emptyToNullable(transactionalID string) (result *string) {
    if transactionalID != "" {
        result = &transactionalID
    }
    return result
}

// NewConnWith returns a new kafka connection configured with config.
// The offset is initialized to FirstOffset.
func NewConnWith(conn net.Conn, config ConnConfig) *Conn {
    if len(config.ClientID) == 0 {
        config.ClientID = DefaultClientID
    }

    if config.Partition < 0 || config.Partition > math.MaxInt32 {
        panic(fmt.Sprintf("invalid partition number: %d", config.Partition))
    }

    c := &Conn{
        conn:            conn,
        rbuf:            *bufio.NewReader(conn),
        wbuf:            *bufio.NewWriter(conn),
        clientID:        config.ClientID,
        topic:           config.Topic,
        partition:       int32(config.Partition),
        offset:          FirstOffset,
        requiredAcks:    -1,
        transactionalID: emptyToNullable(config.TransactionalID),
    }

    c.wb.w = &c.wbuf

    // The fetch request needs to ask for a MaxBytes value that is at least
    // enough to load the control data of the response. To avoid having to
    // recompute it on every read, it is cached here in the Conn value.
    c.fetchMinSize = (fetchResponseV2{
        Topics: []fetchResponseTopicV2{{
            TopicName: config.Topic,
            Partitions: []fetchResponsePartitionV2{{
                Partition:  int32(config.Partition),
                MessageSet: messageSet{{}},
            }},
        }},
    }).size()
    c.fetchMaxBytes = math.MaxInt32 - c.fetchMinSize
    return c
}
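
// The sketch below is illustrative and not part of the original file: it shows
// one way a program might construct a Conn, by dialing the broker with the
// standard net package and wrapping the connection with NewConnWith. The
// broker address and topic name are placeholder values.
func exampleNewConnWith() {
    nc, err := net.Dial("tcp", "localhost:9092") // placeholder broker address
    if err != nil {
        panic(err)
    }

    c := NewConnWith(nc, ConnConfig{
        Topic:     "example-topic", // placeholder topic
        Partition: 0,
    })
    defer c.Close()
}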
func (c *Conn) negotiateVersion(key apiKey, sortedSupportedVersions ...apiVersion) (apiVersion, error) {
    v, err := c.loadVersions()
    if err != nil {
        return -1, err
    }
    a := v.negotiate(key, sortedSupportedVersions...)
    if a < 0 {
        return -1, fmt.Errorf("no matching versions were found between the client and the broker for API key %d", key)
    }
    return a, nil
}

func (c *Conn) loadVersions() (apiVersionMap, error) {
    v, _ := c.apiVersions.Load().(apiVersionMap)
    if v != nil {
        return v, nil
    }

    brokerVersions, err := c.ApiVersions()
    if err != nil {
        return nil, err
    }

    v = make(apiVersionMap, len(brokerVersions))
    for _, a := range brokerVersions {
        v[apiKey(a.ApiKey)] = a
    }

    c.apiVersions.Store(v)
    return v, nil
}
// Controller requests kafka for the current controller and returns its broker
// metadata.
func (c *Conn) Controller() (broker Broker, err error) {
    err = c.readOperation(
        func(deadline time.Time, id int32) error {
            return c.writeRequest(metadata, v1, id, topicMetadataRequestV1([]string{}))
        },
        func(deadline time.Time, size int) error {
            var res metadataResponseV1

            if err := c.readResponse(size, &res); err != nil {
                return err
            }
            for _, brokerMeta := range res.Brokers {
                if brokerMeta.NodeID == res.ControllerID {
                    broker = Broker{
                        ID:   int(brokerMeta.NodeID),
                        Port: int(brokerMeta.Port),
                        Host: brokerMeta.Host,
                        Rack: brokerMeta.Rack,
                    }
                    break
                }
            }
            return nil
        },
    )
    return broker, err
}
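
// Illustrative sketch (not part of the original file): fetching the cluster
// controller from an open connection and printing its address. The Conn is
// assumed to have been created as in the earlier sketch.
func exampleController(c *Conn) {
    broker, err := c.Controller()
    if err != nil {
        panic(err)
    }
    fmt.Printf("controller: %s:%d (id=%d)\n", broker.Host, broker.Port, broker.ID)
}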
// Brokers retrieves the broker list from the Kafka metadata.
func (c *Conn) Brokers() ([]Broker, error) {
    var brokers []Broker
    err := c.readOperation(
        func(deadline time.Time, id int32) error {
            return c.writeRequest(metadata, v1, id, topicMetadataRequestV1([]string{}))
        },
        func(deadline time.Time, size int) error {
            var res metadataResponseV1

            if err := c.readResponse(size, &res); err != nil {
                return err
            }

            brokers = make([]Broker, len(res.Brokers))
            for i, brokerMeta := range res.Brokers {
                brokers[i] = Broker{
                    ID:   int(brokerMeta.NodeID),
                    Port: int(brokerMeta.Port),
                    Host: brokerMeta.Host,
                    Rack: brokerMeta.Rack,
                }
            }
            return nil
        },
    )
    return brokers, err
}

// DeleteTopics deletes the specified topics.
func (c *Conn) DeleteTopics(topics ...string) error {
    _, err := c.deleteTopics(deleteTopicsRequestV0{
        Topics: topics,
    })
    return err
}

// describeGroups retrieves the specified groups
//
// See http://kafka.apache.org/protocol.html#The_Messages_DescribeGroups
func (c *Conn) describeGroups(request describeGroupsRequestV0) (describeGroupsResponseV0, error) {
    var response describeGroupsResponseV0

    err := c.readOperation(
        func(deadline time.Time, id int32) error {
            return c.writeRequest(describeGroups, v0, id, request)
        },
        func(deadline time.Time, size int) error {
            return expectZeroSize(func() (remain int, err error) {
                return (&response).readFrom(&c.rbuf, size)
            }())
        },
    )
    if err != nil {
        return describeGroupsResponseV0{}, err
    }
    for _, group := range response.Groups {
        if group.ErrorCode != 0 {
            return describeGroupsResponseV0{}, Error(group.ErrorCode)
        }
    }

    return response, nil
}

// findCoordinator finds the coordinator for the specified group or transaction
//
// See http://kafka.apache.org/protocol.html#The_Messages_FindCoordinator
func (c *Conn) findCoordinator(request findCoordinatorRequestV0) (findCoordinatorResponseV0, error) {
    var response findCoordinatorResponseV0

    err := c.readOperation(
        func(deadline time.Time, id int32) error {
            return c.writeRequest(findCoordinator, v0, id, request)
        },
        func(deadline time.Time, size int) error {
            return expectZeroSize(func() (remain int, err error) {
                return (&response).readFrom(&c.rbuf, size)
            }())
        },
    )
    if err != nil {
        return findCoordinatorResponseV0{}, err
    }
    if response.ErrorCode != 0 {
        return findCoordinatorResponseV0{}, Error(response.ErrorCode)
    }

    return response, nil
}

// heartbeat sends a heartbeat message required by consumer groups
//
// See http://kafka.apache.org/protocol.html#The_Messages_Heartbeat
func (c *Conn) heartbeat(request heartbeatRequestV0) (heartbeatResponseV0, error) {
    var response heartbeatResponseV0

    err := c.writeOperation(
        func(deadline time.Time, id int32) error {
            return c.writeRequest(heartbeat, v0, id, request)
        },
        func(deadline time.Time, size int) error {
            return expectZeroSize(func() (remain int, err error) {
                return (&response).readFrom(&c.rbuf, size)
            }())
        },
    )
    if err != nil {
        return heartbeatResponseV0{}, err
    }
    if response.ErrorCode != 0 {
        return heartbeatResponseV0{}, Error(response.ErrorCode)
    }

    return response, nil
}

// joinGroup attempts to join a consumer group
//
// See http://kafka.apache.org/protocol.html#The_Messages_JoinGroup
func (c *Conn) joinGroup(request joinGroupRequestV1) (joinGroupResponseV1, error) {
    var response joinGroupResponseV1

    err := c.writeOperation(
        func(deadline time.Time, id int32) error {
            return c.writeRequest(joinGroup, v1, id, request)
        },
        func(deadline time.Time, size int) error {
            return expectZeroSize(func() (remain int, err error) {
                return (&response).readFrom(&c.rbuf, size)
            }())
        },
    )
    if err != nil {
        return joinGroupResponseV1{}, err
    }
    if response.ErrorCode != 0 {
        return joinGroupResponseV1{}, Error(response.ErrorCode)
    }

    return response, nil
}
// leaveGroup removes the consumer from the consumer group
//
// See http://kafka.apache.org/protocol.html#The_Messages_LeaveGroup
func (c *Conn) leaveGroup(request leaveGroupRequestV0) (leaveGroupResponseV0, error) {
    var response leaveGroupResponseV0

    err := c.writeOperation(
        func(deadline time.Time, id int32) error {
            return c.writeRequest(leaveGroup, v0, id, request)
        },
        func(deadline time.Time, size int) error {
            return expectZeroSize(func() (remain int, err error) {
                return (&response).readFrom(&c.rbuf, size)
            }())
        },
    )
    if err != nil {
        return leaveGroupResponseV0{}, err
    }
    if response.ErrorCode != 0 {
        return leaveGroupResponseV0{}, Error(response.ErrorCode)
    }

    return response, nil
}

// listGroups lists all the consumer groups
//
// See http://kafka.apache.org/protocol.html#The_Messages_ListGroups
func (c *Conn) listGroups(request listGroupsRequestV1) (listGroupsResponseV1, error) {
    var response listGroupsResponseV1

    err := c.readOperation(
        func(deadline time.Time, id int32) error {
            return c.writeRequest(listGroups, v1, id, request)
        },
        func(deadline time.Time, size int) error {
            return expectZeroSize(func() (remain int, err error) {
                return (&response).readFrom(&c.rbuf, size)
            }())
        },
    )
    if err != nil {
        return listGroupsResponseV1{}, err
    }
    if response.ErrorCode != 0 {
        return listGroupsResponseV1{}, Error(response.ErrorCode)
    }

    return response, nil
}

// offsetCommit commits the specified topic partition offsets
//
// See http://kafka.apache.org/protocol.html#The_Messages_OffsetCommit
func (c *Conn) offsetCommit(request offsetCommitRequestV2) (offsetCommitResponseV2, error) {
    var response offsetCommitResponseV2

    err := c.writeOperation(
        func(deadline time.Time, id int32) error {
            return c.writeRequest(offsetCommit, v2, id, request)
        },
        func(deadline time.Time, size int) error {
            return expectZeroSize(func() (remain int, err error) {
                return (&response).readFrom(&c.rbuf, size)
            }())
        },
    )
    if err != nil {
        return offsetCommitResponseV2{}, err
    }
    for _, r := range response.Responses {
        for _, pr := range r.PartitionResponses {
            if pr.ErrorCode != 0 {
                return offsetCommitResponseV2{}, Error(pr.ErrorCode)
            }
        }
    }

    return response, nil
}

// offsetFetch fetches the offsets for the specified topic partitions.
// -1 indicates that there is no offset saved for the partition.
//
// See http://kafka.apache.org/protocol.html#The_Messages_OffsetFetch
func (c *Conn) offsetFetch(request offsetFetchRequestV1) (offsetFetchResponseV1, error) {
    var response offsetFetchResponseV1

    err := c.readOperation(
        func(deadline time.Time, id int32) error {
            return c.writeRequest(offsetFetch, v1, id, request)
        },
        func(deadline time.Time, size int) error {
            return expectZeroSize(func() (remain int, err error) {
                return (&response).readFrom(&c.rbuf, size)
            }())
        },
    )
    if err != nil {
        return offsetFetchResponseV1{}, err
    }
    for _, r := range response.Responses {
        for _, pr := range r.PartitionResponses {
            if pr.ErrorCode != 0 {
                return offsetFetchResponseV1{}, Error(pr.ErrorCode)
            }
        }
    }

    return response, nil
}

// syncGroup completes the handshake to join a consumer group
//
// See http://kafka.apache.org/protocol.html#The_Messages_SyncGroup
func (c *Conn) syncGroup(request syncGroupRequestV0) (syncGroupResponseV0, error) {
    var response syncGroupResponseV0

    err := c.readOperation(
        func(deadline time.Time, id int32) error {
            return c.writeRequest(syncGroup, v0, id, request)
        },
        func(deadline time.Time, size int) error {
            return expectZeroSize(func() (remain int, err error) {
                return (&response).readFrom(&c.rbuf, size)
            }())
        },
    )
    if err != nil {
        return syncGroupResponseV0{}, err
    }
    if response.ErrorCode != 0 {
        return syncGroupResponseV0{}, Error(response.ErrorCode)
    }

    return response, nil
}

// Close closes the kafka connection.
func (c *Conn) Close() error {
    return c.conn.Close()
}

// LocalAddr returns the local network address.
func (c *Conn) LocalAddr() net.Addr {
    return c.conn.LocalAddr()
}

// RemoteAddr returns the remote network address.
func (c *Conn) RemoteAddr() net.Addr {
    return c.conn.RemoteAddr()
}

// SetDeadline sets the read and write deadlines associated with the connection.
// It is equivalent to calling both SetReadDeadline and SetWriteDeadline.
//
// A deadline is an absolute time after which I/O operations fail with a timeout
// (see type Error) instead of blocking. The deadline applies to all future and
// pending I/O, not just the immediately following call to Read or Write. After
// a deadline has been exceeded, the connection may be closed if it was found to
// be in an unrecoverable state.
//
// A zero value for t means I/O operations will not time out.
func (c *Conn) SetDeadline(t time.Time) error {
    c.rdeadline.setDeadline(t)
    c.wdeadline.setDeadline(t)
    return nil
}
// SetReadDeadline sets the deadline for future Read calls and any
// currently-blocked Read call.
// A zero value for t means Read will not time out.
func (c *Conn) SetReadDeadline(t time.Time) error {
    c.rdeadline.setDeadline(t)
    return nil
}

// SetWriteDeadline sets the deadline for future Write calls and any
// currently-blocked Write call.
// Even if write times out, it may return n > 0, indicating that some of the
// data was successfully written.
// A zero value for t means Write will not time out.
func (c *Conn) SetWriteDeadline(t time.Time) error {
    c.wdeadline.setDeadline(t)
    return nil
}
// Offset returns the current offset of the connection as a pair of integers,
// where the first one is an offset value and the second one indicates how
// to interpret it.
//
// See Seek for more details about the offset and whence values.
func (c *Conn) Offset() (offset int64, whence int) {
    c.mutex.Lock()
    offset = c.offset
    c.mutex.Unlock()

    switch offset {
    case FirstOffset:
        offset = 0
        whence = SeekStart
    case LastOffset:
        offset = 0
        whence = SeekEnd
    default:
        whence = SeekAbsolute
    }
    return
}
const (
    SeekStart    = 0 // Seek relative to the first offset available in the partition.
    SeekAbsolute = 1 // Seek to an absolute offset.
    SeekEnd      = 2 // Seek relative to the last offset available in the partition.
    SeekCurrent  = 3 // Seek relative to the current offset.

    // This flag may be combined with any of the SeekAbsolute and SeekCurrent
    // constants to skip the bound check that the connection would do otherwise.
    // Programs can use this flag to avoid making a metadata request to the kafka
    // broker to read the current first and last offsets of the partition.
    SeekDontCheck = 1 << 30
)
// Seek sets the offset for the next read or write operation according to whence, which
// should be one of SeekStart, SeekAbsolute, SeekEnd, or SeekCurrent.
// When seeking relative to the end, the offset is subtracted from the last offset.
// Note that for historical reasons, these do not align with the usual whence constants
// as in lseek(2) or os.Seek.
// The method returns the new absolute offset of the connection.
func (c *Conn) Seek(offset int64, whence int) (int64, error) {
    seekDontCheck := (whence & SeekDontCheck) != 0
    whence &= ^SeekDontCheck

    switch whence {
    case SeekStart, SeekAbsolute, SeekEnd, SeekCurrent:
    default:
        return 0, fmt.Errorf("whence must be one of 0, 1, 2, or 3. (whence = %d)", whence)
    }

    if seekDontCheck {
        if whence == SeekAbsolute {
            c.mutex.Lock()
            c.offset = offset
            c.mutex.Unlock()
            return offset, nil
        }

        if whence == SeekCurrent {
            c.mutex.Lock()
            c.offset += offset
            offset = c.offset
            c.mutex.Unlock()
            return offset, nil
        }
    }

    if whence == SeekAbsolute {
        c.mutex.Lock()
        unchanged := offset == c.offset
        c.mutex.Unlock()
        if unchanged {
            return offset, nil
        }
    }

    if whence == SeekCurrent {
        c.mutex.Lock()
        offset = c.offset + offset
        c.mutex.Unlock()
    }

    first, last, err := c.ReadOffsets()
    if err != nil {
        return 0, err
    }

    switch whence {
    case SeekStart:
        offset = first + offset
    case SeekEnd:
        offset = last - offset
    }

    if offset < first || offset > last {
        return 0, OffsetOutOfRange
    }

    c.mutex.Lock()
    c.offset = offset
    c.mutex.Unlock()
    return offset, nil
}
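
// Illustrative sketch (not part of the original file): positioning the
// connection ten messages before the end of the partition. With SeekEnd the
// offset is a distance from the last offset, so this fails with
// OffsetOutOfRange if the partition holds fewer than ten messages.
func exampleSeek(c *Conn) {
    offset, err := c.Seek(10, SeekEnd)
    if err != nil {
        panic(err)
    }
    fmt.Printf("now positioned at absolute offset %d\n", offset)
}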
// Read reads the message at the current offset from the connection, advancing
// the offset on success so the next call to a read method will produce the next
// message.
// The method returns the number of bytes read, or an error if something went
// wrong.
//
// While it is safe to call Read concurrently from multiple goroutines, it may
// be hard for the program to predict the results because the connection offset
// will be read and written by multiple goroutines: they could read duplicates,
// or messages may be seen by only some of the goroutines.
//
// The method fails with io.ErrShortBuffer if the buffer passed as argument is
// too small to hold the message value.
//
// This method is provided to satisfy the net.Conn interface but is much less
// efficient than using the more general purpose ReadBatch method.
func (c *Conn) Read(b []byte) (int, error) {
    batch := c.ReadBatch(1, len(b))
    n, err := batch.Read(b)
    return n, coalesceErrors(silentEOF(err), batch.Close())
}
// ReadMessage reads the message at the current offset from the connection,
// advancing the offset on success so the next call to a read method will
// produce the next message.
//
// Because this method allocates memory buffers for the message key and value,
// it is less memory-efficient than Read, but has the advantage of never
// failing with io.ErrShortBuffer.
//
// While it is safe to call ReadMessage concurrently from multiple goroutines,
// it may be hard for the program to predict the results because the connection
// offset will be read and written by multiple goroutines: they could read
// duplicates, or messages may be seen by only some of the goroutines.
//
// This method is provided for convenience purposes but is much less efficient
// than using the more general purpose ReadBatch method.
func (c *Conn) ReadMessage(maxBytes int) (Message, error) {
    batch := c.ReadBatch(1, maxBytes)
    msg, err := batch.ReadMessage()
    return msg, coalesceErrors(silentEOF(err), batch.Close())
}
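
// Illustrative sketch (not part of the original file): reading a single
// message with a read deadline so the call cannot block forever. The 10e6
// byte limit is a placeholder for the largest expected message size.
func exampleReadMessage(c *Conn) {
    c.SetReadDeadline(time.Now().Add(10 * time.Second))

    msg, err := c.ReadMessage(10e6)
    if err != nil {
        panic(err)
    }
    fmt.Printf("key=%q value=%q\n", msg.Key, msg.Value)
}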
// ReadBatch reads a batch of messages from the kafka server. The method always
// returns a non-nil Batch value. If an error occurred, either sending the fetch
// request or reading the response, the error will be made available by the
// returned value of the batch's Close method.
//
// While it is safe to call ReadBatch concurrently from multiple goroutines, it
// may be hard for the program to predict the results because the connection
// offset will be read and written by multiple goroutines: they could read
// duplicates, or messages may be seen by only some of the goroutines.
//
// A program doesn't specify the number of messages it wants from a batch, but
// gives the minimum and maximum number of bytes that it wants to receive from
// the kafka server.
func (c *Conn) ReadBatch(minBytes, maxBytes int) *Batch {
    return c.ReadBatchWith(ReadBatchConfig{
        MinBytes: minBytes,
        MaxBytes: maxBytes,
    })
}
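
// Illustrative sketch (not part of the original file): consuming a batch of
// between 10KB and 1MB of messages and iterating until the batch is drained.
// Error handling is reduced to a break for brevity.
func exampleReadBatch(c *Conn) {
    batch := c.ReadBatch(10e3, 1e6) // fetch 10KB min, 1MB max
    defer batch.Close()

    for {
        msg, err := batch.ReadMessage()
        if err != nil {
            break // the error (or io.EOF at the end of the batch) is also reported by Close
        }
        fmt.Printf("offset %d: %s\n", msg.Offset, msg.Value)
    }
}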
// ReadBatchWith is similar to ReadBatch in every way; ReadBatch is equivalent
// to calling ReadBatchWith with a ReadBatchConfig that leaves every field but
// MinBytes and MaxBytes at its default value.
func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch {
    var adjustedDeadline time.Time
    var maxFetch = int(c.fetchMaxBytes)

    if cfg.MinBytes < 0 || cfg.MinBytes > maxFetch {
        return &Batch{err: fmt.Errorf("kafka.(*Conn).ReadBatch: minBytes of %d out of [1,%d] bounds", cfg.MinBytes, maxFetch)}
    }
    if cfg.MaxBytes < 0 || cfg.MaxBytes > maxFetch {
        return &Batch{err: fmt.Errorf("kafka.(*Conn).ReadBatch: maxBytes of %d out of [1,%d] bounds", cfg.MaxBytes, maxFetch)}
    }
    if cfg.MinBytes > cfg.MaxBytes {
        return &Batch{err: fmt.Errorf("kafka.(*Conn).ReadBatch: minBytes (%d) > maxBytes (%d)", cfg.MinBytes, cfg.MaxBytes)}
    }

    offset, whence := c.Offset()

    offset, err := c.Seek(offset, whence|SeekDontCheck)
    if err != nil {
        return &Batch{err: dontExpectEOF(err)}
    }

    fetchVersion, err := c.negotiateVersion(fetch, v2, v5, v10)
    if err != nil {
        return &Batch{err: dontExpectEOF(err)}
    }

    id, err := c.doRequest(&c.rdeadline, func(deadline time.Time, id int32) error {
        now := time.Now()
        var timeout time.Duration
        if cfg.MaxWait > 0 {
            // explicitly-configured case: no changes are made to the deadline,
            // and the timeout is sent exactly as specified.
            timeout = cfg.MaxWait
        } else {
            // default case: use the original logic to adjust the conn's
            // deadline.
            deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
            timeout = deadlineToTimeout(deadline, now)
        }

        // save this variable outside of the closure for later use in detecting
        // truncated messages.
        adjustedDeadline = deadline
        switch fetchVersion {
        case v10:
            return c.wb.writeFetchRequestV10(
                id,
                c.clientID,
                c.topic,
                c.partition,
                offset,
                cfg.MinBytes,
                cfg.MaxBytes+int(c.fetchMinSize),
                timeout,
                int8(cfg.IsolationLevel),
            )
        case v5:
            return c.wb.writeFetchRequestV5(
                id,
                c.clientID,
                c.topic,
                c.partition,
                offset,
                cfg.MinBytes,
                cfg.MaxBytes+int(c.fetchMinSize),
                timeout,
                int8(cfg.IsolationLevel),
            )
        default:
            return c.wb.writeFetchRequestV2(
                id,
                c.clientID,
                c.topic,
                c.partition,
                offset,
                cfg.MinBytes,
                cfg.MaxBytes+int(c.fetchMinSize),
                timeout,
            )
        }
    })
    if err != nil {
        return &Batch{err: dontExpectEOF(err)}
    }

    _, size, lock, err := c.waitResponse(&c.rdeadline, id)
    if err != nil {
        return &Batch{err: dontExpectEOF(err)}
    }

    var throttle int32
    var highWaterMark int64
    var remain int

    switch fetchVersion {
    case v10:
        throttle, highWaterMark, remain, err = readFetchResponseHeaderV10(&c.rbuf, size)
    case v5:
        throttle, highWaterMark, remain, err = readFetchResponseHeaderV5(&c.rbuf, size)
    default:
        throttle, highWaterMark, remain, err = readFetchResponseHeaderV2(&c.rbuf, size)
    }
    if err == errShortRead {
        err = checkTimeoutErr(adjustedDeadline)
    }

    var msgs *messageSetReader
    if err == nil {
        if highWaterMark == offset {
            msgs = &messageSetReader{empty: true}
        } else {
            msgs, err = newMessageSetReader(&c.rbuf, remain)
        }
    }
    if err == errShortRead {
        err = checkTimeoutErr(adjustedDeadline)
    }

    return &Batch{
        conn:          c,
        msgs:          msgs,
        deadline:      adjustedDeadline,
        throttle:      makeDuration(throttle),
        lock:          lock,
        topic:         c.topic,          // topic is copied to Batch to prevent race with Batch.close
        partition:     int(c.partition), // partition is copied to Batch to prevent race with Batch.close
        offset:        offset,
        highWaterMark: highWaterMark,
        // there shouldn't be a short read on initially setting up the batch.
        // as such, any io.EOF is re-mapped to an io.ErrUnexpectedEOF so that we
        // don't accidentally signal that we successfully reached the end of the
        // batch.
        err: dontExpectEOF(err),
    }
}
// ReadOffset returns the offset of the first message with a timestamp equal to
// or greater than t.
func (c *Conn) ReadOffset(t time.Time) (int64, error) {
    return c.readOffset(timestamp(t))
}
// ReadFirstOffset returns the first offset available on the connection.
func (c *Conn) ReadFirstOffset() (int64, error) {
    return c.readOffset(FirstOffset)
}

// ReadLastOffset returns the last offset available on the connection.
func (c *Conn) ReadLastOffset() (int64, error) {
    return c.readOffset(LastOffset)
}

// ReadOffsets returns the absolute first and last offsets of the topic used by
// the connection.
func (c *Conn) ReadOffsets() (first, last int64, err error) {
    // We have to submit two different requests to fetch the first and last
    // offsets because kafka refuses requests that ask for multiple offsets
    // on the same topic and partition.
    if first, err = c.ReadFirstOffset(); err != nil {
        return
    }
    if last, err = c.ReadLastOffset(); err != nil {
        first = 0 // don't leak the value on error
        return
    }
    return
}
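
// Illustrative sketch (not part of the original file): using ReadOffsets to
// estimate how many messages the partition currently holds.
func exampleReadOffsets(c *Conn) {
    first, last, err := c.ReadOffsets()
    if err != nil {
        panic(err)
    }
    fmt.Printf("about %d messages between offsets %d and %d\n", last-first, first, last)
}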
func (c *Conn) readOffset(t int64) (offset int64, err error) {
    err = c.readOperation(
        func(deadline time.Time, id int32) error {
            return c.wb.writeListOffsetRequestV1(id, c.clientID, c.topic, c.partition, t)
        },
        func(deadline time.Time, size int) error {
            return expectZeroSize(readArrayWith(&c.rbuf, size, func(r *bufio.Reader, size int) (int, error) {
                // We skip the topic name because we've made a request for
                // a single topic.
                size, err := discardString(r, size)
                if err != nil {
                    return size, err
                }

                // Reading the array of partitions, there will be only one
                // partition which gives the offset we're looking for.
                return readArrayWith(r, size, func(r *bufio.Reader, size int) (int, error) {
                    var p partitionOffsetV1
                    size, err := p.readFrom(r, size)
                    if err != nil {
                        return size, err
                    }
                    if p.ErrorCode != 0 {
                        return size, Error(p.ErrorCode)
                    }
                    offset = p.Offset
                    return size, nil
                })
            }))
        },
    )
    return
}
// ReadPartitions returns the list of available partitions for the given list of
// topics.
//
// If the method is called with no topic, it uses the topic configured on the
// connection. If there is none, the method fetches all partitions of the kafka
// cluster.
func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err error) {
    if len(topics) == 0 {
        if len(c.topic) != 0 {
            defaultTopics := [...]string{c.topic}
            topics = defaultTopics[:]
        } else {
            // topics needs to be explicitly nil-ed out or the broker will
            // interpret it as a request for 0 partitions instead of all.
            topics = nil
        }
    }

    err = c.readOperation(
        func(deadline time.Time, id int32) error {
            return c.writeRequest(metadata, v1, id, topicMetadataRequestV1(topics))
        },
        func(deadline time.Time, size int) error {
            var res metadataResponseV1

            if err := c.readResponse(size, &res); err != nil {
                return err
            }

            brokers := make(map[int32]Broker, len(res.Brokers))
            for _, b := range res.Brokers {
                brokers[b.NodeID] = Broker{
                    Host: b.Host,
                    Port: int(b.Port),
                    ID:   int(b.NodeID),
                    Rack: b.Rack,
                }
            }

            makeBrokers := func(ids ...int32) []Broker {
                b := make([]Broker, len(ids))
                for i, id := range ids {
                    b[i] = brokers[id]
                }
                return b
            }

            for _, t := range res.Topics {
                if t.TopicErrorCode != 0 && (c.topic == "" || t.TopicName == c.topic) {
                    // We only report errors if they happened for the topic of
                    // the connection, otherwise the topic will simply have no
                    // partitions in the result set.
                    return Error(t.TopicErrorCode)
                }
                for _, p := range t.Partitions {
                    partitions = append(partitions, Partition{
                        Topic:    t.TopicName,
                        Leader:   brokers[p.Leader],
                        Replicas: makeBrokers(p.Replicas...),
                        Isr:      makeBrokers(p.Isr...),
                        ID:       int(p.PartitionID),
                    })
                }
            }
            return nil
        },
    )
    return
}
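
// Illustrative sketch (not part of the original file): listing the partitions
// of the connection's topic along with their current leaders.
func exampleReadPartitions(c *Conn) {
    partitions, err := c.ReadPartitions()
    if err != nil {
        panic(err)
    }
    for _, p := range partitions {
        fmt.Printf("%s/%d leader=%s:%d\n", p.Topic, p.ID, p.Leader.Host, p.Leader.Port)
    }
}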
// Write writes a message to the kafka broker that this connection was
// established to. The method returns the number of bytes written, or an error
// if something went wrong.
//
// The operation either succeeds or fails; it never partially writes the message.
//
// This method is exposed to satisfy the net.Conn interface but is less efficient
// than the more general purpose WriteMessages method.
func (c *Conn) Write(b []byte) (int, error) {
    return c.WriteCompressedMessages(nil, Message{Value: b})
}
// WriteMessages writes a batch of messages to the connection's topic and
// partition, returning the number of bytes written. The write is an atomic
// operation, it either fully succeeds or fails.
func (c *Conn) WriteMessages(msgs ...Message) (int, error) {
    return c.WriteCompressedMessages(nil, msgs...)
}
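
// Illustrative sketch (not part of the original file): producing two messages
// with a write deadline. Topic and Partition are intentionally left unset on
// the messages, as required by the errInvalidWrite* checks below.
func exampleWriteMessages(c *Conn) {
    c.SetWriteDeadline(time.Now().Add(10 * time.Second))

    n, err := c.WriteMessages(
        Message{Key: []byte("k1"), Value: []byte("v1")},
        Message{Key: []byte("k2"), Value: []byte("v2")},
    )
    if err != nil {
        panic(err)
    }
    fmt.Printf("wrote %d bytes\n", n)
}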
// WriteCompressedMessages writes a batch of messages to the connection's topic
// and partition, returning the number of bytes written. The write is an atomic
// operation, it either fully succeeds or fails.
//
// If the compression codec is not nil, the messages will be compressed.
func (c *Conn) WriteCompressedMessages(codec CompressionCodec, msgs ...Message) (nbytes int, err error) {
    nbytes, _, _, _, err = c.writeCompressedMessages(codec, msgs...)
    return
}

// WriteCompressedMessagesAt writes a batch of messages to the connection's topic
// and partition, returning the number of bytes written, partition and offset numbers
// and timestamp assigned by the kafka broker to the message set. The write is an atomic
// operation, it either fully succeeds or fails.
//
// If the compression codec is not nil, the messages will be compressed.
func (c *Conn) WriteCompressedMessagesAt(codec CompressionCodec, msgs ...Message) (nbytes int, partition int32, offset int64, appendTime time.Time, err error) {
    return c.writeCompressedMessages(codec, msgs...)
}

func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message) (nbytes int, partition int32, offset int64, appendTime time.Time, err error) {
    if len(msgs) == 0 {
        return
    }

    writeTime := time.Now()
    for i, msg := range msgs {
        // users may believe they can set the Topic and/or Partition
        // on the kafka message.
        if msg.Topic != "" && msg.Topic != c.topic {
            err = errInvalidWriteTopic
            return
        }
        if msg.Partition != 0 {
            err = errInvalidWritePartition
            return
        }

        if msg.Time.IsZero() {
            msgs[i].Time = writeTime
        }

        nbytes += len(msg.Key) + len(msg.Value)
    }

    var produceVersion apiVersion
    if produceVersion, err = c.negotiateVersion(produce, v2, v3, v7); err != nil {
        return
    }

    err = c.writeOperation(
        func(deadline time.Time, id int32) error {
            now := time.Now()
            deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
            switch produceVersion {
            case v7:
                recordBatch, err := newRecordBatch(
                    codec,
                    msgs...,
                )
                if err != nil {
                    return err
                }
                return c.wb.writeProduceRequestV7(
                    id,
                    c.clientID,
                    c.topic,
                    c.partition,
                    deadlineToTimeout(deadline, now),
                    int16(atomic.LoadInt32(&c.requiredAcks)),
                    c.transactionalID,
                    recordBatch,
                )
            case v3:
                recordBatch, err := newRecordBatch(
                    codec,
                    msgs...,
                )
                if err != nil {
                    return err
                }
                return c.wb.writeProduceRequestV3(
                    id,
                    c.clientID,
                    c.topic,
                    c.partition,
                    deadlineToTimeout(deadline, now),
                    int16(atomic.LoadInt32(&c.requiredAcks)),
                    c.transactionalID,
                    recordBatch,
                )
            default:
                return c.wb.writeProduceRequestV2(
                    codec,
                    id,
                    c.clientID,
                    c.topic,
                    c.partition,
                    deadlineToTimeout(deadline, now),
                    int16(atomic.LoadInt32(&c.requiredAcks)),
                    msgs...,
                )
            }
        },
        func(deadline time.Time, size int) error {
            return expectZeroSize(readArrayWith(&c.rbuf, size, func(r *bufio.Reader, size int) (int, error) {
                // Skip the topic, we've produced the message to only one topic,
                // no need to waste resources loading it in memory.
                size, err := discardString(r, size)
                if err != nil {
                    return size, err
                }

                // Read the list of partitions, there should be only one since
                // we've produced a message to a single partition.
                size, err = readArrayWith(r, size, func(r *bufio.Reader, size int) (int, error) {
                    switch produceVersion {
                    case v7:
                        var p produceResponsePartitionV7
                        size, err := p.readFrom(r, size)
                        if err == nil && p.ErrorCode != 0 {
                            err = Error(p.ErrorCode)
                        }
                        if err == nil {
                            partition = p.Partition
                            offset = p.Offset
                            appendTime = time.Unix(0, p.Timestamp*int64(time.Millisecond))
                        }
                        return size, err
                    default:
                        var p produceResponsePartitionV2
                        size, err := p.readFrom(r, size)
                        if err == nil && p.ErrorCode != 0 {
                            err = Error(p.ErrorCode)
                        }
                        if err == nil {
                            partition = p.Partition
                            offset = p.Offset
                            appendTime = time.Unix(0, p.Timestamp*int64(time.Millisecond))
                        }
                        return size, err
                    }
                })
                if err != nil {
                    return size, err
                }

                // The response is trailed by the throttle time, also skipping
                // since it's not interesting here.
                return discardInt32(r, size)
            }))
        },
    )

    if err != nil {
        nbytes = 0
    }

    return
}
// SetRequiredAcks sets the number of acknowledgements from replicas that the
// connection requests when producing messages.
func (c *Conn) SetRequiredAcks(n int) error {
    switch n {
    case -1, 1:
        atomic.StoreInt32(&c.requiredAcks, int32(n))
        return nil
    default:
        return InvalidRequiredAcks
    }
}
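
// Illustrative sketch (not part of the original file): requiring
// acknowledgement from all in-sync replicas (-1) before a produce request is
// considered successful; passing any value other than -1 or 1 fails with
// InvalidRequiredAcks.
func exampleSetRequiredAcks(c *Conn) {
    if err := c.SetRequiredAcks(-1); err != nil {
        panic(err)
    }
}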
func (c *Conn) writeRequestHeader(apiKey apiKey, apiVersion apiVersion, correlationID int32, size int32) {
    hdr := c.requestHeader(apiKey, apiVersion, correlationID)
    hdr.Size = (hdr.size() + size) - 4
    hdr.writeTo(&c.wb)
}

func (c *Conn) writeRequest(apiKey apiKey, apiVersion apiVersion, correlationID int32, req request) error {
    hdr := c.requestHeader(apiKey, apiVersion, correlationID)
    hdr.Size = (hdr.size() + req.size()) - 4
    hdr.writeTo(&c.wb)
    req.writeTo(&c.wb)
    return c.wbuf.Flush()
}

func (c *Conn) readResponse(size int, res interface{}) error {
    size, err := read(&c.rbuf, size, res)
    switch err.(type) {
    case Error:
        var e error
        if size, e = discardN(&c.rbuf, size, size); e != nil {
            err = e
        }
    }
    return expectZeroSize(size, err)
}

func (c *Conn) peekResponseSizeAndID() (int32, int32, error) {
    b, err := c.rbuf.Peek(8)
    if err != nil {
        return 0, 0, err
    }
    size, id := makeInt32(b[:4]), makeInt32(b[4:])
    return size, id, nil
}

func (c *Conn) skipResponseSizeAndID() {
    c.rbuf.Discard(8)
}

func (c *Conn) readDeadline() time.Time {
    return c.rdeadline.deadline()
}

func (c *Conn) writeDeadline() time.Time {
    return c.wdeadline.deadline()
}

func (c *Conn) readOperation(write func(time.Time, int32) error, read func(time.Time, int) error) error {
    return c.do(&c.rdeadline, write, read)
}

func (c *Conn) writeOperation(write func(time.Time, int32) error, read func(time.Time, int) error) error {
    return c.do(&c.wdeadline, write, read)
}

func (c *Conn) enter() {
    atomic.AddInt32(&c.inflight, +1)
}

func (c *Conn) leave() {
    atomic.AddInt32(&c.inflight, -1)
}

func (c *Conn) concurrency() int {
    return int(atomic.LoadInt32(&c.inflight))
}

func (c *Conn) do(d *connDeadline, write func(time.Time, int32) error, read func(time.Time, int) error) error {
    id, err := c.doRequest(d, write)
    if err != nil {
        return err
    }

    deadline, size, lock, err := c.waitResponse(d, id)
    if err != nil {
        return err
    }

    if err = read(deadline, size); err != nil {
        switch err.(type) {
        case Error:
        default:
            c.conn.Close()
        }
    }

    d.unsetConnReadDeadline()
    lock.Unlock()
    return err
}

func (c *Conn) doRequest(d *connDeadline, write func(time.Time, int32) error) (id int32, err error) {
    c.enter()
    c.wlock.Lock()
    c.correlationID++
    id = c.correlationID
    err = write(d.setConnWriteDeadline(c.conn), id)
    d.unsetConnWriteDeadline()

    if err != nil {
        // When an error occurs there's no way to know if the connection is in a
        // recoverable state so we're better off just giving up at this point to
        // avoid any risk of corrupting the following operations.
        c.conn.Close()
        c.leave()
    }

    c.wlock.Unlock()
    return
}

func (c *Conn) waitResponse(d *connDeadline, id int32) (deadline time.Time, size int, lock *sync.Mutex, err error) {
    for {
        var rsz int32
        var rid int32

        c.rlock.Lock()
        deadline = d.setConnReadDeadline(c.conn)
        rsz, rid, err = c.peekResponseSizeAndID()

        if err != nil {
            d.unsetConnReadDeadline()
            c.conn.Close()
            c.rlock.Unlock()
            break
        }

        if id == rid {
            c.skipResponseSizeAndID()
            size, lock = int(rsz-4), &c.rlock
            // Don't unlock the read mutex to yield ownership to the caller.
            break
        }

        if c.concurrency() == 1 {
            // If the goroutine is the only one waiting on this connection it
            // should be impossible to read a correlation id different from the
            // one it expects. This is a sign that the data we are reading on
            // the wire is corrupted and the connection needs to be closed.
            err = io.ErrNoProgress
            c.rlock.Unlock()
            break
        }

        // Optimistically release the read lock if a response has already
        // been received but the current operation is not the target for it.
        c.rlock.Unlock()
        runtime.Gosched()
    }

    c.leave()
    return
}

func (c *Conn) requestHeader(apiKey apiKey, apiVersion apiVersion, correlationID int32) requestHeader {
    return requestHeader{
        ApiKey:        int16(apiKey),
        ApiVersion:    int16(apiVersion),
        CorrelationID: correlationID,
        ClientID:      c.clientID,
    }
}
func (c *Conn) ApiVersions() ([]ApiVersion, error) {
    deadline := &c.rdeadline

    if deadline.deadline().IsZero() {
        // ApiVersions is called automatically when API version negotiation
        // needs to happen, so we are not guaranteed that a read deadline has
        // been set yet. Fall back to using the write deadline in case it was
        // set, for example when version negotiation is initiated during a
        // produce request.
        deadline = &c.wdeadline
    }

    id, err := c.doRequest(deadline, func(_ time.Time, id int32) error {
        h := requestHeader{
            ApiKey:        int16(apiVersions),
            ApiVersion:    int16(v0),
            CorrelationID: id,
            ClientID:      c.clientID,
        }
        h.Size = (h.size() - 4)
        h.writeTo(&c.wb)
        return c.wbuf.Flush()
    })
    if err != nil {
        return nil, err
    }

    _, size, lock, err := c.waitResponse(deadline, id)
    if err != nil {
        return nil, err
    }
    defer lock.Unlock()

    var errorCode int16
    if size, err = readInt16(&c.rbuf, size, &errorCode); err != nil {
        return nil, err
    }
    var arrSize int32
    if size, err = readInt32(&c.rbuf, size, &arrSize); err != nil {
        return nil, err
    }
    r := make([]ApiVersion, arrSize)
    for i := 0; i < int(arrSize); i++ {
        if size, err = readInt16(&c.rbuf, size, &r[i].ApiKey); err != nil {
            return nil, err
        }
        if size, err = readInt16(&c.rbuf, size, &r[i].MinVersion); err != nil {
            return nil, err
        }
        if size, err = readInt16(&c.rbuf, size, &r[i].MaxVersion); err != nil {
            return nil, err
        }
    }

    if errorCode != 0 {
        return r, Error(errorCode)
    }

    return r, nil
}
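
// Illustrative sketch (not part of the original file): inspecting the API
// version ranges advertised by the broker, for example to debug a failed
// version negotiation.
func exampleApiVersions(c *Conn) {
    versions, err := c.ApiVersions()
    if err != nil {
        panic(err)
    }
    for _, v := range versions {
        fmt.Printf("api key %d: v%d..v%d\n", v.ApiKey, v.MinVersion, v.MaxVersion)
    }
}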
// connDeadline is a helper type to implement read/write deadline management on
// the kafka connection.
type connDeadline struct {
    mutex sync.Mutex
    value time.Time
    rconn net.Conn
    wconn net.Conn
}

func (d *connDeadline) deadline() time.Time {
    d.mutex.Lock()
    t := d.value
    d.mutex.Unlock()
    return t
}

func (d *connDeadline) setDeadline(t time.Time) {
    d.mutex.Lock()
    d.value = t

    if d.rconn != nil {
        d.rconn.SetReadDeadline(t)
    }

    if d.wconn != nil {
        d.wconn.SetWriteDeadline(t)
    }

    d.mutex.Unlock()
}

func (d *connDeadline) setConnReadDeadline(conn net.Conn) time.Time {
    d.mutex.Lock()
    deadline := d.value
    d.rconn = conn
    d.rconn.SetReadDeadline(deadline)
    d.mutex.Unlock()
    return deadline
}

func (d *connDeadline) setConnWriteDeadline(conn net.Conn) time.Time {
    d.mutex.Lock()
    deadline := d.value
    d.wconn = conn
    d.wconn.SetWriteDeadline(deadline)
    d.mutex.Unlock()
    return deadline
}

func (d *connDeadline) unsetConnReadDeadline() {
    d.mutex.Lock()
    d.rconn = nil
    d.mutex.Unlock()
}

func (d *connDeadline) unsetConnWriteDeadline() {
    d.mutex.Lock()
    d.wconn = nil
    d.mutex.Unlock()
}
// saslHandshake sends the SASL handshake message. This will determine whether
// the Mechanism is supported by the cluster. If it's not, this function will
// error out with UnsupportedSASLMechanism.
//
// If the mechanism is unsupported, the handshake request will reply with the
// list of the cluster's configured mechanisms, which could potentially be used
// to facilitate negotiation. At the moment, we are not negotiating the
// mechanism as we believe that brokers are usually known to the client, and
// therefore the client should already know which mechanisms are supported.
//
// See http://kafka.apache.org/protocol.html#The_Messages_SaslHandshake
func (c *Conn) saslHandshake(mechanism string) error {
    // The wire format for V0 and V1 is identical, but the version
    // number will affect how the SASL authentication
    // challenge/responses are sent
    var resp saslHandshakeResponseV0

    version, err := c.negotiateVersion(saslHandshake, v0, v1)
    if err != nil {
        return err
    }

    err = c.writeOperation(
        func(deadline time.Time, id int32) error {
            return c.writeRequest(saslHandshake, version, id, &saslHandshakeRequestV0{Mechanism: mechanism})
        },
        func(deadline time.Time, size int) error {
            return expectZeroSize(func() (int, error) {
                return (&resp).readFrom(&c.rbuf, size)
            }())
        },
    )
    if err == nil && resp.ErrorCode != 0 {
        err = Error(resp.ErrorCode)
    }
    return err
}

// saslAuthenticate sends the SASL authenticate message. This function must
// be immediately preceded by a successful saslHandshake.
//
// See http://kafka.apache.org/protocol.html#The_Messages_SaslAuthenticate
func (c *Conn) saslAuthenticate(data []byte) ([]byte, error) {
    // if we sent a v1 handshake, then we must encapsulate the authentication
    // request in a saslAuthenticateRequest. otherwise, we read and write raw
    // bytes.
    version, err := c.negotiateVersion(saslHandshake, v0, v1)
    if err != nil {
        return nil, err
    }
    if version == v1 {
        var request = saslAuthenticateRequestV0{Data: data}
        var response saslAuthenticateResponseV0

        err := c.writeOperation(
            func(deadline time.Time, id int32) error {
                return c.writeRequest(saslAuthenticate, v0, id, request)
            },
            func(deadline time.Time, size int) error {
                return expectZeroSize(func() (remain int, err error) {
                    return (&response).readFrom(&c.rbuf, size)
                }())
            },
        )
        if err == nil && response.ErrorCode != 0 {
            err = Error(response.ErrorCode)
        }
        return response.Data, err
    }

    // fall back to opaque bytes on the wire. the broker is expecting these if
    // it just processed a v0 sasl handshake.
    c.wb.writeInt32(int32(len(data)))
    if _, err := c.wb.Write(data); err != nil {
        return nil, err
    }
    if err := c.wb.Flush(); err != nil {
        return nil, err
    }

    var respLen int32
    if _, err := readInt32(&c.rbuf, 4, &respLen); err != nil {
        return nil, err
    }

    resp, _, err := readNewBytes(&c.rbuf, int(respLen), int(respLen))
    return resp, err
}