temp_alarm_node.go 8.6 KB


  1. package nodes
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "github.com/gogf/gf/encoding/gjson"
  6. "github.com/gogf/gf/os/grpool"
  7. "regexp"
  8. "sparrow/pkg/protocol"
  9. "sparrow/pkg/rpcs"
  10. "sparrow/pkg/ruleEngine"
  11. "sparrow/pkg/server"
  12. "sparrow/pkg/utils"
  13. "strconv"
  14. "strings"
  15. "sync"
  16. "time"
  17. )
  18. type TempAlarmNode struct {
  19. pool *grpool.Pool
  20. config *TempAlarmNodeConfig
  21. client *utils.HttpClient
  22. bufPool sync.Pool
  23. }
  24. func (t *TempAlarmNode) Init(ctx ruleEngine.Context, config string) error {
  25. t.pool = grpool.New(10)
  26. fmt.Printf("initConfig:---------------------------%s\r\n", config)
  27. if config == "" {
  28. t.config = &TempAlarmNodeConfig{}
  29. } else {
  30. c := new(TempAlarmNodeConfig)
  31. err := json.Unmarshal([]byte(config), c)
  32. if err != nil {
  33. return err
  34. }
  35. t.config = c
  36. }
  37. client := utils.NewHttpClient()
  38. client.SetLogger(server.Log)
  39. t.client = client
  40. return nil
  41. }
  42. func (t *TempAlarmNode) OnMessage(ctx ruleEngine.Context, message *protocol.Message) error {
  43. deviceId := message.MetaData["device_id"].(string)
  44. if t.evaluateConditions(message) {
  45. err := t.newAlarmMessage(message)
  46. if err != nil {
  47. return err
  48. }
  49. ctx.TellNext(message, protocol.True)
  50. return nil
  51. } else {
  52. args := rpcs.ArgsGetAlarm{
  53. DeviceCode: deviceId,
  54. }
  55. reply := rpcs.ReplayAlarm{}
  56. err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.DelAlarm", args, &reply)
  57. if err != nil {
  58. server.Log.Errorf("device offline error. deviceid: %v, error: %v", reply.AlarmParams.DeviceCode, err)
  59. }
  60. ctx.TellNext(message, protocol.False)
  61. }
  62. return nil
  63. }
  64. type TempAlarmNodeConfig struct {
  65. ProductKey string `json:"product_key"` // 设备产品key
  66. Duration int `json:"duration"` // 持续时间
  67. Interval int `json:"interval"` // 告警间隔
  68. FillUserInfo bool `json:"fill_user_info"` // 是否需要匹配用户信息
  69. Rules []*Rule `json:"rules"` // 报警规则
  70. MessageTemplate string `json:"message_template"` // 报警内容模板
  71. MessageFields []*MessageFields `json:"message_fields"` // 报警内容字段
  72. }
  // MessageFields describes one field of the alarm text: its name and a type
  // label, both taken verbatim from the JSON configuration.
  type MessageFields struct {
  	// Field is the field name.
  	Field string `json:"field"`
  	// Type is the field type label.
  	Type string `json:"type"`
  }
  77. // Rule 表示一组报警规则
  78. type Rule struct {
  79. Conditions []*Condition `json:"conditions"`
  80. LogicalOp int `json:"logical_op"` // 1 "AND" 2 "OR"
  81. }
  // Condition is a single comparison between a reported field and a configured
  // value.
  type Condition struct {
  	// Field is the name of the field to inspect.
  	Field string `json:"field"`
  	// Value is the configured value to compare against.
  	Value string `json:"value"`
  	// Operator is the comparison code.
  	// Numeric comparison: 1 >, 2 >=, 3 =, 4 <=, 5 <, 6 !=.
  	// String comparison: 1 equal, 2 not equal, 3 contains, 4 does not contain.
  	Operator int `json:"operator"`
  	// Type is the field type: 1 string, 2 number.
  	Type int `json:"type"`
  }
  // AlarmMessage is the JSON shape of an alarm payload. It is not referenced by
  // the functions visible in this file.
  type AlarmMessage struct {
  	// DeviceId is serialized as "device_id".
  	DeviceId string `json:"device_id"`
  	// UserName is serialized as "user_name".
  	UserName string `json:"user_name"`
  	// Address is serialized as "address".
  	Address string `json:"address"`
  	// Phone is serialized as "phone".
  	Phone string `json:"phone"`
  	// SetTemp is serialized as "set_temp".
  	SetTemp int `json:"set_temp"`
  	// Temperature is serialized as "temperature".
  	Temperature int `json:"temperature"`
  	// Duration is serialized as "duration".
  	Duration int `json:"duration"`
  }
  97. func (t *TempAlarmNode) newAlarmMessage(message *protocol.Message) error {
  98. dataMap := make(map[string]interface{})
  99. var err error
  100. if t.config.FillUserInfo {
  101. result, err := utils.NewRequest("GET", fmt.Sprintf("%s/%s", "http://127.0.0.1:8199/iot/v1/devices/", message.MetaData["device_id"].(string)), nil)
  102. if err != nil {
  103. return err
  104. }
  105. if result == nil {
  106. return nil
  107. }
  108. j := gjson.New(result)
  109. fmt.Printf("查询用户信息------------------:%s\r\n", j.MustToJsonString())
  110. dataMap = j.GetMap("data")
  111. fmt.Printf("dataMap------------------:%v\r\n", dataMap)
  112. } else {
  113. j := gjson.New(message.MetaData)
  114. dataMap = j.GetMap("status")
  115. }
  116. message.AlarmMessage, err = fillTemplate(t.config.MessageTemplate, dataMap)
  117. if err != nil {
  118. return err
  119. }
  120. return nil
  121. }
  122. func (t *TempAlarmNode) evaluateConditions(message *protocol.Message) bool {
  123. deviceId := message.MetaData["device_id"].(string)
  124. fmt.Printf("message------------------:%s\r\n", gjson.New(message.MetaData).MustToJsonString())
  125. // 判断设备类型
  126. if message.MetaData["product_key"] != t.config.ProductKey {
  127. fmt.Printf("productKey------------------:%s\r\n", message.MetaData["product_key"])
  128. return false
  129. }
  130. // 判断设备是否在线
  131. onlineargs := rpcs.ArgsGetDeviceOnlineStatus{
  132. Id: deviceId,
  133. }
  134. onlinereply := rpcs.ReplyGetDeviceOnlineStatus{}
  135. err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetDeviceOnlineStatus", onlineargs, &onlinereply)
  136. if err != nil || onlinereply.ClientIP == "" {
  137. return false
  138. }
  139. // 判断报警规则
  140. if len(t.config.Rules) == 0 {
  141. return false
  142. }
  143. data := make(map[string]interface{})
  144. var ok bool
  145. j := gjson.New(message.Data)
  146. fmt.Printf("Data------------------:%s\r\n", j.MustToJsonString())
  147. status := j.Get("status")
  148. if data, ok = status.(map[string]interface{}); !ok {
  149. return false
  150. }
  151. // 判断设备是否开机
  152. if data["power"].(int) == 0 {
  153. return false
  154. }
  155. for _, rule := range t.config.Rules {
  156. var results []bool
  157. for _, cond := range rule.Conditions {
  158. results = append(results, t.checkValue(cond.Value, data[cond.Field], cond.Type, cond.Operator))
  159. }
  160. // 应用逻辑运算
  161. if rule.LogicalOp == 1 {
  162. for _, r := range results {
  163. if !r {
  164. return false
  165. }
  166. }
  167. } else {
  168. for _, r := range results {
  169. if r {
  170. break
  171. }
  172. return false
  173. }
  174. }
  175. }
  176. // 判断持续时间
  177. if t.config.Duration == 0 {
  178. return true
  179. }
  180. args := rpcs.ArgsGetAlarm{
  181. DeviceCode: deviceId,
  182. }
  183. reply := rpcs.ReplayAlarm{}
  184. setReply := rpcs.ArgsAlarmInfo{}
  185. err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetAlarm", args, &reply)
  186. if err != nil {
  187. server.Log.Errorf("device offline error. deviceid: %v, error: %v", reply.AlarmParams.DeviceCode, err)
  188. }
  189. if reply.AlarmParams.FirstReportTime == 0 {
  190. setArgs := rpcs.ArgsAlarmInfo{
  191. DeviceCode: deviceId,
  192. FirstReportTime: time.Now().Unix(),
  193. }
  194. err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.SetAlarm", setArgs, &setReply)
  195. if err != nil {
  196. return false
  197. }
  198. return false
  199. }
  200. duration := time.Now().Sub(time.Unix(reply.AlarmParams.FirstReportTime, 0))
  201. if duration.Seconds() > float64(t.config.Interval*60) {
  202. interval := time.Now().Sub(time.Unix(reply.AlarmParams.LastAlarmTime, 0))
  203. if interval.Seconds() >= float64(t.config.Duration*60) {
  204. setArgs := rpcs.ArgsAlarmInfo{
  205. DeviceCode: deviceId,
  206. FirstReportTime: reply.AlarmParams.FirstReportTime,
  207. LastAlarmTime: time.Now().Unix(),
  208. }
  209. err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.SetAlarm", setArgs, &setReply)
  210. if err != nil {
  211. return false
  212. }
  213. return true
  214. }
  215. }
  216. return false
  217. }
  218. // 字符串比较
  219. func (t *TempAlarmNode) checkString(target string, value string, operator int) bool {
  220. switch operator {
  221. case 1:
  222. return target == value
  223. case 2:
  224. return target != value
  225. case 3:
  226. return strings.Contains(value, target)
  227. case 4:
  228. return !strings.Contains(value, target)
  229. }
  230. return false
  231. }
  232. // 数值比较
  233. func (t *TempAlarmNode) checkInt(target, value, operator int) bool {
  234. switch operator {
  235. case 1:
  236. return value > target
  237. case 2:
  238. return value >= target
  239. case 3:
  240. return value == target
  241. case 4:
  242. return value <= target
  243. case 5:
  244. return value < target
  245. case 6:
  246. return value != target
  247. }
  248. return false
  249. }
  250. // CheckValue 目标值验证
  251. func (t *TempAlarmNode) checkValue(target string, value interface{}, fieldType, operator int) bool {
  252. var strValue string
  253. var intValue int
  254. switch value.(type) {
  255. case string:
  256. strValue = value.(string)
  257. case int:
  258. intValue = value.(int)
  259. case float64:
  260. floatValue := value.(float64)
  261. intValue = int(floatValue)
  262. default:
  263. break
  264. }
  265. switch fieldType {
  266. case 1:
  267. if strValue != "" {
  268. return t.checkString(target, strValue, operator)
  269. }
  270. case 2:
  271. tInt, _ := strconv.Atoi(target)
  272. return t.checkInt(tInt, intValue, operator)
  273. }
  274. return false
  275. }
  // templateVarPattern matches ${name} placeholders. It is compiled once instead
  // of on every fillTemplate call.
  var templateVarPattern = regexp.MustCompile(`\$\{([^}]+)\}`)

  // fillTemplate replaces every ${name} placeholder in template with the value
  // stored under name in values.
  //
  // Placeholders without a matching key are left untouched. Strings are inserted
  // as-is, ints, floats and bools in their plain decimal/boolean form, nil as an
  // empty string, and anything else with fmt's %v formatting. The returned error
  // is always nil.
  func fillTemplate(template string, values map[string]interface{}) (string, error) {
  	filled := templateVarPattern.ReplaceAllStringFunc(template, func(match string) string {
  		// match always has the form "${name}"; strip the delimiters.
  		name := match[2 : len(match)-1]
  		value, exists := values[name]
  		if !exists {
  			return match
  		}
  		switch v := value.(type) {
  		case string:
  			return v
  		case int:
  			return strconv.Itoa(v)
  		case float32:
  			return strconv.FormatFloat(float64(v), 'f', -1, 32)
  		case float64:
  			return strconv.FormatFloat(v, 'f', -1, 64)
  		case bool:
  			return strconv.FormatBool(v)
  		case nil:
  			return ""
  		default:
  			return fmt.Sprintf("%v", v)
  		}
  	})
  	return filled, nil
  }