rule_chain_actor.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318
  1. package actors
  2. import (
  3. "errors"
  4. "fmt"
  5. "sparrow/pkg/entities"
  6. "sparrow/pkg/protocol"
  7. "sparrow/pkg/queue"
  8. "sparrow/pkg/ruleEngine"
  9. "sparrow/pkg/server"
  10. "strings"
  11. )
  12. type RuleChainActor struct {
  13. ruleEngine.ContextBasedCreator
  14. ruleChain *ruleEngine.RuleChain
  15. tenantId string
  16. firstId string
  17. firstNode *ruleEngine.RuleNodeCtx
  18. started bool
  19. ruleChainId string
  20. parent ruleEngine.Ref
  21. nodeActors map[string]*ruleEngine.RuleNodeCtx
  22. nodeRoutes map[string][]*ruleEngine.RuleNodeRelation
  23. ruleChainName string
  24. clusterService ruleEngine.ClusterService
  25. state ruleEngine.ComponentLifecycleState
  26. }
  27. func newRuleChainActor(
  28. sysCtx *ruleEngine.SystemContext,
  29. ruleChain *ruleEngine.RuleChain,
  30. tenantId string,
  31. parent ruleEngine.Ref,
  32. ) *RuleChainActor {
  33. item := &RuleChainActor{
  34. ruleChainId: ruleChain.ChainId,
  35. ruleChain: ruleChain,
  36. tenantId: tenantId,
  37. parent: parent,
  38. nodeActors: make(map[string]*ruleEngine.RuleNodeCtx),
  39. nodeRoutes: make(map[string][]*ruleEngine.RuleNodeRelation),
  40. clusterService: sysCtx.ClusterService,
  41. }
  42. item.SystemCtx = sysCtx
  43. return item
  44. }
  45. func (r *RuleChainActor) GetActorRef() ruleEngine.Ref {
  46. return r.Ctx
  47. }
  48. func (r *RuleChainActor) Init(ctx ruleEngine.Ctx) error {
  49. if r.ruleChain != nil {
  50. r.ruleChainName = r.ruleChain.Name
  51. }
  52. r.Ctx = ctx
  53. return r.start()
  54. }
  55. func (r *RuleChainActor) Process(msg protocol.ActorMsg) error {
  56. switch msg.GetMessageType() {
  57. case protocol.QUEUE_TO_RULE_ENGINE_MSG:
  58. return r.onQueueToRuleEngineMsg(msg.(*ruleEngine.QueueToRuleEngineMsg))
  59. case protocol.RULE_TO_RULE_CHAIN_TELL_NEXT_MSG:
  60. return r.onTellNextRuleNode(msg.(*ruleEngine.RuleNodeToRuleChanTellNextMsg))
  61. case protocol.RULE_CHAIN_TO_RULE_CHAIN_MSG:
  62. return r.onRuleChainToRuleChain(msg.(*ruleEngine.RuleChainToRuleChainMsg))
  63. }
  64. return nil
  65. }
  66. func (r *RuleChainActor) onRuleChainToRuleChain(msg *ruleEngine.RuleChainToRuleChainMsg) error {
  67. if r.firstNode != nil {
  68. r.pushMsgToNode(r.firstNode, msg.Message, msg.FromRelationType)
  69. } else {
  70. msg.Message.GetCallBack().OnSuccess()
  71. }
  72. return nil
  73. }
  74. func (r *RuleChainActor) onQueueToRuleEngineMsg(msg *ruleEngine.QueueToRuleEngineMsg) error {
  75. actorMsg := msg.Message
  76. server.Log.Debugf("Processing message")
  77. if len(msg.RelationTypes) == 0 {
  78. ruleNodeId := actorMsg.RuleNodeId
  79. var targetCtx *ruleEngine.RuleNodeCtx
  80. if ruleNodeId == "" {
  81. targetCtx = r.firstNode
  82. actorMsg = actorMsg.CopyWithRuleChainId(r.ruleChainId)
  83. } else {
  84. targetCtx = r.nodeActors[ruleNodeId]
  85. }
  86. if targetCtx != nil {
  87. server.Log.Debugf("pushing message to target rule node,%s, %s", r.ruleChainId, targetCtx.Self.RuleNodeId)
  88. r.pushMsgToNode(targetCtx, actorMsg, "")
  89. } else {
  90. server.Log.Debugf("Rule node dose not exist. probably old message,%s, %s", r.ruleChainId, ruleNodeId)
  91. actorMsg.GetCallBack().OnSuccess()
  92. }
  93. } else {
  94. r.onTellNext(actorMsg, actorMsg.RuleNodeId, msg.RelationTypes, msg.FailureMessage.Error())
  95. }
  96. return nil
  97. }
  98. func (r *RuleChainActor) onTellNextRuleNode(msg *ruleEngine.RuleNodeToRuleChanTellNextMsg) error {
  99. var errStr string
  100. if msg.FailureMessage != nil {
  101. errStr = msg.FailureMessage.Error()
  102. }
  103. r.onTellNext(msg.Message, msg.RuleNodeId, msg.RelationTypes.ToStrArray(), errStr)
  104. return nil
  105. }
  106. // on tell next actor
  107. func (r *RuleChainActor) onTellNext(msg *protocol.Message, originatorNodeId string,
  108. relationTypes []string, errMsg string) {
  109. originatorId := msg.Originator
  110. tpi := queue.ResolvePartition(ruleEngine.RULE_ENGINE, msg.GetQueueName(), r.tenantId, originatorId)
  111. var relations []*ruleEngine.RuleNodeRelation
  112. if rs, ok := r.nodeRoutes[originatorNodeId]; ok {
  113. for _, item := range rs {
  114. if contains(relationTypes, item.Type) {
  115. relations = append(relations, item)
  116. }
  117. }
  118. }
  119. if len(relations) == 0 {
  120. server.Log.Debugf("No outbound relations to process,%s, %s", originatorId, r.tenantId)
  121. if contains(relationTypes, "Failure") {
  122. if ruleNodeCtx, ok := r.nodeActors[originatorNodeId]; ok {
  123. msg.GetCallBack().OnFailure(errors.New(fmt.Sprintf(""+
  124. "[%s], ruleChainName:%s, ruleNodeId:%s", errMsg, r.ruleChainName, ruleNodeCtx.Self.RuleNodeId)))
  125. } else {
  126. msg.GetCallBack().OnFailure(errors.New("failure during message processing by Rule Node"))
  127. }
  128. } else {
  129. msg.GetCallBack().OnSuccess()
  130. }
  131. } else if len(relations) == 1 {
  132. for _, rl := range relations {
  133. server.Log.Debugf("push message to single target,%s, %s, %s, %s", r.tenantId, originatorId, msg.Id, rl.Out)
  134. r.pushMsgToTarget(tpi, msg, rl.Out, rl.Type)
  135. }
  136. } else {
  137. for _, rl := range relations {
  138. target := rl.Out
  139. r.putToQueue(tpi, msg, queue.NewMultipleMsgCallbackWrapper(int32(len(relations)), msg.GetCallBack()), target)
  140. }
  141. }
  142. }
  143. // push a message to target ctx
  144. func (r *RuleChainActor) pushMsgToTarget(tpi *queue.TopicPartitionInfo, msg *protocol.Message, entityId entities.EntityId, fromRelationType string) {
  145. if tpi.MyPartition {
  146. switch entityId.GetEntityType() {
  147. case entities.RULE_NODE:
  148. targetCtx := r.nodeActors[entityId.GetId()]
  149. r.pushMsgToNode(targetCtx, msg, fromRelationType)
  150. case entities.RULE_CHAIN:
  151. r.parent.Tell(&ruleEngine.RuleChainToRuleChainMsg{
  152. TargetId: entityId.GetId(),
  153. SourceId: r.ruleChainId,
  154. Message: msg,
  155. FromRelationType: fromRelationType,
  156. })
  157. }
  158. } else {
  159. r.putToQueue(tpi, msg, queue.NewMsgCallbackWrapper(msg.GetCallBack()), entityId)
  160. }
  161. }
  162. // 把消息放到队列中
  163. func (r *RuleChainActor) putToQueue(tpi *queue.TopicPartitionInfo, msg *protocol.Message, queueCallback queue.Callback, targetEntity entities.EntityId) {
  164. switch targetEntity.GetEntityType() {
  165. case entities.RULE_NODE:
  166. r.putMessageToQueue(tpi, msg.CopyWithRuleNodeId(targetEntity.GetId()), queueCallback)
  167. case entities.RULE_CHAIN:
  168. r.putMessageToQueue(tpi, msg.CopyWithRuleChainId(targetEntity.GetId()), queueCallback)
  169. }
  170. }
  171. func (r *RuleChainActor) putMessageToQueue(tpi *queue.TopicPartitionInfo, msg *protocol.Message, queueCallback queue.Callback) {
  172. r.clusterService.PushMessageToRuleEngine(tpi, msg.Id, msg, queueCallback)
  173. }
  174. func contains(relations []string, relation string) bool {
  175. if len(relations) == 0 {
  176. return false
  177. }
  178. for _, item := range relations {
  179. if strings.ToLower(item) == strings.ToLower(relation) {
  180. return true
  181. }
  182. }
  183. return false
  184. }
  185. // push a message to node actor
  186. func (r *RuleChainActor) pushMsgToNode(targetCtx *ruleEngine.RuleNodeCtx, msg *protocol.Message, relationType string) {
  187. if targetCtx != nil {
  188. targetCtx.SelfActor.Tell(&ruleEngine.RuleChainToRuleNodeMsg{
  189. Message: msg,
  190. Ctx: ruleEngine.NewDefaultContext(targetCtx, r.SystemCtx),
  191. FromRelationType: relationType,
  192. })
  193. } else {
  194. server.Log.Error("targetCtx is empty, %s, %s", r.ruleChainId, r.ruleChainName)
  195. }
  196. }
  197. func (r *RuleChainActor) Destroy() error {
  198. return nil
  199. }
  200. func (r *RuleChainActor) OnProcessFailure(err error) *ruleEngine.ProcessFailureStrategy {
  201. if err != nil {
  202. return ruleEngine.Stop()
  203. } else {
  204. return ruleEngine.Resume()
  205. }
  206. }
  207. func (r *RuleChainActor) start() error {
  208. if !r.started {
  209. ruleChain, err := r.SystemCtx.RuleChainService.FindRuleChainById(r.tenantId, r.ruleChainId)
  210. if err != nil {
  211. return err
  212. }
  213. if ruleChain != nil {
  214. nodes, err := r.SystemCtx.RuleChainService.GetRuleChainNodes(r.tenantId, ruleChain.ChainId)
  215. if err != nil {
  216. return err
  217. }
  218. server.Log.Debugf("starting rule chain with %d nodes", len(nodes))
  219. for _, node := range nodes {
  220. server.Log.Debugf("creating rule node actor:%s,%s", node.RuleNodeId, node.Name)
  221. ref, err := r.createNodeActor(node.RuleNodeId)
  222. if err != nil {
  223. continue
  224. }
  225. r.nodeActors[node.RuleNodeId] = &ruleEngine.RuleNodeCtx{
  226. TenantId: r.tenantId,
  227. ChainActor: r.Ctx,
  228. SelfActor: ref,
  229. Self: node,
  230. }
  231. r.started = true
  232. }
  233. r.initRoutes(r.ruleChain, nodes)
  234. }
  235. } else {
  236. return r.update()
  237. }
  238. return nil
  239. }
  240. func (r *RuleChainActor) initRoutes(ruleChain *ruleEngine.RuleChain, nodes []*ruleEngine.RuleNode) {
  241. for _, node := range nodes {
  242. relations, err := r.SystemCtx.RuleChainService.GetRuleNodeRelations(r.tenantId, node.RuleNodeId)
  243. if err != nil {
  244. continue
  245. }
  246. var rs []*ruleEngine.RuleNodeRelation
  247. for _, relation := range relations {
  248. rs = append(rs, &ruleEngine.RuleNodeRelation{
  249. In: &entities.RuleNodeId{Id: node.RuleNodeId},
  250. Out: &entities.RuleNodeId{Id: relation.To},
  251. Type: relation.Type,
  252. })
  253. }
  254. r.nodeRoutes[node.RuleNodeId] = rs
  255. }
  256. fmt.Printf("%+v", r.nodeRoutes)
  257. r.firstId = ruleChain.FirstNodeId
  258. r.firstNode = r.nodeActors[r.firstId]
  259. r.state = ruleEngine.ACTIVE
  260. }
  261. func (r *RuleChainActor) update() error {
  262. return nil
  263. }
  264. func (r *RuleChainActor) createNodeActor(nodeId string) (ruleEngine.Ref, error) {
  265. return r.Ctx.GetOrCreateChildActor(nodeId,
  266. ruleEngine.RULE_DISPATCHER_NAME,
  267. NewRuleNodeActorCreator(r.SystemCtx, r.tenantId,
  268. r.ruleChainId, r.ruleChainName, nodeId, r.Ctx.GetParentRef()))
  269. }
  270. // RuleChainCreator
  271. type RuleChainCreator struct {
  272. RuleChainActor
  273. }
  274. //NewRuleChainCreator create a instance
  275. func NewRuleChainCreator(
  276. sysCtx *ruleEngine.SystemContext,
  277. tenantId string,
  278. ruleChan *ruleEngine.RuleChain,
  279. parent ruleEngine.Ref,
  280. ) *RuleChainCreator {
  281. item := &RuleChainCreator{}
  282. item.tenantId = tenantId
  283. item.ruleChain = ruleChan
  284. item.parent = parent
  285. item.SystemCtx = sysCtx
  286. return item
  287. }
  288. func (r *RuleChainCreator) CreateActorId() string {
  289. return r.ruleChain.ChainId
  290. }
  291. func (r *RuleChainCreator) CreateActor() ruleEngine.Actor {
  292. return newRuleChainActor(r.SystemCtx, r.ruleChain, r.tenantId, r.parent)
  293. }