123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173 |
- package main
- import (
- "encoding/json"
- "sparrow/pkg/models"
- "sparrow/pkg/productconfig"
- "sparrow/pkg/protocol"
- "sparrow/pkg/queue"
- "sparrow/pkg/rpcs"
- "sparrow/pkg/server"
- "sparrow/pkg/utils"
- "time"
- )
- const (
- topicEvents = "events"
- topicStatus = "status"
- )
- // report structure
- type ReportPack struct {
- Tag string `json:"tag"`
- Identifier string `json:"identifier"`
- TimeStamp int64 `json:"timestamp"`
- Data map[string][]interface{} `json:"data"`
- }
- var notifier *Notifier
- type Notifier struct {
- eventsQueue *queue.Queue
- statusQueue *queue.Queue
- apps []*models.Application
- }
- func NewNotifier(rabbithost string) (*Notifier, error) {
- eq, err := queue.New(rabbithost, topicEvents)
- if err != nil {
- return nil, err
- }
- sq, err := queue.New(rabbithost, topicStatus)
- if err != nil {
- return nil, err
- }
- return &Notifier{
- eventsQueue: eq,
- statusQueue: sq,
- apps: []*models.Application{},
- }, nil
- }
- // TODO
- func (n *Notifier) reportStatus(event rpcs.ArgsOnStatus) error {
- return nil
- }
- // TODO
- func (n *Notifier) processStatus() error {
- return nil
- }
- func (n *Notifier) updateApplications() error {
- for {
- err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.GetApplications", 0, &n.apps)
- if err != nil {
- server.Log.Errorf("get applications error : %v", err)
- }
- time.Sleep(time.Minute)
- }
- }
- func (n *Notifier) reportEvent(event rpcs.ArgsOnEvent) error {
- server.Log.Debugf("reporting event %v", event)
- device := &models.Device{}
- err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceById", event.DeviceId, device)
- if err != nil {
- server.Log.Errorf("find device error : %v", err)
- return err
- }
- product := &models.Product{}
- err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindProduct", device.ProductID, product)
- if err != nil {
- server.Log.Errorf("find product error : %v", err)
- return err
- }
- c, err := productconfig.New(product.ProductConfig)
- if err != nil {
- server.Log.Errorf("product config error : %v", err)
- return err
- }
- ev := &protocol.Event{}
- ev.Head.No = event.No
- ev.Head.SubDeviceid = event.SubDevice
- ev.Params = event.Params
- m, err := c.EventToMap(ev)
- if err != nil {
- server.Log.Errorf("gen event json error : %v", err)
- return err
- }
- res := ReportPack{
- Tag: "event",
- Identifier: device.DeviceIdentifier,
- Data: m,
- }
- jsonRes, err := json.Marshal(res)
- if err != nil {
- server.Log.Errorf("json marshal error : %v", err)
- return err
- }
- reqHead := map[string]string{}
- reqHead["Content-Type"] = "application/json"
- for _, app := range n.apps {
- if nil == checkAppDomain(app.AppDomain, device.DeviceIdentifier) {
- reqHead["App-Token"] = app.AppToken
- _, err := utils.SendHttpRequest(app.ReportUrl, string(jsonRes), "POST", reqHead)
- if err != nil {
- server.Log.Errorf("http post json error : %v", err)
- }
- server.Log.Debugf("http post json succ : %v", string(jsonRes))
- }
- }
- return nil
- }
- func (n *Notifier) processEvents() error {
- for {
- event := rpcs.ArgsOnEvent{}
- err := n.eventsQueue.Receive(&event)
- if err != nil {
- server.Log.Errorf("error when receiving from queue : %v", err)
- return err
- }
- go n.reportEvent(event)
- }
- return nil
- }
- func (n *Notifier) Run() error {
- go n.updateApplications()
- go n.processEvents()
- go n.processStatus()
- return nil
- }
- func RunNotifier() error {
- if notifier == nil {
- notifier, err := NewNotifier(*confRabbitHost)
- if err != nil {
- server.Log.Error(err)
- }
- err = notifier.Run()
- if err != nil {
- server.Log.Error(err)
- }
- }
- return nil
- }
|