package client

import (
	"crypto/tls"
	"fmt"
	"math/rand"
	"net/url"
	"sync"
	"time"

	MQTT "github.com/eclipse/paho.mqtt.golang"
	"github.com/gogf/gf/util/guid"

	"sparrow/pkg/server"
)

var (
	// mqttInitOnce guarantees the connection pool is built at most once per
	// process, no matter how many times NewMqttClient is called.
	mqttInitOnce sync.Once
	// mqttClient is the shared pool published by NewMqttClient.
	mqttClient *MqttClient
	// mqttSetOnConnectHandler is an optional hook invoked from the Paho
	// on-connect callback of every pooled client.
	mqttSetOnConnectHandler func(cli MQTT.Client)
)

// MqttClient is a pool of MQTT connections that all share one configuration.
type MqttClient struct {
	clients []MQTT.Client
}

// NewMqttClient returns the process-wide MQTT connection pool, creating it on
// the first call with conf.ConnNum connections. Later calls ignore conf and
// return the pool built by the first call.
//
// Each connection is attempted up to three times. If all attempts for a
// connection fail the function panics with the last error, as it always has.
// An error is returned when no pool exists after initialisation (for example
// when conf.ConnNum is not positive).
func NewMqttClient(conf *MqttConfig) (mcs *MqttClient, err error) {
	mqttInitOnce.Do(func() {
		const attempts = 3

		var clients []MQTT.Client
		for len(clients) < conf.ConnNum {
			var mc MQTT.Client
			for i := attempts; i > 0; i-- {
				mc, err = initMqttClient(conf)
				if err == nil {
					break
				}
				server.Log.Errorf("init mqtt client failed: %s", err.Error())
			}
			if err != nil {
				server.Log.Errorf("mqtt 连接失败: %s", err.Error())
				panic(err)
			}
			clients = append(clients, mc)
		}

		// Publish the pool once, after every connection is up, instead of
		// rebuilding the struct on each loop iteration.
		if len(clients) > 0 {
			mqttClient = &MqttClient{clients: clients}
		}
	})

	if mqttClient == nil {
		return nil, fmt.Errorf("mqtt client pool is empty: no connection was established")
	}
	return mqttClient, nil
}

// initMqttClient builds one Paho client from conf and waits up to five seconds
// for it to connect. It returns a non-nil error when the connection attempt
// times out or the connect token reports a failure; in that case the returned
// client is nil.
func initMqttClient(conf *MqttConfig) (mc MQTT.Client, err error) {
	const (
		connectTimeout = 5 * time.Second
		// SetKeepAlive takes a time.Duration. A bare 120 would be 120ns,
		// which Paho truncates to zero seconds (keep-alive disabled).
		keepAlive = 120 * time.Second
	)

	opts := MQTT.NewClientOptions()
	for _, broker := range conf.Brokers {
		opts.AddBroker(broker)
	}
	// The random suffix keeps client IDs distinct across pooled connections.
	opts.SetClientID(fmt.Sprintf("%s_%s", conf.ClientId, guid.S())).
		SetUsername(conf.User).
		SetPassword(conf.Password).
		SetConnectRetry(true).
		SetCleanSession(true).
		SetKeepAlive(keepAlive)
	opts.SetConnectionAttemptHandler(func(broker *url.URL, tlsCfg *tls.Config) *tls.Config {
		server.Log.Infof("connect to %s", broker.String())
		return tlsCfg
	})
	opts.SetOnConnectHandler(func(client MQTT.Client) {
		if mqttSetOnConnectHandler != nil {
			mqttSetOnConnectHandler(client)
		}
		server.Log.Infof("connected success")
	})

	mc = MQTT.NewClient(opts)
	token := mc.Connect()
	if !token.WaitTimeout(connectTimeout) {
		// With ConnectRetry enabled the client would keep retrying in the
		// background; disconnect so the abandoned client does not linger
		// while the caller creates a replacement.
		mc.Disconnect(0)
		return nil, fmt.Errorf("mqtt connect timed out after %s", connectTimeout)
	}
	if err = token.Error(); err != nil {
		return nil, fmt.Errorf("mqtt connect: %w", err)
	}
	return mc, nil
}

// Subscribe subscribes to topic with the given qos and callback. When cli is
// non-nil only that client is subscribed; otherwise every client in the pool
// is. Every target is attempted even if an earlier one fails; each failure is
// logged and the first one is returned.
//
// NOTE(review): the token's Error is read without waiting for the broker's
// acknowledgement, so presumably only failures reported synchronously by the
// client are observed here — confirm against the Paho token semantics.
func (m MqttClient) Subscribe(cli MQTT.Client, topic string, qos byte, callback MQTT.MessageHandler) error {
	targets := m.clients
	if cli != nil {
		targets = []MQTT.Client{cli}
	}

	var firstErr error
	for _, c := range targets {
		if err := c.Subscribe(topic, qos, callback).Error(); err != nil {
			server.Log.Errorf("subscribe failed: %s", err.Error())
			if firstErr == nil {
				firstErr = err
			}
		}
	}
	return firstErr
}

// Publish sends payload to topic through a randomly chosen pooled client. It
// returns an error when the pool is empty or when the publish token reports
// an error.
//
// NOTE(review): as in Subscribe, the token is not waited on before its Error
// is read — confirm this fire-and-forget behaviour is intended.
func (m MqttClient) Publish(topic string, qos byte, retained bool, payload interface{}) error {
	if len(m.clients) == 0 {
		// rand.Intn panics for a non-positive argument.
		return fmt.Errorf("mqtt publish to %q: no clients available", topic)
	}
	idx := rand.Intn(len(m.clients))
	return m.clients[idx].Publish(topic, qos, retained, payload).Error()
}

// SetMqttSetOnConnectHandler registers f to be called with the connected
// client from the on-connect callback of every pooled client. The hook is read
// when the callback fires, so it should be set before NewMqttClient for the
// first connections to see it. The assignment is not synchronised.
func SetMqttSetOnConnectHandler(f func(cli MQTT.Client)) {
	mqttSetOnConnectHandler = f
}