// Copyright 2020-2021 InfluxData, Inc. All rights reserved.
// Use of this source code is governed by the MIT
// license that can be found in the LICENSE file.

package api

import (
	"context"
	"strings"
	"sync"
	"sync/atomic"

	http2 "github.com/influxdata/influxdb-client-go/v2/api/http"
	"github.com/influxdata/influxdb-client-go/v2/api/write"
	iwrite "github.com/influxdata/influxdb-client-go/v2/internal/write"
)

// WriteAPIBlocking offers blocking methods for writing time series data synchronously into an InfluxDB server.
// By default it does not create batches of points implicitly; a batch is formed from the points/records passed to a single call.
//
// Implicit batching is enabled with EnableBatching(). In this mode, each call to WritePoint or WriteRecord adds a line
// to an internal buffer. If the length of the buffer equals the batch size (set in write.Options), the buffer is sent to the server
// and the result of the operation is returned.
// When a point is only written to the buffer, a nil error is always returned.
// Flush() can be used to trigger sending of the batch even when it does not hold batch-size lines.
//
// Synchronous writing is intended for writing less frequent data, such as weather sensing, or for cases that need explicit control over failed batches.
//
// WriteAPIBlocking can be used concurrently.
// When writing from multiple goroutines, use a single WriteAPIBlocking instance in all of them.
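//
// A minimal usage sketch (assumptions: the client is created with influxdb2.NewClient, and the server URL,
// token, org, and bucket names below are placeholders, not values defined in this package):
//
//	client := influxdb2.NewClient("http://localhost:8086", "my-token")
//	writeAPI := client.WriteAPIBlocking("my-org", "my-bucket")
//	p := write.NewPointWithMeasurement("stat").
//		AddTag("unit", "temperature").
//		AddField("avg", 23.2).
//		SetTime(time.Now())
//	if err := writeAPI.WritePoint(context.Background(), p); err != nil {
//		// handle the write error
//	}
//	client.Close()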
type WriteAPIBlocking interface {
	// WriteRecord writes line protocol record(s) into bucket.
	// By default WriteRecord writes lines without implicit batching; a batch is created from the given records.
	// Automatic batching can be enabled by EnableBatching().
	// Individual arguments can also be batches (multiple records separated by newline).
	// A non-blocking alternative is available in the WriteAPI interface.
	WriteRecord(ctx context.Context, line ...string) error
	// WritePoint writes data point(s) into bucket.
	// By default WritePoint writes points without implicit batching; a batch is created from the given points.
	// Automatic batching can be enabled by EnableBatching().
	// A non-blocking alternative is available in the WriteAPI interface.
	WritePoint(ctx context.Context, point ...*write.Point) error
	// EnableBatching turns on implicit batching.
	// Batch size is controlled via write.Options.
	EnableBatching()
	// Flush forces a write of the buffer if batching is enabled, even if the buffer does not hold a full batch.
	Flush(ctx context.Context) error
}

// writeAPIBlocking implements the WriteAPIBlocking interface.
type writeAPIBlocking struct {
	service      *iwrite.Service
	writeOptions *write.Options
	// The more appropriate atomic.Bool type cannot be used because it has been available only since Go 1.19.
	batching int32
	batch    []string
	mu       sync.Mutex
}

// NewWriteAPIBlocking creates a new instance of the blocking write client for writing data to a bucket belonging to org.
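//
// A hedged construction sketch (the write API is normally obtained via Client.WriteAPIBlocking; the server URL,
// authorization header value, org, and bucket below are placeholders, and http2 is this file's alias for the api/http package):
//
//	svc := http2.NewService("http://localhost:8086", "Token my-token", http2.DefaultOptions())
//	writeAPI := NewWriteAPIBlocking("my-org", "my-bucket", svc, write.DefaultOptions())
//	err := writeAPI.WriteRecord(context.Background(), "stat,unit=temperature avg=23.2")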
func NewWriteAPIBlocking(org string, bucket string, service http2.Service, writeOptions *write.Options) WriteAPIBlocking {
	return &writeAPIBlocking{service: iwrite.NewService(org, bucket, service, writeOptions), writeOptions: writeOptions}
}

// NewWriteAPIBlockingWithBatching creates a new instance of the blocking write client for writing data to a bucket belonging to org, with batching enabled.
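//
// A hedged batching sketch (svc, records, and the option values are illustrative placeholders;
// the same mode can be enabled on an existing instance via EnableBatching()):
//
//	writeAPI := NewWriteAPIBlockingWithBatching("my-org", "my-bucket", svc, write.DefaultOptions().SetBatchSize(100))
//	for _, rec := range records {
//		if err := writeAPI.WriteRecord(context.Background(), rec); err != nil {
//			// handle the write error
//		}
//	}
//	// send the remaining, not yet full batch
//	if err := writeAPI.Flush(context.Background()); err != nil {
//		// handle the flush error
//	}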
func NewWriteAPIBlockingWithBatching(org string, bucket string, service http2.Service, writeOptions *write.Options) WriteAPIBlocking {
	api := &writeAPIBlocking{service: iwrite.NewService(org, bucket, service, writeOptions), writeOptions: writeOptions}
	api.EnableBatching()
	return api
}

func (w *writeAPIBlocking) EnableBatching() {
	if atomic.LoadInt32(&w.batching) == 0 {
		w.mu.Lock()
		// Use an atomic store so concurrent atomic.LoadInt32 readers do not race with this write.
		atomic.StoreInt32(&w.batching, 1)
		w.batch = make([]string, 0, w.writeOptions.BatchSize())
		w.mu.Unlock()
	}
}

func (w *writeAPIBlocking) write(ctx context.Context, line string) error {
	if atomic.LoadInt32(&w.batching) > 0 {
		w.mu.Lock()
		defer w.mu.Unlock()
		w.batch = append(w.batch, line)
		if len(w.batch) == int(w.writeOptions.BatchSize()) {
			// The buffer reached the batch size; send it now.
			return w.flush(ctx)
		}
		return nil
	}
	// Batching is disabled: send the line(s) immediately as a single batch.
	return w.service.WriteBatch(ctx, iwrite.NewBatch(line, w.writeOptions.MaxRetryTime()))
}

func (w *writeAPIBlocking) WriteRecord(ctx context.Context, line ...string) error {
	if len(line) == 0 {
		return nil
	}
	return w.write(ctx, strings.Join(line, "\n"))
}

func (w *writeAPIBlocking) WritePoint(ctx context.Context, point ...*write.Point) error {
	line, err := w.service.EncodePoints(point...)
	if err != nil {
		return err
	}
	return w.write(ctx, line)
}

// flush is an unsynchronized helper for creating and sending a batch.
// It must be called with w.mu held.
func (w *writeAPIBlocking) flush(ctx context.Context) error {
	if len(w.batch) > 0 {
		body := strings.Join(w.batch, "\n")
		w.batch = w.batch[:0]
		b := iwrite.NewBatch(body, w.writeOptions.MaxRetryTime())
		if err := w.service.WriteBatch(ctx, b); err != nil {
			return err
		}
	}
	return nil
}

func (w *writeAPIBlocking) Flush(ctx context.Context) error {
	if atomic.LoadInt32(&w.batching) > 0 {
		w.mu.Lock()
		defer w.mu.Unlock()
		return w.flush(ctx)
	}
	return nil
}