rule_node_actor.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. package actors
  2. import (
  3. "errors"
  4. "fmt"
  5. "sparrow/pkg/protocol"
  6. "sparrow/pkg/ruleEngine"
  7. "sparrow/pkg/ruleEngine/nodes"
  8. "sparrow/pkg/server"
  9. )
// RuleNodeActor is the actor that wraps a single rule node of a rule chain.
// Init loads the rule node definition and instantiates its node; Process
// dispatches incoming actor messages to that node.
type RuleNodeActor struct {
	// ruleChainName is the name of the owning rule chain; it is used in debug
	// logs and copied into info.
	ruleChainName string
	// self is handed to the rule node context as SelfActor in Init.
	// NOTE(review): nothing in this file assigns it — confirm where it is set.
	self ruleEngine.Ref
	// ruleNode is the rule node definition loaded by Init.
	ruleNode *ruleEngine.RuleNode
	// node is the instance created by initComponent; it receives every
	// message through OnMessage.
	node ruleEngine.Node
	// ruleChainNodeId is filled from the creator's ruleChainId argument.
	ruleChainNodeId string
	// ruleNodeId identifies the rule node to load and doubles as the actor id.
	ruleNodeId string
	// ContextBasedCreator is embedded; this file uses its Ctx and SystemCtx
	// fields.
	ruleEngine.ContextBasedCreator
	// tenantId is the tenant used when looking up the rule node.
	tenantId string
	// defaultContext is built in Init and used when the node processes
	// messages it sent to itself.
	defaultContext ruleEngine.Context
	// parent is handed to the rule node context as ChainActor in Init.
	parent ruleEngine.Ref
	// info describes this rule node to message callbacks
	// (see OnProcessingStart in onRuleChainToNodeMsg).
	info *protocol.RuleNodeInfo
}
  23. func (r *RuleNodeActor) GetActorRef() ruleEngine.Ref {
  24. return r.Ctx
  25. }
  26. func (r *RuleNodeActor) Init(ctx ruleEngine.Ctx) error {
  27. r.Ctx = ctx
  28. result, err := r.SystemCtx.RuleChainService.FindRuleNodeById(r.tenantId, r.ruleNodeId)
  29. if err != nil {
  30. return err
  31. }
  32. r.ruleNode = result
  33. r.defaultContext = ruleEngine.NewDefaultContext(&ruleEngine.RuleNodeCtx{
  34. TenantId: r.tenantId,
  35. ChainActor: r.parent,
  36. SelfActor: r.self,
  37. Self: r.ruleNode,
  38. }, r.SystemCtx)
  39. r.info = &protocol.RuleNodeInfo{
  40. RuleNodeId: r.ruleNodeId,
  41. RuleChainName: r.ruleChainName,
  42. RuleNodeName: r.ruleNode.Name,
  43. }
  44. node, err := r.initComponent(r.ruleNode)
  45. if err != nil {
  46. return err
  47. }
  48. if node != nil {
  49. r.node = node
  50. }
  51. return nil
  52. }
  53. // 实例化node
  54. func (r *RuleNodeActor) initComponent(ruleNode *ruleEngine.RuleNode) (ruleEngine.Node, error) {
  55. if ruleNode != nil {
  56. s, ok := nodes.CreateNodeByConfig(ruleNode.Type, r.defaultContext, r.ruleNode.Config)
  57. if !ok {
  58. return nil, errors.New(fmt.Sprintf("create node instance faild, %s", ruleNode.Type))
  59. }
  60. return s.(ruleEngine.Node), nil
  61. }
  62. return nil, nil
  63. }
  64. func (r *RuleNodeActor) Process(msg protocol.ActorMsg) error {
  65. switch msg.GetMessageType() {
  66. case protocol.RULE_CHAIN_TO_RULE_MSG:
  67. return r.onRuleChainToNodeMsg(msg.(*ruleEngine.RuleChainToRuleNodeMsg))
  68. case protocol.RULE_TO_SELF_MSG:
  69. return r.onRuleToSelfMsg(msg.(*ruleEngine.RuleToSelfMsg))
  70. case protocol.RULE_TO_SELF_ERROR_MSG:
  71. return r.onRuleToSelfErrorMsg(msg.(*ruleEngine.RuleToSelfErrorMsg))
  72. default:
  73. return errors.New("未知的消息类型")
  74. }
  75. }
  76. func (r *RuleNodeActor) onRuleToSelfErrorMsg(msg *ruleEngine.RuleToSelfErrorMsg) error {
  77. server.Log.Error(msg.Err)
  78. return nil
  79. }
  80. // TODO:处理消息数量支持实现以及最大处理能力逻辑
  81. func (r *RuleNodeActor) onRuleToSelfMsg(msg *ruleEngine.RuleToSelfMsg) error {
  82. server.Log.Debugf("Going to process rule msg:%s,%s", r.ruleChainName, r.ruleNode.Name)
  83. actorMsg := msg.Message
  84. ruleNodeCount := actorMsg.GetAndIncrementRuleNodeCounter()
  85. if ruleNodeCount < 20 {
  86. if err := r.node.OnMessage(r.defaultContext, actorMsg); err != nil {
  87. r.defaultContext.TellError(actorMsg, errors.New("onRuleToSelfMsg error"))
  88. return err
  89. }
  90. } else {
  91. actorMsg.GetCallBack().OnFailure(errors.New(fmt.Sprintf("message is processed by more than %d rule nodes", ruleNodeCount)))
  92. }
  93. return nil
  94. }
  95. func (r *RuleNodeActor) onRuleChainToNodeMsg(msg *ruleEngine.RuleChainToRuleNodeMsg) error {
  96. server.Log.Debugf("Going to process rule msg:%s,%s", r.ruleChainName, r.ruleNode.Name)
  97. msg.Message.GetCallBack().OnProcessingStart(r.info)
  98. actorMsg := msg.Message
  99. // ruleNodeCount := actorMsg.GetAndIncrementRuleNodeCounter()
  100. err := r.node.OnMessage(msg.Ctx, actorMsg)
  101. if err != nil {
  102. msg.Ctx.TellError(actorMsg, errors.New("onRuleChainToNodeMsg error"))
  103. return err
  104. }
  105. return nil
  106. }
// Destroy is not implemented yet: calling it panics with "implement me".
func (r *RuleNodeActor) Destroy() error {
	panic("implement me")
}
// OnProcessFailure is not implemented yet: calling it panics with
// "implement me" regardless of err.
func (r *RuleNodeActor) OnProcessFailure(err error) *ruleEngine.ProcessFailureStrategy {
	panic("implement me")
}
// RuleNodeActorCreator holds the settings for a rule node actor and builds
// RuleNodeActor instances from them (see CreateActor). It embeds
// RuleNodeActor only to reuse its fields as storage for those settings.
type RuleNodeActorCreator struct {
	RuleNodeActor
}
  116. func NewRuleNodeActorCreator(
  117. sysCtx *ruleEngine.SystemContext,
  118. tenantId string,
  119. ruleChainId string,
  120. ruleChainName string,
  121. ruleNodeId string,
  122. parent ruleEngine.Ref,
  123. ) *RuleNodeActorCreator {
  124. item := new(RuleNodeActorCreator)
  125. item.SystemCtx = sysCtx
  126. item.ruleNodeId = ruleNodeId
  127. item.ruleChainName = ruleChainName
  128. item.tenantId = tenantId
  129. item.ruleChainNodeId = ruleChainId
  130. item.parent = parent
  131. return item
  132. }
  133. func (r *RuleNodeActorCreator) CreateActorId() string {
  134. return r.ruleNodeId
  135. }
  136. func (r *RuleNodeActorCreator) CreateActor() ruleEngine.Actor {
  137. item := &RuleNodeActor{
  138. ruleChainName: r.ruleChainName,
  139. ruleChainNodeId: r.ruleChainNodeId,
  140. ruleNodeId: r.ruleNodeId,
  141. tenantId: r.tenantId,
  142. parent: r.parent,
  143. }
  144. item.SystemCtx = r.SystemCtx
  145. return item
  146. }