lijian 1 год назад
Родитель
Сommit
d30abf6ee0

+ 6 - 2
pkg/protocol/topic.go

@@ -11,8 +11,8 @@ $thing/up/status/${productID}/${deviceName}	发布	属性上报
 $thing/down/status/${productID}/${deviceName}	订阅	属性下发与属性上报响应
 $thing/up/event/${productID}/${deviceName}	发布	事件上报
 $thing/down/event/${productID}/${deviceName}	订阅	事件上报响应
-$thing/up/action/${productID}/${deviceName}	发布	设备响应行为执行结果
-$thing/down/action/${productID}/${deviceName}	订阅	应用调用设备行为
+$thing/up/command/${productID}/${deviceName}	发布	设备响应行为执行结果
+$thing/down/command/${productID}/${deviceName}	订阅	应用调用设备行为
 系统级topic:
 $ota/report/${productID}/${deviceName}	发布	固件升级消息上行
 $ota/update/${productID}/${deviceName}	订阅	固件升级消息下行
@@ -109,6 +109,10 @@ func parseLast(topics []string) (topicInfo *TopicInfo, err error) {
 	}, err
 }
 
+func GetCommandTopic(deviceCode, productKey string) string {
+	return strings.Join([]string{TopicHeadThing, "down", "command", productKey, deviceCode}, "/")
+}
+
 func getDirection(dir string) Direction {
 	switch dir {
 	case "up":

+ 37 - 0
services/apiprovider/actions.go

@@ -272,6 +272,43 @@ func SendCommandToDevice(device *models.Device, config *productconfig.ProductCon
 
 }
 
+func SendCommandToDeviceV2(device *models.Device, config *productconfig.ProductConfig,
+	urlparams martini.Params, req *http.Request, r render.Render) {
+	timeout := req.URL.Query().Get("timeout")
+
+	server.Log.Printf("ACTION SendCommandToDevice, identifier:: %v, request: %v, timeout: %v",
+		device.DeviceIdentifier, req.Body, timeout)
+
+	var args map[string]interface{}
+	decoder := json.NewDecoder(req.Body)
+	err := decoder.Decode(&args)
+	if err != nil {
+		r.JSON(http.StatusOK, renderError(ErrWrongRequestFormat, err))
+		return
+	}
+
+	j := gjson.New(args)
+	cmdargs := rpcs.ArgsSendCommand{
+		DeviceId:  device.DeviceIdentifier,
+		SubDevice: j.GetString("subDeviceId"),
+		WaitTime:  uint32(defaultTimeOut),
+		Params:    j.GetMap("data.params"),
+		Cmd:       j.GetString("data.cmd"),
+	}
+	cmdreply := rpcs.ReplySendCommand{}
+
+	err = server.RPCCallByName(context.Background(), rpcs.ControllerName, "Agent.SendCommand", cmdargs, &cmdreply)
+	if err != nil {
+		server.Log.Errorf("send devie command error: %v", err)
+		r.JSON(http.StatusOK, renderError(ErrSystemFault, err))
+		return
+	}
+
+	r.JSON(http.StatusOK, Common{})
+	return
+
+}
+
 // AddRule 增加设备规则
 func AddRule(device *models.Device, req *http.Request, r render.Render) {
 	var ruleReq CreateRuleRequest

+ 6 - 0
services/apiprovider/router.go

@@ -67,6 +67,12 @@ func route(m *martini.ClassicMartini) {
 
 		r.Get("/devices/online", CheckDeviceIsOnline)
 	})
+	m.Group("application/v2", func(r martini.Router) {
+		// send a command to device
+		r.Post("/devices/:identifier/commands",
+			ApplicationAuthOnDeviceIdentifer, CheckDeviceOnline, CheckProductConfig,
+			SendCommandToDevice)
+	})
 
 	m.Post("/application/auth", AppAuth)
 

+ 53 - 2
services/emqx-agent/agent.go

@@ -8,9 +8,11 @@ import (
 	"sparrow/pkg/protocol"
 	"sparrow/pkg/rpcs"
 	"sparrow/pkg/server"
+	"time"
 )
 
 type Agent struct {
+	client SubDev
 }
 
 // Message 收到设备上报消息处理
@@ -204,6 +206,55 @@ func (a *Agent) Disconnected(status *protocol.DevConnectStatus) error {
 	return err
 }
 
-func NewAgent() *Agent {
-	return &Agent{}
+// SendCommand rpc 发送设备命令
+func (a *Agent) SendCommand(args rpcs.ArgsSendCommand, reply *rpcs.ReplySendCommand) error {
+	// 查询设备信息
+	device := &models.Device{}
+	err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", args.DeviceId, device)
+	if err != nil {
+		server.Log.Errorf("device not found %s", args.DeviceId)
+		return nil
+	}
+	product := &models.Product{}
+	err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindProduct", device.ProductID, device)
+	if err != nil {
+		server.Log.Errorf("device not found %s", args.DeviceId)
+		return nil
+	}
+	cmd := &klink.CloudSend{
+		Action:      "cloudSend",
+		MsgId:       0,
+		DeviceCode:  args.DeviceId,
+		SubDeviceId: args.SubDevice,
+		Timestamp:   time.Now().Unix(),
+		Data: &klink.CloudSendData{
+			Cmd:    args.Cmd,
+			Params: args.Params,
+		},
+	}
+	msg, err := cmd.Marshal()
+	if err != nil {
+		return err
+	}
+	return a.client.PublishToMsgToDev(protocol.GetCommandTopic(args.DeviceId, product.RecordId), msg)
+}
+
+// GetStatus rpc 获取设备状态
+func (a *Agent) GetStatus(args rpcs.ArgsGetStatus, reply *rpcs.ReplyGetStatus) error {
+	server.Log.Infof("Access Get Status: %v", args)
+	// first send a get status command
+	cmdArgs := rpcs.ArgsSendCommand{
+		DeviceId:  args.Id,
+		WaitTime:  0,
+		SubDevice: args.SubDeviceId,
+		Cmd:       "report",
+	}
+	cmdReply := rpcs.ReplySendCommand{}
+	return a.SendCommand(cmdArgs, &cmdReply)
+}
+
+func NewAgent(client SubDev) *Agent {
+	return &Agent{
+		client: client,
+	}
 }

+ 2 - 1
services/emqx-agent/main.go

@@ -229,7 +229,7 @@ func main() {
 		server.Log.Fatal(err)
 		return
 	}
-	agent := NewAgent()
+
 	sd, err := NewSubDev(&client.MqttConfig{
 		ClientId: g.Cfg().GetString("mqtt.client_id"),
 		User:     g.Cfg().GetString("mqtt.user"),
@@ -240,6 +240,7 @@ func main() {
 	if err != nil {
 		panic(err)
 	}
+	agent := NewAgent(sd)
 	err = sd.SubDevMsg(func(ctx context.Context) DevSubHandle {
 		return agent
 	})

+ 5 - 0
services/emqx-agent/sub_dev.go

@@ -16,6 +16,7 @@ import (
 
 type SubDev interface {
 	SubDevMsg(handle Handle) error
+	PublishToMsgToDev(topic string, payload []byte) error
 }
 
 type ConnectMsg struct {
@@ -47,6 +48,10 @@ type MqttClient struct {
 	handlePool *grpool.Pool
 }
 
+func (d *MqttClient) PublishToMsgToDev(topic string, payload []byte) error {
+	return d.client.Publish(topic, 1, false, payload)
+}
+
 const (
 	ShareSubTopicPrefix = "$share/sparrow.agent/"
 	TopicConnectStatus  = ShareSubTopicPrefix + "$SYS/brokers/+/clients/#"