query_test.go 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067
  1. // Copyright 2020-2021 InfluxData, Inc. All rights reserved.
  2. // Use of this source code is governed by MIT
  3. // license that can be found in the LICENSE file.
  4. package api
  5. import (
  6. "context"
  7. "fmt"
  8. "io/ioutil"
  9. "net/http"
  10. "net/http/httptest"
  11. "strings"
  12. "testing"
  13. "time"
  14. http2 "github.com/influxdata/influxdb-client-go/v2/api/http"
  15. "github.com/influxdata/influxdb-client-go/v2/api/query"
  16. "github.com/influxdata/influxdb-client-go/v2/internal/gzip"
  17. "github.com/stretchr/testify/assert"
  18. "github.com/stretchr/testify/require"
  19. )
  // mustParseTime converts an RFC 3339 timestamp string into a time.Time.
// It panics when s cannot be parsed, which keeps the test fixtures below
// terse: a malformed literal fails loudly instead of yielding a zero time.
func mustParseTime(s string) time.Time {
	parsed, parseErr := time.Parse(time.RFC3339, s)
	if parseErr == nil {
		return parsed
	}
	panic(parseErr)
}
  27. func TestQueryCVSResultSingleTable(t *testing.T) {
  28. csvTable := `#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string
  29. #group,false,false,true,true,false,false,true,true,true,true
  30. #default,_result,,,,,,,,,
  31. ,result,table,_start,_stop,_time,_value,_field,_measurement,a,b
  32. ,,1,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T10:34:08.135814545Z,1.4,f,test,1,adsfasdf
  33. ,,1,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.850214724Z,6.6,f,test,1,adsfasdf
  34. `
  35. expectedTable := query.NewFluxTableMetadataFull(0,
  36. []*query.FluxColumn{
  37. query.NewFluxColumnFull("string", "_result", "result", false, 0),
  38. query.NewFluxColumnFull("long", "", "table", false, 1),
  39. query.NewFluxColumnFull("dateTime:RFC3339", "", "_start", true, 2),
  40. query.NewFluxColumnFull("dateTime:RFC3339", "", "_stop", true, 3),
  41. query.NewFluxColumnFull("dateTime:RFC3339", "", "_time", false, 4),
  42. query.NewFluxColumnFull("double", "", "_value", false, 5),
  43. query.NewFluxColumnFull("string", "", "_field", true, 6),
  44. query.NewFluxColumnFull("string", "", "_measurement", true, 7),
  45. query.NewFluxColumnFull("string", "", "a", true, 8),
  46. query.NewFluxColumnFull("string", "", "b", true, 9),
  47. },
  48. )
  49. expectedRecord1 := query.NewFluxRecord(0,
  50. map[string]interface{}{
  51. "result": "_result",
  52. "table": int64(1),
  53. "_start": mustParseTime("2020-02-17T22:19:49.747562847Z"),
  54. "_stop": mustParseTime("2020-02-18T22:19:49.747562847Z"),
  55. "_time": mustParseTime("2020-02-18T10:34:08.135814545Z"),
  56. "_value": 1.4,
  57. "_field": "f",
  58. "_measurement": "test",
  59. "a": "1",
  60. "b": "adsfasdf",
  61. },
  62. )
  63. expectedRecord2 := query.NewFluxRecord(0,
  64. map[string]interface{}{
  65. "result": "_result",
  66. "table": int64(1),
  67. "_start": mustParseTime("2020-02-17T22:19:49.747562847Z"),
  68. "_stop": mustParseTime("2020-02-18T22:19:49.747562847Z"),
  69. "_time": mustParseTime("2020-02-18T22:08:44.850214724Z"),
  70. "_value": 6.6,
  71. "_field": "f",
  72. "_measurement": "test",
  73. "a": "1",
  74. "b": "adsfasdf",
  75. },
  76. )
  77. reader := strings.NewReader(csvTable)
  78. queryResult := NewQueryTableResult(ioutil.NopCloser(reader))
  79. require.True(t, queryResult.Next(), queryResult.Err())
  80. require.Nil(t, queryResult.Err())
  81. require.Equal(t, queryResult.table, expectedTable)
  82. assert.True(t, queryResult.tableChanged)
  83. require.NotNil(t, queryResult.Record())
  84. require.Equal(t, queryResult.Record(), expectedRecord1)
  85. require.True(t, queryResult.Next(), queryResult.Err())
  86. require.Nil(t, queryResult.Err())
  87. assert.False(t, queryResult.tableChanged)
  88. require.NotNil(t, queryResult.Record())
  89. require.Equal(t, queryResult.Record(), expectedRecord2)
  90. assert.Equal(t, "_result", queryResult.Record().Result())
  91. assert.Equal(t, 1, queryResult.Record().Table())
  92. require.False(t, queryResult.Next())
  93. require.Nil(t, queryResult.Err())
  94. }
  95. func TestQueryCVSResultMultiTables(t *testing.T) {
  96. csvTable := `#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string
  97. #group,false,false,true,true,false,false,true,true,true,true
  98. #default,_result1,,,,,,,,,
  99. ,result,table,_start,_stop,_time,_value,_field,_measurement,a,b
  100. ,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T10:34:08.135814545Z,1.4,f,test,1,adsfasdf
  101. ,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.850214724Z,6.6,f,test,1,adsfasdf
  102. #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string,string
  103. #group,false,false,true,true,false,false,true,true,true,true
  104. #default,_result2,,,,,,,,,
  105. ,result,table,_start,_stop,_time,_value,_field,_measurement,a,b
  106. ,,1,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T10:34:08.135814545Z,4,i,test,1,adsfasdf
  107. ,,1,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.850214724Z,-1,i,test,1,adsfasdf
  108. #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,boolean,string,string,string,string
  109. #group,false,false,true,true,false,false,true,true,true,true
  110. #default,_result3,,,,,,,,,
  111. ,result,table,_start,_stop,_time,_value,_field,_measurement,a,b
  112. ,,2,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.62797864Z,false,f,test,0,adsfasdf
  113. ,,2,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.969100374Z,true,f,test,0,adsfasdf
  114. #datatype,string,long,dateTime:RFC3339Nano,dateTime:RFC3339Nano,dateTime:RFC3339Nano,unsignedLong,string,string,string,string
  115. #group,false,false,true,true,false,false,true,true,true,true
  116. #default,_result4,,,,,,,,,
  117. ,result,table,_start,_stop,_time,_value,_field,_measurement,a,b
  118. ,,3,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.62797864Z,0,i,test,0,adsfasdf
  119. ,,3,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.969100374Z,2,i,test,0,adsfasdf
  120. `
  121. expectedTable1 := query.NewFluxTableMetadataFull(0,
  122. []*query.FluxColumn{
  123. query.NewFluxColumnFull("string", "_result1", "result", false, 0),
  124. query.NewFluxColumnFull("long", "", "table", false, 1),
  125. query.NewFluxColumnFull("dateTime:RFC3339", "", "_start", true, 2),
  126. query.NewFluxColumnFull("dateTime:RFC3339", "", "_stop", true, 3),
  127. query.NewFluxColumnFull("dateTime:RFC3339", "", "_time", false, 4),
  128. query.NewFluxColumnFull("double", "", "_value", false, 5),
  129. query.NewFluxColumnFull("string", "", "_field", true, 6),
  130. query.NewFluxColumnFull("string", "", "_measurement", true, 7),
  131. query.NewFluxColumnFull("string", "", "a", true, 8),
  132. query.NewFluxColumnFull("string", "", "b", true, 9),
  133. },
  134. )
  135. expectedRecord11 := query.NewFluxRecord(0,
  136. map[string]interface{}{
  137. "result": "_result1",
  138. "table": int64(0),
  139. "_start": mustParseTime("2020-02-17T22:19:49.747562847Z"),
  140. "_stop": mustParseTime("2020-02-18T22:19:49.747562847Z"),
  141. "_time": mustParseTime("2020-02-18T10:34:08.135814545Z"),
  142. "_value": 1.4,
  143. "_field": "f",
  144. "_measurement": "test",
  145. "a": "1",
  146. "b": "adsfasdf",
  147. },
  148. )
  149. expectedRecord12 := query.NewFluxRecord(0,
  150. map[string]interface{}{
  151. "result": "_result1",
  152. "table": int64(0),
  153. "_start": mustParseTime("2020-02-17T22:19:49.747562847Z"),
  154. "_stop": mustParseTime("2020-02-18T22:19:49.747562847Z"),
  155. "_time": mustParseTime("2020-02-18T22:08:44.850214724Z"),
  156. "_value": 6.6,
  157. "_field": "f",
  158. "_measurement": "test",
  159. "a": "1",
  160. "b": "adsfasdf",
  161. },
  162. )
  163. expectedTable2 := query.NewFluxTableMetadataFull(1,
  164. []*query.FluxColumn{
  165. query.NewFluxColumnFull("string", "_result2", "result", false, 0),
  166. query.NewFluxColumnFull("long", "", "table", false, 1),
  167. query.NewFluxColumnFull("dateTime:RFC3339", "", "_start", true, 2),
  168. query.NewFluxColumnFull("dateTime:RFC3339", "", "_stop", true, 3),
  169. query.NewFluxColumnFull("dateTime:RFC3339", "", "_time", false, 4),
  170. query.NewFluxColumnFull("long", "", "_value", false, 5),
  171. query.NewFluxColumnFull("string", "", "_field", true, 6),
  172. query.NewFluxColumnFull("string", "", "_measurement", true, 7),
  173. query.NewFluxColumnFull("string", "", "a", true, 8),
  174. query.NewFluxColumnFull("string", "", "b", true, 9),
  175. },
  176. )
  177. expectedRecord21 := query.NewFluxRecord(1,
  178. map[string]interface{}{
  179. "result": "_result2",
  180. "table": int64(1),
  181. "_start": mustParseTime("2020-02-17T22:19:49.747562847Z"),
  182. "_stop": mustParseTime("2020-02-18T22:19:49.747562847Z"),
  183. "_time": mustParseTime("2020-02-18T10:34:08.135814545Z"),
  184. "_value": int64(4),
  185. "_field": "i",
  186. "_measurement": "test",
  187. "a": "1",
  188. "b": "adsfasdf",
  189. },
  190. )
  191. expectedRecord22 := query.NewFluxRecord(1,
  192. map[string]interface{}{
  193. "result": "_result2",
  194. "table": int64(1),
  195. "_start": mustParseTime("2020-02-17T22:19:49.747562847Z"),
  196. "_stop": mustParseTime("2020-02-18T22:19:49.747562847Z"),
  197. "_time": mustParseTime("2020-02-18T22:08:44.850214724Z"),
  198. "_value": int64(-1),
  199. "_field": "i",
  200. "_measurement": "test",
  201. "a": "1",
  202. "b": "adsfasdf",
  203. },
  204. )
  205. expectedTable3 := query.NewFluxTableMetadataFull(2,
  206. []*query.FluxColumn{
  207. query.NewFluxColumnFull("string", "_result3", "result", false, 0),
  208. query.NewFluxColumnFull("long", "", "table", false, 1),
  209. query.NewFluxColumnFull("dateTime:RFC3339", "", "_start", true, 2),
  210. query.NewFluxColumnFull("dateTime:RFC3339", "", "_stop", true, 3),
  211. query.NewFluxColumnFull("dateTime:RFC3339", "", "_time", false, 4),
  212. query.NewFluxColumnFull("boolean", "", "_value", false, 5),
  213. query.NewFluxColumnFull("string", "", "_field", true, 6),
  214. query.NewFluxColumnFull("string", "", "_measurement", true, 7),
  215. query.NewFluxColumnFull("string", "", "a", true, 8),
  216. query.NewFluxColumnFull("string", "", "b", true, 9),
  217. },
  218. )
  219. expectedRecord31 := query.NewFluxRecord(2,
  220. map[string]interface{}{
  221. "result": "_result3",
  222. "table": int64(2),
  223. "_start": mustParseTime("2020-02-17T22:19:49.747562847Z"),
  224. "_stop": mustParseTime("2020-02-18T22:19:49.747562847Z"),
  225. "_time": mustParseTime("2020-02-18T22:08:44.62797864Z"),
  226. "_value": false,
  227. "_field": "f",
  228. "_measurement": "test",
  229. "a": "0",
  230. "b": "adsfasdf",
  231. },
  232. )
  233. expectedRecord32 := query.NewFluxRecord(2,
  234. map[string]interface{}{
  235. "result": "_result3",
  236. "table": int64(2),
  237. "_start": mustParseTime("2020-02-17T22:19:49.747562847Z"),
  238. "_stop": mustParseTime("2020-02-18T22:19:49.747562847Z"),
  239. "_time": mustParseTime("2020-02-18T22:08:44.969100374Z"),
  240. "_value": true,
  241. "_field": "f",
  242. "_measurement": "test",
  243. "a": "0",
  244. "b": "adsfasdf",
  245. },
  246. )
  247. expectedTable4 := query.NewFluxTableMetadataFull(3,
  248. []*query.FluxColumn{
  249. query.NewFluxColumnFull("string", "_result4", "result", false, 0),
  250. query.NewFluxColumnFull("long", "", "table", false, 1),
  251. query.NewFluxColumnFull("dateTime:RFC3339Nano", "", "_start", true, 2),
  252. query.NewFluxColumnFull("dateTime:RFC3339Nano", "", "_stop", true, 3),
  253. query.NewFluxColumnFull("dateTime:RFC3339Nano", "", "_time", false, 4),
  254. query.NewFluxColumnFull("unsignedLong", "", "_value", false, 5),
  255. query.NewFluxColumnFull("string", "", "_field", true, 6),
  256. query.NewFluxColumnFull("string", "", "_measurement", true, 7),
  257. query.NewFluxColumnFull("string", "", "a", true, 8),
  258. query.NewFluxColumnFull("string", "", "b", true, 9),
  259. },
  260. )
  261. expectedRecord41 := query.NewFluxRecord(3,
  262. map[string]interface{}{
  263. "result": "_result4",
  264. "table": int64(3),
  265. "_start": mustParseTime("2020-02-17T22:19:49.747562847Z"),
  266. "_stop": mustParseTime("2020-02-18T22:19:49.747562847Z"),
  267. "_time": mustParseTime("2020-02-18T22:08:44.62797864Z"),
  268. "_value": uint64(0),
  269. "_field": "i",
  270. "_measurement": "test",
  271. "a": "0",
  272. "b": "adsfasdf",
  273. },
  274. )
  275. expectedRecord42 := query.NewFluxRecord(3,
  276. map[string]interface{}{
  277. "result": "_result4",
  278. "table": int64(3),
  279. "_start": mustParseTime("2020-02-17T22:19:49.747562847Z"),
  280. "_stop": mustParseTime("2020-02-18T22:19:49.747562847Z"),
  281. "_time": mustParseTime("2020-02-18T22:08:44.969100374Z"),
  282. "_value": uint64(2),
  283. "_field": "i",
  284. "_measurement": "test",
  285. "a": "0",
  286. "b": "adsfasdf",
  287. },
  288. )
  289. reader := strings.NewReader(csvTable)
  290. queryResult := NewQueryTableResult(ioutil.NopCloser(reader))
  291. assert.Equal(t, -1, queryResult.TablePosition())
  292. require.True(t, queryResult.Next(), queryResult.Err())
  293. require.Nil(t, queryResult.Err())
  294. require.Equal(t, queryResult.TableMetadata(), expectedTable1)
  295. require.NotNil(t, queryResult.Record())
  296. require.Equal(t, queryResult.Record(), expectedRecord11)
  297. assert.True(t, queryResult.tableChanged)
  298. assert.Equal(t, 0, queryResult.TablePosition())
  299. require.True(t, queryResult.Next(), queryResult.Err())
  300. require.Nil(t, queryResult.Err())
  301. require.Equal(t, queryResult.TableMetadata(), expectedTable1)
  302. assert.False(t, queryResult.tableChanged)
  303. require.NotNil(t, queryResult.Record())
  304. require.Equal(t, queryResult.Record(), expectedRecord12)
  305. assert.Equal(t, 0, queryResult.TablePosition())
  306. require.True(t, queryResult.Next(), queryResult.Err())
  307. require.Nil(t, queryResult.Err())
  308. assert.True(t, queryResult.tableChanged)
  309. assert.Equal(t, 1, queryResult.TablePosition())
  310. require.Equal(t, queryResult.table, expectedTable2)
  311. require.NotNil(t, queryResult.Record())
  312. require.Equal(t, queryResult.Record(), expectedRecord21)
  313. require.True(t, queryResult.Next(), queryResult.Err())
  314. require.Nil(t, queryResult.Err())
  315. assert.False(t, queryResult.tableChanged)
  316. assert.Equal(t, 1, queryResult.TablePosition())
  317. require.Equal(t, queryResult.table, expectedTable2)
  318. require.NotNil(t, queryResult.Record())
  319. require.Equal(t, queryResult.Record(), expectedRecord22)
  320. require.True(t, queryResult.Next(), queryResult.Err())
  321. require.Nil(t, queryResult.Err(), queryResult.Err())
  322. assert.True(t, queryResult.tableChanged)
  323. assert.Equal(t, 2, queryResult.TablePosition())
  324. require.Equal(t, queryResult.table, expectedTable3)
  325. require.NotNil(t, queryResult.Record())
  326. require.Equal(t, queryResult.Record(), expectedRecord31)
  327. require.True(t, queryResult.Next(), queryResult.Err())
  328. require.Nil(t, queryResult.Err())
  329. assert.False(t, queryResult.tableChanged)
  330. assert.Equal(t, 2, queryResult.TablePosition())
  331. require.Equal(t, queryResult.table, expectedTable3)
  332. require.NotNil(t, queryResult.Record())
  333. require.Equal(t, queryResult.Record(), expectedRecord32)
  334. require.True(t, queryResult.Next(), queryResult.Err())
  335. require.Nil(t, queryResult.Err())
  336. assert.True(t, queryResult.tableChanged)
  337. assert.Equal(t, 3, queryResult.TablePosition())
  338. require.Equal(t, queryResult.table, expectedTable4)
  339. require.NotNil(t, queryResult.Record())
  340. require.Equal(t, queryResult.Record(), expectedRecord41)
  341. require.True(t, queryResult.Next(), queryResult.Err())
  342. require.Nil(t, queryResult.Err())
  343. assert.False(t, queryResult.tableChanged)
  344. assert.Equal(t, 3, queryResult.TablePosition())
  345. require.Equal(t, queryResult.table, expectedTable4)
  346. require.NotNil(t, queryResult.Record())
  347. require.Equal(t, queryResult.Record(), expectedRecord42)
  348. assert.Equal(t, "_result4", queryResult.Record().Result())
  349. assert.Equal(t, 3, queryResult.Record().Table())
  350. require.False(t, queryResult.Next())
  351. require.Nil(t, queryResult.Err())
  352. }
  353. func TestQueryCVSResultSingleTableMultiColumnsNoValue(t *testing.T) {
  354. csvTable := `#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,duration,base64Binary,dateTime:RFC3339
  355. #group,false,false,true,true,false,true,true,false,false,false
  356. #default,_result,,,,,,,,,
  357. ,result,table,_start,_stop,_time,deviceId,sensor,elapsed,note,start
  358. ,,0,2020-04-28T12:36:50.990018157Z,2020-04-28T12:51:50.990018157Z,2020-04-28T12:38:11.480545389Z,1467463,BME280,1m1s,ZGF0YWluYmFzZTY0,2020-04-27T00:00:00Z
  359. ,,1,2020-04-28T12:36:50.990018157Z,2020-04-28T12:51:50.990018157Z,2020-04-28T12:39:36.330153686Z,1467463,BME280,1h20m30.13245s,eHh4eHhjY2NjY2NkZGRkZA==,2020-04-28T00:00:00Z
  360. `
  361. expectedTable := query.NewFluxTableMetadataFull(0,
  362. []*query.FluxColumn{
  363. query.NewFluxColumnFull("string", "_result", "result", false, 0),
  364. query.NewFluxColumnFull("long", "", "table", false, 1),
  365. query.NewFluxColumnFull("dateTime:RFC3339", "", "_start", true, 2),
  366. query.NewFluxColumnFull("dateTime:RFC3339", "", "_stop", true, 3),
  367. query.NewFluxColumnFull("dateTime:RFC3339", "", "_time", false, 4),
  368. query.NewFluxColumnFull("long", "", "deviceId", true, 5),
  369. query.NewFluxColumnFull("string", "", "sensor", true, 6),
  370. query.NewFluxColumnFull("duration", "", "elapsed", false, 7),
  371. query.NewFluxColumnFull("base64Binary", "", "note", false, 8),
  372. query.NewFluxColumnFull("dateTime:RFC3339", "", "start", false, 9),
  373. },
  374. )
  375. expectedRecord1 := query.NewFluxRecord(0,
  376. map[string]interface{}{
  377. "result": "_result",
  378. "table": int64(0),
  379. "_start": mustParseTime("2020-04-28T12:36:50.990018157Z"),
  380. "_stop": mustParseTime("2020-04-28T12:51:50.990018157Z"),
  381. "_time": mustParseTime("2020-04-28T12:38:11.480545389Z"),
  382. "deviceId": int64(1467463),
  383. "sensor": "BME280",
  384. "elapsed": time.Minute + time.Second,
  385. "note": []byte("datainbase64"),
  386. "start": time.Date(2020, 4, 27, 0, 0, 0, 0, time.UTC),
  387. },
  388. )
  389. expectedRecord2 := query.NewFluxRecord(0,
  390. map[string]interface{}{
  391. "result": "_result",
  392. "table": int64(1),
  393. "_start": mustParseTime("2020-04-28T12:36:50.990018157Z"),
  394. "_stop": mustParseTime("2020-04-28T12:51:50.990018157Z"),
  395. "_time": mustParseTime("2020-04-28T12:39:36.330153686Z"),
  396. "deviceId": int64(1467463),
  397. "sensor": "BME280",
  398. "elapsed": time.Hour + 20*time.Minute + 30*time.Second + 132450000*time.Nanosecond,
  399. "note": []byte("xxxxxccccccddddd"),
  400. "start": time.Date(2020, 4, 28, 0, 0, 0, 0, time.UTC),
  401. },
  402. )
  403. reader := strings.NewReader(csvTable)
  404. queryResult := NewQueryTableResult(ioutil.NopCloser(reader))
  405. require.True(t, queryResult.Next(), queryResult.Err())
  406. require.Nil(t, queryResult.Err())
  407. require.Equal(t, queryResult.table, expectedTable)
  408. assert.True(t, queryResult.tableChanged)
  409. require.NotNil(t, queryResult.Record())
  410. assert.Equal(t, queryResult.Record(), expectedRecord1)
  411. assert.Nil(t, queryResult.Record().Value())
  412. assert.Equal(t, 0, queryResult.TablePosition())
  413. require.True(t, queryResult.Next(), queryResult.Err())
  414. require.Nil(t, queryResult.Err())
  415. assert.False(t, queryResult.tableChanged)
  416. assert.Equal(t, 0, queryResult.TablePosition())
  417. require.NotNil(t, queryResult.Record())
  418. assert.Equal(t, queryResult.Record(), expectedRecord2)
  419. require.False(t, queryResult.Next())
  420. require.Nil(t, queryResult.Err())
  421. }
  422. func TestQueryRawResult(t *testing.T) {
  423. csvRows := []string{`#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string`,
  424. `#group,false,false,true,true,false,false,true,true,true,true`,
  425. `#default,_result,,,,,,,,,`,
  426. `,result,table,_start,_stop,_time,_value,_field,_measurement,a,b`,
  427. `,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T10:34:08.135814545Z,1.4,f,test,1,adsfasdf`,
  428. `,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.850214724Z,6.6,f,test,1,adsfasdf`,
  429. ``,
  430. `#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,long,string,string,string,string`,
  431. `#group,false,false,true,true,false,false,true,true,true,true`,
  432. `#default,_result2,,,,,,,,,`,
  433. `,result,table,_start,_stop,_time,_value,_field,_measurement,a,b`,
  434. `,,1,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T10:34:08.135814545Z,4,i,test,1,adsfasdf`,
  435. `,,1,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.850214724Z,1,i,test,1,adsfasdf`,
  436. ``,
  437. }
  438. csvTable := strings.Join(csvRows, "\r\n")
  439. csvTable = fmt.Sprintf("%s\r\n", csvTable)
  440. server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  441. <-time.After(100 * time.Millisecond)
  442. if r.Method == http.MethodPost {
  443. rbody, _ := ioutil.ReadAll(r.Body)
  444. fmt.Printf("Req: %s\n", string(rbody))
  445. body, err := gzip.CompressWithGzip(strings.NewReader(csvTable))
  446. if err == nil {
  447. var bytes []byte
  448. bytes, err = ioutil.ReadAll(body)
  449. if err == nil {
  450. w.Header().Set("Content-Type", "text/csv")
  451. w.Header().Set("Content-Encoding", "gzip")
  452. w.WriteHeader(http.StatusOK)
  453. _, _ = w.Write(bytes)
  454. }
  455. }
  456. if err != nil {
  457. w.WriteHeader(http.StatusInternalServerError)
  458. _, _ = w.Write([]byte(err.Error()))
  459. }
  460. } else {
  461. w.WriteHeader(http.StatusNotFound)
  462. }
  463. }))
  464. defer server.Close()
  465. queryAPI := NewQueryAPI("org", http2.NewService(server.URL, "a", http2.DefaultOptions()))
  466. result, err := queryAPI.QueryRaw(context.Background(), "flux", nil)
  467. require.Nil(t, err)
  468. require.NotNil(t, result)
  469. assert.Equal(t, csvTable, result)
  470. }
  471. func TestErrorInRow(t *testing.T) {
  472. csvRowsError := []string{
  473. `#datatype,string,string`,
  474. `#group,true,true`,
  475. `#default,,`,
  476. `,error,reference`,
  477. `,failed to create physical plan: invalid time bounds from procedure from: bounds contain zero time,897`}
  478. csvTable := makeCSVstring(csvRowsError)
  479. reader := strings.NewReader(csvTable)
  480. queryResult := NewQueryTableResult(ioutil.NopCloser(reader))
  481. require.False(t, queryResult.Next())
  482. require.NotNil(t, queryResult.Err())
  483. assert.Equal(t, "failed to create physical plan: invalid time bounds from procedure from: bounds contain zero time,897", queryResult.Err().Error())
  484. csvRowsErrorNoReference := []string{
  485. `#datatype,string,string`,
  486. `#group,true,true`,
  487. `#default,,`,
  488. `,error,reference`,
  489. `,failed to create physical plan: invalid time bounds from procedure from: bounds contain zero time,`}
  490. csvTable = makeCSVstring(csvRowsErrorNoReference)
  491. reader = strings.NewReader(csvTable)
  492. queryResult = NewQueryTableResult(ioutil.NopCloser(reader))
  493. require.False(t, queryResult.Next())
  494. require.NotNil(t, queryResult.Err())
  495. assert.Equal(t, "failed to create physical plan: invalid time bounds from procedure from: bounds contain zero time", queryResult.Err().Error())
  496. csvRowsErrorNoMessage := []string{
  497. `#datatype,string,string`,
  498. `#group,true,true`,
  499. `#default,,`,
  500. `,error,reference`,
  501. `,,`}
  502. csvTable = makeCSVstring(csvRowsErrorNoMessage)
  503. reader = strings.NewReader(csvTable)
  504. queryResult = NewQueryTableResult(ioutil.NopCloser(reader))
  505. require.False(t, queryResult.Next())
  506. require.NotNil(t, queryResult.Err())
  507. assert.Equal(t, "unknown query error", queryResult.Err().Error())
  508. }
  509. func TestInvalidDataType(t *testing.T) {
  510. csvTable := `#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,int,string,duration,base64Binary,dateTime:RFC3339
  511. #group,false,false,true,true,false,true,true,false,false,false
  512. #default,_result,,,,,,,,,
  513. ,result,table,_start,_stop,_time,deviceId,sensor,elapsed,note,start
  514. ,,0,2020-04-28T12:36:50.990018157Z,2020-04-28T12:51:50.990018157Z,2020-04-28T12:38:11.480545389Z,1467463,BME280,1m1s,ZGF0YWluYmFzZTY0,2020-04-27T00:00:00Z
  515. ,,0,2020-04-28T12:36:50.990018157Z,2020-04-28T12:51:50.990018157Z,2020-04-28T12:39:36.330153686Z,1467463,BME280,1h20m30.13245s,eHh4eHhjY2NjY2NkZGRkZA==,2020-04-28T00:00:00Z
  516. `
  517. reader := strings.NewReader(csvTable)
  518. queryResult := NewQueryTableResult(ioutil.NopCloser(reader))
  519. require.False(t, queryResult.Next())
  520. require.NotNil(t, queryResult.Err())
  521. assert.Equal(t, "deviceId has unknown data type int", queryResult.Err().Error())
  522. }
  523. func TestReorderedAnnotations(t *testing.T) {
  524. expectedTable := query.NewFluxTableMetadataFull(0,
  525. []*query.FluxColumn{
  526. query.NewFluxColumnFull("string", "_result", "result", false, 0),
  527. query.NewFluxColumnFull("long", "", "table", false, 1),
  528. query.NewFluxColumnFull("dateTime:RFC3339", "", "_start", true, 2),
  529. query.NewFluxColumnFull("dateTime:RFC3339", "", "_stop", true, 3),
  530. query.NewFluxColumnFull("dateTime:RFC3339", "", "_time", false, 4),
  531. query.NewFluxColumnFull("double", "", "_value", false, 5),
  532. query.NewFluxColumnFull("string", "", "_field", true, 6),
  533. query.NewFluxColumnFull("string", "", "_measurement", true, 7),
  534. query.NewFluxColumnFull("string", "", "a", true, 8),
  535. query.NewFluxColumnFull("string", "", "b", true, 9),
  536. },
  537. )
  538. expectedRecord1 := query.NewFluxRecord(0,
  539. map[string]interface{}{
  540. "result": "_result",
  541. "table": int64(0),
  542. "_start": mustParseTime("2020-02-17T22:19:49.747562847Z"),
  543. "_stop": mustParseTime("2020-02-18T22:19:49.747562847Z"),
  544. "_time": mustParseTime("2020-02-18T10:34:08.135814545Z"),
  545. "_value": 1.4,
  546. "_field": "f",
  547. "_measurement": "test",
  548. "a": "1",
  549. "b": "adsfasdf",
  550. },
  551. )
  552. expectedRecord2 := query.NewFluxRecord(0,
  553. map[string]interface{}{
  554. "result": "_result",
  555. "table": int64(0),
  556. "_start": mustParseTime("2020-02-17T22:19:49.747562847Z"),
  557. "_stop": mustParseTime("2020-02-18T22:19:49.747562847Z"),
  558. "_time": mustParseTime("2020-02-18T22:08:44.850214724Z"),
  559. "_value": 6.6,
  560. "_field": "f",
  561. "_measurement": "test",
  562. "a": "1",
  563. "b": "adsfasdf",
  564. },
  565. )
  566. csvTable1 := `#group,false,false,true,true,false,false,true,true,true,true
  567. #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string
  568. #default,_result,,,,,,,,,
  569. ,result,table,_start,_stop,_time,_value,_field,_measurement,a,b
  570. ,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T10:34:08.135814545Z,1.4,f,test,1,adsfasdf
  571. ,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.850214724Z,6.6,f,test,1,adsfasdf
  572. `
  573. reader := strings.NewReader(csvTable1)
  574. queryResult := NewQueryTableResult(ioutil.NopCloser(reader))
  575. require.True(t, queryResult.Next(), queryResult.Err())
  576. require.Nil(t, queryResult.Err())
  577. require.Equal(t, queryResult.table, expectedTable)
  578. assert.True(t, queryResult.tableChanged)
  579. require.NotNil(t, queryResult.Record())
  580. require.Equal(t, queryResult.Record(), expectedRecord1)
  581. require.True(t, queryResult.Next(), queryResult.Err())
  582. require.Nil(t, queryResult.Err())
  583. assert.False(t, queryResult.tableChanged)
  584. require.NotNil(t, queryResult.Record())
  585. require.Equal(t, queryResult.Record(), expectedRecord2)
  586. require.False(t, queryResult.Next())
  587. require.Nil(t, queryResult.Err())
  588. csvTable2 := `#default,_result,,,,,,,,,
  589. #group,false,false,true,true,false,false,true,true,true,true
  590. #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string
  591. ,result,table,_start,_stop,_time,_value,_field,_measurement,a,b
  592. ,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T10:34:08.135814545Z,1.4,f,test,1,adsfasdf
  593. ,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.850214724Z,6.6,f,test,1,adsfasdf
  594. `
  595. reader = strings.NewReader(csvTable2)
  596. queryResult = NewQueryTableResult(ioutil.NopCloser(reader))
  597. require.True(t, queryResult.Next(), queryResult.Err())
  598. require.Nil(t, queryResult.Err())
  599. require.Equal(t, queryResult.table, expectedTable)
  600. assert.True(t, queryResult.tableChanged)
  601. require.NotNil(t, queryResult.Record())
  602. require.Equal(t, queryResult.Record(), expectedRecord1)
  603. require.True(t, queryResult.Next(), queryResult.Err())
  604. require.Nil(t, queryResult.Err())
  605. assert.False(t, queryResult.tableChanged)
  606. require.NotNil(t, queryResult.Record())
  607. require.Equal(t, queryResult.Record(), expectedRecord2)
  608. require.False(t, queryResult.Next())
  609. require.Nil(t, queryResult.Err())
  610. }
  611. func TestDatatypeOnlyAnnotation(t *testing.T) {
  612. expectedTable := query.NewFluxTableMetadataFull(0,
  613. []*query.FluxColumn{
  614. query.NewFluxColumnFull("string", "", "result", false, 0),
  615. query.NewFluxColumnFull("long", "", "table", false, 1),
  616. query.NewFluxColumnFull("dateTime:RFC3339", "", "_start", false, 2),
  617. query.NewFluxColumnFull("dateTime:RFC3339", "", "_stop", false, 3),
  618. query.NewFluxColumnFull("dateTime:RFC3339", "", "_time", false, 4),
  619. query.NewFluxColumnFull("double", "", "_value", false, 5),
  620. query.NewFluxColumnFull("string", "", "_field", false, 6),
  621. query.NewFluxColumnFull("string", "", "_measurement", false, 7),
  622. query.NewFluxColumnFull("string", "", "a", false, 8),
  623. query.NewFluxColumnFull("string", "", "b", false, 9),
  624. },
  625. )
  626. expectedRecord1 := query.NewFluxRecord(0,
  627. map[string]interface{}{
  628. "result": nil,
  629. "table": int64(0),
  630. "_start": mustParseTime("2020-02-17T22:19:49.747562847Z"),
  631. "_stop": mustParseTime("2020-02-18T22:19:49.747562847Z"),
  632. "_time": mustParseTime("2020-02-18T10:34:08.135814545Z"),
  633. "_value": 1.4,
  634. "_field": "f",
  635. "_measurement": "test",
  636. "a": "1",
  637. "b": "adsfasdf",
  638. },
  639. )
  640. expectedRecord2 := query.NewFluxRecord(0,
  641. map[string]interface{}{
  642. "result": nil,
  643. "table": int64(0),
  644. "_start": mustParseTime("2020-02-17T22:19:49.747562847Z"),
  645. "_stop": mustParseTime("2020-02-18T22:19:49.747562847Z"),
  646. "_time": mustParseTime("2020-02-18T22:08:44.850214724Z"),
  647. "_value": 6.6,
  648. "_field": "f",
  649. "_measurement": "test",
  650. "a": "1",
  651. "b": "adsfasdf",
  652. },
  653. )
  654. csvTable1 := `#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string
  655. ,result,table,_start,_stop,_time,_value,_field,_measurement,a,b
  656. ,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T10:34:08.135814545Z,1.4,f,test,1,adsfasdf
  657. ,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.850214724Z,6.6,f,test,1,adsfasdf
  658. `
  659. reader := strings.NewReader(csvTable1)
  660. queryResult := NewQueryTableResult(ioutil.NopCloser(reader))
  661. require.True(t, queryResult.Next(), queryResult.Err())
  662. require.Nil(t, queryResult.Err())
  663. require.Equal(t, queryResult.table, expectedTable)
  664. assert.True(t, queryResult.tableChanged)
  665. require.NotNil(t, queryResult.Record())
  666. require.Equal(t, queryResult.Record(), expectedRecord1)
  667. require.True(t, queryResult.Next(), queryResult.Err())
  668. require.Nil(t, queryResult.Err())
  669. assert.False(t, queryResult.tableChanged)
  670. require.NotNil(t, queryResult.Record())
  671. require.Equal(t, queryResult.Record(), expectedRecord2)
  672. require.False(t, queryResult.Next())
  673. require.Nil(t, queryResult.Err())
  674. }
  675. func TestMissingDatatypeAnnotation(t *testing.T) {
  676. csvTable1 := `
  677. #group,false,false,true,true,false,true,true,false,false,false
  678. #default,_result,,,,,,,,,
  679. ,result,table,_start,_stop,_time,deviceId,sensor,elapsed,note,start
  680. ,,0,2020-04-28T12:36:50.990018157Z,2020-04-28T12:51:50.990018157Z,2020-04-28T12:38:11.480545389Z,1467463,BME280,1m1s,ZGF0YWluYmFzZTY0,2020-04-27T00:00:00Z
  681. ,,0,2020-04-28T12:36:50.990018157Z,2020-04-28T12:51:50.990018157Z,2020-04-28T12:39:36.330153686Z,1467463,BME280,1h20m30.13245s,eHh4eHhjY2NjY2NkZGRkZA==,2020-04-28T00:00:00Z
  682. `
  683. reader := strings.NewReader(csvTable1)
  684. queryResult := NewQueryTableResult(ioutil.NopCloser(reader))
  685. require.False(t, queryResult.Next())
  686. require.NotNil(t, queryResult.Err())
  687. assert.Equal(t, "parsing error, datatype annotation not found", queryResult.Err().Error())
  688. csvTable2 := `
  689. #default,_result,,,,,,,,,
  690. #group,false,false,true,true,false,true,true,false,false,false
  691. ,result,table,_start,_stop,_time,deviceId,sensor,elapsed,note,start
  692. ,,0,2020-04-28T12:36:50.990018157Z,2020-04-28T12:51:50.990018157Z,2020-04-28T12:38:11.480545389Z,1467463,BME280,1m1s,ZGF0YWluYmFzZTY0,2020-04-27T00:00:00Z
  693. ,,0,2020-04-28T12:36:50.990018157Z,2020-04-28T12:51:50.990018157Z,2020-04-28T12:39:36.330153686Z,1467463,BME280,1h20m30.13245s,eHh4eHhjY2NjY2NkZGRkZA==,2020-04-28T00:00:00Z
  694. `
  695. reader = strings.NewReader(csvTable2)
  696. queryResult = NewQueryTableResult(ioutil.NopCloser(reader))
  697. require.False(t, queryResult.Next())
  698. require.NotNil(t, queryResult.Err())
  699. assert.Equal(t, "parsing error, datatype annotation not found", queryResult.Err().Error())
  700. }
  701. func TestMissingAnnotations(t *testing.T) {
  702. csvTable3 := `
  703. ,result,table,_start,_stop,_time,deviceId,sensor,elapsed,note,start
  704. ,,0,2020-04-28T12:36:50.990018157Z,2020-04-28T12:51:50.990018157Z,2020-04-28T12:38:11.480545389Z,1467463,BME280,1m1s,ZGF0YWluYmFzZTY0,2020-04-27T00:00:00Z
  705. ,,0,2020-04-28T12:36:50.990018157Z,2020-04-28T12:51:50.990018157Z,2020-04-28T12:39:36.330153686Z,1467463,BME280,1h20m30.13245s,eHh4eHhjY2NjY2NkZGRkZA==,2020-04-28T00:00:00Z
  706. `
  707. reader := strings.NewReader(csvTable3)
  708. queryResult := NewQueryTableResult(ioutil.NopCloser(reader))
  709. require.False(t, queryResult.Next())
  710. require.NotNil(t, queryResult.Err())
  711. assert.Equal(t, "parsing error, annotations not found", queryResult.Err().Error())
  712. }
  713. func TestDifferentNumberOfColumns(t *testing.T) {
  714. csvTable := `#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,int,string,duration,base64Binary,dateTime:RFC3339
  715. #group,false,false,true,true,false,true,true,false,false,
  716. #default,_result,,,,,,,,,
  717. ,result,table,_start,_stop,_time,deviceId,sensor,elapsed,note,start
  718. ,,0,2020-04-28T12:36:50.990018157Z,2020-04-28T12:51:50.990018157Z,2020-04-28T12:38:11.480545389Z,1467463,BME280,1m1s,ZGF0YWluYmFzZTY0,2020-04-27T00:00:00Z,2345234
  719. `
  720. reader := strings.NewReader(csvTable)
  721. queryResult := NewQueryTableResult(ioutil.NopCloser(reader))
  722. require.False(t, queryResult.Next())
  723. require.NotNil(t, queryResult.Err())
  724. assert.Equal(t, "parsing error, row has different number of columns than the table: 11 vs 10", queryResult.Err().Error())
  725. csvTable2 := `#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,int,string,duration,base64Binary,dateTime:RFC3339
  726. #group,false,false,true,true,false,true,true,false,false,
  727. #default,_result,,,,,,,
  728. ,result,table,_start,_stop,_time,deviceId,sensor,elapsed,note,start
  729. ,,0,2020-04-28T12:36:50.990018157Z,2020-04-28T12:51:50.990018157Z,2020-04-28T12:38:11.480545389Z,1467463,BME280,1m1s,ZGF0YWluYmFzZTY0,2020-04-27T00:00:00Z,2345234
  730. `
  731. reader = strings.NewReader(csvTable2)
  732. queryResult = NewQueryTableResult(ioutil.NopCloser(reader))
  733. require.False(t, queryResult.Next())
  734. require.NotNil(t, queryResult.Err())
  735. assert.Equal(t, "parsing error, row has different number of columns than the table: 8 vs 10", queryResult.Err().Error())
  736. csvTable3 := `#default,_result,,,,,,,
  737. #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,int,string,duration,base64Binary,dateTime:RFC3339
  738. #group,false,false,true,true,false,true,true,false,false,
  739. ,result,table,_start,_stop,_time,deviceId,sensor,elapsed,note,start
  740. ,,0,2020-04-28T12:36:50.990018157Z,2020-04-28T12:51:50.990018157Z,2020-04-28T12:38:11.480545389Z,1467463,BME280,1m1s,ZGF0YWluYmFzZTY0,2020-04-27T00:00:00Z,2345234
  741. `
  742. reader = strings.NewReader(csvTable3)
  743. queryResult = NewQueryTableResult(ioutil.NopCloser(reader))
  744. require.False(t, queryResult.Next())
  745. require.NotNil(t, queryResult.Err())
  746. assert.Equal(t, "parsing error, row has different number of columns than the table: 10 vs 8", queryResult.Err().Error())
  747. }
  748. func TestEmptyValue(t *testing.T) {
  749. csvTable := `#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string,string
  750. #group,false,false,true,true,false,false,true,true,true,true
  751. #default,_result,,,,,,,,,
  752. ,result,table,_start,_stop,_time,_value,_field,_measurement,a,b
  753. ,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T10:34:08.135814545Z,,f,test,1,adsfasdf
  754. ,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:08:44.850214724Z,6.6,f,test,,adsfasdf
  755. ,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T22:11:32.225467895Z,1122.45,f,test,3,
  756. `
  757. reader := strings.NewReader(csvTable)
  758. queryResult := NewQueryTableResult(ioutil.NopCloser(reader))
  759. require.True(t, queryResult.Next(), queryResult.Err())
  760. require.Nil(t, queryResult.Err())
  761. require.NotNil(t, queryResult.Record())
  762. assert.Nil(t, queryResult.Record().Value())
  763. require.True(t, queryResult.Next(), queryResult.Err())
  764. require.NotNil(t, queryResult.Record())
  765. assert.Nil(t, queryResult.Record().ValueByKey("a"))
  766. require.True(t, queryResult.Next(), queryResult.Err())
  767. require.NotNil(t, queryResult.Record())
  768. assert.Nil(t, queryResult.Record().ValueByKey("b"))
  769. require.False(t, queryResult.Next())
  770. require.Nil(t, queryResult.Err())
  771. }
  772. func TestFluxError(t *testing.T) {
  773. server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  774. <-time.After(100 * time.Millisecond)
  775. if r.Method == http.MethodPost {
  776. _, _ = ioutil.ReadAll(r.Body)
  777. w.Header().Set("Content-Type", "application/json")
  778. w.WriteHeader(http.StatusBadRequest)
  779. _, _ = w.Write([]byte(`{"code":"invalid","message":"compilation failed: loc 4:17-4:86: expected an operator between two expressions"}`))
  780. }
  781. }))
  782. defer server.Close()
  783. queryAPI := NewQueryAPI("org", http2.NewService(server.URL, "a", http2.DefaultOptions()))
  784. result, err := queryAPI.QueryRaw(context.Background(), "errored flux", nil)
  785. assert.Equal(t, "", result)
  786. require.NotNil(t, err)
  787. assert.Equal(t, "invalid: compilation failed: loc 4:17-4:86: expected an operator between two expressions", err.Error())
  788. tableRes, err := queryAPI.Query(context.Background(), "errored flux")
  789. assert.Nil(t, tableRes)
  790. require.NotNil(t, err)
  791. assert.Equal(t, "invalid: compilation failed: loc 4:17-4:86: expected an operator between two expressions", err.Error())
  792. }
  793. func TestQueryParamsTypes(t *testing.T) {
  794. var i int8 = 1
  795. var paramsTypeTests = []struct {
  796. testName string
  797. params interface{}
  798. expectError string
  799. }{
  800. {
  801. "structWillAllSupportedTypes",
  802. struct {
  803. B bool
  804. I int
  805. I8 int8
  806. I16 int16
  807. I32 int32
  808. I64 int64
  809. U uint
  810. U8 uint8
  811. U16 uint16
  812. U32 uint32
  813. U64 uint64
  814. F32 float32
  815. F64 float64
  816. D time.Duration
  817. T time.Time
  818. }{},
  819. "",
  820. },
  821. {
  822. "structWithInvalidFieldEmptyInterface",
  823. struct {
  824. F interface{}
  825. }{},
  826. "cannot use field 'F' of type 'interface {}' as a query param",
  827. },
  828. {
  829. "structWithFieldAsValidInterfaceValue",
  830. struct {
  831. F interface{}
  832. }{"string"},
  833. "",
  834. },
  835. {
  836. "structAsPointer",
  837. &struct {
  838. S string
  839. }{"a"},
  840. "",
  841. },
  842. {
  843. "structWithInvalidFieldAsMap",
  844. struct {
  845. M map[string]string
  846. }{},
  847. "cannot use field 'M' of type 'map[string]string' as a query param",
  848. },
  849. {
  850. "structWithFieldAsPointer",
  851. struct {
  852. P *int8
  853. }{&i},
  854. "",
  855. },
  856. {
  857. "mapOfBool",
  858. map[string]bool{},
  859. "",
  860. },
  861. {
  862. "mapOfFloat64",
  863. map[string]float64{},
  864. "",
  865. },
  866. {
  867. "mapOfString",
  868. map[string]string{},
  869. "",
  870. },
  871. {
  872. "mapOfTime",
  873. map[string]time.Time{},
  874. "",
  875. },
  876. {
  877. "mapOfInterfaceEmpty",
  878. map[string]interface{}{},
  879. "",
  880. },
  881. {
  882. "mapOfInterfaceWithValidValues",
  883. map[string]interface{}{"s": "s", "t": time.Now()},
  884. "",
  885. },
  886. {
  887. "mapOfInterfaceWithStructInvalid",
  888. map[string]interface{}{"s": struct {
  889. a int
  890. }{1}},
  891. "cannot use map value type 'struct { a int }' as a query param",
  892. },
  893. {
  894. "mapOfStructInvalid",
  895. map[string]struct {
  896. a int
  897. }{"a": {1}},
  898. "cannot use map value type 'struct { a int }' as a query param",
  899. },
  900. {
  901. "mapWithInvalidKey",
  902. map[int]string{},
  903. "cannot use map key of type 'int' for query param name",
  904. },
  905. {
  906. "invalidParamsType",
  907. 0,
  908. "cannot use int as query params",
  909. },
  910. }
  911. for _, test := range paramsTypeTests {
  912. t.Run(test.testName, func(t *testing.T) {
  913. err := checkParamsType(test.params)
  914. if test.expectError != "" {
  915. require.Error(t, err)
  916. require.Equal(t, test.expectError, err.Error())
  917. return
  918. }
  919. require.NoError(t, err)
  920. })
  921. }
  922. }
  923. func TestQueryParamsSerialized(t *testing.T) {
  924. expectedBody := `{"dialect":{"annotations":["datatype","group","default"],"delimiter":",","header":true},"query":"from(bucket: \"environment\") |\u003e range(start: time(v: params.start)) |\u003e filter(fn: (r) =\u003e r._measurement == \"air\") |\u003e filter(fn: (r) =\u003e r._field == params.field) |\u003e filter(fn: (r) =\u003e r._value \u003e params.value)","type":"flux","params":{"start":"2022-02-17T11:27:23+01:00","field":"field","value":24.4}}`
  925. server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
  926. if r.Method == http.MethodPost {
  927. body, err := ioutil.ReadAll(r.Body)
  928. if err != nil {
  929. w.Header().Set("Content-Type", "application/json")
  930. w.WriteHeader(http.StatusBadRequest)
  931. _, _ = w.Write([]byte(`{"code":"invalid","message":"` + err.Error() + `"}`))
  932. }
  933. if string(body) != expectedBody {
  934. fmt.Println("Error: Different bodies. Recv vs exp")
  935. fmt.Println(string(body))
  936. fmt.Println(expectedBody)
  937. w.Header().Set("Content-Type", "application/json")
  938. w.WriteHeader(http.StatusInternalServerError)
  939. _, _ = w.Write(body)
  940. } else {
  941. w.Header().Set("Content-Type", "text/csv")
  942. w.WriteHeader(http.StatusOK)
  943. }
  944. }
  945. }))
  946. defer server.Close()
  947. condition := &struct {
  948. Start time.Time `json:"start"`
  949. Field string `json:"field"`
  950. Value float64 `json:"value"`
  951. }{
  952. mustParseTime("2022-02-17T11:27:23+01:00"),
  953. "field",
  954. 24.4,
  955. }
  956. query := `from(bucket: "environment") |> range(start: time(v: params.start)) |> filter(fn: (r) => r._measurement == "air") |> filter(fn: (r) => r._field == params.field) |> filter(fn: (r) => r._value > params.value)`
  957. queryAPI := NewQueryAPI("org", http2.NewService(server.URL, "a", http2.DefaultOptions()))
  958. _, err := queryAPI.QueryRawWithParams(context.Background(), query, DefaultDialect(), condition)
  959. require.NoError(t, err, err)
  960. _, err = queryAPI.QueryWithParams(context.Background(), query, condition)
  961. require.NoError(t, err, err)
  962. }
  963. func makeCSVstring(rows []string) string {
  964. csvTable := strings.Join(rows, "\r\n")
  965. return fmt.Sprintf("%s\r\n", csvTable)
  966. }