mqtt_provider.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. package main
  2. import (
  3. "sparrow/pkg/protocol"
  4. "sparrow/pkg/rpcs"
  5. "sparrow/pkg/server"
  6. )
  7. type MQTTProvider struct{}
  8. func NewMQTTProvider() *MQTTProvider {
  9. return &MQTTProvider{}
  10. }
  11. func (mp *MQTTProvider) ValidateDeviceToken(deviceid string, token []byte) error {
  12. args := rpcs.ArgsValidateDeviceAccessToken{
  13. Id: deviceid,
  14. AccessToken: token,
  15. }
  16. reply := rpcs.ReplyValidateDeviceAccessToken{}
  17. err := server.RPCCallByName(nil, "devicemanager", "DeviceManager.ValidateDeviceAccessToken", args, &reply)
  18. if err != nil {
  19. server.Log.Errorf("validate device token error. deviceid : %v, token : %v, error: %v", deviceid, token, err)
  20. return err
  21. }
  22. return nil
  23. }
  24. func (mp *MQTTProvider) OnDeviceOnline(args rpcs.ArgsGetOnline) error {
  25. reply := rpcs.ReplyGetOnline{}
  26. err := server.RPCCallByName(nil, "devicemanager", "DeviceManager.GetOnline", args, &reply)
  27. if err != nil {
  28. server.Log.Errorf("device online error. args: %v, error: %v", args, err)
  29. }
  30. return err
  31. }
  32. func (mp *MQTTProvider) OnDeviceOffline(deviceid string) error {
  33. args := rpcs.ArgsGetOffline{
  34. Id: deviceid,
  35. }
  36. reply := rpcs.ReplyGetOffline{}
  37. err := server.RPCCallByName(nil, "devicemanager", "DeviceManager.GetOffline", args, &reply)
  38. if err != nil {
  39. server.Log.Errorf("device offline error. deviceid: %v, error: %v", deviceid, err)
  40. }
  41. return err
  42. }
  43. func (mp *MQTTProvider) OnDeviceHeartBeat(deviceid string) error {
  44. args := rpcs.ArgsDeviceId{
  45. Id: deviceid,
  46. }
  47. reply := rpcs.ReplyHeartBeat{}
  48. err := server.RPCCallByName(nil, "devicemanager", "DeviceManager.HeartBeat", args, &reply)
  49. if err != nil {
  50. server.Log.Errorf("device heartbeat error. deviceid: %v, error: %v", deviceid, err)
  51. }
  52. return err
  53. }
  54. func (mp *MQTTProvider) OnDeviceMessage(deviceid string, msgtype string, message []byte) {
  55. server.Log.Infof("device {%v} message {%v} : %x", deviceid, msgtype, message)
  56. switch msgtype {
  57. case "s":
  58. // it's a status
  59. data := &protocol.Data{}
  60. err := data.UnMarshal(message)
  61. if err != nil {
  62. server.Log.Errorf("unmarshal data error : %v", err)
  63. return
  64. }
  65. // if there is a realtime query
  66. ch, exist := StatusChan[deviceid]
  67. if exist {
  68. ch <- data
  69. return
  70. }
  71. // it's a normal report.
  72. reply := rpcs.ReplyOnStatus{}
  73. args := rpcs.ArgsOnStatus{
  74. DeviceId: deviceid,
  75. Timestamp: data.Head.Timestamp,
  76. Subdata: data.SubData,
  77. }
  78. err = server.RPCCallByName(nil, "controller", "Controller.OnStatus", args, &reply)
  79. if err != nil {
  80. server.Log.Errorf("device report status error. args: %v, error: %v", args, err)
  81. return
  82. }
  83. case "e":
  84. // it's an event report
  85. event := &protocol.Event{}
  86. err := event.UnMarshal(message)
  87. if err != nil {
  88. server.Log.Errorf("unmarshal event error : %v", err)
  89. return
  90. }
  91. reply := rpcs.ReplyOnEvent{}
  92. args := rpcs.ArgsOnEvent{
  93. DeviceId: deviceid,
  94. TimeStamp: event.Head.Timestamp,
  95. SubDevice: event.Head.SubDeviceid,
  96. No: event.Head.No,
  97. Priority: event.Head.Priority,
  98. Params: event.Params,
  99. }
  100. err = server.RPCCallByName(nil, "controller", "Controller.OnEvent", args, &reply)
  101. if err != nil {
  102. server.Log.Errorf("device on event error. args: %v, error: %v", args, err)
  103. return
  104. }
  105. default:
  106. server.Log.Infof("unkown message type: %v", msgtype)
  107. }
  108. }