mqtt_client.go 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. package client
  2. import (
  3. "crypto/tls"
  4. "fmt"
  5. MQTT "github.com/eclipse/paho.mqtt.golang"
  6. "github.com/gogf/gf/util/guid"
  7. "math/rand"
  8. "net/url"
  9. "sparrow/pkg/server"
  10. "sync"
  11. )
  12. var (
  13. mqttInitOnce sync.Once
  14. mqttClient *MqttClient
  15. mqttSetOnConnectHandler func(cli MQTT.Client)
  16. )
  17. type MqttClient struct {
  18. clients []MQTT.Client
  19. }
  20. func NewMqttClient(conf *MqttConfig) (mcs *MqttClient, err error) {
  21. mqttInitOnce.Do(func() {
  22. var clients []MQTT.Client
  23. for len(clients) < conf.ConnNum {
  24. var (
  25. mc MQTT.Client
  26. )
  27. for i := 3; i > 0; i-- {
  28. mc, err = initMqttClient(conf)
  29. if err != nil {
  30. server.Log.Errorf("init mqtt client failed: %s", err.Error())
  31. continue
  32. }
  33. break
  34. }
  35. if err != nil {
  36. server.Log.Errorf("mqtt 连接失败: %s", err.Error())
  37. panic(err)
  38. }
  39. clients = append(clients, mc)
  40. var cli = MqttClient{clients: clients}
  41. mqttClient = &cli
  42. }
  43. })
  44. return mqttClient, err
  45. }
  46. func initMqttClient(conf *MqttConfig) (mc MQTT.Client, err error) {
  47. opts := MQTT.NewClientOptions()
  48. for _, broker := range conf.Brokers {
  49. opts.AddBroker(broker)
  50. }
  51. opts.SetClientID(fmt.Sprintf("%s_%s", conf.ClientId, guid.S())).
  52. SetUsername(conf.User).SetPassword(conf.Password).
  53. SetConnectRetry(true)
  54. opts.SetConnectionAttemptHandler(func(broker *url.URL, tlsCfg *tls.Config) *tls.Config {
  55. server.Log.Infof("connect to %s", broker.String())
  56. return tlsCfg
  57. })
  58. opts.SetOnConnectHandler(func(client MQTT.Client) {
  59. server.Log.Infof("connected success")
  60. })
  61. mc = MQTT.NewClient(opts)
  62. //if errF := mc.Connect().WaitTimeout(time.Second * 5); errF == false {
  63. // server.Log.Errorf("connected failed")
  64. // return
  65. //}
  66. return
  67. }
  68. // Subscribe 订阅消息
  69. func (m MqttClient) Subscribe(cli MQTT.Client, topic string, qos byte, callback MQTT.MessageHandler) error {
  70. var clients = m.clients
  71. if cli != nil {
  72. clients = []MQTT.Client{cli}
  73. }
  74. for _, client := range clients {
  75. err := client.Subscribe(topic, qos, callback).Error()
  76. if err != nil {
  77. server.Log.Errorf("subscribe failed: %s", err.Error())
  78. }
  79. }
  80. return nil
  81. }
  82. // Publish 发布消息
  83. func (m MqttClient) Publish(topic string, qos byte, retained bool, payload interface{}) error {
  84. id := rand.Intn(len(m.clients))
  85. return m.clients[id].Publish(topic, qos, retained, payload).Error()
  86. }
  87. func SetMqttSetOnConnectHandler(f func(cli MQTT.Client)) {
  88. mqttSetOnConnectHandler = f
  89. }