mqtt_client.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. package client
  2. import (
  3. "crypto/tls"
  4. "fmt"
  5. MQTT "github.com/eclipse/paho.mqtt.golang"
  6. "github.com/gogf/gf/util/guid"
  7. "math/rand"
  8. "net/url"
  9. "sparrow/pkg/server"
  10. "time"
  11. "sync"
  12. )
  13. var (
  14. mqttInitOnce sync.Once
  15. mqttClient *MqttClient
  16. mqttSetOnConnectHandler func(cli MQTT.Client)
  17. )
  18. type MqttClient struct {
  19. clients []MQTT.Client
  20. }
  21. func NewMqttClient(conf *MqttConfig) (mcs *MqttClient, err error) {
  22. mqttInitOnce.Do(func() {
  23. var clients []MQTT.Client
  24. for len(clients) < conf.ConnNum {
  25. var (
  26. mc MQTT.Client
  27. )
  28. for i := 3; i > 0; i-- {
  29. mc, err = initMqttClient(conf)
  30. if err != nil {
  31. server.Log.Errorf("init mqtt client failed: %s", err.Error())
  32. continue
  33. }
  34. break
  35. }
  36. if err != nil {
  37. server.Log.Errorf("mqtt 连接失败: %s", err.Error())
  38. panic(err)
  39. }
  40. clients = append(clients, mc)
  41. var cli = MqttClient{clients: clients}
  42. mqttClient = &cli
  43. }
  44. })
  45. return mqttClient, err
  46. }
  47. func initMqttClient(conf *MqttConfig) (mc MQTT.Client, err error) {
  48. opts := MQTT.NewClientOptions()
  49. for _, broker := range conf.Brokers {
  50. opts.AddBroker(broker)
  51. }
  52. opts.SetClientID(fmt.Sprintf("%s_%s", conf.ClientId, guid.S())).
  53. SetUsername(conf.User).
  54. SetPassword(conf.Password).
  55. SetConnectRetry(true).
  56. SetCleanSession(true)
  57. opts.SetConnectionAttemptHandler(func(broker *url.URL, tlsCfg *tls.Config) *tls.Config {
  58. server.Log.Infof("connect to %s", broker.String())
  59. return tlsCfg
  60. })
  61. opts.SetConnectionLostHandler(func(client MQTT.Client, reason error) {
  62. server.Log.Errorf("connection lost: %s", reason.Error())
  63. })
  64. opts.SetOnConnectHandler(func(client MQTT.Client) {
  65. if mqttSetOnConnectHandler != nil {
  66. mqttSetOnConnectHandler(client)
  67. }
  68. server.Log.Infof("connected success")
  69. })
  70. mc = MQTT.NewClient(opts)
  71. if errF := mc.Connect().WaitTimeout(time.Second * 5); errF == false {
  72. server.Log.Errorf("connected failed")
  73. return
  74. }
  75. return
  76. }
  77. // Subscribe 订阅消息
  78. func (m MqttClient) Subscribe(cli MQTT.Client, topic string, qos byte, callback MQTT.MessageHandler) error {
  79. var clients = m.clients
  80. if cli != nil {
  81. clients = []MQTT.Client{cli}
  82. }
  83. for _, client := range clients {
  84. err := client.Subscribe(topic, qos, callback).Error()
  85. if err != nil {
  86. server.Log.Errorf("subscribe failed: %s", err.Error())
  87. }
  88. }
  89. return nil
  90. }
  91. // Publish 发布消息
  92. func (m MqttClient) Publish(topic string, qos byte, retained bool, payload interface{}) error {
  93. id := rand.Intn(len(m.clients))
  94. return m.clients[id].Publish(topic, qos, retained, payload).Error()
  95. }
  96. func SetMqttSetOnConnectHandler(f func(cli MQTT.Client)) {
  97. mqttSetOnConnectHandler = f
  98. }