rule_node_actor.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. package actors
  2. import (
  3. "errors"
  4. "fmt"
  5. "sparrow/pkg/entities"
  6. "sparrow/pkg/protocol"
  7. "sparrow/pkg/ruleEngine"
  8. "sparrow/pkg/ruleEngine/nodes"
  9. "sparrow/pkg/server"
  10. )
  11. type RuleNodeActor struct {
  12. ruleChainName string
  13. self ruleEngine.Ref
  14. ruleNode *ruleEngine.RuleNode
  15. node ruleEngine.Node
  16. ruleChainNodeId string
  17. ruleNodeId string
  18. ruleEngine.ContextBasedCreator
  19. tenantId string
  20. defaultContext ruleEngine.Context
  21. parent ruleEngine.Ref
  22. info *protocol.RuleNodeInfo
  23. }
  24. func (r *RuleNodeActor) GetActorRef() ruleEngine.Ref {
  25. return r.Ctx
  26. }
  27. func (r *RuleNodeActor) Init(ctx ruleEngine.Ctx) error {
  28. r.Ctx = ctx
  29. result, err := r.SystemCtx.RuleChainService.FindRuleNodeById(r.tenantId, r.ruleNodeId)
  30. if err != nil {
  31. return err
  32. }
  33. r.ruleNode = result
  34. r.defaultContext = ruleEngine.NewDefaultContext(&ruleEngine.RuleNodeCtx{
  35. TenantId: r.tenantId,
  36. ChainActor: r.parent,
  37. SelfActor: r.self,
  38. Self: r.ruleNode,
  39. }, r.SystemCtx)
  40. r.info = &protocol.RuleNodeInfo{
  41. RuleNodeId: r.ruleNodeId,
  42. RuleChainName: r.ruleChainName,
  43. RuleNodeName: r.ruleNode.Name,
  44. }
  45. node, err := r.initComponent(r.ruleNode)
  46. if err != nil {
  47. return err
  48. }
  49. if node != nil {
  50. r.node = node
  51. }
  52. return nil
  53. }
  54. // 实例化node
  55. func (r *RuleNodeActor) initComponent(ruleNode *ruleEngine.RuleNode) (ruleEngine.Node, error) {
  56. if ruleNode != nil {
  57. s, ok := nodes.CreateNodeByConfig(ruleNode.Type, r.defaultContext, r.ruleNode.Config)
  58. if !ok {
  59. return nil, errors.New(fmt.Sprintf("create node instance faild, %s", ruleNode.Type))
  60. }
  61. return s.(ruleEngine.Node), nil
  62. }
  63. return nil, nil
  64. }
  65. func (r *RuleNodeActor) Process(msg protocol.ActorMsg) error {
  66. switch msg.GetMessageType() {
  67. case protocol.RULE_CHAIN_TO_RULE_MSG:
  68. return r.onRuleChainToNodeMsg(msg.(*ruleEngine.RuleChainToRuleNodeMsg))
  69. case protocol.RULE_TO_SELF_MSG:
  70. return r.onRuleToSelfMsg(msg.(*ruleEngine.RuleToSelfMsg))
  71. case protocol.RULE_TO_SELF_ERROR_MSG:
  72. return r.onRuleToSelfErrorMsg(msg.(*ruleEngine.RuleToSelfErrorMsg))
  73. default:
  74. return errors.New("未知的消息类型")
  75. }
  76. }
  77. func (r *RuleNodeActor) onRuleToSelfErrorMsg(msg *ruleEngine.RuleToSelfErrorMsg) error {
  78. server.Log.Error(msg.Err)
  79. return nil
  80. }
  81. // TODO:处理消息数量支持实现以及最大处理能力逻辑
  82. func (r *RuleNodeActor) onRuleToSelfMsg(msg *ruleEngine.RuleToSelfMsg) error {
  83. server.Log.Debugf("Going to process rule msg:%s,%s", r.ruleChainName, r.ruleNode.Name)
  84. actorMsg := msg.Message
  85. ruleNodeCount := actorMsg.GetAndIncrementRuleNodeCounter()
  86. if ruleNodeCount < 20 {
  87. if r.ruleNode.IsDebug {
  88. _ = r.SystemCtx.PersistDebugInput(r.tenantId,
  89. &entities.RuleNodeId{Id: r.ruleNode.RuleNodeId}, actorMsg, "Self", nil)
  90. }
  91. if err := r.node.OnMessage(r.defaultContext, actorMsg); err != nil {
  92. r.defaultContext.TellError(actorMsg, errors.New("onRuleToSelfMsg error"))
  93. return err
  94. }
  95. } else {
  96. actorMsg.GetCallBack().OnFailure(errors.New(fmt.Sprintf("message is processed by more than %d rule nodes", ruleNodeCount)))
  97. }
  98. return nil
  99. }
  100. func (r *RuleNodeActor) onRuleChainToNodeMsg(msg *ruleEngine.RuleChainToRuleNodeMsg) error {
  101. server.Log.Debugf("Going to process rule msg:%s,%s", r.ruleChainName, r.ruleNode.Name)
  102. msg.Message.GetCallBack().OnProcessingStart(r.info)
  103. actorMsg := msg.Message
  104. // ruleNodeCount := actorMsg.GetAndIncrementRuleNodeCounter()
  105. if r.ruleNode.IsDebug {
  106. _ = r.SystemCtx.PersistDebugInput(r.tenantId, &entities.RuleNodeId{Id: r.ruleNode.RuleNodeId}, actorMsg, msg.FromRelationType, nil)
  107. }
  108. err := r.node.OnMessage(msg.Ctx, actorMsg)
  109. if err != nil {
  110. msg.Ctx.TellError(actorMsg, errors.New("onRuleChainToNodeMsg error"))
  111. return err
  112. }
  113. return nil
  114. }
  115. func (r *RuleNodeActor) Destroy() error {
  116. return nil
  117. }
  118. func (r *RuleNodeActor) OnProcessFailure(err error) *ruleEngine.ProcessFailureStrategy {
  119. if err != nil {
  120. return ruleEngine.Stop()
  121. } else {
  122. return ruleEngine.Resume()
  123. }
  124. }
  125. type RuleNodeActorCreator struct {
  126. RuleNodeActor
  127. }
  128. func NewRuleNodeActorCreator(
  129. sysCtx *ruleEngine.SystemContext,
  130. tenantId string,
  131. ruleChainId string,
  132. ruleChainName string,
  133. ruleNodeId string,
  134. parent ruleEngine.Ref,
  135. ) *RuleNodeActorCreator {
  136. item := new(RuleNodeActorCreator)
  137. item.SystemCtx = sysCtx
  138. item.ruleNodeId = ruleNodeId
  139. item.ruleChainName = ruleChainName
  140. item.tenantId = tenantId
  141. item.ruleChainNodeId = ruleChainId
  142. item.parent = parent
  143. return item
  144. }
  145. func (r *RuleNodeActorCreator) CreateActorId() string {
  146. return r.ruleNodeId
  147. }
  148. func (r *RuleNodeActorCreator) CreateActor() ruleEngine.Actor {
  149. item := &RuleNodeActor{
  150. ruleChainName: r.ruleChainName,
  151. ruleChainNodeId: r.ruleChainNodeId,
  152. ruleNodeId: r.ruleNodeId,
  153. tenantId: r.tenantId,
  154. parent: r.parent,
  155. }
  156. item.SystemCtx = r.SystemCtx
  157. return item
  158. }