access.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. package main
  import (
  	"fmt"
  	"time"

  	MQTT "github.com/eclipse/paho.mqtt.golang"

  	"sparrow/pkg/klink"
  	"sparrow/pkg/mqtt"
  	"sparrow/pkg/protocol"
  	"sparrow/pkg/rpcs"
  	"sparrow/pkg/server"
  )
  const (
  	// defaultTimeoutSecond is a number of seconds. SetStatus multiplies it by
  	// time.Second to form the duration it hands to SendMessageToDevice. It is
  	// left untyped so that multiplication yields a time.Duration.
  	defaultTimeoutSecond = 5

  	// commandGetCurrentStatus is a 16-bit command code (65528 in decimal).
  	// NOTE(review): none of the Access methods in this file reference it;
  	// presumably it is used elsewhere in the package — confirm before removing.
  	commandGetCurrentStatus uint16 = 0xFFF8
  )
  15. type Access struct {
  16. MqttBroker *mqtt.Broker
  17. MQTT.Client
  18. }
  19. func NewAccess() (*Access, error) {
  20. p := NewMQTTProvider()
  21. return &Access{
  22. MqttBroker: mqtt.NewBroker(p),
  23. }, nil
  24. }
  25. func (a *Access) SetStatus(args rpcs.ArgsSetStatus, reply *rpcs.ReplySetStatus) error {
  26. server.Log.Infof("Access Set Status: %v", args)
  27. data := &protocol.Data{}
  28. data.Head.Timestamp = uint64(time.Now().Unix())
  29. token, err := a.MqttBroker.GetToken(args.DeviceId)
  30. if err != nil {
  31. return err
  32. }
  33. copy(data.Head.Token[:], token[:16])
  34. data.SubData = args.Status
  35. msg, err := data.Marshal()
  36. if err != nil {
  37. return err
  38. }
  39. return a.MqttBroker.SendMessageToDevice(args.DeviceId, "s", msg, defaultTimeoutSecond*time.Second)
  40. }
  41. func (a *Access) GetStatus(args rpcs.ArgsGetStatus, reply *rpcs.ReplyGetStatus) error {
  42. server.Log.Infof("Access Get Status: %v", args)
  43. // first send a get status command
  44. cmdArgs := rpcs.ArgsSendCommand{
  45. DeviceId: args.Id,
  46. WaitTime: 0,
  47. SubDevice: args.SubDeviceId,
  48. Cmd: "report",
  49. }
  50. cmdReply := rpcs.ReplySendCommand{}
  51. return a.SendCommand(cmdArgs, &cmdReply)
  52. }
  53. func (a *Access) SendCommand(args rpcs.ArgsSendCommand, reply *rpcs.ReplySendCommand) error {
  54. cmd := &klink.CloudSend{
  55. Action: "cloudSend",
  56. MsgId: 0,
  57. DeviceCode: args.DeviceId,
  58. SubDeviceId: args.SubDevice,
  59. Timestamp: time.Now().Unix(),
  60. Data: &klink.CloudSendData{
  61. Cmd: args.Cmd,
  62. Params: args.Params,
  63. },
  64. }
  65. msg, err := cmd.Marshal()
  66. if err != nil {
  67. return err
  68. }
  69. print("Access Send Command: %v, %v,%s\r\n", args.DeviceId, args.Cmd, string(msg))
  70. return a.MqttBroker.SendMessageToDevice(args.DeviceId, "c", msg, time.Duration(args.WaitTime)*time.Second)
  71. }
  72. // Upgrade 设备OTA升级指令
  73. func (a *Access) Upgrade(args rpcs.ArgsDeviceUpgrade, reply *rpcs.ReplyEmptyResult) error {
  74. server.Log.Infof("设备OTA升级:%s, %s", args.DeviceId, args.Version)
  75. cmd := &klink.CloudSend{
  76. Action: "cloudSend",
  77. MsgId: 0,
  78. DeviceCode: args.DeviceId,
  79. Timestamp: time.Now().Unix(),
  80. Data: &klink.CloudSendData{
  81. Cmd: "devUpgrade",
  82. Params: map[string]interface{}{
  83. "md5": args.Md5,
  84. "url": args.Url,
  85. "version": args.Version,
  86. "file_size": args.FileSize,
  87. },
  88. },
  89. SubDeviceId: args.SudDeviceId,
  90. }
  91. msg, err := cmd.Marshal()
  92. if err != nil {
  93. return err
  94. }
  95. return a.MqttBroker.SendMessageToDevice(args.DeviceId, "c", msg, 5*time.Second)
  96. }