| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170 |
- package manager
- import (
- "bytes"
- "encoding/json"
- "fmt"
- "github.com/gogf/gf/util/guid"
- "sparrow/pkg/models"
- "sparrow/pkg/rpcs"
- "sparrow/pkg/server"
- "sparrow/pkg/utils"
- "strings"
- "time"
- )
- type Action struct {
- ActionId string `json:"action_id"` // 动作ID
- DeviceID string `json:"device_id"` // 设备ID
- SubDeviceId string `json:"sub_device_id"` // 实体子设备Id,如果需要
- ActionExecutor string `json:"action_executor"` // 动作对象类型
- ExecutorProperty *TaskExecutorProperty `json:"executor_property"` // 动作执行明细
- PlcPubMessage *PlcPubMessage `json:"plc_pub_message"` // PLC消息
- }
// TaskExecutorProperty carries the execution parameters of a scheduled-task
// action.
type TaskExecutorProperty struct {
	// FunctionCode is the command code. It is required when action_executor
	// is device_issue or device_group_issue.
	FunctionCode string `json:"function_code"`
	// FunctionValue is the command value. It is required when action_executor
	// is device_issue or device_group_issue.
	FunctionValue map[string]interface{} `json:"function_value"`
	// DelaySeconds is the delay time. It is required when action_executor is
	// delay.
	DelaySeconds int64 `json:"delay_seconds"`
}
// PlcPubMessage is a message to publish for a PLC device: a topic plus a raw
// payload.
type PlcPubMessage struct {
	// Topic is the topic the message is published to.
	Topic string `json:"topic"`
	// Payload is the raw message body. As a []byte it is base64-encoded by
	// encoding/json.
	Payload []byte `json:"payload"`
}
- // TaskExecutor 任务执行器,用来执行具体的任务动作
- type TaskExecutor struct {
- Actions []*Action
- client *utils.HttpClient
- }
- func NewTaskExecutor(actions []*Action) *TaskExecutor {
- client := utils.NewHttpClient()
- client.SetLogger(server.Log)
- return &TaskExecutor{
- Actions: actions,
- client: client,
- }
- }
- func (a *TaskExecutor) Do(id string) error {
- for _, action := range a.Actions {
- if err := a.doTask(action); err != nil {
- return err
- }
- }
- return nil
- }
- func (a *TaskExecutor) doTask(action *Action) error {
- // 调用设备接入服务
- rpchost, err := getAccessRPCHost(action.DeviceID)
- if err != nil {
- return err
- }
- reply := &rpcs.ReplyEmptyResult{}
- if rpchost != "" {
- args := rpcs.ArgsSendCommand{
- DeviceId: action.DeviceID,
- SubDevice: action.SubDeviceId,
- Cmd: action.ExecutorProperty.FunctionCode,
- Params: action.ExecutorProperty.FunctionValue,
- }
- server.Log.Debugf("do Device Issue task args:%v", args)
- return server.RPCCallByHost(rpchost, "Access.SendCommand", args, reply)
- }
- var publishArgs rpcs.ArgsPublishMessage
- publishArgs.Topic = action.PlcPubMessage.Topic
- publishArgs.Payload = action.PlcPubMessage.Payload
- err = server.RPCCallByName(nil, rpcs.EmqxAgentServiceName, "Access.PublishMessage", publishArgs, reply)
- if err != nil {
- server.Log.Errorf("plc设备发送消息失败:%v", err)
- }
- fmt.Printf("plc设备发送消息成功:topic:%s,payload:%s", publishArgs.Topic, publishArgs.Payload)
- return nil
- }
- // 执行延时任务
- func (a *TaskExecutor) doDelayTask(action *Action) error {
- time.Sleep(time.Duration(action.ExecutorProperty.DelaySeconds) * time.Second)
- return a.doTask(action)
- }
- func getAccessRPCHost(deviceid string) (string, error) {
- args := rpcs.ArgsGetDeviceOnlineStatus{
- Id: deviceid,
- }
- reply := &rpcs.ReplyGetDeviceOnlineStatus{}
- err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetDeviceOnlineStatus", args, reply)
- if err != nil {
- return "", err
- }
- return reply.AccessRPCHost, nil
- }
- func (a *TaskExecutor) saveHis(id string, conditionId []string) error {
- // 收集所有动作ID
- var actionIds []string
- for _, action := range a.Actions {
- if action.ActionId != "" {
- actionIds = append(actionIds, action.ActionId)
- }
- }
- conditionIdStr := strings.Join(conditionId, ",")
- actionIdStr := strings.Join(actionIds, ",")
- // 1. 通过 RPC 调用 Registry 同步到本地数据库
- sceneHis := models.SceneHis{
- RecordId: guid.S(),
- SceneID: id,
- ConditionId: conditionIdStr,
- ActionId: actionIdStr,
- }
- rpcReply := rpcs.ReplyEmptyResult{}
- rpcErr := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.CreateSceneHis", &sceneHis, &rpcReply)
- if rpcErr != nil {
- server.Log.Errorf("RPC保存场景执行历史失败: sceneId=%s, error=%s", id, rpcErr.Error())
- }
- // 2. 通过 HTTP POST 通知 knowoapi
- url := "http://127.0.0.1:8199/iot/v1/scene_history"
- body := make(map[string]interface{})
- body["scene_id"] = id
- if conditionIdStr != "" {
- body["condition_id"] = conditionIdStr
- }
- if actionIdStr != "" {
- body["action_id"] = actionIdStr
- }
- w := new(bytes.Buffer)
- if err := json.NewEncoder(w).Encode(body); err != nil {
- return err
- }
- req, err := utils.NewRequest("POST", url, w)
- if err != nil {
- server.Log.Error(err)
- return err
- }
- req.Header.Add("Content-Type", "application/json")
- _, err = a.client.Do(req)
- if err != nil {
- server.Log.Errorf("HTTP保存场景执行历史失败: sceneId=%s, error=%s", id, err.Error())
- }
- return err
- }
|