gtimer_timer.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. // Copyright GoFrame Author(https://github.com/gogf/gf). All Rights Reserved.
  2. //
  3. // This Source Code Form is subject to the terms of the MIT License.
  4. // If a copy of the MIT was not distributed with this file,
  5. // You can obtain one at https://github.com/gogf/gf.
  6. package gtimer
  7. import (
  8. "fmt"
  9. "time"
  10. "github.com/gogf/gf/container/glist"
  11. "github.com/gogf/gf/container/gtype"
  12. )
  13. // Timer is a Hierarchical Timing Wheel manager for timing jobs.
  14. type Timer struct {
  15. status *gtype.Int // Timer status.
  16. wheels []*wheel // The underlying wheels.
  17. length int // Max level of the wheels.
  18. number int // Slot Number of each wheel.
  19. intervalMs int64 // Interval of the slot in milliseconds.
  20. }
  21. // Wheel is a slot wrapper for timing job install and uninstall.
  22. type wheel struct {
  23. timer *Timer // Belonged timer.
  24. level int // The level in the timer.
  25. slots []*glist.List // Slot array.
  26. number int64 // Slot Number=len(slots).
  27. ticks *gtype.Int64 // Ticked count of the wheel, one tick is one of its interval passed.
  28. totalMs int64 // Total duration in milliseconds=number*interval.
  29. createMs int64 // Created timestamp in milliseconds.
  30. intervalMs int64 // Interval in milliseconds, which is the duration of one slot.
  31. }
  32. // New creates and returns a Hierarchical Timing Wheel designed timer.
  33. // The parameter <interval> specifies the interval of the timer.
  34. // The optional parameter <level> specifies the wheels count of the timer,
  35. // which is defaultWheelLevel in default.
  36. func New(slot int, interval time.Duration, level ...int) *Timer {
  37. if slot <= 0 {
  38. panic(fmt.Sprintf("invalid slot number: %d", slot))
  39. }
  40. length := defaultWheelLevel
  41. if len(level) > 0 {
  42. length = level[0]
  43. }
  44. t := &Timer{
  45. status: gtype.NewInt(StatusRunning),
  46. wheels: make([]*wheel, length),
  47. length: length,
  48. number: slot,
  49. intervalMs: interval.Nanoseconds() / 1e6,
  50. }
  51. for i := 0; i < length; i++ {
  52. if i > 0 {
  53. n := time.Duration(t.wheels[i-1].totalMs) * time.Millisecond
  54. if n <= 0 {
  55. panic(fmt.Sprintf(`inteval is too large with level: %dms x %d`, interval, length))
  56. }
  57. w := t.newWheel(i, slot, n)
  58. t.wheels[i] = w
  59. t.wheels[i-1].addEntry(n, w.proceed, false, defaultTimes, StatusReady)
  60. } else {
  61. t.wheels[i] = t.newWheel(i, slot, interval)
  62. }
  63. }
  64. t.wheels[0].start()
  65. return t
  66. }
  67. // newWheel creates and returns a single wheel.
  68. func (t *Timer) newWheel(level int, slot int, interval time.Duration) *wheel {
  69. w := &wheel{
  70. timer: t,
  71. level: level,
  72. slots: make([]*glist.List, slot),
  73. number: int64(slot),
  74. ticks: gtype.NewInt64(),
  75. totalMs: int64(slot) * interval.Nanoseconds() / 1e6,
  76. createMs: time.Now().UnixNano() / 1e6,
  77. intervalMs: interval.Nanoseconds() / 1e6,
  78. }
  79. for i := int64(0); i < w.number; i++ {
  80. w.slots[i] = glist.New(true)
  81. }
  82. return w
  83. }
  84. // Add adds a timing job to the timer, which runs in interval of <interval>.
  85. func (t *Timer) Add(interval time.Duration, job JobFunc) *Entry {
  86. return t.doAddEntry(interval, job, false, defaultTimes, StatusReady)
  87. }
  88. // AddEntry adds a timing job to the timer with detailed parameters.
  89. //
  90. // The parameter <interval> specifies the running interval of the job.
  91. //
  92. // The parameter <singleton> specifies whether the job running in singleton mode.
  93. // There's only one of the same job is allowed running when its a singleton mode job.
  94. //
  95. // The parameter <times> specifies limit for the job running times, which means the job
  96. // exits if its run times exceeds the <times>.
  97. //
  98. // The parameter <status> specifies the job status when it's firstly added to the timer.
  99. func (t *Timer) AddEntry(interval time.Duration, job JobFunc, singleton bool, times int, status int) *Entry {
  100. return t.doAddEntry(interval, job, singleton, times, status)
  101. }
  102. // AddSingleton is a convenience function for add singleton mode job.
  103. func (t *Timer) AddSingleton(interval time.Duration, job JobFunc) *Entry {
  104. return t.doAddEntry(interval, job, true, defaultTimes, StatusReady)
  105. }
  106. // AddOnce is a convenience function for adding a job which only runs once and then exits.
  107. func (t *Timer) AddOnce(interval time.Duration, job JobFunc) *Entry {
  108. return t.doAddEntry(interval, job, true, 1, StatusReady)
  109. }
  110. // AddTimes is a convenience function for adding a job which is limited running times.
  111. func (t *Timer) AddTimes(interval time.Duration, times int, job JobFunc) *Entry {
  112. return t.doAddEntry(interval, job, true, times, StatusReady)
  113. }
  114. // DelayAdd adds a timing job after delay of <interval> duration.
  115. // Also see Add.
  116. func (t *Timer) DelayAdd(delay time.Duration, interval time.Duration, job JobFunc) {
  117. t.AddOnce(delay, func() {
  118. t.Add(interval, job)
  119. })
  120. }
  121. // DelayAddEntry adds a timing job after delay of <interval> duration.
  122. // Also see AddEntry.
  123. func (t *Timer) DelayAddEntry(delay time.Duration, interval time.Duration, job JobFunc, singleton bool, times int, status int) {
  124. t.AddOnce(delay, func() {
  125. t.AddEntry(interval, job, singleton, times, status)
  126. })
  127. }
  128. // DelayAddSingleton adds a timing job after delay of <interval> duration.
  129. // Also see AddSingleton.
  130. func (t *Timer) DelayAddSingleton(delay time.Duration, interval time.Duration, job JobFunc) {
  131. t.AddOnce(delay, func() {
  132. t.AddSingleton(interval, job)
  133. })
  134. }
  135. // DelayAddOnce adds a timing job after delay of <interval> duration.
  136. // Also see AddOnce.
  137. func (t *Timer) DelayAddOnce(delay time.Duration, interval time.Duration, job JobFunc) {
  138. t.AddOnce(delay, func() {
  139. t.AddOnce(interval, job)
  140. })
  141. }
  142. // DelayAddTimes adds a timing job after delay of <interval> duration.
  143. // Also see AddTimes.
  144. func (t *Timer) DelayAddTimes(delay time.Duration, interval time.Duration, times int, job JobFunc) {
  145. t.AddOnce(delay, func() {
  146. t.AddTimes(interval, times, job)
  147. })
  148. }
  149. // Start starts the timer.
  150. func (t *Timer) Start() {
  151. t.status.Set(StatusRunning)
  152. }
  153. // Stop stops the timer.
  154. func (t *Timer) Stop() {
  155. t.status.Set(StatusStopped)
  156. }
  157. // Close closes the timer.
  158. func (t *Timer) Close() {
  159. t.status.Set(StatusClosed)
  160. }
  161. // doAddEntry adds a timing job to timer for internal usage.
  162. func (t *Timer) doAddEntry(interval time.Duration, job JobFunc, singleton bool, times int, status int) *Entry {
  163. return t.wheels[t.getLevelByIntervalMs(interval.Nanoseconds()/1e6)].addEntry(interval, job, singleton, times, status)
  164. }
  165. // doAddEntryByParent adds a timing job to timer with parent entry for internal usage.
  166. func (t *Timer) doAddEntryByParent(interval int64, parent *Entry) *Entry {
  167. return t.wheels[t.getLevelByIntervalMs(interval)].addEntryByParent(interval, parent)
  168. }
  169. // getLevelByIntervalMs calculates and returns the level of timer wheel with given milliseconds.
  170. func (t *Timer) getLevelByIntervalMs(intervalMs int64) int {
  171. pos, cmp := t.binSearchIndex(intervalMs)
  172. switch cmp {
  173. // If equals to the last comparison value, do not add it directly to this wheel,
  174. // but loop and continue comparison from the index to the first level,
  175. // and add it to the proper level wheel.
  176. case 0:
  177. fallthrough
  178. // If lesser than the last comparison value,
  179. // loop and continue comparison from the index to the first level,
  180. // and add it to the proper level wheel.
  181. case -1:
  182. i := pos
  183. for ; i > 0; i-- {
  184. if intervalMs > t.wheels[i].intervalMs && intervalMs <= t.wheels[i].totalMs {
  185. return i
  186. }
  187. }
  188. return i
  189. // If greater than the last comparison value,
  190. // loop and continue comparison from the index to the last level,
  191. // and add it to the proper level wheel.
  192. case 1:
  193. i := pos
  194. for ; i < t.length-1; i++ {
  195. if intervalMs > t.wheels[i].intervalMs && intervalMs <= t.wheels[i].totalMs {
  196. return i
  197. }
  198. }
  199. return i
  200. }
  201. return 0
  202. }
  203. // binSearchIndex uses binary search algorithm for finding the possible level of the wheel
  204. // for the interval value.
  205. func (t *Timer) binSearchIndex(n int64) (index int, result int) {
  206. min := 0
  207. max := t.length - 1
  208. mid := 0
  209. cmp := -2
  210. for min <= max {
  211. mid = min + int((max-min)/2)
  212. switch {
  213. case t.wheels[mid].intervalMs == n:
  214. cmp = 0
  215. case t.wheels[mid].intervalMs > n:
  216. cmp = -1
  217. case t.wheels[mid].intervalMs < n:
  218. cmp = 1
  219. }
  220. switch cmp {
  221. case -1:
  222. max = mid - 1
  223. case 1:
  224. min = mid + 1
  225. case 0:
  226. return mid, cmp
  227. }
  228. }
  229. return mid, cmp
  230. }