//go:build e2e // +build e2e // Copyright 2020-2021 InfluxData, Inc. All rights reserved. // Use of this source code is governed by MIT // license that can be found in the LICENSE file. package influxdb2_test import ( "context" "fmt" "os" "strconv" "strings" "sync" "testing" "time" influxdb2 "github.com/influxdata/influxdb-client-go/v2" "github.com/influxdata/influxdb-client-go/v2/domain" "github.com/influxdata/influxdb-client-go/v2/internal/test" "github.com/influxdata/influxdb-client-go/v2/log" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) var authToken string var serverURL string var serverV1URL string var onboardingURL string func getEnvValue(key, defVal string) string { if val, ok := os.LookupEnv(key); ok { return val } else { return defVal } } func init() { authToken = getEnvValue("INFLUXDB2_TOKEN", "my-token") serverURL = getEnvValue("INFLUXDB2_URL", "http://localhost:8086") serverV1URL = getEnvValue("INFLUXDB_URL", "http://localhost:8087") onboardingURL = getEnvValue("INFLUXDB2_ONBOARDING_URL", "http://localhost:8089") } func TestSetup(t *testing.T) { client := influxdb2.NewClientWithOptions(onboardingURL, "", influxdb2.DefaultOptions().SetLogLevel(2)) response, err := client.Setup(context.Background(), "my-user", "my-password", "my-org", "my-bucket", 24) if err != nil { t.Error(err) } require.NotNil(t, response) require.NotNil(t, response.Auth) require.NotNil(t, response.Auth.Token) require.NotNil(t, response.Bucket) require.NotNil(t, response.Bucket.RetentionRules) require.Len(t, response.Bucket.RetentionRules, 1) assert.Equal(t, int64(24*3600), response.Bucket.RetentionRules[0].EverySeconds) _, err = client.Setup(context.Background(), "my-user", "my-password", "my-org", "my-bucket", 0) require.NotNil(t, err) assert.Equal(t, "conflict: onboarding has already been completed", err.Error()) } func TestReady(t *testing.T) { client := influxdb2.NewClient(serverURL, "") ready, err := client.Ready(context.Background()) require.NoError(t, err) require.NotNil(t, ready) require.NotNil(t, ready.Started) assert.True(t, ready.Started.Before(time.Now())) dur, err := time.ParseDuration(*ready.Up) require.NoError(t, err) assert.True(t, dur.Seconds() > 0) } func TestHealth(t *testing.T) { client := influxdb2.NewClient(serverURL, "") health, err := client.Health(context.Background()) if err != nil { t.Error(err) } require.NotNil(t, health) assert.Equal(t, domain.HealthCheckStatusPass, health.Status) } func TestPing(t *testing.T) { client := influxdb2.NewClient(serverURL, "") ok, err := client.Ping(context.Background()) require.NoError(t, err) assert.True(t, ok) } func TestWrite(t *testing.T) { client := influxdb2.NewClientWithOptions(serverURL, authToken, influxdb2.DefaultOptions().SetLogLevel(3)) writeAPI := client.WriteAPI("my-org", "my-bucket") errCh := writeAPI.Errors() errorsCount := 0 var wg sync.WaitGroup wg.Add(1) go func() { for err := range errCh { errorsCount++ fmt.Println("Error proc: write error: ", err.Error()) } fmt.Println("Error proc: finished ") wg.Done() }() timestamp := time.Now() for i, f := 0, 3.3; i < 10; i++ { writeAPI.WriteRecord(fmt.Sprintf("test,a=%d,b=local f=%.2f,i=%di %d", i%2, f, i, timestamp.UnixNano())) //writeAPI.Flush() f += 3.3 timestamp = timestamp.Add(time.Nanosecond) } for i, f := int64(10), 33.0; i < 20; i++ { p := influxdb2.NewPoint("test", map[string]string{"a": strconv.FormatInt(i%2, 10), "b": "static"}, map[string]interface{}{"f": f, "i": i}, timestamp) writeAPI.WritePoint(p) f += 3.3 timestamp = timestamp.Add(time.Nanosecond) } err := client.WriteAPIBlocking("my-org", "my-bucket").WritePoint(context.Background(), influxdb2.NewPointWithMeasurement("test"). AddTag("a", "3").AddField("i", 20).AddField("f", 4.4)) assert.NoError(t, err) client.Close() wg.Wait() assert.Equal(t, 0, errorsCount) } func TestQueryRaw(t *testing.T) { client := influxdb2.NewClient(serverURL, authToken) queryAPI := client.QueryAPI("my-org") res, err := queryAPI.QueryRaw(context.Background(), `from(bucket:"my-bucket")|> range(start: -24h) |> filter(fn: (r) => r._measurement == "test")`, influxdb2.DefaultDialect()) if err != nil { t.Error(err) } else { fmt.Println("QueryResult:") fmt.Println(res) } } func TestQuery(t *testing.T) { client := influxdb2.NewClient(serverURL, authToken) queryAPI := client.QueryAPI("my-org") fmt.Println("QueryResult") result, err := queryAPI.Query(context.Background(), `from(bucket:"my-bucket")|> range(start: -24h) |> filter(fn: (r) => r._measurement == "test")`) if err != nil { t.Error(err) } else { for result.Next() { if result.TableChanged() { fmt.Printf("table: %s\n", result.TableMetadata().String()) } fmt.Printf("row: %sv\n", result.Record().String()) } if result.Err() != nil { t.Error(result.Err()) } } } func TestPingV1(t *testing.T) { client := influxdb2.NewClient(serverV1URL, "") ok, err := client.Ping(context.Background()) require.NoError(t, err) assert.True(t, ok) } func TestHealthV1Compatibility(t *testing.T) { client := influxdb2.NewClient(serverV1URL, "") health, err := client.Health(context.Background()) if err != nil { t.Error(err) } require.NotNil(t, health) assert.Equal(t, domain.HealthCheckStatusPass, health.Status) } func TestWriteV1Compatibility(t *testing.T) { client := influxdb2.NewClientWithOptions(serverV1URL, "", influxdb2.DefaultOptions().SetLogLevel(log.DebugLevel)) writeAPI := client.WriteAPI("", "mydb/autogen") errCh := writeAPI.Errors() errorsCount := 0 var wg sync.WaitGroup wg.Add(1) go func() { for err := range errCh { errorsCount++ fmt.Println("Error proc: write error: ", err.Error()) } wg.Done() }() timestamp := time.Now() for i, f := 0, 3.3; i < 10; i++ { writeAPI.WriteRecord(fmt.Sprintf("testv1,a=%d,b=local f=%.2f,i=%di %d", i%2, f, i, timestamp.UnixNano())) //writeAPI.Flush() f += 3.3 timestamp = timestamp.Add(time.Nanosecond) } for i, f := int64(10), 33.0; i < 20; i++ { p := influxdb2.NewPoint("testv1", map[string]string{"a": strconv.FormatInt(i%2, 10), "b": "static"}, map[string]interface{}{"f": f, "i": i}, timestamp) writeAPI.WritePoint(p) f += 3.3 timestamp = timestamp.Add(time.Nanosecond) } err := client.WriteAPIBlocking("", "mydb/autogen").WritePoint(context.Background(), influxdb2.NewPointWithMeasurement("testv1"). AddTag("a", "3").AddField("i", 20).AddField("f", 4.4)) assert.NoError(t, err) client.Close() wg.Wait() assert.Equal(t, 0, errorsCount) } func TestQueryRawV1Compatibility(t *testing.T) { client := influxdb2.NewClient(serverV1URL, "") queryAPI := client.QueryAPI("") res, err := queryAPI.QueryRaw(context.Background(), `from(bucket:"mydb/autogen")|> range(start: -24h) |> filter(fn: (r) => r._measurement == "testv1")`, influxdb2.DefaultDialect()) if err != nil { t.Error(err) } else { fmt.Println("QueryResult:") fmt.Println(res) } } func TestQueryV1Compatibility(t *testing.T) { client := influxdb2.NewClient(serverV1URL, "") queryAPI := client.QueryAPI("") fmt.Println("QueryResult") result, err := queryAPI.Query(context.Background(), `from(bucket:"mydb/autogen")|> range(start: -24h) |> filter(fn: (r) => r._measurement == "testv1")`) if err != nil { t.Error(err) } else { rows := 0 for result.Next() { rows++ if result.TableChanged() { fmt.Printf("table: %s\n", result.TableMetadata().String()) } fmt.Printf("row: %sv\n", result.Record().String()) } if result.Err() != nil { t.Error(result.Err()) } assert.True(t, rows > 0) } } func TestV2APIAgainstV1Server(t *testing.T) { client := influxdb2.NewClient(serverV1URL, "") ctx := context.Background() _, err := client.AuthorizationsAPI().GetAuthorizations(ctx) require.Error(t, err) _, err = client.UsersAPI().GetUsers(ctx) require.Error(t, err) _, err = client.OrganizationsAPI().GetOrganizations(ctx) require.Error(t, err) _, err = client.TasksAPI().FindTasks(ctx, nil) require.Error(t, err) _, err = client.LabelsAPI().GetLabels(ctx) require.Error(t, err) _, err = client.BucketsAPI().GetBuckets(ctx) require.Error(t, err) err = client.DeleteAPI().DeleteWithName(ctx, "org", "bucket", time.Now(), time.Now(), "") require.Error(t, err) } func TestHTTPService(t *testing.T) { client := influxdb2.NewClient(serverURL, authToken) apiClient := client.APIClient() org, err := client.OrganizationsAPI().FindOrganizationByName(context.Background(), "my-org") if err != nil { //return err t.Fatal(err) } taskDescription := "Example task" taskFlux := `option task = { name: "My task", every: 1h } from(bucket:"my-bucket") |> range(start: -1m) |> last()` taskStatus := domain.TaskStatusTypeActive taskRequest := domain.TaskCreateRequest{ Org: &org.Name, OrgID: org.Id, Description: &taskDescription, Flux: taskFlux, Status: &taskStatus, } params := &domain.PostTasksAllParams{ Body: domain.PostTasksJSONRequestBody(taskRequest), } resp, err := apiClient.PostTasks(context.Background(), params) if err != nil { //return err t.Error(err) } if assert.NotNil(t, resp) { assert.Equal(t, "My task", resp.Name) deleteParams := &domain.DeleteTasksIDAllParams{ TaskID: resp.Id, } err := apiClient.DeleteTasksID(context.Background(), deleteParams) if err != nil { //return err t.Error(err) } } } func TestLogsConcurrent(t *testing.T) { var wg sync.WaitGroup w := func(loc string, temp float32) { client1 := influxdb2.NewClientWithOptions(serverURL, authToken, influxdb2.DefaultOptions().SetLogLevel(log.ErrorLevel)) for i := 0; i < 10000; i++ { client1.WriteAPI("my-org", "my-bucket").WriteRecord(fmt.Sprintf("room,location=%s temp=%f", loc, temp)) } client1.Close() wg.Done() } for i := 0; i < 2; i++ { wg.Add(1) go w(fmt.Sprintf("T%d", i), 23.3+float32(i)) <-time.After(time.Nanosecond) } wg.Wait() } func TestWriteCustomBatch(t *testing.T) { client := influxdb2.NewClientWithOptions(serverURL, authToken, influxdb2.DefaultOptions().SetLogLevel(0)) now := time.Now() lines := test.GenRecords(10) err := client.WriteAPIBlocking("my-org", "my-bucket").WriteRecord(context.Background(), strings.Join(lines, "\n")) assert.NoError(t, err) result, err := client.QueryAPI("my-org").Query(context.Background(), fmt.Sprintf(`from(bucket:"my-bucket")|> range(start: %s) |> filter(fn: (r) => r._measurement == "test") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")`, now.Format(time.RFC3339Nano))) assert.NoError(t, err) l := 0 for result.Next() { l++ } assert.Equal(t, 10, l) }