123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212 |
- package main
- import (
- "context"
- "encoding/json"
- mqtt "github.com/eclipse/paho.mqtt.golang"
- "github.com/gogf/gf/os/grpool"
- "runtime"
- "runtime/debug"
- "sparrow/pkg/protocol"
- "sparrow/pkg/server"
- "sparrow/services/emqx-agent/client"
- "strings"
- "time"
- )
- type SubDev interface {
- SubDevMsg(handle Handle) error
- PublishToMsgToDev(topic string, payload []byte) error
- }
- type ConnectMsg struct {
- Username string `json:"username"`
- Ts int64 `json:"ts"`
- Sockport int `json:"sockport"`
- ProtoVer int `json:"proto_ver"`
- ProtoName string `json:"proto_name"`
- Keepalive int `json:"keepalive"`
- Ipaddress string `json:"ipaddress"`
- ExpiryInterval int `json:"expiry_interval"`
- ConnectedAt int64 `json:"connected_at"`
- Connack int `json:"connack"`
- Clientid string `json:"clientid"`
- Reason string `json:"reason"`
- CleanStart bool `json:"clean_start"`
- }
- type Handle func(ctx context.Context) DevSubHandle
- type DevSubHandle interface {
- Message(topic string, payload []byte) error
- Connected(status *protocol.DevConnectStatus) error
- Disconnected(status *protocol.DevConnectStatus) error
- }
- type MqttClient struct {
- client *client.MqttClient
- handlePool *grpool.Pool
- lockDevices map[string]*Device
- }
- func (d *MqttClient) PublishToMsgToDev(topic string, payload []byte) error {
- return d.client.Publish(topic, 1, false, payload)
- }
- const (
- ShareSubTopicPrefix = "$share/sparrow.agent/"
- TopicConnectStatus = ShareSubTopicPrefix + "$SYS/brokers/+/clients/#"
- TopicThing = ShareSubTopicPrefix + protocol.TopicHeadThing + "/up/#"
- TopicOta = ShareSubTopicPrefix + protocol.TopicHeadOta + "/up/#"
- TopicConfig = ShareSubTopicPrefix + protocol.TopicHeadConfig + "/up/#"
- TopicSDKLog = ShareSubTopicPrefix + protocol.TopicHeadLog + "/up/#"
- TopicShadow = ShareSubTopicPrefix + protocol.TopicHeadShadow + "/up/#"
- TopicGateway = ShareSubTopicPrefix + protocol.TopicHeadGateway + "/up/#"
- TopicExt = ShareSubTopicPrefix + protocol.TopicHeadExt + "/up/#"
- TopicEvent = ShareSubTopicPrefix + protocol.TopicHeadEvent + "/up/#"
- )
- func NewSubDev(conf *client.MqttConfig) (SubDev, error) {
- return newEmqClient(conf)
- }
- func newEmqClient(conf *client.MqttConfig) (SubDev, error) {
- mc, err := client.NewMqttClient(conf)
- if err != nil {
- return nil, err
- }
- return &MqttClient{
- client: mc,
- handlePool: grpool.New(1000),
- }, nil
- }
- func (d *MqttClient) SubDevMsg(handle Handle) error {
- //err := d.subDevMsg(nil, handle)
- //if err != nil {
- // return err
- //}
- client.SetMqttSetOnConnectHandler(func(cli mqtt.Client) {
- err := d.subDevMsg(cli, handle)
- if err != nil {
- server.Log.Errorf("mqttSetOnConnectHandler.subDevMsg err:%v", err)
- }
- })
- return nil
- }
- func (d *MqttClient) subDevMsg(cli mqtt.Client, handle Handle) error {
- err := d.subscribeWithFunc(cli, TopicConnectStatus, d.subConnectStatus(handle))
- if err != nil {
- return err
- }
- err = d.subscribeWithFunc(cli, TopicThing, func(ctx context.Context, topic string, payload []byte) error {
- return handle(ctx).Message(topic, payload)
- })
- if err != nil {
- return err
- }
- //err = d.subscribeWithFunc(cli, TopicConfig, func(ctx context.Context, topic string, payload []byte) error {
- // return handle(ctx).Message(topic, payload)
- //})
- //if err != nil {
- // return err
- //}
- err = d.subscribeWithFunc(cli, TopicOta, func(ctx context.Context, topic string, payload []byte) error {
- return handle(ctx).Message(topic, payload)
- })
- if err != nil {
- return err
- }
- //err = d.subscribeWithFunc(cli, TopicExt, func(ctx context.Context, topic string, payload []byte) error {
- // return handle(ctx).Message(topic, payload)
- //})
- //if err != nil {
- // return err
- //}
- //err = d.subscribeWithFunc(cli, TopicShadow, func(ctx context.Context, topic string, payload []byte) error {
- // return handle(ctx).Message(topic, payload)
- //})
- //
- //if err != nil {
- // return err
- //}
- //err = d.subscribeWithFunc(cli, TopicGateway, func(ctx context.Context, topic string, payload []byte) error {
- // return handle(ctx).Message(topic, payload)
- //})
- //if err != nil {
- // return err
- //}
- //err = d.subscribeWithFunc(cli, TopicSDKLog, func(ctx context.Context, topic string, payload []byte) error {
- // return handle(ctx).Message(topic, payload)
- //})
- //if err != nil {
- // return err
- //}
- err = d.subscribeWithFunc(cli, TopicEvent, func(ctx context.Context, topic string, payload []byte) error {
- return handle(ctx).Message(topic, payload)
- })
- if err != nil {
- return err
- }
- return nil
- }
- func (d *MqttClient) subConnectStatus(handle Handle) func(ctx context.Context, topic string, payload []byte) error {
- return func(ctx context.Context, topic string, payload []byte) error {
- var (
- msg ConnectMsg
- err error
- )
- err = json.Unmarshal(payload, &msg)
- if err != nil {
- server.Log.Errorf("json.Unmarshal err :%s, topic :%v", err, topic)
- return err
- }
- status := protocol.DevConnectStatus{
- DeviceCode: msg.Username,
- DeviceId: msg.Clientid,
- ClientIp: msg.Ipaddress,
- }
- if strings.HasSuffix(topic, "/connected") || strings.HasSuffix(topic, "/subscribed") {
- status.Action = "LOGIN"
- return handle(ctx).Connected(&status)
- } else {
- status.Action = "LOGOUT"
- status.Reason = msg.Reason
- return handle(ctx).Disconnected(&status)
- }
- }
- }
- func (d *MqttClient) subscribeWithFunc(cli mqtt.Client, topic string,
- handle func(ctx context.Context, topic string, payload []byte) error) error {
- return d.client.Subscribe(cli, topic, 1, func(c mqtt.Client, message mqtt.Message) {
- _ = d.handlePool.Add(func() {
- go func() {
- ctx, cancel := context.WithTimeout(context.Background(), 50*time.Second)
- defer cancel()
- Recover(ctx)
- err := handle(ctx, message.Topic(), message.Payload())
- if err != nil {
- server.Log.Errorf("handle failure err :%s, topic :%v", err, topic)
- }
- }()
- })
- })
- }
- func Recover(ctx context.Context) {
- if p := recover(); p != nil {
- HandleThrow(ctx, p)
- }
- }
- func HandleThrow(ctx context.Context, p any) {
- pc := make([]uintptr, 1)
- runtime.Callers(3, pc)
- f := runtime.FuncForPC(pc[0])
- server.Log.Errorf("HandleThrow|func=%s|error=%#v|stack=%s\n", f, p, string(debug.Stack()))
- //os.Exit(-1)
- }
|