throttler.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. // Copyright (c) 2018 The Jaeger Authors.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package remote
  15. import (
  16. "fmt"
  17. "net/url"
  18. "sync"
  19. "sync/atomic"
  20. "time"
  21. "github.com/pkg/errors"
  22. "github.com/uber/jaeger-client-go"
  23. "github.com/uber/jaeger-client-go/utils"
  24. )
  25. const (
  26. // minimumCredits is the minimum amount of credits necessary to not be throttled.
  27. // i.e. if currentCredits > minimumCredits, then the operation will not be throttled.
  28. minimumCredits = 1.0
  29. )
  30. var (
  31. errorUUIDNotSet = errors.New("Throttler UUID must be set")
  32. )
  33. type operationBalance struct {
  34. Operation string `json:"operation"`
  35. Balance float64 `json:"balance"`
  36. }
  37. type creditResponse struct {
  38. Balances []operationBalance `json:"balances"`
  39. }
  40. type httpCreditManagerProxy struct {
  41. hostPort string
  42. }
  43. func newHTTPCreditManagerProxy(hostPort string) *httpCreditManagerProxy {
  44. return &httpCreditManagerProxy{
  45. hostPort: hostPort,
  46. }
  47. }
  48. // N.B. Operations list must not be empty.
  49. func (m *httpCreditManagerProxy) FetchCredits(uuid, serviceName string, operations []string) (*creditResponse, error) {
  50. params := url.Values{}
  51. params.Set("service", serviceName)
  52. params.Set("uuid", uuid)
  53. for _, op := range operations {
  54. params.Add("operations", op)
  55. }
  56. var resp creditResponse
  57. if err := utils.GetJSON(fmt.Sprintf("http://%s/credits?%s", m.hostPort, params.Encode()), &resp); err != nil {
  58. return nil, errors.Wrap(err, "Failed to receive credits from agent")
  59. }
  60. return &resp, nil
  61. }
  62. // Throttler retrieves credits from agent and uses it to throttle operations.
  63. type Throttler struct {
  64. options
  65. mux sync.RWMutex
  66. service string
  67. uuid atomic.Value
  68. creditManager *httpCreditManagerProxy
  69. credits map[string]float64 // map of operation->credits
  70. close chan struct{}
  71. stopped sync.WaitGroup
  72. }
  73. // NewThrottler returns a Throttler that polls agent for credits and uses them to throttle
  74. // the service.
  75. func NewThrottler(service string, options ...Option) *Throttler {
  76. opts := applyOptions(options...)
  77. creditManager := newHTTPCreditManagerProxy(opts.hostPort)
  78. t := &Throttler{
  79. options: opts,
  80. creditManager: creditManager,
  81. service: service,
  82. credits: make(map[string]float64),
  83. close: make(chan struct{}),
  84. }
  85. t.stopped.Add(1)
  86. go t.pollManager()
  87. return t
  88. }
  89. // IsAllowed implements Throttler#IsAllowed.
  90. func (t *Throttler) IsAllowed(operation string) bool {
  91. t.mux.Lock()
  92. defer t.mux.Unlock()
  93. value, ok := t.credits[operation]
  94. if !ok || value == 0 {
  95. if !ok {
  96. // NOTE: This appears to be a no-op at first glance, but it stores
  97. // the operation key in the map. Necessary for functionality of
  98. // Throttler#operations method.
  99. t.credits[operation] = 0
  100. }
  101. if !t.synchronousInitialization {
  102. t.metrics.ThrottledDebugSpans.Inc(1)
  103. return false
  104. }
  105. // If it is the first time this operation is being checked, synchronously fetch
  106. // the credits.
  107. credits, err := t.fetchCredits([]string{operation})
  108. if err != nil {
  109. // Failed to receive credits from agent, try again next time
  110. t.logger.Error("Failed to fetch credits: " + err.Error())
  111. return false
  112. }
  113. if len(credits.Balances) == 0 {
  114. // This shouldn't happen but just in case
  115. return false
  116. }
  117. for _, opBalance := range credits.Balances {
  118. t.credits[opBalance.Operation] += opBalance.Balance
  119. }
  120. }
  121. return t.isAllowed(operation)
  122. }
  123. // Close stops the throttler from fetching credits from remote.
  124. func (t *Throttler) Close() error {
  125. close(t.close)
  126. t.stopped.Wait()
  127. return nil
  128. }
  129. // SetProcess implements ProcessSetter#SetProcess. It's imperative that the UUID is set before any remote
  130. // requests are made.
  131. func (t *Throttler) SetProcess(process jaeger.Process) {
  132. if process.UUID != "" {
  133. t.uuid.Store(process.UUID)
  134. }
  135. }
  136. // N.B. This function must be called with the Write Lock
  137. func (t *Throttler) isAllowed(operation string) bool {
  138. credits := t.credits[operation]
  139. if credits < minimumCredits {
  140. t.metrics.ThrottledDebugSpans.Inc(1)
  141. return false
  142. }
  143. t.credits[operation] = credits - minimumCredits
  144. return true
  145. }
  146. func (t *Throttler) pollManager() {
  147. defer t.stopped.Done()
  148. ticker := time.NewTicker(t.refreshInterval)
  149. defer ticker.Stop()
  150. for {
  151. select {
  152. case <-ticker.C:
  153. t.refreshCredits()
  154. case <-t.close:
  155. return
  156. }
  157. }
  158. }
  159. func (t *Throttler) operations() []string {
  160. t.mux.RLock()
  161. defer t.mux.RUnlock()
  162. operations := make([]string, 0, len(t.credits))
  163. for op := range t.credits {
  164. operations = append(operations, op)
  165. }
  166. return operations
  167. }
  168. func (t *Throttler) refreshCredits() {
  169. operations := t.operations()
  170. if len(operations) == 0 {
  171. return
  172. }
  173. newCredits, err := t.fetchCredits(operations)
  174. if err != nil {
  175. t.metrics.ThrottlerUpdateFailure.Inc(1)
  176. t.logger.Error("Failed to fetch credits: " + err.Error())
  177. return
  178. }
  179. t.metrics.ThrottlerUpdateSuccess.Inc(1)
  180. t.mux.Lock()
  181. defer t.mux.Unlock()
  182. for _, opBalance := range newCredits.Balances {
  183. t.credits[opBalance.Operation] += opBalance.Balance
  184. }
  185. }
  186. func (t *Throttler) fetchCredits(operations []string) (*creditResponse, error) {
  187. uuid := t.uuid.Load()
  188. uuidStr, _ := uuid.(string)
  189. if uuid == nil || uuidStr == "" {
  190. return nil, errorUUIDNotSet
  191. }
  192. return t.creditManager.FetchCredits(uuidStr, t.service, operations)
  193. }