executer.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. package manager
  2. import (
  3. "fmt"
  4. "github.com/gogf/gf/v2/encoding/gjson"
  5. "sparrow/pkg/rpcs"
  6. "sparrow/pkg/server"
  7. "sparrow/pkg/utils"
  8. "time"
  9. )
  10. type Action struct {
  11. DeviceID string `json:"device_id"` // 设备ID
  12. SubDeviceId string `json:"sub_device_id"` // 实体子设备Id,如果需要
  13. ActionExecutor string `json:"action_executor"` // 动作对象类型
  14. ExecutorProperty *TaskExecutorProperty `json:"executor_property"` // 动作执行明细
  15. PlcPubMessage *PlcPubMessage `json:"plc_pub_message"` // PLC消息
  16. }
  17. // TaskExecutorProperty 定时任务执行动作执行参数
  18. type TaskExecutorProperty struct {
  19. /*
  20. 指令 code。当 action_executor 是 device_issue 或 device_group_issue 时,此参数必填。
  21. */
  22. FunctionCode string `json:"function_code"`
  23. /*
  24. 指令 value。当 action_executor 是 device_issue 或 device_group_issue 时,此参数必填。
  25. */
  26. FunctionValue map[string]interface{} `json:"function_value"`
  27. /*
  28. 延时时间。当 action_executor 是 delay 时,此参数必填。
  29. */
  30. DelaySeconds int64 `json:"delay_seconds"`
  31. }
  32. type PlcPubMessage struct {
  33. Topic string `json:"topic"`
  34. Payload []byte `json:"payload"`
  35. }
  36. // TaskExecutor 任务执行器,用来执行具体的任务动作
  37. type TaskExecutor struct {
  38. Actions []*Action
  39. }
  40. func NewTaskExecutor(actions []*Action) *TaskExecutor {
  41. return &TaskExecutor{
  42. Actions: actions,
  43. }
  44. }
  45. func (a *TaskExecutor) Do(id string) error {
  46. for _, action := range a.Actions {
  47. if err := a.doTask(action); err != nil {
  48. return err
  49. }
  50. }
  51. return nil
  52. }
  53. func (a *TaskExecutor) doTask(action *Action) error {
  54. // 调用设备接入服务
  55. rpchost, err := getAccessRPCHost(action.DeviceID)
  56. if err != nil {
  57. return err
  58. }
  59. reply := &rpcs.ReplyEmptyResult{}
  60. if rpchost != "" {
  61. args := rpcs.ArgsSendCommand{
  62. DeviceId: action.DeviceID,
  63. SubDevice: action.SubDeviceId,
  64. Cmd: action.ExecutorProperty.FunctionCode,
  65. Params: action.ExecutorProperty.FunctionValue,
  66. }
  67. server.Log.Debugf("do Device Issue task args:%v", args)
  68. return server.RPCCallByHost(rpchost, "Access.SendCommand", args, reply)
  69. }
  70. var publishArgs rpcs.ArgsPublishMessage
  71. publishArgs.Topic = action.PlcPubMessage.Topic
  72. publishArgs.Payload = action.PlcPubMessage.Payload
  73. err = server.RPCCallByName(nil, rpcs.EmqxAgentServiceName, "Access.PublishMessage", publishArgs, reply)
  74. if err != nil {
  75. server.Log.Errorf("plc设备发送消息失败:%v", err)
  76. }
  77. fmt.Printf("plc设备发送消息成功:topic:%s,payload:%s", publishArgs.Topic, publishArgs.Payload)
  78. return nil
  79. }
  80. // 执行延时任务
  81. func (a *TaskExecutor) doDelayTask(action *Action) error {
  82. time.Sleep(time.Duration(action.ExecutorProperty.DelaySeconds) * time.Second)
  83. return a.doTask(action)
  84. }
  85. func getAccessRPCHost(deviceid string) (string, error) {
  86. args := rpcs.ArgsGetDeviceOnlineStatus{
  87. Id: deviceid,
  88. }
  89. reply := &rpcs.ReplyGetDeviceOnlineStatus{}
  90. err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetDeviceOnlineStatus", args, reply)
  91. if err != nil {
  92. return "", err
  93. }
  94. return reply.AccessRPCHost, nil
  95. }
  96. func (a *TaskExecutor) saveHis(id string, conditionId []string) error {
  97. client := utils.NewHttpClient()
  98. url := "http://127.0.0.1:8199/iot/v1/scene_history"
  99. body := make(map[string]interface{})
  100. body["scene_id"] = id
  101. body["time"] = time.Now()
  102. body["condition_id"] = conditionId
  103. _, err := client.Post(url, "application/json", gjson.New(body))
  104. if err != nil {
  105. server.Log.Errorf("sync his error:%s", err.Error())
  106. }
  107. return err
  108. //args := models.SceneHis{
  109. // RecordId: guid.S(),
  110. // SceneID: id,
  111. // DeviceId: action.DeviceID,
  112. // SubDeviceId: action.SubDeviceId,
  113. //}
  114. //if action.ExecutorProperty.FunctionCode != "" {
  115. // args.Cmd = action.ExecutorProperty.FunctionCode
  116. // args.Params = gjson.New(action.ExecutorProperty.FunctionValue).MustToJsonString()
  117. //}
  118. //if action.PlcPubMessage != nil {
  119. // args.Topic = action.PlcPubMessage.Topic
  120. // args.Payload = string(action.PlcPubMessage.Payload)
  121. //}
  122. //return server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.CreateSceneHis", args, &rpcs.ReplyEmptyResult{})
  123. }