Browse Source

规则引擎测试

lijian 4 years ago
parent
commit
1f01f4a08c

+ 4 - 2
pkg/actors/rule_chain_actor.go

@@ -152,7 +152,7 @@ func (r *RuleChainActor) onTellNext(msg *protocol.Message, originatorNodeId stri
 
 // push a message to target ctx
 func (r *RuleChainActor) pushMsgToTarget(tpi *queue.TopicPartitionInfo, msg *protocol.Message, entityId entities.EntityId, fromRelationType string) {
-	if tpi.MyPartition {
+	if !tpi.MyPartition {
 		switch entityId.GetEntityType() {
 		case entities.RULE_NODE:
 			targetCtx := r.nodeActors[entityId.GetId()]
@@ -185,6 +185,7 @@ func (r *RuleChainActor) putMessageToQueue(tpi *queue.TopicPartitionInfo, msg *p
 	if err != nil {
 		server.Log.Error(err)
 	}
+	fmt.Printf("%v", tpi)
 	r.clusterService.PushMessageToRuleEngine(tpi, msg.Id, msgBytes, queueCallback)
 }
 
@@ -276,6 +277,7 @@ func (r *RuleChainActor) initRoutes(ruleChain *ruleEngine.RuleChain, nodes []*ru
 		}
 		r.nodeRoutes[node.RuleNodeId] = rs
 	}
+	fmt.Printf("%+v", r.nodeRoutes)
 	r.firstId = ruleChain.FirstNodeId
 	r.firstNode = r.nodeActors[r.firstId]
 	r.state = ruleEngine.ACTIVE
@@ -289,7 +291,7 @@ func (r *RuleChainActor) createNodeActor(nodeId string) (ruleEngine.Ref, error)
 	return r.Ctx.GetOrCreateChildActor(nodeId,
 		ruleEngine.RULE_DISPATCHER_NAME,
 		NewRuleNodeActorCreator(r.SystemCtx, r.tenantId,
-			r.ruleChainId, r.ruleChainName, nodeId, r.Ctx))
+			r.ruleChainId, r.ruleChainName, nodeId, r.Ctx.GetParentRef()))
 }
 
 // RuleChainCreator

+ 13 - 7
pkg/actors/rule_node_actor.go

@@ -36,10 +36,10 @@ func (r *RuleNodeActor) Init(ctx ruleEngine.Ctx) error {
 	}
 	r.ruleNode = result
 	r.defaultContext = ruleEngine.NewDefaultContext(&ruleEngine.RuleNodeCtx{
-		TenantId:   "",
-		ChainActor: nil,
-		SelfActor:  nil,
-		Self:       nil,
+		TenantId:   r.tenantId,
+		ChainActor: r.parent,
+		SelfActor:  r.self,
+		Self:       r.ruleNode,
 	}, r.SystemCtx)
 	r.info = &protocol.RuleNodeInfo{
 		RuleNodeId:    r.ruleNodeId,
@@ -90,10 +90,16 @@ func (r *RuleNodeActor) onRuleToSelfErrorMsg(msg *ruleEngine.RuleToSelfErrorMsg)
 func (r *RuleNodeActor) onRuleToSelfMsg(msg *ruleEngine.RuleToSelfMsg) error {
 	server.Log.Debugf("Going to process rule msg:%s,%s", r.ruleChainName, r.ruleNode.Name)
 	actorMsg := msg.Message
-	if err := r.node.OnMessage(r.defaultContext, actorMsg); err != nil {
-		r.defaultContext.TellError(actorMsg, errors.New("onRuleToSelfMsg error"))
-		return err
+	ruleNodeCount := actorMsg.GetAndIncrementRuleNodeCounter()
+	if ruleNodeCount < 20 {
+		if err := r.node.OnMessage(r.defaultContext, actorMsg); err != nil {
+			r.defaultContext.TellError(actorMsg, errors.New("onRuleToSelfMsg error"))
+			return err
+		}
+	} else {
+		actorMsg.GetCallBack().OnFailure(errors.New(fmt.Sprintf("message is processed by more than %d rule nodes", ruleNodeCount)))
 	}
+
 	return nil
 }
 

+ 11 - 8
pkg/ruleEngine/rule_chain_service.go

@@ -17,14 +17,17 @@ type TestRuleChainService struct {
 }
 
 func (t *TestRuleChainService) GetRuleNodeRelations(tenantId, nodeId string) ([]*Relation, error) {
-	return []*Relation{
-		{
-			From:              "node1",
-			To:                "node2",
-			Type:              "True",
-			RelationTypeGroup: COMMON,
-		},
-	}, nil
+	if nodeId == "node1" {
+		return []*Relation{
+			{
+				From:              "node1",
+				To:                "node2",
+				Type:              "True",
+				RelationTypeGroup: COMMON,
+			},
+		}, nil
+	}
+	return nil, nil
 }
 
 func (t *TestRuleChainService) FindRuleChainById(tenantId, ruleChainId string) (*RuleChain, error) {

+ 1 - 1
services/controller/controller_test.go

@@ -19,7 +19,7 @@ func TestInit(t *testing.T) {
 		TenantId: "tenant_1",
 		Message: &protocol.Message{
 			QueueName:  "",
-			Id:         "",
+			Id:         "123",
 			Ts:         nil,
 			Type:       "POST_ATTRIBUTES_REQUEST",
 			Data:       "",