client_e2e_test.go 10 KB


  1. //go:build e2e
  2. // +build e2e
  3. // Copyright 2020-2021 InfluxData, Inc. All rights reserved.
  4. // Use of this source code is governed by MIT
  5. // license that can be found in the LICENSE file.
  6. package influxdb2_test
  7. import (
  8. "context"
  9. "fmt"
  10. "os"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. "testing"
  15. "time"
  16. influxdb2 "github.com/influxdata/influxdb-client-go/v2"
  17. "github.com/influxdata/influxdb-client-go/v2/domain"
  18. "github.com/influxdata/influxdb-client-go/v2/internal/test"
  19. "github.com/influxdata/influxdb-client-go/v2/log"
  20. "github.com/stretchr/testify/assert"
  21. "github.com/stretchr/testify/require"
  22. )
  23. var authToken string
  24. var serverURL string
  25. var serverV1URL string
  26. var onboardingURL string
  27. func getEnvValue(key, defVal string) string {
  28. if val, ok := os.LookupEnv(key); ok {
  29. return val
  30. } else {
  31. return defVal
  32. }
  33. }
  34. func init() {
  35. authToken = getEnvValue("INFLUXDB2_TOKEN", "my-token")
  36. serverURL = getEnvValue("INFLUXDB2_URL", "http://localhost:8086")
  37. serverV1URL = getEnvValue("INFLUXDB_URL", "http://localhost:8087")
  38. onboardingURL = getEnvValue("INFLUXDB2_ONBOARDING_URL", "http://localhost:8089")
  39. }
  40. func TestSetup(t *testing.T) {
  41. client := influxdb2.NewClientWithOptions(onboardingURL, "", influxdb2.DefaultOptions().SetLogLevel(2))
  42. response, err := client.Setup(context.Background(), "my-user", "my-password", "my-org", "my-bucket", 24)
  43. if err != nil {
  44. t.Error(err)
  45. }
  46. require.NotNil(t, response)
  47. require.NotNil(t, response.Auth)
  48. require.NotNil(t, response.Auth.Token)
  49. require.NotNil(t, response.Bucket)
  50. require.NotNil(t, response.Bucket.RetentionRules)
  51. require.Len(t, response.Bucket.RetentionRules, 1)
  52. assert.Equal(t, int64(24*3600), response.Bucket.RetentionRules[0].EverySeconds)
  53. _, err = client.Setup(context.Background(), "my-user", "my-password", "my-org", "my-bucket", 0)
  54. require.NotNil(t, err)
  55. assert.Equal(t, "conflict: onboarding has already been completed", err.Error())
  56. }
  57. func TestReady(t *testing.T) {
  58. client := influxdb2.NewClient(serverURL, "")
  59. ready, err := client.Ready(context.Background())
  60. require.NoError(t, err)
  61. require.NotNil(t, ready)
  62. require.NotNil(t, ready.Started)
  63. assert.True(t, ready.Started.Before(time.Now()))
  64. dur, err := time.ParseDuration(*ready.Up)
  65. require.NoError(t, err)
  66. assert.True(t, dur.Seconds() > 0)
  67. }
  68. func TestHealth(t *testing.T) {
  69. client := influxdb2.NewClient(serverURL, "")
  70. health, err := client.Health(context.Background())
  71. if err != nil {
  72. t.Error(err)
  73. }
  74. require.NotNil(t, health)
  75. assert.Equal(t, domain.HealthCheckStatusPass, health.Status)
  76. }
  77. func TestPing(t *testing.T) {
  78. client := influxdb2.NewClient(serverURL, "")
  79. ok, err := client.Ping(context.Background())
  80. require.NoError(t, err)
  81. assert.True(t, ok)
  82. }
  83. func TestWrite(t *testing.T) {
  84. client := influxdb2.NewClientWithOptions(serverURL, authToken, influxdb2.DefaultOptions().SetLogLevel(3))
  85. writeAPI := client.WriteAPI("my-org", "my-bucket")
  86. errCh := writeAPI.Errors()
  87. errorsCount := 0
  88. var wg sync.WaitGroup
  89. wg.Add(1)
  90. go func() {
  91. for err := range errCh {
  92. errorsCount++
  93. fmt.Println("Error proc: write error: ", err.Error())
  94. }
  95. fmt.Println("Error proc: finished ")
  96. wg.Done()
  97. }()
  98. timestamp := time.Now()
  99. for i, f := 0, 3.3; i < 10; i++ {
  100. writeAPI.WriteRecord(fmt.Sprintf("test,a=%d,b=local f=%.2f,i=%di %d", i%2, f, i, timestamp.UnixNano()))
  101. //writeAPI.Flush()
  102. f += 3.3
  103. timestamp = timestamp.Add(time.Nanosecond)
  104. }
  105. for i, f := int64(10), 33.0; i < 20; i++ {
  106. p := influxdb2.NewPoint("test",
  107. map[string]string{"a": strconv.FormatInt(i%2, 10), "b": "static"},
  108. map[string]interface{}{"f": f, "i": i},
  109. timestamp)
  110. writeAPI.WritePoint(p)
  111. f += 3.3
  112. timestamp = timestamp.Add(time.Nanosecond)
  113. }
  114. err := client.WriteAPIBlocking("my-org", "my-bucket").WritePoint(context.Background(), influxdb2.NewPointWithMeasurement("test").
  115. AddTag("a", "3").AddField("i", 20).AddField("f", 4.4))
  116. assert.NoError(t, err)
  117. client.Close()
  118. wg.Wait()
  119. assert.Equal(t, 0, errorsCount)
  120. }
  121. func TestQueryRaw(t *testing.T) {
  122. client := influxdb2.NewClient(serverURL, authToken)
  123. queryAPI := client.QueryAPI("my-org")
  124. res, err := queryAPI.QueryRaw(context.Background(), `from(bucket:"my-bucket")|> range(start: -24h) |> filter(fn: (r) => r._measurement == "test")`, influxdb2.DefaultDialect())
  125. if err != nil {
  126. t.Error(err)
  127. } else {
  128. fmt.Println("QueryResult:")
  129. fmt.Println(res)
  130. }
  131. }
  132. func TestQuery(t *testing.T) {
  133. client := influxdb2.NewClient(serverURL, authToken)
  134. queryAPI := client.QueryAPI("my-org")
  135. fmt.Println("QueryResult")
  136. result, err := queryAPI.Query(context.Background(), `from(bucket:"my-bucket")|> range(start: -24h) |> filter(fn: (r) => r._measurement == "test")`)
  137. if err != nil {
  138. t.Error(err)
  139. } else {
  140. for result.Next() {
  141. if result.TableChanged() {
  142. fmt.Printf("table: %s\n", result.TableMetadata().String())
  143. }
  144. fmt.Printf("row: %sv\n", result.Record().String())
  145. }
  146. if result.Err() != nil {
  147. t.Error(result.Err())
  148. }
  149. }
  150. }
  151. func TestPingV1(t *testing.T) {
  152. client := influxdb2.NewClient(serverV1URL, "")
  153. ok, err := client.Ping(context.Background())
  154. require.NoError(t, err)
  155. assert.True(t, ok)
  156. }
  157. func TestHealthV1Compatibility(t *testing.T) {
  158. client := influxdb2.NewClient(serverV1URL, "")
  159. health, err := client.Health(context.Background())
  160. if err != nil {
  161. t.Error(err)
  162. }
  163. require.NotNil(t, health)
  164. assert.Equal(t, domain.HealthCheckStatusPass, health.Status)
  165. }
  166. func TestWriteV1Compatibility(t *testing.T) {
  167. client := influxdb2.NewClientWithOptions(serverV1URL, "", influxdb2.DefaultOptions().SetLogLevel(log.DebugLevel))
  168. writeAPI := client.WriteAPI("", "mydb/autogen")
  169. errCh := writeAPI.Errors()
  170. errorsCount := 0
  171. var wg sync.WaitGroup
  172. wg.Add(1)
  173. go func() {
  174. for err := range errCh {
  175. errorsCount++
  176. fmt.Println("Error proc: write error: ", err.Error())
  177. }
  178. wg.Done()
  179. }()
  180. timestamp := time.Now()
  181. for i, f := 0, 3.3; i < 10; i++ {
  182. writeAPI.WriteRecord(fmt.Sprintf("testv1,a=%d,b=local f=%.2f,i=%di %d", i%2, f, i, timestamp.UnixNano()))
  183. //writeAPI.Flush()
  184. f += 3.3
  185. timestamp = timestamp.Add(time.Nanosecond)
  186. }
  187. for i, f := int64(10), 33.0; i < 20; i++ {
  188. p := influxdb2.NewPoint("testv1",
  189. map[string]string{"a": strconv.FormatInt(i%2, 10), "b": "static"},
  190. map[string]interface{}{"f": f, "i": i},
  191. timestamp)
  192. writeAPI.WritePoint(p)
  193. f += 3.3
  194. timestamp = timestamp.Add(time.Nanosecond)
  195. }
  196. err := client.WriteAPIBlocking("", "mydb/autogen").WritePoint(context.Background(), influxdb2.NewPointWithMeasurement("testv1").
  197. AddTag("a", "3").AddField("i", 20).AddField("f", 4.4))
  198. assert.NoError(t, err)
  199. client.Close()
  200. wg.Wait()
  201. assert.Equal(t, 0, errorsCount)
  202. }
  203. func TestQueryRawV1Compatibility(t *testing.T) {
  204. client := influxdb2.NewClient(serverV1URL, "")
  205. queryAPI := client.QueryAPI("")
  206. res, err := queryAPI.QueryRaw(context.Background(), `from(bucket:"mydb/autogen")|> range(start: -24h) |> filter(fn: (r) => r._measurement == "testv1")`, influxdb2.DefaultDialect())
  207. if err != nil {
  208. t.Error(err)
  209. } else {
  210. fmt.Println("QueryResult:")
  211. fmt.Println(res)
  212. }
  213. }
  214. func TestQueryV1Compatibility(t *testing.T) {
  215. client := influxdb2.NewClient(serverV1URL, "")
  216. queryAPI := client.QueryAPI("")
  217. fmt.Println("QueryResult")
  218. result, err := queryAPI.Query(context.Background(), `from(bucket:"mydb/autogen")|> range(start: -24h) |> filter(fn: (r) => r._measurement == "testv1")`)
  219. if err != nil {
  220. t.Error(err)
  221. } else {
  222. rows := 0
  223. for result.Next() {
  224. rows++
  225. if result.TableChanged() {
  226. fmt.Printf("table: %s\n", result.TableMetadata().String())
  227. }
  228. fmt.Printf("row: %sv\n", result.Record().String())
  229. }
  230. if result.Err() != nil {
  231. t.Error(result.Err())
  232. }
  233. assert.True(t, rows > 0)
  234. }
  235. }
  236. func TestV2APIAgainstV1Server(t *testing.T) {
  237. client := influxdb2.NewClient(serverV1URL, "")
  238. ctx := context.Background()
  239. _, err := client.AuthorizationsAPI().GetAuthorizations(ctx)
  240. require.Error(t, err)
  241. _, err = client.UsersAPI().GetUsers(ctx)
  242. require.Error(t, err)
  243. _, err = client.OrganizationsAPI().GetOrganizations(ctx)
  244. require.Error(t, err)
  245. _, err = client.TasksAPI().FindTasks(ctx, nil)
  246. require.Error(t, err)
  247. _, err = client.LabelsAPI().GetLabels(ctx)
  248. require.Error(t, err)
  249. _, err = client.BucketsAPI().GetBuckets(ctx)
  250. require.Error(t, err)
  251. err = client.DeleteAPI().DeleteWithName(ctx, "org", "bucket", time.Now(), time.Now(), "")
  252. require.Error(t, err)
  253. }
  254. func TestHTTPService(t *testing.T) {
  255. client := influxdb2.NewClient(serverURL, authToken)
  256. apiClient := client.APIClient()
  257. org, err := client.OrganizationsAPI().FindOrganizationByName(context.Background(), "my-org")
  258. if err != nil {
  259. //return err
  260. t.Fatal(err)
  261. }
  262. taskDescription := "Example task"
  263. taskFlux := `option task = {
  264. name: "My task",
  265. every: 1h
  266. }
  267. from(bucket:"my-bucket") |> range(start: -1m) |> last()`
  268. taskStatus := domain.TaskStatusTypeActive
  269. taskRequest := domain.TaskCreateRequest{
  270. Org: &org.Name,
  271. OrgID: org.Id,
  272. Description: &taskDescription,
  273. Flux: taskFlux,
  274. Status: &taskStatus,
  275. }
  276. params := &domain.PostTasksAllParams{
  277. Body: domain.PostTasksJSONRequestBody(taskRequest),
  278. }
  279. resp, err := apiClient.PostTasks(context.Background(), params)
  280. if err != nil {
  281. //return err
  282. t.Error(err)
  283. }
  284. if assert.NotNil(t, resp) {
  285. assert.Equal(t, "My task", resp.Name)
  286. deleteParams := &domain.DeleteTasksIDAllParams{
  287. TaskID: resp.Id,
  288. }
  289. err := apiClient.DeleteTasksID(context.Background(), deleteParams)
  290. if err != nil {
  291. //return err
  292. t.Error(err)
  293. }
  294. }
  295. }
  296. func TestLogsConcurrent(t *testing.T) {
  297. var wg sync.WaitGroup
  298. w := func(loc string, temp float32) {
  299. client1 := influxdb2.NewClientWithOptions(serverURL, authToken, influxdb2.DefaultOptions().SetLogLevel(log.ErrorLevel))
  300. for i := 0; i < 10000; i++ {
  301. client1.WriteAPI("my-org", "my-bucket").WriteRecord(fmt.Sprintf("room,location=%s temp=%f", loc, temp))
  302. }
  303. client1.Close()
  304. wg.Done()
  305. }
  306. for i := 0; i < 2; i++ {
  307. wg.Add(1)
  308. go w(fmt.Sprintf("T%d", i), 23.3+float32(i))
  309. <-time.After(time.Nanosecond)
  310. }
  311. wg.Wait()
  312. }
  313. func TestWriteCustomBatch(t *testing.T) {
  314. client := influxdb2.NewClientWithOptions(serverURL, authToken, influxdb2.DefaultOptions().SetLogLevel(0))
  315. now := time.Now()
  316. lines := test.GenRecords(10)
  317. err := client.WriteAPIBlocking("my-org", "my-bucket").WriteRecord(context.Background(), strings.Join(lines, "\n"))
  318. assert.NoError(t, err)
  319. result, err := client.QueryAPI("my-org").Query(context.Background(), fmt.Sprintf(`from(bucket:"my-bucket")|> range(start: %s) |> filter(fn: (r) => r._measurement == "test") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")`, now.Format(time.RFC3339Nano)))
  320. assert.NoError(t, err)
  321. l := 0
  322. for result.Next() {
  323. l++
  324. }
  325. assert.Equal(t, 10, l)
  326. }