message.go 15 KB


  1. package mqtt
  2. import (
  3. "bytes"
  4. "errors"
  5. "io"
  6. )
  7. // OoS only support QoS 0
  8. const (
  9. QosAtMostOnce = TagQosLevel(iota)
  10. QosAtLeastOnce
  11. QosExactlyOnce
  12. QosInvalid
  13. )
  14. // Max Payload size
  15. const (
  16. MaxPayloadSize = (1 << (4 * 7)) - 1
  17. )
  18. type TagQosLevel uint8
  19. func (qos TagQosLevel) IsValid() bool {
  20. return qos < QosInvalid && qos >= QosAtMostOnce
  21. }
  22. func (qos TagQosLevel) HasId() bool {
  23. return qos == QosAtLeastOnce || qos == QosExactlyOnce
  24. }
  25. func (qos TagQosLevel) IsAtLeastOnce() bool {
  26. return qos == QosAtLeastOnce
  27. }
  28. func (qos TagQosLevel) IsExactlyOnce() bool {
  29. return qos == QosExactlyOnce
  30. }
  31. // Message Type
  32. const (
  33. MsgConnect = TagMessageType(iota + 1)
  34. MsgConnAck
  35. MsgPublish
  36. MsgPubAck
  37. MsgPubRec
  38. MsgPubRel
  39. MsgPubComp
  40. MsgSubscribe
  41. MsgSubAck
  42. MsgUnsubscribe
  43. MsgUnsubAck
  44. MsgPingReq
  45. MsgPingResp
  46. MsgDisconnect
  47. MsgInvalid
  48. )
  49. // retcode
  50. const (
  51. RetCodeAccepted = TagRetCode(iota)
  52. RetCodeUnacceptableProtocolVersion
  53. RetCodeIdentifierRejected
  54. RetCodeServerUnavailable
  55. RetCodeBadUsernameOrPassword
  56. RetCodeNotAuthorized
  57. RetCodeInvalid
  58. )
  59. type TagRetCode uint8
  60. func (rc TagRetCode) IsValid() bool {
  61. return rc >= RetCodeAccepted && rc < RetCodeInvalid
  62. }
  63. type TagMessageType uint8
  64. func (msg TagMessageType) IsValid() bool {
  65. return msg >= MsgConnect && msg < MsgInvalid
  66. }
  67. // message interface
  68. type Message interface {
  69. Encode(w io.Writer) error
  70. Decode(r io.Reader, hdr Header, packetRemaining int32) error
  71. }
  72. // message fix header
  73. type Header struct {
  74. DupFlag bool
  75. QosLevel TagQosLevel
  76. Retain bool
  77. }
  78. func (hdr *Header) Encode(w io.Writer, msgType TagMessageType, remainingLength int32) error {
  79. buf := new(bytes.Buffer)
  80. err := hdr.EncodeInto(buf, msgType, remainingLength)
  81. if err != nil {
  82. return err
  83. }
  84. _, err = w.Write(buf.Bytes())
  85. return err
  86. }
  87. func (hdr *Header) EncodeInto(buf *bytes.Buffer, msgType TagMessageType, remainingLength int32) error {
  88. if !hdr.QosLevel.IsValid() {
  89. return errors.New("Invalid Qos level")
  90. }
  91. if !msgType.IsValid() {
  92. return errors.New("Invalid MsgType")
  93. }
  94. val := byte(msgType) << 4
  95. val |= (boolToByte(hdr.DupFlag) << 3)
  96. val |= byte(hdr.QosLevel) << 1
  97. val |= boolToByte(hdr.Retain)
  98. buf.WriteByte(val)
  99. encodeLength(remainingLength, buf)
  100. return nil
  101. }
  102. func (hdr *Header) Decode(r io.Reader) (msgType TagMessageType, remainingLength int32, err error) {
  103. var buf [1]byte
  104. if _, err := io.ReadFull(r, buf[:]); err != nil {
  105. return 0, 0, err
  106. }
  107. byte1 := buf[0]
  108. msgType = TagMessageType(byte1 & 0xf0 >> 4)
  109. *hdr = Header{
  110. DupFlag: byte1&0x08 > 0,
  111. QosLevel: TagQosLevel(byte1 & 0x06 >> 1),
  112. Retain: byte1&0x01 > 0,
  113. }
  114. remainingLength, err = decodeLength(r)
  115. return msgType, remainingLength, err
  116. }
  117. func writeMessage(w io.Writer, msgType TagMessageType, hdr *Header, payloadBuf *bytes.Buffer, extraLength int32) error {
  118. totalPayloadLength := int64(len(payloadBuf.Bytes())) + int64(extraLength)
  119. if totalPayloadLength > MaxPayloadSize {
  120. return errors.New("message too long")
  121. }
  122. buf := new(bytes.Buffer)
  123. err := hdr.EncodeInto(buf, msgType, int32(totalPayloadLength))
  124. if err != nil {
  125. return err
  126. }
  127. buf.Write(payloadBuf.Bytes())
  128. _, err = w.Write(buf.Bytes())
  129. return err
  130. }
  131. // Connect represents an MQTT CONNECT message.
  132. type Connect struct {
  133. Header
  134. ProtocolName string
  135. ProtocolVersion uint8
  136. WillRetain bool
  137. WillFlag bool
  138. CleanSession bool
  139. WillQos TagQosLevel
  140. KeepAliveTimer uint16
  141. ClientID string
  142. WillTopic, WillMessage string
  143. UsernameFlag, PasswordFlag bool
  144. Username, Password string
  145. }
  146. func (msg *Connect) Encode(w io.Writer) (err error) {
  147. if msg.WillQos > QosInvalid {
  148. return errors.New("invalid Qos")
  149. }
  150. buf := new(bytes.Buffer)
  151. flags := boolToByte(msg.UsernameFlag) << 7
  152. flags |= boolToByte(msg.PasswordFlag) << 6
  153. flags |= boolToByte(msg.WillRetain) << 5
  154. flags |= byte(msg.WillQos) << 3
  155. flags |= boolToByte(msg.WillFlag) << 2
  156. flags |= boolToByte(msg.CleanSession) << 1
  157. setString(msg.ProtocolName, buf)
  158. setUint8(msg.ProtocolVersion, buf)
  159. buf.WriteByte(flags)
  160. setUint16(msg.KeepAliveTimer, buf)
  161. setString(msg.ClientID, buf)
  162. if msg.WillFlag {
  163. setString(msg.WillTopic, buf)
  164. setString(msg.WillMessage, buf)
  165. }
  166. if msg.UsernameFlag {
  167. setString(msg.Username, buf)
  168. }
  169. if msg.PasswordFlag {
  170. setString(msg.Password, buf)
  171. }
  172. return writeMessage(w, MsgConnect, &msg.Header, buf, 0)
  173. }
  174. func (msg *Connect) Decode(r io.Reader, hdr Header, packetRemaining int32) (err error) {
  175. protocolName, err := getString(r, &packetRemaining)
  176. if err != nil {
  177. return err
  178. }
  179. protocolVersion, err := getUint8(r, &packetRemaining)
  180. if err != nil {
  181. return err
  182. }
  183. flags, err := getUint8(r, &packetRemaining)
  184. if err != nil {
  185. return err
  186. }
  187. keepAliveTimer, err := getUint16(r, &packetRemaining)
  188. if err != nil {
  189. return err
  190. }
  191. ClientID, err := getString(r, &packetRemaining)
  192. if err != nil {
  193. return err
  194. }
  195. *msg = Connect{
  196. Header: hdr,
  197. ProtocolName: protocolName,
  198. ProtocolVersion: protocolVersion,
  199. UsernameFlag: flags&0x80 > 0,
  200. PasswordFlag: flags&0x40 > 0,
  201. WillRetain: flags&0x20 > 0,
  202. WillQos: TagQosLevel(flags & 0x18 >> 3),
  203. WillFlag: flags&0x04 > 0,
  204. CleanSession: flags&0x02 > 0,
  205. KeepAliveTimer: keepAliveTimer,
  206. ClientID: ClientID,
  207. }
  208. if msg.WillFlag {
  209. msg.WillTopic, err = getString(r, &packetRemaining)
  210. if err != nil {
  211. return err
  212. }
  213. msg.WillMessage, err = getString(r, &packetRemaining)
  214. if err != nil {
  215. return err
  216. }
  217. }
  218. if msg.UsernameFlag {
  219. msg.Username, err = getString(r, &packetRemaining)
  220. if err != nil {
  221. return err
  222. }
  223. }
  224. if msg.PasswordFlag {
  225. msg.Password, err = getString(r, &packetRemaining)
  226. if err != nil {
  227. return err
  228. }
  229. }
  230. if packetRemaining != 0 {
  231. return errors.New("message too long")
  232. }
  233. return nil
  234. }
  235. // ConnAck represents an MQTT CONNACK message.
  236. type ConnAck struct {
  237. Header
  238. ReturnCode TagRetCode
  239. }
  240. func (msg *ConnAck) Encode(w io.Writer) (err error) {
  241. buf := new(bytes.Buffer)
  242. buf.WriteByte(byte(0)) // Reserved byte.
  243. setUint8(uint8(msg.ReturnCode), buf)
  244. return writeMessage(w, MsgConnAck, &msg.Header, buf, 0)
  245. }
  246. func (msg *ConnAck) Decode(r io.Reader, hdr Header, packetRemaining int32) (err error) {
  247. msg.Header = hdr
  248. _, err = getUint8(r, &packetRemaining) // Skip reserved byte.
  249. if err != nil {
  250. return err
  251. }
  252. code, err := getUint8(r, &packetRemaining)
  253. if err != nil {
  254. return err
  255. }
  256. msg.ReturnCode = TagRetCode(code)
  257. if !msg.ReturnCode.IsValid() {
  258. return errors.New("invliad retcode")
  259. }
  260. if packetRemaining != 0 {
  261. return errors.New("message too long")
  262. }
  263. return nil
  264. }
  265. // Publish represents an MQTT PUBLISH message.
  266. type Publish struct {
  267. Header
  268. TopicName string
  269. MessageID uint16
  270. Payload Payload
  271. }
  272. func (msg *Publish) Encode(w io.Writer) (err error) {
  273. buf := new(bytes.Buffer)
  274. setString(msg.TopicName, buf)
  275. if msg.Header.QosLevel.HasId() {
  276. setUint16(msg.MessageID, buf)
  277. }
  278. if err = msg.Payload.WritePayload(buf); err != nil {
  279. return err
  280. }
  281. if err = writeMessage(w, MsgPublish, &msg.Header, buf, int32(0)); err != nil {
  282. return err
  283. }
  284. return nil
  285. }
  286. func (msg *Publish) Decode(r io.Reader, hdr Header, packetRemaining int32) (err error) {
  287. msg.Header = hdr
  288. msg.TopicName, err = getString(r, &packetRemaining)
  289. if err != nil {
  290. return err
  291. }
  292. if msg.Header.QosLevel.HasId() {
  293. msg.MessageID, err = getUint16(r, &packetRemaining)
  294. if err != nil {
  295. return err
  296. }
  297. }
  298. payloadReader := &io.LimitedReader{r, int64(packetRemaining)}
  299. msg.Payload = make(BytesPayload, int(packetRemaining))
  300. return msg.Payload.ReadPayload(payloadReader, int(packetRemaining))
  301. }
  302. // PubAck represents an MQTT PUBACK message.
  303. type PubAck struct {
  304. Header
  305. MessageID uint16
  306. }
  307. func (msg *PubAck) Encode(w io.Writer) error {
  308. return encodeAckCommon(w, &msg.Header, msg.MessageID, MsgPubAck)
  309. }
  310. func (msg *PubAck) Decode(r io.Reader, hdr Header, packetRemaining int32) (err error) {
  311. msg.Header = hdr
  312. return decodeAckCommon(r, packetRemaining, &msg.MessageID)
  313. }
  314. // PubRec represents an MQTT PUBREC message.
  315. type PubRec struct {
  316. Header
  317. MessageID uint16
  318. }
  319. func (msg *PubRec) Encode(w io.Writer) error {
  320. return encodeAckCommon(w, &msg.Header, msg.MessageID, MsgPubRec)
  321. }
  322. func (msg *PubRec) Decode(r io.Reader, hdr Header, packetRemaining int32) (err error) {
  323. msg.Header = hdr
  324. return decodeAckCommon(r, packetRemaining, &msg.MessageID)
  325. }
  326. // PubRel represents an MQTT PUBREL message.
  327. type PubRel struct {
  328. Header
  329. MessageID uint16
  330. }
  331. func (msg *PubRel) Encode(w io.Writer) error {
  332. return encodeAckCommon(w, &msg.Header, msg.MessageID, MsgPubRel)
  333. }
  334. func (msg *PubRel) Decode(r io.Reader, hdr Header, packetRemaining int32) (err error) {
  335. msg.Header = hdr
  336. return decodeAckCommon(r, packetRemaining, &msg.MessageID)
  337. }
  338. // PubComp represents an MQTT PUBCOMP message.
  339. type PubComp struct {
  340. Header
  341. MessageID uint16
  342. }
  343. func (msg *PubComp) Encode(w io.Writer) error {
  344. return encodeAckCommon(w, &msg.Header, msg.MessageID, MsgPubComp)
  345. }
  346. func (msg *PubComp) Decode(r io.Reader, hdr Header, packetRemaining int32) (err error) {
  347. msg.Header = hdr
  348. return decodeAckCommon(r, packetRemaining, &msg.MessageID)
  349. }
  350. // Subscribe represents an MQTT SUBSCRIBE message.
  351. type Subscribe struct {
  352. Header
  353. MessageID uint16
  354. Topics []TopicQos
  355. }
  356. type TopicQos struct {
  357. Topic string
  358. Qos TagQosLevel
  359. }
  360. func (msg *Subscribe) Encode(w io.Writer) (err error) {
  361. buf := new(bytes.Buffer)
  362. if msg.Header.QosLevel.HasId() {
  363. setUint16(msg.MessageID, buf)
  364. }
  365. for _, topicSub := range msg.Topics {
  366. setString(topicSub.Topic, buf)
  367. setUint8(uint8(topicSub.Qos), buf)
  368. }
  369. return writeMessage(w, MsgSubscribe, &msg.Header, buf, 0)
  370. }
  371. func (msg *Subscribe) Decode(r io.Reader, hdr Header, packetRemaining int32) (err error) {
  372. msg.Header = hdr
  373. if msg.Header.QosLevel.HasId() {
  374. msg.MessageID, err = getUint16(r, &packetRemaining)
  375. if err != nil {
  376. return err
  377. }
  378. }
  379. var topics []TopicQos
  380. for packetRemaining > 0 {
  381. topic, err := getString(r, &packetRemaining)
  382. if err != nil {
  383. return err
  384. }
  385. qos, err := getUint8(r, &packetRemaining)
  386. if err != nil {
  387. return err
  388. }
  389. topics = append(topics, TopicQos{
  390. Topic: topic,
  391. Qos: TagQosLevel(qos),
  392. })
  393. }
  394. msg.Topics = topics
  395. return nil
  396. }
  397. // SubAck represents an MQTT SUBACK message.
  398. type SubAck struct {
  399. Header
  400. MessageID uint16
  401. TopicsQos []TagQosLevel
  402. }
  403. func (msg *SubAck) Encode(w io.Writer) (err error) {
  404. buf := new(bytes.Buffer)
  405. setUint16(msg.MessageID, buf)
  406. for i := 0; i < len(msg.TopicsQos); i += 1 {
  407. setUint8(uint8(msg.TopicsQos[i]), buf)
  408. }
  409. return writeMessage(w, MsgSubAck, &msg.Header, buf, 0)
  410. }
  411. func (msg *SubAck) Decode(r io.Reader, hdr Header, packetRemaining int32) (err error) {
  412. msg.Header = hdr
  413. msg.MessageID, err = getUint16(r, &packetRemaining)
  414. if err != nil {
  415. return err
  416. }
  417. topicsQos := make([]TagQosLevel, 0)
  418. for packetRemaining > 0 {
  419. qos, err := getUint8(r, &packetRemaining)
  420. if err != nil {
  421. return err
  422. }
  423. grantedQos := TagQosLevel(qos & 0x03)
  424. topicsQos = append(topicsQos, grantedQos)
  425. }
  426. msg.TopicsQos = topicsQos
  427. return nil
  428. }
  429. // Unsubscribe represents an MQTT UNSUBSCRIBE message.
  430. type Unsubscribe struct {
  431. Header
  432. MessageID uint16
  433. Topics []string
  434. }
  435. func (msg *Unsubscribe) Encode(w io.Writer) (err error) {
  436. buf := new(bytes.Buffer)
  437. if msg.Header.QosLevel.HasId() {
  438. setUint16(msg.MessageID, buf)
  439. }
  440. for _, topic := range msg.Topics {
  441. setString(topic, buf)
  442. }
  443. return writeMessage(w, MsgUnsubscribe, &msg.Header, buf, 0)
  444. }
  445. func (msg *Unsubscribe) Decode(r io.Reader, hdr Header, packetRemaining int32) (err error) {
  446. msg.Header = hdr
  447. if msg.Header.QosLevel.HasId() {
  448. msg.MessageID, err = getUint16(r, &packetRemaining)
  449. if err != nil {
  450. return err
  451. }
  452. }
  453. topics := make([]string, 0)
  454. for packetRemaining > 0 {
  455. topic, err := getString(r, &packetRemaining)
  456. if err != nil {
  457. return err
  458. }
  459. topics = append(topics, topic)
  460. }
  461. msg.Topics = topics
  462. return nil
  463. }
  464. // UnsubAck represents an MQTT UNSUBACK message.
  465. type UnsubAck struct {
  466. Header
  467. MessageID uint16
  468. }
  469. func (msg *UnsubAck) Encode(w io.Writer) error {
  470. return encodeAckCommon(w, &msg.Header, msg.MessageID, MsgUnsubAck)
  471. }
  472. func (msg *UnsubAck) Decode(r io.Reader, hdr Header, packetRemaining int32) (err error) {
  473. msg.Header = hdr
  474. return decodeAckCommon(r, packetRemaining, &msg.MessageID)
  475. }
  476. // PingReq represents an MQTT PINGREQ message.
  477. type PingReq struct {
  478. Header
  479. }
  480. func (msg *PingReq) Encode(w io.Writer) error {
  481. return msg.Header.Encode(w, MsgPingReq, 0)
  482. }
  483. func (msg *PingReq) Decode(r io.Reader, hdr Header, packetRemaining int32) error {
  484. if packetRemaining != 0 {
  485. return errors.New("msg too long")
  486. }
  487. return nil
  488. }
  489. // PingResp represents an MQTT PINGRESP message.
  490. type PingResp struct {
  491. Header
  492. }
  493. func (msg *PingResp) Encode(w io.Writer) error {
  494. return msg.Header.Encode(w, MsgPingResp, 0)
  495. }
  496. func (msg *PingResp) Decode(r io.Reader, hdr Header, packetRemaining int32) error {
  497. if packetRemaining != 0 {
  498. return errors.New("msg too long")
  499. }
  500. return nil
  501. }
  502. // Disconnect represents an MQTT DISCONNECT message.
  503. type Disconnect struct {
  504. Header
  505. }
  506. func (msg *Disconnect) Encode(w io.Writer) error {
  507. return msg.Header.Encode(w, MsgDisconnect, 0)
  508. }
  509. func (msg *Disconnect) Decode(r io.Reader, hdr Header, packetRemaining int32) error {
  510. if packetRemaining != 0 {
  511. return errors.New("msg too long")
  512. }
  513. return nil
  514. }
  515. func encodeAckCommon(w io.Writer, hdr *Header, MessageID uint16, msgType TagMessageType) error {
  516. buf := new(bytes.Buffer)
  517. setUint16(MessageID, buf)
  518. return writeMessage(w, msgType, hdr, buf, 0)
  519. }
  520. func decodeAckCommon(r io.Reader, packetRemaining int32, MessageID *uint16) (err error) {
  521. *MessageID, err = getUint16(r, &packetRemaining)
  522. if err != nil {
  523. return err
  524. }
  525. if packetRemaining != 0 {
  526. return errors.New("msg too long")
  527. }
  528. return nil
  529. }
  530. // DecodeOneMessage decodes one message from r. config provides specifics on
  531. // how to decode messages, nil indicates that the DefaultDecoderConfig should
  532. // be used.
  533. func DecodeOneMessage(r io.Reader) (msg Message, err error) {
  534. var hdr Header
  535. var msgType TagMessageType
  536. var packetRemaining int32
  537. msgType, packetRemaining, err = hdr.Decode(r)
  538. if err != nil {
  539. return nil, err
  540. }
  541. msg, err = NewMessage(msgType)
  542. if err != nil {
  543. return nil, err
  544. }
  545. return msg, msg.Decode(r, hdr, packetRemaining)
  546. }
  547. func NewMessage(msgType TagMessageType) (msg Message, err error) {
  548. switch msgType {
  549. case MsgConnect:
  550. msg = new(Connect)
  551. case MsgConnAck:
  552. msg = new(ConnAck)
  553. case MsgPublish:
  554. msg = new(Publish)
  555. case MsgPubAck:
  556. msg = new(PubAck)
  557. case MsgPubRec:
  558. msg = new(PubRec)
  559. case MsgPubRel:
  560. msg = new(PubRel)
  561. case MsgPubComp:
  562. msg = new(PubComp)
  563. case MsgSubscribe:
  564. msg = new(Subscribe)
  565. case MsgUnsubAck:
  566. msg = new(UnsubAck)
  567. case MsgSubAck:
  568. msg = new(SubAck)
  569. case MsgUnsubscribe:
  570. msg = new(Unsubscribe)
  571. case MsgPingReq:
  572. msg = new(PingReq)
  573. case MsgPingResp:
  574. msg = new(PingResp)
  575. case MsgDisconnect:
  576. msg = new(Disconnect)
  577. default:
  578. return nil, errors.New("msgType error")
  579. }
  580. return msg, nil
  581. }