query.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514
  1. // Copyright 2020-2021 InfluxData, Inc. All rights reserved.
  2. // Use of this source code is governed by MIT
  3. // license that can be found in the LICENSE file.
  4. package api
  5. import (
  6. "bytes"
  7. "compress/gzip"
  8. "context"
  9. "encoding/base64"
  10. "encoding/csv"
  11. "encoding/json"
  12. "errors"
  13. "fmt"
  14. "io"
  15. "net/http"
  16. "net/url"
  17. "path"
  18. "reflect"
  19. "strconv"
  20. "strings"
  21. "sync"
  22. "time"
  23. http2 "github.com/influxdata/influxdb-client-go/v2/api/http"
  24. "github.com/influxdata/influxdb-client-go/v2/api/query"
  25. "github.com/influxdata/influxdb-client-go/v2/domain"
  26. "github.com/influxdata/influxdb-client-go/v2/internal/log"
  27. ilog "github.com/influxdata/influxdb-client-go/v2/log"
  28. )
  29. const (
  30. stringDatatype = "string"
  31. doubleDatatype = "double"
  32. boolDatatype = "boolean"
  33. longDatatype = "long"
  34. uLongDatatype = "unsignedLong"
  35. durationDatatype = "duration"
  36. base64BinaryDataType = "base64Binary"
  37. timeDatatypeRFC = "dateTime:RFC3339"
  38. timeDatatypeRFCNano = "dateTime:RFC3339Nano"
  39. )
  40. // QueryAPI provides methods for performing synchronously flux query against InfluxDB server.
  41. //
  42. // Flux query can contain reference to parameters, which must be passed via queryParams.
  43. // it can be a struct or map. Param values can be only simple types or time.Time.
  44. // The name of a struct field or a map key (must be a string) will be a param name.
  45. // The name of the parameter represented by a struct field can be specified by JSON annotation:
  46. //
  47. // type Condition struct {
  48. // Start time.Time `json:"start"`
  49. // Field string `json:"field"`
  50. // Value float64 `json:"value"`
  51. // }
  52. //
  53. // Parameters are then accessed via the Flux params object:
  54. //
  55. // query:= `from(bucket: "environment")
  56. // |> range(start: time(v: params.start))
  57. // |> filter(fn: (r) => r._measurement == "air")
  58. // |> filter(fn: (r) => r._field == params.field)
  59. // |> filter(fn: (r) => r._value > params.value)`
  60. type QueryAPI interface {
  61. // QueryRaw executes flux query on the InfluxDB server and returns complete query result as a string with table annotations according to dialect
  62. QueryRaw(ctx context.Context, query string, dialect *domain.Dialect) (string, error)
  63. // QueryRawWithParams executes flux parametrized query on the InfluxDB server and returns complete query result as a string with table annotations according to dialect
  64. QueryRawWithParams(ctx context.Context, query string, dialect *domain.Dialect, params interface{}) (string, error)
  65. // Query executes flux query on the InfluxDB server and returns QueryTableResult which parses streamed response into structures representing flux table parts
  66. Query(ctx context.Context, query string) (*QueryTableResult, error)
  67. // QueryWithParams executes flux parametrized query on the InfluxDB server and returns QueryTableResult which parses streamed response into structures representing flux table parts
  68. QueryWithParams(ctx context.Context, query string, params interface{}) (*QueryTableResult, error)
  69. }
  70. // NewQueryAPI returns new query client for querying buckets belonging to org
  71. func NewQueryAPI(org string, service http2.Service) QueryAPI {
  72. return &queryAPI{
  73. org: org,
  74. httpService: service,
  75. }
  76. }
  77. // QueryTableResult parses streamed flux query response into structures representing flux table parts
  78. // Walking though the result is done by repeatedly calling Next() until returns false.
  79. // Actual flux table info (columns with names, data types, etc) is returned by TableMetadata() method.
  80. // Data are acquired by Record() method.
  81. // Preliminary end can be caused by an error, so when Next() return false, check Err() for an error
  82. type QueryTableResult struct {
  83. io.Closer
  84. csvReader *csv.Reader
  85. tablePosition int
  86. tableChanged bool
  87. table *query.FluxTableMetadata
  88. record *query.FluxRecord
  89. err error
  90. }
  91. // NewQueryTableResult returns new QueryTableResult
  92. func NewQueryTableResult(rawResponse io.ReadCloser) *QueryTableResult {
  93. csvReader := csv.NewReader(rawResponse)
  94. csvReader.FieldsPerRecord = -1
  95. return &QueryTableResult{Closer: rawResponse, csvReader: csvReader}
  96. }
  97. // queryAPI implements QueryAPI interface
  98. type queryAPI struct {
  99. org string
  100. httpService http2.Service
  101. url string
  102. lock sync.Mutex
  103. }
  104. // queryBody holds the body for an HTTP query request.
  105. type queryBody struct {
  106. Dialect *domain.Dialect `json:"dialect,omitempty"`
  107. Query string `json:"query"`
  108. Type domain.QueryType `json:"type"`
  109. Params interface{} `json:"params,omitempty"`
  110. }
  111. func (q *queryAPI) QueryRaw(ctx context.Context, query string, dialect *domain.Dialect) (string, error) {
  112. return q.QueryRawWithParams(ctx, query, dialect, nil)
  113. }
  114. func (q *queryAPI) QueryRawWithParams(ctx context.Context, query string, dialect *domain.Dialect, params interface{}) (string, error) {
  115. if err := checkParamsType(params); err != nil {
  116. return "", err
  117. }
  118. queryURL, err := q.queryURL()
  119. if err != nil {
  120. return "", err
  121. }
  122. qr := queryBody{
  123. Query: query,
  124. Type: domain.QueryTypeFlux,
  125. Dialect: dialect,
  126. Params: params,
  127. }
  128. qrJSON, err := json.Marshal(qr)
  129. if err != nil {
  130. return "", err
  131. }
  132. if log.Level() >= ilog.DebugLevel {
  133. log.Debugf("Query: %s", qrJSON)
  134. }
  135. var body string
  136. perror := q.httpService.DoPostRequest(ctx, queryURL, bytes.NewReader(qrJSON), func(req *http.Request) {
  137. req.Header.Set("Content-Type", "application/json")
  138. req.Header.Set("Accept-Encoding", "gzip")
  139. },
  140. func(resp *http.Response) error {
  141. if resp.Header.Get("Content-Encoding") == "gzip" {
  142. resp.Body, err = gzip.NewReader(resp.Body)
  143. if err != nil {
  144. return err
  145. }
  146. }
  147. respBody, err := io.ReadAll(resp.Body)
  148. if err != nil {
  149. return err
  150. }
  151. body = string(respBody)
  152. return nil
  153. })
  154. if perror != nil {
  155. return "", perror
  156. }
  157. return body, nil
  158. }
  159. // DefaultDialect return flux query Dialect with full annotations (datatype, group, default), header and comma char as a delimiter
  160. func DefaultDialect() *domain.Dialect {
  161. annotations := []domain.DialectAnnotations{domain.DialectAnnotationsDatatype, domain.DialectAnnotationsGroup, domain.DialectAnnotationsDefault}
  162. delimiter := ","
  163. header := true
  164. return &domain.Dialect{
  165. Annotations: &annotations,
  166. Delimiter: &delimiter,
  167. Header: &header,
  168. }
  169. }
  170. func (q *queryAPI) Query(ctx context.Context, query string) (*QueryTableResult, error) {
  171. return q.QueryWithParams(ctx, query, nil)
  172. }
  173. func (q *queryAPI) QueryWithParams(ctx context.Context, query string, params interface{}) (*QueryTableResult, error) {
  174. var queryResult *QueryTableResult
  175. if err := checkParamsType(params); err != nil {
  176. return nil, err
  177. }
  178. queryURL, err := q.queryURL()
  179. if err != nil {
  180. return nil, err
  181. }
  182. qr := queryBody{
  183. Query: query,
  184. Type: domain.QueryTypeFlux,
  185. Dialect: DefaultDialect(),
  186. Params: params,
  187. }
  188. qrJSON, err := json.Marshal(qr)
  189. if err != nil {
  190. return nil, err
  191. }
  192. if log.Level() >= ilog.DebugLevel {
  193. log.Debugf("Query: %s", qrJSON)
  194. }
  195. perror := q.httpService.DoPostRequest(ctx, queryURL, bytes.NewReader(qrJSON), func(req *http.Request) {
  196. req.Header.Set("Content-Type", "application/json")
  197. req.Header.Set("Accept-Encoding", "gzip")
  198. },
  199. func(resp *http.Response) error {
  200. if resp.Header.Get("Content-Encoding") == "gzip" {
  201. resp.Body, err = gzip.NewReader(resp.Body)
  202. if err != nil {
  203. return err
  204. }
  205. }
  206. csvReader := csv.NewReader(resp.Body)
  207. csvReader.FieldsPerRecord = -1
  208. queryResult = &QueryTableResult{Closer: resp.Body, csvReader: csvReader}
  209. return nil
  210. })
  211. if perror != nil {
  212. return queryResult, perror
  213. }
  214. return queryResult, nil
  215. }
  216. func (q *queryAPI) queryURL() (string, error) {
  217. if q.url == "" {
  218. u, err := url.Parse(q.httpService.ServerAPIURL())
  219. if err != nil {
  220. return "", err
  221. }
  222. u.Path = path.Join(u.Path, "query")
  223. params := u.Query()
  224. params.Set("org", q.org)
  225. u.RawQuery = params.Encode()
  226. q.lock.Lock()
  227. q.url = u.String()
  228. q.lock.Unlock()
  229. }
  230. return q.url, nil
  231. }
  232. // checkParamsType validates the value is struct with simple type fields
  233. // or a map with key as string and value as a simple type
  234. func checkParamsType(p interface{}) error {
  235. if p == nil {
  236. return nil
  237. }
  238. t := reflect.TypeOf(p)
  239. v := reflect.ValueOf(p)
  240. if t.Kind() == reflect.Ptr {
  241. t = t.Elem()
  242. v = v.Elem()
  243. }
  244. if t.Kind() != reflect.Struct && t.Kind() != reflect.Map {
  245. return fmt.Errorf("cannot use %v as query params", t)
  246. }
  247. switch t.Kind() {
  248. case reflect.Struct:
  249. fields := reflect.VisibleFields(t)
  250. for _, f := range fields {
  251. fv := v.FieldByIndex(f.Index)
  252. t := getFieldType(fv)
  253. if !validParamType(t) {
  254. return fmt.Errorf("cannot use field '%s' of type '%v' as a query param", f.Name, t)
  255. }
  256. }
  257. case reflect.Map:
  258. key := t.Key()
  259. if key.Kind() != reflect.String {
  260. return fmt.Errorf("cannot use map key of type '%v' for query param name", key)
  261. }
  262. for _, k := range v.MapKeys() {
  263. f := v.MapIndex(k)
  264. t := getFieldType(f)
  265. if !validParamType(t) {
  266. return fmt.Errorf("cannot use map value type '%v' as a query param", t)
  267. }
  268. }
  269. }
  270. return nil
  271. }
  272. // validParamType validates that t is primitive type or string or interface
  273. func validParamType(t reflect.Type) bool {
  274. return (t.Kind() > reflect.Invalid && t.Kind() < reflect.Complex64) ||
  275. t.Kind() == reflect.String ||
  276. t == timeType
  277. }
  278. // TablePosition returns actual flux table position in the result, or -1 if no table was found yet
  279. // Each new table is introduced by an annotation in csv
  280. func (q *QueryTableResult) TablePosition() int {
  281. if q.table != nil {
  282. return q.table.Position()
  283. }
  284. return -1
  285. }
  286. // TableMetadata returns actual flux table metadata
  287. func (q *QueryTableResult) TableMetadata() *query.FluxTableMetadata {
  288. return q.table
  289. }
  290. // TableChanged returns true if last call of Next() found also new result table
  291. // Table information is available via TableMetadata method
  292. func (q *QueryTableResult) TableChanged() bool {
  293. return q.tableChanged
  294. }
  295. // Record returns last parsed flux table data row
  296. // Use Record methods to access value and row properties
  297. func (q *QueryTableResult) Record() *query.FluxRecord {
  298. return q.record
  299. }
  300. type parsingState int
  301. const (
  302. parsingStateNormal parsingState = iota
  303. parsingStateAnnotation
  304. parsingStateNameRow
  305. parsingStateError
  306. )
  307. // Next advances to next row in query result.
  308. // During the first time it is called, Next creates also table metadata
  309. // Actual parsed row is available through Record() function
  310. // Returns false in case of end or an error, otherwise true
  311. func (q *QueryTableResult) Next() bool {
  312. var row []string
  313. // set closing query in case of preliminary return
  314. closer := func() {
  315. if err := q.Close(); err != nil {
  316. message := err.Error()
  317. if q.err != nil {
  318. message = fmt.Sprintf("%s,%s", message, q.err.Error())
  319. }
  320. q.err = errors.New(message)
  321. }
  322. }
  323. defer func() {
  324. closer()
  325. }()
  326. parsingState := parsingStateNormal
  327. q.tableChanged = false
  328. dataTypeAnnotationFound := false
  329. readRow:
  330. row, q.err = q.csvReader.Read()
  331. if q.err == io.EOF {
  332. q.err = nil
  333. return false
  334. }
  335. if q.err != nil {
  336. return false
  337. }
  338. if len(row) <= 1 {
  339. goto readRow
  340. }
  341. if len(row[0]) > 0 && row[0][0] == '#' {
  342. if parsingState == parsingStateNormal {
  343. q.table = query.NewFluxTableMetadata(q.tablePosition)
  344. q.tablePosition++
  345. q.tableChanged = true
  346. for i := range row[1:] {
  347. q.table.AddColumn(query.NewFluxColumn(i))
  348. }
  349. parsingState = parsingStateAnnotation
  350. }
  351. }
  352. if q.table == nil {
  353. q.err = errors.New("parsing error, annotations not found")
  354. return false
  355. }
  356. if len(row)-1 != len(q.table.Columns()) {
  357. q.err = fmt.Errorf("parsing error, row has different number of columns than the table: %d vs %d", len(row)-1, len(q.table.Columns()))
  358. return false
  359. }
  360. switch row[0] {
  361. case "":
  362. switch parsingState {
  363. case parsingStateAnnotation:
  364. if !dataTypeAnnotationFound {
  365. q.err = errors.New("parsing error, datatype annotation not found")
  366. return false
  367. }
  368. parsingState = parsingStateNameRow
  369. fallthrough
  370. case parsingStateNameRow:
  371. if row[1] == "error" {
  372. parsingState = parsingStateError
  373. } else {
  374. for i, n := range row[1:] {
  375. if q.table.Column(i) != nil {
  376. q.table.Column(i).SetName(n)
  377. }
  378. }
  379. parsingState = parsingStateNormal
  380. }
  381. goto readRow
  382. case parsingStateError:
  383. var message string
  384. if len(row) > 1 && len(row[1]) > 0 {
  385. message = row[1]
  386. } else {
  387. message = "unknown query error"
  388. }
  389. reference := ""
  390. if len(row) > 2 && len(row[2]) > 0 {
  391. reference = fmt.Sprintf(",%s", row[2])
  392. }
  393. q.err = fmt.Errorf("%s%s", message, reference)
  394. return false
  395. }
  396. values := make(map[string]interface{})
  397. for i, v := range row[1:] {
  398. if q.table.Column(i) != nil {
  399. values[q.table.Column(i).Name()], q.err = toValue(stringTernary(v, q.table.Column(i).DefaultValue()), q.table.Column(i).DataType(), q.table.Column(i).Name())
  400. if q.err != nil {
  401. return false
  402. }
  403. }
  404. }
  405. q.record = query.NewFluxRecord(q.table.Position(), values)
  406. case "#datatype":
  407. dataTypeAnnotationFound = true
  408. for i, d := range row[1:] {
  409. if q.table.Column(i) != nil {
  410. q.table.Column(i).SetDataType(d)
  411. }
  412. }
  413. goto readRow
  414. case "#group":
  415. for i, g := range row[1:] {
  416. if q.table.Column(i) != nil {
  417. q.table.Column(i).SetGroup(g == "true")
  418. }
  419. }
  420. goto readRow
  421. case "#default":
  422. for i, c := range row[1:] {
  423. if q.table.Column(i) != nil {
  424. q.table.Column(i).SetDefaultValue(c)
  425. }
  426. }
  427. goto readRow
  428. }
  429. // don't close query
  430. closer = func() {}
  431. return true
  432. }
  433. // Err returns an error raised during flux query response parsing
  434. func (q *QueryTableResult) Err() error {
  435. return q.err
  436. }
  437. // Close reads remaining data and closes underlying Closer
  438. func (q *QueryTableResult) Close() error {
  439. var err error
  440. for err == nil {
  441. _, err = q.csvReader.Read()
  442. }
  443. return q.Closer.Close()
  444. }
  445. // stringTernary returns a if not empty, otherwise b
  446. func stringTernary(a, b string) string {
  447. if a == "" {
  448. return b
  449. }
  450. return a
  451. }
  452. // toValues converts s into type by t
  453. func toValue(s, t, name string) (interface{}, error) {
  454. if s == "" {
  455. return nil, nil
  456. }
  457. switch t {
  458. case stringDatatype:
  459. return s, nil
  460. case timeDatatypeRFC:
  461. return time.Parse(time.RFC3339, s)
  462. case timeDatatypeRFCNano:
  463. return time.Parse(time.RFC3339Nano, s)
  464. case durationDatatype:
  465. return time.ParseDuration(s)
  466. case doubleDatatype:
  467. return strconv.ParseFloat(s, 64)
  468. case boolDatatype:
  469. if strings.ToLower(s) == "false" {
  470. return false, nil
  471. }
  472. return true, nil
  473. case longDatatype:
  474. return strconv.ParseInt(s, 10, 64)
  475. case uLongDatatype:
  476. return strconv.ParseUint(s, 10, 64)
  477. case base64BinaryDataType:
  478. return base64.StdEncoding.DecodeString(s)
  479. default:
  480. return nil, fmt.Errorf("%s has unknown data type %s", name, t)
  481. }
  482. }