ソースを参照

解决不转发

lijian 4 週間 前
コミット
2d68b26c8d
1 ファイル変更13 行追加6 行削除
  1. 13 6
      pkg/ruleEngine/mailbox.go

+ 13 - 6
pkg/ruleEngine/mailbox.go

@@ -1,9 +1,10 @@
 package ruleEngine
 
 import (
-	"github.com/gogf/gf/v2/container/gqueue"
+	"github.com/gogf/gf/container/gqueue"
 	"sparrow/pkg/protocol"
 	"sparrow/pkg/server"
+	"sync"
 	"sync/atomic"
 )
 
@@ -18,6 +19,8 @@ type MailBox struct {
 	busyState              int32
 	readySate              int32
 	throughPut             int // 处理的吞吐量
+	closed                 bool
+	mu                     sync.Mutex
 }
 
 const (
@@ -73,7 +76,6 @@ func (m *MailBox) tryInit(attempt int) {
 		attempt += 1
 		if attempt > 10 {
 			_ = m.system.StopActorById(m.id)
-			return
 		}
 		_ = m.dispatcher.Submit(func() {
 			m.tryInit(attempt)
@@ -90,7 +92,7 @@ func (m *MailBox) tryProcessQueue(newMsg bool) {
 		return
 	}
 
-	if newMsg || m.normalPriorityMessages.Len() > 0 || m.highPriorityMessages.Len() > 0 {
+	if newMsg || m.normalPriorityMessages.Size() > 0 || m.highPriorityMessages.Size() > 0 {
 		if m.getBusyStat() == FREE {
 			m.setBusyStat(BUSY)
 			err := m.dispatcher.Submit(func() {
@@ -110,9 +112,9 @@ func (m *MailBox) processMailbox() {
 	for i := 0; i < m.throughPut; i++ {
 		var msg protocol.ActorMsg
 		var getQueue *gqueue.Queue
-		if m.highPriorityMessages.Len() > 0 {
+		if m.highPriorityMessages.Size() > 0 {
 			getQueue = m.highPriorityMessages
-		} else if m.normalPriorityMessages.Len() > 0 {
+		} else if m.normalPriorityMessages.Size() > 0 {
 			getQueue = m.normalPriorityMessages
 		}
 		if getQueue == nil {
@@ -166,7 +168,6 @@ func (m *MailBox) Tell(msg protocol.ActorMsg) {
 
 // push a message to queue
 func (m *MailBox) enqueue(msg protocol.ActorMsg, isHighPriority bool) {
-
 	if isHighPriority {
 		m.highPriorityMessages.Push(msg)
 	} else {
@@ -208,11 +209,17 @@ func (m *MailBox) BroadcastChildren(msg protocol.ActorMsg) error {
 }
 
 func (m *MailBox) destroy() error {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	if m.closed {
+		return nil
+	}
 	m.highPriorityMessages.Close()
 	m.normalPriorityMessages.Close()
 	m.setReadyStat(NOTREADY)
 	if err := m.actor.Destroy(); err != nil {
 		server.Log.Warnf("Failed to destroy actor :%s, err :%s", m.id, err.Error())
 	}
+	m.closed = true
 	return nil
 }