ping.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. /*
  2. * Copyright (c) 2021 IBM Corp and others.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v2.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * https://www.eclipse.org/legal/epl-2.0/
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Seth Hoenig
  15. * Allan Stockdill-Mander
  16. * Mike Robertson
  17. */
  18. package mqtt
  19. import (
  20. "errors"
  21. "io"
  22. "sync/atomic"
  23. "time"
  24. "github.com/eclipse/paho.mqtt.golang/packets"
  25. )
  26. // keepalive - Send ping when connection unused for set period
  27. // connection passed in to avoid race condition on shutdown
  28. func keepalive(c *client, conn io.Writer) {
  29. defer c.workers.Done()
  30. DEBUG.Println(PNG, "keepalive starting")
  31. var checkInterval time.Duration
  32. var pingSent time.Time
  33. if c.options.KeepAlive > 10 {
  34. checkInterval = 5 * time.Second
  35. } else {
  36. checkInterval = time.Duration(c.options.KeepAlive) * time.Second / 2
  37. }
  38. intervalTicker := time.NewTicker(checkInterval)
  39. defer intervalTicker.Stop()
  40. for {
  41. select {
  42. case <-c.stop:
  43. DEBUG.Println(PNG, "keepalive stopped")
  44. return
  45. case <-intervalTicker.C:
  46. lastSent := c.lastSent.Load().(time.Time)
  47. lastReceived := c.lastReceived.Load().(time.Time)
  48. DEBUG.Println(PNG, "ping check", time.Since(lastSent).Seconds())
  49. if time.Since(lastSent) >= time.Duration(c.options.KeepAlive*int64(time.Second)) || time.Since(lastReceived) >= time.Duration(c.options.KeepAlive*int64(time.Second)) {
  50. if atomic.LoadInt32(&c.pingOutstanding) == 0 {
  51. DEBUG.Println(PNG, "keepalive sending ping")
  52. ping := packets.NewControlPacket(packets.Pingreq).(*packets.PingreqPacket)
  53. // We don't want to wait behind large messages being sent, the `Write` call
  54. // will block until it is able to send the packet.
  55. atomic.StoreInt32(&c.pingOutstanding, 1)
  56. if err := ping.Write(conn); err != nil {
  57. ERROR.Println(PNG, err)
  58. }
  59. c.lastSent.Store(time.Now())
  60. pingSent = time.Now()
  61. }
  62. }
  63. if atomic.LoadInt32(&c.pingOutstanding) > 0 && time.Since(pingSent) >= c.options.PingTimeout {
  64. CRITICAL.Println(PNG, "pingresp not received, disconnecting")
  65. c.internalConnLost(errors.New("pingresp not received, disconnecting")) // no harm in calling this if the connection is already down (or shutdown is in progress)
  66. return
  67. }
  68. }
  69. }
  70. }