rule_chain_actor.go 9.5 KB


  1. package actors
  2. import (
  3. "errors"
  4. "fmt"
  5. "sparrow/pkg/entities"
  6. "sparrow/pkg/protocol"
  7. "sparrow/pkg/queue"
  8. "sparrow/pkg/ruleEngine"
  9. "sparrow/pkg/server"
  10. "strings"
  11. )
  12. type RuleChainActor struct {
  13. ruleEngine.ContextBasedCreator
  14. ruleChain *ruleEngine.RuleChain
  15. tenantId string
  16. firstId string
  17. firstNode *ruleEngine.RuleNodeCtx
  18. started bool
  19. ruleChainId string
  20. parent ruleEngine.Ref
  21. nodeActors map[string]*ruleEngine.RuleNodeCtx
  22. nodeRoutes map[string][]*ruleEngine.RuleNodeRelation
  23. ruleChainName string
  24. clusterService ruleEngine.ClusterService
  25. state ruleEngine.ComponentLifecycleState
  26. }
  27. func newRuleChainActor(
  28. sysCtx *ruleEngine.SystemContext,
  29. ruleChain *ruleEngine.RuleChain,
  30. tenantId string,
  31. parent ruleEngine.Ref,
  32. ) *RuleChainActor {
  33. item := &RuleChainActor{
  34. ruleChainId: ruleChain.ChainId,
  35. ruleChain: ruleChain,
  36. tenantId: tenantId,
  37. parent: parent,
  38. nodeActors: make(map[string]*ruleEngine.RuleNodeCtx),
  39. nodeRoutes: make(map[string][]*ruleEngine.RuleNodeRelation),
  40. clusterService: sysCtx.ClusterService,
  41. }
  42. item.SystemCtx = sysCtx
  43. return item
  44. }
  45. func (r *RuleChainActor) GetActorRef() ruleEngine.Ref {
  46. return r.Ctx
  47. }
  48. func (r *RuleChainActor) Init(ctx ruleEngine.Ctx) error {
  49. if r.ruleChain != nil {
  50. r.ruleChainName = r.ruleChain.Name
  51. }
  52. r.Ctx = ctx
  53. return r.start()
  54. }
  55. func (r *RuleChainActor) Process(msg protocol.ActorMsg) error {
  56. switch msg.GetMessageType() {
  57. case protocol.QUEUE_TO_RULE_ENGINE_MSG:
  58. return r.onQueueToRuleEngineMsg(msg.(*ruleEngine.QueueToRuleEngineMsg))
  59. case protocol.RULE_TO_RULE_CHAIN_TELL_NEXT_MSG:
  60. return r.onTellNextRuleNode(msg.(*ruleEngine.RuleNodeToRuleChanTellNextMsg))
  61. case protocol.RULE_CHAIN_TO_RULE_CHAIN_MSG:
  62. return r.onRuleChainToRuleChain(msg.(*ruleEngine.RuleChainToRuleChainMsg))
  63. }
  64. return nil
  65. }
  66. func (r *RuleChainActor) onRuleChainToRuleChain(msg *ruleEngine.RuleChainToRuleChainMsg) error {
  67. if r.firstNode != nil {
  68. r.pushMsgToNode(r.firstNode, msg.Message, msg.FromRelationType)
  69. } else {
  70. msg.Message.GetCallBack().OnSuccess()
  71. }
  72. return nil
  73. }
  74. func (r *RuleChainActor) onQueueToRuleEngineMsg(msg *ruleEngine.QueueToRuleEngineMsg) error {
  75. actorMsg := msg.Message
  76. server.Log.Debugf("Processing message")
  77. if len(msg.RelationTypes) == 0 {
  78. ruleNodeId := actorMsg.RuleNodeId
  79. var targetCtx *ruleEngine.RuleNodeCtx
  80. if ruleNodeId == "" {
  81. targetCtx = r.firstNode
  82. actorMsg = actorMsg.CopyWithRuleChainId(r.ruleChainId)
  83. } else {
  84. targetCtx = r.nodeActors[ruleNodeId]
  85. }
  86. if targetCtx != nil {
  87. server.Log.Debugf("pushing message to target rule node,%s, %s", r.ruleChainId, targetCtx.Self.RuleNodeId)
  88. r.pushMsgToNode(targetCtx, actorMsg, "")
  89. } else {
  90. server.Log.Debugf("Rule node dose not exist. probably old message,%s, %s", r.ruleChainId, ruleNodeId)
  91. actorMsg.GetCallBack().OnSuccess()
  92. }
  93. } else {
  94. r.onTellNext(actorMsg, actorMsg.RuleNodeId, msg.RelationTypes, msg.FailureMessage.Error())
  95. }
  96. return nil
  97. }
  98. func (r *RuleChainActor) onTellNextRuleNode(msg *ruleEngine.RuleNodeToRuleChanTellNextMsg) error {
  99. var errStr string
  100. if msg.FailureMessage != nil {
  101. errStr = msg.FailureMessage.Error()
  102. }
  103. r.onTellNext(msg.Message, msg.RuleNodeId, msg.RelationTypes.ToStrArray(), errStr)
  104. return nil
  105. }
  106. // on tell next actor
  107. func (r *RuleChainActor) onTellNext(msg *protocol.Message, originatorNodeId string,
  108. relationTypes []string, errMsg string) {
  109. originatorId := msg.Originator
  110. tpi := queue.ResolvePartition(ruleEngine.RULE_ENGINE, msg.GetQueueName(), r.tenantId, originatorId)
  111. var relations []*ruleEngine.RuleNodeRelation
  112. if rs, ok := r.nodeRoutes[originatorNodeId]; ok {
  113. for _, item := range rs {
  114. if contains(relationTypes, item.Type) {
  115. relations = append(relations, item)
  116. }
  117. }
  118. }
  119. fmt.Printf("+++++++++nodeid:%s, creator:%s, %+v, types:%v", originatorNodeId, originatorId, relations, relationTypes)
  120. if len(relations) == 0 {
  121. server.Log.Debugf("No outbound relations to process,%s, %s", originatorId, r.tenantId)
  122. if contains(relationTypes, "Failure") {
  123. if ruleNodeCtx, ok := r.nodeActors[originatorNodeId]; ok {
  124. msg.GetCallBack().OnFailure(errors.New(fmt.Sprintf(""+
  125. "[%s], ruleChainName:%s, ruleNodeId:%s", errMsg, r.ruleChainName, ruleNodeCtx.Self.RuleNodeId)))
  126. } else {
  127. msg.GetCallBack().OnFailure(errors.New("failure during message processing by Rule Node"))
  128. }
  129. } else {
  130. msg.GetCallBack().OnSuccess()
  131. }
  132. } else if len(relations) == 1 {
  133. for _, rl := range relations {
  134. server.Log.Debugf("push message to single target,%s, %s, %s, %s", r.tenantId, originatorId, msg.Id, rl.Out)
  135. r.pushMsgToTarget(tpi, msg, rl.Out, rl.Type)
  136. }
  137. } else {
  138. for _, rl := range relations {
  139. target := rl.Out
  140. r.putToQueue(tpi, msg, queue.NewMultipleMsgCallbackWrapper(int32(len(relations)), msg.GetCallBack()), target)
  141. }
  142. }
  143. }
  144. // push a message to target ctx
  145. func (r *RuleChainActor) pushMsgToTarget(tpi *queue.TopicPartitionInfo, msg *protocol.Message, entityId entities.EntityId, fromRelationType string) {
  146. if tpi.MyPartition {
  147. switch entityId.GetEntityType() {
  148. case entities.RULE_NODE:
  149. targetCtx := r.nodeActors[entityId.GetId()]
  150. r.pushMsgToNode(targetCtx, msg, fromRelationType)
  151. case entities.RULE_CHAIN:
  152. r.parent.Tell(&ruleEngine.RuleChainToRuleChainMsg{
  153. TargetId: entityId.GetId(),
  154. SourceId: r.ruleChainId,
  155. Message: msg,
  156. FromRelationType: fromRelationType,
  157. })
  158. }
  159. } else {
  160. r.putToQueue(tpi, msg, queue.NewMsgCallbackWrapper(msg.GetCallBack()), entityId)
  161. }
  162. }
  163. // 把消息放到队列中
  164. func (r *RuleChainActor) putToQueue(tpi *queue.TopicPartitionInfo, msg *protocol.Message, queueCallback queue.Callback, targetEntity entities.EntityId) {
  165. switch targetEntity.GetEntityType() {
  166. case entities.RULE_NODE:
  167. r.putMessageToQueue(tpi, msg.CopyWithRuleNodeId(targetEntity.GetId()), queueCallback)
  168. case entities.RULE_CHAIN:
  169. r.putMessageToQueue(tpi, msg.CopyWithRuleChainId(targetEntity.GetId()), queueCallback)
  170. }
  171. }
  172. func (r *RuleChainActor) putMessageToQueue(tpi *queue.TopicPartitionInfo, msg *protocol.Message, queueCallback queue.Callback) {
  173. r.clusterService.PushMessageToRuleEngine(tpi, msg.Id, msg, queueCallback)
  174. }
  // contains reports whether relation occurs in relations, ignoring case.
  // An empty or nil relations slice never contains anything.
  func contains(relations []string, relation string) bool {
  	for _, item := range relations {
  		// EqualFold compares case-insensitively without allocating the two
  		// lowered copies that a ToLower == ToLower comparison needs.
  		if strings.EqualFold(item, relation) {
  			return true
  		}
  	}
  	return false
  }
  186. // push a message to node actor
  187. func (r *RuleChainActor) pushMsgToNode(targetCtx *ruleEngine.RuleNodeCtx, msg *protocol.Message, relationType string) {
  188. if targetCtx != nil {
  189. targetCtx.SelfActor.Tell(&ruleEngine.RuleChainToRuleNodeMsg{
  190. Message: msg,
  191. Ctx: ruleEngine.NewDefaultContext(targetCtx, r.SystemCtx),
  192. FromRelationType: relationType,
  193. })
  194. } else {
  195. server.Log.Error("targetCtx is empty, %s, %s", r.ruleChainId, r.ruleChainName)
  196. }
  197. }
  198. func (r *RuleChainActor) Destroy() error {
  199. return nil
  200. }
  201. func (r *RuleChainActor) OnProcessFailure(err error) *ruleEngine.ProcessFailureStrategy {
  202. if err != nil {
  203. return ruleEngine.Stop()
  204. } else {
  205. return ruleEngine.Resume()
  206. }
  207. }
  208. func (r *RuleChainActor) start() error {
  209. if !r.started {
  210. ruleChain, err := r.SystemCtx.RuleChainService.FindRuleChainById(r.tenantId, r.ruleChainId)
  211. if err != nil {
  212. return err
  213. }
  214. if ruleChain != nil {
  215. nodes, err := r.SystemCtx.RuleChainService.GetRuleChainNodes(r.tenantId, ruleChain.ChainId)
  216. if err != nil {
  217. return err
  218. }
  219. server.Log.Debugf("starting rule chain with %d nodes", len(nodes))
  220. for _, node := range nodes {
  221. server.Log.Debugf("creating rule node actor:%s,%s", node.RuleNodeId, node.Name)
  222. ref, err := r.createNodeActor(node.RuleNodeId)
  223. if err != nil {
  224. continue
  225. }
  226. r.nodeActors[node.RuleNodeId] = &ruleEngine.RuleNodeCtx{
  227. TenantId: r.tenantId,
  228. ChainActor: r.Ctx,
  229. SelfActor: ref,
  230. Self: node,
  231. }
  232. r.started = true
  233. }
  234. r.initRoutes(r.ruleChain, nodes)
  235. }
  236. } else {
  237. return r.update()
  238. }
  239. return nil
  240. }
  241. func (r *RuleChainActor) initRoutes(ruleChain *ruleEngine.RuleChain, nodes []*ruleEngine.RuleNode) {
  242. for _, node := range nodes {
  243. relations, err := r.SystemCtx.RuleChainService.GetRuleNodeRelations(r.tenantId, node.RuleNodeId)
  244. if err != nil {
  245. continue
  246. }
  247. var rs []*ruleEngine.RuleNodeRelation
  248. for _, relation := range relations {
  249. rs = append(rs, &ruleEngine.RuleNodeRelation{
  250. In: &entities.RuleNodeId{Id: node.RuleNodeId},
  251. Out: &entities.RuleNodeId{Id: relation.To},
  252. Type: relation.Type,
  253. })
  254. }
  255. r.nodeRoutes[node.RuleNodeId] = rs
  256. }
  257. fmt.Printf("%+v", r.nodeRoutes)
  258. r.firstId = ruleChain.FirstNodeId
  259. r.firstNode = r.nodeActors[r.firstId]
  260. r.state = ruleEngine.ACTIVE
  261. }
  262. func (r *RuleChainActor) update() error {
  263. return nil
  264. }
  265. func (r *RuleChainActor) createNodeActor(nodeId string) (ruleEngine.Ref, error) {
  266. return r.Ctx.GetOrCreateChildActor(nodeId,
  267. ruleEngine.RULE_DISPATCHER_NAME,
  268. NewRuleNodeActorCreator(r.SystemCtx, r.tenantId,
  269. r.ruleChainId, r.ruleChainName, nodeId, r.Ctx.GetParentRef()))
  270. }
  271. // RuleChainCreator
  272. type RuleChainCreator struct {
  273. RuleChainActor
  274. }
  275. //NewRuleChainCreator create a instance
  276. func NewRuleChainCreator(
  277. sysCtx *ruleEngine.SystemContext,
  278. tenantId string,
  279. ruleChan *ruleEngine.RuleChain,
  280. parent ruleEngine.Ref,
  281. ) *RuleChainCreator {
  282. item := &RuleChainCreator{}
  283. item.tenantId = tenantId
  284. item.ruleChain = ruleChan
  285. item.parent = parent
  286. item.SystemCtx = sysCtx
  287. return item
  288. }
  289. func (r *RuleChainCreator) CreateActorId() string {
  290. return r.ruleChain.ChainId
  291. }
  292. func (r *RuleChainCreator) CreateActor() ruleEngine.Actor {
  293. return newRuleChainActor(r.SystemCtx, r.ruleChain, r.tenantId, r.parent)
  294. }