|
@@ -0,0 +1,247 @@
|
|
|
|
+package actors
|
|
|
|
+
|
|
|
|
+import (
|
|
|
|
+ "errors"
|
|
|
|
+ "fmt"
|
|
|
|
+ "sparrow/pkg/actor"
|
|
|
|
+ "sparrow/pkg/entities"
|
|
|
|
+ "sparrow/pkg/protocol"
|
|
|
|
+ "sparrow/pkg/queue"
|
|
|
|
+ "sparrow/pkg/ruleEngine"
|
|
|
|
+ "sparrow/pkg/server"
|
|
|
|
+ "strings"
|
|
|
|
+)
|
|
|
|
+
|
|
|
|
+var ruleNodes = map[string]*ruleEngine.RuleNode{
|
|
|
|
+ "1": {
|
|
|
|
+ RuleNodeId: "1",
|
|
|
|
+ RuleChainId: "11",
|
|
|
|
+ Type: "MsgTypeFilterNode",
|
|
|
|
+ Name: "simple node",
|
|
|
|
+ IsDebug: true,
|
|
|
|
+ Config: "",
|
|
|
|
+ },
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+type RuleChainActor struct {
|
|
|
|
+ actor.ContextBasedCreator
|
|
|
|
+ ruleChain *ruleEngine.RuleChain
|
|
|
|
+ tenantId string
|
|
|
|
+ firstId string
|
|
|
|
+ firstNode *ruleEngine.RuleNodeCtx
|
|
|
|
+ started bool
|
|
|
|
+ ruleChainId string
|
|
|
|
+ parent actor.Ref
|
|
|
|
+ nodeActors map[string]*ruleEngine.RuleNodeCtx
|
|
|
|
+ nodeRoutes map[string][]*ruleEngine.RuleNodeRelation
|
|
|
|
+
|
|
|
|
+ ruleChainName string
|
|
|
|
+
|
|
|
|
+ clusterService queue.ClusterService
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func newRuleChainActor(
|
|
|
|
+ sysCtx *actor.SystemContext,
|
|
|
|
+ ruleChain *ruleEngine.RuleChain,
|
|
|
|
+ tenantId string,
|
|
|
|
+ parent actor.Ref,
|
|
|
|
+) *RuleChainActor {
|
|
|
|
+ item := &RuleChainActor{
|
|
|
|
+ ruleChainId: ruleChain.ChainId,
|
|
|
|
+ ruleChain: ruleChain,
|
|
|
|
+ tenantId: tenantId,
|
|
|
|
+ parent: parent,
|
|
|
|
+ nodeActors: make(map[string]*ruleEngine.RuleNodeCtx),
|
|
|
|
+ nodeRoutes: make(map[string][]*ruleEngine.RuleNodeRelation),
|
|
|
|
+ clusterService: sysCtx.ClusterService,
|
|
|
|
+ }
|
|
|
|
+ item.SystemCtx = sysCtx
|
|
|
|
+ return item
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (r *RuleChainActor) GetActorRef() actor.Ref {
|
|
|
|
+ return r.Ctx
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (r *RuleChainActor) Init(ctx actor.Ctx) error {
|
|
|
|
+ if r.ruleChain != nil {
|
|
|
|
+ r.ruleChainName = r.ruleChain.Name
|
|
|
|
+ }
|
|
|
|
+ r.Ctx = ctx
|
|
|
|
+ return nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (r *RuleChainActor) Process(msg protocol.ActorMsg) error {
|
|
|
|
+ switch msg.GetMessageType() {
|
|
|
|
+ case protocol.QUEUE_TO_RULE_ENGINE_MSG:
|
|
|
|
+ return r.onQueueToRuleEngineMsg(msg.(*actor.QueueToRuleEngineMsg))
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+ return nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (r *RuleChainActor) onQueueToRuleEngineMsg(msg *actor.QueueToRuleEngineMsg) error {
|
|
|
|
+ actorMsg := msg.Message
|
|
|
|
+ server.Log.Debugf("Processing message")
|
|
|
|
+ if len(msg.RelationTypes) == 0 {
|
|
|
|
+ ruleNodeId := actorMsg.RuleNodeId
|
|
|
|
+ var targetCtx *ruleEngine.RuleNodeCtx
|
|
|
|
+ if ruleNodeId == "" {
|
|
|
|
+ targetCtx = r.firstNode
|
|
|
|
+ } else {
|
|
|
|
+ targetCtx = r.nodeActors[ruleNodeId]
|
|
|
|
+ }
|
|
|
|
+ if targetCtx != nil {
|
|
|
|
+ server.Log.Debugf("pushing message to target rule node,%s, %s", r.ruleChainId, ruleNodeId)
|
|
|
|
+ r.pushMsgToNode(targetCtx, actorMsg, "")
|
|
|
|
+ } else {
|
|
|
|
+ server.Log.Debugf("Rule node dose not exist. probably old message,%s, %s", r.ruleChainId, ruleNodeId)
|
|
|
|
+ actorMsg.GetCallBack().OnSuccess()
|
|
|
|
+ }
|
|
|
|
+ } else {
|
|
|
|
+ r.onTellNext(actorMsg, actorMsg.RuleNodeId, msg.RelationTypes, msg.FailureMessage.Error())
|
|
|
|
+ }
|
|
|
|
+ return nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// on tell next actor
|
|
|
|
+func (r *RuleChainActor) onTellNext(msg *protocol.Message, originatorNodeId string,
|
|
|
|
+ relationTypes []string, errMsg string) {
|
|
|
|
+ originatorId := msg.Originator
|
|
|
|
+ tpi := queue.ResolvePartition(queue.RULE_ENGINE, msg.GetQueueName(), r.tenantId, originatorId)
|
|
|
|
+ var relations []*ruleEngine.RuleNodeRelation
|
|
|
|
+ if rs, ok := r.nodeRoutes[originatorNodeId]; ok {
|
|
|
|
+ for _, item := range rs {
|
|
|
|
+ if contains(relationTypes, item.Type) {
|
|
|
|
+ relations = append(relations, item)
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ if len(relations) == 0 {
|
|
|
|
+ server.Log.Debugf("No outbound relations to process,%s, %s", originatorId, r.tenantId)
|
|
|
|
+ if contains(relationTypes, string(protocol.Failure)) {
|
|
|
|
+ if ruleNodeCtx, ok := r.nodeActors[originatorNodeId]; ok {
|
|
|
|
+ msg.GetCallBack().OnFailure(errors.New(fmt.Sprintf(""+
|
|
|
|
+ "[%s], ruleChainName:%s, ruleNodeId:%s", errMsg, r.ruleChainName, ruleNodeCtx.Self.RuleNodeId)))
|
|
|
|
+ } else {
|
|
|
|
+ msg.GetCallBack().OnFailure(errors.New("failure during message processing by Rule Node"))
|
|
|
|
+ }
|
|
|
|
+ } else {
|
|
|
|
+ msg.GetCallBack().OnSuccess()
|
|
|
|
+ }
|
|
|
|
+ } else if len(relations) == 1 {
|
|
|
|
+ for _, rl := range relations {
|
|
|
|
+ server.Log.Debugf("push message to single target,%s, %s, %s, %s", r.tenantId, originatorId, msg.Id, rl.Out)
|
|
|
|
+ r.pushMsgToTarget(tpi, msg, rl.Out, rl.Type)
|
|
|
|
+ }
|
|
|
|
+ } else {
|
|
|
|
+
|
|
|
|
+ for _, rl := range relations {
|
|
|
|
+ target := rl.Out
|
|
|
|
+ r.putToQueue(tpi, msg, queue.NewMultipleMsgCallbackWrapper(int32(len(relations)), msg.GetCallBack()), target)
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// push a message to target ctx
|
|
|
|
+func (r *RuleChainActor) pushMsgToTarget(tpi *queue.TopicPartitionInfo, msg *protocol.Message, entityId entities.EntityId, fromRelationType string) {
|
|
|
|
+ if tpi.MyPartition {
|
|
|
|
+ switch entityId.GetEntityType() {
|
|
|
|
+ case entities.RULE_NODE:
|
|
|
|
+ targetCtx := r.nodeActors[entityId.GetId()]
|
|
|
|
+ r.pushMsgToNode(targetCtx, msg, fromRelationType)
|
|
|
|
+ case entities.RULE_CHAIN:
|
|
|
|
+ r.parent.Tell(&actor.RuleChainToRuleChainMsg{
|
|
|
|
+ TargetId: entityId.GetId(),
|
|
|
|
+ SourceId: r.ruleChainId,
|
|
|
|
+ Message: msg,
|
|
|
|
+ FromRelationType: fromRelationType,
|
|
|
|
+ })
|
|
|
|
+ }
|
|
|
|
+ } else {
|
|
|
|
+ r.putToQueue(tpi, msg, queue.NewMsgCallbackWrapper(msg.GetCallBack()), entityId)
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// 把消息放到队列中
|
|
|
|
+func (r *RuleChainActor) putToQueue(tpi *queue.TopicPartitionInfo, msg *protocol.Message, queueCallback queue.Callback, targetEntity entities.EntityId) {
|
|
|
|
+ switch targetEntity.GetEntityType() {
|
|
|
|
+ case entities.RULE_NODE:
|
|
|
|
+ r.putMessageToQueue(tpi, msg.CopyWithRuleNodeId(targetEntity.GetId()), queueCallback)
|
|
|
|
+ case entities.RULE_CHAIN:
|
|
|
|
+ r.putMessageToQueue(tpi, msg.CopyWithRuleChainId(targetEntity.GetId()), queueCallback)
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (r *RuleChainActor) putMessageToQueue(tpi *queue.TopicPartitionInfo, msg *protocol.Message, queueCallback queue.Callback) {
|
|
|
|
+ msgBytes, err := msg.Encode()
|
|
|
|
+ if err != nil {
|
|
|
|
+ server.Log.Error(err)
|
|
|
|
+ }
|
|
|
|
+ r.clusterService.PushMessageToRuleEngine(tpi, msg.Id, msgBytes, queueCallback)
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func contains(relations []string, relation string) bool {
|
|
|
|
+ if len(relations) == 0 {
|
|
|
|
+ return true
|
|
|
|
+ }
|
|
|
|
+ for _, item := range relations {
|
|
|
|
+ if strings.ToLower(item) == strings.ToLower(relation) {
|
|
|
|
+ return true
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ return false
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// push a message to node actor
|
|
|
|
+func (r *RuleChainActor) pushMsgToNode(targetCtx *ruleEngine.RuleNodeCtx, msg *protocol.Message, relationType string) {
|
|
|
|
+ if targetCtx != nil {
|
|
|
|
+ targetCtx.SelfActor.Tell(&actor.RuleChainToRuleNodeMsg{
|
|
|
|
+ Message: msg,
|
|
|
|
+ Ctx: ruleEngine.NewDefaultContext(targetCtx, r.SystemCtx),
|
|
|
|
+ FromRelationType: relationType,
|
|
|
|
+ })
|
|
|
|
+ } else {
|
|
|
|
+ server.Log.Error("targetCtx is empty, %s, %s", r.ruleChainId, r.ruleChainName)
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (r *RuleChainActor) Destroy() error {
|
|
|
|
+ return nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (r *RuleChainActor) OnProcessFailure(err error) *actor.ProcessFailureStrategy {
|
|
|
|
+ if err != nil {
|
|
|
|
+ return actor.Stop()
|
|
|
|
+ } else {
|
|
|
|
+ return actor.Resume()
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// RuleChainCreator
|
|
|
|
+type RuleChainCreator struct {
|
|
|
|
+ RuleChainActor
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+//NewRuleChainCreator create a instance
|
|
|
|
+func NewRuleChainCreator(
|
|
|
|
+ sysCtx *actor.SystemContext,
|
|
|
|
+ tenantId string,
|
|
|
|
+ ruleChan *ruleEngine.RuleChain,
|
|
|
|
+ parent actor.Ref,
|
|
|
|
+) *RuleChainCreator {
|
|
|
|
+ item := &RuleChainCreator{}
|
|
|
|
+ item.tenantId = tenantId
|
|
|
|
+ item.ruleChain = ruleChan
|
|
|
|
+ item.parent = parent
|
|
|
|
+ item.SystemCtx = sysCtx
|
|
|
|
+ return item
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (r *RuleChainCreator) CreateActorId() string {
|
|
|
|
+ return r.ruleChain.ChainId
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func (r *RuleChainCreator) CreateActor() actor.Actor {
|
|
|
|
+ return newRuleChainActor(r.SystemCtx, r.ruleChain, r.tenantId, r.parent)
|
|
|
|
+}
|