package nodes import ( "encoding/json" "fmt" "github.com/gogf/gf/encoding/gjson" "github.com/gogf/gf/os/grpool" "regexp" "sparrow/pkg/protocol" "sparrow/pkg/rpcs" "sparrow/pkg/ruleEngine" "sparrow/pkg/server" "sparrow/pkg/utils" "strconv" "strings" "sync" "time" ) type TempAlarmNode struct { pool *grpool.Pool config *TempAlarmNodeConfig client *utils.HttpClient bufPool sync.Pool } func (t *TempAlarmNode) Init(ctx ruleEngine.Context, config string) error { t.pool = grpool.New(10) fmt.Printf("initConfig:---------------------------%s\r\n", config) if config == "" { t.config = &TempAlarmNodeConfig{} } else { c := new(TempAlarmNodeConfig) err := json.Unmarshal([]byte(config), c) if err != nil { fmt.Printf("config转换失败:----------------%s", err.Error()) return err } fmt.Printf("config转换c:----------------%s", gjson.New(c).MustToJsonString()) t.config = c } client := utils.NewHttpClient() client.SetLogger(server.Log) t.client = client return nil } func (t *TempAlarmNode) OnMessage(ctx ruleEngine.Context, message *protocol.Message) error { deviceId := message.MetaData["device_id"].(string) if t.evaluateConditions(message) { err := t.newAlarmMessage(message) if err != nil { return err } ctx.TellNext(message, protocol.True) return nil } else { args := rpcs.ArgsGetAlarm{ DeviceCode: deviceId, } reply := rpcs.ReplayAlarm{} err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.DelAlarm", args, &reply) if err != nil { server.Log.Errorf("device offline error. deviceid: %v, error: %v", reply.AlarmParams.DeviceCode, err) } ctx.TellNext(message, protocol.False) } return nil } type TempAlarmNodeConfig struct { ProductKey string `json:"product_key"` // 设备产品key Duration int `json:"duration"` // 持续时间 Interval int `json:"interval"` // 告警间隔 FillUserInfo string `json:"fill_user_info"` // 是否需要匹配用户信息 Rules []*Rule `json:"rules"` // 报警规则 MessageTemplate string `json:"message_template"` // 报警内容模板 MessageFields []*MessageFields `json:"message_fields"` // 报警内容字段 } type MessageFields struct { Field string `json:"field"` Type int `json:"type"` } // Rule 表示一组报警规则 type Rule struct { Conditions []*Condition `json:"conditions"` LogicalOp int `json:"logical_op"` // 1 "AND" 2 "OR" } type Condition struct { Field string `json:"field"` // 字段名 Value string `json:"value"` // 值 Operator int `json:"operator"` // 比较类型 数值比较 1 > 2 >= 3 = 4 <= 5 < 6 != 字符串比较 1 相等 2 不相等 3 包含 4 不包含 Type int `json:"type"` //字段类型 1 字符串 2 数值 } type AlarmMessage struct { DeviceId string `json:"device_id"` UserName string `json:"user_name"` Address string `json:"address"` Phone string `json:"phone"` SetTemp int `json:"set_temp"` Temperature int `json:"temperature"` Duration int `json:"duration"` } func (t *TempAlarmNode) newAlarmMessage(message *protocol.Message) error { dataMap := make(map[string]interface{}) var err error if t.config.FillUserInfo == "true" { result, err := utils.NewRequest("GET", fmt.Sprintf("%s/%s", "http://127.0.0.1:8199/iot/v1/devices/", message.MetaData["device_id"].(string)), nil) if err != nil { return err } if result == nil { return nil } j := gjson.New(result) fmt.Printf("查询用户信息------------------:%s\r\n", j.MustToJsonString()) dataMap = j.GetMap("data") fmt.Printf("dataMap------------------:%v\r\n", dataMap) } else { j := gjson.New(message.MetaData) dataMap = j.GetMap("status") } message.AlarmMessage, err = fillTemplate(t.config.MessageTemplate, dataMap) if err != nil { return err } return nil } func (t *TempAlarmNode) evaluateConditions(message *protocol.Message) bool { deviceId := message.MetaData["device_id"].(string) // 判断设备类型 if message.MetaData["product_key"].(string) != t.config.ProductKey { return false } // 判断设备是否在线 onlineargs := rpcs.ArgsGetDeviceOnlineStatus{ Id: deviceId, } onlinereply := rpcs.ReplyGetDeviceOnlineStatus{} err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetDeviceOnlineStatus", onlineargs, &onlinereply) if err != nil || onlinereply.ClientIP == "" { return false } // 判断报警规则 if len(t.config.Rules) == 0 { return false } data := make(map[string]interface{}) var ok bool j := gjson.New(message.Data) status := j.Get("status") if data, ok = status.(map[string]interface{}); !ok { return false } // 判断设备是否开机 if data["power"].(float64) == 0 { return false } for _, rule := range t.config.Rules { var results []bool for _, cond := range rule.Conditions { results = append(results, t.checkValue(cond.Value, data[cond.Field], cond.Type, cond.Operator)) } // 应用逻辑运算 if rule.LogicalOp == 1 { for _, r := range results { if !r { return false } } } else { for _, r := range results { if r { break } return false } } } // 判断持续时间 if t.config.Duration == 0 { return true } args := rpcs.ArgsGetAlarm{ DeviceCode: deviceId, } reply := rpcs.ReplayAlarm{} setReply := rpcs.ArgsAlarmInfo{} err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetAlarm", args, &reply) if err != nil { server.Log.Errorf("device offline error. deviceid: %v, error: %v", reply.AlarmParams.DeviceCode, err) } if reply.AlarmParams.FirstReportTime == 0 { fmt.Printf("温度:%f,超过预设值,请求rpc接口保存数据------------------\r\n", data["temperature"]) setArgs := rpcs.ArgsAlarmInfo{ DeviceCode: deviceId, FirstReportTime: time.Now().Unix(), } err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.SetAlarm", setArgs, &setReply) if err != nil { fmt.Printf("err------------------:%s\r\n", err.Error()) return false } return false } duration := time.Now().Sub(time.Unix(reply.AlarmParams.FirstReportTime, 0)) if duration.Seconds() > float64(t.config.Interval*60) { interval := time.Now().Sub(time.Unix(reply.AlarmParams.LastAlarmTime, 0)) if interval.Seconds() >= float64(t.config.Duration*60) { setArgs := rpcs.ArgsAlarmInfo{ DeviceCode: deviceId, FirstReportTime: reply.AlarmParams.FirstReportTime, LastAlarmTime: time.Now().Unix(), } err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.SetAlarm", setArgs, &setReply) if err != nil { return false } return true } } return false } // 字符串比较 func (t *TempAlarmNode) checkString(target string, value string, operator int) bool { switch operator { case 1: return target == value case 2: return target != value case 3: return strings.Contains(value, target) case 4: return !strings.Contains(value, target) } return false } // 数值比较 func (t *TempAlarmNode) checkInt(target, value, operator int) bool { switch operator { case 1: return value > target case 2: return value >= target case 3: return value == target case 4: return value <= target case 5: return value < target case 6: return value != target } return false } // CheckValue 目标值验证 func (t *TempAlarmNode) checkValue(target string, value interface{}, fieldType, operator int) bool { var strValue string var intValue int switch value.(type) { case string: strValue = value.(string) case int: intValue = value.(int) case float64: floatValue := value.(float64) intValue = int(floatValue) default: break } switch fieldType { case 1: if strValue != "" { return t.checkString(target, strValue, operator) } case 2: tInt, _ := strconv.Atoi(target) return t.checkInt(tInt, intValue, operator) } return false } // 根据变量映射填充模板 func fillTemplate(template string, values map[string]interface{}) (string, error) { re := regexp.MustCompile(`\$\{([^}]+)\}`) return re.ReplaceAllStringFunc(template, func(match string) string { // 提取变量名 varName := strings.TrimPrefix(strings.TrimSuffix(match, "}"), "${") // 检查变量是否存在 value, exists := values[varName] if !exists { return match // 变量不存在,保持原样 } // 根据值的类型进行转换 switch v := value.(type) { case string: return v case int: return strconv.Itoa(v) case float32: return strconv.FormatFloat(float64(v), 'f', -1, 32) case float64: return strconv.FormatFloat(v, 'f', -1, 64) case bool: return strconv.FormatBool(v) case nil: return "" default: return fmt.Sprintf("%v", v) } }), nil }