package manager import ( "encoding/json" "errors" "fmt" "github.com/gogf/gf/container/gmap" "github.com/gogf/gf/encoding/gjson" "sparrow/pkg/rpcs" "sparrow/pkg/server" "sparrow/pkg/utils" "time" ) type DeviceSceneConfig struct { SceneId string `json:"scene_id"` DecisionExpr string `json:"decision_expr"` // 条件表达式 and or Conditions []*DeviceCondition `json:"conditions"` // 条件 Actions []*Action `json:"actions"` // 执行动作 Interval int `json:"interval"` // 检查间隔(秒) ticker *time.Ticker `json:"-"` // 定时器 stopChan chan struct{} `json:"-"` // 停止信号通道 } // DeviceCondition 设备场景配置 type DeviceCondition struct { DeviceType string `json:"device_type"` // 设备类型 DeviceId string `json:"device_id"` // 设备id SubDeviceId string `json:"sub_device_id"` // 子设备id FieldType int `json:"field_type"` // 字段类型 1字符串 2数值 Field string `json:"field"` // 字段名 TargetValue string `json:"target_value"` // 值 Operator int `json:"operator"` // 比较类型 数值比较 1 > 2 >= 3 = 4 <= 5 < 6 != } type DeviceSceneService struct { tasks *gmap.HashMap } func NewDeviceSceneService() *DeviceSceneService { return &DeviceSceneService{ tasks: gmap.New(true), } } func (d *DeviceSceneService) Add(config string) error { var c DeviceSceneConfig err := json.Unmarshal([]byte(config), &c) if err != nil { } if len(c.Conditions) == 0 { return errors.New("天气监控任务配置错误:判断条件不能为空") } // 初始化Ticker和停止通道 c.ticker = time.NewTicker(time.Duration(c.Interval) * time.Minute) c.stopChan = make(chan struct{}) // 启动监控协程 go d.monitorTask(c) d.tasks.Set(c.SceneId, c) return nil } func (d *DeviceSceneService) Update(config string) error { var c DeviceSceneConfig err := json.Unmarshal([]byte(config), &c) if err != nil { server.Log.Errorf("config to timerConfig error :%s", err.Error()) } _ = d.Stop(c.SceneId) // 初始化Ticker和停止通道 c.ticker = time.NewTicker(time.Duration(c.Interval) * time.Minute) c.stopChan = make(chan struct{}) // 启动监控协程 go d.monitorTask(c) d.tasks.Set(c.SceneId, c) server.Log.Debugf("UpdateWeatherScene :%s", config) return nil } func (d *DeviceSceneService) Remove(config string) error { var c DeviceSceneConfig err := json.Unmarshal([]byte(config), &c) if err != nil { server.Log.Errorf("config to timerConfig error :%s", err.Error()) } d.tasks.Remove(c.SceneId) server.Log.Debugf("RemoveTimeScene :%s", c.SceneId) return nil } // Start 停止任务 func (d *DeviceSceneService) Start(id string) error { if !d.tasks.Contains(id) { return errors.New("任务不存在") } task := d.tasks.Get(id) c := task.(DeviceSceneConfig) go d.monitorTask(c) return nil } // Stop 停止任务 func (d *DeviceSceneService) Stop(id string) error { if !d.tasks.Contains(id) { return errors.New("任务不存在") } task := d.tasks.Get(id) c := task.(DeviceSceneConfig) c.stopChan <- struct{}{} return nil } // monitorTask 监控任务:使用select监听Ticker和停止信号 func (d *DeviceSceneService) monitorTask(config DeviceSceneConfig) { for { select { case <-config.ticker.C: // 定时触发 result, err := d.checkDeviceCondition(config) if err != nil { server.Log.Errorf("compare weather condition error :%s", err.Error()) } if result { if err = NewTaskExecutor(config.Actions).Do(); err != nil { server.Log.Errorf("weather do taskid :%s error:%s", config.SceneId, err.Error()) } } case <-config.stopChan: // 收到停止信号 config.ticker.Stop() return } } } func (d *DeviceSceneService) checkDeviceCondition(config DeviceSceneConfig) (bool, error) { var results []bool for _, v := range config.Conditions { var args rpcs.ArgsGetStatus args.Key = fmt.Sprintf("device:%s:status:%s%s", v.DeviceType, v.DeviceId, v.SubDeviceId) var reply rpcs.ReplyStatus err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetDeviceStatus", args, &reply) if err != nil { server.Log.Errorf("设备状态数据获取失败:%v", err) return false, err } j := gjson.New(reply.Status) // 判断是否满足条件并填入到result results = append(results, utils.CheckValue(v.TargetValue, j.Get(v.Field), v.FieldType, v.Operator)) } switch config.DecisionExpr { case "and": for _, v := range results { if !v { return false, nil } } return true, nil case "or": for _, v := range results { if v { return true, nil } } } return false, nil }