executer.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. package manager
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "sparrow/pkg/rpcs"
  7. "sparrow/pkg/server"
  8. "sparrow/pkg/utils"
  9. "strings"
  10. "time"
  11. )
// Action describes one action of a task. It names the target device and
// carries the data needed to act on it: ExecutorProperty holds the device
// command parameters and PlcPubMessage holds a PLC message to publish.
type Action struct {
	DeviceID         string                `json:"device_id"`         // Device ID.
	SubDeviceId      string                `json:"sub_device_id"`     // Physical sub-device ID, if one is needed.
	ActionExecutor   string                `json:"action_executor"`   // Type of the object the action is executed on.
	ExecutorProperty *TaskExecutorProperty `json:"executor_property"` // Details of how the action is executed.
	PlcPubMessage    *PlcPubMessage        `json:"plc_pub_message"`   // PLC message.
}
// TaskExecutorProperty holds the execution parameters of an action run by a
// scheduled task.
type TaskExecutorProperty struct {
	// FunctionCode is the command code. It is required when action_executor
	// is device_issue or device_group_issue.
	FunctionCode string `json:"function_code"`
	// FunctionValue is the command value. It is required when action_executor
	// is device_issue or device_group_issue.
	FunctionValue map[string]interface{} `json:"function_value"`
	// DelaySeconds is the delay time, in seconds. It is required when
	// action_executor is delay.
	DelaySeconds int64 `json:"delay_seconds"`
}
// PlcPubMessage is a message to publish to a PLC device: the topic to publish
// on and the raw payload bytes.
type PlcPubMessage struct {
	// Topic is the topic the message is published on.
	Topic string `json:"topic"`
	// Payload is the raw message body. As a []byte it is encoded by
	// encoding/json as a base64 string.
	Payload []byte `json:"payload"`
}
// TaskExecutor is the task executor; it carries out the concrete actions of
// a task.
type TaskExecutor struct {
	// Actions are the actions to execute; Do runs them in slice order.
	Actions []*Action
	// client is the HTTP client used by saveHis to post scene history.
	client *utils.HttpClient
}
  43. func NewTaskExecutor(actions []*Action) *TaskExecutor {
  44. client := utils.NewHttpClient()
  45. client.SetLogger(server.Log)
  46. return &TaskExecutor{
  47. Actions: actions,
  48. client: client,
  49. }
  50. }
  51. func (a *TaskExecutor) Do(id string) error {
  52. for _, action := range a.Actions {
  53. if err := a.doTask(action); err != nil {
  54. return err
  55. }
  56. }
  57. return nil
  58. }
  59. func (a *TaskExecutor) doTask(action *Action) error {
  60. // 调用设备接入服务
  61. rpchost, err := getAccessRPCHost(action.DeviceID)
  62. if err != nil {
  63. return err
  64. }
  65. reply := &rpcs.ReplyEmptyResult{}
  66. if rpchost != "" {
  67. args := rpcs.ArgsSendCommand{
  68. DeviceId: action.DeviceID,
  69. SubDevice: action.SubDeviceId,
  70. Cmd: action.ExecutorProperty.FunctionCode,
  71. Params: action.ExecutorProperty.FunctionValue,
  72. }
  73. server.Log.Debugf("do Device Issue task args:%v", args)
  74. return server.RPCCallByHost(rpchost, "Access.SendCommand", args, reply)
  75. }
  76. var publishArgs rpcs.ArgsPublishMessage
  77. publishArgs.Topic = action.PlcPubMessage.Topic
  78. publishArgs.Payload = action.PlcPubMessage.Payload
  79. err = server.RPCCallByName(nil, rpcs.EmqxAgentServiceName, "Access.PublishMessage", publishArgs, reply)
  80. if err != nil {
  81. server.Log.Errorf("plc设备发送消息失败:%v", err)
  82. }
  83. fmt.Printf("plc设备发送消息成功:topic:%s,payload:%s", publishArgs.Topic, publishArgs.Payload)
  84. return nil
  85. }
  86. // 执行延时任务
  87. func (a *TaskExecutor) doDelayTask(action *Action) error {
  88. time.Sleep(time.Duration(action.ExecutorProperty.DelaySeconds) * time.Second)
  89. return a.doTask(action)
  90. }
  91. func getAccessRPCHost(deviceid string) (string, error) {
  92. args := rpcs.ArgsGetDeviceOnlineStatus{
  93. Id: deviceid,
  94. }
  95. reply := &rpcs.ReplyGetDeviceOnlineStatus{}
  96. err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetDeviceOnlineStatus", args, reply)
  97. if err != nil {
  98. return "", err
  99. }
  100. return reply.AccessRPCHost, nil
  101. }
  102. func (a *TaskExecutor) saveHis(id string, conditionId []string) error {
  103. url := "http://127.0.0.1:8199/iot/v1/scene_history"
  104. body := make(map[string]interface{})
  105. body["scene_id"] = id
  106. if len(conditionId) > 0 {
  107. body["condition_id"] = strings.Join(conditionId, ",")
  108. }
  109. w := new(bytes.Buffer)
  110. if err := json.NewEncoder(w).Encode(body); err != nil {
  111. return err
  112. }
  113. req, err := utils.NewRequest("POST", url, w)
  114. if err != nil {
  115. server.Log.Error(err)
  116. return err
  117. }
  118. req.Header.Add("Content-Type", "application/json")
  119. _, err = a.client.Do(req)
  120. if err != nil {
  121. server.Log.Errorf("请求出错%s", err.Error())
  122. }
  123. //_, err := a.client.Post(url, "application/json", gjson.New(body))
  124. //if err != nil {
  125. // server.Log.Errorf("sync his error:%s", err.Error())
  126. //}
  127. return err
  128. //args := models.SceneHis{
  129. // RecordId: guid.S(),
  130. // SceneID: id,
  131. // DeviceId: action.DeviceID,
  132. // SubDeviceId: action.SubDeviceId,
  133. //}
  134. //if action.ExecutorProperty.FunctionCode != "" {
  135. // args.Cmd = action.ExecutorProperty.FunctionCode
  136. // args.Params = gjson.New(action.ExecutorProperty.FunctionValue).MustToJsonString()
  137. //}
  138. //if action.PlcPubMessage != nil {
  139. // args.Topic = action.PlcPubMessage.Topic
  140. // args.Payload = string(action.PlcPubMessage.Payload)
  141. //}
  142. //return server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.CreateSceneHis", args, &rpcs.ReplyEmptyResult{})
  143. }