rule_chain_actor.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424
  1. package actors
  2. import (
  3. "errors"
  4. "fmt"
  5. "sparrow/pkg/entities"
  6. "sparrow/pkg/protocol"
  7. "sparrow/pkg/queue"
  8. "sparrow/pkg/ruleEngine"
  9. "sparrow/pkg/server"
  10. "strings"
  11. )
  12. type RuleChainActor struct {
  13. ruleEngine.ContextBasedCreator
  14. ruleChain *ruleEngine.RuleChain
  15. tenantId string
  16. firstId string
  17. firstNode *ruleEngine.RuleNodeCtx
  18. started bool
  19. ruleChainId string
  20. parent ruleEngine.Ref
  21. nodeActors map[string]*ruleEngine.RuleNodeCtx
  22. nodeRoutes map[string][]*ruleEngine.RuleNodeRelation
  23. ruleChainName string
  24. clusterService ruleEngine.ClusterService
  25. state ruleEngine.ComponentLifecycleState
  26. }
  27. func newRuleChainActor(
  28. sysCtx *ruleEngine.SystemContext,
  29. ruleChain *ruleEngine.RuleChain,
  30. tenantId string,
  31. parent ruleEngine.Ref,
  32. ) *RuleChainActor {
  33. item := &RuleChainActor{
  34. ruleChainId: ruleChain.ChainId,
  35. ruleChain: ruleChain,
  36. tenantId: tenantId,
  37. parent: parent,
  38. nodeActors: make(map[string]*ruleEngine.RuleNodeCtx),
  39. nodeRoutes: make(map[string][]*ruleEngine.RuleNodeRelation),
  40. clusterService: sysCtx.ClusterService,
  41. }
  42. item.SystemCtx = sysCtx
  43. return item
  44. }
  45. func (r *RuleChainActor) GetActorRef() ruleEngine.Ref {
  46. return r.Ctx
  47. }
  48. func (r *RuleChainActor) Init(ctx ruleEngine.Ctx) error {
  49. if r.ruleChain != nil {
  50. r.ruleChainName = r.ruleChain.Name
  51. }
  52. r.Ctx = ctx
  53. return r.start()
  54. }
  55. func (r *RuleChainActor) Process(msg protocol.ActorMsg) error {
  56. switch msg.GetMessageType() {
  57. case protocol.QUEUE_TO_RULE_ENGINE_MSG:
  58. return r.onQueueToRuleEngineMsg(msg.(*ruleEngine.QueueToRuleEngineMsg))
  59. case protocol.RULE_TO_RULE_CHAIN_TELL_NEXT_MSG:
  60. return r.onTellNextRuleNode(msg.(*ruleEngine.RuleNodeToRuleChanTellNextMsg))
  61. case protocol.RULE_CHAIN_TO_RULE_CHAIN_MSG:
  62. return r.onRuleChainToRuleChain(msg.(*ruleEngine.RuleChainToRuleChainMsg))
  63. case protocol.COMPONENT_LIFE_CYCLE_MSG:
  64. return r.onComponentLifeCycleMsg(msg.(*ruleEngine.ComponentLifecycleMsg))
  65. }
  66. return nil
  67. }
  68. // TODO:生命周期事件写入数据库
  69. func (r *RuleChainActor) onComponentLifeCycleMsg(msg *ruleEngine.ComponentLifecycleMsg) error {
  70. server.Log.Debugf("%s,%s,%s onComponentLifecycleMsg", msg.TenantId, msg.EntityId.GetEntityType(), msg.EntityId.GetId())
  71. switch msg.EventType {
  72. case ruleEngine.CREATED:
  73. return r.start()
  74. case ruleEngine.UPDATED:
  75. return r.update()
  76. case ruleEngine.ACTIVATED:
  77. return r.restart()
  78. case ruleEngine.DELETED:
  79. _ = r.stop()
  80. return r.Ctx.Stop(r.ruleChainId)
  81. default:
  82. }
  83. return nil
  84. }
  85. func (r *RuleChainActor) onRuleChainToRuleChain(msg *ruleEngine.RuleChainToRuleChainMsg) error {
  86. if r.firstNode != nil {
  87. r.pushMsgToNode(r.firstNode, msg.Message, msg.FromRelationType)
  88. } else {
  89. msg.Message.GetCallBack().OnSuccess()
  90. }
  91. return nil
  92. }
  93. func (r *RuleChainActor) onQueueToRuleEngineMsg(msg *ruleEngine.QueueToRuleEngineMsg) error {
  94. actorMsg := msg.Message
  95. server.Log.Debugf("Processing message")
  96. if len(msg.RelationTypes) == 0 {
  97. ruleNodeId := actorMsg.RuleNodeId
  98. var targetCtx *ruleEngine.RuleNodeCtx
  99. if ruleNodeId == "" {
  100. targetCtx = r.firstNode
  101. actorMsg = actorMsg.CopyWithRuleChainId(r.ruleChainId)
  102. } else {
  103. targetCtx = r.nodeActors[ruleNodeId]
  104. }
  105. if targetCtx != nil {
  106. server.Log.Debugf("pushing message to target rule node,%s, %s", r.ruleChainId, targetCtx.Self.RuleNodeId)
  107. r.pushMsgToNode(targetCtx, actorMsg, "")
  108. } else {
  109. server.Log.Debugf("Rule node dose not exist. probably old message,%s, %s", r.ruleChainId, ruleNodeId)
  110. actorMsg.GetCallBack().OnSuccess()
  111. }
  112. } else {
  113. r.onTellNext(actorMsg, actorMsg.RuleNodeId, msg.RelationTypes, msg.FailureMessage.Error())
  114. }
  115. return nil
  116. }
  117. func (r *RuleChainActor) onTellNextRuleNode(msg *ruleEngine.RuleNodeToRuleChanTellNextMsg) error {
  118. var errStr string
  119. if msg.FailureMessage != nil {
  120. errStr = msg.FailureMessage.Error()
  121. }
  122. r.onTellNext(msg.Message, msg.RuleNodeId, msg.RelationTypes.ToStrArray(), errStr)
  123. return nil
  124. }
  125. // on tell next actor
  126. func (r *RuleChainActor) onTellNext(msg *protocol.Message, originatorNodeId string,
  127. relationTypes []string, errMsg string) {
  128. originatorId := msg.Originator
  129. tpi := queue.ResolvePartition(ruleEngine.RULE_ENGINE, msg.GetQueueName(), r.tenantId, originatorId)
  130. var relations []*ruleEngine.RuleNodeRelation
  131. if rs, ok := r.nodeRoutes[originatorNodeId]; ok {
  132. for _, item := range rs {
  133. if contains(relationTypes, item.Type) {
  134. relations = append(relations, item)
  135. }
  136. }
  137. }
  138. if len(relations) == 0 {
  139. server.Log.Debugf("No outbound relations to process,%s, %s", originatorId, r.tenantId)
  140. if contains(relationTypes, "Failure") {
  141. if ruleNodeCtx, ok := r.nodeActors[originatorNodeId]; ok {
  142. msg.GetCallBack().OnFailure(errors.New(fmt.Sprintf(""+
  143. "[%s], ruleChainName:%s, ruleNodeId:%s", errMsg, r.ruleChainName, ruleNodeCtx.Self.RuleNodeId)))
  144. } else {
  145. msg.GetCallBack().OnFailure(errors.New("failure during message processing by Rule Node"))
  146. }
  147. } else {
  148. msg.GetCallBack().OnSuccess()
  149. }
  150. } else if len(relations) == 1 {
  151. for _, rl := range relations {
  152. server.Log.Debugf("push message to single target,%s, %s, %s, %s", r.tenantId, originatorId, msg.Id, rl.Out)
  153. r.pushMsgToTarget(tpi, msg, rl.Out, rl.Type)
  154. }
  155. } else {
  156. for _, rl := range relations {
  157. target := rl.Out
  158. r.putToQueue(tpi, msg, queue.NewMultipleMsgCallbackWrapper(int32(len(relations)), msg.GetCallBack()), target)
  159. }
  160. }
  161. }
  162. // push a message to target ctx
  163. func (r *RuleChainActor) pushMsgToTarget(tpi *queue.TopicPartitionInfo, msg *protocol.Message, entityId entities.EntityId, fromRelationType string) {
  164. if tpi.MyPartition {
  165. switch entityId.GetEntityType() {
  166. case entities.RULE_NODE:
  167. targetCtx := r.nodeActors[entityId.GetId()]
  168. r.pushMsgToNode(targetCtx, msg, fromRelationType)
  169. case entities.RULE_CHAIN:
  170. r.parent.Tell(&ruleEngine.RuleChainToRuleChainMsg{
  171. TargetId: entityId.GetId(),
  172. SourceId: r.ruleChainId,
  173. Message: msg,
  174. FromRelationType: fromRelationType,
  175. })
  176. }
  177. } else {
  178. r.putToQueue(tpi, msg, queue.NewMsgCallbackWrapper(msg.GetCallBack()), entityId)
  179. }
  180. }
  181. // 把消息放到队列中
  182. func (r *RuleChainActor) putToQueue(tpi *queue.TopicPartitionInfo, msg *protocol.Message, queueCallback queue.Callback, targetEntity entities.EntityId) {
  183. switch targetEntity.GetEntityType() {
  184. case entities.RULE_NODE:
  185. r.putMessageToQueue(tpi, msg.CopyWithRuleNodeId(targetEntity.GetId()), queueCallback)
  186. case entities.RULE_CHAIN:
  187. r.putMessageToQueue(tpi, msg.CopyWithRuleChainId(targetEntity.GetId()), queueCallback)
  188. }
  189. }
  190. func (r *RuleChainActor) putMessageToQueue(tpi *queue.TopicPartitionInfo, msg *protocol.Message, queueCallback queue.Callback) {
  191. r.clusterService.PushMessageToRuleEngine(tpi, msg.Id, msg, queueCallback)
  192. }
  193. func contains(relations []string, relation string) bool {
  194. if len(relations) == 0 {
  195. return false
  196. }
  197. for _, item := range relations {
  198. if strings.ToLower(item) == strings.ToLower(relation) {
  199. return true
  200. }
  201. }
  202. return false
  203. }
  204. // push a message to node actor
  205. func (r *RuleChainActor) pushMsgToNode(targetCtx *ruleEngine.RuleNodeCtx, msg *protocol.Message, relationType string) {
  206. if targetCtx != nil {
  207. targetCtx.SelfActor.Tell(&ruleEngine.RuleChainToRuleNodeMsg{
  208. Message: msg,
  209. Ctx: ruleEngine.NewDefaultContext(targetCtx, r.SystemCtx),
  210. FromRelationType: relationType,
  211. })
  212. } else {
  213. server.Log.Error("targetCtx is empty, %s, %s", r.ruleChainId, r.ruleChainName)
  214. }
  215. }
  216. func (r *RuleChainActor) Destroy() error {
  217. return nil
  218. }
  219. func (r *RuleChainActor) OnProcessFailure(err error) *ruleEngine.ProcessFailureStrategy {
  220. if err != nil {
  221. return ruleEngine.Stop()
  222. } else {
  223. return ruleEngine.Resume()
  224. }
  225. }
  226. func (r *RuleChainActor) start() error {
  227. if !r.started {
  228. ruleChain, err := r.SystemCtx.RuleChainService.FindRuleChainById(r.tenantId, r.ruleChainId)
  229. if err != nil {
  230. return err
  231. }
  232. if ruleChain != nil {
  233. nodes, err := r.SystemCtx.RuleChainService.GetRuleChainNodes(r.tenantId, ruleChain.ChainId)
  234. if err != nil {
  235. return err
  236. }
  237. server.Log.Debugf("starting rule chain with %d nodes", len(nodes))
  238. for _, node := range nodes {
  239. server.Log.Debugf("creating rule node actor:%s,%s", node.RuleNodeId, node.Name)
  240. ref, err := r.createNodeActor(node.RuleNodeId)
  241. if err != nil {
  242. continue
  243. }
  244. r.nodeActors[node.RuleNodeId] = &ruleEngine.RuleNodeCtx{
  245. TenantId: r.tenantId,
  246. ChainActor: r.Ctx,
  247. SelfActor: ref,
  248. Self: node,
  249. }
  250. r.started = true
  251. }
  252. return r.initRoutes(r.ruleChain, nodes)
  253. }
  254. } else {
  255. return r.update()
  256. }
  257. return nil
  258. }
  259. func (r *RuleChainActor) update() error {
  260. ruleChain, err := r.SystemCtx.RuleChainService.FindRuleChainById(r.tenantId, r.ruleChainId)
  261. if err != nil {
  262. return err
  263. }
  264. if ruleChain == nil {
  265. return errors.New(fmt.Sprintf("rule chain not found :%s", r.ruleChainId))
  266. }
  267. // 查询链上的所有节点
  268. nodes, err := r.SystemCtx.RuleChainService.GetRuleChainNodes(r.tenantId, r.ruleChainId)
  269. if err != nil {
  270. return err
  271. }
  272. for _, v := range nodes {
  273. // 如果找不到节点actor,则创建
  274. if actor, ok := r.nodeActors[v.RuleNodeId]; !ok {
  275. server.Log.Debugf("creating rule node actor:%s", v.RuleNodeId)
  276. ref, err := r.createNodeActor(v.RuleNodeId)
  277. if err != nil {
  278. return err
  279. }
  280. r.nodeActors[v.RuleNodeId] = &ruleEngine.RuleNodeCtx{
  281. TenantId: r.tenantId,
  282. ChainActor: r.Ctx,
  283. SelfActor: ref,
  284. Self: v,
  285. }
  286. } else {
  287. // 传递消息到node actor
  288. server.Log.Debugf("updating rule node actor:%s", v.RuleNodeId)
  289. actor.Self = v
  290. actor.SelfActor.TellWithHighPriority(&ruleEngine.ComponentLifecycleMsg{
  291. TenantId: r.tenantId,
  292. EntityId: &entities.RuleNodeId{Id: v.RuleNodeId},
  293. EventType: ruleEngine.UPDATED,
  294. })
  295. }
  296. }
  297. var removeNodes []string
  298. // 对比已经有节点和最新节点列表,找出差集,并移除
  299. for k := range r.nodeActors {
  300. var found = false
  301. for _, v := range nodes {
  302. if v.RuleNodeId == k {
  303. found = true
  304. break
  305. }
  306. }
  307. if !found {
  308. removeNodes = append(removeNodes, k)
  309. }
  310. }
  311. // remove actors
  312. for _, v := range removeNodes {
  313. server.Log.Debugf("remove rule node :%s", v)
  314. if ref, ok := r.nodeActors[v]; ok {
  315. ref.SelfActor.TellWithHighPriority(&ruleEngine.ComponentLifecycleMsg{
  316. TenantId: r.tenantId,
  317. EntityId: &entities.RuleNodeId{Id: v},
  318. EventType: ruleEngine.DELETED,
  319. })
  320. delete(r.nodeActors, v)
  321. }
  322. }
  323. return r.initRoutes(r.ruleChain, nodes)
  324. }
  325. func (r *RuleChainActor) restart() error {
  326. if err := r.stop(); err != nil {
  327. return err
  328. }
  329. if err := r.start(); err != nil {
  330. return err
  331. }
  332. return nil
  333. }
  334. func (r *RuleChainActor) stop() error {
  335. server.Log.Debugf("stopping rule chain with %d nodes, tenantId:%s, entityId:%s", len(r.nodeActors), r.tenantId, r.ruleChainId)
  336. for actorId := range r.nodeActors {
  337. err := r.Ctx.Stop(actorId)
  338. if err != nil {
  339. return err
  340. }
  341. }
  342. r.started = false
  343. return nil
  344. }
  345. func (r *RuleChainActor) initRoutes(ruleChain *ruleEngine.RuleChain, nodes []*ruleEngine.RuleNode) error {
  346. for _, node := range nodes {
  347. relations, err := r.SystemCtx.RuleChainService.GetRuleNodeRelations(r.tenantId, node.RuleNodeId)
  348. if err != nil {
  349. return err
  350. }
  351. var rs []*ruleEngine.RuleNodeRelation
  352. for _, relation := range relations {
  353. rs = append(rs, &ruleEngine.RuleNodeRelation{
  354. In: &entities.RuleNodeId{Id: node.RuleNodeId},
  355. Out: &entities.RuleNodeId{Id: relation.To},
  356. Type: relation.Type,
  357. })
  358. }
  359. r.nodeRoutes[node.RuleNodeId] = rs
  360. }
  361. r.firstId = ruleChain.FirstNodeId
  362. r.firstNode = r.nodeActors[r.firstId]
  363. r.state = ruleEngine.ACTIVE
  364. return nil
  365. }
  366. func (r *RuleChainActor) createNodeActor(nodeId string) (ruleEngine.Ref, error) {
  367. return r.Ctx.GetOrCreateChildActor(nodeId,
  368. ruleEngine.RULE_DISPATCHER_NAME,
  369. NewRuleNodeActorCreator(r.SystemCtx, r.tenantId,
  370. r.ruleChainId, r.ruleChainName, nodeId, r.Ctx.GetParentRef()))
  371. }
  372. // RuleChainCreator
  373. type RuleChainCreator struct {
  374. RuleChainActor
  375. }
  376. //NewRuleChainCreator create a instance
  377. func NewRuleChainCreator(
  378. sysCtx *ruleEngine.SystemContext,
  379. tenantId string,
  380. ruleChan *ruleEngine.RuleChain,
  381. parent ruleEngine.Ref,
  382. ) *RuleChainCreator {
  383. item := &RuleChainCreator{}
  384. item.tenantId = tenantId
  385. item.ruleChain = ruleChan
  386. item.parent = parent
  387. item.SystemCtx = sysCtx
  388. return item
  389. }
  390. func (r *RuleChainCreator) CreateActorId() string {
  391. return r.ruleChain.ChainId
  392. }
  393. func (r *RuleChainCreator) CreateActor() ruleEngine.Actor {
  394. return newRuleChainActor(r.SystemCtx, r.ruleChain, r.tenantId, r.parent)
  395. }