protocol.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541
  1. package protocol
  2. import (
  3. "errors"
  4. "fmt"
  5. "io"
  6. "net"
  7. "reflect"
  8. "strconv"
  9. "strings"
  10. )
  11. // Message is an interface implemented by all request and response types of the
  12. // kafka protocol.
  13. //
  14. // This interface is used mostly as a safe-guard to provide a compile-time check
  15. // for values passed to functions dealing kafka message types.
  16. type Message interface {
  17. ApiKey() ApiKey
  18. }
  19. type ApiKey int16
  20. func (k ApiKey) String() string {
  21. if i := int(k); i >= 0 && i < len(apiNames) {
  22. return apiNames[i]
  23. }
  24. return strconv.Itoa(int(k))
  25. }
  26. func (k ApiKey) MinVersion() int16 { return k.apiType().minVersion() }
  27. func (k ApiKey) MaxVersion() int16 { return k.apiType().maxVersion() }
  28. func (k ApiKey) SelectVersion(minVersion, maxVersion int16) int16 {
  29. min := k.MinVersion()
  30. max := k.MaxVersion()
  31. switch {
  32. case min > maxVersion:
  33. return min
  34. case max < maxVersion:
  35. return max
  36. default:
  37. return maxVersion
  38. }
  39. }
  40. func (k ApiKey) apiType() apiType {
  41. if i := int(k); i >= 0 && i < len(apiTypes) {
  42. return apiTypes[i]
  43. }
  44. return apiType{}
  45. }
  46. const (
  47. Produce ApiKey = 0
  48. Fetch ApiKey = 1
  49. ListOffsets ApiKey = 2
  50. Metadata ApiKey = 3
  51. LeaderAndIsr ApiKey = 4
  52. StopReplica ApiKey = 5
  53. UpdateMetadata ApiKey = 6
  54. ControlledShutdown ApiKey = 7
  55. OffsetCommit ApiKey = 8
  56. OffsetFetch ApiKey = 9
  57. FindCoordinator ApiKey = 10
  58. JoinGroup ApiKey = 11
  59. Heartbeat ApiKey = 12
  60. LeaveGroup ApiKey = 13
  61. SyncGroup ApiKey = 14
  62. DescribeGroups ApiKey = 15
  63. ListGroups ApiKey = 16
  64. SaslHandshake ApiKey = 17
  65. ApiVersions ApiKey = 18
  66. CreateTopics ApiKey = 19
  67. DeleteTopics ApiKey = 20
  68. DeleteRecords ApiKey = 21
  69. InitProducerId ApiKey = 22
  70. OffsetForLeaderEpoch ApiKey = 23
  71. AddPartitionsToTxn ApiKey = 24
  72. AddOffsetsToTxn ApiKey = 25
  73. EndTxn ApiKey = 26
  74. WriteTxnMarkers ApiKey = 27
  75. TxnOffsetCommit ApiKey = 28
  76. DescribeAcls ApiKey = 29
  77. CreateAcls ApiKey = 30
  78. DeleteAcls ApiKey = 31
  79. DescribeConfigs ApiKey = 32
  80. AlterConfigs ApiKey = 33
  81. AlterReplicaLogDirs ApiKey = 34
  82. DescribeLogDirs ApiKey = 35
  83. SaslAuthenticate ApiKey = 36
  84. CreatePartitions ApiKey = 37
  85. CreateDelegationToken ApiKey = 38
  86. RenewDelegationToken ApiKey = 39
  87. ExpireDelegationToken ApiKey = 40
  88. DescribeDelegationToken ApiKey = 41
  89. DeleteGroups ApiKey = 42
  90. ElectLeaders ApiKey = 43
  91. IncrementalAlterConfigs ApiKey = 44
  92. AlterPartitionReassignments ApiKey = 45
  93. ListPartitionReassignments ApiKey = 46
  94. OffsetDelete ApiKey = 47
  95. DescribeClientQuotas ApiKey = 48
  96. AlterClientQuotas ApiKey = 49
  97. DescribeUserScramCredentials ApiKey = 50
  98. AlterUserScramCredentials ApiKey = 51
  99. numApis = 52
  100. )
  101. var apiNames = [numApis]string{
  102. Produce: "Produce",
  103. Fetch: "Fetch",
  104. ListOffsets: "ListOffsets",
  105. Metadata: "Metadata",
  106. LeaderAndIsr: "LeaderAndIsr",
  107. StopReplica: "StopReplica",
  108. UpdateMetadata: "UpdateMetadata",
  109. ControlledShutdown: "ControlledShutdown",
  110. OffsetCommit: "OffsetCommit",
  111. OffsetFetch: "OffsetFetch",
  112. FindCoordinator: "FindCoordinator",
  113. JoinGroup: "JoinGroup",
  114. Heartbeat: "Heartbeat",
  115. LeaveGroup: "LeaveGroup",
  116. SyncGroup: "SyncGroup",
  117. DescribeGroups: "DescribeGroups",
  118. ListGroups: "ListGroups",
  119. SaslHandshake: "SaslHandshake",
  120. ApiVersions: "ApiVersions",
  121. CreateTopics: "CreateTopics",
  122. DeleteTopics: "DeleteTopics",
  123. DeleteRecords: "DeleteRecords",
  124. InitProducerId: "InitProducerId",
  125. OffsetForLeaderEpoch: "OffsetForLeaderEpoch",
  126. AddPartitionsToTxn: "AddPartitionsToTxn",
  127. AddOffsetsToTxn: "AddOffsetsToTxn",
  128. EndTxn: "EndTxn",
  129. WriteTxnMarkers: "WriteTxnMarkers",
  130. TxnOffsetCommit: "TxnOffsetCommit",
  131. DescribeAcls: "DescribeAcls",
  132. CreateAcls: "CreateAcls",
  133. DeleteAcls: "DeleteAcls",
  134. DescribeConfigs: "DescribeConfigs",
  135. AlterConfigs: "AlterConfigs",
  136. AlterReplicaLogDirs: "AlterReplicaLogDirs",
  137. DescribeLogDirs: "DescribeLogDirs",
  138. SaslAuthenticate: "SaslAuthenticate",
  139. CreatePartitions: "CreatePartitions",
  140. CreateDelegationToken: "CreateDelegationToken",
  141. RenewDelegationToken: "RenewDelegationToken",
  142. ExpireDelegationToken: "ExpireDelegationToken",
  143. DescribeDelegationToken: "DescribeDelegationToken",
  144. DeleteGroups: "DeleteGroups",
  145. ElectLeaders: "ElectLeaders",
  146. IncrementalAlterConfigs: "IncrementalAlterConfigs",
  147. AlterPartitionReassignments: "AlterPartitionReassignments",
  148. ListPartitionReassignments: "ListPartitionReassignments",
  149. OffsetDelete: "OffsetDelete",
  150. DescribeClientQuotas: "DescribeClientQuotas",
  151. AlterClientQuotas: "AlterClientQuotas",
  152. DescribeUserScramCredentials: "DescribeUserScramCredentials",
  153. AlterUserScramCredentials: "AlterUserScramCredentials",
  154. }
  155. type messageType struct {
  156. version int16
  157. flexible bool
  158. gotype reflect.Type
  159. decode decodeFunc
  160. encode encodeFunc
  161. }
  162. func (t *messageType) new() Message {
  163. return reflect.New(t.gotype).Interface().(Message)
  164. }
  165. type apiType struct {
  166. requests []messageType
  167. responses []messageType
  168. }
  169. func (t apiType) minVersion() int16 {
  170. if len(t.requests) == 0 {
  171. return 0
  172. }
  173. return t.requests[0].version
  174. }
  175. func (t apiType) maxVersion() int16 {
  176. if len(t.requests) == 0 {
  177. return 0
  178. }
  179. return t.requests[len(t.requests)-1].version
  180. }
  181. var apiTypes [numApis]apiType
  182. // Register is automatically called by sub-packages are imported to install a
  183. // new pair of request/response message types.
  184. func Register(req, res Message) {
  185. k1 := req.ApiKey()
  186. k2 := res.ApiKey()
  187. if k1 != k2 {
  188. panic(fmt.Sprintf("[%T/%T]: request and response API keys mismatch: %d != %d", req, res, k1, k2))
  189. }
  190. apiTypes[k1] = apiType{
  191. requests: typesOf(req),
  192. responses: typesOf(res),
  193. }
  194. }
  195. // OverrideTypeMessage is an interface implemented by messages that want to override the standard
  196. // request/response types for a given API.
  197. type OverrideTypeMessage interface {
  198. TypeKey() OverrideTypeKey
  199. }
  200. type OverrideTypeKey int16
  201. const (
  202. RawProduceOverride OverrideTypeKey = 0
  203. )
  204. var overrideApiTypes [numApis]map[OverrideTypeKey]apiType
  205. func RegisterOverride(req, res Message, key OverrideTypeKey) {
  206. k1 := req.ApiKey()
  207. k2 := res.ApiKey()
  208. if k1 != k2 {
  209. panic(fmt.Sprintf("[%T/%T]: request and response API keys mismatch: %d != %d", req, res, k1, k2))
  210. }
  211. if overrideApiTypes[k1] == nil {
  212. overrideApiTypes[k1] = make(map[OverrideTypeKey]apiType)
  213. }
  214. overrideApiTypes[k1][key] = apiType{
  215. requests: typesOf(req),
  216. responses: typesOf(res),
  217. }
  218. }
  219. func typesOf(v interface{}) []messageType {
  220. return makeTypes(reflect.TypeOf(v).Elem())
  221. }
  222. func makeTypes(t reflect.Type) []messageType {
  223. minVersion := int16(-1)
  224. maxVersion := int16(-1)
  225. // All future versions will be flexible (according to spec), so don't need to
  226. // worry about maxes here.
  227. minFlexibleVersion := int16(-1)
  228. forEachStructField(t, func(_ reflect.Type, _ index, tag string) {
  229. forEachStructTag(tag, func(tag structTag) bool {
  230. if minVersion < 0 || tag.MinVersion < minVersion {
  231. minVersion = tag.MinVersion
  232. }
  233. if maxVersion < 0 || tag.MaxVersion > maxVersion {
  234. maxVersion = tag.MaxVersion
  235. }
  236. if tag.TagID > -2 && (minFlexibleVersion < 0 || tag.MinVersion < minFlexibleVersion) {
  237. minFlexibleVersion = tag.MinVersion
  238. }
  239. return true
  240. })
  241. })
  242. types := make([]messageType, 0, (maxVersion-minVersion)+1)
  243. for v := minVersion; v <= maxVersion; v++ {
  244. flexible := minFlexibleVersion >= 0 && v >= minFlexibleVersion
  245. types = append(types, messageType{
  246. version: v,
  247. gotype: t,
  248. flexible: flexible,
  249. decode: decodeFuncOf(t, v, flexible, structTag{}),
  250. encode: encodeFuncOf(t, v, flexible, structTag{}),
  251. })
  252. }
  253. return types
  254. }
  255. type structTag struct {
  256. MinVersion int16
  257. MaxVersion int16
  258. Compact bool
  259. Nullable bool
  260. TagID int
  261. }
  262. func forEachStructTag(tag string, do func(structTag) bool) {
  263. if tag == "-" {
  264. return // special case to ignore the field
  265. }
  266. forEach(tag, '|', func(s string) bool {
  267. tag := structTag{
  268. MinVersion: -1,
  269. MaxVersion: -1,
  270. // Legitimate tag IDs can start at 0. We use -1 as a placeholder to indicate
  271. // that the message type is flexible, so that leaves -2 as the default for
  272. // indicating that there is no tag ID and the message is not flexible.
  273. TagID: -2,
  274. }
  275. var err error
  276. forEach(s, ',', func(s string) bool {
  277. switch {
  278. case strings.HasPrefix(s, "min="):
  279. tag.MinVersion, err = parseVersion(s[4:])
  280. case strings.HasPrefix(s, "max="):
  281. tag.MaxVersion, err = parseVersion(s[4:])
  282. case s == "tag":
  283. tag.TagID = -1
  284. case strings.HasPrefix(s, "tag="):
  285. tag.TagID, err = strconv.Atoi(s[4:])
  286. case s == "compact":
  287. tag.Compact = true
  288. case s == "nullable":
  289. tag.Nullable = true
  290. default:
  291. err = fmt.Errorf("unrecognized option: %q", s)
  292. }
  293. return err == nil
  294. })
  295. if err != nil {
  296. panic(fmt.Errorf("malformed struct tag: %w", err))
  297. }
  298. if tag.MinVersion < 0 && tag.MaxVersion >= 0 {
  299. panic(fmt.Errorf("missing minimum version in struct tag: %q", s))
  300. }
  301. if tag.MaxVersion < 0 && tag.MinVersion >= 0 {
  302. panic(fmt.Errorf("missing maximum version in struct tag: %q", s))
  303. }
  304. if tag.MinVersion > tag.MaxVersion {
  305. panic(fmt.Errorf("invalid version range in struct tag: %q", s))
  306. }
  307. return do(tag)
  308. })
  309. }
  310. func forEach(s string, sep byte, do func(string) bool) bool {
  311. for len(s) != 0 {
  312. p := ""
  313. i := strings.IndexByte(s, sep)
  314. if i < 0 {
  315. p, s = s, ""
  316. } else {
  317. p, s = s[:i], s[i+1:]
  318. }
  319. if !do(p) {
  320. return false
  321. }
  322. }
  323. return true
  324. }
  325. func forEachStructField(t reflect.Type, do func(reflect.Type, index, string)) {
  326. for i, n := 0, t.NumField(); i < n; i++ {
  327. f := t.Field(i)
  328. if f.PkgPath != "" && f.Name != "_" {
  329. continue
  330. }
  331. kafkaTag, ok := f.Tag.Lookup("kafka")
  332. if !ok {
  333. kafkaTag = "|"
  334. }
  335. do(f.Type, indexOf(f), kafkaTag)
  336. }
  337. }
  338. func parseVersion(s string) (int16, error) {
  339. if !strings.HasPrefix(s, "v") {
  340. return 0, fmt.Errorf("invalid version number: %q", s)
  341. }
  342. i, err := strconv.ParseInt(s[1:], 10, 16)
  343. if err != nil {
  344. return 0, fmt.Errorf("invalid version number: %q: %w", s, err)
  345. }
  346. if i < 0 {
  347. return 0, fmt.Errorf("invalid negative version number: %q", s)
  348. }
  349. return int16(i), nil
  350. }
  351. func dontExpectEOF(err error) error {
  352. if err != nil {
  353. if errors.Is(err, io.EOF) {
  354. return io.ErrUnexpectedEOF
  355. }
  356. return err
  357. }
  358. return nil
  359. }
  360. type Broker struct {
  361. Rack string
  362. Host string
  363. Port int32
  364. ID int32
  365. }
  366. func (b Broker) String() string {
  367. return net.JoinHostPort(b.Host, itoa(b.Port))
  368. }
  369. func (b Broker) Format(w fmt.State, v rune) {
  370. switch v {
  371. case 'd':
  372. io.WriteString(w, itoa(b.ID))
  373. case 's':
  374. io.WriteString(w, b.String())
  375. case 'v':
  376. io.WriteString(w, itoa(b.ID))
  377. io.WriteString(w, " ")
  378. io.WriteString(w, b.String())
  379. if b.Rack != "" {
  380. io.WriteString(w, " ")
  381. io.WriteString(w, b.Rack)
  382. }
  383. }
  384. }
  385. func itoa(i int32) string {
  386. return strconv.Itoa(int(i))
  387. }
  388. type Topic struct {
  389. Name string
  390. Error int16
  391. Partitions map[int32]Partition
  392. }
  393. type Partition struct {
  394. ID int32
  395. Error int16
  396. Leader int32
  397. Replicas []int32
  398. ISR []int32
  399. Offline []int32
  400. }
  401. // RawExchanger is an extention to the Message interface to allow messages
  402. // to control the request response cycle for the message. This is currently
  403. // only used to facilitate v0 SASL Authenticate requests being written in
  404. // a non-standard fashion when the SASL Handshake was done at v0 but not
  405. // when done at v1.
  406. type RawExchanger interface {
  407. // Required should return true when a RawExchange is needed.
  408. // The passed in versions are the negotiated versions for the connection
  409. // performing the request.
  410. Required(versions map[ApiKey]int16) bool
  411. // RawExchange is given the raw connection to the broker and the Message
  412. // is responsible for writing itself to the connection as well as reading
  413. // the response.
  414. RawExchange(rw io.ReadWriter) (Message, error)
  415. }
  416. // BrokerMessage is an extension of the Message interface implemented by some
  417. // request types to customize the broker assignment logic.
  418. type BrokerMessage interface {
  419. // Given a representation of the kafka cluster state as argument, returns
  420. // the broker that the message should be routed to.
  421. Broker(Cluster) (Broker, error)
  422. }
  423. // GroupMessage is an extension of the Message interface implemented by some
  424. // request types to inform the program that they should be routed to a group
  425. // coordinator.
  426. type GroupMessage interface {
  427. // Returns the group configured on the message.
  428. Group() string
  429. }
  430. // TransactionalMessage is an extension of the Message interface implemented by some
  431. // request types to inform the program that they should be routed to a transaction
  432. // coordinator.
  433. type TransactionalMessage interface {
  434. // Returns the transactional id configured on the message.
  435. Transaction() string
  436. }
  437. // PreparedMessage is an extension of the Message interface implemented by some
  438. // request types which may need to run some pre-processing on their state before
  439. // being sent.
  440. type PreparedMessage interface {
  441. // Prepares the message before being sent to a kafka broker using the API
  442. // version passed as argument.
  443. Prepare(apiVersion int16)
  444. }
  445. // Splitter is an interface implemented by messages that can be split into
  446. // multiple requests and have their results merged back by a Merger.
  447. type Splitter interface {
  448. // For a given cluster layout, returns the list of messages constructed
  449. // from the receiver for each requests that should be sent to the cluster.
  450. // The second return value is a Merger which can be used to merge back the
  451. // results of each request into a single message (or an error).
  452. Split(Cluster) ([]Message, Merger, error)
  453. }
  454. // Merger is an interface implemented by messages which can merge multiple
  455. // results into one response.
  456. type Merger interface {
  457. // Given a list of message and associated results, merge them back into a
  458. // response (or an error). The results must be either Message or error
  459. // values, other types should trigger a panic.
  460. Merge(messages []Message, results []interface{}) (Message, error)
  461. }
  462. // Result converts r to a Message or an error, or panics if r could not be
  463. // converted to these types.
  464. func Result(r interface{}) (Message, error) {
  465. switch v := r.(type) {
  466. case Message:
  467. return v, nil
  468. case error:
  469. return nil, v
  470. default:
  471. panic(fmt.Errorf("BUG: result must be a message or an error but not %T", v))
  472. }
  473. }