parser.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. package protocol
  import (
  	"fmt"
  	"io"
  	"strings"
  	"sync"
  	"time"
  	"unicode/utf8"
  )
  // maxErrorBufferSize is the largest number of bytes of the offending line
  // that ParseError.Error quotes before cutting the text off with "...".
  const maxErrorBufferSize = 1024
  // TimeFunc supplies the timestamp for a metric whose line-protocol text does
  // not specify one, overriding the default time source.
  type TimeFunc func() time.Time
  15. // ParseError indicates a error in the parsing of the text.
  16. type ParseError struct {
  17. Offset int
  18. LineOffset int
  19. LineNumber int
  20. Column int
  21. msg string
  22. buf string
  23. }
  24. func (e *ParseError) Error() string {
  25. buffer := e.buf[e.LineOffset:]
  26. eol := strings.IndexAny(buffer, "\r\n")
  27. if eol >= 0 {
  28. buffer = buffer[:eol]
  29. }
  30. if len(buffer) > maxErrorBufferSize {
  31. buffer = buffer[:maxErrorBufferSize] + "..."
  32. }
  33. return fmt.Sprintf("metric parse error: %s at %d:%d: %q", e.msg, e.LineNumber, e.Column, buffer)
  34. }
  35. // Parser is an InfluxDB Line Protocol parser that implements the
  36. // parsers.Parser interface.
  37. type Parser struct {
  38. DefaultTags map[string]string
  39. sync.Mutex
  40. *machine
  41. handler *MetricHandler
  42. }
  43. // NewParser returns a Parser than accepts line protocol
  44. func NewParser(handler *MetricHandler) *Parser {
  45. return &Parser{
  46. machine: NewMachine(handler),
  47. handler: handler,
  48. }
  49. }
  50. // NewSeriesParser returns a Parser than accepts a measurement and tagset
  51. func NewSeriesParser(handler *MetricHandler) *Parser {
  52. return &Parser{
  53. machine: NewSeriesMachine(handler),
  54. handler: handler,
  55. }
  56. }
  57. // SetTimeFunc allows default times to be set when no time is specified
  58. // for a metric in line-protocol.
  59. func (p *Parser) SetTimeFunc(f TimeFunc) {
  60. p.handler.SetTimeFunc(f)
  61. }
  62. // Parse interprets line-protocol bytes as many metrics.
  63. func (p *Parser) Parse(input []byte) ([]Metric, error) {
  64. p.Lock()
  65. defer p.Unlock()
  66. metrics := make([]Metric, 0)
  67. p.machine.SetData(input)
  68. for {
  69. err := p.machine.Next()
  70. if err == EOF {
  71. break
  72. }
  73. if err != nil {
  74. return nil, &ParseError{
  75. Offset: p.machine.Position(),
  76. LineOffset: p.machine.LineOffset(),
  77. LineNumber: p.machine.LineNumber(),
  78. Column: p.machine.Column(),
  79. msg: err.Error(),
  80. buf: string(input),
  81. }
  82. }
  83. metric, err := p.handler.Metric()
  84. if err != nil {
  85. return nil, err
  86. }
  87. if metric == nil {
  88. continue
  89. }
  90. metrics = append(metrics, metric)
  91. }
  92. return metrics, nil
  93. }
  94. // StreamParser is an InfluxDB Line Protocol parser. It is not safe for
  95. // concurrent use in multiple goroutines.
  96. type StreamParser struct {
  97. machine *streamMachine
  98. handler *MetricHandler
  99. }
  100. // NewStreamParser parses from a reader and iterates the machine
  101. // metric by metric. Not safe for concurrent use in multiple goroutines.
  102. func NewStreamParser(r io.Reader) *StreamParser {
  103. handler := NewMetricHandler()
  104. return &StreamParser{
  105. machine: NewStreamMachine(r, handler),
  106. handler: handler,
  107. }
  108. }
  109. // SetTimeFunc changes the function used to determine the time of metrics
  110. // without a timestamp. The default TimeFunc is time.Now. Useful mostly for
  111. // testing, or perhaps if you want all metrics to have the same timestamp.
  112. func (p *StreamParser) SetTimeFunc(f TimeFunc) {
  113. p.handler.SetTimeFunc(f)
  114. }
  115. // SetTimePrecision specifies units for the time stamp.
  116. func (p *StreamParser) SetTimePrecision(u time.Duration) {
  117. p.handler.SetTimePrecision(u)
  118. }
  119. // Next parses the next item from the stream. You can repeat calls to this
  120. // function until it returns EOF.
  121. func (p *StreamParser) Next() (Metric, error) {
  122. err := p.machine.Next()
  123. if err == EOF {
  124. return nil, EOF
  125. }
  126. if err != nil {
  127. return nil, &ParseError{
  128. Offset: p.machine.Position(),
  129. LineOffset: p.machine.LineOffset(),
  130. LineNumber: p.machine.LineNumber(),
  131. Column: p.machine.Column(),
  132. msg: err.Error(),
  133. buf: p.machine.LineText(),
  134. }
  135. }
  136. metric, err := p.handler.Metric()
  137. if err != nil {
  138. return nil, err
  139. }
  140. return metric, nil
  141. }
  142. // Position returns the current byte offset into the data.
  143. func (p *StreamParser) Position() int {
  144. return p.machine.Position()
  145. }
  146. // LineOffset returns the byte offset of the current line.
  147. func (p *StreamParser) LineOffset() int {
  148. return p.machine.LineOffset()
  149. }
  150. // LineNumber returns the current line number. Lines are counted based on the
  151. // regular expression `\r?\n`.
  152. func (p *StreamParser) LineNumber() int {
  153. return p.machine.LineNumber()
  154. }
  155. // Column returns the current column.
  156. func (p *StreamParser) Column() int {
  157. return p.machine.Column()
  158. }
  159. // LineText returns the text of the current line that has been parsed so far.
  160. func (p *StreamParser) LineText() string {
  161. return p.machine.LineText()
  162. }