|
@@ -4,7 +4,6 @@ import (
|
|
"github.com/gogf/gf/v2/container/gqueue"
|
|
"github.com/gogf/gf/v2/container/gqueue"
|
|
"sparrow/pkg/protocol"
|
|
"sparrow/pkg/protocol"
|
|
"sparrow/pkg/server"
|
|
"sparrow/pkg/server"
|
|
- "sync"
|
|
|
|
"sync/atomic"
|
|
"sync/atomic"
|
|
)
|
|
)
|
|
|
|
|
|
@@ -19,8 +18,6 @@ type MailBox struct {
|
|
busyState int32
|
|
busyState int32
|
|
readySate int32
|
|
readySate int32
|
|
throughPut int // 处理的吞吐量
|
|
throughPut int // 处理的吞吐量
|
|
- closed bool
|
|
|
|
- mu sync.Mutex
|
|
|
|
}
|
|
}
|
|
|
|
|
|
const (
|
|
const (
|
|
@@ -93,7 +90,7 @@ func (m *MailBox) tryProcessQueue(newMsg bool) {
|
|
return
|
|
return
|
|
}
|
|
}
|
|
|
|
|
|
- if newMsg || m.normalPriorityMessages.Size() > 0 || m.highPriorityMessages.Size() > 0 {
|
|
|
|
|
|
+ if newMsg || m.normalPriorityMessages.Len() > 0 || m.highPriorityMessages.Len() > 0 {
|
|
if m.getBusyStat() == FREE {
|
|
if m.getBusyStat() == FREE {
|
|
m.setBusyStat(BUSY)
|
|
m.setBusyStat(BUSY)
|
|
err := m.dispatcher.Submit(func() {
|
|
err := m.dispatcher.Submit(func() {
|
|
@@ -113,9 +110,9 @@ func (m *MailBox) processMailbox() {
|
|
for i := 0; i < m.throughPut; i++ {
|
|
for i := 0; i < m.throughPut; i++ {
|
|
var msg protocol.ActorMsg
|
|
var msg protocol.ActorMsg
|
|
var getQueue *gqueue.Queue
|
|
var getQueue *gqueue.Queue
|
|
- if m.highPriorityMessages.Size() > 0 {
|
|
|
|
|
|
+ if m.highPriorityMessages.Len() > 0 {
|
|
getQueue = m.highPriorityMessages
|
|
getQueue = m.highPriorityMessages
|
|
- } else if m.normalPriorityMessages.Size() > 0 {
|
|
|
|
|
|
+ } else if m.normalPriorityMessages.Len() > 0 {
|
|
getQueue = m.normalPriorityMessages
|
|
getQueue = m.normalPriorityMessages
|
|
}
|
|
}
|
|
if getQueue == nil {
|
|
if getQueue == nil {
|
|
@@ -169,8 +166,7 @@ func (m *MailBox) Tell(msg protocol.ActorMsg) {
|
|
|
|
|
|
// push a message to queue
|
|
// push a message to queue
|
|
func (m *MailBox) enqueue(msg protocol.ActorMsg, isHighPriority bool) {
|
|
func (m *MailBox) enqueue(msg protocol.ActorMsg, isHighPriority bool) {
|
|
- m.mu.Lock()
|
|
|
|
- defer m.mu.Unlock()
|
|
|
|
|
|
+
|
|
if isHighPriority {
|
|
if isHighPriority {
|
|
m.highPriorityMessages.Push(msg)
|
|
m.highPriorityMessages.Push(msg)
|
|
} else {
|
|
} else {
|
|
@@ -212,17 +208,11 @@ func (m *MailBox) BroadcastChildren(msg protocol.ActorMsg) error {
|
|
}
|
|
}
|
|
|
|
|
|
func (m *MailBox) destroy() error {
|
|
func (m *MailBox) destroy() error {
|
|
- m.mu.Lock()
|
|
|
|
- defer m.mu.Unlock()
|
|
|
|
- if m.closed {
|
|
|
|
- return nil
|
|
|
|
- }
|
|
|
|
- //m.highPriorityMessages.Close()
|
|
|
|
- //m.normalPriorityMessages.Close()
|
|
|
|
|
|
+ m.highPriorityMessages.Close()
|
|
|
|
+ m.normalPriorityMessages.Close()
|
|
m.setReadyStat(NOTREADY)
|
|
m.setReadyStat(NOTREADY)
|
|
if err := m.actor.Destroy(); err != nil {
|
|
if err := m.actor.Destroy(); err != nil {
|
|
server.Log.Warnf("Failed to destroy actor :%s, err :%s", m.id, err.Error())
|
|
server.Log.Warnf("Failed to destroy actor :%s, err :%s", m.id, err.Error())
|
|
}
|
|
}
|
|
- m.closed = true
|
|
|
|
return nil
|
|
return nil
|
|
}
|
|
}
|