service_test.go 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704
  1. // Copyright 2020-2021 InfluxData, Inc. All rights reserved.
  2. // Use of this source code is governed by MIT
  3. // license that can be found in the LICENSE file.
  4. package write
  5. import (
  6. "context"
  7. "errors"
  8. "fmt"
  9. ilog "log"
  10. ihttp "net/http"
  11. "net/http/httptest"
  12. "runtime"
  13. "strings"
  14. "sync"
  15. "testing"
  16. "time"
  17. "github.com/influxdata/influxdb-client-go/v2/api/http"
  18. "github.com/influxdata/influxdb-client-go/v2/api/write"
  19. "github.com/influxdata/influxdb-client-go/v2/internal/test"
  20. "github.com/influxdata/influxdb-client-go/v2/log"
  21. "github.com/stretchr/testify/assert"
  22. "github.com/stretchr/testify/require"
  23. )
  24. func TestPrecisionToString(t *testing.T) {
  25. assert.Equal(t, "ns", precisionToString(time.Nanosecond))
  26. assert.Equal(t, "us", precisionToString(time.Microsecond))
  27. assert.Equal(t, "ms", precisionToString(time.Millisecond))
  28. assert.Equal(t, "s", precisionToString(time.Second))
  29. assert.Equal(t, "ns", precisionToString(time.Hour))
  30. assert.Equal(t, "ns", precisionToString(time.Microsecond*20))
  31. }
  32. func TestAddDefaultTags(t *testing.T) {
  33. hs := test.NewTestService(t, "http://localhost:8888")
  34. opts := write.DefaultOptions()
  35. assert.Len(t, opts.DefaultTags(), 0)
  36. opts.AddDefaultTag("dt1", "val1")
  37. opts.AddDefaultTag("zdt", "val2")
  38. srv := NewService("org", "buc", hs, opts)
  39. p := write.NewPointWithMeasurement("test")
  40. p.AddTag("id", "101")
  41. p.AddField("float32", float32(80.0))
  42. s, err := srv.EncodePoints(p)
  43. require.Nil(t, err)
  44. assert.Equal(t, "test,dt1=val1,id=101,zdt=val2 float32=80\n", s)
  45. assert.Len(t, p.TagList(), 1)
  46. p = write.NewPointWithMeasurement("x")
  47. p.AddTag("xt", "1")
  48. p.AddField("i", 1)
  49. s, err = srv.EncodePoints(p)
  50. require.Nil(t, err)
  51. assert.Equal(t, "x,dt1=val1,xt=1,zdt=val2 i=1i\n", s)
  52. assert.Len(t, p.TagList(), 1)
  53. p = write.NewPointWithMeasurement("d")
  54. p.AddTag("id", "1")
  55. // do not overwrite point tag
  56. p.AddTag("zdt", "val10")
  57. p.AddField("i", -1)
  58. s, err = srv.EncodePoints(p)
  59. require.Nil(t, err)
  60. assert.Equal(t, "d,dt1=val1,id=1,zdt=val10 i=-1i\n", s)
  61. assert.Len(t, p.TagList(), 2)
  62. }
  63. func TestRetryStrategy(t *testing.T) {
  64. log.Log.SetLogLevel(log.DebugLevel)
  65. hs := test.NewTestService(t, "http://localhost:8086")
  66. opts := write.DefaultOptions().SetRetryInterval(1)
  67. ctx := context.Background()
  68. srv := NewService("my-org", "my-bucket", hs, opts)
  69. // Set permanent reply error to force writes fail and retry
  70. hs.SetReplyError(&http.Error{
  71. StatusCode: 429,
  72. })
  73. // This batch will fail and it be added to retry queue
  74. b1 := NewBatch("1\n", opts.MaxRetryTime())
  75. err := srv.HandleWrite(ctx, b1)
  76. assert.NotNil(t, err)
  77. assert.EqualValues(t, 1, srv.retryDelay)
  78. assert.Equal(t, 1, srv.retryQueue.list.Len())
  79. //wait retry delay + little more
  80. <-time.After(time.Millisecond*time.Duration(srv.retryDelay) + time.Microsecond*5)
  81. // First batch will be tried to write again and this one will added to retry queue
  82. b2 := NewBatch("2\n", opts.MaxRetryTime())
  83. err = srv.HandleWrite(ctx, b2)
  84. assert.NotNil(t, err)
  85. assertBetween(t, srv.retryDelay, 2, 4)
  86. assert.Equal(t, 2, srv.retryQueue.list.Len())
  87. //wait retry delay + little more
  88. <-time.After(time.Millisecond*time.Duration(srv.retryDelay) + time.Microsecond*5)
  89. // First batch will be tried to write again and this one will added to retry queue
  90. b3 := NewBatch("3\n", opts.MaxRetryTime())
  91. err = srv.HandleWrite(ctx, b3)
  92. assert.NotNil(t, err)
  93. assertBetween(t, srv.retryDelay, 4, 8)
  94. assert.Equal(t, 3, srv.retryQueue.list.Len())
  95. //wait retry delay + little more
  96. <-time.After(time.Millisecond*time.Duration(srv.retryDelay) + time.Microsecond*5)
  97. // First batch will be tried to write again and this one will added to retry queue
  98. b4 := NewBatch("4\n", opts.MaxRetryTime())
  99. err = srv.HandleWrite(ctx, b4)
  100. assert.NotNil(t, err)
  101. assertBetween(t, srv.retryDelay, 8, 16)
  102. assert.Equal(t, 4, srv.retryQueue.list.Len())
  103. <-time.After(time.Millisecond*time.Duration(srv.retryDelay) + time.Microsecond*5)
  104. // Clear error and let write pass
  105. hs.SetReplyError(nil)
  106. // Batches from retry queue will be sent first
  107. err = srv.HandleWrite(ctx, NewBatch("5\n", opts.MaxRetryTime()))
  108. assert.Nil(t, err)
  109. assert.Equal(t, 0, srv.retryQueue.list.Len())
  110. require.Len(t, hs.Lines(), 5)
  111. assert.Equal(t, "1", hs.Lines()[0])
  112. assert.Equal(t, "2", hs.Lines()[1])
  113. assert.Equal(t, "3", hs.Lines()[2])
  114. assert.Equal(t, "4", hs.Lines()[3])
  115. assert.Equal(t, "5", hs.Lines()[4])
  116. }
  117. func TestBufferOverwrite(t *testing.T) {
  118. log.Log.SetLogLevel(log.DebugLevel)
  119. ilog.SetFlags(ilog.Ldate | ilog.Lmicroseconds)
  120. hs := test.NewTestService(t, "http://localhost:8086")
  121. // sleep takes at least more than 10ms (sometimes 15ms) on Windows https://github.com/golang/go/issues/44343
  122. baseRetryInterval := uint(1)
  123. if runtime.GOOS == "windows" {
  124. baseRetryInterval = 20
  125. }
  126. // Buffer limit 15000, bach is 5000 => buffer for 3 batches
  127. opts := write.DefaultOptions().SetRetryInterval(baseRetryInterval).SetRetryBufferLimit(15000)
  128. ctx := context.Background()
  129. srv := NewService("my-org", "my-bucket", hs, opts)
  130. // Set permanent reply error to force writes fail and retry
  131. hs.SetReplyError(&http.Error{
  132. StatusCode: 429,
  133. })
  134. // This batch will fail and it will be added to retry queue
  135. b1 := NewBatch("1\n", opts.MaxRetryTime())
  136. err := srv.HandleWrite(ctx, b1)
  137. assert.NotNil(t, err)
  138. //assert.Equal(t, uint(baseRetryInterval), srv.retryDelay)
  139. assertBetween(t, srv.retryDelay, baseRetryInterval, baseRetryInterval*2)
  140. assert.Equal(t, 1, srv.retryQueue.list.Len())
  141. <-time.After(time.Millisecond * time.Duration(srv.retryDelay))
  142. b2 := NewBatch("2\n", opts.MaxRetryTime())
  143. // First batch will be tried to write again and this one will added to retry queue
  144. err = srv.HandleWrite(ctx, b2)
  145. assert.NotNil(t, err)
  146. assertBetween(t, srv.retryDelay, baseRetryInterval*2, baseRetryInterval*4)
  147. assert.Equal(t, 2, srv.retryQueue.list.Len())
  148. <-time.After(time.Millisecond * time.Duration(srv.retryDelay))
  149. b3 := NewBatch("3\n", opts.MaxRetryTime())
  150. // First batch will be tried to write again and this one will added to retry queue
  151. err = srv.HandleWrite(ctx, b3)
  152. assert.NotNil(t, err)
  153. assertBetween(t, srv.retryDelay, baseRetryInterval*4, baseRetryInterval*8)
  154. assert.Equal(t, 3, srv.retryQueue.list.Len())
  155. // Write early and overwrite
  156. b4 := NewBatch("4\n", opts.MaxRetryTime())
  157. // No write will occur, because retry delay has not passed yet
  158. // However new bach will be added to retry queue. Retry queue has limit 3,
  159. // so first batch will be discarded
  160. priorRetryDelay := srv.retryDelay
  161. err = srv.HandleWrite(ctx, b4)
  162. assert.NoError(t, err)
  163. assert.Equal(t, priorRetryDelay, srv.retryDelay) // Accumulated retry delay should be retained despite batch discard
  164. assert.Equal(t, 3, srv.retryQueue.list.Len())
  165. // Overwrite
  166. <-time.After(time.Millisecond * time.Duration(srv.retryDelay) / 2)
  167. b5 := NewBatch("5\n", opts.MaxRetryTime())
  168. // Second batch will be tried to write again
  169. // However, write will fail and as new batch is added to retry queue
  170. // the second batch will be discarded
  171. err = srv.HandleWrite(ctx, b5)
  172. assert.Nil(t, err) // No error should be returned, because no write was attempted (still waiting for retryDelay to expire)
  173. assert.Equal(t, 3, srv.retryQueue.list.Len())
  174. <-time.After(time.Millisecond * time.Duration(srv.retryDelay))
  175. // Clear error and let write pass
  176. hs.SetReplyError(nil)
  177. // Batches from retry queue will be sent first
  178. err = srv.HandleWrite(ctx, NewBatch("6\n", opts.MaxRetryTime()))
  179. assert.Nil(t, err)
  180. assert.Equal(t, 0, srv.retryQueue.list.Len())
  181. require.Len(t, hs.Lines(), 4)
  182. assert.Equal(t, "3", hs.Lines()[0])
  183. assert.Equal(t, "4", hs.Lines()[1])
  184. assert.Equal(t, "5", hs.Lines()[2])
  185. assert.Equal(t, "6", hs.Lines()[3])
  186. }
  187. func TestMaxRetryInterval(t *testing.T) {
  188. log.Log.SetLogLevel(log.DebugLevel)
  189. hs := test.NewTestService(t, "http://localhost:8086")
  190. // MaxRetryInterval only 4ms, will be reached quickly
  191. opts := write.DefaultOptions().SetRetryInterval(1).SetMaxRetryInterval(4)
  192. ctx := context.Background()
  193. srv := NewService("my-org", "my-bucket", hs, opts)
  194. // Set permanent reply error to force writes fail and retry
  195. hs.SetReplyError(&http.Error{
  196. StatusCode: 503,
  197. })
  198. // This batch will fail and it be added to retry queue
  199. b1 := NewBatch("1\n", opts.MaxRetryTime())
  200. err := srv.HandleWrite(ctx, b1)
  201. assert.NotNil(t, err)
  202. assert.Equal(t, uint(1), srv.retryDelay)
  203. assert.Equal(t, 1, srv.retryQueue.list.Len())
  204. <-time.After(time.Millisecond * time.Duration(srv.retryDelay))
  205. b2 := NewBatch("2\n", opts.MaxRetryTime())
  206. // First batch will be tried to write again and this one will added to retry queue
  207. err = srv.HandleWrite(ctx, b2)
  208. assert.NotNil(t, err)
  209. assertBetween(t, srv.retryDelay, 2, 4)
  210. assert.Equal(t, 2, srv.retryQueue.list.Len())
  211. <-time.After(time.Millisecond * time.Duration(srv.retryDelay))
  212. b3 := NewBatch("3\n", opts.MaxRetryTime())
  213. // First batch will be tried to write again and this one will added to retry queue
  214. err = srv.HandleWrite(ctx, b3)
  215. assert.NotNil(t, err)
  216. // New computed delay of first batch should be 4-8, is limited to 4
  217. assert.EqualValues(t, 4, srv.retryDelay)
  218. assert.Equal(t, 3, srv.retryQueue.list.Len())
  219. <-time.After(time.Millisecond * time.Duration(srv.retryDelay))
  220. b4 := NewBatch("4\n", opts.MaxRetryTime())
  221. // First batch will be tried to write again and this one will added to retry queue
  222. err = srv.HandleWrite(ctx, b4)
  223. assert.NotNil(t, err)
  224. // New computed delay of first batch should be 8-116, is limited to 4
  225. assert.EqualValues(t, 4, srv.retryDelay)
  226. assert.Equal(t, 4, srv.retryQueue.list.Len())
  227. }
  228. func min(a, b uint) uint {
  229. if a > b {
  230. return b
  231. }
  232. return a
  233. }
  234. func TestMaxRetries(t *testing.T) {
  235. log.Log.SetLogLevel(log.DebugLevel)
  236. hs := test.NewTestService(t, "http://localhost:8086")
  237. opts := write.DefaultOptions().SetRetryInterval(1)
  238. ctx := context.Background()
  239. srv := NewService("my-org", "my-bucket", hs, opts)
  240. // Set permanent reply error to force writes fail and retry
  241. hs.SetReplyError(&http.Error{
  242. StatusCode: 429,
  243. })
  244. // This batch will fail and it be added to retry queue
  245. b1 := NewBatch("1\n", opts.MaxRetryTime())
  246. err := srv.HandleWrite(ctx, b1)
  247. assert.NotNil(t, err)
  248. assert.EqualValues(t, 1, srv.retryDelay)
  249. assert.Equal(t, 1, srv.retryQueue.list.Len())
  250. // Write so many batches as it is maxRetries (5)
  251. // First batch will be written and it will reach max retry limit
  252. for i, e := uint(1), uint(2); i <= opts.MaxRetries(); i++ {
  253. //wait retry delay + little more
  254. <-time.After(time.Millisecond*time.Duration(srv.retryDelay) + time.Microsecond*5)
  255. b := NewBatch(fmt.Sprintf("%d\n", i+1), opts.MaxRetryTime())
  256. err = srv.HandleWrite(ctx, b)
  257. assert.NotNil(t, err)
  258. assertBetween(t, srv.retryDelay, e, e*2)
  259. exp := min(i+1, opts.MaxRetries())
  260. assert.EqualValues(t, exp, srv.retryQueue.list.Len())
  261. e *= 2
  262. }
  263. //Test if was removed from retry queue
  264. assert.True(t, b1.Evicted)
  265. <-time.After(time.Millisecond*time.Duration(srv.retryDelay) + time.Microsecond*5)
  266. // Clear error and let write pass
  267. hs.SetReplyError(nil)
  268. // Batches from retry queue will be sent first
  269. err = srv.HandleWrite(ctx, NewBatch(fmt.Sprintf("%d\n", opts.MaxRetries()+2), opts.MaxRetryTime()))
  270. assert.Nil(t, err)
  271. assert.Equal(t, 0, srv.retryQueue.list.Len())
  272. require.Len(t, hs.Lines(), int(opts.MaxRetries()+1))
  273. for i := uint(2); i <= opts.MaxRetries()+2; i++ {
  274. assert.Equal(t, fmt.Sprintf("%d", i), hs.Lines()[i-2])
  275. }
  276. }
  277. func TestMaxRetryTime(t *testing.T) {
  278. log.Log.SetLogLevel(log.DebugLevel)
  279. hs := test.NewTestService(t, "http://localhost:8086")
  280. // Set maxRetryTime 5ms
  281. opts := write.DefaultOptions().SetRetryInterval(1).SetMaxRetryTime(5)
  282. ctx := context.Background()
  283. srv := NewService("my-org", "my-bucket", hs, opts)
  284. // Set permanent reply error to force writes fail and retry
  285. hs.SetReplyError(&http.Error{
  286. StatusCode: 429,
  287. })
  288. // This batch will fail and it be added to retry queue and it will expire 5ms after
  289. b1 := NewBatch("1\n", opts.MaxRetryTime())
  290. err := srv.HandleWrite(ctx, b1)
  291. assert.NotNil(t, err)
  292. assert.EqualValues(t, 1, srv.retryDelay)
  293. assert.Equal(t, 1, srv.retryQueue.list.Len())
  294. // Wait for batch expiration
  295. <-time.After(5 * time.Millisecond)
  296. exp := opts.MaxRetryTime()
  297. // sleep takes at least more than 10ms (sometimes 15ms) on Windows https://github.com/golang/go/issues/44343
  298. if runtime.GOOS == "windows" {
  299. exp = 20
  300. }
  301. // create new batch for sending
  302. b := NewBatch("2\n", exp)
  303. // First batch will be checked against maxRetryTime and it will expire. New batch will fail and it will added to retry queue
  304. err = srv.HandleWrite(ctx, b)
  305. require.NotNil(t, err)
  306. // 1st Batch expires and writing 2nd trows error
  307. assert.Equal(t, "write failed (attempts 1): Unexpected status code 429", err.Error())
  308. assert.Equal(t, 1, srv.retryQueue.list.Len())
  309. //wait until remaining accumulated retryDelay has passed, because there hasn't been a successful write yet
  310. <-time.After(time.Until(srv.lastWriteAttempt.Add(time.Millisecond * time.Duration(srv.retryDelay))))
  311. // Clear error and let write pass
  312. hs.SetReplyError(nil)
  313. // A batch from retry queue will be sent first
  314. err = srv.HandleWrite(ctx, NewBatch("3\n", opts.MaxRetryTime()))
  315. assert.Nil(t, err)
  316. assert.Equal(t, 0, srv.retryQueue.list.Len())
  317. require.Len(t, hs.Lines(), 2)
  318. assert.Equal(t, "2", hs.Lines()[0])
  319. assert.Equal(t, "3", hs.Lines()[1])
  320. }
  321. func TestRetryOnConnectionError(t *testing.T) {
  322. log.Log.SetLogLevel(log.DebugLevel)
  323. hs := test.NewTestService(t, "http://localhost:8086")
  324. //
  325. opts := write.DefaultOptions().SetRetryInterval(1).SetRetryBufferLimit(15000)
  326. ctx := context.Background()
  327. srv := NewService("my-org", "my-bucket", hs, opts)
  328. // Set permanent non HTTP error to force writes fail and retry
  329. hs.SetReplyError(&http.Error{
  330. Err: errors.New("connection refused"),
  331. })
  332. // This batch will fail and it be added to retry queue
  333. b1 := NewBatch("1\n", opts.MaxRetryTime())
  334. err := srv.HandleWrite(ctx, b1)
  335. assert.NotNil(t, err)
  336. assert.EqualValues(t, 1, srv.retryDelay)
  337. assert.Equal(t, 1, srv.retryQueue.list.Len())
  338. <-time.After(time.Millisecond * time.Duration(srv.retryDelay))
  339. b2 := NewBatch("2\n", opts.MaxRetryTime())
  340. // First batch will be tried to write again and this one will added to retry queue
  341. err = srv.HandleWrite(ctx, b2)
  342. assert.NotNil(t, err)
  343. assertBetween(t, srv.retryDelay, 2, 4)
  344. assert.Equal(t, 2, srv.retryQueue.list.Len())
  345. <-time.After(time.Millisecond * time.Duration(srv.retryDelay))
  346. b3 := NewBatch("3\n", opts.MaxRetryTime())
  347. // First batch will be tried to write again and this one will added to retry queue
  348. err = srv.HandleWrite(ctx, b3)
  349. assert.NotNil(t, err)
  350. assertBetween(t, srv.retryDelay, 4, 8)
  351. assert.Equal(t, 3, srv.retryQueue.list.Len())
  352. <-time.After(time.Millisecond * time.Duration(srv.retryDelay))
  353. // Clear error and let write pass
  354. hs.SetReplyError(nil)
  355. // Batches from retry queue will be sent first
  356. err = srv.HandleWrite(ctx, NewBatch("4\n", opts.MaxRetryTime()))
  357. assert.Nil(t, err)
  358. assert.Equal(t, 0, srv.retryQueue.list.Len())
  359. require.Len(t, hs.Lines(), 4)
  360. assert.Equal(t, "1", hs.Lines()[0])
  361. assert.Equal(t, "2", hs.Lines()[1])
  362. assert.Equal(t, "3", hs.Lines()[2])
  363. assert.Equal(t, "4", hs.Lines()[3])
  364. }
  365. func TestNoRetryIfMaxRetriesIsZero(t *testing.T) {
  366. log.Log.SetLogLevel(log.DebugLevel)
  367. hs := test.NewTestService(t, "http://localhost:8086")
  368. //
  369. opts := write.DefaultOptions().SetMaxRetries(0)
  370. ctx := context.Background()
  371. srv := NewService("my-org", "my-bucket", hs, opts)
  372. hs.SetReplyError(&http.Error{
  373. Err: errors.New("connection refused"),
  374. })
  375. b1 := NewBatch("1\n", opts.MaxRetryTime())
  376. err := srv.HandleWrite(ctx, b1)
  377. assert.NotNil(t, err)
  378. assert.Equal(t, 0, srv.retryQueue.list.Len())
  379. }
  380. func TestWriteContextCancel(t *testing.T) {
  381. hs := test.NewTestService(t, "http://localhost:8888")
  382. opts := write.DefaultOptions()
  383. srv := NewService("my-org", "my-bucket", hs, opts)
  384. lines := test.GenRecords(10)
  385. ctx, cancel := context.WithCancel(context.Background())
  386. var err error
  387. var wg sync.WaitGroup
  388. wg.Add(1)
  389. go func() {
  390. <-time.After(10 * time.Millisecond)
  391. err = srv.HandleWrite(ctx, NewBatch(strings.Join(lines, "\n"), opts.MaxRetryTime()))
  392. wg.Done()
  393. }()
  394. cancel()
  395. wg.Wait()
  396. require.Equal(t, context.Canceled, err)
  397. assert.Len(t, hs.Lines(), 0)
  398. }
  399. func TestPow(t *testing.T) {
  400. assert.EqualValues(t, 1, pow(10, 0))
  401. assert.EqualValues(t, 10, pow(10, 1))
  402. assert.EqualValues(t, 4, pow(2, 2))
  403. assert.EqualValues(t, 1, pow(1, 2))
  404. assert.EqualValues(t, 125, pow(5, 3))
  405. }
  406. func assertBetween(t *testing.T, val, min, max uint) {
  407. t.Helper()
  408. assert.True(t, val >= min && val <= max, fmt.Sprintf("%d is outside <%d;%d>", val, min, max))
  409. }
  410. func TestComputeRetryDelay(t *testing.T) {
  411. hs := test.NewTestService(t, "http://localhost:8888")
  412. opts := write.DefaultOptions()
  413. srv := NewService("my-org", "my-bucket", hs, opts)
  414. assertBetween(t, srv.computeRetryDelay(0), 5_000, 10_000)
  415. assertBetween(t, srv.computeRetryDelay(1), 10_000, 20_000)
  416. assertBetween(t, srv.computeRetryDelay(2), 20_000, 40_000)
  417. assertBetween(t, srv.computeRetryDelay(3), 40_000, 80_000)
  418. assertBetween(t, srv.computeRetryDelay(4), 80_000, 125_000)
  419. for i := uint(5); i < 200; i++ { //test also limiting higher values
  420. assert.EqualValues(t, 125_000, srv.computeRetryDelay(i))
  421. }
  422. }
  423. func TestErrorCallback(t *testing.T) {
  424. log.Log.SetLogLevel(log.DebugLevel)
  425. hs := test.NewTestService(t, "http://localhost:8086")
  426. //
  427. opts := write.DefaultOptions().SetRetryInterval(1).SetRetryBufferLimit(15000)
  428. ctx := context.Background()
  429. srv := NewService("my-org", "my-bucket", hs, opts)
  430. hs.SetReplyError(&http.Error{
  431. Err: errors.New("connection refused"),
  432. })
  433. srv.SetBatchErrorCallback(func(batch *Batch, error2 http.Error) bool {
  434. return batch.RetryAttempts < 2
  435. })
  436. b1 := NewBatch("1\n", opts.MaxRetryTime())
  437. err := srv.HandleWrite(ctx, b1)
  438. assert.NotNil(t, err)
  439. assert.Equal(t, 1, srv.retryQueue.list.Len())
  440. <-time.After(time.Millisecond * time.Duration(srv.retryDelay))
  441. b := NewBatch("2\n", opts.MaxRetryTime())
  442. err = srv.HandleWrite(ctx, b)
  443. assert.NotNil(t, err)
  444. assert.Equal(t, 2, srv.retryQueue.list.Len())
  445. <-time.After(time.Millisecond * time.Duration(srv.retryDelay))
  446. b = NewBatch("3\n", opts.MaxRetryTime())
  447. err = srv.HandleWrite(ctx, b)
  448. assert.NotNil(t, err)
  449. assert.Equal(t, 2, srv.retryQueue.list.Len())
  450. }
  451. func minInt(a, b int) int {
  452. if a > b {
  453. return b
  454. }
  455. return a
  456. }
  457. func TestRetryIntervalAccumulation(t *testing.T) {
  458. // log.Log.SetLogLevel(log.DebugLevel)
  459. log.Log.SetLogLevel(log.InfoLevel)
  460. // Setup test service with scenario's configuration
  461. hs := test.NewTestService(t, "http://localhost:8086")
  462. baseRetryInterval := uint(20)
  463. if runtime.GOOS == "windows" {
  464. baseRetryInterval = 30
  465. }
  466. opts := write.DefaultOptions().
  467. SetRetryInterval(baseRetryInterval).
  468. SetMaxRetryInterval(300).
  469. SetMaxRetryTime(baseRetryInterval * 5)
  470. ctx := context.Background()
  471. srv := NewService("my-org", "my-bucket", hs, opts)
  472. writeInterval := time.Duration(opts.RetryInterval()) * time.Millisecond
  473. // Set permanent reply error to force writes fail and retry
  474. hs.SetReplyError(&http.Error{StatusCode: 429})
  475. lastInterval := uint(0)
  476. assert.Equal(t, uint(0), srv.retryAttempts) // Should initialize to zero
  477. i := 1
  478. for ; i <= 45; i++ {
  479. b := NewBatch(fmt.Sprintf("%d\n", i), opts.MaxRetryTime())
  480. err := srv.HandleWrite(ctx, b)
  481. assert.Equal(t, minInt(i, 5), srv.retryQueue.list.Len())
  482. assert.GreaterOrEqual(t, srv.retryDelay, lastInterval) // Should not decrease while writes failing
  483. assert.LessOrEqual(t, srv.retryDelay, opts.MaxRetryInterval()) // Should not grow larger than max
  484. if err != nil {
  485. if lastInterval == opts.MaxRetryInterval() {
  486. // Write attempt failed, and interval was already at max, so should stay there
  487. assert.Equal(t, srv.retryDelay, opts.MaxRetryInterval())
  488. log.Log.Infof("Retry interval capped at %d ms", srv.retryDelay)
  489. } else {
  490. // A write attempt was made and failed, so retry interval should have increased
  491. assert.Greater(t, srv.retryDelay, lastInterval)
  492. log.Log.Infof("Retry interval increased to %d ms", srv.retryDelay)
  493. }
  494. } else {
  495. // Write attempt was not made, so retry interval should remain the same
  496. assert.Equal(t, srv.retryDelay, lastInterval)
  497. log.Log.Infof("Retry interval still at %d ms", srv.retryDelay)
  498. }
  499. lastInterval = srv.retryDelay
  500. <-time.After(writeInterval)
  501. }
  502. // Clear error and let write pass
  503. hs.SetReplyError(nil)
  504. // Wait until write queue is ready to retry; in meantime, keep writing and confirming queue state
  505. retryTimeout := srv.lastWriteAttempt.Add(time.Millisecond * time.Duration(srv.retryDelay))
  506. log.Log.Infof("Continuing to write for %d ms until flushing write attempt", time.Until(retryTimeout).Milliseconds())
  507. for ; time.Until(retryTimeout) >= 0; i++ {
  508. b := NewBatch(fmt.Sprintf("%d\n", i), opts.MaxRetryTime())
  509. err := srv.HandleWrite(ctx, b)
  510. assert.Nil(t, err) // There should be no write attempt
  511. assert.Equal(t, minInt(i, 5), srv.retryQueue.list.Len())
  512. assert.Equal(t, srv.retryDelay, opts.MaxRetryInterval()) // Should remain the same
  513. log.Log.Infof("Retry interval still at %d ms", srv.retryDelay)
  514. <-time.After(writeInterval)
  515. }
  516. // Retry interval should now have expired, so this write attempt should succeed and cause retry queue to flush
  517. b := NewBatch(fmt.Sprintf("%d\n", i), opts.MaxRetryTime())
  518. err := srv.HandleWrite(ctx, b)
  519. assert.Nil(t, err)
  520. assert.Equal(t, 0, srv.retryQueue.list.Len())
  521. assert.Equal(t, srv.retryAttempts, uint(0)) // Should reset to zero
  522. // Ensure proper batches got written to server
  523. require.Len(t, hs.Lines(), 5)
  524. assert.Equal(t, fmt.Sprintf("%d", i-4), hs.Lines()[0])
  525. assert.Equal(t, fmt.Sprintf("%d", i-3), hs.Lines()[1])
  526. assert.Equal(t, fmt.Sprintf("%d", i-2), hs.Lines()[2])
  527. assert.Equal(t, fmt.Sprintf("%d", i-1), hs.Lines()[3])
  528. assert.Equal(t, fmt.Sprintf("%d", i-0), hs.Lines()[4])
  529. // Debug line to capture output of successful test
  530. // assert.True(t, false)
  531. }
  532. func TestFlush(t *testing.T) {
  533. log.Log.SetLogLevel(log.DebugLevel)
  534. hs := test.NewTestService(t, "http://localhost:8086")
  535. //
  536. opts := write.DefaultOptions().SetRetryInterval(1)
  537. ctx := context.Background()
  538. srv := NewService("my-org", "my-bucket", hs, opts)
  539. hs.SetReplyError(&http.Error{
  540. Err: errors.New("connection refused"),
  541. })
  542. lines := test.GenRecords(5)
  543. // Test flush will fail all batches
  544. for _, line := range lines {
  545. b := NewBatch(line, 20)
  546. _ = srv.HandleWrite(ctx, b)
  547. }
  548. assert.Equal(t, 5, srv.retryQueue.list.Len())
  549. srv.Flush()
  550. assert.Len(t, hs.Lines(), 0)
  551. // Test flush will find all batches expired
  552. for _, line := range lines {
  553. b := NewBatch(line, 5)
  554. _ = srv.HandleWrite(ctx, b)
  555. }
  556. assert.Equal(t, 5, srv.retryQueue.list.Len())
  557. <-time.After(5 * time.Millisecond)
  558. hs.SetReplyError(nil)
  559. // all batches should expire
  560. srv.Flush()
  561. assert.Len(t, hs.Lines(), 0)
  562. assert.Equal(t, 0, srv.retryQueue.list.Len())
  563. // Test flush will succeed
  564. hs.SetReplyError(&http.Error{
  565. Err: errors.New("connection refused"),
  566. })
  567. for _, line := range lines {
  568. b := NewBatch(line, 5)
  569. _ = srv.HandleWrite(ctx, b)
  570. }
  571. assert.Equal(t, 5, srv.retryQueue.list.Len())
  572. hs.SetReplyError(nil)
  573. // all batches should expire
  574. srv.Flush()
  575. assert.Len(t, hs.Lines(), 5)
  576. assert.Equal(t, 0, srv.retryQueue.list.Len())
  577. }
  578. func TestConsistencyParam(t *testing.T) {
  579. hs := test.NewTestService(t, "http://localhost:8888")
  580. opts := write.DefaultOptions().SetConsistency(write.ConsistencyQuorum)
  581. srv := NewService("org", "buc", hs, opts)
  582. require.Equal(t, "http://localhost:8888/api/v2/write?bucket=buc&consistency=quorum&org=org&precision=ns", srv.WriteURL())
  583. opts = write.DefaultOptions()
  584. srv = NewService("org", "buc", hs, opts)
  585. require.Equal(t, "http://localhost:8888/api/v2/write?bucket=buc&org=org&precision=ns", srv.WriteURL())
  586. }
  587. func TestIgnoreErrors(t *testing.T) {
  588. log.Log.SetLogLevel(log.DebugLevel)
  589. i := 0
  590. server := httptest.NewServer(ihttp.HandlerFunc(func(w ihttp.ResponseWriter, r *ihttp.Request) {
  591. i++
  592. w.WriteHeader(ihttp.StatusInternalServerError)
  593. switch i {
  594. case 1:
  595. _, _ = w.Write([]byte(`{"error":" "write failed: hinted handoff queue not empty"`))
  596. case 2:
  597. _, _ = w.Write([]byte(`{"code":"internal error", "message":"partial write: field type conflict"}`))
  598. case 3:
  599. _, _ = w.Write([]byte(`{"code":"internal error", "message":"partial write: points beyond retention policy"}`))
  600. case 4:
  601. _, _ = w.Write([]byte(`{"code":"internal error", "message":"unable to parse 'cpu value': invalid field format"}`))
  602. case 5:
  603. _, _ = w.Write([]byte(`{"code":"internal error", "message":"gateway error"}`))
  604. }
  605. }))
  606. defer server.Close()
  607. //
  608. opts := write.DefaultOptions()
  609. ctx := context.Background()
  610. srv := NewService("my-org", "my-bucket", http.NewService(server.URL, "", http.DefaultOptions()), opts)
  611. b := NewBatch("1", 20)
  612. err := srv.HandleWrite(ctx, b)
  613. assert.NoError(t, err)
  614. err = srv.HandleWrite(ctx, b)
  615. assert.NoError(t, err)
  616. err = srv.HandleWrite(ctx, b)
  617. assert.NoError(t, err)
  618. err = srv.HandleWrite(ctx, b)
  619. assert.NoError(t, err)
  620. err = srv.HandleWrite(ctx, b)
  621. assert.Error(t, err)
  622. }