Browse Source

增加温度报警节点及钉钉机器人节点

liuxiulin 12 hours ago
parent
commit
4b92d714bd

+ 75 - 0
pkg/deviceAlarm/deviceAlarm.go

@@ -0,0 +1,75 @@
+package deviceAlarm
+
+import (
+	"github.com/gogf/gf/database/gredis"
+)
+
+const (
+	AlarmPrefix = "alarm:"
+	dataExpires = 7200
+)
+
+type AlarmParams struct {
+	DeviceCode      string `json:"device_code"`
+	FirstReportTime int64  `json:"first_report_time"`
+	LastAlarmTime   int64  `json:"last_alarm_time"`
+}
+
+type Alarm struct {
+	redisClient *gredis.Redis
+}
+
+func NewAlarm(host string, port, db int) *Alarm {
+	red := gredis.New(&gredis.Config{
+		Host:      host,
+		Port:      port,
+		Db:        14,
+		MaxActive: 100,
+	})
+	helper := &Alarm{
+		redisClient: red,
+	}
+	return helper
+}
+
+func (a *Alarm) SetAlarm(params *AlarmParams) error {
+	key := AlarmPrefix + params.DeviceCode
+	_, err := a.redisClient.Do("SET", key, params)
+	if err != nil {
+		return err
+	}
+	_, err = a.redisClient.Do("EXPIRE", key, dataExpires)
+	if err != nil {
+		return err
+	}
+	return nil
+
+}
+
+func (a *Alarm) GetAlarm(code string) (*AlarmParams, error) {
+	key := AlarmPrefix + code
+	alarm := new(AlarmParams)
+	// get status from redis
+	result, err := a.redisClient.DoVar("GET", key)
+	if err != nil {
+		return nil, err
+	}
+	err = result.Struct(alarm)
+	if err != nil {
+		return nil, err
+	}
+
+	return alarm, nil
+
+}
+
+func (a *Alarm) DeleteAlarm(code string) error {
+	key := AlarmPrefix + code
+	// get status from redis
+	_, err := a.redisClient.DoVar("DEL", key)
+	if err != nil {
+		return err
+	}
+	return nil
+
+}

+ 2 - 2
pkg/mqtt/message.go

@@ -37,7 +37,7 @@ func (qos TagQosLevel) IsExactlyOnce() bool {
 	return qos == QosExactlyOnce
 }
 
-// Message Type
+// Message NoticeType
 const (
 	MsgConnect = TagMessageType(iota + 1)
 	MsgConnAck
@@ -56,7 +56,7 @@ const (
 	MsgInvalid
 )
 
-//  retcode
+// retcode
 const (
 	RetCodeAccepted = TagRetCode(iota)
 	RetCodeUnacceptableProtocolVersion

+ 12 - 11
pkg/protocol/message.go

@@ -8,17 +8,18 @@ import (
 )
 
 type Message struct {
-	QueueName   string
-	Id          string
-	Ts          *time.Time
-	Type        string
-	Data        string
-	RuleChanId  string
-	RuleNodeId  string
-	Callback    IMessageCallBack
-	MetaData    map[string]interface{}
-	Originator  string
-	execCounter int32
+	QueueName    string
+	Id           string
+	Ts           *time.Time
+	Type         string
+	Data         string
+	RuleChanId   string
+	RuleNodeId   string
+	Callback     IMessageCallBack
+	MetaData     map[string]interface{}
+	Originator   string
+	execCounter  int32
+	AlarmMessage string
 }
 
 func (a *Message) TransformMsg(msgType string, ori string, data string) *Message {

+ 11 - 0
pkg/rpcs/devicemanager.go

@@ -1,6 +1,7 @@
 package rpcs
 
 import (
+	"sparrow/pkg/deviceAlarm"
 	"sparrow/pkg/online"
 )
 
@@ -58,3 +59,13 @@ type ReplyOtaProgress struct {
 type ReplyDeviceStatus struct {
 	Status interface{}
 }
+
+type ArgsAlarmInfo deviceAlarm.AlarmParams
+
+type ArgsGetAlarm struct {
+	DeviceCode string
+}
+
+type ReplayAlarm struct {
+	AlarmParams *deviceAlarm.AlarmParams
+}

+ 83 - 0
pkg/ruleEngine/nodes/dingtalk_robot_node.go

@@ -0,0 +1,83 @@
+package nodes
+
+import (
+	"encoding/json"
+	"github.com/gogf/gf/encoding/gjson"
+	"github.com/gogf/gf/os/grpool"
+	"net/http"
+	"sparrow/pkg/protocol"
+	"sparrow/pkg/ruleEngine"
+	"sparrow/pkg/server"
+	"sparrow/pkg/utils"
+	"sync"
+)
+
+type DingTalkRobotNode struct {
+	pool    *grpool.Pool
+	config  *DingTalkRobotNodeConfig
+	client  *utils.HttpClient
+	bufPool sync.Pool
+}
+
+type DingTalkRobotNodeConfig struct {
+	WebHook string `json:"webhook"` // webhook
+}
+
+func (d *DingTalkRobotNode) Init(ctx ruleEngine.Context, config string) error {
+	d.pool = grpool.New(10)
+	if config == "" {
+		d.config = &DingTalkRobotNodeConfig{
+			WebHook: "http://localhost:8899/test",
+		}
+	} else {
+		c := new(DingTalkRobotNodeConfig)
+		err := json.Unmarshal([]byte(config), c)
+		if err != nil {
+			return err
+		}
+		d.config = c
+	}
+	client := utils.NewHttpClient()
+	client.SetLogger(server.Log)
+	d.client = client
+	return nil
+}
+
+func (d *DingTalkRobotNode) OnMessage(ctx ruleEngine.Context, message *protocol.Message) error {
+	body, err := d.newBody(message)
+	if body == nil {
+		return nil
+	} else if err != nil {
+		return err
+	}
+	req, err := utils.NewRequest("POST", d.config.WebHook, gjson.New(body))
+	if err != nil {
+		server.Log.Error(err)
+		return err
+	}
+	req.Header.Add("Content-Type", "application/json")
+	return d.pool.Add(func() {
+		res, err := d.client.Do(req)
+		if err != nil {
+			server.Log.Errorf("请求出错%s", err.Error())
+			return
+		}
+		if res != nil && res.Response() != nil {
+			defer res.Close()
+			if res.Response().StatusCode == http.StatusOK {
+				ctx.TellSuccess(message)
+			}
+		}
+	})
+}
+
+func (d *DingTalkRobotNode) newBody(message *protocol.Message) (body map[string]interface{}, err error) {
+	msg := map[string]interface{}{
+		"content": message.AlarmMessage,
+	}
+	body = map[string]interface{}{
+		"msgtype": "text",
+		"text":    msg,
+	}
+	return body, nil
+}

+ 1 - 0
pkg/ruleEngine/nodes/reg_types.go

@@ -21,6 +21,7 @@ func init() {
 	registerType((*MQTTBrokerNode)(nil))
 	registerType((*DeviceIdFilterNode)(nil))
 	registerType((*LogNode)(nil))
+	registerType((*DingTalkRobotNode)(nil))
 }
 
 func registerType(elem interface{}) {

+ 328 - 0
pkg/ruleEngine/nodes/temp_alarm_node.go

@@ -0,0 +1,328 @@
+package nodes
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/gogf/gf/encoding/gjson"
+	"github.com/gogf/gf/os/grpool"
+	"regexp"
+	"sparrow/pkg/protocol"
+	"sparrow/pkg/rpcs"
+	"sparrow/pkg/ruleEngine"
+	"sparrow/pkg/server"
+	"sparrow/pkg/utils"
+	"strconv"
+	"strings"
+	"sync"
+	"time"
+)
+
+type TempAlarmNode struct {
+	pool    *grpool.Pool
+	config  *TempAlarmNodeConfig
+	client  *utils.HttpClient
+	bufPool sync.Pool
+}
+
+func (t *TempAlarmNode) Init(ctx ruleEngine.Context, config string) error {
+	t.pool = grpool.New(10)
+	if config == "" {
+		t.config = &TempAlarmNodeConfig{}
+	} else {
+		c := new(TempAlarmNodeConfig)
+		err := json.Unmarshal([]byte(config), c)
+		if err != nil {
+			return err
+		}
+		t.config = c
+	}
+	client := utils.NewHttpClient()
+	client.SetLogger(server.Log)
+	t.client = client
+	return nil
+}
+
+func (t *TempAlarmNode) OnMessage(ctx ruleEngine.Context, message *protocol.Message) error {
+	deviceId := message.MetaData["device_id"].(string)
+	if t.evaluateConditions(message) {
+		err := t.newAlarmMessage(message)
+		if err != nil {
+			return err
+		}
+		ctx.TellNext(message, protocol.True)
+		return nil
+	} else {
+		args := rpcs.ArgsGetAlarm{
+			DeviceCode: deviceId,
+		}
+		reply := rpcs.ReplayAlarm{}
+		err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.DelAlarm", args, &reply)
+		if err != nil {
+			server.Log.Errorf("device offline error. deviceid: %v, error: %v", reply.AlarmParams.DeviceCode, err)
+		}
+		ctx.TellNext(message, protocol.False)
+	}
+	return nil
+}
+
+type TempAlarmNodeConfig struct {
+	ProductKey      string           `json:"product_key"`      // 设备产品key
+	Duration        int              `json:"duration"`         // 持续时间
+	Interval        int              `json:"interval"`         // 告警间隔
+	FillUserInfo    bool             `json:"fill_user_info"`   // 是否需要匹配用户信息
+	Rules           []*Rule          `json:"rules"`            // 报警规则
+	MessageTemplate string           `json:"message_template"` // 报警内容模板
+	MessageFields   []*MessageFields `json:"message_fields"`   // 报警内容字段
+}
+
+type MessageFields struct {
+	Field string `json:"field"`
+	Type  string `json:"type"`
+}
+
+// Rule 表示一组报警规则
+type Rule struct {
+	Conditions []*Condition `json:"conditions"`
+	LogicalOp  int          `json:"logical_op"` // 1 "AND" 2 "OR"
+}
+
+type Condition struct {
+	Field    string `json:"field"`    // 字段名
+	Value    string `json:"value"`    // 值
+	Operator int    `json:"operator"` // 比较类型  数值比较 1 >  2 >= 3 = 4 <= 5 < 6 !=   字符串比较  1 相等  2  不相等   3 包含  4  不包含
+	Type     int    `json:"type"`     //字段类型 1 字符串 2 数值
+}
+
+type AlarmMessage struct {
+	DeviceId    string `json:"device_id"`
+	UserName    string `json:"user_name"`
+	Address     string `json:"address"`
+	Phone       string `json:"phone"`
+	SetTemp     int    `json:"set_temp"`
+	Temperature int    `json:"temperature"`
+	Duration    int    `json:"duration"`
+}
+
+func (t *TempAlarmNode) newAlarmMessage(message *protocol.Message) error {
+	dataMap := make(map[string]interface{})
+	var err error
+	if t.config.FillUserInfo {
+		result, err := utils.NewRequest("GET", fmt.Sprintf("%s/%s", "http://127.0.0.1:8199/iot/v1/devices/", message.MetaData["device_id"].(string)), nil)
+		if err != nil {
+			return err
+		}
+		if result == nil {
+			return nil
+		}
+		j := gjson.New(result)
+		fmt.Printf("查询用户信息------------------:%s\r\n", j.MustToJsonString())
+		dataMap = j.GetMap("data")
+		fmt.Printf("dataMap------------------:%v\r\n", dataMap)
+	} else {
+		j := gjson.New(message.MetaData)
+		dataMap = j.GetMap("status")
+	}
+
+	message.AlarmMessage, err = fillTemplate(t.config.MessageTemplate, dataMap)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func (t *TempAlarmNode) evaluateConditions(message *protocol.Message) bool {
+	deviceId := message.MetaData["device_id"].(string)
+	fmt.Printf("message------------------:%s\r\n", gjson.New(message.MetaData).MustToJsonString())
+	// 判断设备类型
+	if message.MetaData["product_key"] != t.config.ProductKey {
+		fmt.Printf("productKey------------------:%s\r\n", message.MetaData["product_key"])
+		return false
+	}
+
+	// 判断设备是否在线
+	onlineargs := rpcs.ArgsGetDeviceOnlineStatus{
+		Id: deviceId,
+	}
+	onlinereply := rpcs.ReplyGetDeviceOnlineStatus{}
+	err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetDeviceOnlineStatus", onlineargs, &onlinereply)
+	if err != nil || onlinereply.ClientIP == "" {
+		return false
+	}
+
+	// 判断报警规则
+	if len(t.config.Rules) == 0 {
+		return false
+	}
+
+	data := make(map[string]interface{})
+	var ok bool
+	if data, ok = message.MetaData["status"].(map[string]interface{}); !ok {
+		return false
+	}
+	// 判断设备是否开机
+	if data["power"].(int) == 0 {
+		return false
+	}
+
+	for _, rule := range t.config.Rules {
+		var results []bool
+		for _, cond := range rule.Conditions {
+			results = append(results, t.CheckValue(cond.Value, data[cond.Field], cond.Type, cond.Operator))
+		}
+		// 应用逻辑运算
+		if rule.LogicalOp == 1 {
+			for _, r := range results {
+				if !r {
+					return false
+				}
+			}
+		} else {
+			for _, r := range results {
+				if r {
+					break
+				}
+				return false
+			}
+		}
+	}
+
+	// 判断持续时间
+	if t.config.Duration == 0 {
+		return true
+	}
+	args := rpcs.ArgsGetAlarm{
+		DeviceCode: deviceId,
+	}
+	reply := rpcs.ReplayAlarm{}
+	setReply := rpcs.ArgsAlarmInfo{}
+	err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetAlarm", args, &reply)
+	if err != nil {
+		server.Log.Errorf("device offline error. deviceid: %v, error: %v", reply.AlarmParams.DeviceCode, err)
+	}
+
+	if reply.AlarmParams.FirstReportTime == 0 {
+		setArgs := rpcs.ArgsAlarmInfo{
+			DeviceCode:      deviceId,
+			FirstReportTime: time.Now().Unix(),
+		}
+		err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.SetAlarm", setArgs, &setReply)
+		if err != nil {
+			return false
+		}
+		return false
+	}
+
+	duration := time.Now().Sub(time.Unix(reply.AlarmParams.FirstReportTime, 0))
+	if duration.Seconds() > float64(t.config.Interval*60) {
+		interval := time.Now().Sub(time.Unix(reply.AlarmParams.LastAlarmTime, 0))
+		if interval.Seconds() >= float64(t.config.Duration*60) {
+			setArgs := rpcs.ArgsAlarmInfo{
+				DeviceCode:      deviceId,
+				FirstReportTime: reply.AlarmParams.FirstReportTime,
+				LastAlarmTime:   time.Now().Unix(),
+			}
+			err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.SetAlarm", setArgs, &setReply)
+			if err != nil {
+				return false
+			}
+
+			return true
+		}
+	}
+	return false
+}
+
+// 字符串比较
+func (t *TempAlarmNode) checkString(target string, value string, operator int) bool {
+	switch operator {
+	case 1:
+		return target == value
+	case 2:
+		return target != value
+	case 3:
+		return strings.Contains(value, target)
+	case 4:
+		return !strings.Contains(value, target)
+	}
+	return false
+}
+
+// 数值比较
+func (t *TempAlarmNode) checkInt(target, value, operator int) bool {
+	switch operator {
+	case 1:
+		return value > target
+	case 2:
+		return value >= target
+	case 3:
+		return value == target
+	case 4:
+		return value <= target
+	case 5:
+		return value < target
+	case 6:
+		return value != target
+	}
+	return false
+}
+
+// CheckValue 目标值验证
+func (t *TempAlarmNode) CheckValue(target string, value interface{}, fieldType, operator int) bool {
+	var strValue string
+	var intValue int
+	switch value.(type) {
+	case string:
+		strValue = value.(string)
+	case int:
+		intValue = value.(int)
+	case float64:
+		floatValue := value.(float64)
+		intValue = int(floatValue)
+	default:
+		break
+	}
+	switch fieldType {
+	case 1:
+		if strValue != "" {
+			return t.checkString(target, strValue, operator)
+		}
+	case 2:
+		tInt, _ := strconv.Atoi(target)
+		return t.checkInt(tInt, intValue, operator)
+	}
+	return false
+}
+
+// 根据变量映射填充模板
+func fillTemplate(template string, values map[string]interface{}) (string, error) {
+	re := regexp.MustCompile(`\$\{([^}]+)\}`)
+
+	return re.ReplaceAllStringFunc(template, func(match string) string {
+		// 提取变量名
+		varName := strings.TrimPrefix(strings.TrimSuffix(match, "}"), "${")
+
+		// 检查变量是否存在
+		value, exists := values[varName]
+		if !exists {
+			return match // 变量不存在,保持原样
+		}
+
+		// 根据值的类型进行转换
+		switch v := value.(type) {
+		case string:
+			return v
+		case int:
+			return strconv.Itoa(v)
+		case float32:
+			return strconv.FormatFloat(float64(v), 'f', -1, 32)
+		case float64:
+			return strconv.FormatFloat(v, 'f', -1, 64)
+		case bool:
+			return strconv.FormatBool(v)
+		case nil:
+			return ""
+		default:
+			return fmt.Sprintf("%v", v)
+		}
+	}), nil
+}

+ 28 - 0
services/devicemanager/manager.go

@@ -1,6 +1,7 @@
 package main
 
 import (
+	"sparrow/pkg/deviceAlarm"
 	"sparrow/pkg/deviceStatus"
 	"sparrow/pkg/online"
 	"sparrow/pkg/otaUpgrade"
@@ -13,6 +14,7 @@ type DeviceManager struct {
 	tokenHelper   *token.Helper
 	otaManager    *otaUpgrade.OtaManager
 	statusManager *deviceStatus.DevStatusManager
+	alarmManager  *deviceAlarm.Alarm
 }
 
 func NewDeviceManager(redishost string, port, db int) *DeviceManager {
@@ -20,11 +22,13 @@ func NewDeviceManager(redishost string, port, db int) *DeviceManager {
 	helper := token.NewHelper(redishost, port, db)
 	otaMgr := otaUpgrade.NewOtaManager(redishost, port, db)
 	statusMgr := deviceStatus.NewDevStatusManager(redishost, port)
+	alarmMgr := deviceAlarm.NewAlarm(redishost, port, db)
 	return &DeviceManager{
 		onlineManager: mgr,
 		tokenHelper:   helper,
 		otaManager:    otaMgr,
 		statusManager: statusMgr,
+		alarmManager:  alarmMgr,
 	}
 }
 
@@ -140,3 +144,27 @@ func (dm *DeviceManager) GetDeviceInfo(args rpcs.ArgsGetStatus, reply *rpcs.Repl
 	}
 	return nil
 }
+
+func (dm *DeviceManager) SetAlarm(args rpcs.ArgsAlarmInfo, reply *rpcs.ArgsAlarmInfo) error {
+	return dm.alarmManager.SetAlarm(&deviceAlarm.AlarmParams{
+		DeviceCode:      args.DeviceCode,
+		FirstReportTime: args.FirstReportTime,
+		LastAlarmTime:   args.LastAlarmTime,
+	})
+}
+
+func (dm *DeviceManager) GetAlarm(args rpcs.ArgsGetAlarm, reply *rpcs.ReplayAlarm) error {
+	info, err := dm.alarmManager.GetAlarm(args.DeviceCode)
+	if err != nil {
+		return err
+	}
+
+	if info != nil {
+		reply.AlarmParams = info
+	}
+	return nil
+}
+
+func (dm *DeviceManager) DelAlarm(args rpcs.ArgsGetAlarm, reply *rpcs.ReplayAlarm) error {
+	return dm.alarmManager.DeleteAlarm(args.DeviceCode)
+}