package actors import ( "errors" "sparrow/pkg/entities" "sparrow/pkg/protocol" "sparrow/pkg/ruleEngine" "sparrow/pkg/server" ) // TenantActor 租户 actor type TenantActor struct { ruleEngine.ContextBasedCreator tenantId string rootChain *ruleEngine.RuleChain rootChainActor ruleEngine.Ref cantFindTenant bool ruleChainService ruleEngine.RuleChainService } func (t *TenantActor) initRuleChains() error { ruleChains, err := t.SystemCtx.RuleChainService.FindRuleChains(t.tenantId) if err != nil { return err } for _, ruleChain := range ruleChains { server.Log.Debugf("Creating rule chain actor:%s", ruleChain.ChainId) actorRef, err := t.getOrCreateActor(ruleChain.ChainId, ruleChain) if err != nil { server.Log.Errorf("Creating rule chain actor:%s err:%s", ruleChain.ChainId, err.Error()) continue } if ruleChain.IsRoot { t.rootChain = ruleChain t.rootChainActor = actorRef } server.Log.Debugf("Rule chain actor created:%s", ruleChain.ChainId) } return nil } func (t *TenantActor) destroyRuleChains() error { ruleChains, err := t.ruleChainService.FindRuleChains(t.tenantId) if err != nil { return err } for _, ruleChain := range ruleChains { _ = t.Ctx.Stop(ruleChain.ChainId) } return nil } func (t *TenantActor) getOrCreateActor(ruleChainId string, ruleChain *ruleEngine.RuleChain) (ruleEngine.Ref, error) { return t.Ctx.GetOrCreateChildActor(ruleChainId, ruleEngine.RULE_DISPATCHER_NAME, NewRuleChainCreator(t.SystemCtx, t.tenantId, ruleChain, t.Ctx.GetParentRef())) } func (t *TenantActor) GetActorRef() ruleEngine.Ref { return t.Ctx } func (t *TenantActor) Init(ctx ruleEngine.Ctx) error { t.Ctx = ctx server.Log.Debugf("Starting tenant actor:%s", t.tenantId) return t.initRuleChains() } func (t *TenantActor) Process(msg protocol.ActorMsg) error { if t.cantFindTenant { server.Log.Debugf("Processing missing Tenant msg") if msg.GetMessageType() == protocol.QUEUE_TO_RULE_ENGINE_MSG { qMsg := msg.(*ruleEngine.QueueToRuleEngineMsg) qMsg.Message.GetCallBack().OnSuccess() } else if msg.GetMessageType() == protocol.TRANSPORT_TO_DEVICE_ACTOR_MSG { tMsg := msg.(*ruleEngine.TransportToDeviceActorMsg) tMsg.Message.GetCallBack().OnSuccess() } return nil } switch msg.GetMessageType() { case protocol.QUEUE_TO_RULE_ENGINE_MSG: return t.onQueueToRuleEngineMsg(msg.(*ruleEngine.QueueToRuleEngineMsg)) case protocol.RULE_CHAIN_TO_RULE_CHAIN_MSG: return t.onRuleChainToRuleChainMsg(msg.(*ruleEngine.RuleChainToRuleChainMsg)) case protocol.TRANSPORT_TO_DEVICE_ACTOR_MSG: //TODO:实现到设备的消息处理 case protocol.COMPONENT_LIFE_CYCLE_MSG: return t.onComponentLifecycleMsg(msg.(*ruleEngine.ComponentLifecycleMsg)) default: return errors.New("未知的消息类型") } return nil } func (t *TenantActor) onComponentLifecycleMsg(msg *ruleEngine.ComponentLifecycleMsg) error { target := t.getEntityActorRef(msg.EntityId) if target != nil { if msg.EntityId.GetEntityType() == entities.RULE_CHAIN { ruleChain, err := t.ruleChainService.FindRuleChainById(t.tenantId, msg.EntityId.GetId()) if err != nil { return err } if ruleChain != nil { if ruleChain.IsRoot { t.rootChain = ruleChain t.rootChainActor = target } } } target.TellWithHighPriority(msg) } else { server.Log.Debugln("Invalid component lifecycle msg") } return nil } func (t *TenantActor) getEntityActorRef(id entities.EntityId) ruleEngine.Ref { if id.GetEntityType() == entities.RULE_CHAIN { ruleChain, err := t.ruleChainService.FindRuleChainById(t.tenantId, id.GetId()) if err != nil { return nil } ref, err := t.getOrCreateActor(id.GetId(), ruleChain) if err != nil { return nil } return ref } return nil } func (t *TenantActor) onRuleChainToRuleChainMsg(msg *ruleEngine.RuleChainToRuleChainMsg) error { ruleChainId := msg.Message.RuleChanId ruleChain, err := t.SystemCtx.RuleChainService.FindRuleChainById(t.tenantId, ruleChainId) if err != nil { return err } ref, err := t.getOrCreateActor(ruleChainId, ruleChain) if err != nil { return err } ref.Tell(msg) return nil } func (t *TenantActor) onQueueToRuleEngineMsg(msg *ruleEngine.QueueToRuleEngineMsg) error { actorMsg := msg.Message if actorMsg.RuleChanId == "" { if t.rootChainActor != nil { t.rootChainActor.Tell(msg) } else { actorMsg.GetCallBack().OnFailure(errors.New("no Root Rule Chain available")) server.Log.Errorf("no root chain:%s", t.tenantId) } } else { t.Ctx.TellActor(actorMsg.RuleChanId, msg) } actorMsg.GetCallBack().OnSuccess() return nil } func (t *TenantActor) Destroy() error { return nil } func (t *TenantActor) OnProcessFailure(err error) *ruleEngine.ProcessFailureStrategy { if err != nil { return ruleEngine.Stop() } else { return ruleEngine.Resume() } } // TenantActorCreator 租户actor creator type TenantActorCreator struct { ruleEngine.ContextBasedCreator tenantId string } func NewTenantActorCreator(sysCtx *ruleEngine.SystemContext, tenantId string) *TenantActorCreator { t := new(TenantActorCreator) t.SystemCtx = sysCtx t.tenantId = tenantId return t } func (t *TenantActorCreator) CreateActorId() string { return t.tenantId } func (t *TenantActorCreator) CreateActor() ruleEngine.Actor { ins := new(TenantActor) ins.tenantId = t.tenantId ins.SystemCtx = t.SystemCtx ins.ruleChainService = t.SystemCtx.RuleChainService return ins }