write_test.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. // Copyright 2020-2021 InfluxData, Inc. All rights reserved.
  2. // Use of this source code is governed by MIT
  3. // license that can be found in the LICENSE file.
  4. package api
  5. import (
  6. "fmt"
  7. "io"
  8. "math"
  9. "runtime"
  10. "strings"
  11. "sync"
  12. "testing"
  13. "time"
  14. "github.com/influxdata/influxdb-client-go/v2/api/http"
  15. "github.com/influxdata/influxdb-client-go/v2/api/write"
  16. "github.com/influxdata/influxdb-client-go/v2/internal/test"
  17. "github.com/influxdata/influxdb-client-go/v2/log"
  18. "github.com/stretchr/testify/assert"
  19. "github.com/stretchr/testify/require"
  20. )
  21. func TestWriteAPIWriteDefaultTag(t *testing.T) {
  22. service := test.NewTestService(t, "http://localhost:8888")
  23. opts := write.DefaultOptions().
  24. SetBatchSize(1)
  25. opts.AddDefaultTag("dft", "a")
  26. writeAPI := NewWriteAPI("my-org", "my-bucket", service, opts)
  27. point := write.NewPoint("test",
  28. map[string]string{
  29. "vendor": "AWS",
  30. },
  31. map[string]interface{}{
  32. "mem_free": 1234567,
  33. }, time.Unix(60, 60))
  34. writeAPI.WritePoint(point)
  35. writeAPI.Close()
  36. require.Len(t, service.Lines(), 1)
  37. assert.Equal(t, "test,dft=a,vendor=AWS mem_free=1234567i 60000000060", service.Lines()[0])
  38. }
  39. func TestWriteAPIImpl_Write(t *testing.T) {
  40. service := test.NewTestService(t, "http://localhost:8888")
  41. writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5))
  42. points := test.GenPoints(10)
  43. for _, p := range points {
  44. writeAPI.WritePoint(p)
  45. }
  46. writeAPI.Close()
  47. require.Len(t, service.Lines(), 10)
  48. for i, p := range points {
  49. line := write.PointToLineProtocol(p, writeAPI.writeOptions.Precision())
  50. //cut off last \n char
  51. line = line[:len(line)-1]
  52. assert.Equal(t, service.Lines()[i], line)
  53. }
  54. }
  55. func TestGzipWithFlushing(t *testing.T) {
  56. service := test.NewTestService(t, "http://localhost:8888")
  57. log.Log.SetLogLevel(log.DebugLevel)
  58. writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5).SetUseGZip(true))
  59. points := test.GenPoints(5)
  60. for _, p := range points {
  61. writeAPI.WritePoint(p)
  62. }
  63. start := time.Now()
  64. writeAPI.waitForFlushing()
  65. end := time.Now()
  66. fmt.Printf("Flash duration: %dns\n", end.Sub(start).Nanoseconds())
  67. assert.Len(t, service.Lines(), 5)
  68. assert.True(t, service.WasGzip())
  69. service.Close()
  70. writeAPI.writeOptions.SetUseGZip(false)
  71. for _, p := range points {
  72. writeAPI.WritePoint(p)
  73. }
  74. writeAPI.waitForFlushing()
  75. assert.Len(t, service.Lines(), 5)
  76. assert.False(t, service.WasGzip())
  77. writeAPI.Close()
  78. }
  79. func TestFlushInterval(t *testing.T) {
  80. service := test.NewTestService(t, "http://localhost:8888")
  81. writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(10).SetFlushInterval(30))
  82. points := test.GenPoints(5)
  83. for _, p := range points {
  84. writeAPI.WritePoint(p)
  85. }
  86. require.Len(t, service.Lines(), 0)
  87. <-time.After(time.Millisecond * 50)
  88. require.Len(t, service.Lines(), 5)
  89. writeAPI.Close()
  90. service.Close()
  91. }
  92. func TestRetry(t *testing.T) {
  93. service := test.NewTestService(t, "http://localhost:8888")
  94. log.Log.SetLogLevel(log.DebugLevel)
  95. writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5).SetRetryInterval(10000))
  96. points := test.GenPoints(15)
  97. for i := 0; i < 5; i++ {
  98. writeAPI.WritePoint(points[i])
  99. }
  100. writeAPI.waitForFlushing()
  101. require.Len(t, service.Lines(), 5)
  102. service.Close()
  103. service.SetReplyError(&http.Error{
  104. StatusCode: 429,
  105. RetryAfter: 1,
  106. })
  107. for i := 0; i < 5; i++ {
  108. writeAPI.WritePoint(points[i])
  109. }
  110. writeAPI.waitForFlushing()
  111. require.Len(t, service.Lines(), 0)
  112. service.Close()
  113. for i := 5; i < 10; i++ {
  114. writeAPI.WritePoint(points[i])
  115. }
  116. writeAPI.waitForFlushing()
  117. require.Len(t, service.Lines(), 0)
  118. <-time.After(time.Second + 50*time.Millisecond)
  119. for i := 10; i < 15; i++ {
  120. writeAPI.WritePoint(points[i])
  121. }
  122. writeAPI.waitForFlushing()
  123. require.Len(t, service.Lines(), 15)
  124. assert.True(t, strings.HasPrefix(service.Lines()[7], "test,hostname=host_7"))
  125. assert.True(t, strings.HasPrefix(service.Lines()[14], "test,hostname=host_14"))
  126. writeAPI.Close()
  127. }
  128. func TestWriteError(t *testing.T) {
  129. service := test.NewTestService(t, "http://localhost:8888")
  130. log.Log.SetLogLevel(log.DebugLevel)
  131. service.SetReplyError(&http.Error{
  132. StatusCode: 400,
  133. Code: "write",
  134. Message: "error",
  135. })
  136. writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5))
  137. errCh := writeAPI.Errors()
  138. var recErr error
  139. var wg sync.WaitGroup
  140. wg.Add(1)
  141. go func() {
  142. recErr = <-errCh
  143. wg.Done()
  144. }()
  145. points := test.GenPoints(15)
  146. for i := 0; i < 5; i++ {
  147. writeAPI.WritePoint(points[i])
  148. }
  149. writeAPI.waitForFlushing()
  150. wg.Wait()
  151. require.NotNil(t, recErr)
  152. writeAPI.Close()
  153. }
  154. func TestWriteErrorCallback(t *testing.T) {
  155. service := test.NewTestService(t, "http://localhost:8888")
  156. log.Log.SetLogLevel(log.DebugLevel)
  157. service.SetReplyError(&http.Error{
  158. StatusCode: 429,
  159. Code: "write",
  160. Message: "error",
  161. })
  162. // sleep takes at least more than 10ms (sometimes 15ms) on Windows https://github.com/golang/go/issues/44343
  163. retryInterval := uint(1)
  164. if runtime.GOOS == "windows" {
  165. retryInterval = 20
  166. }
  167. writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(1).SetRetryInterval(retryInterval))
  168. writeAPI.SetWriteFailedCallback(func(batch string, error http.Error, retryAttempts uint) bool {
  169. return retryAttempts < 2
  170. })
  171. points := test.GenPoints(10)
  172. // first batch will be discarded by callback after 3 write attempts, second batch should survive with only one failed attempt
  173. for i, j := 0, 0; i < 6; i++ {
  174. writeAPI.WritePoint(points[i])
  175. writeAPI.waitForFlushing()
  176. w := int(math.Pow(5, float64(j)) * float64(retryInterval))
  177. fmt.Printf("Waiting %dms\n", w)
  178. <-time.After(time.Duration(w) * time.Millisecond)
  179. j++
  180. if j == 3 {
  181. j = 0
  182. }
  183. }
  184. service.SetReplyError(nil)
  185. writeAPI.SetWriteFailedCallback(func(batch string, error http.Error, retryAttempts uint) bool {
  186. return true
  187. })
  188. for i := 6; i < 10; i++ {
  189. writeAPI.WritePoint(points[i])
  190. }
  191. writeAPI.waitForFlushing()
  192. assert.Len(t, service.Lines(), 9)
  193. writeAPI.Close()
  194. }
  195. func TestClosing(t *testing.T) {
  196. service := test.NewTestService(t, "http://localhost:8888")
  197. log.Log.SetLogLevel(log.DebugLevel)
  198. writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5).SetRetryInterval(10000))
  199. points := test.GenPoints(15)
  200. for i := 0; i < 5; i++ {
  201. writeAPI.WritePoint(points[i])
  202. }
  203. writeAPI.Close()
  204. require.Len(t, service.Lines(), 5)
  205. writeAPI = NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5).SetRetryInterval(10000))
  206. service.Close()
  207. service.SetReplyError(&http.Error{
  208. StatusCode: 425,
  209. })
  210. _ = writeAPI.Errors()
  211. for i := 0; i < 15; i++ {
  212. writeAPI.WritePoint(points[i])
  213. }
  214. start := time.Now()
  215. writeAPI.Close()
  216. diff := time.Since(start)
  217. fmt.Println("Diff", diff)
  218. assert.Len(t, service.Lines(), 0)
  219. }
  220. func TestFlushWithRetries(t *testing.T) {
  221. service := test.NewTestService(t, "http://localhost:8888")
  222. log.Log.SetLogLevel(log.DebugLevel)
  223. writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetRetryInterval(200).SetBatchSize(1))
  224. points := test.GenPoints(5)
  225. fails := 0
  226. var mu sync.Mutex
  227. service.SetRequestHandler(func(url string, body io.Reader) error {
  228. mu.Lock()
  229. defer mu.Unlock()
  230. // fail 4 times, then succeed on the 5th try - maxRetries default is 5
  231. if fails >= 4 {
  232. _ = service.DecodeLines(body)
  233. return nil
  234. }
  235. fails++
  236. return fmt.Errorf("spurious failure")
  237. })
  238. // write will try first batch and others will be put to the retry queue of retry delay caused by first write error
  239. for i := 0; i < len(points); i++ {
  240. writeAPI.WritePoint(points[i])
  241. }
  242. // Flush will try sending first batch again and then others
  243. // 1st, 2nd and 3rd will fail, because test service rejects 4 writes
  244. writeAPI.Flush()
  245. writeAPI.Close()
  246. // two remained
  247. assert.Equal(t, 2, len(service.Lines()))
  248. }