backoff.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. /*
  2. * Copyright (c) 2021 IBM Corp and others.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v2.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * https://www.eclipse.org/legal/epl-2.0/
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Matt Brittan
  15. * Daichi Tomaru
  16. */
  17. package mqtt
  18. import (
  19. "sync"
  20. "time"
  21. )
  22. // Controller for sleep with backoff when the client attempts reconnection
  23. // It has statuses for each situations cause reconnection.
  24. type backoffController struct {
  25. sync.RWMutex
  26. statusMap map[string]*backoffStatus
  27. }
  28. type backoffStatus struct {
  29. lastSleepPeriod time.Duration
  30. lastErrorTime time.Time
  31. }
  32. func newBackoffController() *backoffController {
  33. return &backoffController{
  34. statusMap: map[string]*backoffStatus{},
  35. }
  36. }
  37. // Calculate next sleep period from the specified parameters.
  38. // Returned values are next sleep period and whether the error situation is continual.
  39. // If connection errors continuouslly occurs, its sleep period is exponentially increased.
  40. // Also if there is a lot of time between last and this error, sleep period is initialized.
  41. func (b *backoffController) getBackoffSleepTime(
  42. situation string, initSleepPeriod time.Duration, maxSleepPeriod time.Duration, processTime time.Duration, skipFirst bool,
  43. ) (time.Duration, bool) {
  44. // Decide first sleep time if the situation is not continual.
  45. var firstProcess = func(status *backoffStatus, init time.Duration, skip bool) (time.Duration, bool) {
  46. if skip {
  47. status.lastSleepPeriod = 0
  48. return 0, false
  49. }
  50. status.lastSleepPeriod = init
  51. return init, false
  52. }
  53. // Prioritize maxSleep.
  54. if initSleepPeriod > maxSleepPeriod {
  55. initSleepPeriod = maxSleepPeriod
  56. }
  57. b.Lock()
  58. defer b.Unlock()
  59. status, exist := b.statusMap[situation]
  60. if !exist {
  61. b.statusMap[situation] = &backoffStatus{initSleepPeriod, time.Now()}
  62. return firstProcess(b.statusMap[situation], initSleepPeriod, skipFirst)
  63. }
  64. oldTime := status.lastErrorTime
  65. status.lastErrorTime = time.Now()
  66. // When there is a lot of time between last and this error, sleep period is initialized.
  67. if status.lastErrorTime.Sub(oldTime) > (processTime * 2 + status.lastSleepPeriod) {
  68. return firstProcess(status, initSleepPeriod, skipFirst)
  69. }
  70. if status.lastSleepPeriod == 0 {
  71. status.lastSleepPeriod = initSleepPeriod
  72. return initSleepPeriod, true
  73. }
  74. if nextSleepPeriod := status.lastSleepPeriod * 2; nextSleepPeriod <= maxSleepPeriod {
  75. status.lastSleepPeriod = nextSleepPeriod
  76. } else {
  77. status.lastSleepPeriod = maxSleepPeriod
  78. }
  79. return status.lastSleepPeriod, true
  80. }
  81. // Execute sleep the time returned from getBackoffSleepTime.
  82. func (b *backoffController) sleepWithBackoff(
  83. situation string, initSleepPeriod time.Duration, maxSleepPeriod time.Duration, processTime time.Duration, skipFirst bool,
  84. ) (time.Duration, bool) {
  85. sleep, isFirst := b.getBackoffSleepTime(situation, initSleepPeriod, maxSleepPeriod, processTime, skipFirst)
  86. if sleep != 0 {
  87. time.Sleep(sleep)
  88. }
  89. return sleep, isFirst
  90. }