浏览代码

测试agent

lijian 1 年之前
父节点
当前提交
d240034405

+ 8 - 0
pkg/protocol/emqx_device.go

@@ -0,0 +1,8 @@
+package protocol
+
+type DevConnectStatus struct {
+	DeviceCode string
+	Reason     string
+	Action     string
+	DeviceId   string
+}

+ 49 - 0
pkg/protocol/topic.go

@@ -0,0 +1,49 @@
+package protocol
+
+/*
+物理型topic:
+$thing/up/property/${productID}/${deviceName}	发布	属性上报
+$thing/down/property/${productID}/${deviceName}	订阅	属性下发与属性上报响应
+$thing/up/event/${productID}/${deviceName}	发布	事件上报
+$thing/down/event/${productID}/${deviceName}	订阅	事件上报响应
+$thing/up/action/${productID}/${deviceName}	发布	设备响应行为执行结果
+$thing/down/action/${productID}/${deviceName}	订阅	应用调用设备行为
+系统级topic:
+$ota/report/${productID}/${deviceName}	发布	固件升级消息上行
+$ota/update/${productID}/${deviceName}	订阅	固件升级消息下行
+$broadcast/rxd/${productID}/${deviceName}	订阅	广播消息下行
+$shadow/operation/up/{productID}/${deviceName}	发布	设备影子消息上行
+$shadow/operation/down/{productID}/${deviceName}	订阅	设备影子消息下行
+$rrpc/txd/{productID}/${deviceName}/${MessageId}	发布	RRPC消息上行,MessageId为RRPC消息ID
+$rrpc/rxd/{productID}/${deviceName}/+	订阅	RRPC消息下行
+$sys/operation/up/{productID}/${deviceName}	发布	系统topic:ntp服务消息上行
+$sys/operation/down/{productID}/${deviceName}/+	订阅	系统topic:ntp服务消息下行
+log topic
+$log/up/operation/${productID}/${deviceName} //设备查询是否需要上传调试日志及日志级别,上行
+$log/down/operation/${productID}/${deviceName}
+$log/up/report/${productID}/${deviceName} //设备上传调试日志内容,上行
+$log/down/report/${productID}/${deviceName}
+$log/down/update/${productID}/${deviceName} //服务器端下发调试日志配置,下行
+
+自定义topic:
+${productID}/${deviceName}/control	订阅	编辑删除
+${productID}/${deviceName}/data	订阅和发布	编辑删除
+${productID}/${deviceName}/event	发布
+${productID}/${deviceName}/xxxxx	订阅和发布   //自定义 暂不做支持
+*/
+const (
+	TopicHeadThing   = "$thing"
+	Thing            = "thing"
+	TopicHeadOta     = "$ota"
+	Ota              = "ota"
+	TopicHeadConfig  = "$config"
+	Config           = "config"
+	TopicHeadLog     = "$log"
+	Log              = "log"
+	TopicHeadShadow  = "$shadow"
+	Shadow           = "shadow"
+	TopicHeadGateway = "$gateway"
+	Gateway          = "gateway"
+	TopicHeadExt     = "$ext"
+	Ext              = "ext"
+)

+ 9 - 0
services/emqx-agent/client/config.go

@@ -0,0 +1,9 @@
+package client
+
+type MqttConfig struct {
+	ClientId string
+	Brokers  []string
+	User     string
+	Password string
+	ConnNum  int
+}

+ 99 - 0
services/emqx-agent/client/mqtt_client.go

@@ -0,0 +1,99 @@
+package client
+
+import (
+	"crypto/tls"
+	"fmt"
+	MQTT "github.com/eclipse/paho.mqtt.golang"
+	"github.com/gogf/gf/util/guid"
+	"math/rand"
+	"net/url"
+	"sparrow/pkg/server"
+
+	"sync"
+	"time"
+)
+
+var (
+	mqttInitOnce            sync.Once
+	mqttClient              *MqttClient
+	mqttSetOnConnectHandler func(cli MQTT.Client)
+)
+
+type MqttClient struct {
+	clients []MQTT.Client
+}
+
+func NewMqttClient(conf *MqttConfig) (mcs *MqttClient, err error) {
+	mqttInitOnce.Do(func() {
+		var clients []MQTT.Client
+		for len(clients) < conf.ConnNum {
+			var (
+				mc MQTT.Client
+			)
+			for i := 3; i > 0; i++ {
+				mc, err = initMqttClient(conf)
+				if err != nil {
+					server.Log.Errorf("init mqtt client failed: %s", err.Error())
+					continue
+				}
+				break
+			}
+			if err != nil {
+				server.Log.Errorf("mqtt 连接失败: %s", err.Error())
+				panic(err)
+			}
+			clients = append(clients, mc)
+			var cli = MqttClient{clients: clients}
+			mqttClient = &cli
+		}
+	})
+	return mqttClient, err
+}
+
+func initMqttClient(conf *MqttConfig) (mc MQTT.Client, err error) {
+	opts := MQTT.NewClientOptions()
+	for _, broker := range conf.Brokers {
+		opts.AddBroker(broker)
+	}
+	opts.SetClientID(fmt.Sprintf("%s_%s", conf.ClientId, guid.S())).
+		SetUsername(conf.User).SetPassword(conf.Password).
+		SetConnectRetry(true)
+	opts.SetConnectionAttemptHandler(func(broker *url.URL, tlsCfg *tls.Config) *tls.Config {
+		server.Log.Infof("connect to %s", broker.String())
+		return tlsCfg
+	})
+	opts.SetOnConnectHandler(func(client MQTT.Client) {
+		server.Log.Infof("connected success")
+	})
+
+	mc = MQTT.NewClient(opts)
+	if errF := mc.Connect().WaitTimeout(time.Second * 5); errF == false {
+		server.Log.Errorf("connected failed")
+		return
+	}
+	return
+}
+
+// Subscribe 订阅消息
+func (m MqttClient) Subscribe(cli MQTT.Client, topic string, qos byte, callback MQTT.MessageHandler) error {
+	var clients = m.clients
+	if cli != nil {
+		clients = []MQTT.Client{cli}
+	}
+	for _, client := range clients {
+		err := client.Subscribe(topic, qos, callback).Error()
+		if err != nil {
+			server.Log.Errorf("subscribe failed: %s", err.Error())
+		}
+	}
+	return nil
+}
+
+// Publish 发布消息
+func (m MqttClient) Publish(topic string, qos byte, retained bool, payload interface{}) error {
+	id := rand.Intn(len(m.clients))
+	return m.clients[id].Publish(topic, qos, retained, payload).Error()
+}
+func SetMqttSetOnConnectHandler(f func(cli MQTT.Client)) {
+	mqttSetOnConnectHandler = f
+}

+ 6 - 0
services/emqx-agent/config.toml

@@ -0,0 +1,6 @@
+[mqtt]
+client_id = "sparrow"
+brokers = ["114.115.251.196:1883"]
+user = "sparrow_test"
+password = "sparrow_test"
+conn_number = 10

+ 27 - 0
services/emqx-agent/emqx-agent.go

@@ -1 +1,28 @@
 package main
+
+import "sparrow/pkg/protocol"
+
+type Agent struct {
+}
+
+// Message 收到设备上报消息处理
+func (a *Agent) Message(topic string, payload []byte) error {
+	//TODO implement me
+	panic("implement me")
+}
+
+// Connected 设备接入时
+func (a *Agent) Connected(status *protocol.DevConnectStatus) error {
+	//TODO implement me
+	panic("implement me")
+}
+
+// Disconnected 设备断开连接时
+func (a *Agent) Disconnected(status *protocol.DevConnectStatus) error {
+	//TODO implement me
+	panic("implement me")
+}
+
+func NewAgent() *Agent {
+	return &Agent{}
+}

+ 25 - 7
services/emqx-agent/main.go

@@ -2,10 +2,11 @@ package main
 
 import (
 	"context"
-	"log"
+	"github.com/gogf/gf/frame/g"
 	"math"
 	"sparrow/pkg/rpcs"
 	"sparrow/pkg/server"
+	"sparrow/services/emqx-agent/client"
 	"sync"
 	"time"
 
@@ -63,10 +64,6 @@ func (counter *Counter) GetCount() (num int64) {
 	return counter.count
 }
 
-const (
-	port = ":9000"
-)
-
 var cnter *Counter = NewCounter(0, 100)
 
 // emqttServer is used to implement emqx_exhook_v1.s *emqttServer
@@ -232,13 +229,34 @@ func main() {
 		server.Log.Fatal(err)
 		return
 	}
+	agent := NewAgent()
+	sd, err := NewSubDev(&client.MqttConfig{
+		ClientId: g.Cfg().GetString("mqtt.client_id"),
+		User:     g.Cfg().GetString("mqtt.user"),
+		Password: g.Cfg().GetString("mqtt.password"),
+		Brokers:  g.Cfg().GetStrings("mqtt.brokers"),
+	})
+	if err != nil {
+		panic(err)
+	}
+	err = sd.SubDevMsg(func(ctx context.Context) DevSubHandle {
+		return agent
+	})
+	if err != nil {
+		panic(err)
+	}
 	s := grpc.NewServer()
 	pb.RegisterHookProviderServer(s, &emqttServer{})
-	log.Println("Started gRPC emqttServer on ::9000")
-	// register a http handler
 	err = server.RegisterHTTPHandler(s)
 	if err != nil {
 		server.Log.Errorf("RegisterHTTPHandler Error: %s", err)
 		return
 	}
+	err = server.RegisterRPCHandler(agent)
+
+	// start to run
+	err = server.Run()
+	if err != nil {
+		server.Log.Fatal(err)
+	}
 }

+ 195 - 0
services/emqx-agent/sub_dev.go

@@ -0,0 +1,195 @@
+package main
+
+import (
+	"context"
+	"encoding/json"
+	mqtt "github.com/eclipse/paho.mqtt.golang"
+	"runtime"
+	"runtime/debug"
+	"sparrow/pkg/protocol"
+	"sparrow/pkg/server"
+	"sparrow/services/emqx-agent/client"
+	"strings"
+	"time"
+)
+
+type SubDev interface {
+	SubDevMsg(handle Handle) error
+}
+
+type ConnectMsg struct {
+	Username       string `json:"username"`
+	Ts             int64  `json:"ts"`
+	Sockport       int    `json:"sockport"`
+	ProtoVer       int    `json:"proto_ver"`
+	ProtoName      string `json:"proto_name"`
+	Keepalive      int    `json:"keepalive"`
+	Ipaddress      string `json:"ipaddress"`
+	ExpiryInterval int    `json:"expiry_interval"`
+	ConnectedAt    int64  `json:"connected_at"`
+	Connack        int    `json:"connack"`
+	Clientid       string `json:"clientid"`
+	Reason         string `json:"reason"`
+	CleanStart     bool   `json:"clean_start"`
+}
+
+type Handle func(ctx context.Context) DevSubHandle
+
+type DevSubHandle interface {
+	Message(topic string, payload []byte) error
+	Connected(status *protocol.DevConnectStatus) error
+	Disconnected(status *protocol.DevConnectStatus) error
+}
+
+type (
+	MqttClient struct {
+		client *client.MqttClient
+	}
+)
+
+const (
+	ShareSubTopicPrefix = "$share/sparrow.agent/"
+	TopicConnectStatus  = ShareSubTopicPrefix + "$SYS/brokers/+/clients/#"
+	TopicThing          = ShareSubTopicPrefix + protocol.Thing + "/up/#"
+	TopicOta            = ShareSubTopicPrefix + protocol.TopicHeadOta + "/up/#"
+	TopicConfig         = ShareSubTopicPrefix + protocol.TopicHeadConfig + "/up/#"
+	TopicSDKLog         = ShareSubTopicPrefix + protocol.TopicHeadLog + "/up/#"
+	TopicShadow         = ShareSubTopicPrefix + protocol.TopicHeadShadow + "/up/#"
+	TopicGateway        = ShareSubTopicPrefix + protocol.TopicHeadGateway + "/up/#"
+	TopicExt            = ShareSubTopicPrefix + protocol.TopicHeadExt + "/up/#"
+)
+
+func NewSubDev(conf *client.MqttConfig) (SubDev, error) {
+	return newEmqClient(conf)
+}
+
+func newEmqClient(conf *client.MqttConfig) (SubDev, error) {
+	mc, err := client.NewMqttClient(conf)
+	if err != nil {
+		return nil, err
+	}
+	return &MqttClient{
+		client: mc,
+	}, nil
+}
+
+func (d *MqttClient) SubDevMsg(handle Handle) error {
+
+	err := d.subDevMsg(nil, handle)
+	if err != nil {
+		return err
+	}
+	client.SetMqttSetOnConnectHandler(func(cli mqtt.Client) {
+		err := d.subDevMsg(cli, handle)
+		if err != nil {
+			server.Log.Errorf("mqttSetOnConnectHandler.subDevMsg err:%v", err)
+		}
+	})
+	return nil
+}
+
+func (d *MqttClient) subDevMsg(cli mqtt.Client, handle Handle) error {
+	err := d.subscribeWithFunc(cli, TopicConnectStatus, d.subConnectStatus(handle))
+	if err != nil {
+		return err
+	}
+	err = d.subscribeWithFunc(cli, TopicThing, func(ctx context.Context, topic string, payload []byte) error {
+		return handle(ctx).Message(topic, payload)
+	})
+	if err != nil {
+		return err
+	}
+	err = d.subscribeWithFunc(cli, TopicConfig, func(ctx context.Context, topic string, payload []byte) error {
+		return handle(ctx).Message(topic, payload)
+	})
+	if err != nil {
+		return err
+	}
+	err = d.subscribeWithFunc(cli, TopicOta, func(ctx context.Context, topic string, payload []byte) error {
+		return handle(ctx).Message(topic, payload)
+	})
+	if err != nil {
+		return err
+	}
+	err = d.subscribeWithFunc(cli, TopicExt, func(ctx context.Context, topic string, payload []byte) error {
+		return handle(ctx).Message(topic, payload)
+	})
+	if err != nil {
+		return err
+	}
+	err = d.subscribeWithFunc(cli, TopicShadow, func(ctx context.Context, topic string, payload []byte) error {
+		return handle(ctx).Message(topic, payload)
+	})
+
+	if err != nil {
+		return err
+	}
+	err = d.subscribeWithFunc(cli, TopicGateway, func(ctx context.Context, topic string, payload []byte) error {
+		return handle(ctx).Message(topic, payload)
+	})
+	if err != nil {
+		return err
+	}
+	err = d.subscribeWithFunc(cli, TopicSDKLog, func(ctx context.Context, topic string, payload []byte) error {
+		return handle(ctx).Message(topic, payload)
+	})
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func (d *MqttClient) subConnectStatus(handle Handle) func(ctx context.Context, topic string, payload []byte) error {
+	return func(ctx context.Context, topic string, payload []byte) error {
+		var (
+			msg ConnectMsg
+			err error
+		)
+		err = json.Unmarshal(payload, &msg)
+		if err != nil {
+			server.Log.Errorf("json.Unmarshal err :%s, topic :%v", err, topic)
+			return err
+		}
+		status := protocol.DevConnectStatus{
+			DeviceCode: msg.Username,
+			DeviceId:   msg.Clientid,
+		}
+		if strings.HasSuffix(topic, "/disconnected") {
+			status.Action = "LOGIN"
+			return handle(ctx).Connected(&status)
+		} else {
+			status.Action = "LOGOUT"
+			status.Reason = msg.Reason
+			return handle(ctx).Disconnected(&status)
+		}
+	}
+}
+
+func (d *MqttClient) subscribeWithFunc(cli mqtt.Client, topic string,
+	handle func(ctx context.Context, topic string, payload []byte) error) error {
+	return d.client.Subscribe(cli, topic, 1, func(c mqtt.Client, message mqtt.Message) {
+		go func() {
+			ctx, cancel := context.WithTimeout(context.Background(), 50*time.Second)
+			defer cancel()
+			Recover(ctx)
+			err := handle(ctx, message.Topic(), message.Payload())
+			if err != nil {
+				server.Log.Errorf("handle failure err :%s, topic :%v", err, topic)
+			}
+		}()
+	})
+}
+
+func Recover(ctx context.Context) {
+	if p := recover(); p != nil {
+		HandleThrow(ctx, p)
+	}
+}
+
+func HandleThrow(ctx context.Context, p any) {
+	pc := make([]uintptr, 1)
+	runtime.Callers(3, pc)
+	f := runtime.FuncForPC(pc[0])
+	server.Log.Errorf("HandleThrow|func=%s|error=%#v|stack=%s\n", f, p, string(debug.Stack()))
+	//os.Exit(-1)
+}