Selaa lähdekoodia

fix: closed channel

lijian 2 vuotta sitten
vanhempi
commit
c95bde0d65
2 muutettua tiedostoa jossa 54 lisäystä ja 0 poistoa
  1. 4 0
      pkg/ruleEngine/mailbox.go
  2. 50 0
      pkg/ruleEngine/nodes/deviceid_filter_node.go

+ 4 - 0
pkg/ruleEngine/mailbox.go

@@ -211,11 +211,15 @@ func (m *MailBox) BroadcastChildren(msg protocol.ActorMsg) error {
 func (m *MailBox) destroy() error {
 	m.mu.Lock()
 	defer m.mu.Unlock()
+	if m.closed {
+		return nil
+	}
 	m.highPriorityMessages.Close()
 	m.normalPriorityMessages.Close()
 	m.setReadyStat(NOTREADY)
 	if err := m.actor.Destroy(); err != nil {
 		server.Log.Warnf("Failed to destroy actor :%s, err :%s", m.id, err.Error())
 	}
+	m.closed = true
 	return nil
 }

+ 50 - 0
pkg/ruleEngine/nodes/deviceid_filter_node.go

@@ -0,0 +1,50 @@
+package nodes
+
+import (
+	"encoding/json"
+	"sparrow/pkg/protocol"
+	"sparrow/pkg/ruleEngine"
+)
+
+// DeviceIdFilterNode 设备ID过滤节点
+type DeviceIdFilterNode struct {
+	config *DeviceIdFilterConfig
+}
+
+type DeviceIdFilterConfig struct {
+	DeviceId    string `json:"device_id"`     // 设备ID
+	SubDeviceId string `json:"sub_device_id"` // 子设备ID
+}
+
+func (d *DeviceIdFilterNode) Init(ctx ruleEngine.Context, config string) error {
+	if config == "" {
+		d.config = &DeviceIdFilterConfig{
+			DeviceId:    "",
+			SubDeviceId: "",
+		}
+	} else {
+		c := new(DeviceIdFilterConfig)
+		err := json.Unmarshal([]byte(config), c)
+		if err != nil {
+			return err
+		}
+		d.config = c
+	}
+	return nil
+}
+
+func (d *DeviceIdFilterNode) OnMessage(ctx ruleEngine.Context, message *protocol.Message) error {
+	msgDeviceId, ok := message.MetaData["device_id"]
+	if ok {
+		if msgDeviceId.(string) == d.config.DeviceId {
+			ctx.TellNext(message, protocol.True)
+		}
+	}
+	if msgSubDeviceId, ok := message.MetaData["sub_device_id"]; ok {
+		if msgSubDeviceId.(string) == d.config.SubDeviceId {
+			ctx.TellNext(message, protocol.True)
+		}
+	}
+	ctx.TellNext(message, protocol.False)
+	return nil
+}