|
@@ -3,7 +3,6 @@ package actors
|
|
|
import (
|
|
|
"errors"
|
|
|
"fmt"
|
|
|
- "sparrow/pkg/actor"
|
|
|
"sparrow/pkg/entities"
|
|
|
"sparrow/pkg/protocol"
|
|
|
"sparrow/pkg/queue"
|
|
@@ -12,39 +11,28 @@ import (
|
|
|
"strings"
|
|
|
)
|
|
|
|
|
|
-var ruleNodes = map[string]*ruleEngine.RuleNode{
|
|
|
- "1": {
|
|
|
- RuleNodeId: "1",
|
|
|
- RuleChainId: "11",
|
|
|
- Type: "MsgTypeFilterNode",
|
|
|
- Name: "simple node",
|
|
|
- IsDebug: true,
|
|
|
- Config: "",
|
|
|
- },
|
|
|
-}
|
|
|
-
|
|
|
type RuleChainActor struct {
|
|
|
- actor.ContextBasedCreator
|
|
|
+ ruleEngine.ContextBasedCreator
|
|
|
ruleChain *ruleEngine.RuleChain
|
|
|
tenantId string
|
|
|
firstId string
|
|
|
firstNode *ruleEngine.RuleNodeCtx
|
|
|
started bool
|
|
|
ruleChainId string
|
|
|
- parent actor.Ref
|
|
|
+ parent ruleEngine.Ref
|
|
|
nodeActors map[string]*ruleEngine.RuleNodeCtx
|
|
|
nodeRoutes map[string][]*ruleEngine.RuleNodeRelation
|
|
|
|
|
|
- ruleChainName string
|
|
|
-
|
|
|
- clusterService queue.ClusterService
|
|
|
+ ruleChainName string
|
|
|
+ clusterService ruleEngine.ClusterService
|
|
|
+ state ruleEngine.ComponentLifecycleState
|
|
|
}
|
|
|
|
|
|
func newRuleChainActor(
|
|
|
- sysCtx *actor.SystemContext,
|
|
|
+ sysCtx *ruleEngine.SystemContext,
|
|
|
ruleChain *ruleEngine.RuleChain,
|
|
|
tenantId string,
|
|
|
- parent actor.Ref,
|
|
|
+ parent ruleEngine.Ref,
|
|
|
) *RuleChainActor {
|
|
|
item := &RuleChainActor{
|
|
|
ruleChainId: ruleChain.ChainId,
|
|
@@ -59,28 +47,38 @@ func newRuleChainActor(
|
|
|
return item
|
|
|
}
|
|
|
|
|
|
-func (r *RuleChainActor) GetActorRef() actor.Ref {
|
|
|
+func (r *RuleChainActor) GetActorRef() ruleEngine.Ref {
|
|
|
return r.Ctx
|
|
|
}
|
|
|
|
|
|
-func (r *RuleChainActor) Init(ctx actor.Ctx) error {
|
|
|
+func (r *RuleChainActor) Init(ctx ruleEngine.Ctx) error {
|
|
|
if r.ruleChain != nil {
|
|
|
r.ruleChainName = r.ruleChain.Name
|
|
|
}
|
|
|
r.Ctx = ctx
|
|
|
- return nil
|
|
|
+ return r.start()
|
|
|
}
|
|
|
|
|
|
func (r *RuleChainActor) Process(msg protocol.ActorMsg) error {
|
|
|
switch msg.GetMessageType() {
|
|
|
case protocol.QUEUE_TO_RULE_ENGINE_MSG:
|
|
|
- return r.onQueueToRuleEngineMsg(msg.(*actor.QueueToRuleEngineMsg))
|
|
|
-
|
|
|
+ return r.onQueueToRuleEngineMsg(msg.(*ruleEngine.QueueToRuleEngineMsg))
|
|
|
+ case protocol.RULE_TO_RULE_CHAIN_TELL_NEXT_MSG:
|
|
|
+ return r.onTellNextRuleNode(msg.(*ruleEngine.RuleNodeToRuleChanTellNextMsg))
|
|
|
+ case protocol.RULE_CHAIN_TO_RULE_CHAIN_MSG:
|
|
|
+ return r.onRuleChainToRuleChain(msg.(*ruleEngine.RuleChainToRuleChainMsg))
|
|
|
}
|
|
|
return nil
|
|
|
}
|
|
|
-
|
|
|
-func (r *RuleChainActor) onQueueToRuleEngineMsg(msg *actor.QueueToRuleEngineMsg) error {
|
|
|
+func (r *RuleChainActor) onRuleChainToRuleChain(msg *ruleEngine.RuleChainToRuleChainMsg) error {
|
|
|
+ if r.firstNode != nil {
|
|
|
+ r.pushMsgToNode(r.firstNode, msg.Message, msg.FromRelationType)
|
|
|
+ } else {
|
|
|
+ msg.Message.GetCallBack().OnSuccess()
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+func (r *RuleChainActor) onQueueToRuleEngineMsg(msg *ruleEngine.QueueToRuleEngineMsg) error {
|
|
|
actorMsg := msg.Message
|
|
|
server.Log.Debugf("Processing message")
|
|
|
if len(msg.RelationTypes) == 0 {
|
|
@@ -88,11 +86,12 @@ func (r *RuleChainActor) onQueueToRuleEngineMsg(msg *actor.QueueToRuleEngineMsg)
|
|
|
var targetCtx *ruleEngine.RuleNodeCtx
|
|
|
if ruleNodeId == "" {
|
|
|
targetCtx = r.firstNode
|
|
|
+ actorMsg = actorMsg.CopyWithRuleChainId(r.ruleChainId)
|
|
|
} else {
|
|
|
targetCtx = r.nodeActors[ruleNodeId]
|
|
|
}
|
|
|
if targetCtx != nil {
|
|
|
- server.Log.Debugf("pushing message to target rule node,%s, %s", r.ruleChainId, ruleNodeId)
|
|
|
+ server.Log.Debugf("pushing message to target rule node,%s, %s", r.ruleChainId, targetCtx.Self.RuleNodeId)
|
|
|
r.pushMsgToNode(targetCtx, actorMsg, "")
|
|
|
} else {
|
|
|
server.Log.Debugf("Rule node dose not exist. probably old message,%s, %s", r.ruleChainId, ruleNodeId)
|
|
@@ -104,11 +103,20 @@ func (r *RuleChainActor) onQueueToRuleEngineMsg(msg *actor.QueueToRuleEngineMsg)
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
+func (r *RuleChainActor) onTellNextRuleNode(msg *ruleEngine.RuleNodeToRuleChanTellNextMsg) error {
|
|
|
+ var errStr string
|
|
|
+ if msg.FailureMessage != nil {
|
|
|
+ errStr = msg.FailureMessage.Error()
|
|
|
+ }
|
|
|
+ r.onTellNext(msg.Message, msg.RuleNodeId, msg.RelationTypes.ToStrArray(), errStr)
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
// on tell next actor
|
|
|
func (r *RuleChainActor) onTellNext(msg *protocol.Message, originatorNodeId string,
|
|
|
relationTypes []string, errMsg string) {
|
|
|
originatorId := msg.Originator
|
|
|
- tpi := queue.ResolvePartition(queue.RULE_ENGINE, msg.GetQueueName(), r.tenantId, originatorId)
|
|
|
+ tpi := queue.ResolvePartition(ruleEngine.RULE_ENGINE, msg.GetQueueName(), r.tenantId, originatorId)
|
|
|
var relations []*ruleEngine.RuleNodeRelation
|
|
|
if rs, ok := r.nodeRoutes[originatorNodeId]; ok {
|
|
|
for _, item := range rs {
|
|
@@ -135,7 +143,6 @@ func (r *RuleChainActor) onTellNext(msg *protocol.Message, originatorNodeId stri
|
|
|
r.pushMsgToTarget(tpi, msg, rl.Out, rl.Type)
|
|
|
}
|
|
|
} else {
|
|
|
-
|
|
|
for _, rl := range relations {
|
|
|
target := rl.Out
|
|
|
r.putToQueue(tpi, msg, queue.NewMultipleMsgCallbackWrapper(int32(len(relations)), msg.GetCallBack()), target)
|
|
@@ -151,7 +158,7 @@ func (r *RuleChainActor) pushMsgToTarget(tpi *queue.TopicPartitionInfo, msg *pro
|
|
|
targetCtx := r.nodeActors[entityId.GetId()]
|
|
|
r.pushMsgToNode(targetCtx, msg, fromRelationType)
|
|
|
case entities.RULE_CHAIN:
|
|
|
- r.parent.Tell(&actor.RuleChainToRuleChainMsg{
|
|
|
+ r.parent.Tell(&ruleEngine.RuleChainToRuleChainMsg{
|
|
|
TargetId: entityId.GetId(),
|
|
|
SourceId: r.ruleChainId,
|
|
|
Message: msg,
|
|
@@ -196,7 +203,7 @@ func contains(relations []string, relation string) bool {
|
|
|
// push a message to node actor
|
|
|
func (r *RuleChainActor) pushMsgToNode(targetCtx *ruleEngine.RuleNodeCtx, msg *protocol.Message, relationType string) {
|
|
|
if targetCtx != nil {
|
|
|
- targetCtx.SelfActor.Tell(&actor.RuleChainToRuleNodeMsg{
|
|
|
+ targetCtx.SelfActor.Tell(&ruleEngine.RuleChainToRuleNodeMsg{
|
|
|
Message: msg,
|
|
|
Ctx: ruleEngine.NewDefaultContext(targetCtx, r.SystemCtx),
|
|
|
FromRelationType: relationType,
|
|
@@ -210,12 +217,79 @@ func (r *RuleChainActor) Destroy() error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func (r *RuleChainActor) OnProcessFailure(err error) *actor.ProcessFailureStrategy {
|
|
|
+func (r *RuleChainActor) OnProcessFailure(err error) *ruleEngine.ProcessFailureStrategy {
|
|
|
if err != nil {
|
|
|
- return actor.Stop()
|
|
|
+ return ruleEngine.Stop()
|
|
|
+ } else {
|
|
|
+ return ruleEngine.Resume()
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func (r *RuleChainActor) start() error {
|
|
|
+ if !r.started {
|
|
|
+ ruleChain, err := r.SystemCtx.RuleChainService.FindRuleChainById(r.tenantId, r.ruleChainId)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ if ruleChain != nil {
|
|
|
+ nodes, err := r.SystemCtx.RuleChainService.GetRuleChainNodes(r.tenantId, ruleChain.ChainId)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ server.Log.Debugf("starting rule chain with %d nodes", len(nodes))
|
|
|
+ for _, node := range nodes {
|
|
|
+ server.Log.Debugf("creating rule node actor:%s,%s", node.RuleNodeId, node.Name)
|
|
|
+ ref, err := r.createNodeActor(node.RuleNodeId)
|
|
|
+ if err != nil {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ r.nodeActors[node.RuleNodeId] = &ruleEngine.RuleNodeCtx{
|
|
|
+ TenantId: r.tenantId,
|
|
|
+ ChainActor: r.Ctx,
|
|
|
+ SelfActor: ref,
|
|
|
+ Self: node,
|
|
|
+ }
|
|
|
+ r.started = true
|
|
|
+ }
|
|
|
+ r.initRoutes(r.ruleChain, nodes)
|
|
|
+ }
|
|
|
} else {
|
|
|
- return actor.Resume()
|
|
|
+ return r.update()
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (r *RuleChainActor) initRoutes(ruleChain *ruleEngine.RuleChain, nodes []*ruleEngine.RuleNode) {
|
|
|
+ for _, node := range nodes {
|
|
|
+ relations, err := r.SystemCtx.RuleChainService.GetRuleNodeRelations(r.tenantId, node.RuleNodeId)
|
|
|
+ if err != nil {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ var rs []*ruleEngine.RuleNodeRelation
|
|
|
+ for _, relation := range relations {
|
|
|
+
|
|
|
+ rs = append(rs, &ruleEngine.RuleNodeRelation{
|
|
|
+ In: &entities.RuleNodeId{Id: node.RuleNodeId},
|
|
|
+ Out: &entities.RuleNodeId{Id: relation.To},
|
|
|
+ Type: relation.Type,
|
|
|
+ })
|
|
|
+ }
|
|
|
+ r.nodeRoutes[node.RuleNodeId] = rs
|
|
|
}
|
|
|
+ r.firstId = ruleChain.FirstNodeId
|
|
|
+ r.firstNode = r.nodeActors[r.firstId]
|
|
|
+ r.state = ruleEngine.ACTIVE
|
|
|
+}
|
|
|
+
|
|
|
+func (r *RuleChainActor) update() error {
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (r *RuleChainActor) createNodeActor(nodeId string) (ruleEngine.Ref, error) {
|
|
|
+ return r.Ctx.GetOrCreateChildActor(nodeId,
|
|
|
+ ruleEngine.RULE_DISPATCHER_NAME,
|
|
|
+ NewRuleNodeActorCreator(r.SystemCtx, r.tenantId,
|
|
|
+ r.ruleChainId, r.ruleChainName, nodeId, r.Ctx))
|
|
|
}
|
|
|
|
|
|
// RuleChainCreator
|
|
@@ -225,10 +299,10 @@ type RuleChainCreator struct {
|
|
|
|
|
|
//NewRuleChainCreator create a instance
|
|
|
func NewRuleChainCreator(
|
|
|
- sysCtx *actor.SystemContext,
|
|
|
+ sysCtx *ruleEngine.SystemContext,
|
|
|
tenantId string,
|
|
|
ruleChan *ruleEngine.RuleChain,
|
|
|
- parent actor.Ref,
|
|
|
+ parent ruleEngine.Ref,
|
|
|
) *RuleChainCreator {
|
|
|
item := &RuleChainCreator{}
|
|
|
item.tenantId = tenantId
|
|
@@ -242,6 +316,6 @@ func (r *RuleChainCreator) CreateActorId() string {
|
|
|
return r.ruleChain.ChainId
|
|
|
}
|
|
|
|
|
|
-func (r *RuleChainCreator) CreateActor() actor.Actor {
|
|
|
+func (r *RuleChainCreator) CreateActor() ruleEngine.Actor {
|
|
|
return newRuleChainActor(r.SystemCtx, r.ruleChain, r.tenantId, r.parent)
|
|
|
}
|