gcron_entry.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. // Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
  2. //
  3. // This Source Code Form is subject to the terms of the MIT License.
  4. // If a copy of the MIT was not distributed with this file,
  5. // You can obtain one at https://github.com/gogf/gf.
  6. package gcron
  7. import (
  8. "context"
  9. "fmt"
  10. "reflect"
  11. "runtime"
  12. "time"
  13. "github.com/gogf/gf/v2/container/gtype"
  14. "github.com/gogf/gf/v2/errors/gcode"
  15. "github.com/gogf/gf/v2/errors/gerror"
  16. "github.com/gogf/gf/v2/os/glog"
  17. "github.com/gogf/gf/v2/os/gtimer"
  18. "github.com/gogf/gf/v2/util/gconv"
  19. )
  20. // JobFunc is the timing called job function in cron.
  21. type JobFunc = gtimer.JobFunc
  22. // Entry is timing task entry.
  23. type Entry struct {
  24. cron *Cron // Cron object belonged to.
  25. timerEntry *gtimer.Entry // Associated timer Entry.
  26. schedule *cronSchedule // Timed schedule object.
  27. jobName string // Callback function name(address info).
  28. times *gtype.Int // Running times limit.
  29. infinite *gtype.Bool // No times limit.
  30. Name string // Entry name.
  31. Job JobFunc `json:"-"` // Callback function.
  32. Time time.Time // Registered time.
  33. }
  34. type doAddEntryInput struct {
  35. Name string // Name names this entry for manual control.
  36. Job JobFunc // Job is the callback function for timed task execution.
  37. Ctx context.Context // The context for the job.
  38. Times int // Times specifies the running limit times for the entry.
  39. Pattern string // Pattern is the crontab style string for scheduler.
  40. IsSingleton bool // Singleton specifies whether timed task executing in singleton mode.
  41. Infinite bool // Infinite specifies whether this entry is running with no times limit.
  42. }
  43. // doAddEntry creates and returns a new Entry object.
  44. func (c *Cron) doAddEntry(in doAddEntryInput) (*Entry, error) {
  45. if in.Name != "" {
  46. if c.Search(in.Name) != nil {
  47. return nil, gerror.NewCodef(gcode.CodeInvalidOperation, `cron job "%s" already exists`, in.Name)
  48. }
  49. }
  50. schedule, err := newSchedule(in.Pattern)
  51. if err != nil {
  52. return nil, err
  53. }
  54. // No limit for `times`, for timer checking scheduling every second.
  55. entry := &Entry{
  56. cron: c,
  57. schedule: schedule,
  58. jobName: runtime.FuncForPC(reflect.ValueOf(in.Job).Pointer()).Name(),
  59. times: gtype.NewInt(in.Times),
  60. infinite: gtype.NewBool(in.Infinite),
  61. Job: in.Job,
  62. Time: time.Now(),
  63. }
  64. if in.Name != "" {
  65. entry.Name = in.Name
  66. } else {
  67. entry.Name = "cron-" + gconv.String(c.idGen.Add(1))
  68. }
  69. // When you add a scheduled task, you cannot allow it to run.
  70. // It cannot start running when added to timer.
  71. // It should start running after the entry is added to the Cron entries map, to avoid the task
  72. // from running during adding where the entries do not have the entry information, which might cause panic.
  73. entry.timerEntry = gtimer.AddEntry(
  74. in.Ctx,
  75. time.Second,
  76. entry.checkAndRun,
  77. in.IsSingleton,
  78. -1,
  79. gtimer.StatusStopped,
  80. )
  81. c.entries.Set(entry.Name, entry)
  82. entry.timerEntry.Start()
  83. return entry, nil
  84. }
  85. // IsSingleton return whether this entry is a singleton timed task.
  86. func (entry *Entry) IsSingleton() bool {
  87. return entry.timerEntry.IsSingleton()
  88. }
  89. // SetSingleton sets the entry running in singleton mode.
  90. func (entry *Entry) SetSingleton(enabled bool) {
  91. entry.timerEntry.SetSingleton(enabled)
  92. }
  93. // SetTimes sets the times which the entry can run.
  94. func (entry *Entry) SetTimes(times int) {
  95. entry.times.Set(times)
  96. entry.infinite.Set(false)
  97. }
  98. // Status returns the status of entry.
  99. func (entry *Entry) Status() int {
  100. return entry.timerEntry.Status()
  101. }
  102. // SetStatus sets the status of the entry.
  103. func (entry *Entry) SetStatus(status int) int {
  104. return entry.timerEntry.SetStatus(status)
  105. }
  106. // Start starts running the entry.
  107. func (entry *Entry) Start() {
  108. entry.timerEntry.Start()
  109. }
  110. // Stop stops running the entry.
  111. func (entry *Entry) Stop() {
  112. entry.timerEntry.Stop()
  113. }
  114. // Close stops and removes the entry from cron.
  115. func (entry *Entry) Close() {
  116. entry.cron.entries.Remove(entry.Name)
  117. entry.timerEntry.Close()
  118. }
  119. // checkAndRun is the core timing task check logic.
  120. func (entry *Entry) checkAndRun(ctx context.Context) {
  121. currentTime := time.Now()
  122. if !entry.schedule.checkMeetAndUpdateLastSeconds(ctx, currentTime) {
  123. return
  124. }
  125. switch entry.cron.status.Val() {
  126. case StatusStopped:
  127. return
  128. case StatusClosed:
  129. entry.logDebugf(ctx, `cron job "%s" is removed`, entry.getJobNameWithPattern())
  130. entry.Close()
  131. case StatusReady, StatusRunning:
  132. defer func() {
  133. if exception := recover(); exception != nil {
  134. // Exception caught, it logs the error content to logger in default behavior.
  135. entry.logErrorf(ctx,
  136. `cron job "%s(%s)" end with error: %+v`,
  137. entry.jobName, entry.schedule.pattern, exception,
  138. )
  139. } else {
  140. entry.logDebugf(ctx, `cron job "%s" ends`, entry.getJobNameWithPattern())
  141. }
  142. if entry.timerEntry.Status() == StatusClosed {
  143. entry.Close()
  144. }
  145. }()
  146. // Running times check.
  147. if !entry.infinite.Val() {
  148. times := entry.times.Add(-1)
  149. if times <= 0 {
  150. if entry.timerEntry.SetStatus(StatusClosed) == StatusClosed || times < 0 {
  151. return
  152. }
  153. }
  154. }
  155. entry.logDebugf(ctx, `cron job "%s" starts`, entry.getJobNameWithPattern())
  156. entry.Job(ctx)
  157. }
  158. }
  159. func (entry *Entry) getJobNameWithPattern() string {
  160. return fmt.Sprintf(`%s(%s)`, entry.jobName, entry.schedule.pattern)
  161. }
  162. func (entry *Entry) logDebugf(ctx context.Context, format string, v ...interface{}) {
  163. if logger := entry.cron.GetLogger(); logger != nil {
  164. logger.Debugf(ctx, format, v...)
  165. }
  166. }
  167. func (entry *Entry) logErrorf(ctx context.Context, format string, v ...interface{}) {
  168. logger := entry.cron.GetLogger()
  169. if logger == nil {
  170. logger = glog.DefaultLogger()
  171. }
  172. logger.Errorf(ctx, format, v...)
  173. }