123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175 |
- package actors
- import (
- "errors"
- "fmt"
- "sparrow/pkg/entities"
- "sparrow/pkg/protocol"
- "sparrow/pkg/ruleEngine"
- "sparrow/pkg/ruleEngine/nodes"
- "sparrow/pkg/server"
- )
- type RuleNodeActor struct {
- ruleChainName string
- self ruleEngine.Ref
- ruleNode *ruleEngine.RuleNode
- node ruleEngine.Node
- ruleChainNodeId string
- ruleNodeId string
- ruleEngine.ContextBasedCreator
- tenantId string
- defaultContext ruleEngine.Context
- parent ruleEngine.Ref
- info *protocol.RuleNodeInfo
- }
- func (r *RuleNodeActor) GetActorRef() ruleEngine.Ref {
- return r.Ctx
- }
- func (r *RuleNodeActor) Init(ctx ruleEngine.Ctx) error {
- r.Ctx = ctx
- result, err := r.SystemCtx.RuleChainService.FindRuleNodeById(r.tenantId, r.ruleNodeId)
- if err != nil {
- return err
- }
- r.ruleNode = result
- r.defaultContext = ruleEngine.NewDefaultContext(&ruleEngine.RuleNodeCtx{
- TenantId: r.tenantId,
- ChainActor: r.parent,
- SelfActor: r.self,
- Self: r.ruleNode,
- }, r.SystemCtx)
- r.info = &protocol.RuleNodeInfo{
- RuleNodeId: r.ruleNodeId,
- RuleChainName: r.ruleChainName,
- RuleNodeName: r.ruleNode.Name,
- }
- node, err := r.initComponent(r.ruleNode)
- if err != nil {
- return err
- }
- if node != nil {
- r.node = node
- }
- return nil
- }
- // 实例化node
- func (r *RuleNodeActor) initComponent(ruleNode *ruleEngine.RuleNode) (ruleEngine.Node, error) {
- if ruleNode != nil {
- s, ok := nodes.CreateNodeByConfig(ruleNode.Type, r.defaultContext, r.ruleNode.Config)
- if !ok {
- return nil, errors.New(fmt.Sprintf("create node instance faild, %s", ruleNode.Type))
- }
- return s.(ruleEngine.Node), nil
- }
- return nil, nil
- }
- func (r *RuleNodeActor) Process(msg protocol.ActorMsg) error {
- switch msg.GetMessageType() {
- case protocol.RULE_CHAIN_TO_RULE_MSG:
- return r.onRuleChainToNodeMsg(msg.(*ruleEngine.RuleChainToRuleNodeMsg))
- case protocol.RULE_TO_SELF_MSG:
- return r.onRuleToSelfMsg(msg.(*ruleEngine.RuleToSelfMsg))
- case protocol.RULE_TO_SELF_ERROR_MSG:
- return r.onRuleToSelfErrorMsg(msg.(*ruleEngine.RuleToSelfErrorMsg))
- default:
- return errors.New("未知的消息类型")
- }
- }
- func (r *RuleNodeActor) onRuleToSelfErrorMsg(msg *ruleEngine.RuleToSelfErrorMsg) error {
- server.Log.Error(msg.Err)
- return nil
- }
- // TODO:处理消息数量支持实现以及最大处理能力逻辑
- func (r *RuleNodeActor) onRuleToSelfMsg(msg *ruleEngine.RuleToSelfMsg) error {
- server.Log.Debugf("Going to process rule msg:%s,%s", r.ruleChainName, r.ruleNode.Name)
- actorMsg := msg.Message
- ruleNodeCount := actorMsg.GetAndIncrementRuleNodeCounter()
- if ruleNodeCount < 20 {
- if r.ruleNode.IsDebug {
- _ = r.SystemCtx.PersistDebugInput(r.tenantId,
- &entities.RuleNodeId{Id: r.ruleNode.RuleNodeId}, actorMsg, "Self", nil)
- }
- if err := r.node.OnMessage(r.defaultContext, actorMsg); err != nil {
- r.defaultContext.TellError(actorMsg, errors.New("onRuleToSelfMsg error"))
- return err
- }
- } else {
- actorMsg.GetCallBack().OnFailure(errors.New(fmt.Sprintf("message is processed by more than %d rule nodes", ruleNodeCount)))
- }
- return nil
- }
- func (r *RuleNodeActor) onRuleChainToNodeMsg(msg *ruleEngine.RuleChainToRuleNodeMsg) error {
- server.Log.Debugf("Going to process rule msg:%s,%s", r.ruleChainName, r.ruleNode.Name)
- msg.Message.GetCallBack().OnProcessingStart(r.info)
- actorMsg := msg.Message
- // ruleNodeCount := actorMsg.GetAndIncrementRuleNodeCounter()
- if r.ruleNode.IsDebug {
- _ = r.SystemCtx.PersistDebugInput(r.tenantId, &entities.RuleNodeId{Id: r.ruleNode.RuleNodeId}, actorMsg, msg.FromRelationType, nil)
- }
- err := r.node.OnMessage(msg.Ctx, actorMsg)
- if err != nil {
- msg.Ctx.TellError(actorMsg, errors.New("onRuleChainToNodeMsg error"))
- return err
- }
- return nil
- }
- func (r *RuleNodeActor) Destroy() error {
- return nil
- }
- func (r *RuleNodeActor) OnProcessFailure(err error) *ruleEngine.ProcessFailureStrategy {
- if err != nil {
- return ruleEngine.Stop()
- } else {
- return ruleEngine.Resume()
- }
- }
- type RuleNodeActorCreator struct {
- RuleNodeActor
- }
- func NewRuleNodeActorCreator(
- sysCtx *ruleEngine.SystemContext,
- tenantId string,
- ruleChainId string,
- ruleChainName string,
- ruleNodeId string,
- parent ruleEngine.Ref,
- ) *RuleNodeActorCreator {
- item := new(RuleNodeActorCreator)
- item.SystemCtx = sysCtx
- item.ruleNodeId = ruleNodeId
- item.ruleChainName = ruleChainName
- item.tenantId = tenantId
- item.ruleChainNodeId = ruleChainId
- item.parent = parent
- return item
- }
- func (r *RuleNodeActorCreator) CreateActorId() string {
- return r.ruleNodeId
- }
- func (r *RuleNodeActorCreator) CreateActor() ruleEngine.Actor {
- item := &RuleNodeActor{
- ruleChainName: r.ruleChainName,
- ruleChainNodeId: r.ruleChainNodeId,
- ruleNodeId: r.ruleNodeId,
- tenantId: r.tenantId,
- parent: r.parent,
- }
- item.SystemCtx = r.SystemCtx
- return item
- }
|