write.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. // Copyright 2020-2021 InfluxData, Inc. All rights reserved.
  2. // Use of this source code is governed by MIT
  3. // license that can be found in the LICENSE file.
  4. package api
  5. import (
  6. "context"
  7. "strings"
  8. "sync"
  9. "sync/atomic"
  10. "time"
  11. http2 "github.com/influxdata/influxdb-client-go/v2/api/http"
  12. "github.com/influxdata/influxdb-client-go/v2/api/write"
  13. "github.com/influxdata/influxdb-client-go/v2/internal/log"
  14. iwrite "github.com/influxdata/influxdb-client-go/v2/internal/write"
  15. )
  16. // WriteFailedCallback is synchronously notified in case a non-blocking write fails.
  17. // batch contains the complete payload, error holds detailed error information,
  18. // retryAttempts is the number of retries, 0 if it failed during the first write.
  19. // It must return true if WriteAPI should continue with retrying; returning false discards the batch.
  20. type WriteFailedCallback func(batch string, error http2.Error, retryAttempts uint) bool
  21. // WriteAPI is the Write client interface with non-blocking methods for writing time series data asynchronously in batches into an InfluxDB server.
  22. // WriteAPI can be used concurrently.
  23. // When using multiple goroutines for writing, use a single WriteAPI instance in all goroutines.
  24. type WriteAPI interface {
  25. // WriteRecord asynchronously writes a line protocol record into the bucket.
  26. // WriteRecord adds the record into the buffer, which is sent in the background when it reaches the batch size.
  27. // A blocking alternative is available in the WriteAPIBlocking interface.
  28. WriteRecord(line string)
  29. // WritePoint asynchronously writes a Point into the bucket.
  30. // WritePoint adds the Point into the buffer, which is sent in the background when it reaches the batch size.
  31. // A blocking alternative is available in the WriteAPIBlocking interface.
  32. WritePoint(point *write.Point)
  33. // Flush forces all pending writes from the buffer to be sent.
  34. Flush()
  35. // Errors returns a channel for reading errors which occur during async writes.
  36. // Must be called before performing any writes for errors to be collected.
  37. // The channel should be drained continuously. NOTE(review): this comment used to say the channel is unbuffered and that the writer blocks; WriteAPIImpl creates it with a buffer of one and its write goroutine skips errors it cannot deliver.
  38. Errors() <-chan error
  39. // SetWriteFailedCallback sets a callback allowing custom handling of failed writes.
  40. // If the callback returns true, the failed batch will be retried, otherwise it is discarded.
  41. SetWriteFailedCallback(cb WriteFailedCallback)
  42. }
  43. // WriteAPIImpl provides the main implementation of WriteAPI.
  // It runs two background goroutines started by NewWriteAPI: bufferProc, which
  // collects lines into batches, and writeProc, which hands batches to the service.
  44. type WriteAPIImpl struct {
  45. service *iwrite.Service // encodes points, handles batch writes and flushing
  46. writeBuffer []string // lines collected by bufferProc since the last flush
  47. errCh chan error // channel returned by Errors(); created with capacity 1
  48. writeCh chan *iwrite.Batch // batches passed from flushBuffer to writeProc; nil after Close
  49. bufferCh chan string // lines passed from WriteRecord/WritePoint to bufferProc
  50. writeStop chan struct{} // closed by Close to stop writeProc
  51. bufferStop chan struct{} // closed by Close to stop bufferProc
  52. bufferFlush chan struct{} // signalled by Flush to make bufferProc flush its buffer
  53. doneCh chan struct{} // each background goroutine sends once when it finishes
  54. bufferInfoCh chan writeBuffInfoReq // request/reply channel between waitForFlushing and bufferProc
  55. writeInfoCh chan writeBuffInfoReq // request/reply channel between waitForFlushing and writeProc
  56. writeOptions *write.Options // source of batch size, flush interval and max retry time
  57. closingMu *sync.Mutex // held for the whole duration of Close
  58. // more appropriate Bool type from sync/atomic cannot be used because it is available since go 1.19
  // isErrChReader is set to 1 once Errors() has been called; accessed atomically.
  59. isErrChReader int32
  60. }
  // writeBuffInfoReq is exchanged over bufferInfoCh and writeInfoCh: the zero
  // value is sent as a request and the same value comes back as the reply.
  61. type writeBuffInfoReq struct {
  62. writeBuffLen int // filled in by the responding goroutine before replying
  63. }
  64. // NewWriteAPI returns new non-blocking write client for writing data to bucket belonging to org
  65. func NewWriteAPI(org string, bucket string, service http2.Service, writeOptions *write.Options) *WriteAPIImpl {
  66. w := &WriteAPIImpl{
  67. service: iwrite.NewService(org, bucket, service, writeOptions),
  68. errCh: make(chan error, 1),
  69. writeBuffer: make([]string, 0, writeOptions.BatchSize()+1),
  70. writeCh: make(chan *iwrite.Batch),
  71. bufferCh: make(chan string),
  72. bufferStop: make(chan struct{}),
  73. writeStop: make(chan struct{}),
  74. bufferFlush: make(chan struct{}),
  75. doneCh: make(chan struct{}),
  76. bufferInfoCh: make(chan writeBuffInfoReq),
  77. writeInfoCh: make(chan writeBuffInfoReq),
  78. writeOptions: writeOptions,
  79. closingMu: &sync.Mutex{},
  80. }
  81. go w.bufferProc()
  82. go w.writeProc()
  83. return w
  84. }
  85. // SetWriteFailedCallback sets callback allowing custom handling of failed writes.
  86. // If callback returns true, failed batch will be retried, otherwise discarded.
  87. func (w *WriteAPIImpl) SetWriteFailedCallback(cb WriteFailedCallback) {
  88. w.service.SetBatchErrorCallback(func(batch *iwrite.Batch, error2 http2.Error) bool {
  89. return cb(batch.Batch, error2, batch.RetryAttempts)
  90. })
  91. }
  92. // Errors returns a channel for reading errors which occurs during async writes.
  93. // Must be called before performing any writes for errors to be collected.
  94. // New error is skipped when channel is not read.
  95. func (w *WriteAPIImpl) Errors() <-chan error {
  96. w.setErrChanRead()
  97. return w.errCh
  98. }
  99. // Flush forces all pending writes from the buffer to be sent.
  100. // Flush also tries sending batches from retry queue without additional retrying.
  // It blocks until bufferProc has accepted the flush signal and both
  // background goroutines have answered the status polls in waitForFlushing.
  101. func (w *WriteAPIImpl) Flush() {
  102. w.bufferFlush <- struct{}{} // ask bufferProc to flush its line buffer
  103. w.waitForFlushing() // poll bufferProc and writeProc until both report length 0
  104. w.service.Flush() // delegate to the service; per the doc comment this covers the retry queue
  105. }
  106. func (w *WriteAPIImpl) waitForFlushing() {
  107. for {
  108. w.bufferInfoCh <- writeBuffInfoReq{}
  109. writeBuffInfo := <-w.bufferInfoCh
  110. if writeBuffInfo.writeBuffLen == 0 {
  111. break
  112. }
  113. log.Info("Waiting buffer is flushed")
  114. <-time.After(time.Millisecond)
  115. }
  116. for {
  117. w.writeInfoCh <- writeBuffInfoReq{}
  118. writeBuffInfo := <-w.writeInfoCh
  119. if writeBuffInfo.writeBuffLen == 0 {
  120. break
  121. }
  122. log.Info("Waiting buffer is flushed")
  123. <-time.After(time.Millisecond)
  124. }
  125. }
  126. func (w *WriteAPIImpl) bufferProc() {
  127. log.Info("Buffer proc started")
  128. ticker := time.NewTicker(time.Duration(w.writeOptions.FlushInterval()) * time.Millisecond)
  129. x:
  130. for {
  131. select {
  132. case line := <-w.bufferCh:
  133. w.writeBuffer = append(w.writeBuffer, line)
  134. if len(w.writeBuffer) == int(w.writeOptions.BatchSize()) {
  135. w.flushBuffer()
  136. }
  137. case <-ticker.C:
  138. w.flushBuffer()
  139. case <-w.bufferFlush:
  140. w.flushBuffer()
  141. case <-w.bufferStop:
  142. ticker.Stop()
  143. w.flushBuffer()
  144. break x
  145. case buffInfo := <-w.bufferInfoCh:
  146. buffInfo.writeBuffLen = len(w.bufferInfoCh)
  147. w.bufferInfoCh <- buffInfo
  148. }
  149. }
  150. log.Info("Buffer proc finished")
  151. w.doneCh <- struct{}{}
  152. }
  153. func (w *WriteAPIImpl) flushBuffer() {
  154. if len(w.writeBuffer) > 0 {
  155. log.Info("sending batch")
  156. batch := iwrite.NewBatch(buffer(w.writeBuffer), w.writeOptions.MaxRetryTime())
  157. w.writeCh <- batch
  158. w.writeBuffer = w.writeBuffer[:0]
  159. }
  160. }
  161. func (w *WriteAPIImpl) isErrChanRead() bool {
  162. return atomic.LoadInt32(&w.isErrChReader) > 0
  163. }
  // setErrChanRead atomically sets the error-channel reader flag to 1.
  // It is called from Errors(); isErrChanRead reads the flag back.
  164. func (w *WriteAPIImpl) setErrChanRead() {
  165. atomic.StoreInt32(&w.isErrChReader, 1)
  166. }
  167. func (w *WriteAPIImpl) writeProc() {
  168. log.Info("Write proc started")
  169. x:
  170. for {
  171. select {
  172. case batch := <-w.writeCh:
  173. err := w.service.HandleWrite(context.Background(), batch)
  174. if err != nil && w.isErrChanRead() {
  175. select {
  176. case w.errCh <- err:
  177. default:
  178. log.Warn("Cannot write error to error channel, it is not read")
  179. }
  180. }
  181. case <-w.writeStop:
  182. log.Info("Write proc: received stop")
  183. break x
  184. case buffInfo := <-w.writeInfoCh:
  185. buffInfo.writeBuffLen = len(w.writeCh)
  186. w.writeInfoCh <- buffInfo
  187. }
  188. }
  189. log.Info("Write proc finished")
  190. w.doneCh <- struct{}{}
  191. }
  192. // Close finishes outstanding write operations,
  193. // stop background routines and closes all channels
  194. func (w *WriteAPIImpl) Close() {
  195. w.closingMu.Lock()
  196. defer w.closingMu.Unlock()
  197. if w.writeCh != nil {
  198. // Flush outstanding metrics
  199. w.Flush()
  200. // stop and wait for buffer proc
  201. close(w.bufferStop)
  202. <-w.doneCh
  203. close(w.bufferFlush)
  204. close(w.bufferCh)
  205. // stop and wait for write proc
  206. close(w.writeStop)
  207. <-w.doneCh
  208. close(w.writeCh)
  209. close(w.writeInfoCh)
  210. close(w.bufferInfoCh)
  211. w.writeCh = nil
  212. close(w.errCh)
  213. w.errCh = nil
  214. }
  215. }
  216. // WriteRecord writes asynchronously line protocol record into bucket.
  217. // WriteRecord adds record into the buffer which is sent on the background when it reaches the batch size.
  218. // Blocking alternative is available in the WriteAPIBlocking interface
  219. func (w *WriteAPIImpl) WriteRecord(line string) {
  220. b := []byte(line)
  221. b = append(b, 0xa)
  222. w.bufferCh <- string(b)
  223. }
  224. // WritePoint writes asynchronously Point into bucket.
  225. // WritePoint adds Point into the buffer which is sent on the background when it reaches the batch size.
  226. // Blocking alternative is available in the WriteAPIBlocking interface
  227. func (w *WriteAPIImpl) WritePoint(point *write.Point) {
  228. line, err := w.service.EncodePoints(point)
  229. if err != nil {
  230. log.Errorf("point encoding error: %s\n", err.Error())
  231. if w.errCh != nil {
  232. w.errCh <- err
  233. }
  234. } else {
  235. w.bufferCh <- line
  236. }
  237. }
  // buffer concatenates lines, in order and without any separator, into a
  // single string. A nil or empty slice yields the empty string.
  func buffer(lines []string) string {
  	total := 0
  	for _, l := range lines {
  		total += len(l)
  	}
  	var sb strings.Builder
  	sb.Grow(total)
  	for _, l := range lines {
  		sb.WriteString(l)
  	}
  	return sb.String()
  }