rate_limiter.go 2.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. // Copyright (c) 2017 Uber Technologies, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package utils
  15. import (
  16. "sync"
  17. "time"
  18. )
// RateLimiter is a filter used to check if a message that is worth itemCost units is within the rate limits.
type RateLimiter interface {
	// CheckCredit reports whether an item costing itemCost units is within the rate limit.
	// It returns true when the item is allowed and false when it must be rejected.
	CheckCredit(itemCost float64) bool
}
// rateLimiter is the RateLimiter implementation returned by NewRateLimiter. It tracks a balance of
// credits that CheckCredit replenishes according to elapsed time and spends on accepted items.
type rateLimiter struct {
	// Embedded mutex; CheckCredit holds it for the whole duration of each call.
	sync.Mutex
	// creditsPerSecond is the number of credits added to balance per second of elapsed time.
	creditsPerSecond float64
	// balance is the number of credits currently available to spend.
	balance float64
	// maxBalance is the ceiling that balance is clamped to after it is replenished.
	maxBalance float64
	// lastTick is the time of the previous CheckCredit call; NewRateLimiter initializes it to the
	// construction time.
	lastTick time.Time
	// timeNow is the clock consulted by CheckCredit. NewRateLimiter sets it to time.Now.
	// NOTE(review): presumably kept as a field so tests can substitute a fake clock — confirm.
	timeNow func() time.Time
}
  31. // NewRateLimiter creates a new rate limiter based on leaky bucket algorithm, formulated in terms of a
  32. // credits balance that is replenished every time CheckCredit() method is called (tick) by the amount proportional
  33. // to the time elapsed since the last tick, up to max of creditsPerSecond. A call to CheckCredit() takes a cost
  34. // of an item we want to pay with the balance. If the balance exceeds the cost of the item, the item is "purchased"
  35. // and the balance reduced, indicated by returned value of true. Otherwise the balance is unchanged and return false.
  36. //
  37. // This can be used to limit a rate of messages emitted by a service by instantiating the Rate Limiter with the
  38. // max number of messages a service is allowed to emit per second, and calling CheckCredit(1.0) for each message
  39. // to determine if the message is within the rate limit.
  40. //
  41. // It can also be used to limit the rate of traffic in bytes, by setting creditsPerSecond to desired throughput
  42. // as bytes/second, and calling CheckCredit() with the actual message size.
  43. func NewRateLimiter(creditsPerSecond, maxBalance float64) RateLimiter {
  44. return &rateLimiter{
  45. creditsPerSecond: creditsPerSecond,
  46. balance: maxBalance,
  47. maxBalance: maxBalance,
  48. lastTick: time.Now(),
  49. timeNow: time.Now}
  50. }
  51. func (b *rateLimiter) CheckCredit(itemCost float64) bool {
  52. b.Lock()
  53. defer b.Unlock()
  54. // calculate how much time passed since the last tick, and update current tick
  55. currentTime := b.timeNow()
  56. elapsedTime := currentTime.Sub(b.lastTick)
  57. b.lastTick = currentTime
  58. // calculate how much credit have we accumulated since the last tick
  59. b.balance += elapsedTime.Seconds() * b.creditsPerSecond
  60. if b.balance > b.maxBalance {
  61. b.balance = b.maxBalance
  62. }
  63. // if we have enough credits to pay for current item, then reduce balance and allow
  64. if b.balance >= itemCost {
  65. b.balance -= itemCost
  66. return true
  67. }
  68. return false
  69. }