123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532 |
- // Copyright 2020-2021 InfluxData, Inc. All rights reserved.
- // Use of this source code is governed by MIT
- // license that can be found in the LICENSE file.
- package api
- import (
- "bytes"
- "compress/gzip"
- "context"
- "encoding/base64"
- "encoding/csv"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "io/ioutil"
- "net/http"
- "net/url"
- "path"
- "reflect"
- "strconv"
- "strings"
- "sync"
- "time"
- http2 "github.com/influxdata/influxdb-client-go/v2/api/http"
- "github.com/influxdata/influxdb-client-go/v2/api/query"
- "github.com/influxdata/influxdb-client-go/v2/domain"
- "github.com/influxdata/influxdb-client-go/v2/internal/log"
- ilog "github.com/influxdata/influxdb-client-go/v2/log"
- )
- const (
- stringDatatype = "string"
- doubleDatatype = "double"
- boolDatatype = "boolean"
- longDatatype = "long"
- uLongDatatype = "unsignedLong"
- durationDatatype = "duration"
- base64BinaryDataType = "base64Binary"
- timeDatatypeRFC = "dateTime:RFC3339"
- timeDatatypeRFCNano = "dateTime:RFC3339Nano"
- )
- // QueryAPI provides methods for performing synchronously flux query against InfluxDB server.
- //
- // Flux query can contain reference to parameters, which must be passed via queryParams.
- // it can be a struct or map. Param values can be only simple types or time.Time.
- // The name of a struct field or a map key (must be a string) will be a param name.
- // The name of the parameter represented by a struct field can be specified by JSON annotation:
- //
- // type Condition struct {
- // Start time.Time `json:"start"`
- // Field string `json:"field"`
- // Value float64 `json:"value"`
- // }
- //
- // Parameters are then accessed via the Flux params object:
- //
- // query:= `from(bucket: "environment")
- // |> range(start: time(v: params.start))
- // |> filter(fn: (r) => r._measurement == "air")
- // |> filter(fn: (r) => r._field == params.field)
- // |> filter(fn: (r) => r._value > params.value)`
- //
- type QueryAPI interface {
- // QueryRaw executes flux query on the InfluxDB server and returns complete query result as a string with table annotations according to dialect
- QueryRaw(ctx context.Context, query string, dialect *domain.Dialect) (string, error)
- // QueryRawWithParams executes flux parametrized query on the InfluxDB server and returns complete query result as a string with table annotations according to dialect
- QueryRawWithParams(ctx context.Context, query string, dialect *domain.Dialect, params interface{}) (string, error)
- // Query executes flux query on the InfluxDB server and returns QueryTableResult which parses streamed response into structures representing flux table parts
- Query(ctx context.Context, query string) (*QueryTableResult, error)
- // QueryWithParams executes flux parametrized query on the InfluxDB server and returns QueryTableResult which parses streamed response into structures representing flux table parts
- QueryWithParams(ctx context.Context, query string, params interface{}) (*QueryTableResult, error)
- }
- // NewQueryAPI returns new query client for querying buckets belonging to org
- func NewQueryAPI(org string, service http2.Service) QueryAPI {
- return &queryAPI{
- org: org,
- httpService: service,
- }
- }
- // QueryTableResult parses streamed flux query response into structures representing flux table parts
- // Walking though the result is done by repeatedly calling Next() until returns false.
- // Actual flux table info (columns with names, data types, etc) is returned by TableMetadata() method.
- // Data are acquired by Record() method.
- // Preliminary end can be caused by an error, so when Next() return false, check Err() for an error
- type QueryTableResult struct {
- io.Closer
- csvReader *csv.Reader
- tablePosition int
- tableChanged bool
- table *query.FluxTableMetadata
- record *query.FluxRecord
- err error
- }
- // NewQueryTableResult returns new QueryTableResult
- func NewQueryTableResult(rawResponse io.ReadCloser) *QueryTableResult {
- csvReader := csv.NewReader(rawResponse)
- csvReader.FieldsPerRecord = -1
- return &QueryTableResult{Closer: rawResponse, csvReader: csvReader}
- }
- // queryAPI implements QueryAPI interface
- type queryAPI struct {
- org string
- httpService http2.Service
- url string
- lock sync.Mutex
- }
- // queryBody holds the body for an HTTP query request.
- type queryBody struct {
- Dialect *domain.Dialect `json:"dialect,omitempty"`
- Query string `json:"query"`
- Type domain.QueryType `json:"type"`
- Params interface{} `json:"params,omitempty"`
- }
- func (q *queryAPI) QueryRaw(ctx context.Context, query string, dialect *domain.Dialect) (string, error) {
- return q.QueryRawWithParams(ctx, query, dialect, nil)
- }
- func (q *queryAPI) QueryRawWithParams(ctx context.Context, query string, dialect *domain.Dialect, params interface{}) (string, error) {
- if err := checkParamsType(params); err != nil {
- return "", err
- }
- queryURL, err := q.queryURL()
- if err != nil {
- return "", err
- }
- qr := queryBody{
- Query: query,
- Type: domain.QueryTypeFlux,
- Dialect: dialect,
- Params: params,
- }
- qrJSON, err := json.Marshal(qr)
- if err != nil {
- return "", err
- }
- if log.Level() >= ilog.DebugLevel {
- log.Debugf("Query: %s", qrJSON)
- }
- var body string
- perror := q.httpService.DoPostRequest(ctx, queryURL, bytes.NewReader(qrJSON), func(req *http.Request) {
- req.Header.Set("Content-Type", "application/json")
- req.Header.Set("Accept-Encoding", "gzip")
- },
- func(resp *http.Response) error {
- if resp.Header.Get("Content-Encoding") == "gzip" {
- resp.Body, err = gzip.NewReader(resp.Body)
- if err != nil {
- return err
- }
- }
- respBody, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- return err
- }
- body = string(respBody)
- return nil
- })
- if perror != nil {
- return "", perror
- }
- return body, nil
- }
- // DefaultDialect return flux query Dialect with full annotations (datatype, group, default), header and comma char as a delimiter
- func DefaultDialect() *domain.Dialect {
- annotations := []domain.DialectAnnotations{domain.DialectAnnotationsDatatype, domain.DialectAnnotationsGroup, domain.DialectAnnotationsDefault}
- delimiter := ","
- header := true
- return &domain.Dialect{
- Annotations: &annotations,
- Delimiter: &delimiter,
- Header: &header,
- }
- }
- func (q *queryAPI) Query(ctx context.Context, query string) (*QueryTableResult, error) {
- return q.QueryWithParams(ctx, query, nil)
- }
- func (q *queryAPI) QueryWithParams(ctx context.Context, query string, params interface{}) (*QueryTableResult, error) {
- var queryResult *QueryTableResult
- if err := checkParamsType(params); err != nil {
- return nil, err
- }
- queryURL, err := q.queryURL()
- if err != nil {
- return nil, err
- }
- qr := queryBody{
- Query: query,
- Type: domain.QueryTypeFlux,
- Dialect: DefaultDialect(),
- Params: params,
- }
- qrJSON, err := json.Marshal(qr)
- if err != nil {
- return nil, err
- }
- if log.Level() >= ilog.DebugLevel {
- log.Debugf("Query: %s", qrJSON)
- }
- perror := q.httpService.DoPostRequest(ctx, queryURL, bytes.NewReader(qrJSON), func(req *http.Request) {
- req.Header.Set("Content-Type", "application/json")
- req.Header.Set("Accept-Encoding", "gzip")
- },
- func(resp *http.Response) error {
- if resp.Header.Get("Content-Encoding") == "gzip" {
- resp.Body, err = gzip.NewReader(resp.Body)
- if err != nil {
- return err
- }
- }
- csvReader := csv.NewReader(resp.Body)
- csvReader.FieldsPerRecord = -1
- queryResult = &QueryTableResult{Closer: resp.Body, csvReader: csvReader}
- return nil
- })
- if perror != nil {
- return queryResult, perror
- }
- return queryResult, nil
- }
- func (q *queryAPI) queryURL() (string, error) {
- if q.url == "" {
- u, err := url.Parse(q.httpService.ServerAPIURL())
- if err != nil {
- return "", err
- }
- u.Path = path.Join(u.Path, "query")
- params := u.Query()
- params.Set("org", q.org)
- u.RawQuery = params.Encode()
- q.lock.Lock()
- q.url = u.String()
- q.lock.Unlock()
- }
- return q.url, nil
- }
- // checkParamsType validates the value is struct with simple type fields
- // or a map with key as string and value as a simple type
- func checkParamsType(p interface{}) error {
- if p == nil {
- return nil
- }
- t := reflect.TypeOf(p)
- v := reflect.ValueOf(p)
- if t.Kind() == reflect.Ptr {
- t = t.Elem()
- v = v.Elem()
- }
- if t.Kind() != reflect.Struct && t.Kind() != reflect.Map {
- return fmt.Errorf("cannot use %v as query params", t)
- }
- switch t.Kind() {
- case reflect.Struct:
- fields := reflect.VisibleFields(t)
- for _, f := range fields {
- fv := v.FieldByIndex(f.Index)
- t := getFieldType(fv)
- if !validParamType(t) {
- return fmt.Errorf("cannot use field '%s' of type '%v' as a query param", f.Name, t)
- }
- }
- case reflect.Map:
- key := t.Key()
- if key.Kind() != reflect.String {
- return fmt.Errorf("cannot use map key of type '%v' for query param name", key)
- }
- for _, k := range v.MapKeys() {
- f := v.MapIndex(k)
- t := getFieldType(f)
- if !validParamType(t) {
- return fmt.Errorf("cannot use map value type '%v' as a query param", t)
- }
- }
- }
- return nil
- }
- // getFieldType extracts type of value
- func getFieldType(v reflect.Value) reflect.Type {
- t := v.Type()
- if t.Kind() == reflect.Ptr {
- t = t.Elem()
- v = v.Elem()
- }
- if t.Kind() == reflect.Interface && !v.IsNil() {
- t = reflect.ValueOf(v.Interface()).Type()
- }
- return t
- }
- // timeType is the exact type for the Time
- var timeType = reflect.TypeOf(time.Time{})
- // validParamType validates that t is primitive type or string or interface
- func validParamType(t reflect.Type) bool {
- return (t.Kind() > reflect.Invalid && t.Kind() < reflect.Complex64) ||
- t.Kind() == reflect.String ||
- t == timeType
- }
- // TablePosition returns actual flux table position in the result, or -1 if no table was found yet
- // Each new table is introduced by an annotation in csv
- func (q *QueryTableResult) TablePosition() int {
- if q.table != nil {
- return q.table.Position()
- }
- return -1
- }
- // TableMetadata returns actual flux table metadata
- func (q *QueryTableResult) TableMetadata() *query.FluxTableMetadata {
- return q.table
- }
- // TableChanged returns true if last call of Next() found also new result table
- // Table information is available via TableMetadata method
- func (q *QueryTableResult) TableChanged() bool {
- return q.tableChanged
- }
- // Record returns last parsed flux table data row
- // Use Record methods to access value and row properties
- func (q *QueryTableResult) Record() *query.FluxRecord {
- return q.record
- }
- type parsingState int
- const (
- parsingStateNormal parsingState = iota
- parsingStateAnnotation
- parsingStateNameRow
- parsingStateError
- )
- // Next advances to next row in query result.
- // During the first time it is called, Next creates also table metadata
- // Actual parsed row is available through Record() function
- // Returns false in case of end or an error, otherwise true
- func (q *QueryTableResult) Next() bool {
- var row []string
- // set closing query in case of preliminary return
- closer := func() {
- if err := q.Close(); err != nil {
- message := err.Error()
- if q.err != nil {
- message = fmt.Sprintf("%s,%s", message, q.err.Error())
- }
- q.err = errors.New(message)
- }
- }
- defer func() {
- closer()
- }()
- parsingState := parsingStateNormal
- q.tableChanged = false
- dataTypeAnnotationFound := false
- readRow:
- row, q.err = q.csvReader.Read()
- if q.err == io.EOF {
- q.err = nil
- return false
- }
- if q.err != nil {
- return false
- }
- if len(row) <= 1 {
- goto readRow
- }
- if len(row[0]) > 0 && row[0][0] == '#' {
- if parsingState == parsingStateNormal {
- q.table = query.NewFluxTableMetadata(q.tablePosition)
- q.tablePosition++
- q.tableChanged = true
- for i := range row[1:] {
- q.table.AddColumn(query.NewFluxColumn(i))
- }
- parsingState = parsingStateAnnotation
- }
- }
- if q.table == nil {
- q.err = errors.New("parsing error, annotations not found")
- return false
- }
- if len(row)-1 != len(q.table.Columns()) {
- q.err = fmt.Errorf("parsing error, row has different number of columns than the table: %d vs %d", len(row)-1, len(q.table.Columns()))
- return false
- }
- switch row[0] {
- case "":
- switch parsingState {
- case parsingStateAnnotation:
- if !dataTypeAnnotationFound {
- q.err = errors.New("parsing error, datatype annotation not found")
- return false
- }
- parsingState = parsingStateNameRow
- fallthrough
- case parsingStateNameRow:
- if row[1] == "error" {
- parsingState = parsingStateError
- } else {
- for i, n := range row[1:] {
- if q.table.Column(i) != nil {
- q.table.Column(i).SetName(n)
- }
- }
- parsingState = parsingStateNormal
- }
- goto readRow
- case parsingStateError:
- var message string
- if len(row) > 1 && len(row[1]) > 0 {
- message = row[1]
- } else {
- message = "unknown query error"
- }
- reference := ""
- if len(row) > 2 && len(row[2]) > 0 {
- reference = fmt.Sprintf(",%s", row[2])
- }
- q.err = fmt.Errorf("%s%s", message, reference)
- return false
- }
- values := make(map[string]interface{})
- for i, v := range row[1:] {
- if q.table.Column(i) != nil {
- values[q.table.Column(i).Name()], q.err = toValue(stringTernary(v, q.table.Column(i).DefaultValue()), q.table.Column(i).DataType(), q.table.Column(i).Name())
- if q.err != nil {
- return false
- }
- }
- }
- q.record = query.NewFluxRecord(q.table.Position(), values)
- case "#datatype":
- dataTypeAnnotationFound = true
- for i, d := range row[1:] {
- if q.table.Column(i) != nil {
- q.table.Column(i).SetDataType(d)
- }
- }
- goto readRow
- case "#group":
- for i, g := range row[1:] {
- if q.table.Column(i) != nil {
- q.table.Column(i).SetGroup(g == "true")
- }
- }
- goto readRow
- case "#default":
- for i, c := range row[1:] {
- if q.table.Column(i) != nil {
- q.table.Column(i).SetDefaultValue(c)
- }
- }
- goto readRow
- }
- // don't close query
- closer = func() {}
- return true
- }
- // Err returns an error raised during flux query response parsing
- func (q *QueryTableResult) Err() error {
- return q.err
- }
- // Close reads remaining data and closes underlying Closer
- func (q *QueryTableResult) Close() error {
- var err error
- for err == nil {
- _, err = q.csvReader.Read()
- }
- return q.Closer.Close()
- }
- // stringTernary returns a if not empty, otherwise b
- func stringTernary(a, b string) string {
- if a == "" {
- return b
- }
- return a
- }
- // toValues converts s into type by t
- func toValue(s, t, name string) (interface{}, error) {
- if s == "" {
- return nil, nil
- }
- switch t {
- case stringDatatype:
- return s, nil
- case timeDatatypeRFC:
- return time.Parse(time.RFC3339, s)
- case timeDatatypeRFCNano:
- return time.Parse(time.RFC3339Nano, s)
- case durationDatatype:
- return time.ParseDuration(s)
- case doubleDatatype:
- return strconv.ParseFloat(s, 64)
- case boolDatatype:
- if strings.ToLower(s) == "false" {
- return false, nil
- }
- return true, nil
- case longDatatype:
- return strconv.ParseInt(s, 10, 64)
- case uLongDatatype:
- return strconv.ParseUint(s, 10, 64)
- case base64BinaryDataType:
- return base64.StdEncoding.DecodeString(s)
- default:
- return nil, fmt.Errorf("%s has unknown data type %s", name, t)
- }
- }
|