writeAPIBlocking.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. // Copyright 2020-2021 InfluxData, Inc. All rights reserved.
  2. // Use of this source code is governed by MIT
  3. // license that can be found in the LICENSE file.
  4. package api
  5. import (
  6. "context"
  7. "strings"
  8. "sync"
  9. "sync/atomic"
  10. http2 "github.com/influxdata/influxdb-client-go/v2/api/http"
  11. "github.com/influxdata/influxdb-client-go/v2/api/write"
  12. iwrite "github.com/influxdata/influxdb-client-go/v2/internal/write"
  13. )
  14. // WriteAPIBlocking offers blocking methods for writing time series data synchronously into an InfluxDB server.
  15. // It doesn't implicitly create batches of points by default. Batches are created from array of points/records.
  16. //
  17. // Implicit batching is enabled with EnableBatching(). In this mode, each call to WritePoint or WriteRecord adds a line
  18. // to internal buffer. If length ot the buffer is equal to the batch-size (set in write.Options), the buffer is sent to the server
  19. // and the result of the operation is returned.
  20. // When a point is written to the buffer, nil error is always returned.
  21. // Flush() can be used to trigger sending of batch when it doesn't have the batch-size.
  22. //
  23. // Synchronous writing is intended to use for writing less frequent data, such as a weather sensing, or if there is a need to have explicit control of failed batches.
  24. //
  25. // WriteAPIBlocking can be used concurrently.
  26. // When using multiple goroutines for writing, use a single WriteAPIBlocking instance in all goroutines.
  27. type WriteAPIBlocking interface {
  28. // WriteRecord writes line protocol record(s) into bucket.
  29. // WriteRecord writes lines without implicit batching by default, batch is created from given number of records.
  30. // Automatic batching can be enabled by EnableBatching()
  31. // Individual arguments can also be batches (multiple records separated by newline).
  32. // Non-blocking alternative is available in the WriteAPI interface
  33. WriteRecord(ctx context.Context, line ...string) error
  34. // WritePoint data point into bucket.
  35. // WriteRecord writes points without implicit batching by default, batch is created from given number of points.
  36. // Automatic batching can be enabled by EnableBatching().
  37. // Non-blocking alternative is available in the WriteAPI interface
  38. WritePoint(ctx context.Context, point ...*write.Point) error
  39. // EnableBatching turns on implicit batching
  40. // Batch size is controlled via write.Options
  41. EnableBatching()
  42. // Flush forces write of buffer if batching is enabled, even buffer doesn't have the batch-size.
  43. Flush(ctx context.Context) error
  44. }
  45. // writeAPIBlocking implements WriteAPIBlocking interface
  46. type writeAPIBlocking struct {
  47. service *iwrite.Service
  48. writeOptions *write.Options
  49. // more appropriate Bool type from sync/atomic cannot be used because it is available since go 1.19
  50. batching int32
  51. batch []string
  52. mu sync.Mutex
  53. }
  54. // NewWriteAPIBlocking creates new instance of blocking write client for writing data to bucket belonging to org
  55. func NewWriteAPIBlocking(org string, bucket string, service http2.Service, writeOptions *write.Options) WriteAPIBlocking {
  56. return &writeAPIBlocking{service: iwrite.NewService(org, bucket, service, writeOptions), writeOptions: writeOptions}
  57. }
  58. // NewWriteAPIBlockingWithBatching creates new instance of blocking write client for writing data to bucket belonging to org with batching enabled
  59. func NewWriteAPIBlockingWithBatching(org string, bucket string, service http2.Service, writeOptions *write.Options) WriteAPIBlocking {
  60. api := &writeAPIBlocking{service: iwrite.NewService(org, bucket, service, writeOptions), writeOptions: writeOptions}
  61. api.EnableBatching()
  62. return api
  63. }
  64. func (w *writeAPIBlocking) EnableBatching() {
  65. if atomic.LoadInt32(&w.batching) == 0 {
  66. w.mu.Lock()
  67. w.batching = 1
  68. w.batch = make([]string, 0, w.writeOptions.BatchSize())
  69. w.mu.Unlock()
  70. }
  71. }
  72. func (w *writeAPIBlocking) write(ctx context.Context, line string) error {
  73. if atomic.LoadInt32(&w.batching) > 0 {
  74. w.mu.Lock()
  75. defer w.mu.Unlock()
  76. w.batch = append(w.batch, line)
  77. if len(w.batch) == int(w.writeOptions.BatchSize()) {
  78. return w.flush(ctx)
  79. }
  80. return nil
  81. }
  82. err := w.service.WriteBatch(ctx, iwrite.NewBatch(line, w.writeOptions.MaxRetryTime()))
  83. if err != nil {
  84. return err
  85. }
  86. return nil
  87. }
  88. func (w *writeAPIBlocking) WriteRecord(ctx context.Context, line ...string) error {
  89. if len(line) == 0 {
  90. return nil
  91. }
  92. return w.write(ctx, strings.Join(line, "\n"))
  93. }
  94. func (w *writeAPIBlocking) WritePoint(ctx context.Context, point ...*write.Point) error {
  95. line, err := w.service.EncodePoints(point...)
  96. if err != nil {
  97. return err
  98. }
  99. return w.write(ctx, line)
  100. }
  101. // flush is unsychronized helper for creating and sending batch
  102. // Must be called from synchronized block
  103. func (w *writeAPIBlocking) flush(ctx context.Context) error {
  104. if len(w.batch) > 0 {
  105. body := strings.Join(w.batch, "\n")
  106. w.batch = w.batch[:0]
  107. b := iwrite.NewBatch(body, w.writeOptions.MaxRetryTime())
  108. if err:= w.service.WriteBatch(ctx, b); err != nil {
  109. return err
  110. }
  111. }
  112. return nil
  113. }
  114. func (w *writeAPIBlocking) Flush(ctx context.Context) error {
  115. if atomic.LoadInt32(&w.batching) > 0 {
  116. w.mu.Lock()
  117. defer w.mu.Unlock()
  118. return w.flush(ctx)
  119. }
  120. return nil
  121. }