tenant_actor.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. package actors
  2. import (
  3. "errors"
  4. "sparrow/pkg/entities"
  5. "sparrow/pkg/protocol"
  6. "sparrow/pkg/ruleEngine"
  7. "sparrow/pkg/server"
  8. )
  9. // TenantActor 租户 actor
  10. type TenantActor struct {
  11. ruleEngine.ContextBasedCreator
  12. tenantId string
  13. rootChain *ruleEngine.RuleChain
  14. rootChainActor ruleEngine.Ref
  15. cantFindTenant bool
  16. ruleChainService ruleEngine.RuleChainService
  17. }
  18. func (t *TenantActor) initRuleChains() error {
  19. ruleChains, err := t.SystemCtx.RuleChainService.FindRuleChains(t.tenantId)
  20. if err != nil {
  21. return err
  22. }
  23. for _, ruleChain := range ruleChains {
  24. server.Log.Debugf("Creating rule chain actor:%s", ruleChain.ChainId)
  25. actorRef, err := t.getOrCreateActor(ruleChain.ChainId, ruleChain)
  26. if err != nil {
  27. server.Log.Errorf("Creating rule chain actor:%s err:%s", ruleChain.ChainId, err.Error())
  28. continue
  29. }
  30. if ruleChain.IsRoot {
  31. t.rootChain = ruleChain
  32. t.rootChainActor = actorRef
  33. }
  34. server.Log.Debugf("Rule chain actor created:%s", ruleChain.ChainId)
  35. }
  36. return nil
  37. }
  38. func (t *TenantActor) destroyRuleChains() error {
  39. ruleChains, err := t.ruleChainService.FindRuleChains(t.tenantId)
  40. if err != nil {
  41. return err
  42. }
  43. for _, ruleChain := range ruleChains {
  44. _ = t.Ctx.Stop(ruleChain.ChainId)
  45. }
  46. return nil
  47. }
  48. func (t *TenantActor) getOrCreateActor(ruleChainId string, ruleChain *ruleEngine.RuleChain) (ruleEngine.Ref, error) {
  49. return t.Ctx.GetOrCreateChildActor(ruleChainId,
  50. ruleEngine.RULE_DISPATCHER_NAME,
  51. NewRuleChainCreator(t.SystemCtx, t.tenantId, ruleChain, t.Ctx.GetParentRef()))
  52. }
  53. func (t *TenantActor) GetActorRef() ruleEngine.Ref {
  54. return t.Ctx
  55. }
  56. func (t *TenantActor) Init(ctx ruleEngine.Ctx) error {
  57. t.Ctx = ctx
  58. server.Log.Debugf("Starting tenant actor:%s", t.tenantId)
  59. return t.initRuleChains()
  60. }
  61. func (t *TenantActor) Process(msg protocol.ActorMsg) error {
  62. if t.cantFindTenant {
  63. server.Log.Debugf("Processing missing Tenant msg")
  64. if msg.GetMessageType() == protocol.QUEUE_TO_RULE_ENGINE_MSG {
  65. qMsg := msg.(*ruleEngine.QueueToRuleEngineMsg)
  66. qMsg.Message.GetCallBack().OnSuccess()
  67. } else if msg.GetMessageType() == protocol.TRANSPORT_TO_DEVICE_ACTOR_MSG {
  68. tMsg := msg.(*ruleEngine.TransportToDeviceActorMsg)
  69. tMsg.Message.GetCallBack().OnSuccess()
  70. }
  71. return nil
  72. }
  73. switch msg.GetMessageType() {
  74. case protocol.QUEUE_TO_RULE_ENGINE_MSG:
  75. return t.onQueueToRuleEngineMsg(msg.(*ruleEngine.QueueToRuleEngineMsg))
  76. case protocol.RULE_CHAIN_TO_RULE_CHAIN_MSG:
  77. return t.onRuleChainToRuleChainMsg(msg.(*ruleEngine.RuleChainToRuleChainMsg))
  78. case protocol.TRANSPORT_TO_DEVICE_ACTOR_MSG:
  79. //TODO:实现到设备的消息处理
  80. case protocol.COMPONENT_LIFE_CYCLE_MSG:
  81. return t.onComponentLifecycleMsg(msg.(*ruleEngine.ComponentLifecycleMsg))
  82. default:
  83. return errors.New("未知的消息类型")
  84. }
  85. return nil
  86. }
  87. func (t *TenantActor) onComponentLifecycleMsg(msg *ruleEngine.ComponentLifecycleMsg) error {
  88. target := t.getEntityActorRef(msg.EntityId)
  89. if target != nil {
  90. if msg.EntityId.GetEntityType() == entities.RULE_CHAIN {
  91. ruleChain, err := t.ruleChainService.FindRuleChainById(t.tenantId, msg.EntityId.GetId())
  92. if err != nil {
  93. return err
  94. }
  95. if ruleChain != nil {
  96. if ruleChain.IsRoot {
  97. t.rootChain = ruleChain
  98. t.rootChainActor = target
  99. }
  100. }
  101. }
  102. target.TellWithHighPriority(msg)
  103. } else {
  104. server.Log.Debugln("Invalid component lifecycle msg")
  105. }
  106. return nil
  107. }
  108. func (t *TenantActor) getEntityActorRef(id entities.EntityId) ruleEngine.Ref {
  109. if id.GetEntityType() == entities.RULE_CHAIN {
  110. ruleChain, err := t.ruleChainService.FindRuleChainById(t.tenantId, id.GetId())
  111. if err != nil {
  112. return nil
  113. }
  114. ref, err := t.getOrCreateActor(id.GetId(), ruleChain)
  115. if err != nil {
  116. return nil
  117. }
  118. return ref
  119. }
  120. return nil
  121. }
  122. func (t *TenantActor) onRuleChainToRuleChainMsg(msg *ruleEngine.RuleChainToRuleChainMsg) error {
  123. ruleChainId := msg.Message.RuleChanId
  124. ruleChain, err := t.SystemCtx.RuleChainService.FindRuleChainById(t.tenantId, ruleChainId)
  125. if err != nil {
  126. return err
  127. }
  128. ref, err := t.getOrCreateActor(ruleChainId, ruleChain)
  129. if err != nil {
  130. return err
  131. }
  132. ref.Tell(msg)
  133. return nil
  134. }
  135. func (t *TenantActor) onQueueToRuleEngineMsg(msg *ruleEngine.QueueToRuleEngineMsg) error {
  136. actorMsg := msg.Message
  137. if actorMsg.RuleChanId == "" {
  138. if t.rootChainActor != nil {
  139. t.rootChainActor.Tell(msg)
  140. } else {
  141. actorMsg.GetCallBack().OnFailure(errors.New("no Root Rule Chain available"))
  142. server.Log.Errorf("no root chain:%s", t.tenantId)
  143. }
  144. } else {
  145. t.Ctx.TellActor(actorMsg.RuleChanId, msg)
  146. }
  147. actorMsg.GetCallBack().OnSuccess()
  148. return nil
  149. }
  150. func (t *TenantActor) Destroy() error {
  151. return nil
  152. }
  153. func (t *TenantActor) OnProcessFailure(err error) *ruleEngine.ProcessFailureStrategy {
  154. if err != nil {
  155. return ruleEngine.Stop()
  156. } else {
  157. return ruleEngine.Resume()
  158. }
  159. }
  160. // TenantActorCreator 租户actor creator
  161. type TenantActorCreator struct {
  162. ruleEngine.ContextBasedCreator
  163. tenantId string
  164. }
  165. func NewTenantActorCreator(sysCtx *ruleEngine.SystemContext, tenantId string) *TenantActorCreator {
  166. t := new(TenantActorCreator)
  167. t.SystemCtx = sysCtx
  168. t.tenantId = tenantId
  169. return t
  170. }
  171. func (t *TenantActorCreator) CreateActorId() string {
  172. return t.tenantId
  173. }
  174. func (t *TenantActorCreator) CreateActor() ruleEngine.Actor {
  175. ins := new(TenantActor)
  176. ins.tenantId = t.tenantId
  177. ins.SystemCtx = t.SystemCtx
  178. return ins
  179. }