service.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417
  1. // Copyright 2020-2021 InfluxData, Inc. All rights reserved.
  2. // Use of this source code is governed by MIT
  3. // license that can be found in the LICENSE file.
  4. // Package write provides the write Service, which encodes points as line protocol and writes batches to the server, retrying failed writes.
  5. package write
  6. import (
  7. "bytes"
  8. "context"
  9. "fmt"
  10. "io"
  11. "math/rand"
  12. "net/http"
  13. "net/url"
  14. "sort"
  15. "strings"
  16. "sync"
  17. "time"
  18. http2 "github.com/influxdata/influxdb-client-go/v2/api/http"
  19. "github.com/influxdata/influxdb-client-go/v2/api/write"
  20. "github.com/influxdata/influxdb-client-go/v2/internal/gzip"
  21. "github.com/influxdata/influxdb-client-go/v2/internal/log"
  22. ilog "github.com/influxdata/influxdb-client-go/v2/log"
  23. lp "github.com/influxdata/line-protocol"
  24. )
  // Batch is one unit of line-protocol data handed to Service for writing,
// together with the retry bookkeeping kept for it.
type Batch struct {
	// Batch holds the line-protocol lines to send.
	Batch string
	// RetryAttempts counts the retries of this batch performed so far.
	RetryAttempts uint
	// Evicted is true once the batch has been removed from the retry queue.
	Evicted bool
	// Expires is the moment after which the batch is discarded instead of
	// being written from the retry queue.
	Expires time.Time
}

// NewBatch wraps data into a fresh Batch (zero retry attempts, not evicted)
// that expires expireDelayMs milliseconds from now.
func NewBatch(data string, expireDelayMs uint) *Batch {
	ttl := time.Duration(expireDelayMs) * time.Millisecond
	b := &Batch{Batch: data}
	b.Expires = time.Now().Add(ttl)
	return b
}
  43. // BatchErrorCallback is synchronously notified in case non-blocking write fails.
  44. // It returns true if WriteAPI should continue with retrying, false will discard the batch.
  45. type BatchErrorCallback func(batch *Batch, error2 http2.Error) bool
  46. // Service is responsible for reliable writing of batches
  47. type Service struct {
  48. org string
  49. bucket string
  50. httpService http2.Service
  51. url string
  52. lastWriteAttempt time.Time
  53. retryQueue *queue
  54. lock sync.Mutex
  55. writeOptions *write.Options
  56. retryExponentialBase uint
  57. errorCb BatchErrorCallback
  58. retryDelay uint
  59. retryAttempts uint
  60. }
  61. // NewService creates new write service
  62. func NewService(org string, bucket string, httpService http2.Service, options *write.Options) *Service {
  63. retryBufferLimit := options.RetryBufferLimit() / options.BatchSize()
  64. if retryBufferLimit == 0 {
  65. retryBufferLimit = 1
  66. }
  67. u, _ := url.Parse(httpService.ServerAPIURL())
  68. u, _ = u.Parse("write")
  69. params := u.Query()
  70. params.Set("org", org)
  71. params.Set("bucket", bucket)
  72. params.Set("precision", precisionToString(options.Precision()))
  73. if options.Consistency() != "" {
  74. params.Set("consistency", string(options.Consistency()))
  75. }
  76. u.RawQuery = params.Encode()
  77. writeURL := u.String()
  78. return &Service{
  79. org: org,
  80. bucket: bucket,
  81. httpService: httpService,
  82. url: writeURL,
  83. writeOptions: options,
  84. retryQueue: newQueue(int(retryBufferLimit)),
  85. retryExponentialBase: 2,
  86. retryDelay: options.RetryInterval(),
  87. retryAttempts: 0,
  88. }
  89. }
  90. // SetBatchErrorCallback sets callback allowing custom handling of failed writes.
  91. // If callback returns true, failed batch will be retried, otherwise discarded.
  92. func (w *Service) SetBatchErrorCallback(cb BatchErrorCallback) {
  93. w.errorCb = cb
  94. }
  95. // HandleWrite handles writes of batches and handles retrying.
  96. // Retrying is triggered by new writes, there is no scheduler.
  97. // It first checks retry queue, because it has the highest priority.
  98. // If there are some batches in retry queue, those are written and incoming batch is added to end of retry queue.
  99. // Immediate write is allowed only in case there was success or not retryable error.
  100. // Otherwise, delay is checked based on recent batch.
  101. // If write of batch fails with retryable error (connection errors and HTTP code >= 429),
  102. // Batch retry time is calculated based on #of attempts.
  103. // If writes continues failing and # of attempts reaches maximum or total retry time reaches maxRetryTime,
  104. // batch is discarded.
  105. func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error {
  106. log.Debug("Write proc: received write request")
  107. batchToWrite := batch
  108. retrying := false
  109. for {
  110. select {
  111. case <-ctx.Done():
  112. log.Debug("Write proc: ctx cancelled req")
  113. return ctx.Err()
  114. default:
  115. }
  116. if !w.retryQueue.isEmpty() {
  117. log.Debug("Write proc: taking batch from retry queue")
  118. if !retrying {
  119. b := w.retryQueue.first()
  120. // Discard batches at beginning of retryQueue that have already expired
  121. if time.Now().After(b.Expires) {
  122. log.Error("Write proc: oldest batch in retry queue expired, discarding")
  123. if !b.Evicted {
  124. w.retryQueue.pop()
  125. }
  126. continue
  127. }
  128. // Can we write? In case of retryable error we must wait a bit
  129. if w.lastWriteAttempt.IsZero() || time.Now().After(w.lastWriteAttempt.Add(time.Millisecond*time.Duration(w.retryDelay))) {
  130. retrying = true
  131. } else {
  132. log.Warn("Write proc: cannot write yet, storing batch to queue")
  133. if w.retryQueue.push(batch) {
  134. log.Error("Write proc: Retry buffer full, discarding oldest batch")
  135. }
  136. batchToWrite = nil
  137. }
  138. }
  139. if retrying {
  140. batchToWrite = w.retryQueue.first()
  141. if batch != nil { //store actual batch to retry queue
  142. if w.retryQueue.push(batch) {
  143. log.Error("Write proc: Retry buffer full, discarding oldest batch")
  144. }
  145. batch = nil
  146. }
  147. }
  148. }
  149. // write batch
  150. if batchToWrite != nil {
  151. perror := w.WriteBatch(ctx, batchToWrite)
  152. if perror != nil {
  153. if isIgnorableError(perror) {
  154. log.Warnf("Write error: %s", perror.Error())
  155. } else {
  156. if w.writeOptions.MaxRetries() != 0 && (perror.StatusCode == 0 || perror.StatusCode >= http.StatusTooManyRequests) {
  157. log.Errorf("Write error: %s, batch kept for retrying\n", perror.Error())
  158. if perror.RetryAfter > 0 {
  159. w.retryDelay = perror.RetryAfter * 1000
  160. } else {
  161. w.retryDelay = w.computeRetryDelay(w.retryAttempts)
  162. }
  163. if w.errorCb != nil && !w.errorCb(batchToWrite, *perror) {
  164. log.Error("Callback rejected batch, discarding")
  165. if !batchToWrite.Evicted {
  166. w.retryQueue.pop()
  167. }
  168. return perror
  169. }
  170. // store new batch (not taken from queue)
  171. if !batchToWrite.Evicted && batchToWrite != w.retryQueue.first() {
  172. if w.retryQueue.push(batch) {
  173. log.Error("Retry buffer full, discarding oldest batch")
  174. }
  175. } else if batchToWrite.RetryAttempts == w.writeOptions.MaxRetries() {
  176. log.Error("Reached maximum number of retries, discarding batch")
  177. if !batchToWrite.Evicted {
  178. w.retryQueue.pop()
  179. }
  180. }
  181. batchToWrite.RetryAttempts++
  182. w.retryAttempts++
  183. log.Debugf("Write proc: next wait for write is %dms\n", w.retryDelay)
  184. } else {
  185. log.Errorf("Write error: %s\n", perror.Error())
  186. }
  187. return fmt.Errorf("write failed (attempts %d): %w", batchToWrite.RetryAttempts, perror)
  188. }
  189. }
  190. w.retryDelay = w.writeOptions.RetryInterval()
  191. w.retryAttempts = 0
  192. if retrying && !batchToWrite.Evicted {
  193. w.retryQueue.pop()
  194. }
  195. batchToWrite = nil
  196. } else {
  197. break
  198. }
  199. }
  200. return nil
  201. }
  202. // Non-retryable errors
  203. const (
  204. errStringHintedHandoffNotEmpty = "hinted handoff queue not empty"
  205. errStringPartialWrite = "partial write"
  206. errStringPointsBeyondRP = "points beyond retention policy"
  207. errStringUnableToParse = "unable to parse"
  208. )
  209. func isIgnorableError(error *http2.Error) bool {
  210. // This "error" is an informational message about the state of the
  211. // InfluxDB cluster.
  212. if strings.Contains(error.Message, errStringHintedHandoffNotEmpty) {
  213. return true
  214. }
  215. // Points beyond retention policy is returned when points are immediately
  216. // discarded for being older than the retention policy. Usually this not
  217. // a cause for concern, and we don't want to retry.
  218. if strings.Contains(error.Message, errStringPointsBeyondRP) {
  219. return true
  220. }
  221. // Other partial write errors, such as "field type conflict", are not
  222. // correctable at this point and so the point is dropped instead of
  223. // retrying.
  224. if strings.Contains(error.Message, errStringPartialWrite) {
  225. return true
  226. }
  227. // This error indicates an error in line protocol
  228. // serialization, retries would not be successful.
  229. if strings.Contains(error.Message, errStringUnableToParse) {
  230. return true
  231. }
  232. return false
  233. }
  234. // computeRetryDelay calculates retry delay.
  235. // Retry delay is calculated as random value within the interval
  236. // [retry_interval * exponential_base^(attempts) and retry_interval * exponential_base^(attempts+1)]
  237. func (w *Service) computeRetryDelay(attempts uint) uint {
  238. minDelay := int(w.writeOptions.RetryInterval() * pow(w.writeOptions.ExponentialBase(), attempts))
  239. maxDelay := int(w.writeOptions.RetryInterval() * pow(w.writeOptions.ExponentialBase(), attempts+1))
  240. diff := maxDelay - minDelay
  241. if diff <= 0 { //check overflows
  242. return w.writeOptions.MaxRetryInterval()
  243. }
  244. retryDelay := uint(rand.Intn(diff) + minDelay)
  245. if retryDelay > w.writeOptions.MaxRetryInterval() {
  246. retryDelay = w.writeOptions.MaxRetryInterval()
  247. }
  248. return retryDelay
  249. }
  // pow returns x raised to the power y using plain uint multiplication, so the
// result wraps around on overflow. pow(x, 0) is 1 for every x, including 0.
func pow(x, y uint) uint {
	result := uint(1)
	for ; y > 0; y-- {
		result *= x
	}
	return result
}
  261. // WriteBatch performs actual writing via HTTP service
  262. func (w *Service) WriteBatch(ctx context.Context, batch *Batch) *http2.Error {
  263. var body io.Reader
  264. var err error
  265. body = strings.NewReader(batch.Batch)
  266. if log.Level() >= ilog.DebugLevel {
  267. log.Debugf("Writing batch: %s", batch.Batch)
  268. }
  269. if w.writeOptions.UseGZip() {
  270. body, err = gzip.CompressWithGzip(body)
  271. if err != nil {
  272. return http2.NewError(err)
  273. }
  274. }
  275. w.lock.Lock()
  276. w.lastWriteAttempt = time.Now()
  277. w.lock.Unlock()
  278. perror := w.httpService.DoPostRequest(ctx, w.url, body, func(req *http.Request) {
  279. if w.writeOptions.UseGZip() {
  280. req.Header.Set("Content-Encoding", "gzip")
  281. }
  282. }, func(r *http.Response) error {
  283. return r.Body.Close()
  284. })
  285. return perror
  286. }
  287. // Flush sends batches from retry queue immediately, without retrying
  288. func (w *Service) Flush() {
  289. for !w.retryQueue.isEmpty() {
  290. b := w.retryQueue.pop()
  291. if time.Now().After(b.Expires) {
  292. log.Error("Oldest batch in retry queue expired, discarding")
  293. continue
  294. }
  295. if err := w.WriteBatch(context.Background(), b); err != nil {
  296. log.Errorf("Error flushing batch from retry queue: %w", err.Unwrap())
  297. }
  298. }
  299. }
  300. // pointWithDefaultTags encapsulates Point with default tags
  301. type pointWithDefaultTags struct {
  302. point *write.Point
  303. defaultTags map[string]string
  304. }
  305. // Name returns the name of measurement of a point.
  306. func (p *pointWithDefaultTags) Name() string {
  307. return p.point.Name()
  308. }
  309. // Time is the timestamp of a Point.
  310. func (p *pointWithDefaultTags) Time() time.Time {
  311. return p.point.Time()
  312. }
  313. // FieldList returns a slice containing the fields of a Point.
  314. func (p *pointWithDefaultTags) FieldList() []*lp.Field {
  315. return p.point.FieldList()
  316. }
  317. // TagList returns tags from point along with default tags
  318. // If point of tag can override default tag
  319. func (p *pointWithDefaultTags) TagList() []*lp.Tag {
  320. tags := make([]*lp.Tag, 0, len(p.point.TagList())+len(p.defaultTags))
  321. tags = append(tags, p.point.TagList()...)
  322. for k, v := range p.defaultTags {
  323. if !existTag(p.point.TagList(), k) {
  324. tags = append(tags, &lp.Tag{
  325. Key: k,
  326. Value: v,
  327. })
  328. }
  329. }
  330. sort.Slice(tags, func(i, j int) bool { return tags[i].Key < tags[j].Key })
  331. return tags
  332. }
  333. func existTag(tags []*lp.Tag, key string) bool {
  334. for _, tag := range tags {
  335. if key == tag.Key {
  336. return true
  337. }
  338. }
  339. return false
  340. }
  341. // EncodePoints creates line protocol string from points
  342. func (w *Service) EncodePoints(points ...*write.Point) (string, error) {
  343. var buffer bytes.Buffer
  344. e := lp.NewEncoder(&buffer)
  345. e.SetFieldTypeSupport(lp.UintSupport)
  346. e.FailOnFieldErr(true)
  347. e.SetPrecision(w.writeOptions.Precision())
  348. for _, point := range points {
  349. _, err := e.Encode(w.pointToEncode(point))
  350. if err != nil {
  351. return "", err
  352. }
  353. }
  354. return buffer.String(), nil
  355. }
  356. // pointToEncode determines whether default tags should be applied
  357. // and returns point with default tags instead of point
  358. func (w *Service) pointToEncode(point *write.Point) lp.Metric {
  359. var m lp.Metric
  360. if len(w.writeOptions.DefaultTags()) > 0 {
  361. m = &pointWithDefaultTags{
  362. point: point,
  363. defaultTags: w.writeOptions.DefaultTags(),
  364. }
  365. } else {
  366. m = point
  367. }
  368. return m
  369. }
  370. // WriteURL returns current write URL
  371. func (w *Service) WriteURL() string {
  372. return w.url
  373. }
  // precisionToString maps a write precision to the value of the "precision"
// query parameter. It returns "s", "ms" and "us" for second, millisecond and
// microsecond precision, and "ns" for any other duration.
func precisionToString(precision time.Duration) string {
	switch precision {
	case time.Second:
		return "s"
	case time.Millisecond:
		return "ms"
	case time.Microsecond:
		return "us"
	default:
		return "ns"
	}
}