client.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324
  1. // Copyright 2020-2021 InfluxData, Inc. All rights reserved.
  2. // Use of this source code is governed by MIT
  3. // license that can be found in the LICENSE file.
  4. // Package influxdb2 provides API for using InfluxDB client in Go.
  5. // It's intended to use with InfluxDB 2 server. WriteAPI, QueryAPI and Health work also with InfluxDB 1.8
  6. package influxdb2
  7. import (
  8. "context"
  9. "errors"
  10. httpnet "net/http"
  11. "strings"
  12. "sync"
  13. "time"
  14. "github.com/influxdata/influxdb-client-go/v2/api"
  15. "github.com/influxdata/influxdb-client-go/v2/api/http"
  16. "github.com/influxdata/influxdb-client-go/v2/domain"
  17. ilog "github.com/influxdata/influxdb-client-go/v2/internal/log"
  18. "github.com/influxdata/influxdb-client-go/v2/log"
  19. )
  20. // Client provides API to communicate with InfluxDBServer.
  21. // There two APIs for writing, WriteAPI and WriteAPIBlocking.
  22. // WriteAPI provides asynchronous, non-blocking, methods for writing time series data.
  23. // WriteAPIBlocking provides blocking methods for writing time series data.
  24. type Client interface {
  25. // Setup sends request to initialise new InfluxDB server with user, org and bucket, and data retention period
  26. // and returns details about newly created entities along with the authorization object.
  27. // Retention period of zero will result to infinite retention.
  28. Setup(ctx context.Context, username, password, org, bucket string, retentionPeriodHours int) (*domain.OnboardingResponse, error)
  29. // SetupWithToken sends request to initialise new InfluxDB server with user, org and bucket, data retention period and token
  30. // and returns details about newly created entities along with the authorization object.
  31. // Retention period of zero will result to infinite retention.
  32. SetupWithToken(ctx context.Context, username, password, org, bucket string, retentionPeriodHours int, token string) (*domain.OnboardingResponse, error)
  33. // Ready returns InfluxDB uptime info of server. It doesn't validate authentication params.
  34. Ready(ctx context.Context) (*domain.Ready, error)
  35. // Health returns an InfluxDB server health check result. Read the HealthCheck.Status field to get server status.
  36. // Health doesn't validate authentication params.
  37. Health(ctx context.Context) (*domain.HealthCheck, error)
  38. // Ping validates whether InfluxDB server is running. It doesn't validate authentication params.
  39. Ping(ctx context.Context) (bool, error)
  40. // Close ensures all ongoing asynchronous write clients finish.
  41. // Also closes all idle connections, in case of HTTP client was created internally.
  42. Close()
  43. // Options returns the options associated with client
  44. Options() *Options
  45. // ServerURL returns the url of the server url client talks to
  46. ServerURL() string
  47. // HTTPService returns underlying HTTP service object used by client
  48. HTTPService() http.Service
  49. // WriteAPI returns the asynchronous, non-blocking, Write client.
  50. // Ensures using a single WriteAPI instance for each org/bucket pair.
  51. WriteAPI(org, bucket string) api.WriteAPI
  52. // WriteAPIBlocking returns the synchronous, blocking, Write client.
  53. // Ensures using a single WriteAPIBlocking instance for each org/bucket pair.
  54. WriteAPIBlocking(org, bucket string) api.WriteAPIBlocking
  55. // QueryAPI returns Query client.
  56. // Ensures using a single QueryAPI instance each org.
  57. QueryAPI(org string) api.QueryAPI
  58. // AuthorizationsAPI returns Authorizations API client.
  59. AuthorizationsAPI() api.AuthorizationsAPI
  60. // OrganizationsAPI returns Organizations API client
  61. OrganizationsAPI() api.OrganizationsAPI
  62. // UsersAPI returns Users API client.
  63. UsersAPI() api.UsersAPI
  64. // DeleteAPI returns Delete API client
  65. DeleteAPI() api.DeleteAPI
  66. // BucketsAPI returns Buckets API client
  67. BucketsAPI() api.BucketsAPI
  68. // LabelsAPI returns Labels API client
  69. LabelsAPI() api.LabelsAPI
  70. // TasksAPI returns Tasks API client
  71. TasksAPI() api.TasksAPI
  72. APIClient() *domain.Client
  73. }
  74. // clientImpl implements Client interface
  75. type clientImpl struct {
  76. serverURL string
  77. options *Options
  78. writeAPIs map[string]api.WriteAPI
  79. syncWriteAPIs map[string]api.WriteAPIBlocking
  80. lock sync.Mutex
  81. httpService http.Service
  82. apiClient *domain.Client
  83. authAPI api.AuthorizationsAPI
  84. orgAPI api.OrganizationsAPI
  85. usersAPI api.UsersAPI
  86. deleteAPI api.DeleteAPI
  87. bucketsAPI api.BucketsAPI
  88. labelsAPI api.LabelsAPI
  89. tasksAPI api.TasksAPI
  90. }
  91. type clientDoer struct {
  92. service http.Service
  93. }
  94. // NewClient creates Client for connecting to given serverURL with provided authentication token, with the default options.
  95. // serverURL is the InfluxDB server base URL, e.g. http://localhost:8086,
  96. // authToken is an authentication token. It can be empty in case of connecting to newly installed InfluxDB server, which has not been set up yet.
  97. // In such case, calling Setup() will set the authentication token.
  98. func NewClient(serverURL string, authToken string) Client {
  99. return NewClientWithOptions(serverURL, authToken, DefaultOptions())
  100. }
  101. // NewClientWithOptions creates Client for connecting to given serverURL with provided authentication token
  102. // and configured with custom Options.
  103. // serverURL is the InfluxDB server base URL, e.g. http://localhost:8086,
  104. // authToken is an authentication token. It can be empty in case of connecting to newly installed InfluxDB server, which has not been set up yet.
  105. // In such case, calling Setup() will set authentication token
  106. func NewClientWithOptions(serverURL string, authToken string, options *Options) Client {
  107. normServerURL := serverURL
  108. if !strings.HasSuffix(normServerURL, "/") {
  109. // For subsequent path parts concatenation, url has to end with '/'
  110. normServerURL = serverURL + "/"
  111. }
  112. authorization := ""
  113. if len(authToken) > 0 {
  114. authorization = "Token " + authToken
  115. }
  116. service := http.NewService(normServerURL, authorization, options.httpOptions)
  117. doer := &clientDoer{service}
  118. apiClient, _ := domain.NewClient(service.ServerURL(), doer)
  119. client := &clientImpl{
  120. serverURL: serverURL,
  121. options: options,
  122. writeAPIs: make(map[string]api.WriteAPI, 5),
  123. syncWriteAPIs: make(map[string]api.WriteAPIBlocking, 5),
  124. httpService: service,
  125. apiClient: apiClient,
  126. }
  127. if log.Log != nil {
  128. log.Log.SetLogLevel(options.LogLevel())
  129. }
  130. if ilog.Level() >= log.InfoLevel {
  131. tokenStr := ""
  132. if len(authToken) > 0 {
  133. tokenStr = ", token '******'"
  134. }
  135. ilog.Infof("Using URL '%s'%s", serverURL, tokenStr)
  136. }
  137. if options.ApplicationName() == "" {
  138. ilog.Warn("Application name is not set")
  139. }
  140. return client
  141. }
  142. func (c *clientImpl) APIClient() *domain.Client {
  143. return c.apiClient
  144. }
  145. func (c *clientImpl) Options() *Options {
  146. return c.options
  147. }
  148. func (c *clientImpl) ServerURL() string {
  149. return c.serverURL
  150. }
  151. func (c *clientImpl) HTTPService() http.Service {
  152. return c.httpService
  153. }
  154. func (c *clientDoer) Do(req *httpnet.Request) (*httpnet.Response, error) {
  155. return c.service.DoHTTPRequestWithResponse(req, nil)
  156. }
  157. func (c *clientImpl) Ready(ctx context.Context) (*domain.Ready, error) {
  158. params := &domain.GetReadyParams{}
  159. return c.apiClient.GetReady(ctx, params)
  160. }
  161. func (c *clientImpl) Setup(ctx context.Context, username, password, org, bucket string, retentionPeriodHours int) (*domain.OnboardingResponse, error) {
  162. return c.SetupWithToken(ctx, username, password, org, bucket, retentionPeriodHours, "")
  163. }
  164. func (c *clientImpl) SetupWithToken(ctx context.Context, username, password, org, bucket string, retentionPeriodHours int, token string) (*domain.OnboardingResponse, error) {
  165. if username == "" || password == "" {
  166. return nil, errors.New("a username and a password is required for a setup")
  167. }
  168. c.lock.Lock()
  169. defer c.lock.Unlock()
  170. params := &domain.PostSetupAllParams{}
  171. retentionPeriodSeconds := int64(retentionPeriodHours * 3600)
  172. retentionPeriodHrs := int(time.Duration(retentionPeriodSeconds) * time.Second)
  173. params.Body = domain.PostSetupJSONRequestBody{
  174. Bucket: bucket,
  175. Org: org,
  176. Password: &password,
  177. RetentionPeriodSeconds: &retentionPeriodSeconds,
  178. RetentionPeriodHrs: &retentionPeriodHrs,
  179. Username: username,
  180. }
  181. if token != "" {
  182. params.Body.Token = &token
  183. }
  184. return c.apiClient.PostSetup(ctx, params)
  185. }
  186. func (c *clientImpl) Health(ctx context.Context) (*domain.HealthCheck, error) {
  187. params := &domain.GetHealthParams{}
  188. return c.apiClient.GetHealth(ctx, params)
  189. }
  190. func (c *clientImpl) Ping(ctx context.Context) (bool, error) {
  191. err := c.apiClient.GetPing(ctx)
  192. if err != nil {
  193. return false, err
  194. }
  195. return true, nil
  196. }
  197. func createKey(org, bucket string) string {
  198. return org + "\t" + bucket
  199. }
  200. func (c *clientImpl) WriteAPI(org, bucket string) api.WriteAPI {
  201. c.lock.Lock()
  202. defer c.lock.Unlock()
  203. key := createKey(org, bucket)
  204. if _, ok := c.writeAPIs[key]; !ok {
  205. w := api.NewWriteAPI(org, bucket, c.httpService, c.options.writeOptions)
  206. c.writeAPIs[key] = w
  207. }
  208. return c.writeAPIs[key]
  209. }
  210. func (c *clientImpl) WriteAPIBlocking(org, bucket string) api.WriteAPIBlocking {
  211. c.lock.Lock()
  212. defer c.lock.Unlock()
  213. key := createKey(org, bucket)
  214. if _, ok := c.syncWriteAPIs[key]; !ok {
  215. w := api.NewWriteAPIBlocking(org, bucket, c.httpService, c.options.writeOptions)
  216. c.syncWriteAPIs[key] = w
  217. }
  218. return c.syncWriteAPIs[key]
  219. }
  220. func (c *clientImpl) Close() {
  221. for key, w := range c.writeAPIs {
  222. wa := w.(*api.WriteAPIImpl)
  223. wa.Close()
  224. delete(c.writeAPIs, key)
  225. }
  226. for key := range c.syncWriteAPIs {
  227. delete(c.syncWriteAPIs, key)
  228. }
  229. if c.options.HTTPOptions().OwnHTTPClient() {
  230. c.options.HTTPOptions().HTTPClient().CloseIdleConnections()
  231. }
  232. }
  233. func (c *clientImpl) QueryAPI(org string) api.QueryAPI {
  234. return api.NewQueryAPI(org, c.httpService)
  235. }
  236. func (c *clientImpl) AuthorizationsAPI() api.AuthorizationsAPI {
  237. c.lock.Lock()
  238. defer c.lock.Unlock()
  239. if c.authAPI == nil {
  240. c.authAPI = api.NewAuthorizationsAPI(c.apiClient)
  241. }
  242. return c.authAPI
  243. }
  244. func (c *clientImpl) OrganizationsAPI() api.OrganizationsAPI {
  245. c.lock.Lock()
  246. defer c.lock.Unlock()
  247. if c.orgAPI == nil {
  248. c.orgAPI = api.NewOrganizationsAPI(c.apiClient)
  249. }
  250. return c.orgAPI
  251. }
  252. func (c *clientImpl) UsersAPI() api.UsersAPI {
  253. c.lock.Lock()
  254. defer c.lock.Unlock()
  255. if c.usersAPI == nil {
  256. c.usersAPI = api.NewUsersAPI(c.apiClient, c.httpService, c.options.HTTPClient())
  257. }
  258. return c.usersAPI
  259. }
  260. func (c *clientImpl) DeleteAPI() api.DeleteAPI {
  261. c.lock.Lock()
  262. defer c.lock.Unlock()
  263. if c.deleteAPI == nil {
  264. c.deleteAPI = api.NewDeleteAPI(c.apiClient)
  265. }
  266. return c.deleteAPI
  267. }
  268. func (c *clientImpl) BucketsAPI() api.BucketsAPI {
  269. c.lock.Lock()
  270. defer c.lock.Unlock()
  271. if c.bucketsAPI == nil {
  272. c.bucketsAPI = api.NewBucketsAPI(c.apiClient)
  273. }
  274. return c.bucketsAPI
  275. }
  276. func (c *clientImpl) LabelsAPI() api.LabelsAPI {
  277. c.lock.Lock()
  278. defer c.lock.Unlock()
  279. if c.labelsAPI == nil {
  280. c.labelsAPI = api.NewLabelsAPI(c.apiClient)
  281. }
  282. return c.labelsAPI
  283. }
  284. func (c *clientImpl) TasksAPI() api.TasksAPI {
  285. c.lock.Lock()
  286. defer c.lock.Unlock()
  287. if c.tasksAPI == nil {
  288. c.tasksAPI = api.NewTasksAPI(c.apiClient)
  289. }
  290. return c.tasksAPI
  291. }