package ruleEngine import ( "github.com/gogf/gf/container/gqueue" "sparrow/pkg/protocol" "sparrow/pkg/server" "sync" "sync/atomic" ) type MailBox struct { system System id string parentRef Ref actor Actor dispatcher IDispatcher highPriorityMessages *gqueue.Queue normalPriorityMessages *gqueue.Queue busyState int32 readySate int32 throughPut int // 处理的吞吐量 closed bool mu sync.Mutex } const ( _ int32 = iota FREE BUSY NOTREADY READY ) func NewMailBox( system System, selfId string, parentRef Ref, actor Actor, dispatcher IDispatcher, ) *MailBox { return &MailBox{ highPriorityMessages: gqueue.New(), normalPriorityMessages: gqueue.New(), system: system, id: selfId, parentRef: parentRef, actor: actor, dispatcher: dispatcher, throughPut: 10, } } // set mailbox ready status func (m *MailBox) setReadyStat(stat int32) { v := atomic.LoadInt32(&stat) m.readySate = v } func (m *MailBox) getReadyStat() int32 { return atomic.LoadInt32(&m.readySate) } func (m *MailBox) getBusyStat() int32 { return atomic.LoadInt32(&m.busyState) } // set mailbox busy status func (m *MailBox) setBusyStat(stat int32) { v := atomic.LoadInt32(&stat) m.busyState = v } func (m *MailBox) tryInit(attempt int) { server.Log.Debugf("Try to init actor, attempt %d", attempt) err := m.actor.Init(m) if err != nil { server.Log.Errorf("failed to init actor, err :%s, attempt %d", err.Error(), attempt) attempt += 1 if attempt > 10 { _ = m.system.StopActorById(m.id) } _ = m.dispatcher.Submit(func() { m.tryInit(attempt) }) } m.setReadyStat(READY) m.setBusyStat(FREE) m.tryProcessQueue(false) } func (m *MailBox) tryProcessQueue(newMsg bool) { if m.getReadyStat() != READY { server.Log.Debugf("MessageBox is not ready") return } if newMsg || m.normalPriorityMessages.Size() > 0 || m.highPriorityMessages.Size() > 0 { if m.getBusyStat() == FREE { m.setBusyStat(BUSY) err := m.dispatcher.Submit(func() { m.processMailbox() }) if err != nil { server.Log.Error(err) } } } else { server.Log.Debugf("MessageBox is empty") } } func (m *MailBox) processMailbox() { var noMoreElement bool for i := 0; i < m.throughPut; i++ { var msg protocol.ActorMsg var getQueue *gqueue.Queue if m.highPriorityMessages.Size() > 0 { getQueue = m.highPriorityMessages } else if m.normalPriorityMessages.Size() > 0 { getQueue = m.normalPriorityMessages } if getQueue == nil { noMoreElement = true break } msg, ok := getQueue.Pop().(protocol.ActorMsg) if !ok { noMoreElement = true break } if msg != nil { server.Log.Debugf("Going to process message:%s, %v", m.id, msg) if err := m.actor.Process(msg); err != nil { strategy := m.actor.OnProcessFailure(err) if strategy._stop { _ = m.system.StopActorById(m.id) } } } else { noMoreElement = true break } } if noMoreElement { m.setBusyStat(FREE) _ = m.dispatcher.Submit(func() { m.tryProcessQueue(false) }) } else { _ = m.dispatcher.Submit(func() { m.processMailbox() }) } } // Init 邮箱初始化 func (m *MailBox) Init() error { return m.dispatcher.Submit(func() { m.tryInit(1) }) } func (m *MailBox) GetActorId() string { return m.id } func (m *MailBox) Tell(msg protocol.ActorMsg) { m.enqueue(msg, false) } // push a message to queue func (m *MailBox) enqueue(msg protocol.ActorMsg, isHighPriority bool) { if isHighPriority { m.highPriorityMessages.Push(msg) } else { m.normalPriorityMessages.Push(msg) } m.tryProcessQueue(true) } func (m *MailBox) TellWithHighPriority(msg protocol.ActorMsg) { m.enqueue(msg, true) } func (m *MailBox) GetSelf() string { return m.id } func (m *MailBox) GetParentRef() Ref { return m.parentRef } func (m *MailBox) TellActor(actorId string, msg protocol.ActorMsg) { m.system.Tell(actorId, msg) } func (m *MailBox) Stop(actorId string) error { return m.system.StopActorById(actorId) } func (m *MailBox) GetOrCreateChildActor(actorId string, dispatcherName string, creator Creator) (Ref, error) { actorRef := m.system.GetActor(actorId) if actorRef == nil { return m.system.CreateChildActor(dispatcherName, creator, m.id) } return actorRef, nil } func (m *MailBox) BroadcastChildren(msg protocol.ActorMsg) error { return m.system.BroadcastToChildren(m.id, msg) } func (m *MailBox) destroy() error { m.mu.Lock() defer m.mu.Unlock() if m.closed { return nil } m.highPriorityMessages.Close() m.normalPriorityMessages.Close() m.setReadyStat(NOTREADY) if err := m.actor.Destroy(); err != nil { server.Log.Warnf("Failed to destroy actor :%s, err :%s", m.id, err.Error()) } m.closed = true return nil }