mqtt_client.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. package client
  2. import (
  3. "crypto/tls"
  4. "fmt"
  5. MQTT "github.com/eclipse/paho.mqtt.golang"
  6. "github.com/gogf/gf/util/guid"
  7. "math/rand"
  8. "net/url"
  9. "sparrow/pkg/server"
  10. "time"
  11. "sync"
  12. )
  13. var (
  14. mqttInitOnce sync.Once
  15. mqttClient *MqttClient
  16. mqttSetOnConnectHandler func(cli MQTT.Client)
  17. )
  18. type MqttClient struct {
  19. clients []MQTT.Client
  20. }
  21. func NewMqttClient(conf *MqttConfig) (mcs *MqttClient, err error) {
  22. mqttInitOnce.Do(func() {
  23. var clients []MQTT.Client
  24. for len(clients) < conf.ConnNum {
  25. var (
  26. mc MQTT.Client
  27. )
  28. for i := 3; i > 0; i-- {
  29. mc, err = initMqttClient(conf)
  30. if err != nil {
  31. server.Log.Errorf("init mqtt client failed: %s", err.Error())
  32. continue
  33. }
  34. break
  35. }
  36. if err != nil {
  37. server.Log.Errorf("mqtt 连接失败: %s", err.Error())
  38. panic(err)
  39. }
  40. clients = append(clients, mc)
  41. var cli = MqttClient{clients: clients}
  42. mqttClient = &cli
  43. }
  44. })
  45. return mqttClient, err
  46. }
  47. func initMqttClient(conf *MqttConfig) (mc MQTT.Client, err error) {
  48. opts := MQTT.NewClientOptions()
  49. for _, broker := range conf.Brokers {
  50. opts.AddBroker(broker)
  51. }
  52. opts.SetClientID(fmt.Sprintf("%s_%s", conf.ClientId, guid.S())).
  53. SetUsername(conf.User).SetPassword(conf.Password).
  54. SetConnectRetry(true)
  55. opts.SetConnectionAttemptHandler(func(broker *url.URL, tlsCfg *tls.Config) *tls.Config {
  56. server.Log.Infof("connect to %s", broker.String())
  57. return tlsCfg
  58. })
  59. opts.SetOnConnectHandler(func(client MQTT.Client) {
  60. server.Log.Infof("connected success")
  61. })
  62. mc = MQTT.NewClient(opts)
  63. if errF := mc.Connect().WaitTimeout(time.Second * 5); errF == false {
  64. server.Log.Errorf("connected failed")
  65. return
  66. }
  67. return
  68. }
  69. // Subscribe 订阅消息
  70. func (m MqttClient) Subscribe(cli MQTT.Client, topic string, qos byte, callback MQTT.MessageHandler) error {
  71. var clients = m.clients
  72. if cli != nil {
  73. clients = []MQTT.Client{cli}
  74. }
  75. for _, client := range clients {
  76. err := client.Subscribe(topic, qos, callback).Error()
  77. if err != nil {
  78. server.Log.Errorf("subscribe failed: %s", err.Error())
  79. }
  80. }
  81. return nil
  82. }
  83. // Publish 发布消息
  84. func (m MqttClient) Publish(topic string, qos byte, retained bool, payload interface{}) error {
  85. id := rand.Intn(len(m.clients))
  86. return m.clients[id].Publish(topic, qos, retained, payload).Error()
  87. }
  88. func SetMqttSetOnConnectHandler(f func(cli MQTT.Client)) {
  89. mqttSetOnConnectHandler = f
  90. }