gateway.go 6.3 KB


  1. package v2
  2. import (
  3. "context"
  4. "encoding/hex"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. mqtt "github.com/eclipse/paho.mqtt.golang"
  9. "github.com/gogf/gf/encoding/gjson"
  10. "github.com/gogf/gf/net/ghttp"
  11. "log"
  12. "os"
  13. "sparrow-sdk/config"
  14. spErr "sparrow-sdk/errors"
  15. "sparrow-sdk/logger"
  16. "sparrow-sdk/protocal"
  17. "sparrow-sdk/schema"
  18. "time"
  19. )
  20. type CmdMessage struct {
  21. Cmd string
  22. Params interface{}
  23. }
  24. // DeviceReportCommandCb 云平台下发的上报指令回调
  25. type DeviceReportCommandCb func(deviceCode, subId string) error
  26. func NewGateway(config *config.Config) *Gateway {
  27. c := ghttp.NewClient()
  28. c.SetHeader("Content-Type", "application/json")
  29. if config.Logger == nil {
  30. config.Logger = logger.New(log.New(os.Stdout, "\r\n", log.LstdFlags),
  31. logger.Config{Open: config.Debug})
  32. }
  33. return &Gateway{
  34. config: config,
  35. httpClient: c,
  36. closeChan: make(chan struct{}),
  37. commandMessageChan: make(chan protocal.CloudSend),
  38. }
  39. }
  40. type Gateway struct {
  41. config *config.Config
  42. httpClient *ghttp.Client
  43. mqttClient mqtt.Client
  44. deviceId int64
  45. deviceKey string
  46. deviceSecret string
  47. deviceIdentifier string
  48. accessToken []byte
  49. accessAddr string
  50. closeChan chan struct{}
  51. commandMessageChan chan protocal.CloudSend
  52. reportCommandCb DeviceReportCommandCb
  53. }
  54. func (a *Gateway) SetReportCommandCallback(cb DeviceReportCommandCb) {
  55. a.reportCommandCb = cb
  56. }
  57. // Register 接入网关向平台注册
  58. func (a *Gateway) Register() (*schema.RegisterData, error) {
  59. params := &schema.RegisterRequestParams{
  60. ProductKey: a.config.ProductKey,
  61. DeviceCode: a.config.DeviceCode,
  62. Version: a.config.Version,
  63. }
  64. resp, err := a.httpClient.Post(a.config.SparrowServer+"/v1/devices/registration", params)
  65. if err != nil {
  66. a.config.Logger.Trace(context.Background(), "请求服务器失败:%s", err.Error())
  67. return nil, spErr.ErrRegisterToServer
  68. }
  69. var response schema.RegisterResponse
  70. err = json.Unmarshal(resp.ReadAll(), &response)
  71. if err != nil {
  72. a.config.Logger.Trace(context.Background(), "%s", err.Error())
  73. return nil, spErr.ErrResponseFromServer
  74. }
  75. if response.Code != 0 {
  76. return nil, errors.New(response.Message)
  77. }
  78. a.config.Logger.Trace(context.Background(), "网关注册结果:%+v", response.Data)
  79. a.deviceId = response.Data.DeviceId
  80. a.deviceIdentifier = response.Data.DeviceIdentifier
  81. a.deviceKey = response.Data.DeviceKey
  82. a.deviceSecret = response.Data.DeviceSecret
  83. return &response.Data, nil
  84. }
  85. // Authentication 验证设备,并获取接入服务
  86. func (a *Gateway) Authentication() (*schema.DeviceAuthData, error) {
  87. if a.deviceSecret == "" || a.deviceId == 0 || a.deviceKey == "" || a.deviceIdentifier == "" {
  88. return nil, spErr.ErrDeviceNotRegister
  89. }
  90. params := &schema.AuthRequestParams{
  91. DeviceId: a.deviceId,
  92. DeviceSecret: a.deviceSecret,
  93. Protocol: string(a.config.Protocol),
  94. }
  95. resp, err := a.httpClient.Post(a.config.SparrowServer+"/v1/devices/authentication", params)
  96. if err != nil {
  97. a.config.Logger.Trace(context.Background(), "%s", err.Error())
  98. return nil, spErr.ErrAutoToServer
  99. }
  100. var result schema.LoginResponse
  101. err = json.Unmarshal(resp.ReadAll(), &result)
  102. if err != nil {
  103. a.config.Logger.Trace(context.Background(), "%s", err.Error())
  104. return nil, spErr.ErrResponseFromServer
  105. }
  106. if result.Code != 0 {
  107. return nil, errors.New(result.Message)
  108. }
  109. a.config.Logger.Trace(context.Background(), "网关认证结果:%+v", result.Data)
  110. token, err := hex.DecodeString(result.Data.AccessToken)
  111. if err != nil {
  112. return nil, err
  113. }
  114. a.accessToken = token
  115. a.accessAddr = result.Data.AccessAddr
  116. return &result.Data, nil
  117. }
  118. // Connect 接入平台,会阻塞主进程
  119. func (a *Gateway) Connect() {
  120. opts := mqtt.NewClientOptions().AddBroker(fmt.Sprintf("tcp://%s", a.accessAddr))
  121. clientId := fmt.Sprintf("%x", a.deviceId)
  122. opts.SetClientID(clientId)
  123. opts.SetPassword(hex.EncodeToString(a.accessToken))
  124. opts.SetAutoReconnect(true)
  125. opts.SetOnConnectHandler(func(client mqtt.Client) {
  126. a.config.Logger.Trace(context.Background(), "%s", "成功接入平台")
  127. })
  128. opts.SetConnectionLostHandler(func(client mqtt.Client, err error) {
  129. a.config.Logger.Trace(context.Background(), "与平台断开连接[%s]!", err.Error())
  130. })
  131. opts.SetDefaultPublishHandler(func(client mqtt.Client, message mqtt.Message) {
  132. switch message.Topic() {
  133. case "c":
  134. a.commandHandler(message)
  135. //case "s":
  136. // a.statusHandler(message)
  137. }
  138. })
  139. opts.CredentialsProvider = func() (username string, password string) {
  140. _, _ = a.Authentication()
  141. return clientId, hex.EncodeToString(a.accessToken)
  142. }
  143. opts.SetKeepAlive(30 * time.Second)
  144. c := mqtt.NewClient(opts)
  145. a.mqttClient = c
  146. go func() {
  147. if token := c.Connect(); token.Wait() && token.Error() != nil {
  148. a.config.Logger.Trace(context.Background(), "%s", token.Error().Error())
  149. return
  150. }
  151. }()
  152. for {
  153. select {
  154. case <-a.closeChan:
  155. c.Disconnect(250)
  156. }
  157. }
  158. }
  159. // Close 关闭
  160. func (a *Gateway) Close() {
  161. close(a.closeChan)
  162. }
  163. // ReportStatus 对应平台v2版本
  164. func (a *Gateway) ReportStatus(subDeviceId, cmd string, params interface{}) error {
  165. data := &protocal.DevReport{
  166. Action: "devSend",
  167. MsgId: 1,
  168. TimeStamp: time.Now().Unix(),
  169. SubDeviceId: subDeviceId,
  170. DeviceCode: a.config.DeviceCode,
  171. Data: &protocal.Data{
  172. Cmd: cmd,
  173. Params: params,
  174. },
  175. }
  176. payload, err := json.Marshal(data)
  177. if err != nil {
  178. return err
  179. }
  180. fmt.Println(fmt.Sprintf("payload:%s", string(payload)))
  181. a.mqttClient.Publish("s", 1, false, payload)
  182. return nil
  183. }
  184. func (a *Gateway) commandHandler(message mqtt.Message) {
  185. j, err := gjson.DecodeToJson(message.Payload())
  186. if err != nil {
  187. a.config.Logger.Trace(context.Background(), "error message format :%s", err.Error())
  188. return
  189. }
  190. var msg protocal.CloudSend
  191. if err = j.Struct(&msg); err == nil {
  192. if msg.Data.Cmd == "report" && a.reportCommandCb != nil {
  193. if err = a.reportCommandCb(msg.DeviceCode, msg.SubDeviceId); err != nil {
  194. panic(err)
  195. }
  196. }
  197. a.config.Logger.Trace(context.Background(), "gateway receiving command:%+v", msg.Data.Cmd)
  198. select {
  199. case a.commandMessageChan <- msg:
  200. case <-time.After(5 * time.Second):
  201. a.config.Logger.Trace(context.Background(), "command message write timeout")
  202. }
  203. } else {
  204. return
  205. }
  206. }
  207. // RecvCommand recv a command message from channel
  208. func (a *Gateway) RecvCommand() <-chan protocal.CloudSend {
  209. return a.commandMessageChan
  210. }