controller.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. package main
  2. import (
  3. "sparrow/pkg/actors"
  4. "sparrow/pkg/mongo"
  5. "sparrow/pkg/queue"
  6. "sparrow/pkg/rpcs"
  7. "sparrow/pkg/rule"
  8. "sparrow/pkg/ruleEngine"
  9. "sparrow/pkg/server"
  10. "time"
  11. )
  // Names shared by the storage and messaging back ends used in this file.
  const (
  	// mongoSetName is the second argument of every mongo.NewRecorder call
  	// (presumably the MongoDB database name; confirm against pkg/mongo).
  	mongoSetName = "pandocloud"

  	// topicEvents is the topic handed to queue.New for the events queue.
  	topicEvents = "events"

  	// topicStatus is the topic handed to queue.New for the status queue.
  	topicStatus = "status"
  )
  17. type Controller struct {
  18. commandRecorder *mongo.Recorder
  19. eventRecorder *mongo.Recorder
  20. dataRecorder *mongo.Recorder
  21. eventsQueue *queue.Queue
  22. statusQueue *queue.Queue
  23. timer *rule.Timer
  24. ift *rule.Ifttt
  25. }
  26. func NewController(mongohost string, rabbithost string) (*Controller, error) {
  27. cmdr, err := mongo.NewRecorder(mongohost, mongoSetName, "commands")
  28. if err != nil {
  29. return nil, err
  30. }
  31. ever, err := mongo.NewRecorder(mongohost, mongoSetName, "events")
  32. if err != nil {
  33. return nil, err
  34. }
  35. datar, err := mongo.NewRecorder(mongohost, mongoSetName, "datas")
  36. if err != nil {
  37. return nil, err
  38. }
  39. eq, err := queue.New(rabbithost, topicEvents)
  40. if err != nil {
  41. return nil, err
  42. }
  43. sq, err := queue.New(rabbithost, topicStatus)
  44. if err != nil {
  45. return nil, err
  46. }
  47. // timer
  48. t := rule.NewTimer()
  49. t.Run()
  50. // ifttt
  51. ttt := rule.NewIfttt()
  52. return &Controller{
  53. commandRecorder: cmdr,
  54. eventRecorder: ever,
  55. dataRecorder: datar,
  56. eventsQueue: eq,
  57. statusQueue: sq,
  58. timer: t,
  59. ift: ttt,
  60. }, nil
  61. }
  62. func (c *Controller) SetStatus(args rpcs.ArgsSetStatus, reply *rpcs.ReplySetStatus) error {
  63. rpchost, err := getAccessRPCHost(args.DeviceId)
  64. if err != nil {
  65. return err
  66. }
  67. return server.RPCCallByHost(rpchost, "Access.SetStatus", args, reply)
  68. }
  69. func (c *Controller) GetStatus(args rpcs.ArgsGetStatus, reply *rpcs.ReplyGetStatus) error {
  70. rpchost, err := getAccessRPCHost(args.Id)
  71. if err != nil {
  72. return err
  73. }
  74. return server.RPCCallByHost(rpchost, "Access.GetStatus", args, reply)
  75. }
  76. func (c *Controller) OnStatus(args rpcs.ArgsOnStatus, reply *rpcs.ReplyOnStatus) error {
  77. err := c.dataRecorder.Insert(args)
  78. if err != nil {
  79. return err
  80. }
  81. err = c.statusQueue.Send(args)
  82. if err != nil {
  83. return err
  84. }
  85. return nil
  86. }
  87. func (c *Controller) OnEvent(args rpcs.ArgsOnEvent, reply *rpcs.ReplyOnEvent) error {
  88. go func() {
  89. err := c.ift.Check(args.DeviceId, args.No)
  90. if err != nil {
  91. server.Log.Warnf("perform ifttt rules error : %v", err)
  92. }
  93. }()
  94. err := c.eventRecorder.Insert(args)
  95. if err != nil {
  96. return err
  97. }
  98. err = c.eventsQueue.Send(args)
  99. if err != nil {
  100. return err
  101. }
  102. return nil
  103. }
  104. func (c *Controller) SendCommand(args rpcs.ArgsSendCommand, reply *rpcs.ReplySendCommand) error {
  105. rpchost, err := getAccessRPCHost(args.DeviceId)
  106. if err != nil {
  107. return err
  108. }
  109. return server.RPCCallByHost(rpchost, "Access.SendCommand", args, reply)
  110. }
  111. func getAccessRPCHost(deviceid uint64) (string, error) {
  112. args := rpcs.ArgsGetDeviceOnlineStatus{
  113. Id: deviceid,
  114. }
  115. reply := &rpcs.ReplyGetDeviceOnlineStatus{}
  116. err := server.RPCCallByName(nil, "devicemanager", "DeviceManager.GetDeviceOnlineStatus", args, reply)
  117. if err != nil {
  118. return "", err
  119. }
  120. return reply.AccessRPCHost, nil
  121. }
  122. type ActorSystem struct {
  123. rootActor ruleEngine.Ref
  124. }
  125. func initActorSystem() (*ActorSystem, error) {
  126. actorContext := new(ruleEngine.SystemContext)
  127. system := ruleEngine.NewDefaultActorSystem(&ruleEngine.DefaultActorSystemConfig{
  128. SchedulerPoolSize: 5,
  129. AppDispatcherPoolSize: 4,
  130. TenantDispatcherPoolSize: 4,
  131. RuleEngineDispatcherPoolSize: 4,
  132. })
  133. system.CreateDispatcher(ruleEngine.APP_DISPATCHER_NAME, ruleEngine.NewPoolDispatcher(5))
  134. system.CreateDispatcher(ruleEngine.TENANT_DISPATCHER_NAME, ruleEngine.NewPoolDispatcher(4))
  135. system.CreateDispatcher(ruleEngine.RULE_DISPATCHER_NAME, ruleEngine.NewPoolDispatcher(4))
  136. actorContext.ActorSystem = system
  137. // init services
  138. tenantService := &ruleEngine.TestTenantService{}
  139. rulechainService := &ruleEngine.TestRuleChainService{}
  140. actorContext.TenantService = tenantService
  141. actorContext.RuleChainService = rulechainService
  142. appActor, err := system.CreateRootActor(ruleEngine.APP_DISPATCHER_NAME,
  143. actors.NewAppActorCreator(actorContext))
  144. if err != nil {
  145. return nil, err
  146. }
  147. actorContext.AppActor = appActor
  148. server.Log.Debugln("actor system initialized")
  149. time.Sleep(time.Second * 1)
  150. appActor.Tell(&ruleEngine.AppInitMsg{})
  151. return &ActorSystem{rootActor: appActor}, nil
  152. }