message.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. /*
  2. * Copyright (c) 2021 IBM Corp and others.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v2.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * https://www.eclipse.org/legal/epl-2.0/
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Seth Hoenig
  15. * Allan Stockdill-Mander
  16. * Mike Robertson
  17. */
  18. package mqtt
  19. import (
  20. "net/url"
  21. "sync"
  22. "github.com/eclipse/paho.mqtt.golang/packets"
  23. )
  // Message is the read-only view of a received message that is handed to
  // user callbacks. It describes messages delivered to the application, not
  // the packets the client uses internally.
  type Message interface {
  	// Duplicate reports the duplicate-delivery flag of the message.
  	Duplicate() bool
  	// Qos returns the quality-of-service level of the message.
  	Qos() byte
  	// Retained reports the retained flag of the message.
  	Retained() bool
  	// Topic returns the topic name of the message.
  	Topic() string
  	// MessageID returns the identifier of the message.
  	MessageID() uint16
  	// Payload returns the body of the message.
  	Payload() []byte
  	// Ack acknowledges the message.
  	Ack()
  }
  // message is the package's concrete implementation of a received message.
  // It holds a sync.Once and therefore must be used through a pointer and
  // never copied.
  type message struct {
  	duplicate bool
  	qos       byte
  	retained  bool
  	topic     string
  	messageID uint16
  	payload   []byte
  	once      sync.Once // guarantees ack runs at most once
  	ack       func()    // acknowledgement callback; may be nil
  }

  // Duplicate reports the stored duplicate flag.
  func (m *message) Duplicate() bool {
  	return m.duplicate
  }

  // Qos returns the stored quality-of-service level.
  func (m *message) Qos() byte {
  	return m.qos
  }

  // Retained reports the stored retained flag.
  func (m *message) Retained() bool {
  	return m.retained
  }

  // Topic returns the stored topic name.
  func (m *message) Topic() string {
  	return m.topic
  }

  // MessageID returns the stored message identifier.
  func (m *message) MessageID() uint16 {
  	return m.messageID
  }

  // Payload returns the stored payload slice. The slice is not copied, so
  // callers share its backing array with the message.
  func (m *message) Payload() []byte {
  	return m.payload
  }

  // Ack runs the acknowledgement callback the first time it is called; later
  // calls do nothing. A message that has no callback is acknowledged as a
  // no-op instead of panicking on a nil function call.
  func (m *message) Ack() {
  	if m.ack == nil {
  		return
  	}
  	m.once.Do(m.ack)
  }
  67. func messageFromPublish(p *packets.PublishPacket, ack func()) Message {
  68. return &message{
  69. duplicate: p.Dup,
  70. qos: p.Qos,
  71. retained: p.Retain,
  72. topic: p.TopicName,
  73. messageID: p.MessageID,
  74. payload: p.Payload,
  75. ack: ack,
  76. }
  77. }
  78. func newConnectMsgFromOptions(options *ClientOptions, broker *url.URL) *packets.ConnectPacket {
  79. m := packets.NewControlPacket(packets.Connect).(*packets.ConnectPacket)
  80. m.CleanSession = options.CleanSession
  81. m.WillFlag = options.WillEnabled
  82. m.WillRetain = options.WillRetained
  83. m.ClientIdentifier = options.ClientID
  84. if options.WillEnabled {
  85. m.WillQos = options.WillQos
  86. m.WillTopic = options.WillTopic
  87. m.WillMessage = options.WillPayload
  88. }
  89. username := options.Username
  90. password := options.Password
  91. if broker.User != nil {
  92. username = broker.User.Username()
  93. if pwd, ok := broker.User.Password(); ok {
  94. password = pwd
  95. }
  96. }
  97. if options.CredentialsProvider != nil {
  98. username, password = options.CredentialsProvider()
  99. }
  100. if username != "" {
  101. m.UsernameFlag = true
  102. m.Username = username
  103. // mustn't have password without user as well
  104. if password != "" {
  105. m.PasswordFlag = true
  106. m.Password = []byte(password)
  107. }
  108. }
  109. m.Keepalive = uint16(options.KeepAlive)
  110. return m
  111. }