store.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. /*
  2. * Copyright (c) 2021 IBM Corp and others.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v2.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * https://www.eclipse.org/legal/epl-2.0/
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Seth Hoenig
  15. * Allan Stockdill-Mander
  16. * Mike Robertson
  17. */
  18. package mqtt
  19. import (
  20. "fmt"
  21. "strconv"
  22. "github.com/eclipse/paho.mqtt.golang/packets"
  23. )
  24. const (
  25. inboundPrefix = "i."
  26. outboundPrefix = "o."
  27. )
  28. // Store is an interface which can be used to provide implementations
  29. // for message persistence.
  30. // Because we may have to store distinct messages with the same
  31. // message ID, we need a unique key for each message. This is
  32. // possible by prepending "i." or "o." to each message id
  33. type Store interface {
  34. Open()
  35. Put(key string, message packets.ControlPacket)
  36. Get(key string) packets.ControlPacket
  37. All() []string
  38. Del(key string)
  39. Close()
  40. Reset()
  41. }
  42. // A key MUST have the form "X.[messageid]"
  43. // where X is 'i' or 'o'
  44. func mIDFromKey(key string) uint16 {
  45. s := key[2:]
  46. i, err := strconv.ParseUint(s, 10, 16)
  47. chkerr(err)
  48. return uint16(i)
  49. }
  50. // Return true if key prefix is outbound
  51. func isKeyOutbound(key string) bool {
  52. return key[:2] == outboundPrefix
  53. }
  54. // Return true if key prefix is inbound
  55. func isKeyInbound(key string) bool {
  56. return key[:2] == inboundPrefix
  57. }
  58. // Return a string of the form "i.[id]"
  59. func inboundKeyFromMID(id uint16) string {
  60. return fmt.Sprintf("%s%d", inboundPrefix, id)
  61. }
  62. // Return a string of the form "o.[id]"
  63. func outboundKeyFromMID(id uint16) string {
  64. return fmt.Sprintf("%s%d", outboundPrefix, id)
  65. }
  66. // govern which outgoing messages are persisted
  67. func persistOutbound(s Store, m packets.ControlPacket) {
  68. switch m.Details().Qos {
  69. case 0:
  70. switch m.(type) {
  71. case *packets.PubackPacket, *packets.PubcompPacket:
  72. // Sending puback. delete matching publish
  73. // from ibound
  74. s.Del(inboundKeyFromMID(m.Details().MessageID))
  75. }
  76. case 1:
  77. switch m.(type) {
  78. case *packets.PublishPacket, *packets.PubrelPacket, *packets.SubscribePacket, *packets.UnsubscribePacket:
  79. // Sending publish. store in obound
  80. // until puback received
  81. s.Put(outboundKeyFromMID(m.Details().MessageID), m)
  82. default:
  83. ERROR.Println(STR, "Asked to persist an invalid message type")
  84. }
  85. case 2:
  86. switch m.(type) {
  87. case *packets.PublishPacket:
  88. // Sending publish. store in obound
  89. // until pubrel received
  90. s.Put(outboundKeyFromMID(m.Details().MessageID), m)
  91. default:
  92. ERROR.Println(STR, "Asked to persist an invalid message type")
  93. }
  94. }
  95. }
  96. // govern which incoming messages are persisted
  97. func persistInbound(s Store, m packets.ControlPacket) {
  98. switch m.Details().Qos {
  99. case 0:
  100. switch m.(type) {
  101. case *packets.PubackPacket, *packets.SubackPacket, *packets.UnsubackPacket, *packets.PubcompPacket:
  102. // Received a puback. delete matching publish
  103. // from obound
  104. s.Del(outboundKeyFromMID(m.Details().MessageID))
  105. case *packets.PublishPacket, *packets.PubrecPacket, *packets.PingrespPacket, *packets.ConnackPacket:
  106. default:
  107. ERROR.Println(STR, "Asked to persist an invalid messages type")
  108. }
  109. case 1:
  110. switch m.(type) {
  111. case *packets.PublishPacket, *packets.PubrelPacket:
  112. // Received a publish. store it in ibound
  113. // until puback sent
  114. s.Put(inboundKeyFromMID(m.Details().MessageID), m)
  115. default:
  116. ERROR.Println(STR, "Asked to persist an invalid messages type")
  117. }
  118. case 2:
  119. switch m.(type) {
  120. case *packets.PublishPacket:
  121. // Received a publish. store it in ibound
  122. // until pubrel received
  123. s.Put(inboundKeyFromMID(m.Details().MessageID), m)
  124. default:
  125. ERROR.Println(STR, "Asked to persist an invalid messages type")
  126. }
  127. }
  128. }