123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103 |
- package manager
- import (
- "sparrow/pkg/rpcs"
- "sparrow/pkg/server"
- "time"
- )
- type Action struct {
- DeviceID string `json:"device_id"` // 设备ID
- SubDeviceId string `json:"sub_device_id"` // 实体子设备Id,如果需要
- ActionExecutor string `json:"action_executor"` // 动作对象类型
- ExecutorProperty *TaskExecutorProperty `json:"executor_property"` // 动作执行明细
- PlcPubMessage *PlcPubMessage `json:"plc_pub_message"` // PLC消息
- }
- // TaskExecutorProperty 定时任务执行动作执行参数
- type TaskExecutorProperty struct {
- /*
- 指令 code。当 action_executor 是 device_issue 或 device_group_issue 时,此参数必填。
- */
- FunctionCode string `json:"function_code"`
- /*
- 指令 value。当 action_executor 是 device_issue 或 device_group_issue 时,此参数必填。
- */
- FunctionValue map[string]interface{} `json:"function_value"`
- /*
- 延时时间。当 action_executor 是 delay 时,此参数必填。
- */
- DelaySeconds int64 `json:"delay_seconds"`
- }
- type PlcPubMessage struct {
- Topic string `json:"topic"`
- Payload []byte `json:"payload"`
- }
- // TaskExecutor 任务执行器,用来执行具体的任务动作
- type TaskExecutor struct {
- Actions []*Action
- }
- func NewTaskExecutor(actions []*Action) *TaskExecutor {
- return &TaskExecutor{
- Actions: actions,
- }
- }
- func (a *TaskExecutor) Do() error {
- for _, action := range a.Actions {
- if err := a.doTask(action); err != nil {
- return err
- }
- }
- return nil
- }
- func (a *TaskExecutor) doTask(action *Action) error {
- // 调用设备接入服务
- rpchost, err := getAccessRPCHost(action.DeviceID)
- if err != nil {
- return err
- }
- reply := &rpcs.ReplyEmptyResult{}
- if rpchost != "" {
- args := rpcs.ArgsSendCommand{
- DeviceId: action.DeviceID,
- SubDevice: action.SubDeviceId,
- Cmd: action.ExecutorProperty.FunctionCode,
- Params: action.ExecutorProperty.FunctionValue,
- }
- server.Log.Debugf("do Device Issue task args:%v", args)
- return server.RPCCallByHost(rpchost, "Access.SendCommand", args, reply)
- }
- var publishArgs rpcs.ArgsPublishMessage
- publishArgs.Topic = action.PlcPubMessage.Topic
- publishArgs.Payload = action.PlcPubMessage.Payload
- err = server.RPCCallByName(nil, rpcs.EmqxAgentServiceName, "Access.PublishMessage", publishArgs, reply)
- if err != nil {
- server.Log.Errorf("plc设备发送消息失败:%v", err)
- }
- return nil
- }
- // 执行延时任务
- func (a *TaskExecutor) doDelayTask(action *Action) error {
- time.Sleep(time.Duration(action.ExecutorProperty.DelaySeconds) * time.Second)
- return a.doTask(action)
- }
- func getAccessRPCHost(deviceid string) (string, error) {
- args := rpcs.ArgsGetDeviceOnlineStatus{
- Id: deviceid,
- }
- reply := &rpcs.ReplyGetDeviceOnlineStatus{}
- err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetDeviceOnlineStatus", args, reply)
- if err != nil {
- return "", err
- }
- return reply.AccessRPCHost, nil
- }
|