broker.go 831 B

123456789101112131415161718192021222324252627282930313233343536373839
  1. package mqtt
  2. import (
  3. "net"
  4. "time"
  5. )
  6. // Broker a mqtt broker
  7. type Broker struct {
  8. mgr *Manager
  9. }
  10. // NewBroker create new broker
  11. func NewBroker(p Provider) *Broker {
  12. // manager
  13. mgr := NewManager(p)
  14. handler := &Broker{mgr: mgr}
  15. return handler
  16. }
  17. // Handle tcp conn handle
  18. func (b *Broker) Handle(conn net.Conn) {
  19. b.mgr.NewConn(conn)
  20. }
  21. // SendMessageToDevice send message to device
  22. func (b *Broker) SendMessageToDevice(deviceid string, msgtype string, message []byte, timeout time.Duration) error {
  23. msg := &Publish{}
  24. msg.Header.QosLevel = QosAtLeastOnce
  25. msg.TopicName = msgtype
  26. msg.Payload = BytesPayload(message)
  27. return b.mgr.PublishMessage2Device(deviceid, msg, timeout)
  28. }
  29. // GetToken get device token with device id
  30. func (b *Broker) GetToken(deviceid string) ([]byte, error) {
  31. return b.mgr.GetToken(deviceid)
  32. }