mqtt_client.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. package client
  2. import (
  3. "crypto/tls"
  4. "fmt"
  5. MQTT "github.com/eclipse/paho.mqtt.golang"
  6. "github.com/gogf/gf/util/guid"
  7. "math/rand"
  8. "net/url"
  9. "sparrow/pkg/server"
  10. "time"
  11. "sync"
  12. )
  13. var (
  14. mqttInitOnce sync.Once
  15. mqttClient *MqttClient
  16. mqttSetOnConnectHandler func(cli MQTT.Client)
  17. )
  18. type MqttClient struct {
  19. clients []MQTT.Client
  20. }
  21. func NewMqttClient(conf *MqttConfig) (mcs *MqttClient, err error) {
  22. mqttInitOnce.Do(func() {
  23. var clients []MQTT.Client
  24. for len(clients) < conf.ConnNum {
  25. var (
  26. mc MQTT.Client
  27. )
  28. for i := 3; i > 0; i-- {
  29. mc, err = initMqttClient(conf)
  30. if err != nil {
  31. server.Log.Errorf("init mqtt client failed: %s", err.Error())
  32. continue
  33. }
  34. break
  35. }
  36. if err != nil {
  37. server.Log.Errorf("mqtt 连接失败: %s", err.Error())
  38. panic(err)
  39. }
  40. clients = append(clients, mc)
  41. var cli = MqttClient{clients: clients}
  42. mqttClient = &cli
  43. }
  44. })
  45. return mqttClient, err
  46. }
  47. func initMqttClient(conf *MqttConfig) (mc MQTT.Client, err error) {
  48. opts := MQTT.NewClientOptions()
  49. for _, broker := range conf.Brokers {
  50. opts.AddBroker(broker)
  51. }
  52. opts.SetClientID(fmt.Sprintf("%s_%s", conf.ClientId, guid.S())).
  53. SetUsername(conf.User).
  54. SetPassword(conf.Password).
  55. SetConnectRetry(true).
  56. SetCleanSession(true).
  57. SetKeepAlive(120)
  58. opts.SetConnectionAttemptHandler(func(broker *url.URL, tlsCfg *tls.Config) *tls.Config {
  59. server.Log.Infof("connect to %s", broker.String())
  60. return tlsCfg
  61. })
  62. opts.SetOnConnectHandler(func(client MQTT.Client) {
  63. if mqttSetOnConnectHandler != nil {
  64. mqttSetOnConnectHandler(client)
  65. }
  66. server.Log.Infof("connected success")
  67. })
  68. mc = MQTT.NewClient(opts)
  69. if errF := mc.Connect().WaitTimeout(time.Second * 5); errF == false {
  70. server.Log.Errorf("connected failed")
  71. return
  72. }
  73. return
  74. }
  75. // Subscribe 订阅消息
  76. func (m MqttClient) Subscribe(cli MQTT.Client, topic string, qos byte, callback MQTT.MessageHandler) error {
  77. var clients = m.clients
  78. if cli != nil {
  79. clients = []MQTT.Client{cli}
  80. }
  81. for _, client := range clients {
  82. err := client.Subscribe(topic, qos, callback).Error()
  83. if err != nil {
  84. server.Log.Errorf("subscribe failed: %s", err.Error())
  85. }
  86. }
  87. return nil
  88. }
  89. // Publish 发布消息
  90. func (m MqttClient) Publish(topic string, qos byte, retained bool, payload interface{}) error {
  91. id := rand.Intn(len(m.clients))
  92. return m.clients[id].Publish(topic, qos, retained, payload).Error()
  93. }
  94. func SetMqttSetOnConnectHandler(f func(cli MQTT.Client)) {
  95. mqttSetOnConnectHandler = f
  96. }