package actors import ( "github.com/gogf/gf/util/guid" "sparrow/pkg/entities" "sparrow/pkg/protocol" "sparrow/pkg/ruleEngine" "sparrow/pkg/server" ) // AppActor 服务级actor type AppActor struct { ruleEngine.ContextBasedCreator deleteTenants map[string]string } func (a *AppActor) GetActorRef() ruleEngine.Ref { return a.Ctx } func (a *AppActor) Init(ctx ruleEngine.Ctx) error { a.Ctx = ctx return nil } func (a *AppActor) Process(msg protocol.ActorMsg) error { switch msg.GetMessageType() { case protocol.APP_INIT_MSG: if err := a.initTenants(); err != nil { server.Log.Error(err) } case protocol.QUEUE_TO_RULE_ENGINE_MSG: return a.onQueueToRuleEngineMsg(msg.(*ruleEngine.QueueToRuleEngineMsg)) case protocol.COMPONENT_LIFE_CYCLE_MSG: return a.onComponentLifeCycleMsg(msg.(*ruleEngine.ComponentLifecycleMsg)) default: server.Log.Debugf("未知的消息类型:%s", msg.GetMessageType()) } return nil } // 消息队列向规则引擎的消息处理 func (a *AppActor) onQueueToRuleEngineMsg(msg *ruleEngine.QueueToRuleEngineMsg) error { if ref, err := a.getOrCreateTenantActor(msg.TenantId); err == nil { ref.Tell(msg) } else { return err } msg.Message.GetCallBack().OnSuccess() return nil } // 组件生命周期事件处理 func (a *AppActor) onComponentLifeCycleMsg(msg *ruleEngine.ComponentLifecycleMsg) error { var target ruleEngine.Ref var err error if msg.EntityId.GetEntityType() == entities.TENANT { tenantId := msg.EntityId.GetId() if msg.EventType == ruleEngine.DELETED { a.deleteTenants[tenantId] = tenantId return a.Ctx.Stop(tenantId) } else { target, err = a.getOrCreateTenantActor(msg.TenantId) if err != nil { return err } } } else { target, err = a.getOrCreateTenantActor(msg.TenantId) if err != nil { return err } } if target != nil { target.TellWithHighPriority(msg) } else { server.Log.Debugf("invalid component lifecycle msg %s:", msg.TenantId) } return nil } func (a *AppActor) initTenants() error { server.Log.Debug("starting main actor") tenants, err := a.SystemCtx.TenantService.FindTenants() if err != nil { return err } for _, t := range tenants { server.Log.Debugf("creating tenant actor :%s,%s", t.Id, t.Name) _, err := a.getOrCreateTenantActor(t.Id) if err != nil { server.Log.Error(err) continue } server.Log.Debugf("tenant actor :%s,%s created", t.Id, t.Name) } server.Log.Debugln("main actor started") return nil } func (a *AppActor) getOrCreateTenantActor(tId string) (ruleEngine.Ref, error) { return a.Ctx.GetOrCreateChildActor(tId, ruleEngine.TENANT_DISPATCHER_NAME, NewTenantActorCreator(a.SystemCtx, tId)) } func (a *AppActor) Destroy() error { return nil } func (a *AppActor) OnProcessFailure(err error) *ruleEngine.ProcessFailureStrategy { if err != nil { return ruleEngine.Stop() } else { return ruleEngine.Resume() } } // AppActorCreator app actor creator implements creator interface type AppActorCreator struct { ruleEngine.ContextBasedCreator } func NewAppActorCreator(systemCtx *ruleEngine.SystemContext) *AppActorCreator { ins := new(AppActorCreator) ins.SystemCtx = systemCtx return ins } func (a *AppActorCreator) CreateActorId() string { return guid.S() } func (a *AppActorCreator) CreateActor() ruleEngine.Actor { appC := &AppActor{ deleteTenants: make(map[string]string), } appC.SystemCtx = a.SystemCtx return appC }