rule_chain_actor.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. package actors
  2. import (
  3. "errors"
  4. "fmt"
  5. "sparrow/pkg/actor"
  6. "sparrow/pkg/entities"
  7. "sparrow/pkg/protocol"
  8. "sparrow/pkg/queue"
  9. "sparrow/pkg/ruleEngine"
  10. "sparrow/pkg/server"
  11. "strings"
  12. )
  13. var ruleNodes = map[string]*ruleEngine.RuleNode{
  14. "1": {
  15. RuleNodeId: "1",
  16. RuleChainId: "11",
  17. Type: "MsgTypeFilterNode",
  18. Name: "simple node",
  19. IsDebug: true,
  20. Config: "",
  21. },
  22. }
  23. type RuleChainActor struct {
  24. actor.ContextBasedCreator
  25. ruleChain *ruleEngine.RuleChain
  26. tenantId string
  27. firstId string
  28. firstNode *ruleEngine.RuleNodeCtx
  29. started bool
  30. ruleChainId string
  31. parent actor.Ref
  32. nodeActors map[string]*ruleEngine.RuleNodeCtx
  33. nodeRoutes map[string][]*ruleEngine.RuleNodeRelation
  34. ruleChainName string
  35. clusterService queue.ClusterService
  36. }
  37. func newRuleChainActor(
  38. sysCtx *actor.SystemContext,
  39. ruleChain *ruleEngine.RuleChain,
  40. tenantId string,
  41. parent actor.Ref,
  42. ) *RuleChainActor {
  43. item := &RuleChainActor{
  44. ruleChainId: ruleChain.ChainId,
  45. ruleChain: ruleChain,
  46. tenantId: tenantId,
  47. parent: parent,
  48. nodeActors: make(map[string]*ruleEngine.RuleNodeCtx),
  49. nodeRoutes: make(map[string][]*ruleEngine.RuleNodeRelation),
  50. clusterService: sysCtx.ClusterService,
  51. }
  52. item.SystemCtx = sysCtx
  53. return item
  54. }
  55. func (r *RuleChainActor) GetActorRef() actor.Ref {
  56. return r.Ctx
  57. }
  58. func (r *RuleChainActor) Init(ctx actor.Ctx) error {
  59. if r.ruleChain != nil {
  60. r.ruleChainName = r.ruleChain.Name
  61. }
  62. r.Ctx = ctx
  63. return nil
  64. }
  65. func (r *RuleChainActor) Process(msg protocol.ActorMsg) error {
  66. switch msg.GetMessageType() {
  67. case protocol.QUEUE_TO_RULE_ENGINE_MSG:
  68. return r.onQueueToRuleEngineMsg(msg.(*actor.QueueToRuleEngineMsg))
  69. }
  70. return nil
  71. }
  72. func (r *RuleChainActor) onQueueToRuleEngineMsg(msg *actor.QueueToRuleEngineMsg) error {
  73. actorMsg := msg.Message
  74. server.Log.Debugf("Processing message")
  75. if len(msg.RelationTypes) == 0 {
  76. ruleNodeId := actorMsg.RuleNodeId
  77. var targetCtx *ruleEngine.RuleNodeCtx
  78. if ruleNodeId == "" {
  79. targetCtx = r.firstNode
  80. } else {
  81. targetCtx = r.nodeActors[ruleNodeId]
  82. }
  83. if targetCtx != nil {
  84. server.Log.Debugf("pushing message to target rule node,%s, %s", r.ruleChainId, ruleNodeId)
  85. r.pushMsgToNode(targetCtx, actorMsg, "")
  86. } else {
  87. server.Log.Debugf("Rule node dose not exist. probably old message,%s, %s", r.ruleChainId, ruleNodeId)
  88. actorMsg.GetCallBack().OnSuccess()
  89. }
  90. } else {
  91. r.onTellNext(actorMsg, actorMsg.RuleNodeId, msg.RelationTypes, msg.FailureMessage.Error())
  92. }
  93. return nil
  94. }
  95. // on tell next actor
  96. func (r *RuleChainActor) onTellNext(msg *protocol.Message, originatorNodeId string,
  97. relationTypes []string, errMsg string) {
  98. originatorId := msg.Originator
  99. tpi := queue.ResolvePartition(queue.RULE_ENGINE, msg.GetQueueName(), r.tenantId, originatorId)
  100. var relations []*ruleEngine.RuleNodeRelation
  101. if rs, ok := r.nodeRoutes[originatorNodeId]; ok {
  102. for _, item := range rs {
  103. if contains(relationTypes, item.Type) {
  104. relations = append(relations, item)
  105. }
  106. }
  107. }
  108. if len(relations) == 0 {
  109. server.Log.Debugf("No outbound relations to process,%s, %s", originatorId, r.tenantId)
  110. if contains(relationTypes, string(protocol.Failure)) {
  111. if ruleNodeCtx, ok := r.nodeActors[originatorNodeId]; ok {
  112. msg.GetCallBack().OnFailure(errors.New(fmt.Sprintf(""+
  113. "[%s], ruleChainName:%s, ruleNodeId:%s", errMsg, r.ruleChainName, ruleNodeCtx.Self.RuleNodeId)))
  114. } else {
  115. msg.GetCallBack().OnFailure(errors.New("failure during message processing by Rule Node"))
  116. }
  117. } else {
  118. msg.GetCallBack().OnSuccess()
  119. }
  120. } else if len(relations) == 1 {
  121. for _, rl := range relations {
  122. server.Log.Debugf("push message to single target,%s, %s, %s, %s", r.tenantId, originatorId, msg.Id, rl.Out)
  123. r.pushMsgToTarget(tpi, msg, rl.Out, rl.Type)
  124. }
  125. } else {
  126. for _, rl := range relations {
  127. target := rl.Out
  128. r.putToQueue(tpi, msg, queue.NewMultipleMsgCallbackWrapper(int32(len(relations)), msg.GetCallBack()), target)
  129. }
  130. }
  131. }
  132. // push a message to target ctx
  133. func (r *RuleChainActor) pushMsgToTarget(tpi *queue.TopicPartitionInfo, msg *protocol.Message, entityId entities.EntityId, fromRelationType string) {
  134. if tpi.MyPartition {
  135. switch entityId.GetEntityType() {
  136. case entities.RULE_NODE:
  137. targetCtx := r.nodeActors[entityId.GetId()]
  138. r.pushMsgToNode(targetCtx, msg, fromRelationType)
  139. case entities.RULE_CHAIN:
  140. r.parent.Tell(&actor.RuleChainToRuleChainMsg{
  141. TargetId: entityId.GetId(),
  142. SourceId: r.ruleChainId,
  143. Message: msg,
  144. FromRelationType: fromRelationType,
  145. })
  146. }
  147. } else {
  148. r.putToQueue(tpi, msg, queue.NewMsgCallbackWrapper(msg.GetCallBack()), entityId)
  149. }
  150. }
  151. // 把消息放到队列中
  152. func (r *RuleChainActor) putToQueue(tpi *queue.TopicPartitionInfo, msg *protocol.Message, queueCallback queue.Callback, targetEntity entities.EntityId) {
  153. switch targetEntity.GetEntityType() {
  154. case entities.RULE_NODE:
  155. r.putMessageToQueue(tpi, msg.CopyWithRuleNodeId(targetEntity.GetId()), queueCallback)
  156. case entities.RULE_CHAIN:
  157. r.putMessageToQueue(tpi, msg.CopyWithRuleChainId(targetEntity.GetId()), queueCallback)
  158. }
  159. }
  160. func (r *RuleChainActor) putMessageToQueue(tpi *queue.TopicPartitionInfo, msg *protocol.Message, queueCallback queue.Callback) {
  161. msgBytes, err := msg.Encode()
  162. if err != nil {
  163. server.Log.Error(err)
  164. }
  165. r.clusterService.PushMessageToRuleEngine(tpi, msg.Id, msgBytes, queueCallback)
  166. }
  167. func contains(relations []string, relation string) bool {
  168. if len(relations) == 0 {
  169. return true
  170. }
  171. for _, item := range relations {
  172. if strings.ToLower(item) == strings.ToLower(relation) {
  173. return true
  174. }
  175. }
  176. return false
  177. }
  178. // push a message to node actor
  179. func (r *RuleChainActor) pushMsgToNode(targetCtx *ruleEngine.RuleNodeCtx, msg *protocol.Message, relationType string) {
  180. if targetCtx != nil {
  181. targetCtx.SelfActor.Tell(&actor.RuleChainToRuleNodeMsg{
  182. Message: msg,
  183. Ctx: ruleEngine.NewDefaultContext(targetCtx, r.SystemCtx),
  184. FromRelationType: relationType,
  185. })
  186. } else {
  187. server.Log.Error("targetCtx is empty, %s, %s", r.ruleChainId, r.ruleChainName)
  188. }
  189. }
  190. func (r *RuleChainActor) Destroy() error {
  191. return nil
  192. }
  193. func (r *RuleChainActor) OnProcessFailure(err error) *actor.ProcessFailureStrategy {
  194. if err != nil {
  195. return actor.Stop()
  196. } else {
  197. return actor.Resume()
  198. }
  199. }
  200. // RuleChainCreator
  201. type RuleChainCreator struct {
  202. RuleChainActor
  203. }
  204. //NewRuleChainCreator create a instance
  205. func NewRuleChainCreator(
  206. sysCtx *actor.SystemContext,
  207. tenantId string,
  208. ruleChan *ruleEngine.RuleChain,
  209. parent actor.Ref,
  210. ) *RuleChainCreator {
  211. item := &RuleChainCreator{}
  212. item.tenantId = tenantId
  213. item.ruleChain = ruleChan
  214. item.parent = parent
  215. item.SystemCtx = sysCtx
  216. return item
  217. }
  218. func (r *RuleChainCreator) CreateActorId() string {
  219. return r.ruleChain.ChainId
  220. }
  221. func (r *RuleChainCreator) CreateActor() actor.Actor {
  222. return newRuleChainActor(r.SystemCtx, r.ruleChain, r.tenantId, r.parent)
  223. }