package main import ( "context" "encoding/json" mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/gogf/gf/os/grpool" "runtime" "runtime/debug" "sparrow/pkg/protocol" "sparrow/pkg/server" "sparrow/services/emqx-agent/client" "strings" "time" ) type SubDev interface { SubDevMsg(handle Handle) error PublishToMsgToDev(topic string, payload []byte) error } type ConnectMsg struct { Username string `json:"username"` Ts int64 `json:"ts"` Sockport int `json:"sockport"` ProtoVer int `json:"proto_ver"` ProtoName string `json:"proto_name"` Keepalive int `json:"keepalive"` Ipaddress string `json:"ipaddress"` ExpiryInterval int `json:"expiry_interval"` ConnectedAt int64 `json:"connected_at"` Connack int `json:"connack"` Clientid string `json:"clientid"` Reason string `json:"reason"` CleanStart bool `json:"clean_start"` } type Handle func(ctx context.Context) DevSubHandle type DevSubHandle interface { Message(topic string, payload []byte) error Connected(status *protocol.DevConnectStatus) error Disconnected(status *protocol.DevConnectStatus) error } type MqttClient struct { client *client.MqttClient handlePool *grpool.Pool lockDevices map[string]*Device } func (d *MqttClient) PublishToMsgToDev(topic string, payload []byte) error { return d.client.Publish(topic, 1, false, payload) } const ( ShareSubTopicPrefix = "$share/sparrow.agent/" TopicConnectStatus = ShareSubTopicPrefix + "$SYS/brokers/+/clients/#" TopicThing = ShareSubTopicPrefix + protocol.TopicHeadThing + "/up/#" TopicOta = ShareSubTopicPrefix + protocol.TopicHeadOta + "/up/#" TopicConfig = ShareSubTopicPrefix + protocol.TopicHeadConfig + "/up/#" TopicSDKLog = ShareSubTopicPrefix + protocol.TopicHeadLog + "/up/#" TopicShadow = ShareSubTopicPrefix + protocol.TopicHeadShadow + "/up/#" TopicGateway = ShareSubTopicPrefix + protocol.TopicHeadGateway + "/up/#" TopicExt = ShareSubTopicPrefix + protocol.TopicHeadExt + "/up/#" TopicEvent = ShareSubTopicPrefix + protocol.TopicHeadEvent + "/up/#" ) func NewSubDev(conf *client.MqttConfig) (SubDev, error) { return newEmqClient(conf) } func newEmqClient(conf *client.MqttConfig) (SubDev, error) { mc, err := client.NewMqttClient(conf) if err != nil { return nil, err } return &MqttClient{ client: mc, handlePool: grpool.New(1000), }, nil } func (d *MqttClient) SubDevMsg(handle Handle) error { err := d.subDevMsg(nil, handle) if err != nil { return err } client.SetMqttSetOnConnectHandler(func(cli mqtt.Client) { err := d.subDevMsg(cli, handle) if err != nil { server.Log.Errorf("mqttSetOnConnectHandler.subDevMsg err:%v", err) } }) return nil } func (d *MqttClient) subDevMsg(cli mqtt.Client, handle Handle) error { err := d.subscribeWithFunc(cli, TopicConnectStatus, d.subConnectStatus(handle)) if err != nil { server.Log.Infof("subDevMsg err:%v", err) return err } err = d.subscribeWithFunc(cli, TopicThing, func(ctx context.Context, topic string, payload []byte) error { return handle(ctx).Message(topic, payload) }) if err != nil { return err } //err = d.subscribeWithFunc(cli, TopicConfig, func(ctx context.Context, topic string, payload []byte) error { // return handle(ctx).Message(topic, payload) //}) //if err != nil { // return err //} err = d.subscribeWithFunc(cli, TopicOta, func(ctx context.Context, topic string, payload []byte) error { return handle(ctx).Message(topic, payload) }) if err != nil { return err } //err = d.subscribeWithFunc(cli, TopicExt, func(ctx context.Context, topic string, payload []byte) error { // return handle(ctx).Message(topic, payload) //}) //if err != nil { // return err //} //err = d.subscribeWithFunc(cli, TopicShadow, func(ctx context.Context, topic string, payload []byte) error { // return handle(ctx).Message(topic, payload) //}) // //if err != nil { // return err //} //err = d.subscribeWithFunc(cli, TopicGateway, func(ctx context.Context, topic string, payload []byte) error { // return handle(ctx).Message(topic, payload) //}) //if err != nil { // return err //} //err = d.subscribeWithFunc(cli, TopicSDKLog, func(ctx context.Context, topic string, payload []byte) error { // return handle(ctx).Message(topic, payload) //}) //if err != nil { // return err //} err = d.subscribeWithFunc(cli, TopicEvent, func(ctx context.Context, topic string, payload []byte) error { return handle(ctx).Message(topic, payload) }) if err != nil { return err } return nil } func (d *MqttClient) subConnectStatus(handle Handle) func(ctx context.Context, topic string, payload []byte) error { return func(ctx context.Context, topic string, payload []byte) error { var ( msg ConnectMsg err error ) err = json.Unmarshal(payload, &msg) if err != nil { server.Log.Errorf("json.Unmarshal err :%s, topic :%v", err, topic) return err } status := protocol.DevConnectStatus{ DeviceCode: msg.Username, DeviceId: msg.Clientid, ClientIp: msg.Ipaddress, } if strings.HasSuffix(topic, "/connected") || strings.HasSuffix(topic, "/subscribed") { status.Action = "LOGIN" return handle(ctx).Connected(&status) } else { status.Action = "LOGOUT" status.Reason = msg.Reason return handle(ctx).Disconnected(&status) } } } func (d *MqttClient) subscribeWithFunc(cli mqtt.Client, topic string, handle func(ctx context.Context, topic string, payload []byte) error) error { return d.client.Subscribe(cli, topic, 1, func(c mqtt.Client, message mqtt.Message) { _ = d.handlePool.Add(func() { go func() { ctx, cancel := context.WithTimeout(context.Background(), 50*time.Second) defer cancel() Recover(ctx) err := handle(ctx, message.Topic(), message.Payload()) if err != nil { server.Log.Errorf("handle failure err :%s, topic :%v", err, topic) } }() }) }) } func Recover(ctx context.Context) { if p := recover(); p != nil { HandleThrow(ctx, p) } } func HandleThrow(ctx context.Context, p any) { pc := make([]uintptr, 1) runtime.Callers(3, pc) f := runtime.FuncForPC(pc[0]) server.Log.Errorf("HandleThrow|func=%s|error=%#v|stack=%s\n", f, p, string(debug.Stack())) //os.Exit(-1) }