access.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. package main
  2. import (
  3. "errors"
  4. "sparrow/pkg/mqtt"
  5. "sparrow/pkg/protocol"
  6. "sparrow/pkg/rpcs"
  7. "sparrow/pkg/server"
  8. "time"
  9. )
  10. const (
  11. defaultTimeoutSecond = 5
  12. commandGetCurrentStatus = uint16(65528)
  13. )
  14. type Access struct {
  15. MqttBroker *mqtt.Broker
  16. }
  17. func NewAccess() (*Access, error) {
  18. p := NewMQTTProvider()
  19. return &Access{
  20. mqtt.NewBroker(p),
  21. }, nil
  22. }
  23. func (a *Access) SetStatus(args rpcs.ArgsSetStatus, reply *rpcs.ReplySetStatus) error {
  24. server.Log.Infof("Access Set Status: %v", args)
  25. data := &protocol.Data{}
  26. data.Head.Timestamp = uint64(time.Now().Unix())
  27. token, err := a.MqttBroker.GetToken(args.DeviceId)
  28. if err != nil {
  29. return err
  30. }
  31. copy(data.Head.Token[:], token[:16])
  32. data.SubData = args.Status
  33. msg, err := data.Marshal()
  34. if err != nil {
  35. return err
  36. }
  37. return a.MqttBroker.SendMessageToDevice(args.DeviceId, "s", msg, defaultTimeoutSecond*time.Second)
  38. }
  39. func (a *Access) GetStatus(args rpcs.ArgsGetStatus, reply *rpcs.ReplyGetStatus) error {
  40. server.Log.Infof("Access Get Status: %v", args)
  41. // first send a get status command
  42. cmdArgs := rpcs.ArgsSendCommand{
  43. DeviceId: args.Id,
  44. SubDevice: 65535,
  45. No: commandGetCurrentStatus,
  46. Priority: 99,
  47. WaitTime: 0,
  48. }
  49. cmdReply := rpcs.ReplySendCommand{}
  50. err := a.SendCommand(cmdArgs, &cmdReply)
  51. if err != nil {
  52. return err
  53. }
  54. // then wait for status report
  55. StatusChan[args.Id] = make(chan *protocol.Data)
  56. after := time.After(defaultTimeoutSecond * time.Second)
  57. server.Log.Debug("now waiting 5 seconds for status report...")
  58. select {
  59. case <-after:
  60. // timeout
  61. close(StatusChan[args.Id])
  62. delete(StatusChan, args.Id)
  63. return errors.New("get status timeout.")
  64. case data := <-StatusChan[args.Id]:
  65. // go it
  66. close(StatusChan[args.Id])
  67. delete(StatusChan, args.Id)
  68. reply.Status = data.SubData
  69. return nil
  70. }
  71. }
  72. func (a *Access) SendCommand(args rpcs.ArgsSendCommand, reply *rpcs.ReplySendCommand) error {
  73. server.Log.Infof("Access Send Command: %v", args)
  74. cmd := &protocol.Command{}
  75. cmd.Head.Timestamp = uint64(time.Now().Unix())
  76. token, err := a.MqttBroker.GetToken(args.DeviceId)
  77. if err != nil {
  78. return err
  79. }
  80. copy(cmd.Head.Token[:], token[:16])
  81. cmd.Head.No = args.No
  82. cmd.Head.Priority = args.Priority
  83. cmd.Head.SubDeviceid = args.SubDevice
  84. cmd.Head.ParamsCount = uint16(len(args.Params))
  85. cmd.Params = args.Params
  86. msg, err := cmd.Marshal()
  87. if err != nil {
  88. return err
  89. }
  90. return a.MqttBroker.SendMessageToDevice(args.DeviceId, "c", msg, time.Duration(args.WaitTime)*time.Second)
  91. }