package kafka

import (
	"bufio"
	"errors"
	"fmt"
	"io"
	"math"
	"net"
	"os"
	"path/filepath"
	"sync"
	"sync/atomic"
	"time"
)

var (
	errInvalidWriteTopic     = errors.New("writes must NOT set Topic on kafka.Message")
	errInvalidWritePartition = errors.New("writes must NOT set Partition on kafka.Message")
)

// Conn represents a connection to a kafka broker.
//
// Instances of Conn are safe to use concurrently from multiple goroutines.
type Conn struct {
	// base network connection
	conn net.Conn

	// number of inflight requests on the connection.
	inflight int32

	// offset management (synchronized on the mutex field)
	mutex  sync.Mutex
	offset int64

	// read buffer (synchronized on rlock)
	rlock sync.Mutex
	rbuf  bufio.Reader

	// write buffer (synchronized on wlock)
	wlock sync.Mutex
	wbuf  bufio.Writer
	wb    writeBuffer

	// deadline management
	wdeadline connDeadline
	rdeadline connDeadline

	// immutable values of the connection object
	clientID      string
	topic         string
	partition     int32
	fetchMaxBytes int32
	fetchMinSize  int32
	broker        int32
	rack          string

	// correlation ID generator (synchronized on wlock)
	correlationID int32

	// number of replica acks required when publishing to a partition
	requiredAcks int32

	// lazily loaded API versions used by this connection
	apiVersions atomic.Value // apiVersionMap

	transactionalID *string
}

type apiVersionMap map[apiKey]ApiVersion

func (v apiVersionMap) negotiate(key apiKey, sortedSupportedVersions ...apiVersion) apiVersion {
	x := v[key]

	for i := len(sortedSupportedVersions) - 1; i >= 0; i-- {
		s := sortedSupportedVersions[i]

		if apiVersion(x.MaxVersion) >= s {
			return s
		}
	}

	return -1
}

// ConnConfig is a configuration object used to create new instances of Conn.
type ConnConfig struct {
	ClientID  string
	Topic     string
	Partition int
	Broker    int
	Rack      string

	// The transactional id to use for transactional delivery. Idempotent
	// delivery should be enabled if a transactional id is configured.
	// For more details, see the transactional.id description here:
	// http://kafka.apache.org/documentation.html#producerconfigs
	// An empty string means that this connection can't be transactional.
	TransactionalID string
}

// ReadBatchConfig is a configuration object used for reading batches of messages.
type ReadBatchConfig struct {
	// MinBytes indicates to the broker the minimum batch size that the consumer
	// will accept. Setting a high minimum when consuming from a low-volume topic
	// may result in delayed delivery when the broker does not have enough data to
	// satisfy the defined minimum.
	MinBytes int

	// MaxBytes indicates to the broker the maximum batch size that the consumer
	// will accept. The broker will truncate a message to satisfy this maximum, so
	// choose a value that is high enough for your largest message size.
	MaxBytes int

	// IsolationLevel controls the visibility of transactional records.
	// ReadUncommitted makes all records visible. With ReadCommitted only
	// non-transactional and committed records are visible.
	IsolationLevel IsolationLevel

	// MaxWait is the amount of time the broker will wait for the min/max byte
	// targets to be reached. This setting is independent of any network-level
	// timeouts or deadlines.
	//
	// For backward compatibility, when this field is left zero, kafka-go will
	// infer the max wait from the connection's read deadline.
	MaxWait time.Duration
}

type IsolationLevel int8

const (
	ReadUncommitted IsolationLevel = 0
	ReadCommitted   IsolationLevel = 1
)

var (
	// DefaultClientID is the default value used as ClientID of kafka
	// connections.
	DefaultClientID string
)

func init() {
	progname := filepath.Base(os.Args[0])
	hostname, _ := os.Hostname()
	DefaultClientID = fmt.Sprintf("%s@%s (github.com/segmentio/kafka-go)", progname, hostname)
}

// NewConn returns a new kafka connection for the given topic and partition.
func NewConn(conn net.Conn, topic string, partition int) *Conn {
	return NewConnWith(conn, ConnConfig{
		Topic:     topic,
		Partition: partition,
	})
}

func emptyToNullable(transactionalID string) (result *string) {
	if transactionalID != "" {
		result = &transactionalID
	}
	return result
}

// NewConnWith returns a new kafka connection configured with config.
// The offset is initialized to FirstOffset.
func NewConnWith(conn net.Conn, config ConnConfig) *Conn {
	if len(config.ClientID) == 0 {
		config.ClientID = DefaultClientID
	}

	if config.Partition < 0 || config.Partition > math.MaxInt32 {
		panic(fmt.Sprintf("invalid partition number: %d", config.Partition))
	}

	c := &Conn{
		conn:            conn,
		rbuf:            *bufio.NewReader(conn),
		wbuf:            *bufio.NewWriter(conn),
		clientID:        config.ClientID,
		topic:           config.Topic,
		partition:       int32(config.Partition),
		broker:          int32(config.Broker),
		rack:            config.Rack,
		offset:          FirstOffset,
		requiredAcks:    -1,
		transactionalID: emptyToNullable(config.TransactionalID),
	}

	c.wb.w = &c.wbuf

	// The fetch request needs to ask for a MaxBytes value that is at least
	// enough to load the control data of the response. To avoid having to
	// recompute it on every read, it is cached here in the Conn value.
	c.fetchMinSize = (fetchResponseV2{
		Topics: []fetchResponseTopicV2{{
			TopicName: config.Topic,
			Partitions: []fetchResponsePartitionV2{{
				Partition:  int32(config.Partition),
				MessageSet: messageSet{{}},
			}},
		}},
	}).size()

	c.fetchMaxBytes = math.MaxInt32 - c.fetchMinSize
	return c
}
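
// A minimal construction sketch, assuming a broker reachable at
// "localhost:9092" and a topic named "my-topic" (both placeholder values):
//
//	nc, err := net.Dial("tcp", "localhost:9092")
//	if err != nil {
//		// handle the dial error
//	}
//	conn := NewConnWith(nc, ConnConfig{
//		ClientID:  "example-client", // falls back to DefaultClientID when empty
//		Topic:     "my-topic",
//		Partition: 0,
//	})
//	defer conn.Close()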

func (c *Conn) negotiateVersion(key apiKey, sortedSupportedVersions ...apiVersion) (apiVersion, error) {
	v, err := c.loadVersions()
	if err != nil {
		return -1, err
	}
	a := v.negotiate(key, sortedSupportedVersions...)
	if a < 0 {
		return -1, fmt.Errorf("no matching versions were found between the client and the broker for API key %d", key)
	}
	return a, nil
}

func (c *Conn) loadVersions() (apiVersionMap, error) {
	v, _ := c.apiVersions.Load().(apiVersionMap)
	if v != nil {
		return v, nil
	}

	brokerVersions, err := c.ApiVersions()
	if err != nil {
		return nil, err
	}

	v = make(apiVersionMap, len(brokerVersions))

	for _, a := range brokerVersions {
		v[apiKey(a.ApiKey)] = a
	}

	c.apiVersions.Store(v)
	return v, nil
}

// Broker returns a Broker value representing the kafka broker that this
// connection was established to.
func (c *Conn) Broker() Broker {
	addr := c.conn.RemoteAddr()
	host, port, _ := splitHostPortNumber(addr.String())
	return Broker{
		Host: host,
		Port: port,
		ID:   int(c.broker),
		Rack: c.rack,
	}
}

// Controller requests kafka for the current controller and returns its broker
// metadata.
func (c *Conn) Controller() (broker Broker, err error) {
	err = c.readOperation(
		func(deadline time.Time, id int32) error {
			return c.writeRequest(metadata, v1, id, topicMetadataRequestV1([]string{}))
		},
		func(deadline time.Time, size int) error {
			var res metadataResponseV1

			if err := c.readResponse(size, &res); err != nil {
				return err
			}
			for _, brokerMeta := range res.Brokers {
				if brokerMeta.NodeID == res.ControllerID {
					broker = Broker{
						ID:   int(brokerMeta.NodeID),
						Port: int(brokerMeta.Port),
						Host: brokerMeta.Host,
						Rack: brokerMeta.Rack,
					}
					break
				}
			}
			return nil
		},
	)
	return broker, err
}

// Brokers retrieves the broker list from the Kafka metadata.
func (c *Conn) Brokers() ([]Broker, error) {
	var brokers []Broker
	err := c.readOperation(
		func(deadline time.Time, id int32) error {
			return c.writeRequest(metadata, v1, id, topicMetadataRequestV1([]string{}))
		},
		func(deadline time.Time, size int) error {
			var res metadataResponseV1

			if err := c.readResponse(size, &res); err != nil {
				return err
			}

			brokers = make([]Broker, len(res.Brokers))
			for i, brokerMeta := range res.Brokers {
				brokers[i] = Broker{
					ID:   int(brokerMeta.NodeID),
					Port: int(brokerMeta.Port),
					Host: brokerMeta.Host,
					Rack: brokerMeta.Rack,
				}
			}
			return nil
		},
	)
	return brokers, err
}
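
// A short sketch, assuming conn is a *Conn built as above, showing how
// Controller and Brokers expose the cluster layout:
//
//	controller, err := conn.Controller()
//	if err != nil {
//		// handle the error
//	}
//	fmt.Printf("controller: %s:%d (id=%d)\n", controller.Host, controller.Port, controller.ID)
//
//	brokers, err := conn.Brokers()
//	if err != nil {
//		// handle the error
//	}
//	for _, b := range brokers {
//		fmt.Printf("broker %d at %s:%d\n", b.ID, b.Host, b.Port)
//	}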

// DeleteTopics deletes the specified topics.
func (c *Conn) DeleteTopics(topics ...string) error {
	_, err := c.deleteTopics(deleteTopicsRequestV0{
		Topics: topics,
	})
	return err
}

// findCoordinator finds the coordinator for the specified group or transaction
//
// See http://kafka.apache.org/protocol.html#The_Messages_FindCoordinator
func (c *Conn) findCoordinator(request findCoordinatorRequestV0) (findCoordinatorResponseV0, error) {
	var response findCoordinatorResponseV0

	err := c.readOperation(
		func(deadline time.Time, id int32) error {
			return c.writeRequest(findCoordinator, v0, id, request)
		},
		func(deadline time.Time, size int) error {
			return expectZeroSize(func() (remain int, err error) {
				return (&response).readFrom(&c.rbuf, size)
			}())
		},
	)
	if err != nil {
		return findCoordinatorResponseV0{}, err
	}
	if response.ErrorCode != 0 {
		return findCoordinatorResponseV0{}, Error(response.ErrorCode)
	}

	return response, nil
}

// heartbeat sends a heartbeat message required by consumer groups
//
// See http://kafka.apache.org/protocol.html#The_Messages_Heartbeat
func (c *Conn) heartbeat(request heartbeatRequestV0) (heartbeatResponseV0, error) {
	var response heartbeatResponseV0

	err := c.writeOperation(
		func(deadline time.Time, id int32) error {
			return c.writeRequest(heartbeat, v0, id, request)
		},
		func(deadline time.Time, size int) error {
			return expectZeroSize(func() (remain int, err error) {
				return (&response).readFrom(&c.rbuf, size)
			}())
		},
	)
	if err != nil {
		return heartbeatResponseV0{}, err
	}
	if response.ErrorCode != 0 {
		return heartbeatResponseV0{}, Error(response.ErrorCode)
	}

	return response, nil
}

// joinGroup attempts to join a consumer group
//
// See http://kafka.apache.org/protocol.html#The_Messages_JoinGroup
func (c *Conn) joinGroup(request joinGroupRequestV1) (joinGroupResponseV1, error) {
	var response joinGroupResponseV1

	err := c.writeOperation(
		func(deadline time.Time, id int32) error {
			return c.writeRequest(joinGroup, v1, id, request)
		},
		func(deadline time.Time, size int) error {
			return expectZeroSize(func() (remain int, err error) {
				return (&response).readFrom(&c.rbuf, size)
			}())
		},
	)
	if err != nil {
		return joinGroupResponseV1{}, err
	}
	if response.ErrorCode != 0 {
		return joinGroupResponseV1{}, Error(response.ErrorCode)
	}

	return response, nil
}

// leaveGroup removes the consumer from the consumer group
//
// See http://kafka.apache.org/protocol.html#The_Messages_LeaveGroup
func (c *Conn) leaveGroup(request leaveGroupRequestV0) (leaveGroupResponseV0, error) {
	var response leaveGroupResponseV0

	err := c.writeOperation(
		func(deadline time.Time, id int32) error {
			return c.writeRequest(leaveGroup, v0, id, request)
		},
		func(deadline time.Time, size int) error {
			return expectZeroSize(func() (remain int, err error) {
				return (&response).readFrom(&c.rbuf, size)
			}())
		},
	)
	if err != nil {
		return leaveGroupResponseV0{}, err
	}
	if response.ErrorCode != 0 {
		return leaveGroupResponseV0{}, Error(response.ErrorCode)
	}

	return response, nil
}

// listGroups lists all the consumer groups
//
// See http://kafka.apache.org/protocol.html#The_Messages_ListGroups
func (c *Conn) listGroups(request listGroupsRequestV1) (listGroupsResponseV1, error) {
	var response listGroupsResponseV1

	err := c.readOperation(
		func(deadline time.Time, id int32) error {
			return c.writeRequest(listGroups, v1, id, request)
		},
		func(deadline time.Time, size int) error {
			return expectZeroSize(func() (remain int, err error) {
				return (&response).readFrom(&c.rbuf, size)
			}())
		},
	)
	if err != nil {
		return listGroupsResponseV1{}, err
	}
	if response.ErrorCode != 0 {
		return listGroupsResponseV1{}, Error(response.ErrorCode)
	}

	return response, nil
}

// offsetCommit commits the specified topic partition offsets
//
// See http://kafka.apache.org/protocol.html#The_Messages_OffsetCommit
func (c *Conn) offsetCommit(request offsetCommitRequestV2) (offsetCommitResponseV2, error) {
	var response offsetCommitResponseV2

	err := c.writeOperation(
		func(deadline time.Time, id int32) error {
			return c.writeRequest(offsetCommit, v2, id, request)
		},
		func(deadline time.Time, size int) error {
			return expectZeroSize(func() (remain int, err error) {
				return (&response).readFrom(&c.rbuf, size)
			}())
		},
	)
	if err != nil {
		return offsetCommitResponseV2{}, err
	}
	for _, r := range response.Responses {
		for _, pr := range r.PartitionResponses {
			if pr.ErrorCode != 0 {
				return offsetCommitResponseV2{}, Error(pr.ErrorCode)
			}
		}
	}

	return response, nil
}

// offsetFetch fetches the offsets for the specified topic partitions.
// -1 indicates that there is no offset saved for the partition.
//
// See http://kafka.apache.org/protocol.html#The_Messages_OffsetFetch
func (c *Conn) offsetFetch(request offsetFetchRequestV1) (offsetFetchResponseV1, error) {
	var response offsetFetchResponseV1

	err := c.readOperation(
		func(deadline time.Time, id int32) error {
			return c.writeRequest(offsetFetch, v1, id, request)
		},
		func(deadline time.Time, size int) error {
			return expectZeroSize(func() (remain int, err error) {
				return (&response).readFrom(&c.rbuf, size)
			}())
		},
	)
	if err != nil {
		return offsetFetchResponseV1{}, err
	}
	for _, r := range response.Responses {
		for _, pr := range r.PartitionResponses {
			if pr.ErrorCode != 0 {
				return offsetFetchResponseV1{}, Error(pr.ErrorCode)
			}
		}
	}

	return response, nil
}

// syncGroup completes the handshake to join a consumer group
//
// See http://kafka.apache.org/protocol.html#The_Messages_SyncGroup
func (c *Conn) syncGroup(request syncGroupRequestV0) (syncGroupResponseV0, error) {
	var response syncGroupResponseV0

	err := c.readOperation(
		func(deadline time.Time, id int32) error {
			return c.writeRequest(syncGroup, v0, id, request)
		},
		func(deadline time.Time, size int) error {
			return expectZeroSize(func() (remain int, err error) {
				return (&response).readFrom(&c.rbuf, size)
			}())
		},
	)
	if err != nil {
		return syncGroupResponseV0{}, err
	}
	if response.ErrorCode != 0 {
		return syncGroupResponseV0{}, Error(response.ErrorCode)
	}

	return response, nil
}

// Close closes the kafka connection.
func (c *Conn) Close() error {
	return c.conn.Close()
}

// LocalAddr returns the local network address.
func (c *Conn) LocalAddr() net.Addr {
	return c.conn.LocalAddr()
}

// RemoteAddr returns the remote network address.
func (c *Conn) RemoteAddr() net.Addr {
	return c.conn.RemoteAddr()
}

// SetDeadline sets the read and write deadlines associated with the connection.
// It is equivalent to calling both SetReadDeadline and SetWriteDeadline.
//
// A deadline is an absolute time after which I/O operations fail with a timeout
// (see type Error) instead of blocking. The deadline applies to all future and
// pending I/O, not just the immediately following call to Read or Write. After
// a deadline has been exceeded, the connection may be closed if it was found to
// be in an unrecoverable state.
//
// A zero value for t means I/O operations will not time out.
func (c *Conn) SetDeadline(t time.Time) error {
	c.rdeadline.setDeadline(t)
	c.wdeadline.setDeadline(t)
	return nil
}

// SetReadDeadline sets the deadline for future Read calls and any
// currently-blocked Read call.
// A zero value for t means Read will not time out.
func (c *Conn) SetReadDeadline(t time.Time) error {
	c.rdeadline.setDeadline(t)
	return nil
}

// SetWriteDeadline sets the deadline for future Write calls and any
// currently-blocked Write call.
// Even if write times out, it may return n > 0, indicating that some of the
// data was successfully written.
// A zero value for t means Write will not time out.
func (c *Conn) SetWriteDeadline(t time.Time) error {
	c.wdeadline.setDeadline(t)
	return nil
}
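
// A brief sketch of bounding a read with a deadline, assuming conn is a *Conn;
// the read fails with a timeout error once the deadline is exceeded:
//
//	conn.SetReadDeadline(time.Now().Add(10 * time.Second))
//	msg, err := conn.ReadMessage(10e6) // accept up to ~10 MB
//	if err != nil {
//		// a timeout or another read error occurred
//	}
//	_ = msg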

// Offset returns the current offset of the connection as pair of integers,
// where the first one is an offset value and the second one indicates how
// to interpret it.
//
// See Seek for more details about the offset and whence values.
func (c *Conn) Offset() (offset int64, whence int) {
	c.mutex.Lock()
	offset = c.offset
	c.mutex.Unlock()

	switch offset {
	case FirstOffset:
		offset = 0
		whence = SeekStart
	case LastOffset:
		offset = 0
		whence = SeekEnd
	default:
		whence = SeekAbsolute
	}
	return
}

const (
	SeekStart    = 0 // Seek relative to the first offset available in the partition.
	SeekAbsolute = 1 // Seek to an absolute offset.
	SeekEnd      = 2 // Seek relative to the last offset available in the partition.
	SeekCurrent  = 3 // Seek relative to the current offset.

	// This flag may be combined to any of the SeekAbsolute and SeekCurrent
	// constants to skip the bound check that the connection would do otherwise.
	// Programs can use this flag to avoid making a metadata request to the kafka
	// broker to read the current first and last offsets of the partition.
	SeekDontCheck = 1 << 30
)

// Seek sets the offset for the next read or write operation according to whence, which
// should be one of SeekStart, SeekAbsolute, SeekEnd, or SeekCurrent.
// When seeking relative to the end, the offset is subtracted from the last
// offset of the partition.
// Note that for historical reasons, these do not align with the usual whence constants
// as in lseek(2) or os.Seek.
// The method returns the new absolute offset of the connection.
func (c *Conn) Seek(offset int64, whence int) (int64, error) {
	seekDontCheck := (whence & SeekDontCheck) != 0
	whence &= ^SeekDontCheck

	switch whence {
	case SeekStart, SeekAbsolute, SeekEnd, SeekCurrent:
	default:
		return 0, fmt.Errorf("whence must be one of 0, 1, 2, or 3. (whence = %d)", whence)
	}

	if seekDontCheck {
		if whence == SeekAbsolute {
			c.mutex.Lock()
			c.offset = offset
			c.mutex.Unlock()
			return offset, nil
		}

		if whence == SeekCurrent {
			c.mutex.Lock()
			c.offset += offset
			offset = c.offset
			c.mutex.Unlock()
			return offset, nil
		}
	}

	if whence == SeekAbsolute {
		c.mutex.Lock()
		unchanged := offset == c.offset
		c.mutex.Unlock()
		if unchanged {
			return offset, nil
		}
	}

	if whence == SeekCurrent {
		c.mutex.Lock()
		offset = c.offset + offset
		c.mutex.Unlock()
	}

	first, last, err := c.ReadOffsets()
	if err != nil {
		return 0, err
	}

	switch whence {
	case SeekStart:
		offset = first + offset
	case SeekEnd:
		offset = last - offset
	}

	if offset < first || offset > last {
		return 0, OffsetOutOfRange
	}

	c.mutex.Lock()
	c.offset = offset
	c.mutex.Unlock()
	return offset, nil
}
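
// A short seeking sketch, assuming conn is a *Conn; the whence constants
// select how the offset argument is interpreted:
//
//	// position at the oldest available message
//	first, _ := conn.Seek(0, SeekStart)
//
//	// position 10 messages before the newest offset
//	nearEnd, _ := conn.Seek(10, SeekEnd)
//
//	// jump to a known absolute offset, skipping the bounds check (and the
//	// metadata round trip it implies)
//	abs, _ := conn.Seek(42, SeekAbsolute|SeekDontCheck)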

// Read reads the message at the current offset from the connection, advancing
// the offset on success so the next call to a read method will produce the next
// message.
// The method returns the number of bytes read, or an error if something went
// wrong.
//
// While it is safe to call Read concurrently from multiple goroutines it may
// be hard for the program to predict the results as the connection offset will
// be read and written by multiple goroutines, they could read duplicates, or
// messages may be seen by only some of the goroutines.
//
// The method fails with io.ErrShortBuffer if the buffer passed as argument is
// too small to hold the message value.
//
// This method is provided to satisfy the net.Conn interface but is much less
// efficient than using the more general purpose ReadBatch method.
func (c *Conn) Read(b []byte) (int, error) {
	batch := c.ReadBatch(1, len(b))
	n, err := batch.Read(b)
	return n, coalesceErrors(silentEOF(err), batch.Close())
}

// ReadMessage reads the message at the current offset from the connection,
// advancing the offset on success so the next call to a read method will
// produce the next message.
//
// Because this method allocates memory buffers for the message key and value
// it is less memory-efficient than Read, but has the advantage of never
// failing with io.ErrShortBuffer.
//
// While it is safe to call ReadMessage concurrently from multiple goroutines
// it may be hard for the program to predict the results as the connection
// offset will be read and written by multiple goroutines, they could read
// duplicates, or messages may be seen by only some of the goroutines.
//
// This method is provided for convenience purposes but is much less efficient
// than using the more general purpose ReadBatch method.
func (c *Conn) ReadMessage(maxBytes int) (Message, error) {
	batch := c.ReadBatch(1, maxBytes)
	msg, err := batch.ReadMessage()
	return msg, coalesceErrors(silentEOF(err), batch.Close())
}

// ReadBatch reads a batch of messages from the kafka server. The method always
// returns a non-nil Batch value. If an error occurred, either sending the fetch
// request or reading the response, the error will be made available by the
// returned value of the batch's Close method.
//
// While it is safe to call ReadBatch concurrently from multiple goroutines it
// may be hard for the program to predict the results as the connection offset
// will be read and written by multiple goroutines, they could read duplicates,
// or messages may be seen by only some of the goroutines.
//
// A program doesn't specify the number of messages it wants from a batch, but
// gives the minimum and maximum number of bytes that it wants to receive from
// the kafka server.
func (c *Conn) ReadBatch(minBytes, maxBytes int) *Batch {
	return c.ReadBatchWith(ReadBatchConfig{
		MinBytes: minBytes,
		MaxBytes: maxBytes,
	})
}
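
// A typical consumption sketch, assuming conn is a *Conn positioned at the
// desired offset; the batch is drained with repeated Read calls and the fetch
// error, if any, is reported by Close:
//
//	batch := conn.ReadBatch(10e3, 1e6) // fetch between ~10 KB and ~1 MB
//	buf := make([]byte, 10e3)
//	for {
//		n, err := batch.Read(buf)
//		if err != nil {
//			break // the batch is exhausted (or failed)
//		}
//		fmt.Println(string(buf[:n]))
//	}
//	if err := batch.Close(); err != nil {
//		// handle the fetch error
//	}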

// ReadBatchWith is similar to ReadBatch in every way, except that the fetch is
// configured with cfg. ReadBatch is equivalent to calling ReadBatchWith with a
// ReadBatchConfig that only sets MinBytes and MaxBytes.
func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch {
	var adjustedDeadline time.Time
	var maxFetch = int(c.fetchMaxBytes)

	if cfg.MinBytes < 0 || cfg.MinBytes > maxFetch {
		return &Batch{err: fmt.Errorf("kafka.(*Conn).ReadBatch: minBytes of %d out of [1,%d] bounds", cfg.MinBytes, maxFetch)}
	}
	if cfg.MaxBytes < 0 || cfg.MaxBytes > maxFetch {
		return &Batch{err: fmt.Errorf("kafka.(*Conn).ReadBatch: maxBytes of %d out of [1,%d] bounds", cfg.MaxBytes, maxFetch)}
	}
	if cfg.MinBytes > cfg.MaxBytes {
		return &Batch{err: fmt.Errorf("kafka.(*Conn).ReadBatch: minBytes (%d) > maxBytes (%d)", cfg.MinBytes, cfg.MaxBytes)}
	}

	offset, whence := c.Offset()

	offset, err := c.Seek(offset, whence|SeekDontCheck)
	if err != nil {
		return &Batch{err: dontExpectEOF(err)}
	}

	fetchVersion, err := c.negotiateVersion(fetch, v2, v5, v10)
	if err != nil {
		return &Batch{err: dontExpectEOF(err)}
	}

	id, err := c.doRequest(&c.rdeadline, func(deadline time.Time, id int32) error {
		now := time.Now()
		var timeout time.Duration
		if cfg.MaxWait > 0 {
			// explicitly-configured case: no changes are made to the deadline,
			// and the timeout is sent exactly as specified.
			timeout = cfg.MaxWait
		} else {
			// default case: use the original logic to adjust the conn's
			// deadline.
			deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
			timeout = deadlineToTimeout(deadline, now)
		}

		// save this variable outside of the closure for later use in detecting
		// truncated messages.
		adjustedDeadline = deadline
		switch fetchVersion {
		case v10:
			return c.wb.writeFetchRequestV10(
				id,
				c.clientID,
				c.topic,
				c.partition,
				offset,
				cfg.MinBytes,
				cfg.MaxBytes+int(c.fetchMinSize),
				timeout,
				int8(cfg.IsolationLevel),
			)
		case v5:
			return c.wb.writeFetchRequestV5(
				id,
				c.clientID,
				c.topic,
				c.partition,
				offset,
				cfg.MinBytes,
				cfg.MaxBytes+int(c.fetchMinSize),
				timeout,
				int8(cfg.IsolationLevel),
			)
		default:
			return c.wb.writeFetchRequestV2(
				id,
				c.clientID,
				c.topic,
				c.partition,
				offset,
				cfg.MinBytes,
				cfg.MaxBytes+int(c.fetchMinSize),
				timeout,
			)
		}
	})
	if err != nil {
		return &Batch{err: dontExpectEOF(err)}
	}

	_, size, lock, err := c.waitResponse(&c.rdeadline, id)
	if err != nil {
		return &Batch{err: dontExpectEOF(err)}
	}

	var throttle int32
	var highWaterMark int64
	var remain int

	switch fetchVersion {
	case v10:
		throttle, highWaterMark, remain, err = readFetchResponseHeaderV10(&c.rbuf, size)
	case v5:
		throttle, highWaterMark, remain, err = readFetchResponseHeaderV5(&c.rbuf, size)
	default:
		throttle, highWaterMark, remain, err = readFetchResponseHeaderV2(&c.rbuf, size)
	}
	if errors.Is(err, errShortRead) {
		err = checkTimeoutErr(adjustedDeadline)
	}

	var msgs *messageSetReader
	if err == nil {
		if highWaterMark == offset {
			msgs = &messageSetReader{empty: true}
		} else {
			msgs, err = newMessageSetReader(&c.rbuf, remain)
		}
	}
	if errors.Is(err, errShortRead) {
		err = checkTimeoutErr(adjustedDeadline)
	}

	return &Batch{
		conn:          c,
		msgs:          msgs,
		deadline:      adjustedDeadline,
		throttle:      makeDuration(throttle),
		lock:          lock,
		topic:         c.topic,          // topic is copied to Batch to prevent race with Batch.close
		partition:     int(c.partition), // partition is copied to Batch to prevent race with Batch.close
		offset:        offset,
		highWaterMark: highWaterMark,
		// there shouldn't be a short read on initially setting up the batch.
		// as such, any io.EOF is re-mapped to an io.ErrUnexpectedEOF so that we
		// don't accidentally signal that we successfully reached the end of the
		// batch.
		err: dontExpectEOF(err),
	}
}
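
// A configuration sketch, assuming conn is a *Conn, showing the remaining
// ReadBatchConfig knobs: hiding uncommitted transactional records and capping
// the broker-side wait independently of the connection deadlines:
//
//	batch := conn.ReadBatchWith(ReadBatchConfig{
//		MinBytes:       10e3,
//		MaxBytes:       1e6,
//		IsolationLevel: ReadCommitted,
//		MaxWait:        500 * time.Millisecond,
//	})
//	defer batch.Close()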

// ReadOffset returns the offset of the first message with a timestamp equal or
// greater to t.
func (c *Conn) ReadOffset(t time.Time) (int64, error) {
	return c.readOffset(timestamp(t))
}

// ReadFirstOffset returns the first offset available on the connection.
func (c *Conn) ReadFirstOffset() (int64, error) {
	return c.readOffset(FirstOffset)
}

// ReadLastOffset returns the last offset available on the connection.
func (c *Conn) ReadLastOffset() (int64, error) {
	return c.readOffset(LastOffset)
}

// ReadOffsets returns the absolute first and last offsets of the topic used by
// the connection.
func (c *Conn) ReadOffsets() (first, last int64, err error) {
	// We have to submit two different requests to fetch the first and last
	// offsets because kafka refuses requests that ask for multiple offsets
	// on the same topic and partition.
	if first, err = c.ReadFirstOffset(); err != nil {
		return
	}
	if last, err = c.ReadLastOffset(); err != nil {
		first = 0 // don't leak the value on error
		return
	}
	return
}

func (c *Conn) readOffset(t int64) (offset int64, err error) {
	err = c.readOperation(
		func(deadline time.Time, id int32) error {
			return c.wb.writeListOffsetRequestV1(id, c.clientID, c.topic, c.partition, t)
		},
		func(deadline time.Time, size int) error {
			return expectZeroSize(readArrayWith(&c.rbuf, size, func(r *bufio.Reader, size int) (int, error) {
				// We skip the topic name because we've made a request for
				// a single topic.
				size, err := discardString(r, size)
				if err != nil {
					return size, err
				}
				// Reading the array of partitions, there will be only one
				// partition which gives the offset we're looking for.
				return readArrayWith(r, size, func(r *bufio.Reader, size int) (int, error) {
					var p partitionOffsetV1
					size, err := p.readFrom(r, size)
					if err != nil {
						return size, err
					}
					if p.ErrorCode != 0 {
						return size, Error(p.ErrorCode)
					}
					offset = p.Offset
					return size, nil
				})
			}))
		},
	)
	return
}

// ReadPartitions returns the list of available partitions for the given list of
// topics.
//
// If the method is called with no topic, it uses the topic configured on the
// connection. If there are none, the method fetches all partitions of the kafka
// cluster.
func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err error) {
	if len(topics) == 0 {
		if len(c.topic) != 0 {
			defaultTopics := [...]string{c.topic}
			topics = defaultTopics[:]
		} else {
			// topics needs to be explicitly nil-ed out or the broker will
			// interpret it as a request for 0 partitions instead of all.
			topics = nil
		}
	}
	metadataVersion, err := c.negotiateVersion(metadata, v1, v6)
	if err != nil {
		return nil, err
	}

	err = c.readOperation(
		func(deadline time.Time, id int32) error {
			switch metadataVersion {
			case v6:
				return c.writeRequest(metadata, v6, id, topicMetadataRequestV6{Topics: topics, AllowAutoTopicCreation: true})
			default:
				return c.writeRequest(metadata, v1, id, topicMetadataRequestV1(topics))
			}
		},
		func(deadline time.Time, size int) error {
			partitions, err = c.readPartitionsResponse(metadataVersion, size)
			return err
		},
	)
	return
}
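
// A partition-listing sketch, assuming conn is a *Conn dialed against any
// broker of the cluster and a topic named "my-topic" (a placeholder):
//
//	partitions, err := conn.ReadPartitions("my-topic")
//	if err != nil {
//		// handle the error
//	}
//	for _, p := range partitions {
//		fmt.Printf("%s[%d] led by broker %d\n", p.Topic, p.ID, p.Leader.ID)
//	}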

func (c *Conn) readPartitionsResponse(metadataVersion apiVersion, size int) ([]Partition, error) {
	switch metadataVersion {
	case v6:
		var res metadataResponseV6
		if err := c.readResponse(size, &res); err != nil {
			return nil, err
		}
		brokers := readBrokerMetadata(res.Brokers)
		return c.readTopicMetadatav6(brokers, res.Topics)
	default:
		var res metadataResponseV1
		if err := c.readResponse(size, &res); err != nil {
			return nil, err
		}
		brokers := readBrokerMetadata(res.Brokers)
		return c.readTopicMetadatav1(brokers, res.Topics)
	}
}

func readBrokerMetadata(brokerMetadata []brokerMetadataV1) map[int32]Broker {
	brokers := make(map[int32]Broker, len(brokerMetadata))
	for _, b := range brokerMetadata {
		brokers[b.NodeID] = Broker{
			Host: b.Host,
			Port: int(b.Port),
			ID:   int(b.NodeID),
			Rack: b.Rack,
		}
	}
	return brokers
}

func (c *Conn) readTopicMetadatav1(brokers map[int32]Broker, topicMetadata []topicMetadataV1) (partitions []Partition, err error) {
	for _, t := range topicMetadata {
		if t.TopicErrorCode != 0 && (c.topic == "" || t.TopicName == c.topic) {
			// We only report errors if they happened for the topic of
			// the connection, otherwise the topic will simply have no
			// partitions in the result set.
			return nil, Error(t.TopicErrorCode)
		}
		for _, p := range t.Partitions {
			partitions = append(partitions, Partition{
				Topic:           t.TopicName,
				Leader:          brokers[p.Leader],
				Replicas:        makeBrokers(brokers, p.Replicas...),
				Isr:             makeBrokers(brokers, p.Isr...),
				ID:              int(p.PartitionID),
				OfflineReplicas: []Broker{},
			})
		}
	}
	return
}

func (c *Conn) readTopicMetadatav6(brokers map[int32]Broker, topicMetadata []topicMetadataV6) (partitions []Partition, err error) {
	for _, t := range topicMetadata {
		if t.TopicErrorCode != 0 && (c.topic == "" || t.TopicName == c.topic) {
			// We only report errors if they happened for the topic of
			// the connection, otherwise the topic will simply have no
			// partitions in the result set.
			return nil, Error(t.TopicErrorCode)
		}
		for _, p := range t.Partitions {
			partitions = append(partitions, Partition{
				Topic:           t.TopicName,
				Leader:          brokers[p.Leader],
				Replicas:        makeBrokers(brokers, p.Replicas...),
				Isr:             makeBrokers(brokers, p.Isr...),
				ID:              int(p.PartitionID),
				OfflineReplicas: makeBrokers(brokers, p.OfflineReplicas...),
			})
		}
	}
	return
}

func makeBrokers(brokers map[int32]Broker, ids ...int32) []Broker {
	b := make([]Broker, len(ids))
	for i, id := range ids {
		br, ok := brokers[id]
		if !ok {
			// When the broker id isn't found in the current list of known
			// brokers, use a placeholder to report that the cluster has
			// logical knowledge of the broker but no information about the
			// physical host where it is running.
			br.ID = int(id)
		}
		b[i] = br
	}
	return b
}

// Write writes a message to the kafka broker that this connection was
// established to. The method returns the number of bytes written, or an error
// if something went wrong.
//
// The operation either succeeds or fails, it never partially writes the message.
//
// This method is exposed to satisfy the net.Conn interface but is less efficient
// than the more general purpose WriteMessages method.
func (c *Conn) Write(b []byte) (int, error) {
	return c.WriteCompressedMessages(nil, Message{Value: b})
}

// WriteMessages writes a batch of messages to the connection's topic and
// partition, returning the number of bytes written. The write is an atomic
// operation, it either fully succeeds or fails.
func (c *Conn) WriteMessages(msgs ...Message) (int, error) {
	return c.WriteCompressedMessages(nil, msgs...)
}
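
// A producing sketch, assuming conn is a *Conn bound to the leader of the
// target topic/partition; Topic and Partition must be left unset on the
// messages themselves:
//
//	conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
//	n, err := conn.WriteMessages(
//		Message{Key: []byte("k1"), Value: []byte("one")},
//		Message{Key: []byte("k2"), Value: []byte("two")},
//	)
//	if err != nil {
//		// handle the write error
//	}
//	fmt.Println("wrote", n, "bytes")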

// WriteCompressedMessages writes a batch of messages to the connection's topic
// and partition, returning the number of bytes written. The write is an atomic
// operation, it either fully succeeds or fails.
//
// If the compression codec is not nil, the messages will be compressed.
func (c *Conn) WriteCompressedMessages(codec CompressionCodec, msgs ...Message) (nbytes int, err error) {
	nbytes, _, _, _, err = c.writeCompressedMessages(codec, msgs...)
	return
}

// WriteCompressedMessagesAt writes a batch of messages to the connection's topic
// and partition, returning the number of bytes written, partition and offset numbers
// and timestamp assigned by the kafka broker to the message set. The write is an atomic
// operation, it either fully succeeds or fails.
//
// If the compression codec is not nil, the messages will be compressed.
func (c *Conn) WriteCompressedMessagesAt(codec CompressionCodec, msgs ...Message) (nbytes int, partition int32, offset int64, appendTime time.Time, err error) {
	return c.writeCompressedMessages(codec, msgs...)
}

func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message) (nbytes int, partition int32, offset int64, appendTime time.Time, err error) {
	if len(msgs) == 0 {
		return
	}

	writeTime := time.Now()
	for i, msg := range msgs {
		// users may believe they can set the Topic and/or Partition
		// on the kafka message.
		if msg.Topic != "" && msg.Topic != c.topic {
			err = errInvalidWriteTopic
			return
		}
		if msg.Partition != 0 {
			err = errInvalidWritePartition
			return
		}

		if msg.Time.IsZero() {
			msgs[i].Time = writeTime
		}

		nbytes += len(msg.Key) + len(msg.Value)
	}

	var produceVersion apiVersion
	if produceVersion, err = c.negotiateVersion(produce, v2, v3, v7); err != nil {
		return
	}

	err = c.writeOperation(
		func(deadline time.Time, id int32) error {
			now := time.Now()
			deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
			switch produceVersion {
			case v7:
				recordBatch, err := newRecordBatch(
					codec,
					msgs...,
				)
				if err != nil {
					return err
				}
				return c.wb.writeProduceRequestV7(
					id,
					c.clientID,
					c.topic,
					c.partition,
					deadlineToTimeout(deadline, now),
					int16(atomic.LoadInt32(&c.requiredAcks)),
					c.transactionalID,
					recordBatch,
				)
			case v3:
				recordBatch, err := newRecordBatch(
					codec,
					msgs...,
				)
				if err != nil {
					return err
				}
				return c.wb.writeProduceRequestV3(
					id,
					c.clientID,
					c.topic,
					c.partition,
					deadlineToTimeout(deadline, now),
					int16(atomic.LoadInt32(&c.requiredAcks)),
					c.transactionalID,
					recordBatch,
				)
			default:
				return c.wb.writeProduceRequestV2(
					codec,
					id,
					c.clientID,
					c.topic,
					c.partition,
					deadlineToTimeout(deadline, now),
					int16(atomic.LoadInt32(&c.requiredAcks)),
					msgs...,
				)
			}
		},
		func(deadline time.Time, size int) error {
			return expectZeroSize(readArrayWith(&c.rbuf, size, func(r *bufio.Reader, size int) (int, error) {
				// Skip the topic, we've produced the message to only one topic,
				// no need to waste resources loading it in memory.
				size, err := discardString(r, size)
				if err != nil {
					return size, err
				}

				// Read the list of partitions, there should be only one since
				// we've produced a message to a single partition.
				size, err = readArrayWith(r, size, func(r *bufio.Reader, size int) (int, error) {
					switch produceVersion {
					case v7:
						var p produceResponsePartitionV7
						size, err := p.readFrom(r, size)
						if err == nil && p.ErrorCode != 0 {
							err = Error(p.ErrorCode)
						}
						if err == nil {
							partition = p.Partition
							offset = p.Offset
							appendTime = time.Unix(0, p.Timestamp*int64(time.Millisecond))
						}
						return size, err
					default:
						var p produceResponsePartitionV2
						size, err := p.readFrom(r, size)
						if err == nil && p.ErrorCode != 0 {
							err = Error(p.ErrorCode)
						}
						if err == nil {
							partition = p.Partition
							offset = p.Offset
							appendTime = time.Unix(0, p.Timestamp*int64(time.Millisecond))
						}
						return size, err
					}
				})
				if err != nil {
					return size, err
				}

				// The response is trailed by the throttle time, also skipping
				// since it's not interesting here.
				return discardInt32(r, size)
			}))
		},
	)

	if err != nil {
		nbytes = 0
	}

	return
}

// SetRequiredAcks sets the number of acknowledgements from replicas that the
// connection requests when producing messages.
func (c *Conn) SetRequiredAcks(n int) error {
	switch n {
	case -1, 1:
		atomic.StoreInt32(&c.requiredAcks, int32(n))
		return nil
	default:
		return InvalidRequiredAcks
	}
}
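
// A small sketch, assuming conn is a *Conn: -1 waits for the full in-sync
// replica set to acknowledge each produce request, 1 only waits for the
// partition leader; any other value is rejected with InvalidRequiredAcks:
//
//	if err := conn.SetRequiredAcks(-1); err != nil {
//		// handle the invalid configuration
//	}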

func (c *Conn) writeRequest(apiKey apiKey, apiVersion apiVersion, correlationID int32, req request) error {
	hdr := c.requestHeader(apiKey, apiVersion, correlationID)
	hdr.Size = (hdr.size() + req.size()) - 4
	hdr.writeTo(&c.wb)
	req.writeTo(&c.wb)
	return c.wbuf.Flush()
}

func (c *Conn) readResponse(size int, res interface{}) error {
	size, err := read(&c.rbuf, size, res)
	if err != nil {
		var kafkaError Error
		if errors.As(err, &kafkaError) {
			size, err = discardN(&c.rbuf, size, size)
		}
	}
	return expectZeroSize(size, err)
}

func (c *Conn) peekResponseSizeAndID() (int32, int32, error) {
	b, err := c.rbuf.Peek(8)
	if err != nil {
		return 0, 0, err
	}
	size, id := makeInt32(b[:4]), makeInt32(b[4:])
	return size, id, nil
}

func (c *Conn) skipResponseSizeAndID() {
	c.rbuf.Discard(8)
}

func (c *Conn) readDeadline() time.Time {
	return c.rdeadline.deadline()
}

func (c *Conn) writeDeadline() time.Time {
	return c.wdeadline.deadline()
}

func (c *Conn) readOperation(write func(time.Time, int32) error, read func(time.Time, int) error) error {
	return c.do(&c.rdeadline, write, read)
}

func (c *Conn) writeOperation(write func(time.Time, int32) error, read func(time.Time, int) error) error {
	return c.do(&c.wdeadline, write, read)
}

func (c *Conn) enter() {
	atomic.AddInt32(&c.inflight, +1)
}

func (c *Conn) leave() {
	atomic.AddInt32(&c.inflight, -1)
}

func (c *Conn) concurrency() int {
	return int(atomic.LoadInt32(&c.inflight))
}

func (c *Conn) do(d *connDeadline, write func(time.Time, int32) error, read func(time.Time, int) error) error {
	id, err := c.doRequest(d, write)
	if err != nil {
		return err
	}

	deadline, size, lock, err := c.waitResponse(d, id)
	if err != nil {
		return err
	}

	if err = read(deadline, size); err != nil {
		var kafkaError Error
		if !errors.As(err, &kafkaError) {
			c.conn.Close()
		}
	}

	d.unsetConnReadDeadline()
	lock.Unlock()
	return err
}

func (c *Conn) doRequest(d *connDeadline, write func(time.Time, int32) error) (id int32, err error) {
	c.enter()
	c.wlock.Lock()
	c.correlationID++
	id = c.correlationID
	err = write(d.setConnWriteDeadline(c.conn), id)
	d.unsetConnWriteDeadline()

	if err != nil {
		// When an error occurs there's no way to know if the connection is in a
		// recoverable state so we're better off just giving up at this point to
		// avoid any risk of corrupting the following operations.
		c.conn.Close()
		c.leave()
	}

	c.wlock.Unlock()
	return
}

func (c *Conn) waitResponse(d *connDeadline, id int32) (deadline time.Time, size int, lock *sync.Mutex, err error) {
	for {
		var rsz int32
		var rid int32

		c.rlock.Lock()
		deadline = d.setConnReadDeadline(c.conn)
		rsz, rid, err = c.peekResponseSizeAndID()

		if err != nil {
			d.unsetConnReadDeadline()
			c.conn.Close()
			c.rlock.Unlock()
			break
		}

		if id == rid {
			c.skipResponseSizeAndID()
			size, lock = int(rsz-4), &c.rlock
			// Don't unlock the read mutex to yield ownership to the caller.
			break
		}

		if c.concurrency() == 1 {
			// If the goroutine is the only one waiting on this connection it
			// should be impossible to read a correlation id different from the
			// one it expects. This is a sign that the data we are reading on
			// the wire is corrupted and the connection needs to be closed.
			err = io.ErrNoProgress
			c.rlock.Unlock()
			break
		}

		// Optimistically release the read lock if a response has already
		// been received but the current operation is not the target for it.
		c.rlock.Unlock()
	}

	c.leave()
	return
}

func (c *Conn) requestHeader(apiKey apiKey, apiVersion apiVersion, correlationID int32) requestHeader {
	return requestHeader{
		ApiKey:        int16(apiKey),
		ApiVersion:    int16(apiVersion),
		CorrelationID: correlationID,
		ClientID:      c.clientID,
	}
}

func (c *Conn) ApiVersions() ([]ApiVersion, error) {
	deadline := &c.rdeadline

	if deadline.deadline().IsZero() {
		// ApiVersions is called automatically when API version negotiation
		// needs to happen, so we are not guaranteed that a read deadline has
		// been set yet. Fallback to use the write deadline in case it was
		// set, for example when version negotiation is initiated during a
		// produce request.
		deadline = &c.wdeadline
	}

	id, err := c.doRequest(deadline, func(_ time.Time, id int32) error {
		h := requestHeader{
			ApiKey:        int16(apiVersions),
			ApiVersion:    int16(v0),
			CorrelationID: id,
			ClientID:      c.clientID,
		}
		h.Size = (h.size() - 4)
		h.writeTo(&c.wb)
		return c.wbuf.Flush()
	})
	if err != nil {
		return nil, err
	}

	_, size, lock, err := c.waitResponse(deadline, id)
	if err != nil {
		return nil, err
	}
	defer lock.Unlock()

	var errorCode int16
	if size, err = readInt16(&c.rbuf, size, &errorCode); err != nil {
		return nil, err
	}
	var arrSize int32
	if size, err = readInt32(&c.rbuf, size, &arrSize); err != nil {
		return nil, err
	}
	r := make([]ApiVersion, arrSize)
	for i := 0; i < int(arrSize); i++ {
		if size, err = readInt16(&c.rbuf, size, &r[i].ApiKey); err != nil {
			return nil, err
		}
		if size, err = readInt16(&c.rbuf, size, &r[i].MinVersion); err != nil {
			return nil, err
		}
		if size, err = readInt16(&c.rbuf, size, &r[i].MaxVersion); err != nil {
			return nil, err
		}
	}

	if errorCode != 0 {
		return r, Error(errorCode)
	}

	return r, nil
}

// connDeadline is a helper type to implement read/write deadline management on
// the kafka connection.
type connDeadline struct {
	mutex sync.Mutex
	value time.Time
	rconn net.Conn
	wconn net.Conn
}

func (d *connDeadline) deadline() time.Time {
	d.mutex.Lock()
	t := d.value
	d.mutex.Unlock()
	return t
}

func (d *connDeadline) setDeadline(t time.Time) {
	d.mutex.Lock()
	d.value = t

	if d.rconn != nil {
		d.rconn.SetReadDeadline(t)
	}

	if d.wconn != nil {
		d.wconn.SetWriteDeadline(t)
	}

	d.mutex.Unlock()
}

func (d *connDeadline) setConnReadDeadline(conn net.Conn) time.Time {
	d.mutex.Lock()
	deadline := d.value
	d.rconn = conn
	d.rconn.SetReadDeadline(deadline)
	d.mutex.Unlock()
	return deadline
}

func (d *connDeadline) setConnWriteDeadline(conn net.Conn) time.Time {
	d.mutex.Lock()
	deadline := d.value
	d.wconn = conn
	d.wconn.SetWriteDeadline(deadline)
	d.mutex.Unlock()
	return deadline
}

func (d *connDeadline) unsetConnReadDeadline() {
	d.mutex.Lock()
	d.rconn = nil
	d.mutex.Unlock()
}

func (d *connDeadline) unsetConnWriteDeadline() {
	d.mutex.Lock()
	d.wconn = nil
	d.mutex.Unlock()
}

// saslHandshake sends the SASL handshake message. This will determine whether
// the Mechanism is supported by the cluster. If it's not, this function will
// error out with UnsupportedSASLMechanism.
//
// If the mechanism is unsupported, the handshake request will reply with the
// list of the cluster's configured mechanisms, which could potentially be used
// to facilitate negotiation. At the moment, we are not negotiating the
// mechanism as we believe that brokers are usually known to the client, and
// therefore the client should already know which mechanisms are supported.
//
// See http://kafka.apache.org/protocol.html#The_Messages_SaslHandshake
func (c *Conn) saslHandshake(mechanism string) error {
	// The wire format for V0 and V1 is identical, but the version
	// number will affect how the SASL authentication
	// challenge/responses are sent
	var resp saslHandshakeResponseV0

	version, err := c.negotiateVersion(saslHandshake, v0, v1)
	if err != nil {
		return err
	}

	err = c.writeOperation(
		func(deadline time.Time, id int32) error {
			return c.writeRequest(saslHandshake, version, id, &saslHandshakeRequestV0{Mechanism: mechanism})
		},
		func(deadline time.Time, size int) error {
			return expectZeroSize(func() (int, error) {
				return (&resp).readFrom(&c.rbuf, size)
			}())
		},
	)
	if err == nil && resp.ErrorCode != 0 {
		err = Error(resp.ErrorCode)
	}
	return err
}

// saslAuthenticate sends the SASL authenticate message. This function must
// be immediately preceded by a successful saslHandshake.
//
// See http://kafka.apache.org/protocol.html#The_Messages_SaslAuthenticate
func (c *Conn) saslAuthenticate(data []byte) ([]byte, error) {
	// if we sent a v1 handshake, then we must encapsulate the authentication
	// request in a saslAuthenticateRequest. otherwise, we read and write raw
	// bytes.
	version, err := c.negotiateVersion(saslHandshake, v0, v1)
	if err != nil {
		return nil, err
	}
	if version == v1 {
		var request = saslAuthenticateRequestV0{Data: data}
		var response saslAuthenticateResponseV0

		err := c.writeOperation(
			func(deadline time.Time, id int32) error {
				return c.writeRequest(saslAuthenticate, v0, id, request)
			},
			func(deadline time.Time, size int) error {
				return expectZeroSize(func() (remain int, err error) {
					return (&response).readFrom(&c.rbuf, size)
				}())
			},
		)
		if err == nil && response.ErrorCode != 0 {
			err = Error(response.ErrorCode)
		}
		return response.Data, err
	}

	// fall back to opaque bytes on the wire. the broker is expecting these if
	// it just processed a v0 sasl handshake.
	c.wb.writeInt32(int32(len(data)))
	if _, err := c.wb.Write(data); err != nil {
		return nil, err
	}
	if err := c.wb.Flush(); err != nil {
		return nil, err
	}

	var respLen int32
	if _, err := readInt32(&c.rbuf, 4, &respLen); err != nil {
		return nil, err
	}

	resp, _, err := readNewBytes(&c.rbuf, int(respLen), int(respLen))
	return resp, err
}