tenant_actor.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. package actors
  2. import (
  3. "errors"
  4. "sparrow/pkg/actor"
  5. "sparrow/pkg/protocol"
  6. "sparrow/pkg/ruleEngine"
  7. "sparrow/pkg/server"
  8. )
  9. // TODO: 先用测试数据
  10. var ruleChains = map[string]*ruleEngine.RuleChain{
  11. "11": {
  12. TenantId: "1",
  13. Name: "Chain1",
  14. FirstNodeId: "1",
  15. IsRoot: true,
  16. IsDebug: false,
  17. Config: "",
  18. ChainId: "11",
  19. },
  20. "22": {
  21. TenantId: "2",
  22. Name: "Chain2",
  23. FirstNodeId: "1",
  24. IsRoot: false,
  25. IsDebug: false,
  26. Config: "",
  27. ChainId: "22",
  28. },
  29. }
  30. // TenantActor 租户 actor
  31. type TenantActor struct {
  32. actor.ContextBasedCreator
  33. tenantId string
  34. rootChain *ruleEngine.RuleChain
  35. rootChainActor actor.Ref
  36. cantFindTenant bool
  37. }
  38. func (t *TenantActor) initRuleChains() {
  39. for _, ruleChain := range ruleChains {
  40. server.Log.Debugf("Creating rule chain actor:%s", ruleChain.ChainId)
  41. actorRef, err := t.getOrCreateActor(ruleChain.ChainId, ruleChain)
  42. if err != nil {
  43. server.Log.Errorf("Creating rule chain actor:%s err:%s", ruleChain.ChainId, err.Error())
  44. continue
  45. }
  46. if ruleChain.IsRoot {
  47. t.rootChain = ruleChain
  48. t.rootChainActor = actorRef
  49. }
  50. server.Log.Debugf("Rule chain actor created:%s", ruleChain.ChainId)
  51. }
  52. }
  53. func (t *TenantActor) destroyRuleChains() {
  54. for _, ruleChain := range ruleChains {
  55. _ = t.Ctx.Stop(ruleChain.ChainId)
  56. }
  57. }
  58. func (t *TenantActor) getOrCreateActor(ruleChainId string, ruleChain *ruleEngine.RuleChain) (actor.Ref, error) {
  59. return t.Ctx.GetOrCreateChildActor(ruleChainId,
  60. actor.RULE_DISPATCHER_NAME,
  61. NewRuleChainCreator(t.SystemCtx, t.tenantId, ruleChain, t.Ctx.GetParentRef()))
  62. }
  63. func (t *TenantActor) GetActorRef() actor.Ref {
  64. return t.Ctx
  65. }
  66. func (t *TenantActor) Init(ctx actor.Ctx) error {
  67. t.Ctx = ctx
  68. server.Log.Debugf("Starting tenant actor:%s", t.tenantId)
  69. t.initRuleChains()
  70. return nil
  71. }
  72. func (t *TenantActor) Process(msg protocol.ActorMsg) error {
  73. if t.cantFindTenant {
  74. server.Log.Debugf("Processing missing Tenant msg")
  75. if msg.GetMessageType() == protocol.QUEUE_TO_RULE_ENGINE_MSG {
  76. qMsg := msg.(*actor.QueueToRuleEngineMsg)
  77. qMsg.Message.GetCallBack().OnSuccess()
  78. } else if msg.GetMessageType() == protocol.TRANSPORT_TO_DEVICE_ACTOR_MSG {
  79. tMsg := msg.(*actor.TransportToDeviceActorMsg)
  80. tMsg.Message.GetCallBack().OnSuccess()
  81. }
  82. return nil
  83. }
  84. switch msg.GetMessageType() {
  85. case protocol.QUEUE_TO_RULE_ENGINE_MSG:
  86. return t.onQueueToRuleEngineMsg(msg.(*actor.QueueToRuleEngineMsg))
  87. case protocol.RULE_CHAIN_TO_RULE_CHAIN_MSG:
  88. return t.onRuleChainToRuleChainMsg(msg.(*actor.RuleChainToRuleChainMsg))
  89. case protocol.TRANSPORT_TO_DEVICE_ACTOR_MSG:
  90. //TODO:实现到设备的消息处理
  91. }
  92. return nil
  93. }
  94. // TODO:基于services查找rule chain对象
  95. func (t *TenantActor) onRuleChainToRuleChainMsg(msg *actor.RuleChainToRuleChainMsg) error {
  96. ruleChainId := msg.Message.RuleChanId
  97. ref, err := t.getOrCreateActor(ruleChainId, ruleChains[ruleChainId])
  98. if err != nil {
  99. return err
  100. }
  101. ref.Tell(msg)
  102. return nil
  103. }
  104. func (t *TenantActor) onQueueToRuleEngineMsg(msg *actor.QueueToRuleEngineMsg) error {
  105. actorMsg := msg.Message
  106. if actorMsg.RuleChanId == "" {
  107. if t.rootChainActor != nil {
  108. t.rootChainActor.Tell(msg)
  109. } else {
  110. actorMsg.GetCallBack().OnFailure(errors.New("no Root Rule Chain available"))
  111. server.Log.Errorf("no root chain:%s", t.tenantId)
  112. }
  113. } else {
  114. t.Ctx.TellActor(actorMsg.RuleChanId, msg)
  115. }
  116. actorMsg.GetCallBack().OnSuccess()
  117. return nil
  118. }
  119. func (t *TenantActor) Destroy() error {
  120. return nil
  121. }
  122. func (t *TenantActor) OnProcessFailure(err error) *actor.ProcessFailureStrategy {
  123. if err != nil {
  124. return actor.Stop()
  125. } else {
  126. return actor.Resume()
  127. }
  128. }
  129. // TenantActorCreator 租户actor creator
  130. type TenantActorCreator struct {
  131. actor.ContextBasedCreator
  132. tenantId string
  133. }
  134. func NewTenantActorCreator(sysCtx *actor.SystemContext, tenantId string) *TenantActorCreator {
  135. t := new(TenantActorCreator)
  136. t.SystemCtx = sysCtx
  137. t.tenantId = tenantId
  138. return t
  139. }
  140. func (t *TenantActorCreator) CreateActorId() string {
  141. return t.tenantId
  142. }
  143. func (t *TenantActorCreator) CreateActor() actor.Actor {
  144. ins := new(TenantActor)
  145. ins.tenantId = t.tenantId
  146. ins.SystemCtx = t.SystemCtx
  147. return ins
  148. }