notifier.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. package main
  2. import (
  3. "encoding/json"
  4. "sparrow/pkg/models"
  5. "sparrow/pkg/productconfig"
  6. "sparrow/pkg/protocol"
  7. "sparrow/pkg/queue"
  8. "sparrow/pkg/rpcs"
  9. "sparrow/pkg/server"
  10. "sparrow/pkg/utils"
  11. "time"
  12. )
  13. const (
  14. topicEvents = "events"
  15. topicStatus = "status"
  16. )
  17. // report structure
  18. type ReportPack struct {
  19. Tag string `json:"tag"`
  20. Identifier string `json:"identifier"`
  21. TimeStamp int64 `json:"timestamp"`
  22. Data map[string][]interface{} `json:"data"`
  23. }
  24. var notifier *Notifier
  25. type Notifier struct {
  26. eventsQueue *queue.Queue
  27. statusQueue *queue.Queue
  28. apps []*models.Application
  29. }
  30. func NewNotifier(rabbithost string) (*Notifier, error) {
  31. eq, err := queue.New(rabbithost, topicEvents)
  32. if err != nil {
  33. return nil, err
  34. }
  35. sq, err := queue.New(rabbithost, topicStatus)
  36. if err != nil {
  37. return nil, err
  38. }
  39. return &Notifier{
  40. eventsQueue: eq,
  41. statusQueue: sq,
  42. apps: []*models.Application{},
  43. }, nil
  44. }
  45. // TODO
  46. func (n *Notifier) reportStatus(event rpcs.ArgsOnStatus) error {
  47. return nil
  48. }
  49. // TODO
  50. func (n *Notifier) processStatus() error {
  51. return nil
  52. }
  53. func (n *Notifier) updateApplications() error {
  54. for {
  55. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.GetApplications", 0, &n.apps)
  56. if err != nil {
  57. server.Log.Errorf("get applications error : %v", err)
  58. }
  59. time.Sleep(time.Minute)
  60. }
  61. }
  62. func (n *Notifier) reportEvent(event rpcs.ArgsOnEvent) error {
  63. server.Log.Debugf("reporting event %v", event)
  64. device := &models.Device{}
  65. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceById", event.DeviceId, device)
  66. if err != nil {
  67. server.Log.Errorf("find device error : %v", err)
  68. return err
  69. }
  70. product := &models.Product{}
  71. err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindProduct", device.ProductID, product)
  72. if err != nil {
  73. server.Log.Errorf("find product error : %v", err)
  74. return err
  75. }
  76. c, err := productconfig.New(product.ProductConfig)
  77. if err != nil {
  78. server.Log.Errorf("product config error : %v", err)
  79. return err
  80. }
  81. ev := &protocol.Event{}
  82. ev.Head.No = event.No
  83. ev.Head.SubDeviceid = event.SubDevice
  84. ev.Params = event.Params
  85. m, err := c.EventToMap(ev)
  86. if err != nil {
  87. server.Log.Errorf("gen event json error : %v", err)
  88. return err
  89. }
  90. res := ReportPack{
  91. Tag: "event",
  92. Identifier: device.DeviceIdentifier,
  93. Data: m,
  94. }
  95. jsonRes, err := json.Marshal(res)
  96. if err != nil {
  97. server.Log.Errorf("json marshal error : %v", err)
  98. return err
  99. }
  100. reqHead := map[string]string{}
  101. reqHead["Content-Type"] = "application/json"
  102. for _, app := range n.apps {
  103. if nil == checkAppDomain(app.AppDomain, device.DeviceIdentifier) {
  104. reqHead["App-Token"] = app.AppToken
  105. _, err := utils.SendHttpRequest(app.ReportUrl, string(jsonRes), "POST", reqHead)
  106. if err != nil {
  107. server.Log.Errorf("http post json error : %v", err)
  108. }
  109. server.Log.Debugf("http post json succ : %v", string(jsonRes))
  110. }
  111. }
  112. return nil
  113. }
  114. func (n *Notifier) processEvents() error {
  115. for {
  116. event := rpcs.ArgsOnEvent{}
  117. err := n.eventsQueue.Receive(&event)
  118. if err != nil {
  119. server.Log.Errorf("error when receiving from queue : %v", err)
  120. return err
  121. }
  122. go n.reportEvent(event)
  123. }
  124. return nil
  125. }
  126. func (n *Notifier) Run() error {
  127. go n.updateApplications()
  128. go n.processEvents()
  129. go n.processStatus()
  130. return nil
  131. }
  132. func RunNotifier() error {
  133. if notifier == nil {
  134. notifier, err := NewNotifier(*confRabbitHost)
  135. if err != nil {
  136. server.Log.Error(err)
  137. }
  138. err = notifier.Run()
  139. if err != nil {
  140. server.Log.Error(err)
  141. }
  142. }
  143. return nil
  144. }