mqtt_provider.go 7.0 KB


  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/gogf/gf/encoding/gjson"
  6. "sparrow/pkg/klink"
  7. "sparrow/pkg/models"
  8. "sparrow/pkg/rpcs"
  9. "sparrow/pkg/server"
  10. "time"
  11. )
  12. type MQTTProvider struct{}
  13. func NewMQTTProvider() *MQTTProvider {
  14. return &MQTTProvider{}
  15. }
  16. func (mp *MQTTProvider) ValidateDeviceToken(deviceid string, token []byte) error {
  17. args := rpcs.ArgsValidateDeviceAccessToken{
  18. Id: deviceid,
  19. AccessToken: token,
  20. }
  21. reply := rpcs.ReplyValidateDeviceAccessToken{}
  22. err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.ValidateDeviceAccessToken", args, &reply)
  23. if err != nil {
  24. return err
  25. }
  26. return nil
  27. }
  28. func (mp *MQTTProvider) OnDeviceOnline(args rpcs.ArgsGetOnline, VendorId string) error {
  29. deviceOnlineCount.Inc()
  30. reply := rpcs.ReplyGetOnline{}
  31. err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetOnline", args, &reply)
  32. if err != nil {
  33. server.Log.Errorf("device online error. args: %v, error: %v", args, err)
  34. }
  35. var cReply rpcs.ReplyEmptyResult
  36. var cArgs rpcs.ArgsGetStatus
  37. cArgs.VendorId = VendorId
  38. cArgs.Id = args.Id
  39. if err = server.RPCCallByName(nil, rpcs.ControllerName, "Controller.Online", &cArgs, &cReply); err != nil {
  40. return err
  41. }
  42. // 下发时间同步指令
  43. var cmdArgs rpcs.ArgsSendCommand
  44. cmdArgs.Cmd = "timeSync"
  45. cmdArgs.DeviceId = args.Id
  46. cmdArgs.SubDevice = "01"
  47. cmdArgs.Params = map[string]interface{}{
  48. "value": time.Now().Unix(),
  49. }
  50. err = server.RPCCallByName(nil, rpcs.MQTTAccessName, "Access.SendCommand", cmdArgs, &cReply)
  51. if err != nil {
  52. server.Log.Errorf("发送时间同步指令:%v", cmdArgs, err)
  53. }
  54. return err
  55. }
  56. func (mp *MQTTProvider) OnDeviceOffline(deviceid string, vendorId string) error {
  57. if deviceid != "" {
  58. deviceOnlineCount.Dec()
  59. }
  60. args := rpcs.ArgsGetOffline{
  61. Id: deviceid,
  62. VendorId: vendorId,
  63. }
  64. reply := rpcs.ReplyGetOffline{}
  65. err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetOffline", args, &reply)
  66. if err != nil {
  67. server.Log.Errorf("device offline error. deviceid: %v, error: %v", deviceid, err)
  68. }
  69. var cReply rpcs.ReplyEmptyResult
  70. var cArgs rpcs.ArgsGetStatus
  71. cArgs.VendorId = vendorId
  72. cArgs.Id = args.Id
  73. if err = server.RPCCallByName(nil, rpcs.ControllerName, "Controller.Offline", &cArgs, &cReply); err != nil {
  74. return err
  75. }
  76. return err
  77. }
  78. func (mp *MQTTProvider) OnDeviceHeartBeat(deviceid string) error {
  79. deviceMessageCount.WithLabelValues(deviceid).Inc()
  80. args := rpcs.ArgsDeviceId{
  81. Id: deviceid,
  82. }
  83. reply := rpcs.ReplyHeartBeat{}
  84. err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.HeartBeat", args, &reply)
  85. if err != nil {
  86. server.Log.Errorf("device heartbeat error. deviceid: %v, error: %v", deviceid, err)
  87. }
  88. return err
  89. }
  90. func (mp *MQTTProvider) OnDeviceMessage(deviceid, vendorId string, msgtype string, message *gjson.Json) {
  91. deviceMessageCount.WithLabelValues(deviceid).Inc()
  92. server.Log.Debugf("device {%v} message {%v} : %s", deviceid, msgtype, message.MustToJsonString())
  93. switch msgtype {
  94. case "s":
  95. act := klink.PacketAction(message.GetString("action"))
  96. if act != "" {
  97. switch act {
  98. case klink.DevSendAction:
  99. processReportStatus(deviceid, vendorId, message)
  100. case klink.DevLoginAction:
  101. _ = processDevLogin(deviceid, message.GetString("subDeviceId"))
  102. case klink.DevLogoutAction:
  103. _ = processDevLogout(deviceid, message.GetString("subDeviceId"))
  104. case klink.DevNetConfigAction:
  105. _ = processDevNetConfig(deviceid, message.GetString("md5"))
  106. case klink.ReportFirmwareAction:
  107. _ = processDeviceReportUpgrade(deviceid, message.GetString("version"))
  108. }
  109. }
  110. case "e":
  111. reply := rpcs.ReplyOnEvent{}
  112. args := rpcs.ArgsOnEvent{
  113. DeviceId: deviceid,
  114. TimeStamp: message.GetUint64("timestamp"),
  115. SubDeviceId: message.GetString("subDeviceId"),
  116. SubData: message.GetJson("data").MustToJson(),
  117. VendorId: vendorId,
  118. }
  119. err := server.RPCCallByName(nil, rpcs.ControllerName, "Controller.OnEvent", args, &reply)
  120. if err != nil {
  121. server.Log.Errorf("device on event error. args: %v, error: %v", args, err)
  122. return
  123. }
  124. default:
  125. server.Log.Infof("unknown message type: %v", msgtype)
  126. }
  127. }
  128. func processReportStatus(deviceid, vendorId string, message *gjson.Json) {
  129. fmt.Printf("deviceId-------------------:%v", deviceid)
  130. device := &models.Device{}
  131. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", deviceid, device)
  132. if err != nil {
  133. server.Log.Errorf("device not found %s", deviceid)
  134. return
  135. }
  136. fmt.Printf("device-------------------:%v", device)
  137. product := &models.Product{}
  138. err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindProduct", device.ProductID, product)
  139. if err != nil {
  140. server.Log.Errorf("find product error : %v", err)
  141. return
  142. }
  143. fmt.Printf("product-------------------:%v", product)
  144. reply := rpcs.ReplyOnStatus{}
  145. args := rpcs.ArgsOnStatus{
  146. DeviceId: deviceid,
  147. Timestamp: message.GetUint64("timestamp"),
  148. SubData: message.GetJson("data").MustToJson(),
  149. VendorId: vendorId,
  150. SubDeviceId: message.GetString("subDeviceId"),
  151. ProductKey: product.ProductKey,
  152. Action: klink.PacketAction(message.GetString("action")),
  153. }
  154. err = server.RPCCallByName(nil, rpcs.ControllerName, "Controller.OnStatus", args, &reply)
  155. if err != nil {
  156. server.Log.Errorf("device report status error. args: %v, error: %v", args, err)
  157. return
  158. }
  159. }
  160. func processDevLogin(deviceCode, subDeviceId string) error {
  161. var args rpcs.SubDeviceArgs
  162. args.DeviceCode = deviceCode
  163. args.Status = 1
  164. args.SubDeviceId = subDeviceId
  165. var reply *models.SubDevice
  166. err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.UpdateSubDevice", &args, &reply)
  167. if err != nil {
  168. server.Log.Errorf("子设备上线出错:%s", err.Error())
  169. }
  170. return nil
  171. }
  172. func processDevLogout(deviceCode, subDeviceId string) error {
  173. var args rpcs.SubDeviceArgs
  174. args.DeviceCode = deviceCode
  175. args.Status = 0
  176. args.SubDeviceId = subDeviceId
  177. var reply *models.SubDevice
  178. err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.UpdateSubDevice", &args, &reply)
  179. if err != nil {
  180. server.Log.Errorf("子设备下线出错:%s", err.Error())
  181. }
  182. return nil
  183. }
  184. // 处理设备配网信息
  185. func processDevNetConfig(deviceCode, md5 string) error {
  186. args := &models.DeviceNetConfig{
  187. DeviceIdentifier: deviceCode,
  188. MD5: md5,
  189. Status: 1,
  190. }
  191. reply := rpcs.ReplyCheckDeviceNetConfig{}
  192. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.CreateDeviceNetConfig", args, &reply)
  193. if err != nil {
  194. server.Log.Errorf("set device:%s net config info error:%v", deviceCode, err)
  195. }
  196. return nil
  197. }
  198. // 设备上报固件信息处理
  199. func processDeviceReportUpgrade(deviceId, version string) error {
  200. args := &rpcs.ArgsUpdateDeviceVersion{
  201. DeviceId: deviceId,
  202. Version: version,
  203. }
  204. var reply rpcs.ReplyEmptyResult
  205. err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.UpdateDeviceVersion", args, &reply)
  206. if err != nil {
  207. server.Log.Errorf("更新设备版本号失败:%v", err)
  208. }
  209. return nil
  210. }