123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704 |
- // Copyright 2020-2021 InfluxData, Inc. All rights reserved.
- // Use of this source code is governed by MIT
- // license that can be found in the LICENSE file.
- package write
- import (
- "context"
- "errors"
- "fmt"
- ilog "log"
- ihttp "net/http"
- "net/http/httptest"
- "runtime"
- "strings"
- "sync"
- "testing"
- "time"
- "github.com/influxdata/influxdb-client-go/v2/api/http"
- "github.com/influxdata/influxdb-client-go/v2/api/write"
- "github.com/influxdata/influxdb-client-go/v2/internal/test"
- "github.com/influxdata/influxdb-client-go/v2/log"
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/require"
- )
- func TestPrecisionToString(t *testing.T) {
- assert.Equal(t, "ns", precisionToString(time.Nanosecond))
- assert.Equal(t, "us", precisionToString(time.Microsecond))
- assert.Equal(t, "ms", precisionToString(time.Millisecond))
- assert.Equal(t, "s", precisionToString(time.Second))
- assert.Equal(t, "ns", precisionToString(time.Hour))
- assert.Equal(t, "ns", precisionToString(time.Microsecond*20))
- }
- func TestAddDefaultTags(t *testing.T) {
- hs := test.NewTestService(t, "http://localhost:8888")
- opts := write.DefaultOptions()
- assert.Len(t, opts.DefaultTags(), 0)
- opts.AddDefaultTag("dt1", "val1")
- opts.AddDefaultTag("zdt", "val2")
- srv := NewService("org", "buc", hs, opts)
- p := write.NewPointWithMeasurement("test")
- p.AddTag("id", "101")
- p.AddField("float32", float32(80.0))
- s, err := srv.EncodePoints(p)
- require.Nil(t, err)
- assert.Equal(t, "test,dt1=val1,id=101,zdt=val2 float32=80\n", s)
- assert.Len(t, p.TagList(), 1)
- p = write.NewPointWithMeasurement("x")
- p.AddTag("xt", "1")
- p.AddField("i", 1)
- s, err = srv.EncodePoints(p)
- require.Nil(t, err)
- assert.Equal(t, "x,dt1=val1,xt=1,zdt=val2 i=1i\n", s)
- assert.Len(t, p.TagList(), 1)
- p = write.NewPointWithMeasurement("d")
- p.AddTag("id", "1")
- // do not overwrite point tag
- p.AddTag("zdt", "val10")
- p.AddField("i", -1)
- s, err = srv.EncodePoints(p)
- require.Nil(t, err)
- assert.Equal(t, "d,dt1=val1,id=1,zdt=val10 i=-1i\n", s)
- assert.Len(t, p.TagList(), 2)
- }
- func TestRetryStrategy(t *testing.T) {
- log.Log.SetLogLevel(log.DebugLevel)
- hs := test.NewTestService(t, "http://localhost:8086")
- opts := write.DefaultOptions().SetRetryInterval(1)
- ctx := context.Background()
- srv := NewService("my-org", "my-bucket", hs, opts)
- // Set permanent reply error to force writes fail and retry
- hs.SetReplyError(&http.Error{
- StatusCode: 429,
- })
- // This batch will fail and it be added to retry queue
- b1 := NewBatch("1\n", opts.MaxRetryTime())
- err := srv.HandleWrite(ctx, b1)
- assert.NotNil(t, err)
- assert.EqualValues(t, 1, srv.retryDelay)
- assert.Equal(t, 1, srv.retryQueue.list.Len())
- //wait retry delay + little more
- <-time.After(time.Millisecond*time.Duration(srv.retryDelay) + time.Microsecond*5)
- // First batch will be tried to write again and this one will added to retry queue
- b2 := NewBatch("2\n", opts.MaxRetryTime())
- err = srv.HandleWrite(ctx, b2)
- assert.NotNil(t, err)
- assertBetween(t, srv.retryDelay, 2, 4)
- assert.Equal(t, 2, srv.retryQueue.list.Len())
- //wait retry delay + little more
- <-time.After(time.Millisecond*time.Duration(srv.retryDelay) + time.Microsecond*5)
- // First batch will be tried to write again and this one will added to retry queue
- b3 := NewBatch("3\n", opts.MaxRetryTime())
- err = srv.HandleWrite(ctx, b3)
- assert.NotNil(t, err)
- assertBetween(t, srv.retryDelay, 4, 8)
- assert.Equal(t, 3, srv.retryQueue.list.Len())
- //wait retry delay + little more
- <-time.After(time.Millisecond*time.Duration(srv.retryDelay) + time.Microsecond*5)
- // First batch will be tried to write again and this one will added to retry queue
- b4 := NewBatch("4\n", opts.MaxRetryTime())
- err = srv.HandleWrite(ctx, b4)
- assert.NotNil(t, err)
- assertBetween(t, srv.retryDelay, 8, 16)
- assert.Equal(t, 4, srv.retryQueue.list.Len())
- <-time.After(time.Millisecond*time.Duration(srv.retryDelay) + time.Microsecond*5)
- // Clear error and let write pass
- hs.SetReplyError(nil)
- // Batches from retry queue will be sent first
- err = srv.HandleWrite(ctx, NewBatch("5\n", opts.MaxRetryTime()))
- assert.Nil(t, err)
- assert.Equal(t, 0, srv.retryQueue.list.Len())
- require.Len(t, hs.Lines(), 5)
- assert.Equal(t, "1", hs.Lines()[0])
- assert.Equal(t, "2", hs.Lines()[1])
- assert.Equal(t, "3", hs.Lines()[2])
- assert.Equal(t, "4", hs.Lines()[3])
- assert.Equal(t, "5", hs.Lines()[4])
- }
- func TestBufferOverwrite(t *testing.T) {
- log.Log.SetLogLevel(log.DebugLevel)
- ilog.SetFlags(ilog.Ldate | ilog.Lmicroseconds)
- hs := test.NewTestService(t, "http://localhost:8086")
- // sleep takes at least more than 10ms (sometimes 15ms) on Windows https://github.com/golang/go/issues/44343
- baseRetryInterval := uint(1)
- if runtime.GOOS == "windows" {
- baseRetryInterval = 20
- }
- // Buffer limit 15000, bach is 5000 => buffer for 3 batches
- opts := write.DefaultOptions().SetRetryInterval(baseRetryInterval).SetRetryBufferLimit(15000)
- ctx := context.Background()
- srv := NewService("my-org", "my-bucket", hs, opts)
- // Set permanent reply error to force writes fail and retry
- hs.SetReplyError(&http.Error{
- StatusCode: 429,
- })
- // This batch will fail and it will be added to retry queue
- b1 := NewBatch("1\n", opts.MaxRetryTime())
- err := srv.HandleWrite(ctx, b1)
- assert.NotNil(t, err)
- //assert.Equal(t, uint(baseRetryInterval), srv.retryDelay)
- assertBetween(t, srv.retryDelay, baseRetryInterval, baseRetryInterval*2)
- assert.Equal(t, 1, srv.retryQueue.list.Len())
- <-time.After(time.Millisecond * time.Duration(srv.retryDelay))
- b2 := NewBatch("2\n", opts.MaxRetryTime())
- // First batch will be tried to write again and this one will added to retry queue
- err = srv.HandleWrite(ctx, b2)
- assert.NotNil(t, err)
- assertBetween(t, srv.retryDelay, baseRetryInterval*2, baseRetryInterval*4)
- assert.Equal(t, 2, srv.retryQueue.list.Len())
- <-time.After(time.Millisecond * time.Duration(srv.retryDelay))
- b3 := NewBatch("3\n", opts.MaxRetryTime())
- // First batch will be tried to write again and this one will added to retry queue
- err = srv.HandleWrite(ctx, b3)
- assert.NotNil(t, err)
- assertBetween(t, srv.retryDelay, baseRetryInterval*4, baseRetryInterval*8)
- assert.Equal(t, 3, srv.retryQueue.list.Len())
- // Write early and overwrite
- b4 := NewBatch("4\n", opts.MaxRetryTime())
- // No write will occur, because retry delay has not passed yet
- // However new bach will be added to retry queue. Retry queue has limit 3,
- // so first batch will be discarded
- priorRetryDelay := srv.retryDelay
- err = srv.HandleWrite(ctx, b4)
- assert.NoError(t, err)
- assert.Equal(t, priorRetryDelay, srv.retryDelay) // Accumulated retry delay should be retained despite batch discard
- assert.Equal(t, 3, srv.retryQueue.list.Len())
- // Overwrite
- <-time.After(time.Millisecond * time.Duration(srv.retryDelay) / 2)
- b5 := NewBatch("5\n", opts.MaxRetryTime())
- // Second batch will be tried to write again
- // However, write will fail and as new batch is added to retry queue
- // the second batch will be discarded
- err = srv.HandleWrite(ctx, b5)
- assert.Nil(t, err) // No error should be returned, because no write was attempted (still waiting for retryDelay to expire)
- assert.Equal(t, 3, srv.retryQueue.list.Len())
- <-time.After(time.Millisecond * time.Duration(srv.retryDelay))
- // Clear error and let write pass
- hs.SetReplyError(nil)
- // Batches from retry queue will be sent first
- err = srv.HandleWrite(ctx, NewBatch("6\n", opts.MaxRetryTime()))
- assert.Nil(t, err)
- assert.Equal(t, 0, srv.retryQueue.list.Len())
- require.Len(t, hs.Lines(), 4)
- assert.Equal(t, "3", hs.Lines()[0])
- assert.Equal(t, "4", hs.Lines()[1])
- assert.Equal(t, "5", hs.Lines()[2])
- assert.Equal(t, "6", hs.Lines()[3])
- }
- func TestMaxRetryInterval(t *testing.T) {
- log.Log.SetLogLevel(log.DebugLevel)
- hs := test.NewTestService(t, "http://localhost:8086")
- // MaxRetryInterval only 4ms, will be reached quickly
- opts := write.DefaultOptions().SetRetryInterval(1).SetMaxRetryInterval(4)
- ctx := context.Background()
- srv := NewService("my-org", "my-bucket", hs, opts)
- // Set permanent reply error to force writes fail and retry
- hs.SetReplyError(&http.Error{
- StatusCode: 503,
- })
- // This batch will fail and it be added to retry queue
- b1 := NewBatch("1\n", opts.MaxRetryTime())
- err := srv.HandleWrite(ctx, b1)
- assert.NotNil(t, err)
- assert.Equal(t, uint(1), srv.retryDelay)
- assert.Equal(t, 1, srv.retryQueue.list.Len())
- <-time.After(time.Millisecond * time.Duration(srv.retryDelay))
- b2 := NewBatch("2\n", opts.MaxRetryTime())
- // First batch will be tried to write again and this one will added to retry queue
- err = srv.HandleWrite(ctx, b2)
- assert.NotNil(t, err)
- assertBetween(t, srv.retryDelay, 2, 4)
- assert.Equal(t, 2, srv.retryQueue.list.Len())
- <-time.After(time.Millisecond * time.Duration(srv.retryDelay))
- b3 := NewBatch("3\n", opts.MaxRetryTime())
- // First batch will be tried to write again and this one will added to retry queue
- err = srv.HandleWrite(ctx, b3)
- assert.NotNil(t, err)
- // New computed delay of first batch should be 4-8, is limited to 4
- assert.EqualValues(t, 4, srv.retryDelay)
- assert.Equal(t, 3, srv.retryQueue.list.Len())
- <-time.After(time.Millisecond * time.Duration(srv.retryDelay))
- b4 := NewBatch("4\n", opts.MaxRetryTime())
- // First batch will be tried to write again and this one will added to retry queue
- err = srv.HandleWrite(ctx, b4)
- assert.NotNil(t, err)
- // New computed delay of first batch should be 8-116, is limited to 4
- assert.EqualValues(t, 4, srv.retryDelay)
- assert.Equal(t, 4, srv.retryQueue.list.Len())
- }
- func min(a, b uint) uint {
- if a > b {
- return b
- }
- return a
- }
- func TestMaxRetries(t *testing.T) {
- log.Log.SetLogLevel(log.DebugLevel)
- hs := test.NewTestService(t, "http://localhost:8086")
- opts := write.DefaultOptions().SetRetryInterval(1)
- ctx := context.Background()
- srv := NewService("my-org", "my-bucket", hs, opts)
- // Set permanent reply error to force writes fail and retry
- hs.SetReplyError(&http.Error{
- StatusCode: 429,
- })
- // This batch will fail and it be added to retry queue
- b1 := NewBatch("1\n", opts.MaxRetryTime())
- err := srv.HandleWrite(ctx, b1)
- assert.NotNil(t, err)
- assert.EqualValues(t, 1, srv.retryDelay)
- assert.Equal(t, 1, srv.retryQueue.list.Len())
- // Write so many batches as it is maxRetries (5)
- // First batch will be written and it will reach max retry limit
- for i, e := uint(1), uint(2); i <= opts.MaxRetries(); i++ {
- //wait retry delay + little more
- <-time.After(time.Millisecond*time.Duration(srv.retryDelay) + time.Microsecond*5)
- b := NewBatch(fmt.Sprintf("%d\n", i+1), opts.MaxRetryTime())
- err = srv.HandleWrite(ctx, b)
- assert.NotNil(t, err)
- assertBetween(t, srv.retryDelay, e, e*2)
- exp := min(i+1, opts.MaxRetries())
- assert.EqualValues(t, exp, srv.retryQueue.list.Len())
- e *= 2
- }
- //Test if was removed from retry queue
- assert.True(t, b1.Evicted)
- <-time.After(time.Millisecond*time.Duration(srv.retryDelay) + time.Microsecond*5)
- // Clear error and let write pass
- hs.SetReplyError(nil)
- // Batches from retry queue will be sent first
- err = srv.HandleWrite(ctx, NewBatch(fmt.Sprintf("%d\n", opts.MaxRetries()+2), opts.MaxRetryTime()))
- assert.Nil(t, err)
- assert.Equal(t, 0, srv.retryQueue.list.Len())
- require.Len(t, hs.Lines(), int(opts.MaxRetries()+1))
- for i := uint(2); i <= opts.MaxRetries()+2; i++ {
- assert.Equal(t, fmt.Sprintf("%d", i), hs.Lines()[i-2])
- }
- }
- func TestMaxRetryTime(t *testing.T) {
- log.Log.SetLogLevel(log.DebugLevel)
- hs := test.NewTestService(t, "http://localhost:8086")
- // Set maxRetryTime 5ms
- opts := write.DefaultOptions().SetRetryInterval(1).SetMaxRetryTime(5)
- ctx := context.Background()
- srv := NewService("my-org", "my-bucket", hs, opts)
- // Set permanent reply error to force writes fail and retry
- hs.SetReplyError(&http.Error{
- StatusCode: 429,
- })
- // This batch will fail and it be added to retry queue and it will expire 5ms after
- b1 := NewBatch("1\n", opts.MaxRetryTime())
- err := srv.HandleWrite(ctx, b1)
- assert.NotNil(t, err)
- assert.EqualValues(t, 1, srv.retryDelay)
- assert.Equal(t, 1, srv.retryQueue.list.Len())
- // Wait for batch expiration
- <-time.After(5 * time.Millisecond)
- exp := opts.MaxRetryTime()
- // sleep takes at least more than 10ms (sometimes 15ms) on Windows https://github.com/golang/go/issues/44343
- if runtime.GOOS == "windows" {
- exp = 20
- }
- // create new batch for sending
- b := NewBatch("2\n", exp)
- // First batch will be checked against maxRetryTime and it will expire. New batch will fail and it will added to retry queue
- err = srv.HandleWrite(ctx, b)
- require.NotNil(t, err)
- // 1st Batch expires and writing 2nd trows error
- assert.Equal(t, "write failed (attempts 1): Unexpected status code 429", err.Error())
- assert.Equal(t, 1, srv.retryQueue.list.Len())
- //wait until remaining accumulated retryDelay has passed, because there hasn't been a successful write yet
- <-time.After(time.Until(srv.lastWriteAttempt.Add(time.Millisecond * time.Duration(srv.retryDelay))))
- // Clear error and let write pass
- hs.SetReplyError(nil)
- // A batch from retry queue will be sent first
- err = srv.HandleWrite(ctx, NewBatch("3\n", opts.MaxRetryTime()))
- assert.Nil(t, err)
- assert.Equal(t, 0, srv.retryQueue.list.Len())
- require.Len(t, hs.Lines(), 2)
- assert.Equal(t, "2", hs.Lines()[0])
- assert.Equal(t, "3", hs.Lines()[1])
- }
- func TestRetryOnConnectionError(t *testing.T) {
- log.Log.SetLogLevel(log.DebugLevel)
- hs := test.NewTestService(t, "http://localhost:8086")
- //
- opts := write.DefaultOptions().SetRetryInterval(1).SetRetryBufferLimit(15000)
- ctx := context.Background()
- srv := NewService("my-org", "my-bucket", hs, opts)
- // Set permanent non HTTP error to force writes fail and retry
- hs.SetReplyError(&http.Error{
- Err: errors.New("connection refused"),
- })
- // This batch will fail and it be added to retry queue
- b1 := NewBatch("1\n", opts.MaxRetryTime())
- err := srv.HandleWrite(ctx, b1)
- assert.NotNil(t, err)
- assert.EqualValues(t, 1, srv.retryDelay)
- assert.Equal(t, 1, srv.retryQueue.list.Len())
- <-time.After(time.Millisecond * time.Duration(srv.retryDelay))
- b2 := NewBatch("2\n", opts.MaxRetryTime())
- // First batch will be tried to write again and this one will added to retry queue
- err = srv.HandleWrite(ctx, b2)
- assert.NotNil(t, err)
- assertBetween(t, srv.retryDelay, 2, 4)
- assert.Equal(t, 2, srv.retryQueue.list.Len())
- <-time.After(time.Millisecond * time.Duration(srv.retryDelay))
- b3 := NewBatch("3\n", opts.MaxRetryTime())
- // First batch will be tried to write again and this one will added to retry queue
- err = srv.HandleWrite(ctx, b3)
- assert.NotNil(t, err)
- assertBetween(t, srv.retryDelay, 4, 8)
- assert.Equal(t, 3, srv.retryQueue.list.Len())
- <-time.After(time.Millisecond * time.Duration(srv.retryDelay))
- // Clear error and let write pass
- hs.SetReplyError(nil)
- // Batches from retry queue will be sent first
- err = srv.HandleWrite(ctx, NewBatch("4\n", opts.MaxRetryTime()))
- assert.Nil(t, err)
- assert.Equal(t, 0, srv.retryQueue.list.Len())
- require.Len(t, hs.Lines(), 4)
- assert.Equal(t, "1", hs.Lines()[0])
- assert.Equal(t, "2", hs.Lines()[1])
- assert.Equal(t, "3", hs.Lines()[2])
- assert.Equal(t, "4", hs.Lines()[3])
- }
- func TestNoRetryIfMaxRetriesIsZero(t *testing.T) {
- log.Log.SetLogLevel(log.DebugLevel)
- hs := test.NewTestService(t, "http://localhost:8086")
- //
- opts := write.DefaultOptions().SetMaxRetries(0)
- ctx := context.Background()
- srv := NewService("my-org", "my-bucket", hs, opts)
- hs.SetReplyError(&http.Error{
- Err: errors.New("connection refused"),
- })
- b1 := NewBatch("1\n", opts.MaxRetryTime())
- err := srv.HandleWrite(ctx, b1)
- assert.NotNil(t, err)
- assert.Equal(t, 0, srv.retryQueue.list.Len())
- }
- func TestWriteContextCancel(t *testing.T) {
- hs := test.NewTestService(t, "http://localhost:8888")
- opts := write.DefaultOptions()
- srv := NewService("my-org", "my-bucket", hs, opts)
- lines := test.GenRecords(10)
- ctx, cancel := context.WithCancel(context.Background())
- var err error
- var wg sync.WaitGroup
- wg.Add(1)
- go func() {
- <-time.After(10 * time.Millisecond)
- err = srv.HandleWrite(ctx, NewBatch(strings.Join(lines, "\n"), opts.MaxRetryTime()))
- wg.Done()
- }()
- cancel()
- wg.Wait()
- require.Equal(t, context.Canceled, err)
- assert.Len(t, hs.Lines(), 0)
- }
- func TestPow(t *testing.T) {
- assert.EqualValues(t, 1, pow(10, 0))
- assert.EqualValues(t, 10, pow(10, 1))
- assert.EqualValues(t, 4, pow(2, 2))
- assert.EqualValues(t, 1, pow(1, 2))
- assert.EqualValues(t, 125, pow(5, 3))
- }
- func assertBetween(t *testing.T, val, min, max uint) {
- t.Helper()
- assert.True(t, val >= min && val <= max, fmt.Sprintf("%d is outside <%d;%d>", val, min, max))
- }
- func TestComputeRetryDelay(t *testing.T) {
- hs := test.NewTestService(t, "http://localhost:8888")
- opts := write.DefaultOptions()
- srv := NewService("my-org", "my-bucket", hs, opts)
- assertBetween(t, srv.computeRetryDelay(0), 5_000, 10_000)
- assertBetween(t, srv.computeRetryDelay(1), 10_000, 20_000)
- assertBetween(t, srv.computeRetryDelay(2), 20_000, 40_000)
- assertBetween(t, srv.computeRetryDelay(3), 40_000, 80_000)
- assertBetween(t, srv.computeRetryDelay(4), 80_000, 125_000)
- for i := uint(5); i < 200; i++ { //test also limiting higher values
- assert.EqualValues(t, 125_000, srv.computeRetryDelay(i))
- }
- }
- func TestErrorCallback(t *testing.T) {
- log.Log.SetLogLevel(log.DebugLevel)
- hs := test.NewTestService(t, "http://localhost:8086")
- //
- opts := write.DefaultOptions().SetRetryInterval(1).SetRetryBufferLimit(15000)
- ctx := context.Background()
- srv := NewService("my-org", "my-bucket", hs, opts)
- hs.SetReplyError(&http.Error{
- Err: errors.New("connection refused"),
- })
- srv.SetBatchErrorCallback(func(batch *Batch, error2 http.Error) bool {
- return batch.RetryAttempts < 2
- })
- b1 := NewBatch("1\n", opts.MaxRetryTime())
- err := srv.HandleWrite(ctx, b1)
- assert.NotNil(t, err)
- assert.Equal(t, 1, srv.retryQueue.list.Len())
- <-time.After(time.Millisecond * time.Duration(srv.retryDelay))
- b := NewBatch("2\n", opts.MaxRetryTime())
- err = srv.HandleWrite(ctx, b)
- assert.NotNil(t, err)
- assert.Equal(t, 2, srv.retryQueue.list.Len())
- <-time.After(time.Millisecond * time.Duration(srv.retryDelay))
- b = NewBatch("3\n", opts.MaxRetryTime())
- err = srv.HandleWrite(ctx, b)
- assert.NotNil(t, err)
- assert.Equal(t, 2, srv.retryQueue.list.Len())
- }
- func minInt(a, b int) int {
- if a > b {
- return b
- }
- return a
- }
- func TestRetryIntervalAccumulation(t *testing.T) {
- // log.Log.SetLogLevel(log.DebugLevel)
- log.Log.SetLogLevel(log.InfoLevel)
- // Setup test service with scenario's configuration
- hs := test.NewTestService(t, "http://localhost:8086")
- baseRetryInterval := uint(20)
- if runtime.GOOS == "windows" {
- baseRetryInterval = 30
- }
- opts := write.DefaultOptions().
- SetRetryInterval(baseRetryInterval).
- SetMaxRetryInterval(300).
- SetMaxRetryTime(baseRetryInterval * 5)
- ctx := context.Background()
- srv := NewService("my-org", "my-bucket", hs, opts)
- writeInterval := time.Duration(opts.RetryInterval()) * time.Millisecond
- // Set permanent reply error to force writes fail and retry
- hs.SetReplyError(&http.Error{StatusCode: 429})
- lastInterval := uint(0)
- assert.Equal(t, uint(0), srv.retryAttempts) // Should initialize to zero
- i := 1
- for ; i <= 45; i++ {
- b := NewBatch(fmt.Sprintf("%d\n", i), opts.MaxRetryTime())
- err := srv.HandleWrite(ctx, b)
- assert.Equal(t, minInt(i, 5), srv.retryQueue.list.Len())
- assert.GreaterOrEqual(t, srv.retryDelay, lastInterval) // Should not decrease while writes failing
- assert.LessOrEqual(t, srv.retryDelay, opts.MaxRetryInterval()) // Should not grow larger than max
- if err != nil {
- if lastInterval == opts.MaxRetryInterval() {
- // Write attempt failed, and interval was already at max, so should stay there
- assert.Equal(t, srv.retryDelay, opts.MaxRetryInterval())
- log.Log.Infof("Retry interval capped at %d ms", srv.retryDelay)
- } else {
- // A write attempt was made and failed, so retry interval should have increased
- assert.Greater(t, srv.retryDelay, lastInterval)
- log.Log.Infof("Retry interval increased to %d ms", srv.retryDelay)
- }
- } else {
- // Write attempt was not made, so retry interval should remain the same
- assert.Equal(t, srv.retryDelay, lastInterval)
- log.Log.Infof("Retry interval still at %d ms", srv.retryDelay)
- }
- lastInterval = srv.retryDelay
- <-time.After(writeInterval)
- }
- // Clear error and let write pass
- hs.SetReplyError(nil)
- // Wait until write queue is ready to retry; in meantime, keep writing and confirming queue state
- retryTimeout := srv.lastWriteAttempt.Add(time.Millisecond * time.Duration(srv.retryDelay))
- log.Log.Infof("Continuing to write for %d ms until flushing write attempt", time.Until(retryTimeout).Milliseconds())
- for ; time.Until(retryTimeout) >= 0; i++ {
- b := NewBatch(fmt.Sprintf("%d\n", i), opts.MaxRetryTime())
- err := srv.HandleWrite(ctx, b)
- assert.Nil(t, err) // There should be no write attempt
- assert.Equal(t, minInt(i, 5), srv.retryQueue.list.Len())
- assert.Equal(t, srv.retryDelay, opts.MaxRetryInterval()) // Should remain the same
- log.Log.Infof("Retry interval still at %d ms", srv.retryDelay)
- <-time.After(writeInterval)
- }
- // Retry interval should now have expired, so this write attempt should succeed and cause retry queue to flush
- b := NewBatch(fmt.Sprintf("%d\n", i), opts.MaxRetryTime())
- err := srv.HandleWrite(ctx, b)
- assert.Nil(t, err)
- assert.Equal(t, 0, srv.retryQueue.list.Len())
- assert.Equal(t, srv.retryAttempts, uint(0)) // Should reset to zero
- // Ensure proper batches got written to server
- require.Len(t, hs.Lines(), 5)
- assert.Equal(t, fmt.Sprintf("%d", i-4), hs.Lines()[0])
- assert.Equal(t, fmt.Sprintf("%d", i-3), hs.Lines()[1])
- assert.Equal(t, fmt.Sprintf("%d", i-2), hs.Lines()[2])
- assert.Equal(t, fmt.Sprintf("%d", i-1), hs.Lines()[3])
- assert.Equal(t, fmt.Sprintf("%d", i-0), hs.Lines()[4])
- // Debug line to capture output of successful test
- // assert.True(t, false)
- }
- func TestFlush(t *testing.T) {
- log.Log.SetLogLevel(log.DebugLevel)
- hs := test.NewTestService(t, "http://localhost:8086")
- //
- opts := write.DefaultOptions().SetRetryInterval(1)
- ctx := context.Background()
- srv := NewService("my-org", "my-bucket", hs, opts)
- hs.SetReplyError(&http.Error{
- Err: errors.New("connection refused"),
- })
- lines := test.GenRecords(5)
- // Test flush will fail all batches
- for _, line := range lines {
- b := NewBatch(line, 20)
- _ = srv.HandleWrite(ctx, b)
- }
- assert.Equal(t, 5, srv.retryQueue.list.Len())
- srv.Flush()
- assert.Len(t, hs.Lines(), 0)
- // Test flush will find all batches expired
- for _, line := range lines {
- b := NewBatch(line, 5)
- _ = srv.HandleWrite(ctx, b)
- }
- assert.Equal(t, 5, srv.retryQueue.list.Len())
- <-time.After(5 * time.Millisecond)
- hs.SetReplyError(nil)
- // all batches should expire
- srv.Flush()
- assert.Len(t, hs.Lines(), 0)
- assert.Equal(t, 0, srv.retryQueue.list.Len())
- // Test flush will succeed
- hs.SetReplyError(&http.Error{
- Err: errors.New("connection refused"),
- })
- for _, line := range lines {
- b := NewBatch(line, 5)
- _ = srv.HandleWrite(ctx, b)
- }
- assert.Equal(t, 5, srv.retryQueue.list.Len())
- hs.SetReplyError(nil)
- // all batches should expire
- srv.Flush()
- assert.Len(t, hs.Lines(), 5)
- assert.Equal(t, 0, srv.retryQueue.list.Len())
- }
- func TestConsistencyParam(t *testing.T) {
- hs := test.NewTestService(t, "http://localhost:8888")
- opts := write.DefaultOptions().SetConsistency(write.ConsistencyQuorum)
- srv := NewService("org", "buc", hs, opts)
- require.Equal(t, "http://localhost:8888/api/v2/write?bucket=buc&consistency=quorum&org=org&precision=ns", srv.WriteURL())
- opts = write.DefaultOptions()
- srv = NewService("org", "buc", hs, opts)
- require.Equal(t, "http://localhost:8888/api/v2/write?bucket=buc&org=org&precision=ns", srv.WriteURL())
- }
- func TestIgnoreErrors(t *testing.T) {
- log.Log.SetLogLevel(log.DebugLevel)
- i := 0
- server := httptest.NewServer(ihttp.HandlerFunc(func(w ihttp.ResponseWriter, r *ihttp.Request) {
- i++
- w.WriteHeader(ihttp.StatusInternalServerError)
- switch i {
- case 1:
- _, _ = w.Write([]byte(`{"error":" "write failed: hinted handoff queue not empty"`))
- case 2:
- _, _ = w.Write([]byte(`{"code":"internal error", "message":"partial write: field type conflict"}`))
- case 3:
- _, _ = w.Write([]byte(`{"code":"internal error", "message":"partial write: points beyond retention policy"}`))
- case 4:
- _, _ = w.Write([]byte(`{"code":"internal error", "message":"unable to parse 'cpu value': invalid field format"}`))
- case 5:
- _, _ = w.Write([]byte(`{"code":"internal error", "message":"gateway error"}`))
- }
- }))
- defer server.Close()
- //
- opts := write.DefaultOptions()
- ctx := context.Background()
- srv := NewService("my-org", "my-bucket", http.NewService(server.URL, "", http.DefaultOptions()), opts)
- b := NewBatch("1", 20)
- err := srv.HandleWrite(ctx, b)
- assert.NoError(t, err)
- err = srv.HandleWrite(ctx, b)
- assert.NoError(t, err)
- err = srv.HandleWrite(ctx, b)
- assert.NoError(t, err)
- err = srv.HandleWrite(ctx, b)
- assert.NoError(t, err)
- err = srv.HandleWrite(ctx, b)
- assert.Error(t, err)
- }
|