stats.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. package kafka
  2. import (
  3. "sync/atomic"
  4. "time"
  5. )
  6. // SummaryStats is a data structure that carries a summary of observed values.
  7. type SummaryStats struct {
  8. Avg int64 `metric:"avg" type:"gauge"`
  9. Min int64 `metric:"min" type:"gauge"`
  10. Max int64 `metric:"max" type:"gauge"`
  11. Count int64 `metric:"count" type:"counter"`
  12. Sum int64 `metric:"sum" type:"counter"`
  13. }
  14. // DurationStats is a data structure that carries a summary of observed duration values.
  15. type DurationStats struct {
  16. Avg time.Duration `metric:"avg" type:"gauge"`
  17. Min time.Duration `metric:"min" type:"gauge"`
  18. Max time.Duration `metric:"max" type:"gauge"`
  19. Count int64 `metric:"count" type:"counter"`
  20. Sum time.Duration `metric:"sum" type:"counter"`
  21. }
  22. // counter is an atomic incrementing counter which gets reset on snapshot.
  23. //
  24. // Since atomic is used to mutate the statistic the value must be 64-bit aligned.
  25. // See https://golang.org/pkg/sync/atomic/#pkg-note-BUG
  26. type counter int64
  27. func (c *counter) ptr() *int64 {
  28. return (*int64)(c)
  29. }
  30. func (c *counter) observe(v int64) {
  31. atomic.AddInt64(c.ptr(), v)
  32. }
  33. func (c *counter) snapshot() int64 {
  34. return atomic.SwapInt64(c.ptr(), 0)
  35. }
  36. // gauge is an atomic integer that may be set to any arbitrary value, the value
  37. // does not change after a snapshot.
  38. //
  39. // Since atomic is used to mutate the statistic the value must be 64-bit aligned.
  40. // See https://golang.org/pkg/sync/atomic/#pkg-note-BUG
  41. type gauge int64
  42. func (g *gauge) ptr() *int64 {
  43. return (*int64)(g)
  44. }
  45. func (g *gauge) observe(v int64) {
  46. atomic.StoreInt64(g.ptr(), v)
  47. }
  48. func (g *gauge) snapshot() int64 {
  49. return atomic.LoadInt64(g.ptr())
  50. }
  51. // minimum is an atomic integral type that keeps track of the minimum of all
  52. // values that it observed between snapshots.
  53. //
  54. // Since atomic is used to mutate the statistic the value must be 64-bit aligned.
  55. // See https://golang.org/pkg/sync/atomic/#pkg-note-BUG
  56. type minimum int64
  57. func (m *minimum) ptr() *int64 {
  58. return (*int64)(m)
  59. }
  60. func (m *minimum) observe(v int64) {
  61. for {
  62. ptr := m.ptr()
  63. min := atomic.LoadInt64(ptr)
  64. if min >= 0 && min <= v {
  65. break
  66. }
  67. if atomic.CompareAndSwapInt64(ptr, min, v) {
  68. break
  69. }
  70. }
  71. }
  72. func (m *minimum) snapshot() int64 {
  73. p := m.ptr()
  74. v := atomic.LoadInt64(p)
  75. atomic.CompareAndSwapInt64(p, v, -1)
  76. if v < 0 {
  77. v = 0
  78. }
  79. return v
  80. }
  81. // maximum is an atomic integral type that keeps track of the maximum of all
  82. // values that it observed between snapshots.
  83. //
  84. // Since atomic is used to mutate the statistic the value must be 64-bit aligned.
  85. // See https://golang.org/pkg/sync/atomic/#pkg-note-BUG
  86. type maximum int64
  87. func (m *maximum) ptr() *int64 {
  88. return (*int64)(m)
  89. }
  90. func (m *maximum) observe(v int64) {
  91. for {
  92. ptr := m.ptr()
  93. max := atomic.LoadInt64(ptr)
  94. if max >= 0 && max >= v {
  95. break
  96. }
  97. if atomic.CompareAndSwapInt64(ptr, max, v) {
  98. break
  99. }
  100. }
  101. }
  102. func (m *maximum) snapshot() int64 {
  103. p := m.ptr()
  104. v := atomic.LoadInt64(p)
  105. atomic.CompareAndSwapInt64(p, v, -1)
  106. if v < 0 {
  107. v = 0
  108. }
  109. return v
  110. }
  111. type summary struct {
  112. min minimum
  113. max maximum
  114. sum counter
  115. count counter
  116. }
  117. func makeSummary() summary {
  118. return summary{
  119. min: -1,
  120. max: -1,
  121. }
  122. }
  123. func (s *summary) observe(v int64) {
  124. s.min.observe(v)
  125. s.max.observe(v)
  126. s.sum.observe(v)
  127. s.count.observe(1)
  128. }
  129. func (s *summary) observeDuration(v time.Duration) {
  130. s.observe(int64(v))
  131. }
  132. func (s *summary) snapshot() SummaryStats {
  133. avg := int64(0)
  134. min := s.min.snapshot()
  135. max := s.max.snapshot()
  136. sum := s.sum.snapshot()
  137. count := s.count.snapshot()
  138. if count != 0 {
  139. avg = int64(float64(sum) / float64(count))
  140. }
  141. return SummaryStats{
  142. Avg: avg,
  143. Min: min,
  144. Max: max,
  145. Count: count,
  146. Sum: sum,
  147. }
  148. }
  149. func (s *summary) snapshotDuration() DurationStats {
  150. summary := s.snapshot()
  151. return DurationStats{
  152. Avg: time.Duration(summary.Avg),
  153. Min: time.Duration(summary.Min),
  154. Max: time.Duration(summary.Max),
  155. Count: summary.Count,
  156. Sum: time.Duration(summary.Sum),
  157. }
  158. }