123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330 |
- package nodes
- import (
- "encoding/json"
- "fmt"
- "github.com/gogf/gf/encoding/gjson"
- "github.com/gogf/gf/os/grpool"
- "regexp"
- "sparrow/pkg/protocol"
- "sparrow/pkg/rpcs"
- "sparrow/pkg/ruleEngine"
- "sparrow/pkg/server"
- "sparrow/pkg/utils"
- "strconv"
- "strings"
- "sync"
- "time"
- )
- type TempAlarmNode struct {
- pool *grpool.Pool
- config *TempAlarmNodeConfig
- client *utils.HttpClient
- bufPool sync.Pool
- }
- func (t *TempAlarmNode) Init(ctx ruleEngine.Context, config string) error {
- t.pool = grpool.New(10)
- fmt.Printf("initConfig:---------------------------%s\r\n", config)
- if config == "" {
- t.config = &TempAlarmNodeConfig{}
- } else {
- c := new(TempAlarmNodeConfig)
- err := json.Unmarshal([]byte(config), c)
- if err != nil {
- fmt.Printf("config转换失败:----------------%s", err.Error())
- return err
- }
- fmt.Printf("config转换c:----------------%s", gjson.New(c).MustToJsonString())
- t.config = c
- }
- client := utils.NewHttpClient()
- client.SetLogger(server.Log)
- t.client = client
- return nil
- }
- func (t *TempAlarmNode) OnMessage(ctx ruleEngine.Context, message *protocol.Message) error {
- deviceId := message.MetaData["device_id"].(string)
- if t.evaluateConditions(message) {
- err := t.newAlarmMessage(message)
- if err != nil {
- return err
- }
- ctx.TellNext(message, protocol.True)
- return nil
- } else {
- args := rpcs.ArgsGetAlarm{
- DeviceCode: deviceId,
- }
- reply := rpcs.ReplayAlarm{}
- err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.DelAlarm", args, &reply)
- if err != nil {
- server.Log.Errorf("device offline error. deviceid: %v, error: %v", reply.AlarmParams.DeviceCode, err)
- }
- ctx.TellNext(message, protocol.False)
- }
- return nil
- }
- type TempAlarmNodeConfig struct {
- ProductKey string `json:"product_key"` // 设备产品key
- Duration int `json:"duration"` // 持续时间
- Interval int `json:"interval"` // 告警间隔
- FillUserInfo string `json:"fill_user_info"` // 是否需要匹配用户信息
- Rules []*Rule `json:"rules"` // 报警规则
- MessageTemplate string `json:"message_template"` // 报警内容模板
- MessageFields []*MessageFields `json:"message_fields"` // 报警内容字段
- }
- type MessageFields struct {
- Field string `json:"field"`
- Type int `json:"type"`
- }
- // Rule 表示一组报警规则
- type Rule struct {
- Conditions []*Condition `json:"conditions"`
- LogicalOp int `json:"logical_op"` // 1 "AND" 2 "OR"
- }
- type Condition struct {
- Field string `json:"field"` // 字段名
- Value string `json:"value"` // 值
- Operator int `json:"operator"` // 比较类型 数值比较 1 > 2 >= 3 = 4 <= 5 < 6 != 字符串比较 1 相等 2 不相等 3 包含 4 不包含
- Type int `json:"type"` //字段类型 1 字符串 2 数值
- }
- type AlarmMessage struct {
- DeviceId string `json:"device_id"`
- UserName string `json:"user_name"`
- Address string `json:"address"`
- Phone string `json:"phone"`
- SetTemp int `json:"set_temp"`
- Temperature int `json:"temperature"`
- Duration int `json:"duration"`
- }
- func (t *TempAlarmNode) newAlarmMessage(message *protocol.Message) error {
- dataMap := make(map[string]interface{})
- var err error
- if t.config.FillUserInfo == "true" {
- result, err := utils.NewRequest("GET", fmt.Sprintf("%s/%s", "http://127.0.0.1:8199/iot/v1/devices/", message.MetaData["device_id"].(string)), nil)
- if err != nil {
- return err
- }
- if result == nil {
- return nil
- }
- j := gjson.New(result)
- fmt.Printf("查询用户信息------------------:%s\r\n", j.MustToJsonString())
- dataMap = j.GetMap("data")
- fmt.Printf("dataMap------------------:%v\r\n", dataMap)
- } else {
- j := gjson.New(message.MetaData)
- dataMap = j.GetMap("status")
- }
- message.AlarmMessage, err = fillTemplate(t.config.MessageTemplate, dataMap)
- if err != nil {
- return err
- }
- return nil
- }
- func (t *TempAlarmNode) evaluateConditions(message *protocol.Message) bool {
- deviceId := message.MetaData["device_id"].(string)
- // 判断设备类型
- if message.MetaData["product_key"].(string) != t.config.ProductKey {
- return false
- }
- // 判断设备是否在线
- onlineargs := rpcs.ArgsGetDeviceOnlineStatus{
- Id: deviceId,
- }
- onlinereply := rpcs.ReplyGetDeviceOnlineStatus{}
- err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetDeviceOnlineStatus", onlineargs, &onlinereply)
- if err != nil || onlinereply.ClientIP == "" {
- return false
- }
- // 判断报警规则
- if len(t.config.Rules) == 0 {
- return false
- }
- data := make(map[string]interface{})
- var ok bool
- j := gjson.New(message.Data)
- status := j.Get("status")
- if data, ok = status.(map[string]interface{}); !ok {
- return false
- }
- // 判断设备是否开机
- if data["power"].(float64) == 0 {
- return false
- }
- for _, rule := range t.config.Rules {
- var results []bool
- for _, cond := range rule.Conditions {
- results = append(results, t.checkValue(cond.Value, data[cond.Field], cond.Type, cond.Operator))
- }
- // 应用逻辑运算
- if rule.LogicalOp == 1 {
- for _, r := range results {
- if !r {
- return false
- }
- }
- } else {
- for _, r := range results {
- if r {
- break
- }
- return false
- }
- }
- }
- // 判断持续时间
- if t.config.Duration == 0 {
- return true
- }
- args := rpcs.ArgsGetAlarm{
- DeviceCode: deviceId,
- }
- reply := rpcs.ReplayAlarm{}
- setReply := rpcs.ArgsAlarmInfo{}
- err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetAlarm", args, &reply)
- if err != nil {
- server.Log.Errorf("device offline error. deviceid: %v, error: %v", reply.AlarmParams.DeviceCode, err)
- }
- if reply.AlarmParams.FirstReportTime == 0 {
- setArgs := rpcs.ArgsAlarmInfo{
- DeviceCode: deviceId,
- FirstReportTime: time.Now().Unix(),
- }
- err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.SetAlarm", setArgs, &setReply)
- if err != nil {
- return false
- }
- return false
- }
- duration := time.Now().Sub(time.Unix(reply.AlarmParams.FirstReportTime, 0))
- if duration.Seconds() > float64(t.config.Interval*60) {
- interval := time.Now().Sub(time.Unix(reply.AlarmParams.LastAlarmTime, 0))
- if interval.Seconds() >= float64(t.config.Duration*60) {
- setArgs := rpcs.ArgsAlarmInfo{
- DeviceCode: deviceId,
- FirstReportTime: reply.AlarmParams.FirstReportTime,
- LastAlarmTime: time.Now().Unix(),
- }
- err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.SetAlarm", setArgs, &setReply)
- if err != nil {
- return false
- }
- return true
- }
- }
- return false
- }
- // 字符串比较
- func (t *TempAlarmNode) checkString(target string, value string, operator int) bool {
- switch operator {
- case 1:
- return target == value
- case 2:
- return target != value
- case 3:
- return strings.Contains(value, target)
- case 4:
- return !strings.Contains(value, target)
- }
- return false
- }
- // 数值比较
- func (t *TempAlarmNode) checkInt(target, value, operator int) bool {
- switch operator {
- case 1:
- return value > target
- case 2:
- return value >= target
- case 3:
- return value == target
- case 4:
- return value <= target
- case 5:
- return value < target
- case 6:
- return value != target
- }
- return false
- }
- // CheckValue 目标值验证
- func (t *TempAlarmNode) checkValue(target string, value interface{}, fieldType, operator int) bool {
- var strValue string
- var intValue int
- switch value.(type) {
- case string:
- strValue = value.(string)
- case int:
- intValue = value.(int)
- case float64:
- floatValue := value.(float64)
- intValue = int(floatValue)
- default:
- break
- }
- switch fieldType {
- case 1:
- if strValue != "" {
- return t.checkString(target, strValue, operator)
- }
- case 2:
- tInt, _ := strconv.Atoi(target)
- return t.checkInt(tInt, intValue, operator)
- }
- return false
- }
- // 根据变量映射填充模板
- func fillTemplate(template string, values map[string]interface{}) (string, error) {
- re := regexp.MustCompile(`\$\{([^}]+)\}`)
- return re.ReplaceAllStringFunc(template, func(match string) string {
- // 提取变量名
- varName := strings.TrimPrefix(strings.TrimSuffix(match, "}"), "${")
- // 检查变量是否存在
- value, exists := values[varName]
- if !exists {
- return match // 变量不存在,保持原样
- }
- // 根据值的类型进行转换
- switch v := value.(type) {
- case string:
- return v
- case int:
- return strconv.Itoa(v)
- case float32:
- return strconv.FormatFloat(float64(v), 'f', -1, 32)
- case float64:
- return strconv.FormatFloat(v, 'f', -1, 64)
- case bool:
- return strconv.FormatBool(v)
- case nil:
- return ""
- default:
- return fmt.Sprintf("%v", v)
- }
- }), nil
- }
|