gateway.go 8.2 KB


  1. package v2
  2. import (
  3. "context"
  4. "crypto/tls"
  5. "encoding/hex"
  6. "encoding/json"
  7. "errors"
  8. "fmt"
  9. mqtt "github.com/eclipse/paho.mqtt.golang"
  10. "github.com/gogf/gf/container/gmap"
  11. "github.com/gogf/gf/encoding/gjson"
  12. "github.com/gogf/gf/net/ghttp"
  13. "log"
  14. "os"
  15. "sparrow-sdk/config"
  16. spErr "sparrow-sdk/errors"
  17. "sparrow-sdk/logger"
  18. "sparrow-sdk/protocal"
  19. "sparrow-sdk/schema"
  20. "time"
  21. )
  22. type CmdMessage struct {
  23. Cmd string
  24. Params interface{}
  25. }
  26. type CmdCallbackFun func(msg protocal.CloudSend) error
  27. // DeviceReportCommandCb 云平台下发的上报指令回调
  28. type DeviceReportCommandCb func(deviceCode, subId string) error
  29. func NewGateway(config *config.Config) *Gateway {
  30. if config.UseTls {
  31. if config.CaFile == "" || config.KeyFile == "" {
  32. panic("use tls: CaFile and CaKey must be provide")
  33. }
  34. }
  35. c := ghttp.NewClient()
  36. c.SetHeader("Content-Type", "application/json")
  37. if config.Logger == nil {
  38. config.Logger = logger.New(log.New(os.Stdout, "\r\n", log.LstdFlags),
  39. logger.Config{Open: config.Debug})
  40. }
  41. return &Gateway{
  42. config: config,
  43. httpClient: c,
  44. closeChan: make(chan struct{}),
  45. commandMessageChan: make(chan protocal.CloudSend),
  46. cmdList: gmap.New(true),
  47. }
  48. }
  49. type Gateway struct {
  50. config *config.Config
  51. httpClient *ghttp.Client
  52. mqttClient mqtt.Client
  53. deviceId int64
  54. deviceKey string
  55. deviceSecret string
  56. deviceIdentifier string
  57. accessToken []byte
  58. accessAddr string
  59. closeChan chan struct{}
  60. commandMessageChan chan protocal.CloudSend
  61. reportCommandCb DeviceReportCommandCb
  62. cmdList *gmap.Map
  63. }
  64. func (a *Gateway) SetReportCommandCallback(cb DeviceReportCommandCb) {
  65. a.reportCommandCb = cb
  66. }
  67. // Register 接入网关向平台注册
  68. func (a *Gateway) Register() (*schema.RegisterData, error) {
  69. params := &schema.RegisterRequestParams{
  70. ProductKey: a.config.ProductKey,
  71. DeviceCode: a.config.DeviceCode,
  72. Version: a.config.Version,
  73. }
  74. resp, err := a.httpClient.Post(a.config.SparrowServer+"/v1/devices/registration", params)
  75. if err != nil {
  76. a.config.Logger.Trace(context.Background(), "请求服务器失败:%s", err.Error())
  77. return nil, spErr.ErrRegisterToServer
  78. }
  79. var response schema.RegisterResponse
  80. err = json.Unmarshal(resp.ReadAll(), &response)
  81. if err != nil {
  82. a.config.Logger.Trace(context.Background(), "%s", err.Error())
  83. return nil, spErr.ErrResponseFromServer
  84. }
  85. if response.Code != 0 {
  86. return nil, errors.New(response.Message)
  87. }
  88. a.config.Logger.Trace(context.Background(), "网关注册结果:%+v", response.Data)
  89. a.deviceId = response.Data.DeviceId
  90. a.deviceIdentifier = response.Data.DeviceIdentifier
  91. a.deviceKey = response.Data.DeviceKey
  92. a.deviceSecret = response.Data.DeviceSecret
  93. return &response.Data, nil
  94. }
  95. // Authentication 验证设备,并获取接入服务
  96. func (a *Gateway) Authentication() (*schema.DeviceAuthData, error) {
  97. if a.deviceSecret == "" || a.deviceId == 0 || a.deviceKey == "" || a.deviceIdentifier == "" {
  98. return nil, spErr.ErrDeviceNotRegister
  99. }
  100. params := &schema.AuthRequestParams{
  101. DeviceId: a.deviceId,
  102. DeviceSecret: a.deviceSecret,
  103. Protocol: string(a.config.Protocol),
  104. }
  105. resp, err := a.httpClient.Post(a.config.SparrowServer+"/v1/devices/authentication", params)
  106. if err != nil {
  107. a.config.Logger.Trace(context.Background(), "%s", err.Error())
  108. return nil, spErr.ErrAutoToServer
  109. }
  110. var result schema.LoginResponse
  111. err = json.Unmarshal(resp.ReadAll(), &result)
  112. if err != nil {
  113. a.config.Logger.Trace(context.Background(), "%s", err.Error())
  114. return nil, spErr.ErrResponseFromServer
  115. }
  116. if result.Code != 0 {
  117. return nil, errors.New(result.Message)
  118. }
  119. a.config.Logger.Trace(context.Background(), "网关认证结果:%+v", result.Data)
  120. token, err := hex.DecodeString(result.Data.AccessToken)
  121. if err != nil {
  122. return nil, err
  123. }
  124. a.accessToken = token
  125. a.accessAddr = result.Data.AccessAddr
  126. return &result.Data, nil
  127. }
  128. // Connect 接入平台,会阻塞主进程
  129. func (a *Gateway) Connect() {
  130. var url string
  131. if a.config.UseTls {
  132. url = fmt.Sprintf("ssl://%s", a.accessAddr)
  133. } else {
  134. url = fmt.Sprintf("tcp://%s", a.accessAddr)
  135. }
  136. opts := mqtt.NewClientOptions().AddBroker(url)
  137. clientId := fmt.Sprintf("%x", a.deviceId)
  138. opts.SetClientID(clientId)
  139. opts.SetPassword(hex.EncodeToString(a.accessToken))
  140. opts.SetAutoReconnect(true)
  141. opts.SetOnConnectHandler(func(client mqtt.Client) {
  142. a.config.Logger.Trace(context.Background(), "%s", "成功接入平台")
  143. })
  144. if a.config.UseTls {
  145. cert, err := tls.LoadX509KeyPair(a.config.CaFile, a.config.KeyFile)
  146. if err != nil {
  147. panic(err)
  148. }
  149. opts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cert}, InsecureSkipVerify: true})
  150. }
  151. opts.SetConnectionLostHandler(func(client mqtt.Client, err error) {
  152. a.config.Logger.Trace(context.Background(), "与平台断开连接[%s]!", err.Error())
  153. })
  154. opts.SetDefaultPublishHandler(func(client mqtt.Client, message mqtt.Message) {
  155. switch message.Topic() {
  156. case "c":
  157. a.commandHandler(message)
  158. //case "s":
  159. // a.statusHandler(message)
  160. }
  161. })
  162. opts.CredentialsProvider = func() (username string, password string) {
  163. _, _ = a.Authentication()
  164. return clientId, hex.EncodeToString(a.accessToken)
  165. }
  166. opts.SetKeepAlive(30 * time.Second)
  167. c := mqtt.NewClient(opts)
  168. a.mqttClient = c
  169. go func() {
  170. if token := c.Connect(); token.Wait() && token.Error() != nil {
  171. a.config.Logger.Trace(context.Background(), "%s", token.Error().Error())
  172. return
  173. }
  174. }()
  175. for {
  176. select {
  177. case <-a.closeChan:
  178. c.Disconnect(250)
  179. }
  180. }
  181. }
  182. // Close 关闭
  183. func (a *Gateway) Close() {
  184. close(a.closeChan)
  185. }
  186. // ReportStatus 对应平台v2版本
  187. func (a *Gateway) ReportStatus(subDeviceId, cmd string, params interface{}) error {
  188. data := &protocal.DevReport{
  189. Action: "devSend",
  190. MsgId: 1,
  191. TimeStamp: time.Now().Unix(),
  192. SubDeviceId: subDeviceId,
  193. DeviceCode: a.config.DeviceCode,
  194. Data: &protocal.Data{
  195. Cmd: cmd,
  196. Params: params,
  197. },
  198. }
  199. payload, err := json.Marshal(data)
  200. if err != nil {
  201. return err
  202. }
  203. fmt.Println(fmt.Sprintf("payload:%s", string(payload)))
  204. a.mqttClient.Publish("s", 1, false, payload)
  205. return nil
  206. }
  207. func (a *Gateway) commandHandler(message mqtt.Message) {
  208. j, err := gjson.DecodeToJson(message.Payload())
  209. if err != nil {
  210. a.config.Logger.Trace(context.Background(), "error message format :%s", err.Error())
  211. return
  212. }
  213. var msg protocal.CloudSend
  214. if err = j.Struct(&msg); err == nil {
  215. if msg.Data.Cmd == "report" && a.reportCommandCb != nil {
  216. if err = a.reportCommandCb(msg.DeviceCode, msg.SubDeviceId); err != nil {
  217. panic(err)
  218. }
  219. return
  220. }
  221. a.config.Logger.Trace(context.Background(), "gateway receiving command:%+v", msg.Data.Cmd)
  222. if a.cmdList.Contains(msg.Data.Cmd) {
  223. f := a.cmdList.Get(msg.Data.Cmd)
  224. if err = f.(CmdCallbackFun)(msg); err != nil {
  225. a.config.Logger.Trace(context.Background(), "执行指令失败:%s", msg.Data.Cmd)
  226. }
  227. }
  228. select {
  229. case a.commandMessageChan <- msg:
  230. case <-time.After(5 * time.Second):
  231. a.config.Logger.Trace(context.Background(), "command message write timeout")
  232. }
  233. } else {
  234. return
  235. }
  236. }
  237. // RecvCommand recv a command message from channel
  238. // Deprecated
  239. func (a *Gateway) RecvCommand() <-chan protocal.CloudSend {
  240. return a.commandMessageChan
  241. }
  242. // RegisterCommand 注册指令回调
  243. func (a *Gateway) RegisterCommand(cmd string, f CmdCallbackFun) error {
  244. if a.cmdList.Contains(cmd) {
  245. return errors.New("重复注册")
  246. }
  247. a.cmdList.Set(cmd, f)
  248. return nil
  249. }
  250. // SubDeviceLogin 子设备上线
  251. func (a *Gateway) SubDeviceLogin(deviceCode, subDeviceId string) error {
  252. data := &protocal.DevLogin{
  253. Action: "devLogin",
  254. MsgId: 1,
  255. DeviceCode: deviceCode,
  256. SubDeviceId: subDeviceId,
  257. Timestamp: time.Now().Unix(),
  258. }
  259. payload, err := json.Marshal(data)
  260. if err != nil {
  261. return err
  262. }
  263. a.mqttClient.Publish("s", 1, false, payload)
  264. return nil
  265. }
  266. // SubDeviceLogout 子设备下线
  267. func (a *Gateway) SubDeviceLogout(deviceCode, subDeviceId string) error {
  268. data := &protocal.DevLogin{
  269. Action: "devLogout",
  270. MsgId: 1,
  271. DeviceCode: deviceCode,
  272. SubDeviceId: subDeviceId,
  273. Timestamp: time.Now().Unix(),
  274. }
  275. payload, err := json.Marshal(data)
  276. if err != nil {
  277. return err
  278. }
  279. a.mqttClient.Publish("s", 1, false, payload)
  280. return nil
  281. }