123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267 |
- // Copyright 2020-2021 InfluxData, Inc. All rights reserved.
- // Use of this source code is governed by MIT
- // license that can be found in the LICENSE file.
- package api
- import (
- "fmt"
- "io"
- "math"
- "runtime"
- "strings"
- "sync"
- "testing"
- "time"
- "github.com/influxdata/influxdb-client-go/v2/api/http"
- "github.com/influxdata/influxdb-client-go/v2/api/write"
- "github.com/influxdata/influxdb-client-go/v2/internal/test"
- "github.com/influxdata/influxdb-client-go/v2/log"
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/require"
- )
- func TestWriteAPIWriteDefaultTag(t *testing.T) {
- service := test.NewTestService(t, "http://localhost:8888")
- opts := write.DefaultOptions().
- SetBatchSize(1)
- opts.AddDefaultTag("dft", "a")
- writeAPI := NewWriteAPI("my-org", "my-bucket", service, opts)
- point := write.NewPoint("test",
- map[string]string{
- "vendor": "AWS",
- },
- map[string]interface{}{
- "mem_free": 1234567,
- }, time.Unix(60, 60))
- writeAPI.WritePoint(point)
- writeAPI.Close()
- require.Len(t, service.Lines(), 1)
- assert.Equal(t, "test,dft=a,vendor=AWS mem_free=1234567i 60000000060", service.Lines()[0])
- }
- func TestWriteAPIImpl_Write(t *testing.T) {
- service := test.NewTestService(t, "http://localhost:8888")
- writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5))
- points := test.GenPoints(10)
- for _, p := range points {
- writeAPI.WritePoint(p)
- }
- writeAPI.Close()
- require.Len(t, service.Lines(), 10)
- for i, p := range points {
- line := write.PointToLineProtocol(p, writeAPI.writeOptions.Precision())
- //cut off last \n char
- line = line[:len(line)-1]
- assert.Equal(t, service.Lines()[i], line)
- }
- }
- func TestGzipWithFlushing(t *testing.T) {
- service := test.NewTestService(t, "http://localhost:8888")
- log.Log.SetLogLevel(log.DebugLevel)
- writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5).SetUseGZip(true))
- points := test.GenPoints(5)
- for _, p := range points {
- writeAPI.WritePoint(p)
- }
- start := time.Now()
- writeAPI.waitForFlushing()
- end := time.Now()
- fmt.Printf("Flash duration: %dns\n", end.Sub(start).Nanoseconds())
- assert.Len(t, service.Lines(), 5)
- assert.True(t, service.WasGzip())
- service.Close()
- writeAPI.writeOptions.SetUseGZip(false)
- for _, p := range points {
- writeAPI.WritePoint(p)
- }
- writeAPI.waitForFlushing()
- assert.Len(t, service.Lines(), 5)
- assert.False(t, service.WasGzip())
- writeAPI.Close()
- }
- func TestFlushInterval(t *testing.T) {
- service := test.NewTestService(t, "http://localhost:8888")
- writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(10).SetFlushInterval(30))
- points := test.GenPoints(5)
- for _, p := range points {
- writeAPI.WritePoint(p)
- }
- require.Len(t, service.Lines(), 0)
- <-time.After(time.Millisecond * 50)
- require.Len(t, service.Lines(), 5)
- writeAPI.Close()
- service.Close()
- }
- func TestRetry(t *testing.T) {
- service := test.NewTestService(t, "http://localhost:8888")
- log.Log.SetLogLevel(log.DebugLevel)
- writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5).SetRetryInterval(10000))
- points := test.GenPoints(15)
- for i := 0; i < 5; i++ {
- writeAPI.WritePoint(points[i])
- }
- writeAPI.waitForFlushing()
- require.Len(t, service.Lines(), 5)
- service.Close()
- service.SetReplyError(&http.Error{
- StatusCode: 429,
- RetryAfter: 1,
- })
- for i := 0; i < 5; i++ {
- writeAPI.WritePoint(points[i])
- }
- writeAPI.waitForFlushing()
- require.Len(t, service.Lines(), 0)
- service.Close()
- for i := 5; i < 10; i++ {
- writeAPI.WritePoint(points[i])
- }
- writeAPI.waitForFlushing()
- require.Len(t, service.Lines(), 0)
- <-time.After(time.Second + 50*time.Millisecond)
- for i := 10; i < 15; i++ {
- writeAPI.WritePoint(points[i])
- }
- writeAPI.waitForFlushing()
- require.Len(t, service.Lines(), 15)
- assert.True(t, strings.HasPrefix(service.Lines()[7], "test,hostname=host_7"))
- assert.True(t, strings.HasPrefix(service.Lines()[14], "test,hostname=host_14"))
- writeAPI.Close()
- }
- func TestWriteError(t *testing.T) {
- service := test.NewTestService(t, "http://localhost:8888")
- log.Log.SetLogLevel(log.DebugLevel)
- service.SetReplyError(&http.Error{
- StatusCode: 400,
- Code: "write",
- Message: "error",
- })
- writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5))
- errCh := writeAPI.Errors()
- var recErr error
- var wg sync.WaitGroup
- wg.Add(1)
- go func() {
- recErr = <-errCh
- wg.Done()
- }()
- points := test.GenPoints(15)
- for i := 0; i < 5; i++ {
- writeAPI.WritePoint(points[i])
- }
- writeAPI.waitForFlushing()
- wg.Wait()
- require.NotNil(t, recErr)
- writeAPI.Close()
- }
- func TestWriteErrorCallback(t *testing.T) {
- service := test.NewTestService(t, "http://localhost:8888")
- log.Log.SetLogLevel(log.DebugLevel)
- service.SetReplyError(&http.Error{
- StatusCode: 429,
- Code: "write",
- Message: "error",
- })
- // sleep takes at least more than 10ms (sometimes 15ms) on Windows https://github.com/golang/go/issues/44343
- retryInterval := uint(1)
- if runtime.GOOS == "windows" {
- retryInterval = 20
- }
- writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(1).SetRetryInterval(retryInterval))
- writeAPI.SetWriteFailedCallback(func(batch string, error http.Error, retryAttempts uint) bool {
- return retryAttempts < 2
- })
- points := test.GenPoints(10)
- // first batch will be discarded by callback after 3 write attempts, second batch should survive with only one failed attempt
- for i, j := 0, 0; i < 6; i++ {
- writeAPI.WritePoint(points[i])
- writeAPI.waitForFlushing()
- w := int(math.Pow(5, float64(j)) * float64(retryInterval))
- fmt.Printf("Waiting %dms\n", w)
- <-time.After(time.Duration(w) * time.Millisecond)
- j++
- if j == 3 {
- j = 0
- }
- }
- service.SetReplyError(nil)
- writeAPI.SetWriteFailedCallback(func(batch string, error http.Error, retryAttempts uint) bool {
- return true
- })
- for i := 6; i < 10; i++ {
- writeAPI.WritePoint(points[i])
- }
- writeAPI.waitForFlushing()
- assert.Len(t, service.Lines(), 9)
- writeAPI.Close()
- }
- func TestClosing(t *testing.T) {
- service := test.NewTestService(t, "http://localhost:8888")
- log.Log.SetLogLevel(log.DebugLevel)
- writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5).SetRetryInterval(10000))
- points := test.GenPoints(15)
- for i := 0; i < 5; i++ {
- writeAPI.WritePoint(points[i])
- }
- writeAPI.Close()
- require.Len(t, service.Lines(), 5)
- writeAPI = NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5).SetRetryInterval(10000))
- service.Close()
- service.SetReplyError(&http.Error{
- StatusCode: 425,
- })
- _ = writeAPI.Errors()
- for i := 0; i < 15; i++ {
- writeAPI.WritePoint(points[i])
- }
- start := time.Now()
- writeAPI.Close()
- diff := time.Since(start)
- fmt.Println("Diff", diff)
- assert.Len(t, service.Lines(), 0)
- }
- func TestFlushWithRetries(t *testing.T) {
- service := test.NewTestService(t, "http://localhost:8888")
- log.Log.SetLogLevel(log.DebugLevel)
- writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetRetryInterval(200).SetBatchSize(1))
- points := test.GenPoints(5)
- fails := 0
- var mu sync.Mutex
- service.SetRequestHandler(func(url string, body io.Reader) error {
- mu.Lock()
- defer mu.Unlock()
- // fail 4 times, then succeed on the 5th try - maxRetries default is 5
- if fails >= 4 {
- _ = service.DecodeLines(body)
- return nil
- }
- fails++
- return fmt.Errorf("spurious failure")
- })
- // write will try first batch and others will be put to the retry queue of retry delay caused by first write error
- for i := 0; i < len(points); i++ {
- writeAPI.WritePoint(points[i])
- }
- // Flush will try sending first batch again and then others
- // 1st, 2nd and 3rd will fail, because test service rejects 4 writes
- writeAPI.Flush()
- writeAPI.Close()
- // two remained
- assert.Equal(t, 2, len(service.Lines()))
- }
|