publish.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. /*
  2. * Copyright (c) 2021 IBM Corp and others.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v2.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * https://www.eclipse.org/legal/epl-2.0/
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Allan Stockdill-Mander
  15. */
  16. package packets
  17. import (
  18. "bytes"
  19. "fmt"
  20. "io"
  21. )
  22. // PublishPacket is an internal representation of the fields of the
  23. // Publish MQTT packet
  24. type PublishPacket struct {
  25. FixedHeader
  26. TopicName string
  27. MessageID uint16
  28. Payload []byte
  29. }
  30. func (p *PublishPacket) String() string {
  31. return fmt.Sprintf("%s topicName: %s MessageID: %d payload: %s", p.FixedHeader, p.TopicName, p.MessageID, string(p.Payload))
  32. }
  33. func (p *PublishPacket) Write(w io.Writer) error {
  34. var body bytes.Buffer
  35. var err error
  36. body.Write(encodeString(p.TopicName))
  37. if p.Qos > 0 {
  38. body.Write(encodeUint16(p.MessageID))
  39. }
  40. p.FixedHeader.RemainingLength = body.Len() + len(p.Payload)
  41. packet := p.FixedHeader.pack()
  42. packet.Write(body.Bytes())
  43. packet.Write(p.Payload)
  44. _, err = w.Write(packet.Bytes())
  45. return err
  46. }
  47. // Unpack decodes the details of a ControlPacket after the fixed
  48. // header has been read
  49. func (p *PublishPacket) Unpack(b io.Reader) error {
  50. var payloadLength = p.FixedHeader.RemainingLength
  51. var err error
  52. p.TopicName, err = decodeString(b)
  53. if err != nil {
  54. return err
  55. }
  56. if p.Qos > 0 {
  57. p.MessageID, err = decodeUint16(b)
  58. if err != nil {
  59. return err
  60. }
  61. payloadLength -= len(p.TopicName) + 4
  62. } else {
  63. payloadLength -= len(p.TopicName) + 2
  64. }
  65. if payloadLength < 0 {
  66. return fmt.Errorf("error unpacking publish, payload length < 0")
  67. }
  68. p.Payload = make([]byte, payloadLength)
  69. _, err = b.Read(p.Payload)
  70. return err
  71. }
  72. // Copy creates a new PublishPacket with the same topic and payload
  73. // but an empty fixed header, useful for when you want to deliver
  74. // a message with different properties such as Qos but the same
  75. // content
  76. func (p *PublishPacket) Copy() *PublishPacket {
  77. newP := NewControlPacket(Publish).(*PublishPacket)
  78. newP.TopicName = p.TopicName
  79. newP.Payload = p.Payload
  80. return newP
  81. }
  82. // Details returns a Details struct containing the Qos and
  83. // MessageID of this ControlPacket
  84. func (p *PublishPacket) Details() Details {
  85. return Details{Qos: p.Qos, MessageID: p.MessageID}
  86. }