123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139 |
- package actors
import (
	"fmt"

	"github.com/gogf/gf/util/guid"
	"sparrow/pkg/entities"
	"sparrow/pkg/protocol"
	"sparrow/pkg/ruleEngine"
	"sparrow/pkg/server"
)
- // AppActor 服务级actor
- type AppActor struct {
- ruleEngine.ContextBasedCreator
- deleteTenants map[string]string
- }
- func (a *AppActor) GetActorRef() ruleEngine.Ref {
- return a.Ctx
- }
- func (a *AppActor) Init(ctx ruleEngine.Ctx) error {
- a.Ctx = ctx
- return nil
- }
- func (a *AppActor) Process(msg protocol.ActorMsg) error {
- switch msg.GetMessageType() {
- case protocol.APP_INIT_MSG:
- if err := a.initTenants(); err != nil {
- server.Log.Error(err)
- }
- case protocol.QUEUE_TO_RULE_ENGINE_MSG:
- return a.onQueueToRuleEngineMsg(msg.(*ruleEngine.QueueToRuleEngineMsg))
- case protocol.COMPONENT_LIFE_CYCLE_MSG:
- return a.onComponentLifeCycleMsg(msg.(*ruleEngine.ComponentLifecycleMsg))
- default:
- server.Log.Debugf("未知的消息类型:%s", msg.GetMessageType())
- }
- return nil
- }
- // 消息队列向规则引擎的消息处理
- func (a *AppActor) onQueueToRuleEngineMsg(msg *ruleEngine.QueueToRuleEngineMsg) error {
- if ref, err := a.getOrCreateTenantActor(msg.TenantId); err == nil {
- ref.Tell(msg)
- } else {
- return err
- }
- msg.Message.GetCallBack().OnSuccess()
- return nil
- }
- // 组件生命周期事件处理
- func (a *AppActor) onComponentLifeCycleMsg(msg *ruleEngine.ComponentLifecycleMsg) error {
- var target ruleEngine.Ref
- var err error
- if msg.EntityId.GetEntityType() == entities.TENANT {
- tenantId := msg.EntityId.GetId()
- if msg.EventType == ruleEngine.DELETED {
- a.deleteTenants[tenantId] = tenantId
- return a.Ctx.Stop(tenantId)
- } else {
- target, err = a.getOrCreateTenantActor(msg.TenantId)
- if err != nil {
- return err
- }
- }
- } else {
- target, err = a.getOrCreateTenantActor(msg.TenantId)
- if err != nil {
- return err
- }
- }
- if target != nil {
- target.TellWithHighPriority(msg)
- } else {
- server.Log.Debugf("invalid component lifecycle msg %s:", msg.TenantId)
- }
- return nil
- }
- func (a *AppActor) initTenants() error {
- server.Log.Debug("starting main actor")
- tenants, err := a.SystemCtx.TenantService.FindTenants()
- if err != nil {
- return err
- }
- for _, t := range tenants {
- server.Log.Debugf("creating tenant actor :%s,%s", t.Id, t.Name)
- _, err := a.getOrCreateTenantActor(t.Id)
- if err != nil {
- server.Log.Error(err)
- continue
- }
- server.Log.Debugf("tenant actor :%s,%s created", t.Id, t.Name)
- }
- server.Log.Debugln("main actor started")
- return nil
- }
- func (a *AppActor) getOrCreateTenantActor(tId string) (ruleEngine.Ref, error) {
- return a.Ctx.GetOrCreateChildActor(tId, ruleEngine.TENANT_DISPATCHER_NAME,
- NewTenantActorCreator(a.SystemCtx, tId))
- }
- func (a *AppActor) Destroy() error {
- return nil
- }
- func (a *AppActor) OnProcessFailure(err error) *ruleEngine.ProcessFailureStrategy {
- if err != nil {
- return ruleEngine.Stop()
- } else {
- return ruleEngine.Resume()
- }
- }
- // AppActorCreator app actor creator implements creator interface
- type AppActorCreator struct {
- ruleEngine.ContextBasedCreator
- }
- func NewAppActorCreator(systemCtx *ruleEngine.SystemContext) *AppActorCreator {
- ins := new(AppActorCreator)
- ins.SystemCtx = systemCtx
- return ins
- }
- func (a *AppActorCreator) CreateActorId() string {
- return guid.S()
- }
- func (a *AppActorCreator) CreateActor() ruleEngine.Actor {
- appC := &AppActor{
- deleteTenants: make(map[string]string),
- }
- appC.SystemCtx = a.SystemCtx
- return appC
- }
|