access.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. package main
  2. import (
  3. "github.com/gogf/gf/encoding/gjson"
  4. "sparrow/pkg/klink"
  5. "sparrow/pkg/mqtt"
  6. "sparrow/pkg/protocol"
  7. "sparrow/pkg/rpcs"
  8. "sparrow/pkg/server"
  9. "time"
  10. )
  11. const (
  12. defaultTimeoutSecond = 5
  13. commandGetCurrentStatus = uint16(65528)
  14. )
  15. type Access struct {
  16. MqttBroker *mqtt.Broker
  17. }
  18. func NewAccess() (*Access, error) {
  19. p := NewMQTTProvider()
  20. return &Access{
  21. mqtt.NewBroker(p),
  22. }, nil
  23. }
  24. func (a *Access) SetStatus(args rpcs.ArgsSetStatus, reply *rpcs.ReplySetStatus) error {
  25. server.Log.Infof("Access Set Status: %v", args)
  26. data := &protocol.Data{}
  27. data.Head.Timestamp = uint64(time.Now().Unix())
  28. token, err := a.MqttBroker.GetToken(args.DeviceId)
  29. if err != nil {
  30. return err
  31. }
  32. copy(data.Head.Token[:], token[:16])
  33. data.SubData = args.Status
  34. msg, err := data.Marshal()
  35. if err != nil {
  36. return err
  37. }
  38. return a.MqttBroker.SendMessageToDevice(args.DeviceId, "s", msg, defaultTimeoutSecond*time.Second)
  39. }
  40. func (a *Access) GetStatus(args rpcs.ArgsGetStatus, reply *rpcs.ReplyGetStatus) error {
  41. server.Log.Infof("Access Get Status: %v", args)
  42. // first send a get status command
  43. //cmdArgs := rpcs.ArgsSendCommand{
  44. // DeviceId: args.Id,
  45. // SubDevice: 65535,
  46. // No: commandGetCurrentStatus,
  47. // Priority: 99,
  48. // WaitTime: 0,
  49. //}
  50. //cmdReply := rpcs.ReplySendCommand{}
  51. //err := a.SendCommand(cmdArgs, &cmdReply)
  52. //if err != nil {
  53. // return err
  54. //}
  55. //
  56. //// then wait for status report
  57. //StatusChan[args.Id] = make(chan *protocol.Data)
  58. //after := time.After(defaultTimeoutSecond * time.Second)
  59. //server.Log.Debug("now waiting 5 seconds for status report...")
  60. //select {
  61. //case <-after:
  62. // // timeout
  63. // close(StatusChan[args.Id])
  64. // delete(StatusChan, args.Id)
  65. // return errors.New("get status timeout.")
  66. //case data := <-StatusChan[args.Id]:
  67. // // go it
  68. // close(StatusChan[args.Id])
  69. // delete(StatusChan, args.Id)
  70. // reply.Status = data.SubData
  71. // return nil
  72. //}
  73. return nil
  74. }
  75. func (a *Access) SendCommand(args rpcs.ArgsSendCommand, reply *rpcs.ReplySendCommand) error {
  76. server.Log.Infof("Access Send Command: %v", args)
  77. cmd := &klink.CloudSend{
  78. Action: "cloudSend",
  79. MsgId: 0,
  80. DeviceCode: args.DeviceId,
  81. SubDeviceId: args.SubDevice,
  82. Timestamp: time.Now().Unix(),
  83. Data: &klink.CloudSendData{
  84. Cmd: args.Cmd,
  85. Params: gjson.New(args.Params),
  86. },
  87. }
  88. msg, err := cmd.Marshal()
  89. if err != nil {
  90. return err
  91. }
  92. return a.MqttBroker.SendMessageToDevice(args.DeviceId, "c", msg, time.Duration(args.WaitTime)*time.Second)
  93. }