package actors

import (
	"errors"
	"fmt"
	"strings"

	"sparrow/pkg/actor"
	"sparrow/pkg/entities"
	"sparrow/pkg/protocol"
	"sparrow/pkg/queue"
	"sparrow/pkg/ruleEngine"
	"sparrow/pkg/server"
)

// ruleNodes is a hard-coded rule node table keyed by rule node id.
// NOTE(review): nothing in this file reads it; presumably it is sample data
// used elsewhere in the package — confirm before removing.
var ruleNodes = map[string]*ruleEngine.RuleNode{
	"1": {
		RuleNodeId:  "1",
		RuleChainId: "11",
		Type:        "MsgTypeFilterNode",
		Name:        "simple node",
		IsDebug:     true,
		Config:      "",
	},
}

// RuleChainActor routes rule-engine messages between the rule nodes of one
// rule chain, either by telling the target node actor directly or by pushing
// the message onto the rule-engine queue.
type RuleChainActor struct {
	actor.ContextBasedCreator
	ruleChain *ruleEngine.RuleChain
	tenantId  string
	firstId   string
	// firstNode receives messages that carry no explicit rule node id.
	firstNode   *ruleEngine.RuleNodeCtx
	started     bool
	ruleChainId string
	parent      actor.Ref
	// nodeActors maps a rule node id to its node context.
	nodeActors map[string]*ruleEngine.RuleNodeCtx
	// nodeRoutes maps an originating rule node id to its outbound relations.
	nodeRoutes     map[string][]*ruleEngine.RuleNodeRelation
	ruleChainName  string
	clusterService queue.ClusterService
}

// newRuleChainActor builds a RuleChainActor for ruleChain with empty node and
// route tables. ruleChain must be non-nil: its ChainId is read here.
func newRuleChainActor(
	sysCtx *actor.SystemContext,
	ruleChain *ruleEngine.RuleChain,
	tenantId string,
	parent actor.Ref,
) *RuleChainActor {
	a := &RuleChainActor{
		ruleChainId:    ruleChain.ChainId,
		ruleChain:      ruleChain,
		tenantId:       tenantId,
		parent:         parent,
		nodeActors:     make(map[string]*ruleEngine.RuleNodeCtx),
		nodeRoutes:     make(map[string][]*ruleEngine.RuleNodeRelation),
		clusterService: sysCtx.ClusterService,
	}
	a.SystemCtx = sysCtx
	return a
}

// GetActorRef returns the actor context stored by Init.
func (r *RuleChainActor) GetActorRef() actor.Ref {
	return r.Ctx
}

// Init caches the rule chain name (when a rule chain is set) and stores ctx
// as the actor's context. It always returns nil.
func (r *RuleChainActor) Init(ctx actor.Ctx) error {
	if r.ruleChain != nil {
		r.ruleChainName = r.ruleChain.Name
	}
	r.Ctx = ctx
	return nil
}

// Process dispatches an incoming actor message by its message type. Only
// QUEUE_TO_RULE_ENGINE_MSG is handled; every other type is ignored and nil is
// returned. An error is returned when the payload does not have the concrete
// type announced by its message type.
func (r *RuleChainActor) Process(msg protocol.ActorMsg) error {
	switch msg.GetMessageType() {
	case protocol.QUEUE_TO_RULE_ENGINE_MSG:
		// Use the comma-ok form so a mismatched payload is reported as an
		// error instead of panicking inside the actor.
		queueMsg, ok := msg.(*actor.QueueToRuleEngineMsg)
		if !ok {
			return fmt.Errorf("unexpected payload type %T for QUEUE_TO_RULE_ENGINE_MSG", msg)
		}
		return r.onQueueToRuleEngineMsg(queueMsg)
	}
	return nil
}

// onQueueToRuleEngineMsg handles a message taken from the rule-engine queue.
//
// Without relation types the message goes straight to the node named by its
// RuleNodeId, or to the first node when that id is empty; if no such node is
// known the message is acknowledged as successful. With relation types the
// message is forwarded along the matching outbound relations of the
// originating node. It always returns nil.
func (r *RuleChainActor) onQueueToRuleEngineMsg(msg *actor.QueueToRuleEngineMsg) error {
	actorMsg := msg.Message
	server.Log.Debugf("Processing message")

	if len(msg.RelationTypes) != 0 {
		// FailureMessage may be absent for non-failure relations; calling
		// Error() on it unconditionally would dereference nil.
		var errMsg string
		if msg.FailureMessage != nil {
			errMsg = msg.FailureMessage.Error()
		}
		r.onTellNext(actorMsg, actorMsg.RuleNodeId, msg.RelationTypes, errMsg)
		return nil
	}

	ruleNodeId := actorMsg.RuleNodeId
	targetCtx := r.firstNode
	if ruleNodeId != "" {
		targetCtx = r.nodeActors[ruleNodeId]
	}
	if targetCtx == nil {
		server.Log.Debugf("Rule node dose not exist. probably old message,%s, %s", r.ruleChainId, ruleNodeId)
		actorMsg.GetCallBack().OnSuccess()
		return nil
	}
	server.Log.Debugf("pushing message to target rule node,%s, %s", r.ruleChainId, ruleNodeId)
	r.pushMsgToNode(targetCtx, actorMsg, "")
	return nil
}

// onTellNext forwards msg along the outbound relations of originatorNodeId
// whose type matches one of relationTypes.
//
//   - No matching relation: the message callback is completed; with a failure
//     when relationTypes contains protocol.Failure (errMsg is embedded in the
//     error when the originating node is known), with success otherwise.
//   - Exactly one relation: the message is pushed to that target.
//   - Several relations: the message is queued once per target.
func (r *RuleChainActor) onTellNext(msg *protocol.Message, originatorNodeId string, relationTypes []string, errMsg string) {
	originatorId := msg.Originator
	tpi := queue.ResolvePartition(queue.RULE_ENGINE, msg.GetQueueName(), r.tenantId, originatorId)

	var relations []*ruleEngine.RuleNodeRelation
	for _, rel := range r.nodeRoutes[originatorNodeId] {
		if contains(relationTypes, rel.Type) {
			relations = append(relations, rel)
		}
	}

	switch len(relations) {
	case 0:
		server.Log.Debugf("No outbound relations to process,%s, %s", originatorId, r.tenantId)
		if !contains(relationTypes, string(protocol.Failure)) {
			msg.GetCallBack().OnSuccess()
			return
		}
		if ruleNodeCtx, ok := r.nodeActors[originatorNodeId]; ok {
			msg.GetCallBack().OnFailure(fmt.Errorf("[%s], ruleChainName:%s, ruleNodeId:%s",
				errMsg, r.ruleChainName, ruleNodeCtx.Self.RuleNodeId))
		} else {
			msg.GetCallBack().OnFailure(errors.New("failure during message processing by Rule Node"))
		}
	case 1:
		rl := relations[0]
		server.Log.Debugf("push message to single target,%s, %s, %s, %s", r.tenantId, originatorId, msg.Id, rl.Out)
		r.pushMsgToTarget(tpi, msg, rl.Out, rl.Type)
	default:
		// One wrapper is shared by every target. The wrapper is constructed
		// with the number of targets, so presumably it completes the original
		// callback after that many acknowledgements; a fresh wrapper per
		// target would each see only one — TODO confirm against
		// queue.NewMultipleMsgCallbackWrapper.
		wrapper := queue.NewMultipleMsgCallbackWrapper(int32(len(relations)), msg.GetCallBack())
		for _, rl := range relations {
			r.putToQueue(tpi, msg, wrapper, rl.Out)
		}
	}
}

// pushMsgToTarget delivers msg to entityId. When tpi.MyPartition is set, rule
// nodes are told directly and rule chains are reached through the parent
// actor; other entity types are ignored. Otherwise the message is put on the
// rule-engine queue.
func (r *RuleChainActor) pushMsgToTarget(tpi *queue.TopicPartitionInfo, msg *protocol.Message, entityId entities.EntityId, fromRelationType string) {
	if !tpi.MyPartition {
		r.putToQueue(tpi, msg, queue.NewMsgCallbackWrapper(msg.GetCallBack()), entityId)
		return
	}

	switch entityId.GetEntityType() {
	case entities.RULE_NODE:
		r.pushMsgToNode(r.nodeActors[entityId.GetId()], msg, fromRelationType)
	case entities.RULE_CHAIN:
		r.parent.Tell(&actor.RuleChainToRuleChainMsg{
			TargetId:         entityId.GetId(),
			SourceId:         r.ruleChainId,
			Message:          msg,
			FromRelationType: fromRelationType,
		})
	}
}

// putToQueue puts a copy of msg, re-addressed to targetEntity, on the
// rule-engine queue. Only rule node and rule chain targets are handled; other
// entity types are ignored.
func (r *RuleChainActor) putToQueue(tpi *queue.TopicPartitionInfo, msg *protocol.Message, queueCallback queue.Callback, targetEntity entities.EntityId) {
	targetId := targetEntity.GetId()
	switch targetEntity.GetEntityType() {
	case entities.RULE_NODE:
		r.putMessageToQueue(tpi, msg.CopyWithRuleNodeId(targetId), queueCallback)
	case entities.RULE_CHAIN:
		r.putMessageToQueue(tpi, msg.CopyWithRuleChainId(targetId), queueCallback)
	}
}

// putMessageToQueue encodes msg and hands it to the cluster service for the
// rule engine. If encoding fails nothing is pushed: the error is logged and
// reported through the message callback.
func (r *RuleChainActor) putMessageToQueue(tpi *queue.TopicPartitionInfo, msg *protocol.Message, queueCallback queue.Callback) {
	msgBytes, err := msg.Encode()
	if err != nil {
		// Do not enqueue an empty payload; fail the message instead so the
		// producer learns about it.
		server.Log.Error(err)
		msg.GetCallBack().OnFailure(err)
		return
	}
	r.clusterService.PushMessageToRuleEngine(tpi, msg.Id, msgBytes, queueCallback)
}

// contains reports whether relation matches any entry of relations, ignoring
// case. An empty relations list matches everything.
func contains(relations []string, relation string) bool {
	if len(relations) == 0 {
		return true
	}
	for _, item := range relations {
		if strings.EqualFold(item, relation) {
			return true
		}
	}
	return false
}

// pushMsgToNode tells the node actor behind targetCtx to process msg,
// recording the relation type the message arrived through. A nil targetCtx is
// logged as an error and the message is not delivered.
func (r *RuleChainActor) pushMsgToNode(targetCtx *ruleEngine.RuleNodeCtx, msg *protocol.Message, relationType string) {
	if targetCtx == nil {
		server.Log.Errorf("targetCtx is empty, %s, %s", r.ruleChainId, r.ruleChainName)
		return
	}
	targetCtx.SelfActor.Tell(&actor.RuleChainToRuleNodeMsg{
		Message:          msg,
		Ctx:              ruleEngine.NewDefaultContext(targetCtx, r.SystemCtx),
		FromRelationType: relationType,
	})
}

// Destroy is a no-op and always returns nil.
func (r *RuleChainActor) Destroy() error {
	return nil
}

// OnProcessFailure selects the failure strategy: stop on a non-nil error,
// resume otherwise.
func (r *RuleChainActor) OnProcessFailure(err error) *actor.ProcessFailureStrategy {
	if err != nil {
		return actor.Stop()
	}
	return actor.Resume()
}

// RuleChainCreator carries the parameters needed to create a RuleChainActor.
type RuleChainCreator struct {
	RuleChainActor
}

// NewRuleChainCreator returns a creator holding the tenant, rule chain,
// parent reference and system context for the actor it will create.
func NewRuleChainCreator(
	sysCtx *actor.SystemContext,
	tenantId string,
	ruleChan *ruleEngine.RuleChain,
	parent actor.Ref,
) *RuleChainCreator {
	item := &RuleChainCreator{}
	item.tenantId = tenantId
	item.ruleChain = ruleChan
	item.parent = parent
	item.SystemCtx = sysCtx
	return item
}

// CreateActorId returns the chain id of the rule chain, used as the actor id.
func (r *RuleChainCreator) CreateActorId() string {
	return r.ruleChain.ChainId
}

// CreateActor builds a new RuleChainActor from the stored parameters.
func (r *RuleChainCreator) CreateActor() actor.Actor {
	return newRuleChainActor(r.SystemCtx, r.ruleChain, r.tenantId, r.parent)
}