mqtt_provider.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. package main
  2. import (
  3. "github.com/gogf/gf/encoding/gjson"
  4. "sparrow/pkg/klink"
  5. "sparrow/pkg/rpcs"
  6. "sparrow/pkg/server"
  7. )
  // MQTTProvider is a field-less type whose methods forward MQTT device
  // events to backend services through server.RPCCallByName.
  type MQTTProvider struct{}

  // NewMQTTProvider returns a pointer to a new, empty MQTTProvider.
  func NewMQTTProvider() *MQTTProvider {
  	return new(MQTTProvider)
  }
  12. func (mp *MQTTProvider) ValidateDeviceToken(deviceid string, token []byte) error {
  13. args := rpcs.ArgsValidateDeviceAccessToken{
  14. Id: deviceid,
  15. AccessToken: token,
  16. }
  17. reply := rpcs.ReplyValidateDeviceAccessToken{}
  18. err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.ValidateDeviceAccessToken", args, &reply)
  19. if err != nil {
  20. server.Log.Errorf("validate device token error. deviceid : %v, token : %v, error: %v", deviceid, token, err)
  21. return err
  22. }
  23. return nil
  24. }
  25. func (mp *MQTTProvider) OnDeviceOnline(args rpcs.ArgsGetOnline, VendorId string) error {
  26. reply := rpcs.ReplyGetOnline{}
  27. err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetOnline", args, &reply)
  28. if err != nil {
  29. server.Log.Errorf("device online error. args: %v, error: %v", args, err)
  30. }
  31. var cReply rpcs.ReplyEmptyResult
  32. var cArgs rpcs.ArgsGetStatus
  33. cArgs.VendorId = VendorId
  34. cArgs.Id = args.Id
  35. if err = server.RPCCallByName(nil, rpcs.ControllerName, "Controller.Online", &cArgs, &cReply); err != nil {
  36. return err
  37. }
  38. return err
  39. }
  40. func (mp *MQTTProvider) OnDeviceOffline(deviceid string, vendorId string) error {
  41. args := rpcs.ArgsGetOffline{
  42. Id: deviceid,
  43. VendorId: vendorId,
  44. }
  45. reply := rpcs.ReplyGetOffline{}
  46. err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetOffline", args, &reply)
  47. if err != nil {
  48. server.Log.Errorf("device offline error. deviceid: %v, error: %v", deviceid, err)
  49. }
  50. var cReply rpcs.ReplyEmptyResult
  51. var cArgs rpcs.ArgsGetStatus
  52. cArgs.VendorId = vendorId
  53. cArgs.Id = args.Id
  54. if err = server.RPCCallByName(nil, rpcs.ControllerName, "Controller.Offline", &cArgs, &cReply); err != nil {
  55. return err
  56. }
  57. return err
  58. }
  59. func (mp *MQTTProvider) OnDeviceHeartBeat(deviceid string) error {
  60. args := rpcs.ArgsDeviceId{
  61. Id: deviceid,
  62. }
  63. reply := rpcs.ReplyHeartBeat{}
  64. err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.HeartBeat", args, &reply)
  65. if err != nil {
  66. server.Log.Errorf("device heartbeat error. deviceid: %v, error: %v", deviceid, err)
  67. }
  68. return err
  69. }
  70. func (mp *MQTTProvider) OnDeviceMessage(deviceid, vendorId string, msgtype string, message *gjson.Json) {
  71. server.Log.Infof("device {%v} message {%v} : %s", deviceid, msgtype, message.MustToJsonString())
  72. switch msgtype {
  73. case "s":
  74. act := klink.PacketAction(message.GetString("action"))
  75. if act != "" {
  76. switch act {
  77. case klink.DevSendAction:
  78. processReportStatus(deviceid, vendorId, message)
  79. //case klink.DevLoginAction:
  80. // _ = processDevLogin(deviceid)
  81. //case klink.DevLogoutAction:
  82. // _ = processDevLogout(deviceid)
  83. }
  84. }
  85. case "e":
  86. reply := rpcs.ReplyOnEvent{}
  87. args := rpcs.ArgsOnEvent{
  88. DeviceId: deviceid,
  89. TimeStamp: message.GetUint64("timestamp"),
  90. SubDeviceId: message.GetString("subDeviceId"),
  91. SubData: message.GetJson("data").MustToJson(),
  92. VendorId: vendorId,
  93. }
  94. err := server.RPCCallByName(nil, rpcs.ControllerName, "Controller.OnEvent", args, &reply)
  95. if err != nil {
  96. server.Log.Errorf("device on event error. args: %v, error: %v", args, err)
  97. return
  98. }
  99. default:
  100. server.Log.Infof("unknown message type: %v", msgtype)
  101. }
  102. }
  103. func processReportStatus(deviceid, vendorId string, message *gjson.Json) {
  104. reply := rpcs.ReplyOnStatus{}
  105. args := rpcs.ArgsOnStatus{
  106. DeviceId: deviceid,
  107. Timestamp: message.GetUint64("timestamp"),
  108. SubData: message.GetJson("data").MustToJson(),
  109. VendorId: vendorId,
  110. SubDeviceId: message.GetString("subDeviceId"),
  111. Action: klink.PacketAction(message.GetString("action")),
  112. }
  113. err := server.RPCCallByName(nil, rpcs.ControllerName, "Controller.OnStatus", args, &reply)
  114. if err != nil {
  115. server.Log.Errorf("device report status error. args: %v, error: %v", args, err)
  116. return
  117. }
  118. }
  119. func processDevLogin(subDeviceId string) error {
  120. reply := rpcs.ReplyGetOnline{}
  121. var args rpcs.ArgsGetOnline
  122. args.Id = subDeviceId
  123. err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetOnline", args, &reply)
  124. if err != nil {
  125. server.Log.Errorf("device online error. args: %v, error: %v", args, err)
  126. }
  127. return err
  128. }
  129. func processDevLogout(subDeviceId string) error {
  130. args := rpcs.ArgsGetOffline{
  131. Id: subDeviceId,
  132. }
  133. reply := rpcs.ReplyGetOffline{}
  134. err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetOffline", args, &reply)
  135. if err != nil {
  136. server.Log.Errorf("device offline error. deviceid: %v, error: %v", subDeviceId, err)
  137. }
  138. return err
  139. }