package actors import ( "errors" "fmt" "sparrow/pkg/entities" "sparrow/pkg/protocol" "sparrow/pkg/queue" "sparrow/pkg/ruleEngine" "sparrow/pkg/server" "strings" ) type RuleChainActor struct { ruleEngine.ContextBasedCreator ruleChain *ruleEngine.RuleChain tenantId string firstId string firstNode *ruleEngine.RuleNodeCtx started bool ruleChainId string parent ruleEngine.Ref nodeActors map[string]*ruleEngine.RuleNodeCtx nodeRoutes map[string][]*ruleEngine.RuleNodeRelation ruleChainName string clusterService ruleEngine.ClusterService state ruleEngine.ComponentLifecycleState } func newRuleChainActor( sysCtx *ruleEngine.SystemContext, ruleChain *ruleEngine.RuleChain, tenantId string, parent ruleEngine.Ref, ) *RuleChainActor { item := &RuleChainActor{ ruleChainId: ruleChain.ChainId, ruleChain: ruleChain, tenantId: tenantId, parent: parent, nodeActors: make(map[string]*ruleEngine.RuleNodeCtx), nodeRoutes: make(map[string][]*ruleEngine.RuleNodeRelation), clusterService: sysCtx.ClusterService, } item.SystemCtx = sysCtx return item } func (r *RuleChainActor) GetActorRef() ruleEngine.Ref { return r.Ctx } func (r *RuleChainActor) Init(ctx ruleEngine.Ctx) error { if r.ruleChain != nil { r.ruleChainName = r.ruleChain.Name } r.Ctx = ctx return r.start() } func (r *RuleChainActor) Process(msg protocol.ActorMsg) error { switch msg.GetMessageType() { case protocol.QUEUE_TO_RULE_ENGINE_MSG: return r.onQueueToRuleEngineMsg(msg.(*ruleEngine.QueueToRuleEngineMsg)) case protocol.RULE_TO_RULE_CHAIN_TELL_NEXT_MSG: return r.onTellNextRuleNode(msg.(*ruleEngine.RuleNodeToRuleChanTellNextMsg)) case protocol.RULE_CHAIN_TO_RULE_CHAIN_MSG: return r.onRuleChainToRuleChain(msg.(*ruleEngine.RuleChainToRuleChainMsg)) case protocol.COMPONENT_LIFE_CYCLE_MSG: return r.onComponentLifeCycleMsg(msg.(*ruleEngine.ComponentLifecycleMsg)) } return nil } // TODO:生命周期事件写入数据库 func (r *RuleChainActor) onComponentLifeCycleMsg(msg *ruleEngine.ComponentLifecycleMsg) error { server.Log.Debugf("%s,%s,%s onComponentLifecycleMsg", msg.TenantId, msg.EntityId.GetEntityType(), msg.EntityId.GetId()) switch msg.EventType { case ruleEngine.CREATED: return r.start() case ruleEngine.UPDATED: return r.update() case ruleEngine.ACTIVATED: return r.restart() case ruleEngine.DELETED: _ = r.stop() return r.Ctx.Stop(r.ruleChainId) default: } return nil } func (r *RuleChainActor) onRuleChainToRuleChain(msg *ruleEngine.RuleChainToRuleChainMsg) error { if r.firstNode != nil { r.pushMsgToNode(r.firstNode, msg.Message, msg.FromRelationType) } else { msg.Message.GetCallBack().OnSuccess() } return nil } func (r *RuleChainActor) onQueueToRuleEngineMsg(msg *ruleEngine.QueueToRuleEngineMsg) error { actorMsg := msg.Message server.Log.Debugf("Processing message") if len(msg.RelationTypes) == 0 { ruleNodeId := actorMsg.RuleNodeId var targetCtx *ruleEngine.RuleNodeCtx if ruleNodeId == "" { targetCtx = r.firstNode actorMsg = actorMsg.CopyWithRuleChainId(r.ruleChainId) } else { targetCtx = r.nodeActors[ruleNodeId] } if targetCtx != nil { server.Log.Debugf("pushing message to target rule node,%s, %s", r.ruleChainId, targetCtx.Self.RuleNodeId) r.pushMsgToNode(targetCtx, actorMsg, "") } else { server.Log.Debugf("Rule node dose not exist. probably old message,%s, %s", r.ruleChainId, ruleNodeId) actorMsg.GetCallBack().OnSuccess() } } else { r.onTellNext(actorMsg, actorMsg.RuleNodeId, msg.RelationTypes, msg.FailureMessage.Error()) } return nil } func (r *RuleChainActor) onTellNextRuleNode(msg *ruleEngine.RuleNodeToRuleChanTellNextMsg) error { var errStr string if msg.FailureMessage != nil { errStr = msg.FailureMessage.Error() } r.onTellNext(msg.Message, msg.RuleNodeId, msg.RelationTypes.ToStrArray(), errStr) return nil } // on tell next actor func (r *RuleChainActor) onTellNext(msg *protocol.Message, originatorNodeId string, relationTypes []string, errMsg string) { originatorId := msg.Originator tpi := queue.ResolvePartition(ruleEngine.RULE_ENGINE, msg.GetQueueName(), r.tenantId, originatorId) var relations []*ruleEngine.RuleNodeRelation if rs, ok := r.nodeRoutes[originatorNodeId]; ok { for _, item := range rs { if contains(relationTypes, item.Type) { relations = append(relations, item) } } } if len(relations) == 0 { server.Log.Debugf("No outbound relations to process,%s, %s", originatorId, r.tenantId) if contains(relationTypes, "Failure") { if ruleNodeCtx, ok := r.nodeActors[originatorNodeId]; ok { msg.GetCallBack().OnFailure(errors.New(fmt.Sprintf(""+ "[%s], ruleChainName:%s, ruleNodeId:%s", errMsg, r.ruleChainName, ruleNodeCtx.Self.RuleNodeId))) } else { msg.GetCallBack().OnFailure(errors.New("failure during message processing by Rule Node")) } } else { msg.GetCallBack().OnSuccess() } } else if len(relations) == 1 { for _, rl := range relations { server.Log.Debugf("push message to single target,%s, %s, %s, %s", r.tenantId, originatorId, msg.Id, rl.Out) r.pushMsgToTarget(tpi, msg, rl.Out, rl.Type) } } else { for _, rl := range relations { target := rl.Out r.putToQueue(tpi, msg, queue.NewMultipleMsgCallbackWrapper(int32(len(relations)), msg.GetCallBack()), target) } } } // push a message to target ctx func (r *RuleChainActor) pushMsgToTarget(tpi *queue.TopicPartitionInfo, msg *protocol.Message, entityId entities.EntityId, fromRelationType string) { if tpi.MyPartition { switch entityId.GetEntityType() { case entities.RULE_NODE: targetCtx := r.nodeActors[entityId.GetId()] r.pushMsgToNode(targetCtx, msg, fromRelationType) case entities.RULE_CHAIN: r.parent.Tell(&ruleEngine.RuleChainToRuleChainMsg{ TargetId: entityId.GetId(), SourceId: r.ruleChainId, Message: msg, FromRelationType: fromRelationType, }) } } else { r.putToQueue(tpi, msg, queue.NewMsgCallbackWrapper(msg.GetCallBack()), entityId) } } // 把消息放到队列中 func (r *RuleChainActor) putToQueue(tpi *queue.TopicPartitionInfo, msg *protocol.Message, queueCallback queue.Callback, targetEntity entities.EntityId) { switch targetEntity.GetEntityType() { case entities.RULE_NODE: r.putMessageToQueue(tpi, msg.CopyWithRuleNodeId(targetEntity.GetId()), queueCallback) case entities.RULE_CHAIN: r.putMessageToQueue(tpi, msg.CopyWithRuleChainId(targetEntity.GetId()), queueCallback) } } func (r *RuleChainActor) putMessageToQueue(tpi *queue.TopicPartitionInfo, msg *protocol.Message, queueCallback queue.Callback) { r.clusterService.PushMessageToRuleEngine(tpi, msg.Id, msg, queueCallback) } func contains(relations []string, relation string) bool { if len(relations) == 0 { return false } for _, item := range relations { if strings.ToLower(item) == strings.ToLower(relation) { return true } } return false } // push a message to node actor func (r *RuleChainActor) pushMsgToNode(targetCtx *ruleEngine.RuleNodeCtx, msg *protocol.Message, relationType string) { if targetCtx != nil { targetCtx.SelfActor.Tell(&ruleEngine.RuleChainToRuleNodeMsg{ Message: msg, Ctx: ruleEngine.NewDefaultContext(targetCtx, r.SystemCtx), FromRelationType: relationType, }) } else { server.Log.Error("targetCtx is empty, %s, %s", r.ruleChainId, r.ruleChainName) } } func (r *RuleChainActor) Destroy() error { return nil } func (r *RuleChainActor) OnProcessFailure(err error) *ruleEngine.ProcessFailureStrategy { if err != nil { return ruleEngine.Stop() } else { return ruleEngine.Resume() } } func (r *RuleChainActor) start() error { if !r.started { ruleChain, err := r.SystemCtx.RuleChainService.FindRuleChainById(r.tenantId, r.ruleChainId) if err != nil { return err } if ruleChain != nil { nodes, err := r.SystemCtx.RuleChainService.GetRuleChainNodes(r.tenantId, ruleChain.ChainId) if err != nil { return err } server.Log.Debugf("starting rule chain with %d nodes", len(nodes)) for _, node := range nodes { server.Log.Debugf("creating rule node actor:%s,%s", node.RuleNodeId, node.Name) ref, err := r.createNodeActor(node.RuleNodeId) if err != nil { continue } r.nodeActors[node.RuleNodeId] = &ruleEngine.RuleNodeCtx{ TenantId: r.tenantId, ChainActor: r.Ctx, SelfActor: ref, Self: node, } r.started = true } return r.initRoutes(r.ruleChain, nodes) } } else { return r.update() } return nil } func (r *RuleChainActor) update() error { ruleChain, err := r.SystemCtx.RuleChainService.FindRuleChainById(r.tenantId, r.ruleChainId) if err != nil { return err } if ruleChain == nil { return errors.New(fmt.Sprintf("rule chain not found :%s", r.ruleChainId)) } // 查询链上的所有节点 nodes, err := r.SystemCtx.RuleChainService.GetRuleChainNodes(r.tenantId, r.ruleChainId) if err != nil { return err } for _, v := range nodes { // 如果找不到节点actor,则创建 if actor, ok := r.nodeActors[v.RuleNodeId]; !ok { server.Log.Debugf("creating rule node actor:%s", v.RuleNodeId) ref, err := r.createNodeActor(v.RuleNodeId) if err != nil { return err } r.nodeActors[v.RuleNodeId] = &ruleEngine.RuleNodeCtx{ TenantId: r.tenantId, ChainActor: r.Ctx, SelfActor: ref, Self: v, } } else { // 传递消息到node actor server.Log.Debugf("updating rule node actor:%s", v.RuleNodeId) actor.Self = v actor.SelfActor.TellWithHighPriority(&ruleEngine.ComponentLifecycleMsg{ TenantId: r.tenantId, EntityId: &entities.RuleNodeId{Id: v.RuleNodeId}, EventType: ruleEngine.UPDATED, }) } } var removeNodes []string // 对比已经有节点和最新节点列表,找出差集,并移除 for k := range r.nodeActors { var found = false for _, v := range nodes { if v.RuleNodeId == k { found = true break } } if !found { removeNodes = append(removeNodes, k) } } // remove actors for _, v := range removeNodes { server.Log.Debugf("remove rule node :%s", v) if ref, ok := r.nodeActors[v]; ok { ref.SelfActor.TellWithHighPriority(&ruleEngine.ComponentLifecycleMsg{ TenantId: r.tenantId, EntityId: &entities.RuleNodeId{Id: v}, EventType: ruleEngine.DELETED, }) delete(r.nodeActors, v) } } return r.initRoutes(r.ruleChain, nodes) } func (r *RuleChainActor) restart() error { if err := r.stop(); err != nil { return err } if err := r.start(); err != nil { return err } return nil } func (r *RuleChainActor) stop() error { server.Log.Debugf("stopping rule chain with %d nodes, tenantId:%s, entityId:%s", len(r.nodeActors), r.tenantId, r.ruleChainId) for actorId := range r.nodeActors { err := r.Ctx.Stop(actorId) if err != nil { return err } } r.started = false return nil } func (r *RuleChainActor) initRoutes(ruleChain *ruleEngine.RuleChain, nodes []*ruleEngine.RuleNode) error { for _, node := range nodes { relations, err := r.SystemCtx.RuleChainService.GetRuleNodeRelations(r.tenantId, node.RuleNodeId) if err != nil { return err } var rs []*ruleEngine.RuleNodeRelation for _, relation := range relations { rs = append(rs, &ruleEngine.RuleNodeRelation{ In: &entities.RuleNodeId{Id: node.RuleNodeId}, Out: &entities.RuleNodeId{Id: relation.To}, Type: relation.Type, }) } r.nodeRoutes[node.RuleNodeId] = rs } r.firstId = ruleChain.FirstNodeId r.firstNode = r.nodeActors[r.firstId] r.state = ruleEngine.ACTIVE return nil } func (r *RuleChainActor) createNodeActor(nodeId string) (ruleEngine.Ref, error) { return r.Ctx.GetOrCreateChildActor(nodeId, ruleEngine.RULE_DISPATCHER_NAME, NewRuleNodeActorCreator(r.SystemCtx, r.tenantId, r.ruleChainId, r.ruleChainName, nodeId, r.Ctx.GetParentRef())) } // RuleChainCreator type RuleChainCreator struct { RuleChainActor } //NewRuleChainCreator create a instance func NewRuleChainCreator( sysCtx *ruleEngine.SystemContext, tenantId string, ruleChan *ruleEngine.RuleChain, parent ruleEngine.Ref, ) *RuleChainCreator { item := &RuleChainCreator{} item.tenantId = tenantId item.ruleChain = ruleChan item.parent = parent item.SystemCtx = sysCtx return item } func (r *RuleChainCreator) CreateActorId() string { return r.ruleChain.ChainId } func (r *RuleChainCreator) CreateActor() ruleEngine.Actor { return newRuleChainActor(r.SystemCtx, r.ruleChain, r.tenantId, r.parent) }