123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197 |
- package actors
- import (
- "errors"
- "sparrow/pkg/entities"
- "sparrow/pkg/protocol"
- "sparrow/pkg/ruleEngine"
- "sparrow/pkg/server"
- )
- // TenantActor 租户 actor
- type TenantActor struct {
- ruleEngine.ContextBasedCreator
- tenantId string
- rootChain *ruleEngine.RuleChain
- rootChainActor ruleEngine.Ref
- cantFindTenant bool
- ruleChainService ruleEngine.RuleChainService
- }
- func (t *TenantActor) initRuleChains() error {
- ruleChains, err := t.SystemCtx.RuleChainService.FindRuleChains(t.tenantId)
- if err != nil {
- return err
- }
- for _, ruleChain := range ruleChains {
- server.Log.Debugf("Creating rule chain actor:%s", ruleChain.ChainId)
- actorRef, err := t.getOrCreateActor(ruleChain.ChainId, ruleChain)
- if err != nil {
- server.Log.Errorf("Creating rule chain actor:%s err:%s", ruleChain.ChainId, err.Error())
- continue
- }
- if ruleChain.IsRoot {
- t.rootChain = ruleChain
- t.rootChainActor = actorRef
- }
- server.Log.Debugf("Rule chain actor created:%s", ruleChain.ChainId)
- }
- return nil
- }
- func (t *TenantActor) destroyRuleChains() error {
- ruleChains, err := t.ruleChainService.FindRuleChains(t.tenantId)
- if err != nil {
- return err
- }
- for _, ruleChain := range ruleChains {
- _ = t.Ctx.Stop(ruleChain.ChainId)
- }
- return nil
- }
- func (t *TenantActor) getOrCreateActor(ruleChainId string, ruleChain *ruleEngine.RuleChain) (ruleEngine.Ref, error) {
- return t.Ctx.GetOrCreateChildActor(ruleChainId,
- ruleEngine.RULE_DISPATCHER_NAME,
- NewRuleChainCreator(t.SystemCtx, t.tenantId, ruleChain, t.Ctx.GetParentRef()))
- }
- func (t *TenantActor) GetActorRef() ruleEngine.Ref {
- return t.Ctx
- }
- func (t *TenantActor) Init(ctx ruleEngine.Ctx) error {
- t.Ctx = ctx
- server.Log.Debugf("Starting tenant actor:%s", t.tenantId)
- return t.initRuleChains()
- }
- func (t *TenantActor) Process(msg protocol.ActorMsg) error {
- if t.cantFindTenant {
- server.Log.Debugf("Processing missing Tenant msg")
- if msg.GetMessageType() == protocol.QUEUE_TO_RULE_ENGINE_MSG {
- qMsg := msg.(*ruleEngine.QueueToRuleEngineMsg)
- qMsg.Message.GetCallBack().OnSuccess()
- } else if msg.GetMessageType() == protocol.TRANSPORT_TO_DEVICE_ACTOR_MSG {
- tMsg := msg.(*ruleEngine.TransportToDeviceActorMsg)
- tMsg.Message.GetCallBack().OnSuccess()
- }
- return nil
- }
- switch msg.GetMessageType() {
- case protocol.QUEUE_TO_RULE_ENGINE_MSG:
- return t.onQueueToRuleEngineMsg(msg.(*ruleEngine.QueueToRuleEngineMsg))
- case protocol.RULE_CHAIN_TO_RULE_CHAIN_MSG:
- return t.onRuleChainToRuleChainMsg(msg.(*ruleEngine.RuleChainToRuleChainMsg))
- case protocol.TRANSPORT_TO_DEVICE_ACTOR_MSG:
- //TODO:实现到设备的消息处理
- case protocol.COMPONENT_LIFE_CYCLE_MSG:
- return t.onComponentLifecycleMsg(msg.(*ruleEngine.ComponentLifecycleMsg))
- default:
- return errors.New("未知的消息类型")
- }
- return nil
- }
- func (t *TenantActor) onComponentLifecycleMsg(msg *ruleEngine.ComponentLifecycleMsg) error {
- target := t.getEntityActorRef(msg.EntityId)
- if target != nil {
- if msg.EntityId.GetEntityType() == entities.RULE_CHAIN {
- ruleChain, err := t.ruleChainService.FindRuleChainById(t.tenantId, msg.EntityId.GetId())
- if err != nil {
- return err
- }
- if ruleChain != nil {
- if ruleChain.IsRoot {
- t.rootChain = ruleChain
- t.rootChainActor = target
- }
- }
- }
- target.TellWithHighPriority(msg)
- } else {
- server.Log.Debugln("Invalid component lifecycle msg")
- }
- return nil
- }
- func (t *TenantActor) getEntityActorRef(id entities.EntityId) ruleEngine.Ref {
- if id.GetEntityType() == entities.RULE_CHAIN {
- ruleChain, err := t.ruleChainService.FindRuleChainById(t.tenantId, id.GetId())
- if err != nil {
- return nil
- }
- ref, err := t.getOrCreateActor(id.GetId(), ruleChain)
- if err != nil {
- return nil
- }
- return ref
- }
- return nil
- }
- func (t *TenantActor) onRuleChainToRuleChainMsg(msg *ruleEngine.RuleChainToRuleChainMsg) error {
- ruleChainId := msg.Message.RuleChanId
- ruleChain, err := t.SystemCtx.RuleChainService.FindRuleChainById(t.tenantId, ruleChainId)
- if err != nil {
- return err
- }
- ref, err := t.getOrCreateActor(ruleChainId, ruleChain)
- if err != nil {
- return err
- }
- ref.Tell(msg)
- return nil
- }
- func (t *TenantActor) onQueueToRuleEngineMsg(msg *ruleEngine.QueueToRuleEngineMsg) error {
- actorMsg := msg.Message
- if actorMsg.RuleChanId == "" {
- if t.rootChainActor != nil {
- t.rootChainActor.Tell(msg)
- } else {
- actorMsg.GetCallBack().OnFailure(errors.New("no Root Rule Chain available"))
- server.Log.Errorf("no root chain:%s", t.tenantId)
- }
- } else {
- t.Ctx.TellActor(actorMsg.RuleChanId, msg)
- }
- actorMsg.GetCallBack().OnSuccess()
- return nil
- }
- func (t *TenantActor) Destroy() error {
- return nil
- }
- func (t *TenantActor) OnProcessFailure(err error) *ruleEngine.ProcessFailureStrategy {
- if err != nil {
- return ruleEngine.Stop()
- } else {
- return ruleEngine.Resume()
- }
- }
- // TenantActorCreator 租户actor creator
- type TenantActorCreator struct {
- ruleEngine.ContextBasedCreator
- tenantId string
- }
- func NewTenantActorCreator(sysCtx *ruleEngine.SystemContext, tenantId string) *TenantActorCreator {
- t := new(TenantActorCreator)
- t.SystemCtx = sysCtx
- t.tenantId = tenantId
- return t
- }
- func (t *TenantActorCreator) CreateActorId() string {
- return t.tenantId
- }
- func (t *TenantActorCreator) CreateActor() ruleEngine.Actor {
- ins := new(TenantActor)
- ins.tenantId = t.tenantId
- ins.SystemCtx = t.SystemCtx
- return ins
- }
|