|
@@ -4,6 +4,7 @@ import (
|
|
|
"github.com/gogf/gf/container/gqueue"
|
|
|
"sparrow/pkg/protocol"
|
|
|
"sparrow/pkg/server"
|
|
|
+ "sync"
|
|
|
"sync/atomic"
|
|
|
)
|
|
|
|
|
@@ -18,6 +19,8 @@ type MailBox struct {
|
|
|
busyState int32
|
|
|
readySate int32
|
|
|
throughPut int // 处理的吞吐量
|
|
|
+ closed bool
|
|
|
+ mu sync.Mutex
|
|
|
}
|
|
|
|
|
|
const (
|
|
@@ -73,6 +76,7 @@ func (m *MailBox) tryInit(attempt int) {
|
|
|
attempt += 1
|
|
|
if attempt > 10 {
|
|
|
_ = m.system.StopActorById(m.id)
|
|
|
+ return
|
|
|
}
|
|
|
_ = m.dispatcher.Submit(func() {
|
|
|
m.tryInit(attempt)
|
|
@@ -206,12 +210,18 @@ func (m *MailBox) BroadcastChildren(msg protocol.ActorMsg) error {
|
|
|
}
|
|
|
|
|
|
func (m *MailBox) destroy() error {
|
|
|
- return m.dispatcher.Submit(func() {
|
|
|
- //m.highPriorityMessages.Close()
|
|
|
- //m.normalPriorityMessages.Close()
|
|
|
- m.setReadyStat(NOTREADY)
|
|
|
- if err := m.actor.Destroy(); err != nil {
|
|
|
- server.Log.Warnf("Failed to destroy actor :%s, err :%s", m.id, err.Error())
|
|
|
- }
|
|
|
- })
|
|
|
+ server.Log.Debugf("Going to destroy mailbox:%s", m.id)
|
|
|
+ m.mu.Lock()
|
|
|
+ defer m.mu.Unlock()
|
|
|
+ if m.closed {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ m.highPriorityMessages.Close()
|
|
|
+ m.normalPriorityMessages.Close()
|
|
|
+ m.setReadyStat(NOTREADY)
|
|
|
+ if err := m.actor.Destroy(); err != nil {
|
|
|
+ server.Log.Warnf("Failed to destroy actor :%s, err :%s", m.id, err.Error())
|
|
|
+ }
|
|
|
+ m.closed = true
|
|
|
+ return nil
|
|
|
}
|