rule_node_actor.go 6.0 KB


  1. package actors
  2. import (
  3. "errors"
  4. "fmt"
  5. "sparrow/pkg/entities"
  6. "sparrow/pkg/protocol"
  7. "sparrow/pkg/ruleEngine"
  8. "sparrow/pkg/ruleEngine/nodes"
  9. "sparrow/pkg/server"
  10. )
  11. type RuleNodeActor struct {
  12. ruleChainName string
  13. self ruleEngine.Ref
  14. ruleNode *ruleEngine.RuleNode
  15. node ruleEngine.Node
  16. ruleChainNodeId string
  17. ruleNodeId string
  18. ruleEngine.ContextBasedCreator
  19. tenantId string
  20. defaultContext ruleEngine.Context
  21. parent ruleEngine.Ref
  22. state ruleEngine.ComponentLifecycleState
  23. info *protocol.RuleNodeInfo
  24. }
  25. func (r *RuleNodeActor) GetActorRef() ruleEngine.Ref {
  26. return r.Ctx
  27. }
  28. func (r *RuleNodeActor) Init(ctx ruleEngine.Ctx) error {
  29. r.Ctx = ctx
  30. result, err := r.SystemCtx.RuleChainService.FindRuleNodeById(r.tenantId, r.ruleNodeId)
  31. if err != nil {
  32. return err
  33. }
  34. r.ruleNode = result
  35. r.defaultContext = ruleEngine.NewDefaultContext(&ruleEngine.RuleNodeCtx{
  36. TenantId: r.tenantId,
  37. ChainActor: r.parent,
  38. SelfActor: r.self,
  39. Self: r.ruleNode,
  40. }, r.SystemCtx)
  41. r.info = &protocol.RuleNodeInfo{
  42. RuleNodeId: r.ruleNodeId,
  43. RuleChainName: r.ruleChainName,
  44. RuleNodeName: r.ruleNode.Name,
  45. }
  46. return r.start()
  47. }
  48. // 实例化node
  49. func (r *RuleNodeActor) initComponent(ruleNode *ruleEngine.RuleNode) (ruleEngine.Node, error) {
  50. if ruleNode != nil {
  51. s, ok := nodes.CreateNodeByConfig(ruleNode.Type, r.defaultContext, r.ruleNode.Config)
  52. if !ok {
  53. return nil, errors.New(fmt.Sprintf("create node instance faild, %s", ruleNode.Type))
  54. }
  55. return s.(ruleEngine.Node), nil
  56. }
  57. return nil, nil
  58. }
  59. func (r *RuleNodeActor) Process(msg protocol.ActorMsg) error {
  60. switch msg.GetMessageType() {
  61. case protocol.RULE_CHAIN_TO_RULE_MSG:
  62. return r.onRuleChainToNodeMsg(msg.(*ruleEngine.RuleChainToRuleNodeMsg))
  63. case protocol.RULE_TO_SELF_MSG:
  64. return r.onRuleToSelfMsg(msg.(*ruleEngine.RuleToSelfMsg))
  65. case protocol.RULE_TO_SELF_ERROR_MSG:
  66. return r.onRuleToSelfErrorMsg(msg.(*ruleEngine.RuleToSelfErrorMsg))
  67. case protocol.COMPONENT_LIFE_CYCLE_MSG:
  68. return r.onComponentLifeCycleMsg(msg.(*ruleEngine.ComponentLifecycleMsg))
  69. default:
  70. return errors.New("未知的消息类型")
  71. }
  72. }
  73. // node actor lifecycle handle
  74. func (r *RuleNodeActor) onComponentLifeCycleMsg(msg *ruleEngine.ComponentLifecycleMsg) error {
  75. switch msg.EventType {
  76. case ruleEngine.CREATED:
  77. return r.start()
  78. case ruleEngine.UPDATED:
  79. return r.update()
  80. case ruleEngine.ACTIVATED:
  81. if err := r.stop(); err != nil {
  82. return err
  83. }
  84. if err := r.start();err != nil {
  85. return err
  86. }
  87. case ruleEngine.DELETED:
  88. if err := r.stop(); err != nil {
  89. return err
  90. }
  91. return r.Ctx.Stop(r.Ctx.GetActorId())
  92. default:
  93. }
  94. return nil
  95. }
  96. func (r *RuleNodeActor) onRuleToSelfErrorMsg(msg *ruleEngine.RuleToSelfErrorMsg) error {
  97. server.Log.Error(msg.Err)
  98. return nil
  99. }
  100. // TODO:处理消息数量支持实现以及最大处理能力逻辑
  101. func (r *RuleNodeActor) onRuleToSelfMsg(msg *ruleEngine.RuleToSelfMsg) error {
  102. server.Log.Debugf("Going to process rule msg:%s,%s", r.ruleChainName, r.ruleNode.Name)
  103. actorMsg := msg.Message
  104. ruleNodeCount := actorMsg.GetAndIncrementRuleNodeCounter()
  105. if ruleNodeCount < 20 {
  106. if r.ruleNode.IsDebug {
  107. _ = r.SystemCtx.PersistDebugInput(r.tenantId,
  108. &entities.RuleNodeId{Id: r.ruleNode.RuleNodeId}, actorMsg, "Self", nil)
  109. }
  110. if err := r.node.OnMessage(r.defaultContext, actorMsg); err != nil {
  111. r.defaultContext.TellError(actorMsg, errors.New("onRuleToSelfMsg error"))
  112. return err
  113. }
  114. } else {
  115. actorMsg.GetCallBack().OnFailure(errors.New(fmt.Sprintf("message is processed by more than %d rule nodes", ruleNodeCount)))
  116. }
  117. return nil
  118. }
  119. func (r *RuleNodeActor) onRuleChainToNodeMsg(msg *ruleEngine.RuleChainToRuleNodeMsg) error {
  120. server.Log.Debugf("Going to process rule msg:%s,%s", r.ruleChainName, r.ruleNode.Name)
  121. msg.Message.GetCallBack().OnProcessingStart(r.info)
  122. actorMsg := msg.Message
  123. // ruleNodeCount := actorMsg.GetAndIncrementRuleNodeCounter()
  124. if r.ruleNode.IsDebug {
  125. _ = r.SystemCtx.PersistDebugInput(r.tenantId, &entities.RuleNodeId{Id: r.ruleNode.RuleNodeId}, actorMsg, msg.FromRelationType, nil)
  126. }
  127. err := r.node.OnMessage(msg.Ctx, actorMsg)
  128. if err != nil {
  129. msg.Ctx.TellError(actorMsg, errors.New("onRuleChainToNodeMsg error"))
  130. return err
  131. }
  132. return nil
  133. }
  134. func (r *RuleNodeActor) Destroy() error {
  135. return nil
  136. }
  137. func (r *RuleNodeActor) OnProcessFailure(err error) *ruleEngine.ProcessFailureStrategy {
  138. if err != nil {
  139. return ruleEngine.Stop()
  140. } else {
  141. return ruleEngine.Resume()
  142. }
  143. }
  144. func (r *RuleNodeActor) start() error {
  145. node, err := r.initComponent(r.ruleNode)
  146. if err != nil {
  147. return err
  148. }
  149. if node != nil {
  150. r.state = ruleEngine.ACTIVE
  151. r.node = node
  152. }
  153. return nil
  154. }
  155. func (r *RuleNodeActor) update() error {
  156. node, err := r.SystemCtx.RuleChainService.FindRuleNodeById(r.tenantId, r.ruleNodeId)
  157. if err != nil {
  158. return err
  159. }
  160. if node == nil {
  161. return errors.New(fmt.Sprintf("node not found %s", r.ruleNodeId))
  162. }
  163. r.info = &protocol.RuleNodeInfo{
  164. RuleNodeId: node.RuleNodeId,
  165. RuleChainName: r.ruleChainName,
  166. RuleNodeName: node.Name,
  167. }
  168. r.ruleNode = node
  169. r.defaultContext.GetRuleNodeCtx().Self = node
  170. return r.start()
  171. }
  172. func (r *RuleNodeActor) stop() error {
  173. if r.node != nil {
  174. r.state = ruleEngine.SUSPENDED
  175. }
  176. return nil
  177. }
  178. type RuleNodeActorCreator struct {
  179. RuleNodeActor
  180. }
  181. func NewRuleNodeActorCreator(
  182. sysCtx *ruleEngine.SystemContext,
  183. tenantId string,
  184. ruleChainId string,
  185. ruleChainName string,
  186. ruleNodeId string,
  187. parent ruleEngine.Ref,
  188. ) *RuleNodeActorCreator {
  189. item := new(RuleNodeActorCreator)
  190. item.SystemCtx = sysCtx
  191. item.ruleNodeId = ruleNodeId
  192. item.ruleChainName = ruleChainName
  193. item.tenantId = tenantId
  194. item.ruleChainNodeId = ruleChainId
  195. item.parent = parent
  196. return item
  197. }
  198. func (r *RuleNodeActorCreator) CreateActorId() string {
  199. return r.ruleNodeId
  200. }
  201. func (r *RuleNodeActorCreator) CreateActor() ruleEngine.Actor {
  202. item := &RuleNodeActor{
  203. ruleChainName: r.ruleChainName,
  204. ruleChainNodeId: r.ruleChainNodeId,
  205. ruleNodeId: r.ruleNodeId,
  206. tenantId: r.tenantId,
  207. parent: r.parent,
  208. }
  209. item.SystemCtx = r.SystemCtx
  210. return item
  211. }