controller.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. package main
  2. import (
  3. "sparrow/pkg/mongo"
  4. "sparrow/pkg/queue"
  5. "sparrow/pkg/rpcs"
  6. "sparrow/pkg/rule"
  7. "sparrow/pkg/server"
  8. )
  9. const (
  10. mongoSetName = "pandocloud"
  11. topicEvents = "events"
  12. topicStatus = "status"
  13. )
  14. type Controller struct {
  15. commandRecorder *mongo.Recorder
  16. eventRecorder *mongo.Recorder
  17. dataRecorder *mongo.Recorder
  18. eventsQueue *queue.Queue
  19. statusQueue *queue.Queue
  20. timer *rule.Timer
  21. ift *rule.Ifttt
  22. }
  23. func NewController(mongohost string, rabbithost string) (*Controller, error) {
  24. cmdr, err := mongo.NewRecorder(mongohost, mongoSetName, "commands")
  25. if err != nil {
  26. return nil, err
  27. }
  28. ever, err := mongo.NewRecorder(mongohost, mongoSetName, "events")
  29. if err != nil {
  30. return nil, err
  31. }
  32. datar, err := mongo.NewRecorder(mongohost, mongoSetName, "datas")
  33. if err != nil {
  34. return nil, err
  35. }
  36. eq, err := queue.New(rabbithost, topicEvents)
  37. if err != nil {
  38. return nil, err
  39. }
  40. sq, err := queue.New(rabbithost, topicStatus)
  41. if err != nil {
  42. return nil, err
  43. }
  44. // timer
  45. t := rule.NewTimer()
  46. t.Run()
  47. // ifttt
  48. ttt := rule.NewIfttt()
  49. return &Controller{
  50. commandRecorder: cmdr,
  51. eventRecorder: ever,
  52. dataRecorder: datar,
  53. eventsQueue: eq,
  54. statusQueue: sq,
  55. timer: t,
  56. ift: ttt,
  57. }, nil
  58. }
  59. func (c *Controller) SetStatus(args rpcs.ArgsSetStatus, reply *rpcs.ReplySetStatus) error {
  60. rpchost, err := getAccessRPCHost(args.DeviceId)
  61. if err != nil {
  62. return err
  63. }
  64. return server.RPCCallByHost(rpchost, "Access.SetStatus", args, reply)
  65. }
  66. func (c *Controller) GetStatus(args rpcs.ArgsGetStatus, reply *rpcs.ReplyGetStatus) error {
  67. rpchost, err := getAccessRPCHost(args.Id)
  68. if err != nil {
  69. return err
  70. }
  71. return server.RPCCallByHost(rpchost, "Access.GetStatus", args, reply)
  72. }
  73. func (c *Controller) OnStatus(args rpcs.ArgsOnStatus, reply *rpcs.ReplyOnStatus) error {
  74. err := c.dataRecorder.Insert(args)
  75. if err != nil {
  76. return err
  77. }
  78. err = c.statusQueue.Send(args)
  79. if err != nil {
  80. return err
  81. }
  82. return nil
  83. }
  84. func (c *Controller) OnEvent(args rpcs.ArgsOnEvent, reply *rpcs.ReplyOnEvent) error {
  85. go func() {
  86. err := c.ift.Check(args.DeviceId, args.No)
  87. if err != nil {
  88. server.Log.Warnf("perform ifttt rules error : %v", err)
  89. }
  90. }()
  91. err := c.eventRecorder.Insert(args)
  92. if err != nil {
  93. return err
  94. }
  95. err = c.eventsQueue.Send(args)
  96. if err != nil {
  97. return err
  98. }
  99. return nil
  100. }
  101. func (c *Controller) SendCommand(args rpcs.ArgsSendCommand, reply *rpcs.ReplySendCommand) error {
  102. rpchost, err := getAccessRPCHost(args.DeviceId)
  103. if err != nil {
  104. return err
  105. }
  106. return server.RPCCallByHost(rpchost, "Access.SendCommand", args, reply)
  107. }
  108. func getAccessRPCHost(deviceid uint64) (string, error) {
  109. args := rpcs.ArgsGetDeviceOnlineStatus{
  110. Id: deviceid,
  111. }
  112. reply := &rpcs.ReplyGetDeviceOnlineStatus{}
  113. err := server.RPCCallByName("devicemanager", "DeviceManager.GetDeviceOnlineStatus", args, reply)
  114. if err != nil {
  115. return "", err
  116. }
  117. return reply.AccessRPCHost, nil
  118. }