encoder.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
  1. package protocol
  2. import (
  3. "fmt"
  4. "io"
  5. "math"
  6. "sort"
  7. "strconv"
  8. "time"
  9. )
  10. // ErrIsNaN is a field error for when a float field is NaN.
  11. var ErrIsNaN = &FieldError{"is NaN"}
  12. // ErrIsInf is a field error for when a float field is Inf.
  13. var ErrIsInf = &FieldError{"is Inf"}
  14. // Encoder marshals Metrics into influxdb line protocol.
  15. // It is not safe for concurrent use, make a new one!
  16. // The default behavior when encountering a field error is to ignore the field and move on.
  17. // If you wish it to error out on field errors, use Encoder.FailOnFieldErr(true)
  18. type Encoder struct {
  19. w io.Writer
  20. fieldSortOrder FieldSortOrder
  21. fieldTypeSupport FieldTypeSupport
  22. failOnFieldError bool
  23. maxLineBytes int
  24. fieldList []*Field
  25. header []byte
  26. footer []byte
  27. pair []byte
  28. precision time.Duration
  29. }
  30. // SetMaxLineBytes sets a maximum length for a line, Encode will error if the generated line is longer
  31. func (e *Encoder) SetMaxLineBytes(i int) {
  32. e.maxLineBytes = i
  33. }
  34. // SetFieldSortOrder sets a sort order for the data.
  35. // The options are:
  36. // NoSortFields (doesn't sort the fields)
  37. // SortFields (sorts the keys in alphabetical order)
  38. func (e *Encoder) SetFieldSortOrder(s FieldSortOrder) {
  39. e.fieldSortOrder = s
  40. }
  41. // SetFieldTypeSupport sets flags for if the encoder supports certain optional field types such as uint64
  42. func (e *Encoder) SetFieldTypeSupport(s FieldTypeSupport) {
  43. e.fieldTypeSupport = s
  44. }
  45. // FailOnFieldErr whether or not to fail on a field error or just move on.
  46. // The default behavior to move on
  47. func (e *Encoder) FailOnFieldErr(s bool) {
  48. e.failOnFieldError = s
  49. }
  50. // SetPrecision sets time precision for writes
  51. // Default is nanoseconds precision
  52. func (e *Encoder) SetPrecision(p time.Duration) {
  53. e.precision = p
  54. }
  55. // NewEncoder gives us an encoder that marshals to a writer in influxdb line protocol
  56. // as defined by:
  57. // https://docs.influxdata.com/influxdb/v1.5/write_protocols/line_protocol_reference/
  58. func NewEncoder(w io.Writer) *Encoder {
  59. return &Encoder{
  60. w: w,
  61. header: make([]byte, 0, 128),
  62. footer: make([]byte, 0, 128),
  63. pair: make([]byte, 0, 128),
  64. fieldList: make([]*Field, 0, 16),
  65. precision: time.Nanosecond,
  66. }
  67. }
  68. // This is here to significantly reduce allocations, wish that we had constant/immutable keyword that applied to
  69. // more complex objects
  70. var comma = []byte(",")
  71. // Encode marshals a Metric to the io.Writer in the Encoder
  72. func (e *Encoder) Encode(m Metric) (int, error) {
  73. err := e.buildHeader(m)
  74. if err != nil {
  75. return 0, err
  76. }
  77. e.buildFooter(m.Time())
  78. // here we make a copy of the *fields so we can do an in-place sort
  79. e.fieldList = append(e.fieldList[:0], m.FieldList()...)
  80. if e.fieldSortOrder == SortFields {
  81. sort.Slice(e.fieldList, func(i, j int) bool {
  82. return e.fieldList[i].Key < e.fieldList[j].Key
  83. })
  84. }
  85. i := 0
  86. totalWritten := 0
  87. pairsLen := 0
  88. firstField := true
  89. for _, field := range e.fieldList {
  90. err = e.buildFieldPair(field.Key, field.Value)
  91. if err != nil {
  92. if e.failOnFieldError {
  93. return 0, err
  94. }
  95. continue
  96. }
  97. bytesNeeded := len(e.header) + pairsLen + len(e.pair) + len(e.footer)
  98. // Additional length needed for field separator `,`
  99. if !firstField {
  100. bytesNeeded++
  101. }
  102. if e.maxLineBytes > 0 && bytesNeeded > e.maxLineBytes {
  103. // Need at least one field per line
  104. if firstField {
  105. return 0, ErrNeedMoreSpace
  106. }
  107. i, err = e.w.Write(e.footer)
  108. if err != nil {
  109. return 0, err
  110. }
  111. pairsLen = 0
  112. totalWritten += i
  113. bytesNeeded = len(e.header) + len(e.pair) + len(e.footer)
  114. if e.maxLineBytes > 0 && bytesNeeded > e.maxLineBytes {
  115. return 0, ErrNeedMoreSpace
  116. }
  117. i, err = e.w.Write(e.header)
  118. if err != nil {
  119. return 0, err
  120. }
  121. totalWritten += i
  122. i, err = e.w.Write(e.pair)
  123. if err != nil {
  124. return 0, err
  125. }
  126. totalWritten += i
  127. pairsLen += len(e.pair)
  128. firstField = false
  129. continue
  130. }
  131. if firstField {
  132. i, err = e.w.Write(e.header)
  133. if err != nil {
  134. return 0, err
  135. }
  136. totalWritten += i
  137. } else {
  138. i, err = e.w.Write(comma)
  139. if err != nil {
  140. return 0, err
  141. }
  142. totalWritten += i
  143. }
  144. i, err = e.w.Write(e.pair)
  145. if err != nil {
  146. return 0, err
  147. }
  148. totalWritten += i
  149. pairsLen += len(e.pair)
  150. firstField = false
  151. }
  152. if firstField {
  153. return 0, ErrNoFields
  154. }
  155. i, err = e.w.Write(e.footer)
  156. if err != nil {
  157. return 0, err
  158. }
  159. totalWritten += i
  160. return totalWritten, nil
  161. }
  162. func (e *Encoder) buildHeader(m Metric) error {
  163. e.header = e.header[:0]
  164. name := nameEscape(m.Name())
  165. if name == "" {
  166. return ErrInvalidName
  167. }
  168. e.header = append(e.header, name...)
  169. for _, tag := range m.TagList() {
  170. key := escape(tag.Key)
  171. value := escape(tag.Value)
  172. // Some keys and values are not encodeable as line protocol, such as
  173. // those with a trailing '\' or empty strings.
  174. if key == "" || value == "" {
  175. continue
  176. }
  177. e.header = append(e.header, ',')
  178. e.header = append(e.header, key...)
  179. e.header = append(e.header, '=')
  180. e.header = append(e.header, value...)
  181. }
  182. e.header = append(e.header, ' ')
  183. return nil
  184. }
  185. func (e *Encoder) buildFieldVal(value interface{}) error {
  186. switch v := value.(type) {
  187. case uint64:
  188. if e.fieldTypeSupport&UintSupport != 0 {
  189. e.pair = append(strconv.AppendUint(e.pair, v, 10), 'u')
  190. } else if v <= uint64(math.MaxInt64) {
  191. e.pair = append(strconv.AppendInt(e.pair, int64(v), 10), 'i')
  192. } else {
  193. e.pair = append(strconv.AppendInt(e.pair, math.MaxInt64, 10), 'i')
  194. }
  195. case int64:
  196. e.pair = append(strconv.AppendInt(e.pair, v, 10), 'i')
  197. case int:
  198. e.pair = append(strconv.AppendInt(e.pair, int64(v), 10), 'i')
  199. case float64:
  200. if math.IsNaN(v) {
  201. return ErrIsNaN
  202. }
  203. if math.IsInf(v, 0) {
  204. return ErrIsInf
  205. }
  206. e.pair = strconv.AppendFloat(e.pair, v, 'f', -1, 64)
  207. case float32:
  208. v32 := float64(v)
  209. if math.IsNaN(v32) {
  210. return ErrIsNaN
  211. }
  212. if math.IsInf(v32, 0) {
  213. return ErrIsInf
  214. }
  215. e.pair = strconv.AppendFloat(e.pair, v32, 'f', -1, 64)
  216. case string:
  217. e.pair = append(e.pair, '"')
  218. e.pair = append(e.pair, stringFieldEscape(v)...)
  219. e.pair = append(e.pair, '"')
  220. case []byte:
  221. e.pair = append(e.pair, '"')
  222. stringFieldEscapeBytes(&e.pair, v)
  223. e.pair = append(e.pair, '"')
  224. case bool:
  225. e.pair = strconv.AppendBool(e.pair, v)
  226. default:
  227. return &FieldError{fmt.Sprintf("invalid value type: %T", v)}
  228. }
  229. return nil
  230. }
  231. func (e *Encoder) buildFieldPair(key string, value interface{}) error {
  232. e.pair = e.pair[:0]
  233. key = escape(key)
  234. // Some keys are not encodeable as line protocol, such as those with a
  235. // trailing '\' or empty strings.
  236. if key == "" || key[:len(key)-1] == "\\" {
  237. return &FieldError{"invalid field key"}
  238. }
  239. e.pair = append(e.pair, key...)
  240. e.pair = append(e.pair, '=')
  241. return e.buildFieldVal(value)
  242. }
  243. func (e *Encoder) buildFooter(t time.Time) {
  244. e.footer = e.footer[:0]
  245. if !t.IsZero() {
  246. e.footer = append(e.footer, ' ')
  247. switch e.precision {
  248. case time.Microsecond:
  249. e.footer = strconv.AppendInt(e.footer, t.UnixNano()/1000, 10)
  250. case time.Millisecond:
  251. e.footer = strconv.AppendInt(e.footer, t.UnixNano()/1000000, 10)
  252. case time.Second:
  253. e.footer = strconv.AppendInt(e.footer, t.Unix(), 10)
  254. default:
  255. e.footer = strconv.AppendInt(e.footer, t.UnixNano(), 10)
  256. }
  257. }
  258. e.footer = append(e.footer, '\n')
  259. }