query.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532
  1. // Copyright 2020-2021 InfluxData, Inc. All rights reserved.
  2. // Use of this source code is governed by MIT
  3. // license that can be found in the LICENSE file.
  4. package api
  5. import (
  6. "bytes"
  7. "compress/gzip"
  8. "context"
  9. "encoding/base64"
  10. "encoding/csv"
  11. "encoding/json"
  12. "errors"
  13. "fmt"
  14. "io"
  15. "io/ioutil"
  16. "net/http"
  17. "net/url"
  18. "path"
  19. "reflect"
  20. "strconv"
  21. "strings"
  22. "sync"
  23. "time"
  24. http2 "github.com/influxdata/influxdb-client-go/v2/api/http"
  25. "github.com/influxdata/influxdb-client-go/v2/api/query"
  26. "github.com/influxdata/influxdb-client-go/v2/domain"
  27. "github.com/influxdata/influxdb-client-go/v2/internal/log"
  28. ilog "github.com/influxdata/influxdb-client-go/v2/log"
  29. )
  30. const (
  31. stringDatatype = "string"
  32. doubleDatatype = "double"
  33. boolDatatype = "boolean"
  34. longDatatype = "long"
  35. uLongDatatype = "unsignedLong"
  36. durationDatatype = "duration"
  37. base64BinaryDataType = "base64Binary"
  38. timeDatatypeRFC = "dateTime:RFC3339"
  39. timeDatatypeRFCNano = "dateTime:RFC3339Nano"
  40. )
  41. // QueryAPI provides methods for performing synchronously flux query against InfluxDB server.
  42. //
  43. // Flux query can contain reference to parameters, which must be passed via queryParams.
  44. // it can be a struct or map. Param values can be only simple types or time.Time.
  45. // The name of a struct field or a map key (must be a string) will be a param name.
  46. // The name of the parameter represented by a struct field can be specified by JSON annotation:
  47. //
  48. // type Condition struct {
  49. // Start time.Time `json:"start"`
  50. // Field string `json:"field"`
  51. // Value float64 `json:"value"`
  52. // }
  53. //
  54. // Parameters are then accessed via the Flux params object:
  55. //
  56. // query:= `from(bucket: "environment")
  57. // |> range(start: time(v: params.start))
  58. // |> filter(fn: (r) => r._measurement == "air")
  59. // |> filter(fn: (r) => r._field == params.field)
  60. // |> filter(fn: (r) => r._value > params.value)`
  61. //
  62. type QueryAPI interface {
  63. // QueryRaw executes flux query on the InfluxDB server and returns complete query result as a string with table annotations according to dialect
  64. QueryRaw(ctx context.Context, query string, dialect *domain.Dialect) (string, error)
  65. // QueryRawWithParams executes flux parametrized query on the InfluxDB server and returns complete query result as a string with table annotations according to dialect
  66. QueryRawWithParams(ctx context.Context, query string, dialect *domain.Dialect, params interface{}) (string, error)
  67. // Query executes flux query on the InfluxDB server and returns QueryTableResult which parses streamed response into structures representing flux table parts
  68. Query(ctx context.Context, query string) (*QueryTableResult, error)
  69. // QueryWithParams executes flux parametrized query on the InfluxDB server and returns QueryTableResult which parses streamed response into structures representing flux table parts
  70. QueryWithParams(ctx context.Context, query string, params interface{}) (*QueryTableResult, error)
  71. }
  72. // NewQueryAPI returns new query client for querying buckets belonging to org
  73. func NewQueryAPI(org string, service http2.Service) QueryAPI {
  74. return &queryAPI{
  75. org: org,
  76. httpService: service,
  77. }
  78. }
  79. // QueryTableResult parses streamed flux query response into structures representing flux table parts
  80. // Walking though the result is done by repeatedly calling Next() until returns false.
  81. // Actual flux table info (columns with names, data types, etc) is returned by TableMetadata() method.
  82. // Data are acquired by Record() method.
  83. // Preliminary end can be caused by an error, so when Next() return false, check Err() for an error
  84. type QueryTableResult struct {
  85. io.Closer
  86. csvReader *csv.Reader
  87. tablePosition int
  88. tableChanged bool
  89. table *query.FluxTableMetadata
  90. record *query.FluxRecord
  91. err error
  92. }
  93. // NewQueryTableResult returns new QueryTableResult
  94. func NewQueryTableResult(rawResponse io.ReadCloser) *QueryTableResult {
  95. csvReader := csv.NewReader(rawResponse)
  96. csvReader.FieldsPerRecord = -1
  97. return &QueryTableResult{Closer: rawResponse, csvReader: csvReader}
  98. }
  99. // queryAPI implements QueryAPI interface
  100. type queryAPI struct {
  101. org string
  102. httpService http2.Service
  103. url string
  104. lock sync.Mutex
  105. }
  106. // queryBody holds the body for an HTTP query request.
  107. type queryBody struct {
  108. Dialect *domain.Dialect `json:"dialect,omitempty"`
  109. Query string `json:"query"`
  110. Type domain.QueryType `json:"type"`
  111. Params interface{} `json:"params,omitempty"`
  112. }
  113. func (q *queryAPI) QueryRaw(ctx context.Context, query string, dialect *domain.Dialect) (string, error) {
  114. return q.QueryRawWithParams(ctx, query, dialect, nil)
  115. }
  116. func (q *queryAPI) QueryRawWithParams(ctx context.Context, query string, dialect *domain.Dialect, params interface{}) (string, error) {
  117. if err := checkParamsType(params); err != nil {
  118. return "", err
  119. }
  120. queryURL, err := q.queryURL()
  121. if err != nil {
  122. return "", err
  123. }
  124. qr := queryBody{
  125. Query: query,
  126. Type: domain.QueryTypeFlux,
  127. Dialect: dialect,
  128. Params: params,
  129. }
  130. qrJSON, err := json.Marshal(qr)
  131. if err != nil {
  132. return "", err
  133. }
  134. if log.Level() >= ilog.DebugLevel {
  135. log.Debugf("Query: %s", qrJSON)
  136. }
  137. var body string
  138. perror := q.httpService.DoPostRequest(ctx, queryURL, bytes.NewReader(qrJSON), func(req *http.Request) {
  139. req.Header.Set("Content-Type", "application/json")
  140. req.Header.Set("Accept-Encoding", "gzip")
  141. },
  142. func(resp *http.Response) error {
  143. if resp.Header.Get("Content-Encoding") == "gzip" {
  144. resp.Body, err = gzip.NewReader(resp.Body)
  145. if err != nil {
  146. return err
  147. }
  148. }
  149. respBody, err := ioutil.ReadAll(resp.Body)
  150. if err != nil {
  151. return err
  152. }
  153. body = string(respBody)
  154. return nil
  155. })
  156. if perror != nil {
  157. return "", perror
  158. }
  159. return body, nil
  160. }
  161. // DefaultDialect return flux query Dialect with full annotations (datatype, group, default), header and comma char as a delimiter
  162. func DefaultDialect() *domain.Dialect {
  163. annotations := []domain.DialectAnnotations{domain.DialectAnnotationsDatatype, domain.DialectAnnotationsGroup, domain.DialectAnnotationsDefault}
  164. delimiter := ","
  165. header := true
  166. return &domain.Dialect{
  167. Annotations: &annotations,
  168. Delimiter: &delimiter,
  169. Header: &header,
  170. }
  171. }
  172. func (q *queryAPI) Query(ctx context.Context, query string) (*QueryTableResult, error) {
  173. return q.QueryWithParams(ctx, query, nil)
  174. }
  175. func (q *queryAPI) QueryWithParams(ctx context.Context, query string, params interface{}) (*QueryTableResult, error) {
  176. var queryResult *QueryTableResult
  177. if err := checkParamsType(params); err != nil {
  178. return nil, err
  179. }
  180. queryURL, err := q.queryURL()
  181. if err != nil {
  182. return nil, err
  183. }
  184. qr := queryBody{
  185. Query: query,
  186. Type: domain.QueryTypeFlux,
  187. Dialect: DefaultDialect(),
  188. Params: params,
  189. }
  190. qrJSON, err := json.Marshal(qr)
  191. if err != nil {
  192. return nil, err
  193. }
  194. if log.Level() >= ilog.DebugLevel {
  195. log.Debugf("Query: %s", qrJSON)
  196. }
  197. perror := q.httpService.DoPostRequest(ctx, queryURL, bytes.NewReader(qrJSON), func(req *http.Request) {
  198. req.Header.Set("Content-Type", "application/json")
  199. req.Header.Set("Accept-Encoding", "gzip")
  200. },
  201. func(resp *http.Response) error {
  202. if resp.Header.Get("Content-Encoding") == "gzip" {
  203. resp.Body, err = gzip.NewReader(resp.Body)
  204. if err != nil {
  205. return err
  206. }
  207. }
  208. csvReader := csv.NewReader(resp.Body)
  209. csvReader.FieldsPerRecord = -1
  210. queryResult = &QueryTableResult{Closer: resp.Body, csvReader: csvReader}
  211. return nil
  212. })
  213. if perror != nil {
  214. return queryResult, perror
  215. }
  216. return queryResult, nil
  217. }
  218. func (q *queryAPI) queryURL() (string, error) {
  219. if q.url == "" {
  220. u, err := url.Parse(q.httpService.ServerAPIURL())
  221. if err != nil {
  222. return "", err
  223. }
  224. u.Path = path.Join(u.Path, "query")
  225. params := u.Query()
  226. params.Set("org", q.org)
  227. u.RawQuery = params.Encode()
  228. q.lock.Lock()
  229. q.url = u.String()
  230. q.lock.Unlock()
  231. }
  232. return q.url, nil
  233. }
  234. // checkParamsType validates the value is struct with simple type fields
  235. // or a map with key as string and value as a simple type
  236. func checkParamsType(p interface{}) error {
  237. if p == nil {
  238. return nil
  239. }
  240. t := reflect.TypeOf(p)
  241. v := reflect.ValueOf(p)
  242. if t.Kind() == reflect.Ptr {
  243. t = t.Elem()
  244. v = v.Elem()
  245. }
  246. if t.Kind() != reflect.Struct && t.Kind() != reflect.Map {
  247. return fmt.Errorf("cannot use %v as query params", t)
  248. }
  249. switch t.Kind() {
  250. case reflect.Struct:
  251. fields := reflect.VisibleFields(t)
  252. for _, f := range fields {
  253. fv := v.FieldByIndex(f.Index)
  254. t := getFieldType(fv)
  255. if !validParamType(t) {
  256. return fmt.Errorf("cannot use field '%s' of type '%v' as a query param", f.Name, t)
  257. }
  258. }
  259. case reflect.Map:
  260. key := t.Key()
  261. if key.Kind() != reflect.String {
  262. return fmt.Errorf("cannot use map key of type '%v' for query param name", key)
  263. }
  264. for _, k := range v.MapKeys() {
  265. f := v.MapIndex(k)
  266. t := getFieldType(f)
  267. if !validParamType(t) {
  268. return fmt.Errorf("cannot use map value type '%v' as a query param", t)
  269. }
  270. }
  271. }
  272. return nil
  273. }
  274. // getFieldType extracts type of value
  275. func getFieldType(v reflect.Value) reflect.Type {
  276. t := v.Type()
  277. if t.Kind() == reflect.Ptr {
  278. t = t.Elem()
  279. v = v.Elem()
  280. }
  281. if t.Kind() == reflect.Interface && !v.IsNil() {
  282. t = reflect.ValueOf(v.Interface()).Type()
  283. }
  284. return t
  285. }
  286. // timeType is the exact type for the Time
  287. var timeType = reflect.TypeOf(time.Time{})
  288. // validParamType validates that t is primitive type or string or interface
  289. func validParamType(t reflect.Type) bool {
  290. return (t.Kind() > reflect.Invalid && t.Kind() < reflect.Complex64) ||
  291. t.Kind() == reflect.String ||
  292. t == timeType
  293. }
  294. // TablePosition returns actual flux table position in the result, or -1 if no table was found yet
  295. // Each new table is introduced by an annotation in csv
  296. func (q *QueryTableResult) TablePosition() int {
  297. if q.table != nil {
  298. return q.table.Position()
  299. }
  300. return -1
  301. }
  302. // TableMetadata returns actual flux table metadata
  303. func (q *QueryTableResult) TableMetadata() *query.FluxTableMetadata {
  304. return q.table
  305. }
  306. // TableChanged returns true if last call of Next() found also new result table
  307. // Table information is available via TableMetadata method
  308. func (q *QueryTableResult) TableChanged() bool {
  309. return q.tableChanged
  310. }
  311. // Record returns last parsed flux table data row
  312. // Use Record methods to access value and row properties
  313. func (q *QueryTableResult) Record() *query.FluxRecord {
  314. return q.record
  315. }
  316. type parsingState int
  317. const (
  318. parsingStateNormal parsingState = iota
  319. parsingStateAnnotation
  320. parsingStateNameRow
  321. parsingStateError
  322. )
  323. // Next advances to next row in query result.
  324. // During the first time it is called, Next creates also table metadata
  325. // Actual parsed row is available through Record() function
  326. // Returns false in case of end or an error, otherwise true
  327. func (q *QueryTableResult) Next() bool {
  328. var row []string
  329. // set closing query in case of preliminary return
  330. closer := func() {
  331. if err := q.Close(); err != nil {
  332. message := err.Error()
  333. if q.err != nil {
  334. message = fmt.Sprintf("%s,%s", message, q.err.Error())
  335. }
  336. q.err = errors.New(message)
  337. }
  338. }
  339. defer func() {
  340. closer()
  341. }()
  342. parsingState := parsingStateNormal
  343. q.tableChanged = false
  344. dataTypeAnnotationFound := false
  345. readRow:
  346. row, q.err = q.csvReader.Read()
  347. if q.err == io.EOF {
  348. q.err = nil
  349. return false
  350. }
  351. if q.err != nil {
  352. return false
  353. }
  354. if len(row) <= 1 {
  355. goto readRow
  356. }
  357. if len(row[0]) > 0 && row[0][0] == '#' {
  358. if parsingState == parsingStateNormal {
  359. q.table = query.NewFluxTableMetadata(q.tablePosition)
  360. q.tablePosition++
  361. q.tableChanged = true
  362. for i := range row[1:] {
  363. q.table.AddColumn(query.NewFluxColumn(i))
  364. }
  365. parsingState = parsingStateAnnotation
  366. }
  367. }
  368. if q.table == nil {
  369. q.err = errors.New("parsing error, annotations not found")
  370. return false
  371. }
  372. if len(row)-1 != len(q.table.Columns()) {
  373. q.err = fmt.Errorf("parsing error, row has different number of columns than the table: %d vs %d", len(row)-1, len(q.table.Columns()))
  374. return false
  375. }
  376. switch row[0] {
  377. case "":
  378. switch parsingState {
  379. case parsingStateAnnotation:
  380. if !dataTypeAnnotationFound {
  381. q.err = errors.New("parsing error, datatype annotation not found")
  382. return false
  383. }
  384. parsingState = parsingStateNameRow
  385. fallthrough
  386. case parsingStateNameRow:
  387. if row[1] == "error" {
  388. parsingState = parsingStateError
  389. } else {
  390. for i, n := range row[1:] {
  391. if q.table.Column(i) != nil {
  392. q.table.Column(i).SetName(n)
  393. }
  394. }
  395. parsingState = parsingStateNormal
  396. }
  397. goto readRow
  398. case parsingStateError:
  399. var message string
  400. if len(row) > 1 && len(row[1]) > 0 {
  401. message = row[1]
  402. } else {
  403. message = "unknown query error"
  404. }
  405. reference := ""
  406. if len(row) > 2 && len(row[2]) > 0 {
  407. reference = fmt.Sprintf(",%s", row[2])
  408. }
  409. q.err = fmt.Errorf("%s%s", message, reference)
  410. return false
  411. }
  412. values := make(map[string]interface{})
  413. for i, v := range row[1:] {
  414. if q.table.Column(i) != nil {
  415. values[q.table.Column(i).Name()], q.err = toValue(stringTernary(v, q.table.Column(i).DefaultValue()), q.table.Column(i).DataType(), q.table.Column(i).Name())
  416. if q.err != nil {
  417. return false
  418. }
  419. }
  420. }
  421. q.record = query.NewFluxRecord(q.table.Position(), values)
  422. case "#datatype":
  423. dataTypeAnnotationFound = true
  424. for i, d := range row[1:] {
  425. if q.table.Column(i) != nil {
  426. q.table.Column(i).SetDataType(d)
  427. }
  428. }
  429. goto readRow
  430. case "#group":
  431. for i, g := range row[1:] {
  432. if q.table.Column(i) != nil {
  433. q.table.Column(i).SetGroup(g == "true")
  434. }
  435. }
  436. goto readRow
  437. case "#default":
  438. for i, c := range row[1:] {
  439. if q.table.Column(i) != nil {
  440. q.table.Column(i).SetDefaultValue(c)
  441. }
  442. }
  443. goto readRow
  444. }
  445. // don't close query
  446. closer = func() {}
  447. return true
  448. }
  449. // Err returns an error raised during flux query response parsing
  450. func (q *QueryTableResult) Err() error {
  451. return q.err
  452. }
  453. // Close reads remaining data and closes underlying Closer
  454. func (q *QueryTableResult) Close() error {
  455. var err error
  456. for err == nil {
  457. _, err = q.csvReader.Read()
  458. }
  459. return q.Closer.Close()
  460. }
  461. // stringTernary returns a if not empty, otherwise b
  462. func stringTernary(a, b string) string {
  463. if a == "" {
  464. return b
  465. }
  466. return a
  467. }
  468. // toValues converts s into type by t
  469. func toValue(s, t, name string) (interface{}, error) {
  470. if s == "" {
  471. return nil, nil
  472. }
  473. switch t {
  474. case stringDatatype:
  475. return s, nil
  476. case timeDatatypeRFC:
  477. return time.Parse(time.RFC3339, s)
  478. case timeDatatypeRFCNano:
  479. return time.Parse(time.RFC3339Nano, s)
  480. case durationDatatype:
  481. return time.ParseDuration(s)
  482. case doubleDatatype:
  483. return strconv.ParseFloat(s, 64)
  484. case boolDatatype:
  485. if strings.ToLower(s) == "false" {
  486. return false, nil
  487. }
  488. return true, nil
  489. case longDatatype:
  490. return strconv.ParseInt(s, 10, 64)
  491. case uLongDatatype:
  492. return strconv.ParseUint(s, 10, 64)
  493. case base64BinaryDataType:
  494. return base64.StdEncoding.DecodeString(s)
  495. default:
  496. return nil, fmt.Errorf("%s has unknown data type %s", name, t)
  497. }
  498. }