123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247 |
- package actors
- import (
- "errors"
- "fmt"
- "sparrow/pkg/actor"
- "sparrow/pkg/entities"
- "sparrow/pkg/protocol"
- "sparrow/pkg/queue"
- "sparrow/pkg/ruleEngine"
- "sparrow/pkg/server"
- "strings"
- )
- var ruleNodes = map[string]*ruleEngine.RuleNode{
- "1": {
- RuleNodeId: "1",
- RuleChainId: "11",
- Type: "MsgTypeFilterNode",
- Name: "simple node",
- IsDebug: true,
- Config: "",
- },
- }
- type RuleChainActor struct {
- actor.ContextBasedCreator
- ruleChain *ruleEngine.RuleChain
- tenantId string
- firstId string
- firstNode *ruleEngine.RuleNodeCtx
- started bool
- ruleChainId string
- parent actor.Ref
- nodeActors map[string]*ruleEngine.RuleNodeCtx
- nodeRoutes map[string][]*ruleEngine.RuleNodeRelation
- ruleChainName string
- clusterService queue.ClusterService
- }
- func newRuleChainActor(
- sysCtx *actor.SystemContext,
- ruleChain *ruleEngine.RuleChain,
- tenantId string,
- parent actor.Ref,
- ) *RuleChainActor {
- item := &RuleChainActor{
- ruleChainId: ruleChain.ChainId,
- ruleChain: ruleChain,
- tenantId: tenantId,
- parent: parent,
- nodeActors: make(map[string]*ruleEngine.RuleNodeCtx),
- nodeRoutes: make(map[string][]*ruleEngine.RuleNodeRelation),
- clusterService: sysCtx.ClusterService,
- }
- item.SystemCtx = sysCtx
- return item
- }
- func (r *RuleChainActor) GetActorRef() actor.Ref {
- return r.Ctx
- }
- func (r *RuleChainActor) Init(ctx actor.Ctx) error {
- if r.ruleChain != nil {
- r.ruleChainName = r.ruleChain.Name
- }
- r.Ctx = ctx
- return nil
- }
- func (r *RuleChainActor) Process(msg protocol.ActorMsg) error {
- switch msg.GetMessageType() {
- case protocol.QUEUE_TO_RULE_ENGINE_MSG:
- return r.onQueueToRuleEngineMsg(msg.(*actor.QueueToRuleEngineMsg))
- }
- return nil
- }
- func (r *RuleChainActor) onQueueToRuleEngineMsg(msg *actor.QueueToRuleEngineMsg) error {
- actorMsg := msg.Message
- server.Log.Debugf("Processing message")
- if len(msg.RelationTypes) == 0 {
- ruleNodeId := actorMsg.RuleNodeId
- var targetCtx *ruleEngine.RuleNodeCtx
- if ruleNodeId == "" {
- targetCtx = r.firstNode
- } else {
- targetCtx = r.nodeActors[ruleNodeId]
- }
- if targetCtx != nil {
- server.Log.Debugf("pushing message to target rule node,%s, %s", r.ruleChainId, ruleNodeId)
- r.pushMsgToNode(targetCtx, actorMsg, "")
- } else {
- server.Log.Debugf("Rule node dose not exist. probably old message,%s, %s", r.ruleChainId, ruleNodeId)
- actorMsg.GetCallBack().OnSuccess()
- }
- } else {
- r.onTellNext(actorMsg, actorMsg.RuleNodeId, msg.RelationTypes, msg.FailureMessage.Error())
- }
- return nil
- }
- // on tell next actor
- func (r *RuleChainActor) onTellNext(msg *protocol.Message, originatorNodeId string,
- relationTypes []string, errMsg string) {
- originatorId := msg.Originator
- tpi := queue.ResolvePartition(queue.RULE_ENGINE, msg.GetQueueName(), r.tenantId, originatorId)
- var relations []*ruleEngine.RuleNodeRelation
- if rs, ok := r.nodeRoutes[originatorNodeId]; ok {
- for _, item := range rs {
- if contains(relationTypes, item.Type) {
- relations = append(relations, item)
- }
- }
- }
- if len(relations) == 0 {
- server.Log.Debugf("No outbound relations to process,%s, %s", originatorId, r.tenantId)
- if contains(relationTypes, string(protocol.Failure)) {
- if ruleNodeCtx, ok := r.nodeActors[originatorNodeId]; ok {
- msg.GetCallBack().OnFailure(errors.New(fmt.Sprintf(""+
- "[%s], ruleChainName:%s, ruleNodeId:%s", errMsg, r.ruleChainName, ruleNodeCtx.Self.RuleNodeId)))
- } else {
- msg.GetCallBack().OnFailure(errors.New("failure during message processing by Rule Node"))
- }
- } else {
- msg.GetCallBack().OnSuccess()
- }
- } else if len(relations) == 1 {
- for _, rl := range relations {
- server.Log.Debugf("push message to single target,%s, %s, %s, %s", r.tenantId, originatorId, msg.Id, rl.Out)
- r.pushMsgToTarget(tpi, msg, rl.Out, rl.Type)
- }
- } else {
- for _, rl := range relations {
- target := rl.Out
- r.putToQueue(tpi, msg, queue.NewMultipleMsgCallbackWrapper(int32(len(relations)), msg.GetCallBack()), target)
- }
- }
- }
- // push a message to target ctx
- func (r *RuleChainActor) pushMsgToTarget(tpi *queue.TopicPartitionInfo, msg *protocol.Message, entityId entities.EntityId, fromRelationType string) {
- if tpi.MyPartition {
- switch entityId.GetEntityType() {
- case entities.RULE_NODE:
- targetCtx := r.nodeActors[entityId.GetId()]
- r.pushMsgToNode(targetCtx, msg, fromRelationType)
- case entities.RULE_CHAIN:
- r.parent.Tell(&actor.RuleChainToRuleChainMsg{
- TargetId: entityId.GetId(),
- SourceId: r.ruleChainId,
- Message: msg,
- FromRelationType: fromRelationType,
- })
- }
- } else {
- r.putToQueue(tpi, msg, queue.NewMsgCallbackWrapper(msg.GetCallBack()), entityId)
- }
- }
- // 把消息放到队列中
- func (r *RuleChainActor) putToQueue(tpi *queue.TopicPartitionInfo, msg *protocol.Message, queueCallback queue.Callback, targetEntity entities.EntityId) {
- switch targetEntity.GetEntityType() {
- case entities.RULE_NODE:
- r.putMessageToQueue(tpi, msg.CopyWithRuleNodeId(targetEntity.GetId()), queueCallback)
- case entities.RULE_CHAIN:
- r.putMessageToQueue(tpi, msg.CopyWithRuleChainId(targetEntity.GetId()), queueCallback)
- }
- }
- func (r *RuleChainActor) putMessageToQueue(tpi *queue.TopicPartitionInfo, msg *protocol.Message, queueCallback queue.Callback) {
- msgBytes, err := msg.Encode()
- if err != nil {
- server.Log.Error(err)
- }
- r.clusterService.PushMessageToRuleEngine(tpi, msg.Id, msgBytes, queueCallback)
- }
- func contains(relations []string, relation string) bool {
- if len(relations) == 0 {
- return true
- }
- for _, item := range relations {
- if strings.ToLower(item) == strings.ToLower(relation) {
- return true
- }
- }
- return false
- }
- // push a message to node actor
- func (r *RuleChainActor) pushMsgToNode(targetCtx *ruleEngine.RuleNodeCtx, msg *protocol.Message, relationType string) {
- if targetCtx != nil {
- targetCtx.SelfActor.Tell(&actor.RuleChainToRuleNodeMsg{
- Message: msg,
- Ctx: ruleEngine.NewDefaultContext(targetCtx, r.SystemCtx),
- FromRelationType: relationType,
- })
- } else {
- server.Log.Error("targetCtx is empty, %s, %s", r.ruleChainId, r.ruleChainName)
- }
- }
- func (r *RuleChainActor) Destroy() error {
- return nil
- }
- func (r *RuleChainActor) OnProcessFailure(err error) *actor.ProcessFailureStrategy {
- if err != nil {
- return actor.Stop()
- } else {
- return actor.Resume()
- }
- }
- // RuleChainCreator
- type RuleChainCreator struct {
- RuleChainActor
- }
- //NewRuleChainCreator create a instance
- func NewRuleChainCreator(
- sysCtx *actor.SystemContext,
- tenantId string,
- ruleChan *ruleEngine.RuleChain,
- parent actor.Ref,
- ) *RuleChainCreator {
- item := &RuleChainCreator{}
- item.tenantId = tenantId
- item.ruleChain = ruleChan
- item.parent = parent
- item.SystemCtx = sysCtx
- return item
- }
- func (r *RuleChainCreator) CreateActorId() string {
- return r.ruleChain.ChainId
- }
- func (r *RuleChainCreator) CreateActor() actor.Actor {
- return newRuleChainActor(r.SystemCtx, r.ruleChain, r.tenantId, r.parent)
- }
|