store.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. /*
  2. * Copyright (c) 2013 IBM Corp.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v1.0
  6. * which accompanies this distribution, and is available at
  7. * http://www.eclipse.org/legal/epl-v10.html
  8. *
  9. * Contributors:
  10. * Seth Hoenig
  11. * Allan Stockdill-Mander
  12. * Mike Robertson
  13. */
  14. package mqtt
  15. import (
  16. "fmt"
  17. "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/packets"
  18. "strconv"
  19. )
  20. const (
  21. inboundPrefix = "i."
  22. outboundPrefix = "o."
  23. )
  24. // Store is an interface which can be used to provide implementations
  25. // for message persistence.
  26. // Because we may have to store distinct messages with the same
  27. // message ID, we need a unique key for each message. This is
  28. // possible by prepending "i." or "o." to each message id
  29. type Store interface {
  30. Open()
  31. Put(string, packets.ControlPacket)
  32. Get(string) packets.ControlPacket
  33. All() []string
  34. Del(string)
  35. Close()
  36. Reset()
  37. }
  38. // A key MUST have the form "X.[messageid]"
  39. // where X is 'i' or 'o'
  40. func mIDFromKey(key string) uint16 {
  41. s := key[2:]
  42. i, err := strconv.Atoi(s)
  43. chkerr(err)
  44. return uint16(i)
  45. }
  46. // Return a string of the form "i.[id]"
  47. func inboundKeyFromMID(id uint16) string {
  48. return fmt.Sprintf("%s%d", inboundPrefix, id)
  49. }
  50. // Return a string of the form "o.[id]"
  51. func outboundKeyFromMID(id uint16) string {
  52. return fmt.Sprintf("%s%d", outboundPrefix, id)
  53. }
  54. // govern which outgoing messages are persisted
  55. func persistOutbound(s Store, m packets.ControlPacket) {
  56. switch m.Details().Qos {
  57. case 0:
  58. switch m.(type) {
  59. case *packets.PubackPacket, *packets.PubcompPacket:
  60. // Sending puback. delete matching publish
  61. // from ibound
  62. s.Del(inboundKeyFromMID(m.Details().MessageID))
  63. }
  64. case 1:
  65. switch m.(type) {
  66. case *packets.PublishPacket, *packets.PubrelPacket, *packets.SubscribePacket, *packets.UnsubscribePacket:
  67. // Sending publish. store in obound
  68. // until puback received
  69. s.Put(outboundKeyFromMID(m.Details().MessageID), m)
  70. default:
  71. chkcond(false)
  72. }
  73. case 2:
  74. switch m.(type) {
  75. case *packets.PublishPacket:
  76. // Sending publish. store in obound
  77. // until pubrel received
  78. s.Put(outboundKeyFromMID(m.Details().MessageID), m)
  79. default:
  80. chkcond(false)
  81. }
  82. }
  83. }
  84. // govern which incoming messages are persisted
  85. func persistInbound(s Store, m packets.ControlPacket) {
  86. switch m.Details().Qos {
  87. case 0:
  88. switch m.(type) {
  89. case *packets.PubackPacket, *packets.SubackPacket, *packets.UnsubackPacket, *packets.PubcompPacket:
  90. // Received a puback. delete matching publish
  91. // from obound
  92. s.Del(outboundKeyFromMID(m.Details().MessageID))
  93. case *packets.PublishPacket, *packets.PubrecPacket, *packets.PingrespPacket, *packets.ConnackPacket:
  94. default:
  95. chkcond(false)
  96. }
  97. case 1:
  98. switch m.(type) {
  99. case *packets.PublishPacket, *packets.PubrelPacket:
  100. // Received a publish. store it in ibound
  101. // until puback sent
  102. s.Put(inboundKeyFromMID(m.Details().MessageID), m)
  103. default:
  104. chkcond(false)
  105. }
  106. case 2:
  107. switch m.(type) {
  108. case *packets.PublishPacket:
  109. // Received a publish. store it in ibound
  110. // until pubrel received
  111. s.Put(inboundKeyFromMID(m.Details().MessageID), m)
  112. default:
  113. chkcond(false)
  114. }
  115. }
  116. }