rule_node_actor.go 6.0 KB


  1. package actors
  2. import (
  3. "errors"
  4. "fmt"
  5. "sparrow/pkg/entities"
  6. "sparrow/pkg/protocol"
  7. "sparrow/pkg/ruleEngine"
  8. "sparrow/pkg/ruleEngine/nodes"
  9. "sparrow/pkg/server"
  10. )
  11. type RuleNodeActor struct {
  12. ruleChainName string
  13. self ruleEngine.Ref
  14. ruleNode *ruleEngine.RuleNode
  15. node ruleEngine.Node
  16. ruleChainNodeId string
  17. ruleNodeId string
  18. ruleEngine.ContextBasedCreator
  19. tenantId string
  20. defaultContext ruleEngine.Context
  21. parent ruleEngine.Ref
  22. state ruleEngine.ComponentLifecycleState
  23. info *protocol.RuleNodeInfo
  24. }
  25. func (r *RuleNodeActor) GetActorRef() ruleEngine.Ref {
  26. return r.Ctx
  27. }
  28. func (r *RuleNodeActor) Init(ctx ruleEngine.Ctx) error {
  29. r.Ctx = ctx
  30. result, err := r.SystemCtx.RuleChainService.FindRuleNodeById(r.tenantId, r.ruleNodeId)
  31. if err != nil {
  32. return err
  33. }
  34. r.ruleNode = result
  35. r.defaultContext = ruleEngine.NewDefaultContext(&ruleEngine.RuleNodeCtx{
  36. TenantId: r.tenantId,
  37. ChainActor: r.parent,
  38. SelfActor: r.self,
  39. Self: r.ruleNode,
  40. }, r.SystemCtx)
  41. r.info = &protocol.RuleNodeInfo{
  42. RuleNodeId: r.ruleNodeId,
  43. RuleChainName: r.ruleChainName,
  44. RuleNodeName: r.ruleNode.Name,
  45. }
  46. return r.start()
  47. }
  48. // 实例化node
  49. func (r *RuleNodeActor) initComponent(ruleNode *ruleEngine.RuleNode) (ruleEngine.Node, error) {
  50. if ruleNode != nil {
  51. s, ok := nodes.CreateNodeByConfig(ruleNode.Type, r.defaultContext, r.ruleNode.Config)
  52. if !ok {
  53. return nil, errors.New(fmt.Sprintf("create node instance faild, %s", ruleNode.Type))
  54. }
  55. return s.(ruleEngine.Node), nil
  56. }
  57. return nil, nil
  58. }
  59. func (r *RuleNodeActor) Process(msg protocol.ActorMsg) error {
  60. switch msg.GetMessageType() {
  61. case protocol.RULE_CHAIN_TO_RULE_MSG:
  62. return r.onRuleChainToNodeMsg(msg.(*ruleEngine.RuleChainToRuleNodeMsg))
  63. case protocol.RULE_TO_SELF_MSG:
  64. return r.onRuleToSelfMsg(msg.(*ruleEngine.RuleToSelfMsg))
  65. case protocol.RULE_TO_SELF_ERROR_MSG:
  66. return r.onRuleToSelfErrorMsg(msg.(*ruleEngine.RuleToSelfErrorMsg))
  67. case protocol.COMPONENT_LIFE_CYCLE_MSG:
  68. return r.onComponentLifeCycleMsg(msg.(*ruleEngine.ComponentLifecycleMsg))
  69. default:
  70. return errors.New("未知的消息类型")
  71. }
  72. }
  73. // node actor lifecycle handle
  74. func (r *RuleNodeActor) onComponentLifeCycleMsg(msg *ruleEngine.ComponentLifecycleMsg) error {
  75. switch msg.EventType {
  76. case ruleEngine.CREATED:
  77. return r.start()
  78. case ruleEngine.UPDATED:
  79. return r.update()
  80. case ruleEngine.ACTIVATED:
  81. if err := r.stop(); err != nil {
  82. return err
  83. }
  84. if err := r.start();err != nil {
  85. return err
  86. }
  87. case ruleEngine.DELETED:
  88. if err := r.stop(); err != nil {
  89. return err
  90. }
  91. return r.Ctx.Stop(r.Ctx.GetActorId())
  92. default:
  93. }
  94. }
  95. func (r *RuleNodeActor) onRuleToSelfErrorMsg(msg *ruleEngine.RuleToSelfErrorMsg) error {
  96. server.Log.Error(msg.Err)
  97. return nil
  98. }
  99. // TODO:处理消息数量支持实现以及最大处理能力逻辑
  100. func (r *RuleNodeActor) onRuleToSelfMsg(msg *ruleEngine.RuleToSelfMsg) error {
  101. server.Log.Debugf("Going to process rule msg:%s,%s", r.ruleChainName, r.ruleNode.Name)
  102. actorMsg := msg.Message
  103. ruleNodeCount := actorMsg.GetAndIncrementRuleNodeCounter()
  104. if ruleNodeCount < 20 {
  105. if r.ruleNode.IsDebug {
  106. _ = r.SystemCtx.PersistDebugInput(r.tenantId,
  107. &entities.RuleNodeId{Id: r.ruleNode.RuleNodeId}, actorMsg, "Self", nil)
  108. }
  109. if err := r.node.OnMessage(r.defaultContext, actorMsg); err != nil {
  110. r.defaultContext.TellError(actorMsg, errors.New("onRuleToSelfMsg error"))
  111. return err
  112. }
  113. } else {
  114. actorMsg.GetCallBack().OnFailure(errors.New(fmt.Sprintf("message is processed by more than %d rule nodes", ruleNodeCount)))
  115. }
  116. return nil
  117. }
  118. func (r *RuleNodeActor) onRuleChainToNodeMsg(msg *ruleEngine.RuleChainToRuleNodeMsg) error {
  119. server.Log.Debugf("Going to process rule msg:%s,%s", r.ruleChainName, r.ruleNode.Name)
  120. msg.Message.GetCallBack().OnProcessingStart(r.info)
  121. actorMsg := msg.Message
  122. // ruleNodeCount := actorMsg.GetAndIncrementRuleNodeCounter()
  123. if r.ruleNode.IsDebug {
  124. _ = r.SystemCtx.PersistDebugInput(r.tenantId, &entities.RuleNodeId{Id: r.ruleNode.RuleNodeId}, actorMsg, msg.FromRelationType, nil)
  125. }
  126. err := r.node.OnMessage(msg.Ctx, actorMsg)
  127. if err != nil {
  128. msg.Ctx.TellError(actorMsg, errors.New("onRuleChainToNodeMsg error"))
  129. return err
  130. }
  131. return nil
  132. }
  133. func (r *RuleNodeActor) Destroy() error {
  134. return nil
  135. }
  136. func (r *RuleNodeActor) OnProcessFailure(err error) *ruleEngine.ProcessFailureStrategy {
  137. if err != nil {
  138. return ruleEngine.Stop()
  139. } else {
  140. return ruleEngine.Resume()
  141. }
  142. }
  143. func (r *RuleNodeActor) start() error {
  144. node, err := r.initComponent(r.ruleNode)
  145. if err != nil {
  146. return err
  147. }
  148. if node != nil {
  149. r.state = ruleEngine.ACTIVE
  150. }
  151. return nil
  152. }
  153. func (r *RuleNodeActor) update() error {
  154. node, err := r.SystemCtx.RuleChainService.FindRuleNodeById(r.tenantId, r.ruleNodeId)
  155. if err != nil {
  156. return err
  157. }
  158. if node == nil {
  159. return errors.New(fmt.Sprintf("node not found %s", r.ruleNodeId))
  160. }
  161. r.info = &protocol.RuleNodeInfo{
  162. RuleNodeId: node.RuleNodeId,
  163. RuleChainName: r.ruleChainName,
  164. RuleNodeName: node.Name,
  165. }
  166. r.ruleNode = node
  167. r.defaultContext.GetRuleNodeCtx().Self = node
  168. return r.start()
  169. }
  170. func (r *RuleNodeActor) stop() error {
  171. if r.node != nil {
  172. r.state = ruleEngine.SUSPENDED
  173. }
  174. }
  175. type RuleNodeActorCreator struct {
  176. RuleNodeActor
  177. }
  178. func NewRuleNodeActorCreator(
  179. sysCtx *ruleEngine.SystemContext,
  180. tenantId string,
  181. ruleChainId string,
  182. ruleChainName string,
  183. ruleNodeId string,
  184. parent ruleEngine.Ref,
  185. ) *RuleNodeActorCreator {
  186. item := new(RuleNodeActorCreator)
  187. item.SystemCtx = sysCtx
  188. item.ruleNodeId = ruleNodeId
  189. item.ruleChainName = ruleChainName
  190. item.tenantId = tenantId
  191. item.ruleChainNodeId = ruleChainId
  192. item.parent = parent
  193. return item
  194. }
  195. func (r *RuleNodeActorCreator) CreateActorId() string {
  196. return r.ruleNodeId
  197. }
  198. func (r *RuleNodeActorCreator) CreateActor() ruleEngine.Actor {
  199. item := &RuleNodeActor{
  200. ruleChainName: r.ruleChainName,
  201. ruleChainNodeId: r.ruleChainNodeId,
  202. ruleNodeId: r.ruleNodeId,
  203. tenantId: r.tenantId,
  204. parent: r.parent,
  205. }
  206. item.SystemCtx = r.SystemCtx
  207. return item
  208. }