123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107 |
- package client
- import (
- "crypto/tls"
- "fmt"
- MQTT "github.com/eclipse/paho.mqtt.golang"
- "github.com/gogf/gf/util/guid"
- "math/rand"
- "net/url"
- "sparrow/pkg/server"
- "time"
- "sync"
- )
- var (
- mqttInitOnce sync.Once
- mqttClient *MqttClient
- mqttSetOnConnectHandler func(cli MQTT.Client)
- )
- type MqttClient struct {
- clients []MQTT.Client
- }
- func NewMqttClient(conf *MqttConfig) (mcs *MqttClient, err error) {
- mqttInitOnce.Do(func() {
- var clients []MQTT.Client
- for len(clients) < conf.ConnNum {
- var (
- mc MQTT.Client
- )
- for i := 3; i > 0; i-- {
- mc, err = initMqttClient(conf)
- if err != nil {
- server.Log.Errorf("init mqtt client failed: %s", err.Error())
- continue
- }
- break
- }
- if err != nil {
- server.Log.Errorf("mqtt 连接失败: %s", err.Error())
- panic(err)
- }
- clients = append(clients, mc)
- var cli = MqttClient{clients: clients}
- mqttClient = &cli
- }
- })
- return mqttClient, err
- }
- func initMqttClient(conf *MqttConfig) (mc MQTT.Client, err error) {
- opts := MQTT.NewClientOptions()
- for _, broker := range conf.Brokers {
- opts.AddBroker(broker)
- }
- opts.SetClientID(fmt.Sprintf("%s_%s", conf.ClientId, guid.S())).
- SetUsername(conf.User).
- SetPassword(conf.Password).
- SetConnectRetry(true).
- SetCleanSession(true)
- opts.SetConnectionAttemptHandler(func(broker *url.URL, tlsCfg *tls.Config) *tls.Config {
- server.Log.Infof("connect to %s", broker.String())
- return tlsCfg
- })
- opts.SetConnectionLostHandler(func(client MQTT.Client, reason error) {
- server.Log.Errorf("connection lost: %s", reason.Error())
- })
- opts.SetOnConnectHandler(func(client MQTT.Client) {
- if mqttSetOnConnectHandler != nil {
- mqttSetOnConnectHandler(client)
- }
- server.Log.Infof("connected success")
- })
- mc = MQTT.NewClient(opts)
- if errF := mc.Connect().WaitTimeout(time.Second * 5); errF == false {
- server.Log.Errorf("connected failed")
- return
- }
- return
- }
- // Subscribe 订阅消息
- func (m MqttClient) Subscribe(cli MQTT.Client, topic string, qos byte, callback MQTT.MessageHandler) error {
- var clients = m.clients
- if cli != nil {
- clients = []MQTT.Client{cli}
- }
- for _, client := range clients {
- err := client.Subscribe(topic, qos, callback).Error()
- if err != nil {
- server.Log.Errorf("subscribe failed: %s", err.Error())
- }
- }
- return nil
- }
- // Publish 发布消息
- func (m MqttClient) Publish(topic string, qos byte, retained bool, payload interface{}) error {
- id := rand.Intn(len(m.clients))
- return m.clients[id].Publish(topic, qos, retained, payload).Error()
- }
- func SetMqttSetOnConnectHandler(f func(cli MQTT.Client)) {
- mqttSetOnConnectHandler = f
- }
|