parser.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554
  1. // Copyright 2012-2023 The NATS Authors
  2. // Licensed under the Apache License, Version 2.0 (the "License");
  3. // you may not use this file except in compliance with the License.
  4. // You may obtain a copy of the License at
  5. //
  6. // http://www.apache.org/licenses/LICENSE-2.0
  7. //
  8. // Unless required by applicable law or agreed to in writing, software
  9. // distributed under the License is distributed on an "AS IS" BASIS,
  10. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. // See the License for the specific language governing permissions and
  12. // limitations under the License.
  13. package nats
  14. import (
  15. "fmt"
  16. )
  // msgArg holds the parsed arguments of the MSG or HMSG control line that
// is currently being processed.
type msgArg struct {
	// subject and reply reference either the read buffer or, after
	// cloneMsgArg, the parser's argBuf. reply is nil when the control line
	// carried no reply argument.
	subject, reply []byte
	// sid is the numeric sid argument (presumably the subscription id).
	sid int64
	// hdr is the header length for HMSG (-1 for a plain MSG); size is the
	// number of payload bytes that follow the control line. Once validated,
	// hdr never exceeds size.
	hdr, size int
}
  24. const MAX_CONTROL_LINE_SIZE = 4096
  25. type parseState struct {
  26. state int
  27. as int
  28. drop int
  29. hdr int
  30. ma msgArg
  31. argBuf []byte
  32. msgBuf []byte
  33. msgCopied bool
  34. scratch [MAX_CONTROL_LINE_SIZE]byte
  35. }
  // Parser states for the protocol state machine driven by parse.
//
// Each OP_* state records how much of an operation keyword has been matched
// so far. The *_SPC states skip whitespace after a keyword, and the *_ARG
// states collect the rest of a control line up to '\n'. MSG_PAYLOAD reads a
// message body, and MSG_END skips to the '\n' that terminates it. The values
// are assigned by iota, so the order of this list is significant.
const (
	OP_START         = iota // waiting for the first byte of a new operation
	OP_PLUS                 // matched "+"
	OP_PLUS_O               // matched "+O"
	OP_PLUS_OK              // matched "+OK"; skipping to '\n'
	OP_MINUS                // matched "-"
	OP_MINUS_E              // matched "-E"
	OP_MINUS_ER             // matched "-ER"
	OP_MINUS_ERR            // matched "-ERR"
	OP_MINUS_ERR_SPC        // skipping whitespace after "-ERR"
	MINUS_ERR_ARG           // collecting the -ERR argument
	OP_M                    // matched "M" (also reached via "HM" for HMSG)
	OP_MS                   // matched "MS"
	OP_MSG                  // matched "MSG"
	OP_MSG_SPC              // skipping whitespace after "MSG"
	MSG_ARG                 // collecting the MSG/HMSG arguments
	MSG_PAYLOAD             // reading the message payload
	MSG_END                 // skipping to the '\n' after the payload
	OP_H                    // matched "H" (start of HMSG)
	OP_P                    // matched "P"
	OP_PI                   // matched "PI"
	OP_PIN                  // matched "PIN"
	OP_PING                 // matched "PING"; skipping to '\n'
	OP_PO                   // matched "PO"
	OP_PON                  // matched "PON"
	OP_PONG                 // matched "PONG"; skipping to '\n'
	OP_I                    // matched "I"
	OP_IN                   // matched "IN"
	OP_INF                  // matched "INF"
	OP_INFO                 // matched "INFO"
	OP_INFO_SPC             // skipping whitespace after "INFO"
	INFO_ARG                // collecting the INFO argument
)
  // parse is the fast protocol parser engine.
//
// It feeds buf through the byte-at-a-time state machine stored in nc.ps and
// dispatches each complete protocol operation to its handler:
//   - +OK goes to processOK, -ERR to processErr, PING to processPing,
//     PONG to processPong and INFO to processAsyncInfo.
//   - MSG/HMSG goes to processMsgArgs, followed by processMsg.
//
// Operation keywords are matched case-insensitively.
//
// Because the state lives in nc.ps, one operation may span several calls.
// Before returning, parse saves a partially read control-line argument in
// nc.ps.argBuf. A partially read message payload goes in nc.ps.msgBuf, which
// is backed by nc.ps.scratch when it fits and by a fresh allocation
// otherwise.
//
// It returns the error from processMsgArgs for malformed MSG/HMSG
// arguments. On an unexpected byte it returns a "nats: Parse Error" error
// naming the current state and the unparsed remainder of buf.
func (nc *Conn) parse(buf []byte) error {
	var i int
	var b byte

	// Move to loop instead of range syntax to allow jumping of i
	for i = 0; i < len(buf); i++ {
		b = buf[i]

		switch nc.ps.state {
		case OP_START:
			switch b {
			case 'M', 'm':
				// Plain MSG: a negative hdr tells processMsgArgs there are
				// no headers.
				nc.ps.state = OP_M
				nc.ps.hdr = -1
				nc.ps.ma.hdr = -1
			case 'H', 'h':
				// HMSG: a non-negative hdr routes argument parsing to
				// processHeaderMsgArgs.
				nc.ps.state = OP_H
				nc.ps.hdr = 0
				nc.ps.ma.hdr = 0
			case 'P', 'p':
				nc.ps.state = OP_P
			case '+':
				nc.ps.state = OP_PLUS
			case '-':
				nc.ps.state = OP_MINUS
			case 'I', 'i':
				nc.ps.state = OP_I
			default:
				goto parseErr
			}
		case OP_H:
			switch b {
			case 'M', 'm':
				// After "H", the rest of "HMSG" is matched by the MSG states.
				nc.ps.state = OP_M
			default:
				goto parseErr
			}
		case OP_M:
			switch b {
			case 'S', 's':
				nc.ps.state = OP_MS
			default:
				goto parseErr
			}
		case OP_MS:
			switch b {
			case 'G', 'g':
				nc.ps.state = OP_MSG
			default:
				goto parseErr
			}
		case OP_MSG:
			switch b {
			case ' ', '\t':
				nc.ps.state = OP_MSG_SPC
			default:
				goto parseErr
			}
		case OP_MSG_SPC:
			switch b {
			case ' ', '\t':
				continue
			default:
				// First argument byte: remember where the arguments start.
				nc.ps.state = MSG_ARG
				nc.ps.as = i
			}
		case MSG_ARG:
			switch b {
			case '\r':
				// Exclude the '\r' from the argument slice taken at '\n'.
				nc.ps.drop = 1
			case '\n':
				var arg []byte
				if nc.ps.argBuf != nil {
					// The control line began in an earlier buffer.
					arg = nc.ps.argBuf
				} else {
					arg = buf[nc.ps.as : i-nc.ps.drop]
				}
				if err := nc.processMsgArgs(arg); err != nil {
					return err
				}
				nc.ps.drop, nc.ps.as, nc.ps.state = 0, i+1, MSG_PAYLOAD

				// jump ahead with the index. If this overruns
				// what is left we fall out and process a split buffer.
				i = nc.ps.as + nc.ps.ma.size - 1
			default:
				// Bytes are only copied when the line is split across reads;
				// otherwise they are sliced straight from buf at '\n'.
				if nc.ps.argBuf != nil {
					nc.ps.argBuf = append(nc.ps.argBuf, b)
				}
			}
		case MSG_PAYLOAD:
			if nc.ps.msgBuf != nil {
				// The payload began in an earlier buffer.
				if len(nc.ps.msgBuf) >= nc.ps.ma.size {
					// Payload complete: deliver it. The current byte is
					// consumed as the parser moves on to MSG_END.
					nc.processMsg(nc.ps.msgBuf)
					nc.ps.argBuf, nc.ps.msgBuf, nc.ps.msgCopied, nc.ps.state = nil, nil, false, MSG_END
				} else {
					// copy as much as we can to the buffer and skip ahead.
					toCopy := nc.ps.ma.size - len(nc.ps.msgBuf)
					avail := len(buf) - i

					if avail < toCopy {
						toCopy = avail
					}

					if toCopy > 0 {
						start := len(nc.ps.msgBuf)
						// This is needed for copy to work. msgBuf was set up
						// after the loop with capacity for the whole payload.
						nc.ps.msgBuf = nc.ps.msgBuf[:start+toCopy]
						copy(nc.ps.msgBuf[start:], buf[i:i+toCopy])
						// Update our index
						i = (i + toCopy) - 1
					} else {
						// NOTE(review): appears unreachable. In this branch
						// len(msgBuf) < size and i < len(buf), so toCopy >= 1.
						nc.ps.msgBuf = append(nc.ps.msgBuf, b)
					}
				}
			} else if i-nc.ps.as >= nc.ps.ma.size {
				// The whole payload is in buf; b is the byte just past it.
				nc.processMsg(buf[nc.ps.as:i])
				nc.ps.argBuf, nc.ps.msgBuf, nc.ps.msgCopied, nc.ps.state = nil, nil, false, MSG_END
			}
		case MSG_END:
			// Skip everything up to the line terminator after the payload.
			switch b {
			case '\n':
				nc.ps.drop, nc.ps.as, nc.ps.state = 0, i+1, OP_START
			default:
				continue
			}
		case OP_PLUS:
			switch b {
			case 'O', 'o':
				nc.ps.state = OP_PLUS_O
			default:
				goto parseErr
			}
		case OP_PLUS_O:
			switch b {
			case 'K', 'k':
				nc.ps.state = OP_PLUS_OK
			default:
				goto parseErr
			}
		case OP_PLUS_OK:
			// Ignore any bytes until the line terminator.
			switch b {
			case '\n':
				nc.processOK()
				nc.ps.drop, nc.ps.state = 0, OP_START
			}
		case OP_MINUS:
			switch b {
			case 'E', 'e':
				nc.ps.state = OP_MINUS_E
			default:
				goto parseErr
			}
		case OP_MINUS_E:
			switch b {
			case 'R', 'r':
				nc.ps.state = OP_MINUS_ER
			default:
				goto parseErr
			}
		case OP_MINUS_ER:
			switch b {
			case 'R', 'r':
				nc.ps.state = OP_MINUS_ERR
			default:
				goto parseErr
			}
		case OP_MINUS_ERR:
			switch b {
			case ' ', '\t':
				nc.ps.state = OP_MINUS_ERR_SPC
			default:
				goto parseErr
			}
		case OP_MINUS_ERR_SPC:
			switch b {
			case ' ', '\t':
				continue
			default:
				nc.ps.state = MINUS_ERR_ARG
				nc.ps.as = i
			}
		case MINUS_ERR_ARG:
			switch b {
			case '\r':
				nc.ps.drop = 1
			case '\n':
				var arg []byte
				if nc.ps.argBuf != nil {
					// Split line: use the saved copy and release it.
					arg = nc.ps.argBuf
					nc.ps.argBuf = nil
				} else {
					arg = buf[nc.ps.as : i-nc.ps.drop]
				}
				nc.processErr(string(arg))
				nc.ps.drop, nc.ps.as, nc.ps.state = 0, i+1, OP_START
			default:
				if nc.ps.argBuf != nil {
					nc.ps.argBuf = append(nc.ps.argBuf, b)
				}
			}
		case OP_P:
			// "P" starts either PING or PONG.
			switch b {
			case 'I', 'i':
				nc.ps.state = OP_PI
			case 'O', 'o':
				nc.ps.state = OP_PO
			default:
				goto parseErr
			}
		case OP_PO:
			switch b {
			case 'N', 'n':
				nc.ps.state = OP_PON
			default:
				goto parseErr
			}
		case OP_PON:
			switch b {
			case 'G', 'g':
				nc.ps.state = OP_PONG
			default:
				goto parseErr
			}
		case OP_PONG:
			// Ignore any bytes until the line terminator.
			switch b {
			case '\n':
				nc.processPong()
				nc.ps.drop, nc.ps.state = 0, OP_START
			}
		case OP_PI:
			switch b {
			case 'N', 'n':
				nc.ps.state = OP_PIN
			default:
				goto parseErr
			}
		case OP_PIN:
			switch b {
			case 'G', 'g':
				nc.ps.state = OP_PING
			default:
				goto parseErr
			}
		case OP_PING:
			// Ignore any bytes until the line terminator.
			switch b {
			case '\n':
				nc.processPing()
				nc.ps.drop, nc.ps.state = 0, OP_START
			}
		case OP_I:
			switch b {
			case 'N', 'n':
				nc.ps.state = OP_IN
			default:
				goto parseErr
			}
		case OP_IN:
			switch b {
			case 'F', 'f':
				nc.ps.state = OP_INF
			default:
				goto parseErr
			}
		case OP_INF:
			switch b {
			case 'O', 'o':
				nc.ps.state = OP_INFO
			default:
				goto parseErr
			}
		case OP_INFO:
			switch b {
			case ' ', '\t':
				nc.ps.state = OP_INFO_SPC
			default:
				goto parseErr
			}
		case OP_INFO_SPC:
			switch b {
			case ' ', '\t':
				continue
			default:
				nc.ps.state = INFO_ARG
				nc.ps.as = i
			}
		case INFO_ARG:
			switch b {
			case '\r':
				nc.ps.drop = 1
			case '\n':
				var arg []byte
				if nc.ps.argBuf != nil {
					// Split line: use the saved copy and release it.
					arg = nc.ps.argBuf
					nc.ps.argBuf = nil
				} else {
					arg = buf[nc.ps.as : i-nc.ps.drop]
				}
				nc.processAsyncInfo(arg)
				nc.ps.drop, nc.ps.as, nc.ps.state = 0, i+1, OP_START
			default:
				if nc.ps.argBuf != nil {
					nc.ps.argBuf = append(nc.ps.argBuf, b)
				}
			}
		default:
			goto parseErr
		}
	}
	// Check for split buffer scenarios
	// A control line is still open: save what we have so the next call can
	// finish it, since buf is not retained between calls.
	if (nc.ps.state == MSG_ARG || nc.ps.state == MINUS_ERR_ARG || nc.ps.state == INFO_ARG) && nc.ps.argBuf == nil {
		nc.ps.argBuf = nc.ps.scratch[:0]
		nc.ps.argBuf = append(nc.ps.argBuf, buf[nc.ps.as:i-nc.ps.drop]...)
		// FIXME, check max len
		// NOTE(review): argBuf is not bounded here; appending past the
		// scratch capacity silently reallocates.
	}
	// Check for split msg
	if nc.ps.state == MSG_PAYLOAD && nc.ps.msgBuf == nil {
		// We need to clone the msgArg if it is still referencing the
		// read buffer and we are not able to process the msg.
		if nc.ps.argBuf == nil {
			nc.cloneMsgArg()
		}

		// If we will overflow the scratch buffer, just create a
		// new buffer to hold the split message.
		if nc.ps.ma.size > cap(nc.ps.scratch)-len(nc.ps.argBuf) {
			lrem := len(buf[nc.ps.as:])

			nc.ps.msgBuf = make([]byte, lrem, nc.ps.ma.size)
			copy(nc.ps.msgBuf, buf[nc.ps.as:])
			nc.ps.msgCopied = true
		} else {
			// Place the payload in scratch right after the saved arguments.
			nc.ps.msgBuf = nc.ps.scratch[len(nc.ps.argBuf):len(nc.ps.argBuf)]
			nc.ps.msgBuf = append(nc.ps.msgBuf, (buf[nc.ps.as:])...)
		}
	}

	return nil

	// Reached on any byte the current state does not accept.
parseErr:
	return fmt.Errorf("nats: Parse Error [%d]: '%s'", nc.ps.state, buf[i:])
}
  403. // cloneMsgArg is used when the split buffer scenario has the pubArg in the existing read buffer, but
  404. // we need to hold onto it into the next read.
  405. func (nc *Conn) cloneMsgArg() {
  406. nc.ps.argBuf = nc.ps.scratch[:0]
  407. nc.ps.argBuf = append(nc.ps.argBuf, nc.ps.ma.subject...)
  408. nc.ps.argBuf = append(nc.ps.argBuf, nc.ps.ma.reply...)
  409. nc.ps.ma.subject = nc.ps.argBuf[:len(nc.ps.ma.subject)]
  410. if nc.ps.ma.reply != nil {
  411. nc.ps.ma.reply = nc.ps.argBuf[len(nc.ps.ma.subject):]
  412. }
  413. }
  414. const argsLenMax = 4
  415. func (nc *Conn) processMsgArgs(arg []byte) error {
  416. // Use separate function for header based messages.
  417. if nc.ps.hdr >= 0 {
  418. return nc.processHeaderMsgArgs(arg)
  419. }
  420. // Unroll splitArgs to avoid runtime/heap issues
  421. a := [argsLenMax][]byte{}
  422. args := a[:0]
  423. start := -1
  424. for i, b := range arg {
  425. switch b {
  426. case ' ', '\t', '\r', '\n':
  427. if start >= 0 {
  428. args = append(args, arg[start:i])
  429. start = -1
  430. }
  431. default:
  432. if start < 0 {
  433. start = i
  434. }
  435. }
  436. }
  437. if start >= 0 {
  438. args = append(args, arg[start:])
  439. }
  440. switch len(args) {
  441. case 3:
  442. nc.ps.ma.subject = args[0]
  443. nc.ps.ma.sid = parseInt64(args[1])
  444. nc.ps.ma.reply = nil
  445. nc.ps.ma.size = int(parseInt64(args[2]))
  446. case 4:
  447. nc.ps.ma.subject = args[0]
  448. nc.ps.ma.sid = parseInt64(args[1])
  449. nc.ps.ma.reply = args[2]
  450. nc.ps.ma.size = int(parseInt64(args[3]))
  451. default:
  452. return fmt.Errorf("nats: processMsgArgs Parse Error: '%s'", arg)
  453. }
  454. if nc.ps.ma.sid < 0 {
  455. return fmt.Errorf("nats: processMsgArgs Bad or Missing Sid: '%s'", arg)
  456. }
  457. if nc.ps.ma.size < 0 {
  458. return fmt.Errorf("nats: processMsgArgs Bad or Missing Size: '%s'", arg)
  459. }
  460. return nil
  461. }
  462. // processHeaderMsgArgs is for a header based message.
  463. func (nc *Conn) processHeaderMsgArgs(arg []byte) error {
  464. // Unroll splitArgs to avoid runtime/heap issues
  465. a := [argsLenMax][]byte{}
  466. args := a[:0]
  467. start := -1
  468. for i, b := range arg {
  469. switch b {
  470. case ' ', '\t', '\r', '\n':
  471. if start >= 0 {
  472. args = append(args, arg[start:i])
  473. start = -1
  474. }
  475. default:
  476. if start < 0 {
  477. start = i
  478. }
  479. }
  480. }
  481. if start >= 0 {
  482. args = append(args, arg[start:])
  483. }
  484. switch len(args) {
  485. case 4:
  486. nc.ps.ma.subject = args[0]
  487. nc.ps.ma.sid = parseInt64(args[1])
  488. nc.ps.ma.reply = nil
  489. nc.ps.ma.hdr = int(parseInt64(args[2]))
  490. nc.ps.ma.size = int(parseInt64(args[3]))
  491. case 5:
  492. nc.ps.ma.subject = args[0]
  493. nc.ps.ma.sid = parseInt64(args[1])
  494. nc.ps.ma.reply = args[2]
  495. nc.ps.ma.hdr = int(parseInt64(args[3]))
  496. nc.ps.ma.size = int(parseInt64(args[4]))
  497. default:
  498. return fmt.Errorf("nats: processHeaderMsgArgs Parse Error: '%s'", arg)
  499. }
  500. if nc.ps.ma.sid < 0 {
  501. return fmt.Errorf("nats: processHeaderMsgArgs Bad or Missing Sid: '%s'", arg)
  502. }
  503. if nc.ps.ma.hdr < 0 || nc.ps.ma.hdr > nc.ps.ma.size {
  504. return fmt.Errorf("nats: processHeaderMsgArgs Bad or Missing Header Size: '%s'", arg)
  505. }
  506. if nc.ps.ma.size < 0 {
  507. return fmt.Errorf("nats: processHeaderMsgArgs Bad or Missing Size: '%s'", arg)
  508. }
  509. return nil
  510. }
  // ascii_0 and ascii_9 are the byte values of the ASCII digits '0' and '9'.
const (
	ascii_0 = 48
	ascii_9 = 57
)

// parseInt64 parses d as a non-negative decimal integer.
//
// It returns -1 to signal an error in any of these cases:
//   - d is empty;
//   - d contains a byte other than '0'-'9' (including signs);
//   - d denotes a value larger than the maximum int64.
//
// Rejecting overflow matters because callers only test for a negative
// result. Without the check, a wrapped value could come back as a
// plausible-looking positive number, e.g. 2^64+5 would parse as 5.
func parseInt64(d []byte) (n int64) {
	// Largest int64, spelled out so no extra import is needed.
	const maxInt64 = 1<<63 - 1

	if len(d) == 0 {
		return -1
	}
	for _, dec := range d {
		if dec < ascii_0 || dec > ascii_9 {
			return -1
		}
		digit := int64(dec) - ascii_0
		// n*10 + digit <= maxInt64  <=>  n <= (maxInt64 - digit) / 10.
		if n > (maxInt64-digit)/10 {
			return -1
		}
		n = n*10 + digit
	}
	return n
}