writeAPIBlocking_test.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. // Copyright 2020-2021 InfluxData, Inc. All rights reserved.
  2. // Use of this source code is governed by MIT
  3. // license that can be found in the LICENSE file.
  4. package api
  5. import (
  6. "context"
  7. "net"
  8. "net/http"
  9. "strings"
  10. "sync"
  11. "testing"
  12. "time"
  13. http2 "github.com/influxdata/influxdb-client-go/v2/api/http"
  14. "github.com/influxdata/influxdb-client-go/v2/api/write"
  15. "github.com/influxdata/influxdb-client-go/v2/internal/test"
  16. "github.com/stretchr/testify/assert"
  17. "github.com/stretchr/testify/require"
  18. )
  19. func TestWritePoint(t *testing.T) {
  20. service := test.NewTestService(t, "http://localhost:8888")
  21. opts := write.DefaultOptions().SetBatchSize(5)
  22. writeAPI := NewWriteAPIBlocking("my-org", "my-bucket", service, opts)
  23. points := test.GenPoints(10)
  24. err := writeAPI.WritePoint(context.Background(), points...)
  25. require.Nil(t, err)
  26. require.Len(t, service.Lines(), 10)
  27. for i, p := range points {
  28. line := write.PointToLineProtocol(p, opts.Precision())
  29. //cut off last \n char
  30. line = line[:len(line)-1]
  31. assert.Equal(t, service.Lines()[i], line)
  32. }
  33. }
  34. func TestWriteRecord(t *testing.T) {
  35. service := test.NewTestService(t, "http://localhost:8888")
  36. writeAPI := NewWriteAPIBlocking("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5))
  37. lines := test.GenRecords(10)
  38. for _, line := range lines {
  39. err := writeAPI.WriteRecord(context.Background(), line)
  40. require.Nil(t, err)
  41. }
  42. require.Len(t, service.Lines(), 10)
  43. require.Equal(t, 10, service.Requests())
  44. for i, l := range lines {
  45. assert.Equal(t, l, service.Lines()[i])
  46. }
  47. service.Close()
  48. err := writeAPI.WriteRecord(context.Background(), lines...)
  49. require.Nil(t, err)
  50. require.Equal(t, 1, service.Requests())
  51. for i, l := range lines {
  52. assert.Equal(t, l, service.Lines()[i])
  53. }
  54. service.Close()
  55. err = writeAPI.WriteRecord(context.Background())
  56. require.Nil(t, err)
  57. require.Len(t, service.Lines(), 0)
  58. service.SetReplyError(&http2.Error{Code: "invalid", Message: "data"})
  59. err = writeAPI.WriteRecord(context.Background(), lines...)
  60. require.NotNil(t, err)
  61. require.Equal(t, "invalid: data", err.Error())
  62. }
  63. func TestWriteRecordBatch(t *testing.T) {
  64. service := test.NewTestService(t, "http://localhost:8888")
  65. writeAPI := NewWriteAPIBlocking("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5))
  66. lines := test.GenRecords(10)
  67. batch := strings.Join(lines, "\n")
  68. err := writeAPI.WriteRecord(context.Background(), batch)
  69. require.Nil(t, err)
  70. require.Len(t, service.Lines(), 10)
  71. for i, l := range lines {
  72. assert.Equal(t, l, service.Lines()[i])
  73. }
  74. service.Close()
  75. }
  76. func TestWriteParallel(t *testing.T) {
  77. service := test.NewTestService(t, "http://localhost:8888")
  78. writeAPI := NewWriteAPIBlocking("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5))
  79. lines := test.GenRecords(1000)
  80. chanLine := make(chan string)
  81. var wg sync.WaitGroup
  82. for i := 0; i < 10; i++ {
  83. wg.Add(1)
  84. go func() {
  85. for l := range chanLine {
  86. err := writeAPI.WriteRecord(context.Background(), l)
  87. assert.Nil(t, err)
  88. }
  89. wg.Done()
  90. }()
  91. }
  92. for _, l := range lines {
  93. chanLine <- l
  94. }
  95. close(chanLine)
  96. wg.Wait()
  97. assert.Len(t, service.Lines(), len(lines))
  98. service.Close()
  99. }
  100. func TestWriteErrors(t *testing.T) {
  101. service := http2.NewService("http://locl:866", "", http2.DefaultOptions().SetHTTPClient(&http.Client{
  102. Timeout: 100 * time.Millisecond,
  103. Transport: &http.Transport{
  104. DialContext: (&net.Dialer{
  105. Timeout: 100 * time.Millisecond,
  106. }).DialContext,
  107. },
  108. }))
  109. writeAPI := NewWriteAPIBlocking("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5))
  110. points := test.GenPoints(10)
  111. errors := 0
  112. for _, p := range points {
  113. err := writeAPI.WritePoint(context.Background(), p)
  114. if assert.Error(t, err) {
  115. errors++
  116. }
  117. }
  118. require.Equal(t, 10, errors)
  119. }
  120. func TestWriteBatchIng(t *testing.T) {
  121. service := test.NewTestService(t, "http://localhost:8888")
  122. writeAPI := NewWriteAPIBlockingWithBatching("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5))
  123. lines := test.GenRecords(10)
  124. for i, line := range lines {
  125. err := writeAPI.WriteRecord(context.Background(), line)
  126. require.Nil(t, err)
  127. if i == 4 || i == 9 {
  128. assert.Equal(t, 1, service.Requests())
  129. require.Len(t, service.Lines(), 5)
  130. service.Close()
  131. }
  132. }
  133. for i := 0; i < 4; i++ {
  134. err := writeAPI.WriteRecord(context.Background(), lines[i])
  135. require.Nil(t, err)
  136. }
  137. assert.Equal(t, 0, service.Requests())
  138. require.Len(t, service.Lines(), 0)
  139. err := writeAPI.Flush(context.Background())
  140. require.Nil(t, err)
  141. assert.Equal(t, 1, service.Requests())
  142. require.Len(t, service.Lines(), 4)
  143. }