packets.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324
  1. package packets
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "errors"
  6. "fmt"
  7. "github.com/pborman/uuid"
  8. "io"
  9. )
// ControlPacket defines the interface for structs intended to hold
// decoded MQTT packets, either from being read or before being
// written.
type ControlPacket interface {
	// Write takes an io.Writer and reports failure through its error result.
	// NOTE(review): presumably serialises the packet onto the writer — confirm
	// against the concrete packet types.
	Write(io.Writer) error
	// Unpack takes an io.Reader. ReadPacket calls it with a buffer holding
	// the bytes that follow the fixed header. It has no error result.
	Unpack(io.Reader)
	// String returns a string form of the packet.
	String() string
	// Details returns a Details value (Qos and MessageID) for the packet.
	Details() Details
	// UUID returns a uuid.UUID for the packet.
	// NOTE(review): presumably the value the constructors in this file obtain
	// from uuid.NewUUID — confirm against the concrete packet types.
	UUID() uuid.UUID
}
  20. //PacketNames maps the constants for each of the MQTT packet types
  21. //to a string representation of their name.
  22. var PacketNames = map[uint8]string{
  23. 1: "CONNECT",
  24. 2: "CONNACK",
  25. 3: "PUBLISH",
  26. 4: "PUBACK",
  27. 5: "PUBREC",
  28. 6: "PUBREL",
  29. 7: "PUBCOMP",
  30. 8: "SUBSCRIBE",
  31. 9: "SUBACK",
  32. 10: "UNSUBSCRIBE",
  33. 11: "UNSUBACK",
  34. 12: "PINGREQ",
  35. 13: "PINGRESP",
  36. 14: "DISCONNECT",
  37. }
  38. //Below are the constants assigned to each of the MQTT packet types
  39. const (
  40. Connect = 1
  41. Connack = 2
  42. Publish = 3
  43. Puback = 4
  44. Pubrec = 5
  45. Pubrel = 6
  46. Pubcomp = 7
  47. Subscribe = 8
  48. Suback = 9
  49. Unsubscribe = 10
  50. Unsuback = 11
  51. Pingreq = 12
  52. Pingresp = 13
  53. Disconnect = 14
  54. )
  55. //Below are the const definitions for error codes returned by
  56. //Connect()
  57. const (
  58. Accepted = 0x00
  59. ErrRefusedBadProtocolVersion = 0x01
  60. ErrRefusedIDRejected = 0x02
  61. ErrRefusedServerUnavailable = 0x03
  62. ErrRefusedBadUsernameOrPassword = 0x04
  63. ErrRefusedNotAuthorised = 0x05
  64. ErrNetworkError = 0xFE
  65. ErrProtocolViolation = 0xFF
  66. )
  67. //ConnackReturnCodes is a map of the error codes constants for Connect()
  68. //to a string representation of the error
  69. var ConnackReturnCodes = map[uint8]string{
  70. 0: "Connection Accepted",
  71. 1: "Connection Refused: Bad Protocol Version",
  72. 2: "Connection Refused: Client Identifier Rejected",
  73. 3: "Connection Refused: Server Unavailable",
  74. 4: "Connection Refused: Username or Password in unknown format",
  75. 5: "Connection Refused: Not Authorised",
  76. 254: "Connection Error",
  77. 255: "Connection Refused: Protocol Violation",
  78. }
  79. //ConnErrors is a map of the errors codes constants for Connect()
  80. //to a Go error
  81. var ConnErrors = map[byte]error{
  82. Accepted: nil,
  83. ErrRefusedBadProtocolVersion: errors.New("Unnacceptable protocol version"),
  84. ErrRefusedIDRejected: errors.New("Identifier rejected"),
  85. ErrRefusedServerUnavailable: errors.New("Server Unavailable"),
  86. ErrRefusedBadUsernameOrPassword: errors.New("Bad user name or password"),
  87. ErrRefusedNotAuthorised: errors.New("Not Authorized"),
  88. ErrNetworkError: errors.New("Network Error"),
  89. ErrProtocolViolation: errors.New("Protocol Violation"),
  90. }
  91. //ReadPacket takes an instance of an io.Reader (such as net.Conn) and attempts
  92. //to read an MQTT packet from the stream. It returns a ControlPacket
  93. //representing the decoded MQTT packet and an error. One of these returns will
  94. //always be nil, a nil ControlPacket indicating an error occurred.
  95. func ReadPacket(r io.Reader) (cp ControlPacket, err error) {
  96. var fh FixedHeader
  97. b := make([]byte, 1)
  98. _, err = io.ReadFull(r, b)
  99. if err != nil {
  100. return nil, err
  101. }
  102. fh.unpack(b[0], r)
  103. cp = NewControlPacketWithHeader(fh)
  104. if cp == nil {
  105. return nil, errors.New("Bad data from client")
  106. }
  107. packetBytes := make([]byte, fh.RemainingLength)
  108. _, err = io.ReadFull(r, packetBytes)
  109. if err != nil {
  110. return nil, err
  111. }
  112. cp.Unpack(bytes.NewBuffer(packetBytes))
  113. return cp, nil
  114. }
  115. //NewControlPacket is used to create a new ControlPacket of the type specified
  116. //by packetType, this is usually done by reference to the packet type constants
  117. //defined in packets.go. The newly created ControlPacket is empty and a pointer
  118. //is returned.
  119. func NewControlPacket(packetType byte) (cp ControlPacket) {
  120. switch packetType {
  121. case Connect:
  122. cp = &ConnectPacket{FixedHeader: FixedHeader{MessageType: Connect}, uuid: uuid.NewUUID()}
  123. case Connack:
  124. cp = &ConnackPacket{FixedHeader: FixedHeader{MessageType: Connack}, uuid: uuid.NewUUID()}
  125. case Disconnect:
  126. cp = &DisconnectPacket{FixedHeader: FixedHeader{MessageType: Disconnect}, uuid: uuid.NewUUID()}
  127. case Publish:
  128. cp = &PublishPacket{FixedHeader: FixedHeader{MessageType: Publish}, uuid: uuid.NewUUID()}
  129. case Puback:
  130. cp = &PubackPacket{FixedHeader: FixedHeader{MessageType: Puback}, uuid: uuid.NewUUID()}
  131. case Pubrec:
  132. cp = &PubrecPacket{FixedHeader: FixedHeader{MessageType: Pubrec}, uuid: uuid.NewUUID()}
  133. case Pubrel:
  134. cp = &PubrelPacket{FixedHeader: FixedHeader{MessageType: Pubrel, Qos: 1}, uuid: uuid.NewUUID()}
  135. case Pubcomp:
  136. cp = &PubcompPacket{FixedHeader: FixedHeader{MessageType: Pubcomp}, uuid: uuid.NewUUID()}
  137. case Subscribe:
  138. cp = &SubscribePacket{FixedHeader: FixedHeader{MessageType: Subscribe, Qos: 1}, uuid: uuid.NewUUID()}
  139. case Suback:
  140. cp = &SubackPacket{FixedHeader: FixedHeader{MessageType: Suback}, uuid: uuid.NewUUID()}
  141. case Unsubscribe:
  142. cp = &UnsubscribePacket{FixedHeader: FixedHeader{MessageType: Unsubscribe, Qos: 1}, uuid: uuid.NewUUID()}
  143. case Unsuback:
  144. cp = &UnsubackPacket{FixedHeader: FixedHeader{MessageType: Unsuback}, uuid: uuid.NewUUID()}
  145. case Pingreq:
  146. cp = &PingreqPacket{FixedHeader: FixedHeader{MessageType: Pingreq}, uuid: uuid.NewUUID()}
  147. case Pingresp:
  148. cp = &PingrespPacket{FixedHeader: FixedHeader{MessageType: Pingresp}, uuid: uuid.NewUUID()}
  149. default:
  150. return nil
  151. }
  152. return cp
  153. }
  154. //NewControlPacketWithHeader is used to create a new ControlPacket of the type
  155. //specified within the FixedHeader that is passed to the function.
  156. //The newly created ControlPacket is empty and a pointer is returned.
  157. func NewControlPacketWithHeader(fh FixedHeader) (cp ControlPacket) {
  158. switch fh.MessageType {
  159. case Connect:
  160. cp = &ConnectPacket{FixedHeader: fh, uuid: uuid.NewUUID()}
  161. case Connack:
  162. cp = &ConnackPacket{FixedHeader: fh, uuid: uuid.NewUUID()}
  163. case Disconnect:
  164. cp = &DisconnectPacket{FixedHeader: fh, uuid: uuid.NewUUID()}
  165. case Publish:
  166. cp = &PublishPacket{FixedHeader: fh, uuid: uuid.NewUUID()}
  167. case Puback:
  168. cp = &PubackPacket{FixedHeader: fh, uuid: uuid.NewUUID()}
  169. case Pubrec:
  170. cp = &PubrecPacket{FixedHeader: fh, uuid: uuid.NewUUID()}
  171. case Pubrel:
  172. cp = &PubrelPacket{FixedHeader: fh, uuid: uuid.NewUUID()}
  173. case Pubcomp:
  174. cp = &PubcompPacket{FixedHeader: fh, uuid: uuid.NewUUID()}
  175. case Subscribe:
  176. cp = &SubscribePacket{FixedHeader: fh, uuid: uuid.NewUUID()}
  177. case Suback:
  178. cp = &SubackPacket{FixedHeader: fh, uuid: uuid.NewUUID()}
  179. case Unsubscribe:
  180. cp = &UnsubscribePacket{FixedHeader: fh, uuid: uuid.NewUUID()}
  181. case Unsuback:
  182. cp = &UnsubackPacket{FixedHeader: fh, uuid: uuid.NewUUID()}
  183. case Pingreq:
  184. cp = &PingreqPacket{FixedHeader: fh, uuid: uuid.NewUUID()}
  185. case Pingresp:
  186. cp = &PingrespPacket{FixedHeader: fh, uuid: uuid.NewUUID()}
  187. default:
  188. return nil
  189. }
  190. return cp
  191. }
// Details is the struct returned by the Details() function called on
// ControlPackets to present details of the Qos and MessageID
// of the ControlPacket.
type Details struct {
	// Qos is the Qos value reported for the packet.
	Qos byte
	// MessageID is the message identifier reported for the packet.
	MessageID uint16
}
// FixedHeader is a struct to hold the decoded information from
// the fixed header of an MQTT ControlPacket.
type FixedHeader struct {
	// MessageType is the packet type code, held in the upper four bits of
	// the first header byte.
	MessageType byte
	// Dup corresponds to bit 3 of the first header byte.
	Dup bool
	// Qos is the two-bit value held in bits 1 and 2 of the first header byte.
	Qos byte
	// Retain corresponds to bit 0 of the first header byte.
	Retain bool
	// RemainingLength is the decoded value of the variable-length field that
	// follows the first header byte. ReadPacket reads this many further
	// bytes as the body of the packet.
	RemainingLength int
}
  208. func (fh FixedHeader) String() string {
  209. return fmt.Sprintf("%s: dup: %t qos: %d retain: %t rLength: %d", PacketNames[fh.MessageType], fh.Dup, fh.Qos, fh.Retain, fh.RemainingLength)
  210. }
  211. func boolToByte(b bool) byte {
  212. switch b {
  213. case true:
  214. return 1
  215. default:
  216. return 0
  217. }
  218. }
  219. func (fh *FixedHeader) pack() bytes.Buffer {
  220. var header bytes.Buffer
  221. header.WriteByte(fh.MessageType<<4 | boolToByte(fh.Dup)<<3 | fh.Qos<<1 | boolToByte(fh.Retain))
  222. header.Write(encodeLength(fh.RemainingLength))
  223. return header
  224. }
  225. func (fh *FixedHeader) unpack(typeAndFlags byte, r io.Reader) {
  226. fh.MessageType = typeAndFlags >> 4
  227. fh.Dup = (typeAndFlags>>3)&0x01 > 0
  228. fh.Qos = (typeAndFlags >> 1) & 0x03
  229. fh.Retain = typeAndFlags&0x01 > 0
  230. fh.RemainingLength = decodeLength(r)
  231. }
  232. func decodeByte(b io.Reader) byte {
  233. num := make([]byte, 1)
  234. b.Read(num)
  235. return num[0]
  236. }
  237. func decodeUint16(b io.Reader) uint16 {
  238. num := make([]byte, 2)
  239. b.Read(num)
  240. return binary.BigEndian.Uint16(num)
  241. }
  242. func encodeUint16(num uint16) []byte {
  243. bytes := make([]byte, 2)
  244. binary.BigEndian.PutUint16(bytes, num)
  245. return bytes
  246. }
  247. func encodeString(field string) []byte {
  248. fieldLength := make([]byte, 2)
  249. binary.BigEndian.PutUint16(fieldLength, uint16(len(field)))
  250. return append(fieldLength, []byte(field)...)
  251. }
  252. func decodeString(b io.Reader) string {
  253. fieldLength := decodeUint16(b)
  254. field := make([]byte, fieldLength)
  255. b.Read(field)
  256. return string(field)
  257. }
  258. func decodeBytes(b io.Reader) []byte {
  259. fieldLength := decodeUint16(b)
  260. field := make([]byte, fieldLength)
  261. b.Read(field)
  262. return field
  263. }
  264. func encodeBytes(field []byte) []byte {
  265. fieldLength := make([]byte, 2)
  266. binary.BigEndian.PutUint16(fieldLength, uint16(len(field)))
  267. return append(fieldLength, field...)
  268. }
  269. func encodeLength(length int) []byte {
  270. var encLength []byte
  271. for {
  272. digit := byte(length % 128)
  273. length /= 128
  274. if length > 0 {
  275. digit |= 0x80
  276. }
  277. encLength = append(encLength, digit)
  278. if length == 0 {
  279. break
  280. }
  281. }
  282. return encLength
  283. }
  284. func decodeLength(r io.Reader) int {
  285. var rLength uint32
  286. var multiplier uint32
  287. b := make([]byte, 1)
  288. for {
  289. io.ReadFull(r, b)
  290. digit := b[0]
  291. rLength |= uint32(digit&127) << multiplier
  292. if (digit & 128) == 0 {
  293. break
  294. }
  295. multiplier += 7
  296. }
  297. return int(rLength)
  298. }