package actors import ( "errors" "fmt" "sparrow/pkg/entities" "sparrow/pkg/protocol" "sparrow/pkg/ruleEngine" "sparrow/pkg/ruleEngine/nodes" "sparrow/pkg/server" ) type RuleNodeActor struct { ruleChainName string self ruleEngine.Ref ruleNode *ruleEngine.RuleNode node ruleEngine.Node ruleChainNodeId string ruleNodeId string ruleEngine.ContextBasedCreator tenantId string defaultContext ruleEngine.Context parent ruleEngine.Ref state ruleEngine.ComponentLifecycleState info *protocol.RuleNodeInfo } func (r *RuleNodeActor) GetActorRef() ruleEngine.Ref { return r.Ctx } func (r *RuleNodeActor) Init(ctx ruleEngine.Ctx) error { r.Ctx = ctx result, err := r.SystemCtx.RuleChainService.FindRuleNodeById(r.tenantId, r.ruleNodeId) if err != nil { return err } r.ruleNode = result r.defaultContext = ruleEngine.NewDefaultContext(&ruleEngine.RuleNodeCtx{ TenantId: r.tenantId, ChainActor: r.parent, SelfActor: r.self, Self: r.ruleNode, }, r.SystemCtx) r.info = &protocol.RuleNodeInfo{ RuleNodeId: r.ruleNodeId, RuleChainName: r.ruleChainName, RuleNodeName: r.ruleNode.Name, } return r.start() } // 实例化node func (r *RuleNodeActor) initComponent(ruleNode *ruleEngine.RuleNode) (ruleEngine.Node, error) { if ruleNode != nil { s, ok := nodes.CreateNodeByConfig(ruleNode.Type, r.defaultContext, r.ruleNode.Config) if !ok { return nil, errors.New(fmt.Sprintf("create node instance faild, %s", ruleNode.Type)) } return s.(ruleEngine.Node), nil } return nil, nil } func (r *RuleNodeActor) Process(msg protocol.ActorMsg) error { switch msg.GetMessageType() { case protocol.RULE_CHAIN_TO_RULE_MSG: return r.onRuleChainToNodeMsg(msg.(*ruleEngine.RuleChainToRuleNodeMsg)) case protocol.RULE_TO_SELF_MSG: return r.onRuleToSelfMsg(msg.(*ruleEngine.RuleToSelfMsg)) case protocol.RULE_TO_SELF_ERROR_MSG: return r.onRuleToSelfErrorMsg(msg.(*ruleEngine.RuleToSelfErrorMsg)) case protocol.COMPONENT_LIFE_CYCLE_MSG: return r.onComponentLifeCycleMsg(msg.(*ruleEngine.ComponentLifecycleMsg)) default: return errors.New("未知的消息类型") } } // node actor lifecycle handle func (r *RuleNodeActor) onComponentLifeCycleMsg(msg *ruleEngine.ComponentLifecycleMsg) error { switch msg.EventType { case ruleEngine.CREATED: return r.start() case ruleEngine.UPDATED: return r.update() case ruleEngine.ACTIVATED: if err := r.stop(); err != nil { return err } if err := r.start();err != nil { return err } case ruleEngine.DELETED: if err := r.stop(); err != nil { return err } return r.Ctx.Stop(r.Ctx.GetActorId()) default: } return nil } func (r *RuleNodeActor) onRuleToSelfErrorMsg(msg *ruleEngine.RuleToSelfErrorMsg) error { server.Log.Error(msg.Err) return nil } // TODO:处理消息数量支持实现以及最大处理能力逻辑 func (r *RuleNodeActor) onRuleToSelfMsg(msg *ruleEngine.RuleToSelfMsg) error { server.Log.Debugf("Going to process rule msg:%s,%s", r.ruleChainName, r.ruleNode.Name) actorMsg := msg.Message ruleNodeCount := actorMsg.GetAndIncrementRuleNodeCounter() if ruleNodeCount < 20 { if r.ruleNode.IsDebug { _ = r.SystemCtx.PersistDebugInput(r.tenantId, &entities.RuleNodeId{Id: r.ruleNode.RuleNodeId}, actorMsg, "Self", nil) } if err := r.node.OnMessage(r.defaultContext, actorMsg); err != nil { r.defaultContext.TellError(actorMsg, errors.New("onRuleToSelfMsg error")) return err } } else { actorMsg.GetCallBack().OnFailure(errors.New(fmt.Sprintf("message is processed by more than %d rule nodes", ruleNodeCount))) } return nil } func (r *RuleNodeActor) onRuleChainToNodeMsg(msg *ruleEngine.RuleChainToRuleNodeMsg) error { server.Log.Debugf("Going to process rule msg:%s,%s", r.ruleChainName, r.ruleNode.Name) msg.Message.GetCallBack().OnProcessingStart(r.info) actorMsg := msg.Message // ruleNodeCount := actorMsg.GetAndIncrementRuleNodeCounter() if r.ruleNode.IsDebug { _ = r.SystemCtx.PersistDebugInput(r.tenantId, &entities.RuleNodeId{Id: r.ruleNode.RuleNodeId}, actorMsg, msg.FromRelationType, nil) } err := r.node.OnMessage(msg.Ctx, actorMsg) if err != nil { msg.Ctx.TellError(actorMsg, errors.New("onRuleChainToNodeMsg error")) return err } return nil } func (r *RuleNodeActor) Destroy() error { return nil } func (r *RuleNodeActor) OnProcessFailure(err error) *ruleEngine.ProcessFailureStrategy { if err != nil { return ruleEngine.Stop() } else { return ruleEngine.Resume() } } func (r *RuleNodeActor) start() error { node, err := r.initComponent(r.ruleNode) if err != nil { return err } if node != nil { r.state = ruleEngine.ACTIVE r.node = node } return nil } func (r *RuleNodeActor) update() error { node, err := r.SystemCtx.RuleChainService.FindRuleNodeById(r.tenantId, r.ruleNodeId) if err != nil { return err } if node == nil { return errors.New(fmt.Sprintf("node not found %s", r.ruleNodeId)) } r.info = &protocol.RuleNodeInfo{ RuleNodeId: node.RuleNodeId, RuleChainName: r.ruleChainName, RuleNodeName: node.Name, } r.ruleNode = node r.defaultContext.GetRuleNodeCtx().Self = node return r.start() } func (r *RuleNodeActor) stop() error { if r.node != nil { r.state = ruleEngine.SUSPENDED } return nil } type RuleNodeActorCreator struct { RuleNodeActor } func NewRuleNodeActorCreator( sysCtx *ruleEngine.SystemContext, tenantId string, ruleChainId string, ruleChainName string, ruleNodeId string, parent ruleEngine.Ref, ) *RuleNodeActorCreator { item := new(RuleNodeActorCreator) item.SystemCtx = sysCtx item.ruleNodeId = ruleNodeId item.ruleChainName = ruleChainName item.tenantId = tenantId item.ruleChainNodeId = ruleChainId item.parent = parent return item } func (r *RuleNodeActorCreator) CreateActorId() string { return r.ruleNodeId } func (r *RuleNodeActorCreator) CreateActor() ruleEngine.Actor { item := &RuleNodeActor{ ruleChainName: r.ruleChainName, ruleChainNodeId: r.ruleChainNodeId, ruleNodeId: r.ruleNodeId, tenantId: r.tenantId, parent: r.parent, } item.SystemCtx = r.SystemCtx return item }