| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511 | // Copyright 2020-2021 InfluxData, Inc. All rights reserved.// Use of this source code is governed by MIT// license that can be found in the LICENSE file.package apiimport (	"context"	"fmt"	"time"	"github.com/influxdata/influxdb-client-go/v2/domain")// TaskFilter defines filtering options for FindTasks functions.type TaskFilter struct {	// Returns task with a specific name	Name string	// Filter tasks to a specific organization name.	OrgName string	// Filter tasks to a specific organization ID.	OrgID string	// Filter tasks to a specific user ID.	User string	// Filter tasks by a status--"inactive" or "active".	Status domain.TaskStatusType	// Return tasks after a specified ID.	After string	// The number of tasks to return.	// Default 100, minimum: 1, maximum 500	Limit int}// RunFilter defines filtering options for FindRun* functions.type RunFilter struct {	// Return runs after a specified ID.	After string	// The number of runs to return.	// Default 100, minimum 1, maximum 500.	Limit int	// Filter runs to those scheduled before this time.	BeforeTime time.Time	// Filter runs to those scheduled after this time.	AfterTime time.Time}// TasksAPI provides methods for managing tasks and task runs in an InfluxDB server.type TasksAPI interface {	// FindTasks retrieves tasks according to the filter. More fields can be applied. Filter can be nil.	FindTasks(ctx context.Context, filter *TaskFilter) ([]domain.Task, error)	// GetTask retrieves a refreshed instance of task.	GetTask(ctx context.Context, task *domain.Task) (*domain.Task, error)	// GetTaskByID retrieves a task found using taskID.	GetTaskByID(ctx context.Context, taskID string) (*domain.Task, error)	// CreateTask creates a new task according the task object.	// It copies OrgId, Name, Description, Flux, Status and Every or Cron properties. Every and Cron are mutually exclusive.	// Every has higher priority.	CreateTask(ctx context.Context, task *domain.Task) (*domain.Task, error)	// CreateTaskWithEvery creates a new task with the name, flux script and every repetition setting, in the org orgID.	// Every means duration values.	CreateTaskWithEvery(ctx context.Context, name, flux, every, orgID string) (*domain.Task, error)	// CreateTaskWithCron creates a new task with the name, flux script and cron repetition setting, in the org orgID	// Cron holds cron-like setting, e.g. once an hour at beginning of the hour "0 * * * *".	CreateTaskWithCron(ctx context.Context, name, flux, cron, orgID string) (*domain.Task, error)	// CreateTaskByFlux creates a new task with complete definition in flux script, in the org orgID	CreateTaskByFlux(ctx context.Context, flux, orgID string) (*domain.Task, error)	// UpdateTask updates a task.	// It copies Description, Flux, Status, Offset and Every or Cron properties. Every and Cron are mutually exclusive.	// Every has higher priority.	UpdateTask(ctx context.Context, task *domain.Task) (*domain.Task, error)	// DeleteTask deletes a task.	DeleteTask(ctx context.Context, task *domain.Task) error	// DeleteTaskWithID deletes a task with taskID.	DeleteTaskWithID(ctx context.Context, taskID string) error	// FindMembers retrieves members of a task.	FindMembers(ctx context.Context, task *domain.Task) ([]domain.ResourceMember, error)	// FindMembersWithID retrieves members of a task with taskID.	FindMembersWithID(ctx context.Context, taskID string) ([]domain.ResourceMember, error)	// AddMember adds a member to a task.	AddMember(ctx context.Context, task *domain.Task, user *domain.User) (*domain.ResourceMember, error)	// AddMemberWithID adds a member with id memberID to a task with taskID.	AddMemberWithID(ctx context.Context, taskID, memberID string) (*domain.ResourceMember, error)	// RemoveMember removes a member from a task.	RemoveMember(ctx context.Context, task *domain.Task, user *domain.User) error	// RemoveMemberWithID removes a member with id memberID from a task with taskID.	RemoveMemberWithID(ctx context.Context, taskID, memberID string) error	// FindOwners retrieves owners of a task.	FindOwners(ctx context.Context, task *domain.Task) ([]domain.ResourceOwner, error)	// FindOwnersWithID retrieves owners of a task with taskID.	FindOwnersWithID(ctx context.Context, taskID string) ([]domain.ResourceOwner, error)	// AddOwner adds an owner to a task.	AddOwner(ctx context.Context, task *domain.Task, user *domain.User) (*domain.ResourceOwner, error)	// AddOwnerWithID adds an owner with id memberID to a task with taskID.	AddOwnerWithID(ctx context.Context, taskID, memberID string) (*domain.ResourceOwner, error)	// RemoveOwner removes an owner from a task.	RemoveOwner(ctx context.Context, task *domain.Task, user *domain.User) error	// RemoveOwnerWithID removes a member with id memberID from a task with taskID.	RemoveOwnerWithID(ctx context.Context, taskID, memberID string) error	// FindRuns retrieves a task runs according the filter. More fields can be applied. Filter can be nil.	FindRuns(ctx context.Context, task *domain.Task, filter *RunFilter) ([]domain.Run, error)	// FindRunsWithID retrieves runs of a task with taskID according the filter. More fields can be applied. Filter can be nil.	FindRunsWithID(ctx context.Context, taskID string, filter *RunFilter) ([]domain.Run, error)	// GetRun retrieves a refreshed instance if a task run.	GetRun(ctx context.Context, run *domain.Run) (*domain.Run, error)	// GetRunByID retrieves a specific task run by taskID and runID	GetRunByID(ctx context.Context, taskID, runID string) (*domain.Run, error)	// FindRunLogs return all log events for a task run.	FindRunLogs(ctx context.Context, run *domain.Run) ([]domain.LogEvent, error)	// FindRunLogsWithID return all log events for a run with runID of a task with taskID.	FindRunLogsWithID(ctx context.Context, taskID, runID string) ([]domain.LogEvent, error)	// RunManually manually start a run of the task now, overriding the current schedule.	RunManually(ctx context.Context, task *domain.Task) (*domain.Run, error)	// RunManuallyWithID manually start a run of a task with taskID now, overriding the current schedule.	RunManuallyWithID(ctx context.Context, taskID string) (*domain.Run, error)	// RetryRun retry a task run.	RetryRun(ctx context.Context, run *domain.Run) (*domain.Run, error)	// RetryRunWithID retry a run with runID of a task with taskID.	RetryRunWithID(ctx context.Context, taskID, runID string) (*domain.Run, error)	// CancelRun cancels a running task.	CancelRun(ctx context.Context, run *domain.Run) error	// CancelRunWithID cancels a running task.	CancelRunWithID(ctx context.Context, taskID, runID string) error	// FindLogs retrieves all logs for a task.	FindLogs(ctx context.Context, task *domain.Task) ([]domain.LogEvent, error)	// FindLogsWithID retrieves all logs for a task with taskID.	FindLogsWithID(ctx context.Context, taskID string) ([]domain.LogEvent, error)	// FindLabels retrieves labels of a task.	FindLabels(ctx context.Context, task *domain.Task) ([]domain.Label, error)	// FindLabelsWithID retrieves labels of a task with taskID.	FindLabelsWithID(ctx context.Context, taskID string) ([]domain.Label, error)	// AddLabel adds a label to a task.	AddLabel(ctx context.Context, task *domain.Task, label *domain.Label) (*domain.Label, error)	// AddLabelWithID adds a label with id labelID to a task with taskID.	AddLabelWithID(ctx context.Context, taskID, labelID string) (*domain.Label, error)	// RemoveLabel removes a label from a task.	RemoveLabel(ctx context.Context, task *domain.Task, label *domain.Label) error	// RemoveLabelWithID removes a label with id labelID from a task with taskID.	RemoveLabelWithID(ctx context.Context, taskID, labelID string) error}// tasksAPI implements TasksAPItype tasksAPI struct {	apiClient *domain.Client}// NewTasksAPI creates new instance of TasksAPIfunc NewTasksAPI(apiClient *domain.Client) TasksAPI {	return &tasksAPI{		apiClient: apiClient,	}}func (t *tasksAPI) FindTasks(ctx context.Context, filter *TaskFilter) ([]domain.Task, error) {	params := &domain.GetTasksParams{}	if filter != nil {		if filter.Name != "" {			params.Name = &filter.Name		}		if filter.User != "" {			params.User = &filter.User		}		if filter.OrgID != "" {			params.OrgID = &filter.OrgID		}		if filter.OrgName != "" {			params.Org = &filter.OrgName		}		if filter.Status != "" {			status := domain.GetTasksParamsStatus(filter.Status)			params.Status = &status		}		if filter.Limit > 0 {			params.Limit = &filter.Limit		}		if filter.After != "" {			params.After = &filter.After		}	}	response, err := t.apiClient.GetTasks(ctx, params)	if err != nil {		return nil, err	}	return *response.Tasks, nil}func (t *tasksAPI) GetTask(ctx context.Context, task *domain.Task) (*domain.Task, error) {	return t.GetTaskByID(ctx, task.Id)}func (t *tasksAPI) GetTaskByID(ctx context.Context, taskID string) (*domain.Task, error) {	params := &domain.GetTasksIDAllParams{		TaskID: taskID,	}	return t.apiClient.GetTasksID(ctx, params)}func (t *tasksAPI) createTask(ctx context.Context, taskReq *domain.TaskCreateRequest) (*domain.Task, error) {	params := &domain.PostTasksAllParams{		Body: domain.PostTasksJSONRequestBody(*taskReq),	}	return t.apiClient.PostTasks(ctx, params)}func createTaskReqDetailed(name, flux string, every, cron *string, orgID string) *domain.TaskCreateRequest {	repetition := ""	if every != nil {		repetition = fmt.Sprintf("every: %s", *every)	} else if cron != nil {		repetition = fmt.Sprintf(`cron: "%s"`, *cron)	}	fullFlux := fmt.Sprintf(`option task = { name: "%s", %s } %s`, name, repetition, flux)	return createTaskReq(fullFlux, orgID)}func createTaskReq(flux string, orgID string) *domain.TaskCreateRequest {	status := domain.TaskStatusTypeActive	taskReq := &domain.TaskCreateRequest{		Flux:   flux,		Status: &status,		OrgID:  &orgID,	}	return taskReq}func (t *tasksAPI) CreateTask(ctx context.Context, task *domain.Task) (*domain.Task, error) {	taskReq := createTaskReqDetailed(task.Name, task.Flux, task.Every, task.Cron, task.OrgID)	taskReq.Description = task.Description	taskReq.Status = task.Status	return t.createTask(ctx, taskReq)}func (t *tasksAPI) CreateTaskWithEvery(ctx context.Context, name, flux, every, orgID string) (*domain.Task, error) {	taskReq := createTaskReqDetailed(name, flux, &every, nil, orgID)	return t.createTask(ctx, taskReq)}func (t *tasksAPI) CreateTaskWithCron(ctx context.Context, name, flux, cron, orgID string) (*domain.Task, error) {	taskReq := createTaskReqDetailed(name, flux, nil, &cron, orgID)	return t.createTask(ctx, taskReq)}func (t *tasksAPI) CreateTaskByFlux(ctx context.Context, flux, orgID string) (*domain.Task, error) {	taskReq := createTaskReq(flux, orgID)	return t.createTask(ctx, taskReq)}func (t *tasksAPI) DeleteTask(ctx context.Context, task *domain.Task) error {	return t.DeleteTaskWithID(ctx, task.Id)}func (t *tasksAPI) DeleteTaskWithID(ctx context.Context, taskID string) error {	params := &domain.DeleteTasksIDAllParams{		TaskID: taskID,	}	return t.apiClient.DeleteTasksID(ctx, params)}func (t *tasksAPI) UpdateTask(ctx context.Context, task *domain.Task) (*domain.Task, error) {	params := &domain.PatchTasksIDAllParams{		Body: domain.PatchTasksIDJSONRequestBody(domain.TaskUpdateRequest{			Description: task.Description,			Flux:        &task.Flux,			Name:        &task.Name,			Offset:      task.Offset,			Status:      task.Status,		}),		TaskID: task.Id,	}	if task.Every != nil {		params.Body.Every = task.Every	} else {		params.Body.Cron = task.Cron	}	return t.apiClient.PatchTasksID(ctx, params)}func (t *tasksAPI) FindMembers(ctx context.Context, task *domain.Task) ([]domain.ResourceMember, error) {	return t.FindMembersWithID(ctx, task.Id)}func (t *tasksAPI) FindMembersWithID(ctx context.Context, taskID string) ([]domain.ResourceMember, error) {	params := &domain.GetTasksIDMembersAllParams{		TaskID: taskID,	}	response, err := t.apiClient.GetTasksIDMembers(ctx, params)	if err != nil {		return nil, err	}	return *response.Users, nil}func (t *tasksAPI) AddMember(ctx context.Context, task *domain.Task, user *domain.User) (*domain.ResourceMember, error) {	return t.AddMemberWithID(ctx, task.Id, *user.Id)}func (t *tasksAPI) AddMemberWithID(ctx context.Context, taskID, memberID string) (*domain.ResourceMember, error) {	params := &domain.PostTasksIDMembersAllParams{		TaskID: taskID,		Body:   domain.PostTasksIDMembersJSONRequestBody{Id: memberID},	}	return t.apiClient.PostTasksIDMembers(ctx, params)}func (t *tasksAPI) RemoveMember(ctx context.Context, task *domain.Task, user *domain.User) error {	return t.RemoveMemberWithID(ctx, task.Id, *user.Id)}func (t *tasksAPI) RemoveMemberWithID(ctx context.Context, taskID, memberID string) error {	params := &domain.DeleteTasksIDMembersIDAllParams{		TaskID: taskID,		UserID: memberID,	}	return t.apiClient.DeleteTasksIDMembersID(ctx, params)}func (t *tasksAPI) FindOwners(ctx context.Context, task *domain.Task) ([]domain.ResourceOwner, error) {	return t.FindOwnersWithID(ctx, task.Id)}func (t *tasksAPI) FindOwnersWithID(ctx context.Context, taskID string) ([]domain.ResourceOwner, error) {	params := &domain.GetTasksIDOwnersAllParams{		TaskID: taskID,	}	response, err := t.apiClient.GetTasksIDOwners(ctx, params)	if err != nil {		return nil, err	}	return *response.Users, nil}func (t *tasksAPI) AddOwner(ctx context.Context, task *domain.Task, user *domain.User) (*domain.ResourceOwner, error) {	return t.AddOwnerWithID(ctx, task.Id, *user.Id)}func (t *tasksAPI) AddOwnerWithID(ctx context.Context, taskID, memberID string) (*domain.ResourceOwner, error) {	params := &domain.PostTasksIDOwnersAllParams{		Body:   domain.PostTasksIDOwnersJSONRequestBody{Id: memberID},		TaskID: taskID,	}	return t.apiClient.PostTasksIDOwners(ctx, params)}func (t *tasksAPI) RemoveOwner(ctx context.Context, task *domain.Task, user *domain.User) error {	return t.RemoveOwnerWithID(ctx, task.Id, *user.Id)}func (t *tasksAPI) RemoveOwnerWithID(ctx context.Context, taskID, memberID string) error {	params := &domain.DeleteTasksIDOwnersIDAllParams{		TaskID: taskID,		UserID: memberID,	}	return t.apiClient.DeleteTasksIDOwnersID(ctx, params)}func (t *tasksAPI) FindRuns(ctx context.Context, task *domain.Task, filter *RunFilter) ([]domain.Run, error) {	return t.FindRunsWithID(ctx, task.Id, filter)}func (t *tasksAPI) FindRunsWithID(ctx context.Context, taskID string, filter *RunFilter) ([]domain.Run, error) {	params := &domain.GetTasksIDRunsAllParams{TaskID: taskID}	if filter != nil {		if !filter.AfterTime.IsZero() {			params.AfterTime = &filter.AfterTime		}		if !filter.BeforeTime.IsZero() {			params.BeforeTime = &filter.BeforeTime		}		if filter.Limit > 0 {			params.Limit = &filter.Limit		}		if filter.After != "" {			params.After = &filter.After		}	}	response, err := t.apiClient.GetTasksIDRuns(ctx, params)	if err != nil {		return nil, err	}	return *response.Runs, nil}func (t *tasksAPI) GetRun(ctx context.Context, run *domain.Run) (*domain.Run, error) {	return t.GetRunByID(ctx, *run.TaskID, *run.Id)}func (t *tasksAPI) GetRunByID(ctx context.Context, taskID, runID string) (*domain.Run, error) {	params := &domain.GetTasksIDRunsIDAllParams{		TaskID: taskID,		RunID:  runID,	}	return t.apiClient.GetTasksIDRunsID(ctx, params)}func (t *tasksAPI) FindRunLogs(ctx context.Context, run *domain.Run) ([]domain.LogEvent, error) {	return t.FindRunLogsWithID(ctx, *run.TaskID, *run.Id)}func (t *tasksAPI) FindRunLogsWithID(ctx context.Context, taskID, runID string) ([]domain.LogEvent, error) {	params := &domain.GetTasksIDRunsIDLogsAllParams{		TaskID: taskID,		RunID:  runID,	}	response, err := t.apiClient.GetTasksIDRunsIDLogs(ctx, params)	if err != nil {		return nil, err	}	if response.Events == nil {		return nil, fmt.Errorf("logs for task '%s' run '%s 'not found", taskID, runID)	}	return *response.Events, nil}func (t *tasksAPI) RunManually(ctx context.Context, task *domain.Task) (*domain.Run, error) {	return t.RunManuallyWithID(ctx, task.Id)}func (t *tasksAPI) RunManuallyWithID(ctx context.Context, taskID string) (*domain.Run, error) {	params := &domain.PostTasksIDRunsAllParams{		TaskID: taskID,	}	return t.apiClient.PostTasksIDRuns(ctx, params)}func (t *tasksAPI) RetryRun(ctx context.Context, run *domain.Run) (*domain.Run, error) {	return t.RetryRunWithID(ctx, *run.TaskID, *run.Id)}func (t *tasksAPI) RetryRunWithID(ctx context.Context, taskID, runID string) (*domain.Run, error) {	params := &domain.PostTasksIDRunsIDRetryAllParams{		TaskID: taskID,		RunID:  runID,	}	return t.apiClient.PostTasksIDRunsIDRetry(ctx, params)}func (t *tasksAPI) CancelRun(ctx context.Context, run *domain.Run) error {	return t.CancelRunWithID(ctx, *run.TaskID, *run.Id)}func (t *tasksAPI) CancelRunWithID(ctx context.Context, taskID, runID string) error {	params := &domain.DeleteTasksIDRunsIDAllParams{		TaskID: taskID,		RunID:  runID,	}	return t.apiClient.DeleteTasksIDRunsID(ctx, params)}func (t *tasksAPI) FindLogs(ctx context.Context, task *domain.Task) ([]domain.LogEvent, error) {	return t.FindLogsWithID(ctx, task.Id)}func (t *tasksAPI) FindLogsWithID(ctx context.Context, taskID string) ([]domain.LogEvent, error) {	params := &domain.GetTasksIDLogsAllParams{		TaskID: taskID,	}	response, err := t.apiClient.GetTasksIDLogs(ctx, params)	if err != nil {		return nil, err	}	if response.Events == nil {		return nil, fmt.Errorf("logs for task '%s' not found", taskID)	}	return *response.Events, nil}func (t *tasksAPI) FindLabels(ctx context.Context, task *domain.Task) ([]domain.Label, error) {	return t.FindLabelsWithID(ctx, task.Id)}func (t *tasksAPI) FindLabelsWithID(ctx context.Context, taskID string) ([]domain.Label, error) {	params := &domain.GetTasksIDLabelsAllParams{		TaskID: taskID,	}	response, err := t.apiClient.GetTasksIDLabels(ctx, params)	if err != nil {		return nil, err	}	if response.Labels == nil {		return nil, fmt.Errorf("lables for task '%s' not found", taskID)	}	return *response.Labels, nil}func (t *tasksAPI) AddLabel(ctx context.Context, task *domain.Task, label *domain.Label) (*domain.Label, error) {	return t.AddLabelWithID(ctx, task.Id, *label.Id)}func (t *tasksAPI) AddLabelWithID(ctx context.Context, taskID, labelID string) (*domain.Label, error) {	params := &domain.PostTasksIDLabelsAllParams{		Body:   domain.PostTasksIDLabelsJSONRequestBody{LabelID: &labelID},		TaskID: taskID,	}	response, err := t.apiClient.PostTasksIDLabels(ctx, params)	if err != nil {		return nil, err	}	return response.Label, nil}func (t *tasksAPI) RemoveLabel(ctx context.Context, task *domain.Task, label *domain.Label) error {	return t.RemoveLabelWithID(ctx, task.Id, *label.Id)}func (t *tasksAPI) RemoveLabelWithID(ctx context.Context, taskID, labelID string) error {	params := &domain.DeleteTasksIDLabelsIDAllParams{		TaskID:  taskID,		LabelID: labelID,	}	return t.apiClient.DeleteTasksIDLabelsID(ctx, params)}
 |