|
- package actors
- import (
- "errors"
- "fmt"
- "sparrow/pkg/entities"
- "sparrow/pkg/protocol"
- "sparrow/pkg/queue"
- "sparrow/pkg/ruleEngine"
- "sparrow/pkg/server"
- "strings"
- )
- type RuleChainActor struct {
- ruleEngine.ContextBasedCreator
- ruleChain *ruleEngine.RuleChain
- tenantId string
- firstId string
- firstNode *ruleEngine.RuleNodeCtx
- started bool
- ruleChainId string
- parent ruleEngine.Ref
- nodeActors map[string]*ruleEngine.RuleNodeCtx
- nodeRoutes map[string][]*ruleEngine.RuleNodeRelation
- ruleChainName string
- clusterService ruleEngine.ClusterService
- state ruleEngine.ComponentLifecycleState
- }
- func newRuleChainActor(
- sysCtx *ruleEngine.SystemContext,
- ruleChain *ruleEngine.RuleChain,
- tenantId string,
- parent ruleEngine.Ref,
- ) *RuleChainActor {
- item := &RuleChainActor{
- ruleChainId: ruleChain.ChainId,
- ruleChain: ruleChain,
- tenantId: tenantId,
- parent: parent,
- nodeActors: make(map[string]*ruleEngine.RuleNodeCtx),
- nodeRoutes: make(map[string][]*ruleEngine.RuleNodeRelation),
- clusterService: sysCtx.ClusterService,
- }
- item.SystemCtx = sysCtx
- return item
- }
- func (r *RuleChainActor) GetActorRef() ruleEngine.Ref {
- return r.Ctx
- }
- func (r *RuleChainActor) Init(ctx ruleEngine.Ctx) error {
- if r.ruleChain != nil {
- r.ruleChainName = r.ruleChain.Name
- }
- r.Ctx = ctx
- return r.start()
- }
- func (r *RuleChainActor) Process(msg protocol.ActorMsg) error {
- switch msg.GetMessageType() {
- case protocol.QUEUE_TO_RULE_ENGINE_MSG:
- return r.onQueueToRuleEngineMsg(msg.(*ruleEngine.QueueToRuleEngineMsg))
- case protocol.RULE_TO_RULE_CHAIN_TELL_NEXT_MSG:
- return r.onTellNextRuleNode(msg.(*ruleEngine.RuleNodeToRuleChanTellNextMsg))
- case protocol.RULE_CHAIN_TO_RULE_CHAIN_MSG:
- return r.onRuleChainToRuleChain(msg.(*ruleEngine.RuleChainToRuleChainMsg))
- case protocol.COMPONENT_LIFE_CYCLE_MSG:
- return r.onComponentLifeCycleMsg(msg.(*ruleEngine.ComponentLifecycleMsg))
- }
- return nil
- }
- // TODO:生命周期事件写入数据库
- func (r *RuleChainActor) onComponentLifeCycleMsg(msg *ruleEngine.ComponentLifecycleMsg) error {
- server.Log.Debugf("%s,%s,%s onComponentLifecycleMsg", msg.TenantId, msg.EntityId.GetEntityType(), msg.EntityId.GetId())
- switch msg.EventType {
- case ruleEngine.CREATED:
- return r.start()
- case ruleEngine.UPDATED:
- return r.update()
- case ruleEngine.ACTIVATED:
- return r.restart()
- case ruleEngine.DELETED:
- _ = r.stop()
- return r.Ctx.Stop(r.ruleChainId)
- default:
- }
- return nil
- }
- func (r *RuleChainActor) onRuleChainToRuleChain(msg *ruleEngine.RuleChainToRuleChainMsg) error {
- if r.firstNode != nil {
- r.pushMsgToNode(r.firstNode, msg.Message, msg.FromRelationType)
- } else {
- msg.Message.GetCallBack().OnSuccess()
- }
- return nil
- }
- func (r *RuleChainActor) onQueueToRuleEngineMsg(msg *ruleEngine.QueueToRuleEngineMsg) error {
- actorMsg := msg.Message
- server.Log.Debugf("Processing message")
- if len(msg.RelationTypes) == 0 {
- ruleNodeId := actorMsg.RuleNodeId
- var targetCtx *ruleEngine.RuleNodeCtx
- if ruleNodeId == "" {
- targetCtx = r.firstNode
- actorMsg = actorMsg.CopyWithRuleChainId(r.ruleChainId)
- } else {
- targetCtx = r.nodeActors[ruleNodeId]
- }
- if targetCtx != nil {
- server.Log.Debugf("pushing message to target rule node,%s, %s", r.ruleChainId, targetCtx.Self.RuleNodeId)
- r.pushMsgToNode(targetCtx, actorMsg, "")
- } else {
- server.Log.Debugf("Rule node dose not exist. probably old message,%s, %s", r.ruleChainId, ruleNodeId)
- actorMsg.GetCallBack().OnSuccess()
- }
- } else {
- r.onTellNext(actorMsg, actorMsg.RuleNodeId, msg.RelationTypes, msg.FailureMessage.Error())
- }
- return nil
- }
- func (r *RuleChainActor) onTellNextRuleNode(msg *ruleEngine.RuleNodeToRuleChanTellNextMsg) error {
- var errStr string
- if msg.FailureMessage != nil {
- errStr = msg.FailureMessage.Error()
- }
- r.onTellNext(msg.Message, msg.RuleNodeId, msg.RelationTypes.ToStrArray(), errStr)
- return nil
- }
- // on tell next actor
- func (r *RuleChainActor) onTellNext(msg *protocol.Message, originatorNodeId string,
- relationTypes []string, errMsg string) {
- originatorId := msg.Originator
- tpi := queue.ResolvePartition(ruleEngine.RULE_ENGINE, msg.GetQueueName(), r.tenantId, originatorId)
- var relations []*ruleEngine.RuleNodeRelation
- if rs, ok := r.nodeRoutes[originatorNodeId]; ok {
- for _, item := range rs {
- if contains(relationTypes, item.Type) {
- relations = append(relations, item)
- }
- }
- }
- if len(relations) == 0 {
- server.Log.Debugf("No outbound relations to process,%s, %s", originatorId, r.tenantId)
- if contains(relationTypes, "Failure") {
- if ruleNodeCtx, ok := r.nodeActors[originatorNodeId]; ok {
- msg.GetCallBack().OnFailure(errors.New(fmt.Sprintf(""+
- "[%s], ruleChainName:%s, ruleNodeId:%s", errMsg, r.ruleChainName, ruleNodeCtx.Self.RuleNodeId)))
- } else {
- msg.GetCallBack().OnFailure(errors.New("failure during message processing by Rule Node"))
- }
- } else {
- msg.GetCallBack().OnSuccess()
- }
- } else if len(relations) == 1 {
- for _, rl := range relations {
- server.Log.Debugf("push message to single target,%s, %s, %s, %s", r.tenantId, originatorId, msg.Id, rl.Out)
- r.pushMsgToTarget(tpi, msg, rl.Out, rl.Type)
- }
- } else {
- for _, rl := range relations {
- target := rl.Out
- r.putToQueue(tpi, msg, queue.NewMultipleMsgCallbackWrapper(int32(len(relations)), msg.GetCallBack()), target)
- }
- }
- }
- // push a message to target ctx
- func (r *RuleChainActor) pushMsgToTarget(tpi *queue.TopicPartitionInfo, msg *protocol.Message, entityId entities.EntityId, fromRelationType string) {
- if tpi.MyPartition {
- switch entityId.GetEntityType() {
- case entities.RULE_NODE:
- targetCtx := r.nodeActors[entityId.GetId()]
- r.pushMsgToNode(targetCtx, msg, fromRelationType)
- case entities.RULE_CHAIN:
- r.parent.Tell(&ruleEngine.RuleChainToRuleChainMsg{
- TargetId: entityId.GetId(),
- SourceId: r.ruleChainId,
- Message: msg,
- FromRelationType: fromRelationType,
- })
- }
- } else {
- r.putToQueue(tpi, msg, queue.NewMsgCallbackWrapper(msg.GetCallBack()), entityId)
- }
- }
- // 把消息放到队列中
- func (r *RuleChainActor) putToQueue(tpi *queue.TopicPartitionInfo, msg *protocol.Message, queueCallback queue.Callback, targetEntity entities.EntityId) {
- switch targetEntity.GetEntityType() {
- case entities.RULE_NODE:
- r.putMessageToQueue(tpi, msg.CopyWithRuleNodeId(targetEntity.GetId()), queueCallback)
- case entities.RULE_CHAIN:
- r.putMessageToQueue(tpi, msg.CopyWithRuleChainId(targetEntity.GetId()), queueCallback)
- }
- }
- func (r *RuleChainActor) putMessageToQueue(tpi *queue.TopicPartitionInfo, msg *protocol.Message, queueCallback queue.Callback) {
- r.clusterService.PushMessageToRuleEngine(tpi, msg.Id, msg, queueCallback)
- }
- func contains(relations []string, relation string) bool {
- if len(relations) == 0 {
- return false
- }
- for _, item := range relations {
- if strings.ToLower(item) == strings.ToLower(relation) {
- return true
- }
- }
- return false
- }
- // push a message to node actor
- func (r *RuleChainActor) pushMsgToNode(targetCtx *ruleEngine.RuleNodeCtx, msg *protocol.Message, relationType string) {
- if targetCtx != nil {
- targetCtx.SelfActor.Tell(&ruleEngine.RuleChainToRuleNodeMsg{
- Message: msg,
- Ctx: ruleEngine.NewDefaultContext(targetCtx, r.SystemCtx),
- FromRelationType: relationType,
- })
- } else {
- server.Log.Error("targetCtx is empty, %s, %s", r.ruleChainId, r.ruleChainName)
- }
- }
- func (r *RuleChainActor) Destroy() error {
- return nil
- }
- func (r *RuleChainActor) OnProcessFailure(err error) *ruleEngine.ProcessFailureStrategy {
- if err != nil {
- return ruleEngine.Stop()
- } else {
- return ruleEngine.Resume()
- }
- }
- func (r *RuleChainActor) start() error {
- if !r.started {
- ruleChain, err := r.SystemCtx.RuleChainService.FindRuleChainById(r.tenantId, r.ruleChainId)
- if err != nil {
- return err
- }
- if ruleChain != nil {
- nodes, err := r.SystemCtx.RuleChainService.GetRuleChainNodes(r.tenantId, ruleChain.ChainId)
- if err != nil {
- return err
- }
- server.Log.Debugf("starting rule chain with %d nodes", len(nodes))
- for _, node := range nodes {
- server.Log.Debugf("creating rule node actor:%s,%s", node.RuleNodeId, node.Name)
- ref, err := r.createNodeActor(node.RuleNodeId)
- if err != nil {
- continue
- }
- r.nodeActors[node.RuleNodeId] = &ruleEngine.RuleNodeCtx{
- TenantId: r.tenantId,
- ChainActor: r.Ctx,
- SelfActor: ref,
- Self: node,
- }
- r.started = true
- }
- return r.initRoutes(r.ruleChain, nodes)
- }
- } else {
- return r.update()
- }
- return nil
- }
- func (r *RuleChainActor) update() error {
- ruleChain, err := r.SystemCtx.RuleChainService.FindRuleChainById(r.tenantId, r.ruleChainId)
- if err != nil {
- return err
- }
- if ruleChain == nil {
- return errors.New(fmt.Sprintf("rule chain not found :%s", r.ruleChainId))
- }
- // 查询链上的所有节点
- nodes, err := r.SystemCtx.RuleChainService.GetRuleChainNodes(r.tenantId, r.ruleChainId)
- if err != nil {
- return err
- }
- for _, v := range nodes {
- // 如果找不到节点actor,则创建
- if actor, ok := r.nodeActors[v.RuleNodeId]; !ok {
- server.Log.Debugf("creating rule node actor:%s", v.RuleNodeId)
- ref, err := r.createNodeActor(v.RuleNodeId)
- if err != nil {
- return err
- }
- r.nodeActors[v.RuleNodeId] = &ruleEngine.RuleNodeCtx{
- TenantId: r.tenantId,
- ChainActor: r.Ctx,
- SelfActor: ref,
- Self: v,
- }
- } else {
- // 传递消息到node actor
- server.Log.Debugf("updating rule node actor:%s", v.RuleNodeId)
- actor.Self = v
- actor.SelfActor.TellWithHighPriority(&ruleEngine.ComponentLifecycleMsg{
- TenantId: r.tenantId,
- EntityId: &entities.RuleNodeId{Id: v.RuleNodeId},
- EventType: ruleEngine.UPDATED,
- })
- }
- }
- var removeNodes []string
- // 对比已经有节点和最新节点列表,找出差集,并移除
- for k := range r.nodeActors {
- var found = false
- for _, v := range nodes {
- if v.RuleNodeId == k {
- found = true
- break
- }
- }
- if !found {
- removeNodes = append(removeNodes, k)
- }
- }
- // remove actors
- for _, v := range removeNodes {
- server.Log.Debugf("remove rule node :%s", v)
- if ref, ok := r.nodeActors[v]; ok {
- ref.SelfActor.TellWithHighPriority(&ruleEngine.ComponentLifecycleMsg{
- TenantId: r.tenantId,
- EntityId: &entities.RuleNodeId{Id: v},
- EventType: ruleEngine.DELETED,
- })
- delete(r.nodeActors, v)
- }
- }
- return r.initRoutes(r.ruleChain, nodes)
- }
- func (r *RuleChainActor) restart() error {
- if err := r.stop(); err != nil {
- return err
- }
- if err := r.start(); err != nil {
- return err
- }
- return nil
- }
- func (r *RuleChainActor) stop() error {
- server.Log.Debugf("stopping rule chain with %d nodes, tenantId:%s, entityId:%s", len(r.nodeActors), r.tenantId, r.ruleChainId)
- for actorId := range r.nodeActors {
- err := r.Ctx.Stop(actorId)
- if err != nil {
- return err
- }
- }
- r.started = false
- return nil
- }
- func (r *RuleChainActor) initRoutes(ruleChain *ruleEngine.RuleChain, nodes []*ruleEngine.RuleNode) error {
- for _, node := range nodes {
- relations, err := r.SystemCtx.RuleChainService.GetRuleNodeRelations(r.tenantId, node.RuleNodeId)
- if err != nil {
- return err
- }
- var rs []*ruleEngine.RuleNodeRelation
- for _, relation := range relations {
- rs = append(rs, &ruleEngine.RuleNodeRelation{
- In: &entities.RuleNodeId{Id: node.RuleNodeId},
- Out: &entities.RuleNodeId{Id: relation.To},
- Type: relation.Type,
- })
- }
- r.nodeRoutes[node.RuleNodeId] = rs
- }
- r.firstId = ruleChain.FirstNodeId
- r.firstNode = r.nodeActors[r.firstId]
- r.state = ruleEngine.ACTIVE
- return nil
- }
- func (r *RuleChainActor) createNodeActor(nodeId string) (ruleEngine.Ref, error) {
- return r.Ctx.GetOrCreateChildActor(nodeId,
- ruleEngine.RULE_DISPATCHER_NAME,
- NewRuleNodeActorCreator(r.SystemCtx, r.tenantId,
- r.ruleChainId, r.ruleChainName, nodeId, r.Ctx.GetParentRef()))
- }
- // RuleChainCreator
- type RuleChainCreator struct {
- RuleChainActor
- }
- //NewRuleChainCreator create a instance
- func NewRuleChainCreator(
- sysCtx *ruleEngine.SystemContext,
- tenantId string,
- ruleChan *ruleEngine.RuleChain,
- parent ruleEngine.Ref,
- ) *RuleChainCreator {
- item := &RuleChainCreator{}
- item.tenantId = tenantId
- item.ruleChain = ruleChan
- item.parent = parent
- item.SystemCtx = sysCtx
- return item
- }
- func (r *RuleChainCreator) CreateActorId() string {
- return r.ruleChain.ChainId
- }
- func (r *RuleChainCreator) CreateActor() ruleEngine.Actor {
- return newRuleChainActor(r.SystemCtx, r.ruleChain, r.tenantId, r.parent)
- }
|