rule_chain_actor.go 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323
  1. package actors
  2. import (
  3. "errors"
  4. "fmt"
  5. "sparrow/pkg/entities"
  6. "sparrow/pkg/protocol"
  7. "sparrow/pkg/queue"
  8. "sparrow/pkg/ruleEngine"
  9. "sparrow/pkg/server"
  10. "strings"
  11. )
  12. type RuleChainActor struct {
  13. ruleEngine.ContextBasedCreator
  14. ruleChain *ruleEngine.RuleChain
  15. tenantId string
  16. firstId string
  17. firstNode *ruleEngine.RuleNodeCtx
  18. started bool
  19. ruleChainId string
  20. parent ruleEngine.Ref
  21. nodeActors map[string]*ruleEngine.RuleNodeCtx
  22. nodeRoutes map[string][]*ruleEngine.RuleNodeRelation
  23. ruleChainName string
  24. clusterService ruleEngine.ClusterService
  25. state ruleEngine.ComponentLifecycleState
  26. }
  27. func newRuleChainActor(
  28. sysCtx *ruleEngine.SystemContext,
  29. ruleChain *ruleEngine.RuleChain,
  30. tenantId string,
  31. parent ruleEngine.Ref,
  32. ) *RuleChainActor {
  33. item := &RuleChainActor{
  34. ruleChainId: ruleChain.ChainId,
  35. ruleChain: ruleChain,
  36. tenantId: tenantId,
  37. parent: parent,
  38. nodeActors: make(map[string]*ruleEngine.RuleNodeCtx),
  39. nodeRoutes: make(map[string][]*ruleEngine.RuleNodeRelation),
  40. clusterService: sysCtx.ClusterService,
  41. }
  42. item.SystemCtx = sysCtx
  43. return item
  44. }
  45. func (r *RuleChainActor) GetActorRef() ruleEngine.Ref {
  46. return r.Ctx
  47. }
  48. func (r *RuleChainActor) Init(ctx ruleEngine.Ctx) error {
  49. if r.ruleChain != nil {
  50. r.ruleChainName = r.ruleChain.Name
  51. }
  52. r.Ctx = ctx
  53. return r.start()
  54. }
  55. func (r *RuleChainActor) Process(msg protocol.ActorMsg) error {
  56. switch msg.GetMessageType() {
  57. case protocol.QUEUE_TO_RULE_ENGINE_MSG:
  58. return r.onQueueToRuleEngineMsg(msg.(*ruleEngine.QueueToRuleEngineMsg))
  59. case protocol.RULE_TO_RULE_CHAIN_TELL_NEXT_MSG:
  60. return r.onTellNextRuleNode(msg.(*ruleEngine.RuleNodeToRuleChanTellNextMsg))
  61. case protocol.RULE_CHAIN_TO_RULE_CHAIN_MSG:
  62. return r.onRuleChainToRuleChain(msg.(*ruleEngine.RuleChainToRuleChainMsg))
  63. }
  64. return nil
  65. }
  66. func (r *RuleChainActor) onRuleChainToRuleChain(msg *ruleEngine.RuleChainToRuleChainMsg) error {
  67. if r.firstNode != nil {
  68. r.pushMsgToNode(r.firstNode, msg.Message, msg.FromRelationType)
  69. } else {
  70. msg.Message.GetCallBack().OnSuccess()
  71. }
  72. return nil
  73. }
  74. func (r *RuleChainActor) onQueueToRuleEngineMsg(msg *ruleEngine.QueueToRuleEngineMsg) error {
  75. actorMsg := msg.Message
  76. server.Log.Debugf("Processing message")
  77. if len(msg.RelationTypes) == 0 {
  78. ruleNodeId := actorMsg.RuleNodeId
  79. var targetCtx *ruleEngine.RuleNodeCtx
  80. if ruleNodeId == "" {
  81. targetCtx = r.firstNode
  82. actorMsg = actorMsg.CopyWithRuleChainId(r.ruleChainId)
  83. } else {
  84. targetCtx = r.nodeActors[ruleNodeId]
  85. }
  86. if targetCtx != nil {
  87. server.Log.Debugf("pushing message to target rule node,%s, %s", r.ruleChainId, targetCtx.Self.RuleNodeId)
  88. r.pushMsgToNode(targetCtx, actorMsg, "")
  89. } else {
  90. server.Log.Debugf("Rule node dose not exist. probably old message,%s, %s", r.ruleChainId, ruleNodeId)
  91. actorMsg.GetCallBack().OnSuccess()
  92. }
  93. } else {
  94. r.onTellNext(actorMsg, actorMsg.RuleNodeId, msg.RelationTypes, msg.FailureMessage.Error())
  95. }
  96. return nil
  97. }
  98. func (r *RuleChainActor) onTellNextRuleNode(msg *ruleEngine.RuleNodeToRuleChanTellNextMsg) error {
  99. var errStr string
  100. if msg.FailureMessage != nil {
  101. errStr = msg.FailureMessage.Error()
  102. }
  103. r.onTellNext(msg.Message, msg.RuleNodeId, msg.RelationTypes.ToStrArray(), errStr)
  104. return nil
  105. }
  106. // on tell next actor
  107. func (r *RuleChainActor) onTellNext(msg *protocol.Message, originatorNodeId string,
  108. relationTypes []string, errMsg string) {
  109. originatorId := msg.Originator
  110. tpi := queue.ResolvePartition(ruleEngine.RULE_ENGINE, msg.GetQueueName(), r.tenantId, originatorId)
  111. var relations []*ruleEngine.RuleNodeRelation
  112. if rs, ok := r.nodeRoutes[originatorNodeId]; ok {
  113. for _, item := range rs {
  114. if contains(relationTypes, item.Type) {
  115. relations = append(relations, item)
  116. }
  117. }
  118. }
  119. if len(relations) == 0 {
  120. server.Log.Debugf("No outbound relations to process,%s, %s", originatorId, r.tenantId)
  121. if contains(relationTypes, string(protocol.Failure)) {
  122. if ruleNodeCtx, ok := r.nodeActors[originatorNodeId]; ok {
  123. msg.GetCallBack().OnFailure(errors.New(fmt.Sprintf(""+
  124. "[%s], ruleChainName:%s, ruleNodeId:%s", errMsg, r.ruleChainName, ruleNodeCtx.Self.RuleNodeId)))
  125. } else {
  126. msg.GetCallBack().OnFailure(errors.New("failure during message processing by Rule Node"))
  127. }
  128. } else {
  129. msg.GetCallBack().OnSuccess()
  130. }
  131. } else if len(relations) == 1 {
  132. for _, rl := range relations {
  133. server.Log.Debugf("push message to single target,%s, %s, %s, %s", r.tenantId, originatorId, msg.Id, rl.Out)
  134. r.pushMsgToTarget(tpi, msg, rl.Out, rl.Type)
  135. }
  136. } else {
  137. for _, rl := range relations {
  138. target := rl.Out
  139. r.putToQueue(tpi, msg, queue.NewMultipleMsgCallbackWrapper(int32(len(relations)), msg.GetCallBack()), target)
  140. }
  141. }
  142. }
  143. // push a message to target ctx
  144. func (r *RuleChainActor) pushMsgToTarget(tpi *queue.TopicPartitionInfo, msg *protocol.Message, entityId entities.EntityId, fromRelationType string) {
  145. if !tpi.MyPartition {
  146. switch entityId.GetEntityType() {
  147. case entities.RULE_NODE:
  148. targetCtx := r.nodeActors[entityId.GetId()]
  149. r.pushMsgToNode(targetCtx, msg, fromRelationType)
  150. case entities.RULE_CHAIN:
  151. r.parent.Tell(&ruleEngine.RuleChainToRuleChainMsg{
  152. TargetId: entityId.GetId(),
  153. SourceId: r.ruleChainId,
  154. Message: msg,
  155. FromRelationType: fromRelationType,
  156. })
  157. }
  158. } else {
  159. r.putToQueue(tpi, msg, queue.NewMsgCallbackWrapper(msg.GetCallBack()), entityId)
  160. }
  161. }
  162. // 把消息放到队列中
  163. func (r *RuleChainActor) putToQueue(tpi *queue.TopicPartitionInfo, msg *protocol.Message, queueCallback queue.Callback, targetEntity entities.EntityId) {
  164. switch targetEntity.GetEntityType() {
  165. case entities.RULE_NODE:
  166. r.putMessageToQueue(tpi, msg.CopyWithRuleNodeId(targetEntity.GetId()), queueCallback)
  167. case entities.RULE_CHAIN:
  168. r.putMessageToQueue(tpi, msg.CopyWithRuleChainId(targetEntity.GetId()), queueCallback)
  169. }
  170. }
  171. func (r *RuleChainActor) putMessageToQueue(tpi *queue.TopicPartitionInfo, msg *protocol.Message, queueCallback queue.Callback) {
  172. msgBytes, err := msg.Encode()
  173. if err != nil {
  174. server.Log.Error(err)
  175. }
  176. fmt.Printf("%v", tpi)
  177. r.clusterService.PushMessageToRuleEngine(tpi, msg.Id, msgBytes, queueCallback)
  178. }
  179. func contains(relations []string, relation string) bool {
  180. if len(relations) == 0 {
  181. return true
  182. }
  183. for _, item := range relations {
  184. if strings.ToLower(item) == strings.ToLower(relation) {
  185. return true
  186. }
  187. }
  188. return false
  189. }
  190. // push a message to node actor
  191. func (r *RuleChainActor) pushMsgToNode(targetCtx *ruleEngine.RuleNodeCtx, msg *protocol.Message, relationType string) {
  192. if targetCtx != nil {
  193. targetCtx.SelfActor.Tell(&ruleEngine.RuleChainToRuleNodeMsg{
  194. Message: msg,
  195. Ctx: ruleEngine.NewDefaultContext(targetCtx, r.SystemCtx),
  196. FromRelationType: relationType,
  197. })
  198. } else {
  199. server.Log.Error("targetCtx is empty, %s, %s", r.ruleChainId, r.ruleChainName)
  200. }
  201. }
  202. func (r *RuleChainActor) Destroy() error {
  203. return nil
  204. }
  205. func (r *RuleChainActor) OnProcessFailure(err error) *ruleEngine.ProcessFailureStrategy {
  206. if err != nil {
  207. return ruleEngine.Stop()
  208. } else {
  209. return ruleEngine.Resume()
  210. }
  211. }
  212. func (r *RuleChainActor) start() error {
  213. if !r.started {
  214. ruleChain, err := r.SystemCtx.RuleChainService.FindRuleChainById(r.tenantId, r.ruleChainId)
  215. if err != nil {
  216. return err
  217. }
  218. if ruleChain != nil {
  219. nodes, err := r.SystemCtx.RuleChainService.GetRuleChainNodes(r.tenantId, ruleChain.ChainId)
  220. if err != nil {
  221. return err
  222. }
  223. server.Log.Debugf("starting rule chain with %d nodes", len(nodes))
  224. for _, node := range nodes {
  225. server.Log.Debugf("creating rule node actor:%s,%s", node.RuleNodeId, node.Name)
  226. ref, err := r.createNodeActor(node.RuleNodeId)
  227. if err != nil {
  228. continue
  229. }
  230. r.nodeActors[node.RuleNodeId] = &ruleEngine.RuleNodeCtx{
  231. TenantId: r.tenantId,
  232. ChainActor: r.Ctx,
  233. SelfActor: ref,
  234. Self: node,
  235. }
  236. r.started = true
  237. }
  238. r.initRoutes(r.ruleChain, nodes)
  239. }
  240. } else {
  241. return r.update()
  242. }
  243. return nil
  244. }
  245. func (r *RuleChainActor) initRoutes(ruleChain *ruleEngine.RuleChain, nodes []*ruleEngine.RuleNode) {
  246. for _, node := range nodes {
  247. relations, err := r.SystemCtx.RuleChainService.GetRuleNodeRelations(r.tenantId, node.RuleNodeId)
  248. if err != nil {
  249. continue
  250. }
  251. var rs []*ruleEngine.RuleNodeRelation
  252. for _, relation := range relations {
  253. rs = append(rs, &ruleEngine.RuleNodeRelation{
  254. In: &entities.RuleNodeId{Id: node.RuleNodeId},
  255. Out: &entities.RuleNodeId{Id: relation.To},
  256. Type: relation.Type,
  257. })
  258. }
  259. r.nodeRoutes[node.RuleNodeId] = rs
  260. }
  261. fmt.Printf("%+v", r.nodeRoutes)
  262. r.firstId = ruleChain.FirstNodeId
  263. r.firstNode = r.nodeActors[r.firstId]
  264. r.state = ruleEngine.ACTIVE
  265. }
  266. func (r *RuleChainActor) update() error {
  267. return nil
  268. }
  269. func (r *RuleChainActor) createNodeActor(nodeId string) (ruleEngine.Ref, error) {
  270. return r.Ctx.GetOrCreateChildActor(nodeId,
  271. ruleEngine.RULE_DISPATCHER_NAME,
  272. NewRuleNodeActorCreator(r.SystemCtx, r.tenantId,
  273. r.ruleChainId, r.ruleChainName, nodeId, r.Ctx.GetParentRef()))
  274. }
  275. // RuleChainCreator
  276. type RuleChainCreator struct {
  277. RuleChainActor
  278. }
  279. //NewRuleChainCreator create a instance
  280. func NewRuleChainCreator(
  281. sysCtx *ruleEngine.SystemContext,
  282. tenantId string,
  283. ruleChan *ruleEngine.RuleChain,
  284. parent ruleEngine.Ref,
  285. ) *RuleChainCreator {
  286. item := &RuleChainCreator{}
  287. item.tenantId = tenantId
  288. item.ruleChain = ruleChan
  289. item.parent = parent
  290. item.SystemCtx = sysCtx
  291. return item
  292. }
  293. func (r *RuleChainCreator) CreateActorId() string {
  294. return r.ruleChain.ChainId
  295. }
  296. func (r *RuleChainCreator) CreateActor() ruleEngine.Actor {
  297. return newRuleChainActor(r.SystemCtx, r.ruleChain, r.tenantId, r.parent)
  298. }