machine.go.rl 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549
  1. package protocol
  2. import (
  3. "errors"
  4. "io"
  5. )
  // Errors reported by the parser. Each is a distinct sentinel value that
// callers can compare returned errors against.
var (
	// ErrNameParse is reported when a line does not begin with a valid
	// measurement name.
	ErrNameParse = errors.New("expected measurement name")

	// ErrFieldParse is reported when the field set of a line is malformed.
	ErrFieldParse = errors.New("expected field")

	// ErrTagParse is reported when the tag set of a line is malformed.
	ErrTagParse = errors.New("expected tag")

	// ErrTimestampParse is reported when the timestamp of a line is malformed.
	ErrTimestampParse = errors.New("expected timestamp")

	// ErrParse is a generic syntax error used when no more specific error
	// applies.
	ErrParse = errors.New("parse error")

	// EOF is returned by Next when the input holds no further metric lines.
	EOF = errors.New("EOF")
)
  %%{
machine LineProtocol;

# Marks the first byte of the token being scanned; text() slices from this
# mark up to the current position.
action begin {
	m.pb = m.p
}

# Syntax error actions. Each stores its error in err, steps p back so the
# offending byte is scanned again (fhold), makes the next exec resume in the
# discard_line machine (fnext) and ends the current exec call (fbreak).
action name_error {
	err = ErrNameParse
	fhold;
	fnext discard_line;
	fbreak;
}

action field_error {
	err = ErrFieldParse
	fhold;
	fnext discard_line;
	fbreak;
}

action tagset_error {
	err = ErrTagParse
	fhold;
	fnext discard_line;
	fbreak;
}

action timestamp_error {
	err = ErrTimestampParse
	fhold;
	fnext discard_line;
	fbreak;
}

# NOTE(review): parse_error and align_error are not embedded in any of the
# machines defined in this specification.
action parse_error {
	err = ErrParse
	fhold;
	fnext discard_line;
	fbreak;
}

# Same as parse_error but without fhold, so the current byte is consumed.
action align_error {
	err = ErrParse
	fnext discard_line;
	fbreak;
}

# Error action of align: step back and rescan the current byte with main,
# which treats it as the start of a metric line.
action hold_recover {
	fhold;
	fgoto main;
}

# Final action of discard_line: continue with align after the newline.
action goto_align {
	fgoto align;
}
  # Entering action of main and series: a metric line was started during the
# current exec call.
action begin_metric {
	m.beginMetric = true
}

# Actions that hand scanned tokens to the handler. m.text() is the token
# between the begin mark and the current position. When the handler returns
# an error it is reported from exec, and the remainder of the line is skipped
# by continuing in discard_line.
action name {
	if err = m.handler.SetMeasurement(m.text()); err != nil {
		fhold;
		fnext discard_line;
		fbreak;
	}
}

# Tag and field keys are remembered until their value has been scanned.
action tagkey {
	m.key = m.text()
}

action tagvalue {
	if err = m.handler.AddTag(m.key, m.text()); err != nil {
		fhold;
		fnext discard_line;
		fbreak;
	}
}

action fieldkey {
	m.key = m.text()
}

action integer {
	if err = m.handler.AddInt(m.key, m.text()); err != nil {
		fhold;
		fnext discard_line;
		fbreak;
	}
}

action unsigned {
	if err = m.handler.AddUint(m.key, m.text()); err != nil {
		fhold;
		fnext discard_line;
		fbreak;
	}
}

action float {
	if err = m.handler.AddFloat(m.key, m.text()); err != nil {
		fhold;
		fnext discard_line;
		fbreak;
	}
}

action bool {
	if err = m.handler.AddBool(m.key, m.text()); err != nil {
		fhold;
		fnext discard_line;
		fbreak;
	}
}

action string {
	if err = m.handler.AddString(m.key, m.text()); err != nil {
		fhold;
		fnext discard_line;
		fbreak;
	}
}

action timestamp {
	if err = m.handler.SetTimestamp(m.text()); err != nil {
		fhold;
		fnext discard_line;
		fbreak;
	}
}

# Counts a line ending. It runs on the LF byte, so the byte after it is the
# first column of the following line.
action incr_newline {
	m.lineno++
	m.sol = m.p + 1
}

# A metric line was completed by its newline. Flag it, continue in align on
# the next exec and stop here so the caller can act on the metric.
action eol {
	m.finishMetric = true
	fnext align;
	fbreak;
}

# Runs at end of input while main is in a final state, typically a last
# metric line without a trailing newline.
action finish_metric {
	m.finishMetric = true
}
  # Separator whitespace; line endings are handled by newline.
ws =
	[\t\v\f ];

# LF or CRLF line ending. incr_newline is attached to the LF byte only.
newline =
	'\r'? '\n' >incr_newline;

non_zero_digit =
	[1-9];

# Integers have no leading zeros; integer also allows a leading minus.
integer =
	'-'? ( digit | ( non_zero_digit digit* ) );

unsigned =
	( digit | ( non_zero_digit digit* ) );

# Decimal number with optional sign and fraction, e.g. 1, -1.5, 2. or .5
number =
	'-'? (digit+ ('.' digit*)? | '.' digit+);

# Number with an exponent, e.g. 1e9 or 1.5E-3. The optional exponent sign is
# restricted to minus or plus. Inside a Ragel or-expression quote characters
# are literal, so the class must not be written with surrounding quotes.
scientific =
	number 'e'i [\-+]? digit+;

# Optional minus followed by 1 to 19 digits.
timestamp =
	('-'? digit{1,19}) >begin %timestamp;

# Field key bytes; a backslash may be followed by any byte other than the
# listed control whitespace.
fieldkeychar =
	[^\t\n\v\f\r ,=\\] | ( '\\' [^\t\n\v\f\r] );

fieldkey =
	fieldkeychar+ >begin %fieldkey;

# Numeric field values. The i and u suffixes are part of the token that is
# handed to the handler.
fieldfloat =
	(scientific | number) >begin %float;

fieldinteger =
	(integer 'i') >begin %integer;

fieldunsigned =
	(unsigned 'u') >begin %unsigned;

# Accepted boolean spellings.
false =
	"false" | "FALSE" | "False" | "F" | "f";

true =
	"true" | "TRUE" | "True" | "T" | "t";

fieldbool =
	(true | false) >begin %bool;

# String bytes: anything except form feed, CR, LF, backslash and double
# quote; an escaped backslash or double quote; or a full line ending, which
# is still counted by incr_newline.
fieldstringchar =
	[^\f\r\n\\"] | '\\' [\\"] | newline;

# The handler receives the raw bytes between the quotes; escape sequences
# are not processed here.
fieldstring =
	fieldstringchar* >begin %string;

fieldstringquoted =
	'"' fieldstring '"';

fieldvalue = fieldinteger | fieldunsigned | fieldfloat | fieldstringquoted | fieldbool;

field =
	fieldkey '=' fieldvalue;

fieldset =
	field ( ',' field )*;

# Tag key and value bytes. After a double backslash, the fhold makes the
# second backslash be scanned again as the start of the next character.
tagchar =
	[^\t\n\v\f\r ,=\\] | ( '\\' [^\t\n\v\f\r\\] ) | '\\\\' %to{ fhold; };

tagkey =
	tagchar+ >begin %tagkey;

# The eof variant also delivers a tag value that ends exactly at end of input.
tagvalue =
	tagchar+ >begin %eof(tagvalue) %tagvalue;

tagset =
	((',' tagkey '=' tagvalue) $err(tagset_error))*;

measurement_chars =
	[^\t\n\v\f\r ,\\] | ( '\\' [^\t\n\v\f\r] );

# A measurement may not start with the comment marker.
measurement_start =
	measurement_chars - '#';

# The eof variant also delivers a name that ends exactly at end of input.
measurement =
	(measurement_start measurement_chars*) >begin %eof(name) %name;

# A line ending that completes a metric; eol stops the current exec.
eol_break =
	newline %to(eol)
	;

# Measurement, tags, fields and optional timestamp, each section with its
# own error action.
metric =
	measurement >err(name_error)
	tagset
	ws+ fieldset $err(field_error)
	(ws+ timestamp)? $err(timestamp_error)
	;

line_with_term =
	ws* metric ws* eol_break
	;

line_without_term =
	ws* metric ws*
	;
  # main parses metric lines. begin_metric runs on entry; finish_metric runs
# at end of input when main is then in a final state.
main :=
	(line_with_term*
	(line_with_term | line_without_term?)
	) >begin_metric %eof(finish_metric)
	;

# The discard_line machine skips the rest of the current line, line ending
# included, and then continues in align. It is used to recover on the next
# line after an error.
discard_line :=
	(any -- newline)* newline @goto_align;

# A line whose first non-whitespace byte is the comment marker.
commentline =
	ws* '#' (any -- newline)* newline;

# A line containing only whitespace.
emptyline =
	ws* newline;

# The align machine scans forward to the start of the next metric line. It
# skips empty lines, comment lines and whitespace so that this logic stays
# out of main. Any other data moves control to main via hold_recover.
align :=
	(emptyline | commentline | ws+)* %err(hold_recover);

# series matches a measurement and its tag set, optionally followed by a
# line ending.
series :=
	(measurement >err(name_error) tagset eol_break?)
	>begin_metric
	;
}%%

// Emit the state tables generated for the machine above.
%% write data;
  // Handler receives the components of each metric line as they are parsed.
//
// Every slice passed to a Handler method is a view into the parser's input
// buffer, not a copy, and that buffer may be overwritten by later parsing.
// Implementations must copy any bytes they keep. Values are passed exactly
// as they appear in the input:
//   - integer and unsigned values keep their 'i' and 'u' suffixes;
//   - string values are the raw text between the quotes.
//
// Returning a non-nil error makes the parser report that error and skip the
// rest of the current line.
type Handler interface {
	// SetMeasurement receives the measurement name of a line.
	SetMeasurement(name []byte) error
	// AddTag receives a single tag key/value pair.
	AddTag(key, value []byte) error
	// AddInt receives an integer field value, e.g. "-42i".
	AddInt(key, value []byte) error
	// AddUint receives an unsigned integer field value, e.g. "42u".
	AddUint(key, value []byte) error
	// AddFloat receives a float field value, e.g. "1.5" or "1e3".
	AddFloat(key, value []byte) error
	// AddString receives a string field value without its quotes.
	AddString(key, value []byte) error
	// AddBool receives a boolean field value, e.g. "t" or "false".
	AddBool(key, value []byte) error
	// SetTimestamp receives the optional timestamp of the line.
	SetTimestamp(tm []byte) error
}
  257. type machine struct {
  258. data []byte
  259. cs int
  260. p, pe, eof int
  261. pb int
  262. lineno int
  263. sol int
  264. handler Handler
  265. initState int
  266. key []byte
  267. beginMetric bool
  268. finishMetric bool
  269. }
  270. func NewMachine(handler Handler) *machine {
  271. m := &machine{
  272. handler: handler,
  273. initState: LineProtocol_en_align,
  274. }
  275. %% access m.;
  276. %% variable p m.p;
  277. %% variable cs m.cs;
  278. %% variable pe m.pe;
  279. %% variable eof m.eof;
  280. %% variable data m.data;
  281. %% write init;
  282. return m
  283. }
  284. func NewSeriesMachine(handler Handler) *machine {
  285. m := &machine{
  286. handler: handler,
  287. initState: LineProtocol_en_series,
  288. }
  289. %% access m.;
  290. %% variable p m.p;
  291. %% variable pe m.pe;
  292. %% variable eof m.eof;
  293. %% variable data m.data;
  294. %% write init;
  295. return m
  296. }
  297. func (m *machine) SetData(data []byte) {
  298. m.data = data
  299. m.p = 0
  300. m.pb = 0
  301. m.lineno = 1
  302. m.sol = 0
  303. m.pe = len(data)
  304. m.eof = len(data)
  305. m.key = nil
  306. m.beginMetric = false
  307. m.finishMetric = false
  308. %% write init;
  309. m.cs = m.initState
  310. }
  311. // Next parses the next metric line and returns nil if it was successfully
  312. // processed. If the line contains a syntax error an error is returned,
  313. // otherwise if the end of file is reached before finding a metric line then
  314. // EOF is returned.
  315. func (m *machine) Next() error {
  316. if m.p == m.pe && m.pe == m.eof {
  317. return EOF
  318. }
  319. m.key = nil
  320. m.beginMetric = false
  321. m.finishMetric = false
  322. return m.exec()
  323. }
  324. func (m *machine) exec() error {
  325. var err error
  326. %% write exec;
  327. if err != nil {
  328. return err
  329. }
  330. // This would indicate an error in the machine that was reported with a
  331. // more specific error. We return a generic error but this should
  332. // possibly be a panic.
  333. if m.cs == %%{ write error; }%% {
  334. m.cs = LineProtocol_en_discard_line
  335. return ErrParse
  336. }
  337. // If we haven't found a metric line yet and we reached the EOF, report it
  338. // now. This happens when the data ends with a comment or whitespace.
  339. //
  340. // Otherwise we have successfully parsed a metric line, so if we are at
  341. // the EOF we will report it the next call.
  342. if !m.beginMetric && m.p == m.pe && m.pe == m.eof {
  343. return EOF
  344. }
  345. return nil
  346. }
  347. // Position returns the current byte offset into the data.
  348. func (m *machine) Position() int {
  349. return m.p
  350. }
  351. // LineOffset returns the byte offset of the current line.
  352. func (m *machine) LineOffset() int {
  353. return m.sol
  354. }
  355. // LineNumber returns the current line number. Lines are counted based on the
  356. // regular expression `\r?\n`.
  357. func (m *machine) LineNumber() int {
  358. return m.lineno
  359. }
  360. // Column returns the current column.
  361. func (m *machine) Column() int {
  362. lineOffset := m.p - m.sol
  363. return lineOffset + 1
  364. }
  365. func (m *machine) text() []byte {
  366. return m.data[m.pb:m.p]
  367. }
  368. type streamMachine struct {
  369. machine *machine
  370. reader io.Reader
  371. }
  372. func NewStreamMachine(r io.Reader, handler Handler) *streamMachine {
  373. m := &streamMachine{
  374. machine: NewMachine(handler),
  375. reader: r,
  376. }
  377. m.machine.SetData(make([]byte, 1024))
  378. m.machine.pe = 0
  379. m.machine.eof = -1
  380. return m
  381. }
  382. func (m *streamMachine) Next() error {
  383. // Check if we are already at EOF, this should only happen if called again
  384. // after already returning EOF.
  385. if m.machine.p == m.machine.pe && m.machine.pe == m.machine.eof {
  386. return EOF
  387. }
  388. copy(m.machine.data, m.machine.data[m.machine.p:])
  389. m.machine.pe = m.machine.pe - m.machine.p
  390. m.machine.sol = m.machine.sol - m.machine.p
  391. m.machine.pb = 0
  392. m.machine.p = 0
  393. m.machine.eof = -1
  394. m.machine.key = nil
  395. m.machine.beginMetric = false
  396. m.machine.finishMetric = false
  397. for {
  398. // Expand the buffer if it is full
  399. if m.machine.pe == len(m.machine.data) {
  400. expanded := make([]byte, 2 * len(m.machine.data))
  401. copy(expanded, m.machine.data)
  402. m.machine.data = expanded
  403. }
  404. n, err := m.reader.Read(m.machine.data[m.machine.pe:])
  405. if n == 0 && err == io.EOF {
  406. m.machine.eof = m.machine.pe
  407. } else if err != nil && err != io.EOF {
  408. return err
  409. }
  410. m.machine.pe += n
  411. err = m.machine.exec()
  412. if err != nil {
  413. return err
  414. }
  415. // If we have successfully parsed a full metric line break out
  416. if m.machine.finishMetric {
  417. break
  418. }
  419. }
  420. return nil
  421. }
  422. // Position returns the current byte offset into the data.
  423. func (m *streamMachine) Position() int {
  424. return m.machine.Position()
  425. }
  426. // LineOffset returns the byte offset of the current line.
  427. func (m *streamMachine) LineOffset() int {
  428. return m.machine.LineOffset()
  429. }
  430. // LineNumber returns the current line number. Lines are counted based on the
  431. // regular expression `\r?\n`.
  432. func (m *streamMachine) LineNumber() int {
  433. return m.machine.LineNumber()
  434. }
  435. // Column returns the current column.
  436. func (m *streamMachine) Column() int {
  437. return m.machine.Column()
  438. }
  439. // LineText returns the text of the current line that has been parsed so far.
  440. func (m *streamMachine) LineText() string {
  441. return string(m.machine.data[0:m.machine.p])
  442. }