gtimer_entry.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. // Copyright GoFrame Author(https://github.com/gogf/gf). All Rights Reserved.
  2. //
  3. // This Source Code Form is subject to the terms of the MIT License.
  4. // If a copy of the MIT was not distributed with this file,
  5. // You can obtain one at https://github.com/gogf/gf.
  6. package gtimer
  7. import (
  8. "time"
  9. "github.com/gogf/gf/container/gtype"
  10. )
  11. // Entry is the timing job entry to wheel.
  12. type Entry struct {
  13. wheel *wheel // Belonged wheel.
  14. job JobFunc // The job function.
  15. singleton *gtype.Bool // Singleton mode.
  16. status *gtype.Int // Job status.
  17. times *gtype.Int // Limit running times.
  18. create int64 // Timer ticks when the job installed.
  19. interval int64 // The interval ticks of the job.
  20. createMs int64 // The timestamp in milliseconds when job installed.
  21. intervalMs int64 // The interval milliseconds of the job.
  22. rawIntervalMs int64 // Raw input interval in milliseconds.
  23. }
  24. // JobFunc is the job function.
  25. type JobFunc = func()
  26. // addEntry adds a timing job to the wheel.
  27. func (w *wheel) addEntry(interval time.Duration, job JobFunc, singleton bool, times int, status int) *Entry {
  28. if times <= 0 {
  29. times = defaultTimes
  30. }
  31. var (
  32. ms = interval.Nanoseconds() / 1e6
  33. num = ms / w.intervalMs
  34. )
  35. if num == 0 {
  36. // If the given interval is lesser than the one of the wheel,
  37. // then sets it to one tick, which means it will be run in one interval.
  38. num = 1
  39. }
  40. nowMs := time.Now().UnixNano() / 1e6
  41. ticks := w.ticks.Val()
  42. entry := &Entry{
  43. wheel: w,
  44. job: job,
  45. times: gtype.NewInt(times),
  46. status: gtype.NewInt(status),
  47. create: ticks,
  48. interval: num,
  49. singleton: gtype.NewBool(singleton),
  50. createMs: nowMs,
  51. intervalMs: ms,
  52. rawIntervalMs: ms,
  53. }
  54. // Install the job to the list of the slot.
  55. w.slots[(ticks+num)%w.number].PushBack(entry)
  56. return entry
  57. }
  58. // addEntryByParent adds a timing job with parent entry.
  59. func (w *wheel) addEntryByParent(interval int64, parent *Entry) *Entry {
  60. num := interval / w.intervalMs
  61. if num == 0 {
  62. num = 1
  63. }
  64. nowMs := time.Now().UnixNano() / 1e6
  65. ticks := w.ticks.Val()
  66. entry := &Entry{
  67. wheel: w,
  68. job: parent.job,
  69. times: parent.times,
  70. status: parent.status,
  71. create: ticks,
  72. interval: num,
  73. singleton: parent.singleton,
  74. createMs: nowMs,
  75. intervalMs: interval,
  76. rawIntervalMs: parent.rawIntervalMs,
  77. }
  78. w.slots[(ticks+num)%w.number].PushBack(entry)
  79. return entry
  80. }
  81. // Status returns the status of the job.
  82. func (entry *Entry) Status() int {
  83. return entry.status.Val()
  84. }
  85. // SetStatus custom sets the status for the job.
  86. func (entry *Entry) SetStatus(status int) int {
  87. return entry.status.Set(status)
  88. }
  89. // Start starts the job.
  90. func (entry *Entry) Start() {
  91. entry.status.Set(StatusReady)
  92. }
  93. // Stop stops the job.
  94. func (entry *Entry) Stop() {
  95. entry.status.Set(StatusStopped)
  96. }
  97. //Reset reset the job.
  98. func (entry *Entry) Reset() {
  99. entry.status.Set(StatusReset)
  100. }
  101. // Close closes the job, and then it will be removed from the timer.
  102. func (entry *Entry) Close() {
  103. entry.status.Set(StatusClosed)
  104. }
  105. // IsSingleton checks and returns whether the job in singleton mode.
  106. func (entry *Entry) IsSingleton() bool {
  107. return entry.singleton.Val()
  108. }
  109. // SetSingleton sets the job singleton mode.
  110. func (entry *Entry) SetSingleton(enabled bool) {
  111. entry.singleton.Set(enabled)
  112. }
  113. // SetTimes sets the limit running times for the job.
  114. func (entry *Entry) SetTimes(times int) {
  115. entry.times.Set(times)
  116. }
  117. // Run runs the job.
  118. func (entry *Entry) Run() {
  119. entry.job()
  120. }
  121. // check checks if the job should be run in given ticks and timestamp milliseconds.
  122. func (entry *Entry) check(nowTicks int64, nowMs int64) (runnable, addable bool) {
  123. switch entry.status.Val() {
  124. case StatusStopped:
  125. return false, true
  126. case StatusClosed:
  127. return false, false
  128. case StatusReset:
  129. return false, true
  130. }
  131. // Firstly checks using the ticks, this may be low precision as one tick is a little bit long.
  132. if diff := nowTicks - entry.create; diff > 0 && diff%entry.interval == 0 {
  133. // If not the lowest level wheel.
  134. if entry.wheel.level > 0 {
  135. diffMs := nowMs - entry.createMs
  136. switch {
  137. // Add it to the next slot, which means it will run on next interval.
  138. case diffMs < entry.wheel.timer.intervalMs:
  139. entry.wheel.slots[(nowTicks+entry.interval)%entry.wheel.number].PushBack(entry)
  140. return false, false
  141. // Normal rolls on the job.
  142. case diffMs >= entry.wheel.timer.intervalMs:
  143. // Calculate the leftover milliseconds,
  144. // if it is greater than the minimum interval, then re-install it.
  145. if leftMs := entry.intervalMs - diffMs; leftMs > entry.wheel.timer.intervalMs {
  146. // Re-calculate and re-installs the job proper slot.
  147. entry.wheel.timer.doAddEntryByParent(leftMs, entry)
  148. return false, false
  149. }
  150. }
  151. }
  152. // Singleton mode check.
  153. if entry.IsSingleton() {
  154. // Note that it is atomic operation to ensure concurrent safety.
  155. if entry.status.Set(StatusRunning) == StatusRunning {
  156. return false, true
  157. }
  158. }
  159. // Limit running times.
  160. times := entry.times.Add(-1)
  161. if times <= 0 {
  162. // Note that it is atomic operation to ensure concurrent safety.
  163. if entry.status.Set(StatusClosed) == StatusClosed || times < 0 {
  164. return false, false
  165. }
  166. }
  167. // This means it does not limit the running times.
  168. // I know it's ugly, but it is surely high performance for running times limit.
  169. if times < 2000000000 && times > 1000000000 {
  170. entry.times.Set(defaultTimes)
  171. }
  172. return true, true
  173. }
  174. return false, true
  175. }