123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300 |
- package v2
- import (
- "context"
- "crypto/tls"
- "encoding/hex"
- "encoding/json"
- "errors"
- "fmt"
- mqtt "github.com/eclipse/paho.mqtt.golang"
- "github.com/gogf/gf/container/gmap"
- "github.com/gogf/gf/encoding/gjson"
- "github.com/gogf/gf/net/ghttp"
- "log"
- "os"
- "sparrow-sdk/config"
- spErr "sparrow-sdk/errors"
- "sparrow-sdk/logger"
- "sparrow-sdk/protocal"
- "sparrow-sdk/schema"
- "time"
- )
// CmdMessage pairs a command name with the parameters that accompany it.
type CmdMessage struct {
	// Cmd is the command name.
	Cmd string
	// Params holds the command arguments; the concrete type is left open.
	Params interface{}
}
- type CmdCallbackFun func(msg protocal.CloudSend) error
// DeviceReportCommandCb is the callback invoked when the cloud platform
// issues a "report" command. It receives the device code and sub-device id
// carried by the message.
type DeviceReportCommandCb func(deviceCode, subId string) error
- func NewGateway(config *config.Config) *Gateway {
- if config.UseTls {
- if config.CaFile == "" || config.KeyFile == "" {
- panic("use tls: CaFile and CaKey must be provide")
- }
- }
- c := ghttp.NewClient()
- c.SetHeader("Content-Type", "application/json")
- if config.Logger == nil {
- config.Logger = logger.New(log.New(os.Stdout, "\r\n", log.LstdFlags),
- logger.Config{Open: config.Debug})
- }
- return &Gateway{
- config: config,
- httpClient: c,
- closeChan: make(chan struct{}),
- commandMessageChan: make(chan protocal.CloudSend),
- cmdList: gmap.New(true),
- }
- }
- type Gateway struct {
- config *config.Config
- httpClient *ghttp.Client
- mqttClient mqtt.Client
- deviceId int64
- deviceKey string
- deviceSecret string
- deviceIdentifier string
- accessToken []byte
- accessAddr string
- closeChan chan struct{}
- commandMessageChan chan protocal.CloudSend
- reportCommandCb DeviceReportCommandCb
- cmdList *gmap.Map
- }
- func (a *Gateway) SetReportCommandCallback(cb DeviceReportCommandCb) {
- a.reportCommandCb = cb
- }
- // Register 接入网关向平台注册
- func (a *Gateway) Register() (*schema.RegisterData, error) {
- params := &schema.RegisterRequestParams{
- ProductKey: a.config.ProductKey,
- DeviceCode: a.config.DeviceCode,
- Version: a.config.Version,
- }
- resp, err := a.httpClient.Post(a.config.SparrowServer+"/v1/devices/registration", params)
- if err != nil {
- a.config.Logger.Trace(context.Background(), "请求服务器失败:%s", err.Error())
- return nil, spErr.ErrRegisterToServer
- }
- var response schema.RegisterResponse
- err = json.Unmarshal(resp.ReadAll(), &response)
- if err != nil {
- a.config.Logger.Trace(context.Background(), "%s", err.Error())
- return nil, spErr.ErrResponseFromServer
- }
- if response.Code != 0 {
- return nil, errors.New(response.Message)
- }
- a.config.Logger.Trace(context.Background(), "网关注册结果:%+v", response.Data)
- a.deviceId = response.Data.DeviceId
- a.deviceIdentifier = response.Data.DeviceIdentifier
- a.deviceKey = response.Data.DeviceKey
- a.deviceSecret = response.Data.DeviceSecret
- return &response.Data, nil
- }
- // Authentication 验证设备,并获取接入服务
- func (a *Gateway) Authentication() (*schema.DeviceAuthData, error) {
- if a.deviceSecret == "" || a.deviceId == 0 || a.deviceKey == "" || a.deviceIdentifier == "" {
- return nil, spErr.ErrDeviceNotRegister
- }
- params := &schema.AuthRequestParams{
- DeviceId: a.deviceId,
- DeviceSecret: a.deviceSecret,
- Protocol: string(a.config.Protocol),
- }
- resp, err := a.httpClient.Post(a.config.SparrowServer+"/v1/devices/authentication", params)
- if err != nil {
- a.config.Logger.Trace(context.Background(), "%s", err.Error())
- return nil, spErr.ErrAutoToServer
- }
- var result schema.LoginResponse
- err = json.Unmarshal(resp.ReadAll(), &result)
- if err != nil {
- a.config.Logger.Trace(context.Background(), "%s", err.Error())
- return nil, spErr.ErrResponseFromServer
- }
- if result.Code != 0 {
- return nil, errors.New(result.Message)
- }
- a.config.Logger.Trace(context.Background(), "网关认证结果:%+v", result.Data)
- token, err := hex.DecodeString(result.Data.AccessToken)
- if err != nil {
- return nil, err
- }
- a.accessToken = token
- a.accessAddr = result.Data.AccessAddr
- return &result.Data, nil
- }
- // Connect 接入平台,会阻塞主进程
- func (a *Gateway) Connect() {
- var url string
- if a.config.UseTls {
- url = fmt.Sprintf("ssl://%s", a.accessAddr)
- } else {
- url = fmt.Sprintf("tcp://%s", a.accessAddr)
- }
- opts := mqtt.NewClientOptions().AddBroker(url)
- clientId := fmt.Sprintf("%x", a.deviceId)
- opts.SetClientID(clientId)
- opts.SetPassword(hex.EncodeToString(a.accessToken))
- opts.SetAutoReconnect(true)
- opts.SetOnConnectHandler(func(client mqtt.Client) {
- a.config.Logger.Trace(context.Background(), "%s", "成功接入平台")
- })
- if a.config.UseTls {
- cert, err := tls.LoadX509KeyPair(a.config.CaFile, a.config.KeyFile)
- if err != nil {
- panic(err)
- }
- opts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cert}, InsecureSkipVerify: true})
- }
- opts.SetConnectionLostHandler(func(client mqtt.Client, err error) {
- a.config.Logger.Trace(context.Background(), "与平台断开连接[%s]!", err.Error())
- })
- opts.SetDefaultPublishHandler(func(client mqtt.Client, message mqtt.Message) {
- switch message.Topic() {
- case "c":
- a.commandHandler(message)
- //case "s":
- // a.statusHandler(message)
- }
- })
- opts.CredentialsProvider = func() (username string, password string) {
- _, _ = a.Authentication()
- return clientId, hex.EncodeToString(a.accessToken)
- }
- opts.SetKeepAlive(30 * time.Second)
- c := mqtt.NewClient(opts)
- a.mqttClient = c
- go func() {
- if token := c.Connect(); token.Wait() && token.Error() != nil {
- a.config.Logger.Trace(context.Background(), "%s", token.Error().Error())
- return
- }
- }()
- for {
- select {
- case <-a.closeChan:
- c.Disconnect(250)
- }
- }
- }
- // Close 关闭
- func (a *Gateway) Close() {
- close(a.closeChan)
- }
- // ReportStatus 对应平台v2版本
- func (a *Gateway) ReportStatus(subDeviceId, cmd string, params interface{}) error {
- data := &protocal.DevReport{
- Action: "devSend",
- MsgId: 1,
- TimeStamp: time.Now().Unix(),
- SubDeviceId: subDeviceId,
- DeviceCode: a.config.DeviceCode,
- Data: &protocal.Data{
- Cmd: cmd,
- Params: params,
- },
- }
- payload, err := json.Marshal(data)
- if err != nil {
- return err
- }
- fmt.Println(fmt.Sprintf("payload:%s", string(payload)))
- a.mqttClient.Publish("s", 1, false, payload)
- return nil
- }
- func (a *Gateway) commandHandler(message mqtt.Message) {
- j, err := gjson.DecodeToJson(message.Payload())
- if err != nil {
- a.config.Logger.Trace(context.Background(), "error message format :%s", err.Error())
- return
- }
- var msg protocal.CloudSend
- if err = j.Struct(&msg); err == nil {
- if msg.Data.Cmd == "report" && a.reportCommandCb != nil {
- if err = a.reportCommandCb(msg.DeviceCode, msg.SubDeviceId); err != nil {
- panic(err)
- }
- return
- }
- a.config.Logger.Trace(context.Background(), "gateway receiving command:%+v", msg.Data.Cmd)
- if a.cmdList.Contains(msg.Data.Cmd) {
- f := a.cmdList.Get(msg.Data.Cmd)
- if err = f.(CmdCallbackFun)(msg); err != nil {
- a.config.Logger.Trace(context.Background(), "执行指令失败:%s", msg.Data.Cmd)
- }
- }
- select {
- case a.commandMessageChan <- msg:
- case <-time.After(5 * time.Second):
- a.config.Logger.Trace(context.Background(), "command message write timeout")
- }
- } else {
- return
- }
- }
- // RecvCommand recv a command message from channel
- // Deprecated
- func (a *Gateway) RecvCommand() <-chan protocal.CloudSend {
- return a.commandMessageChan
- }
- // RegisterCommand 注册指令回调
- func (a *Gateway) RegisterCommand(cmd string, f CmdCallbackFun) error {
- if a.cmdList.Contains(cmd) {
- return errors.New("重复注册")
- }
- a.cmdList.Set(cmd, f)
- return nil
- }
- // SubDeviceLogin 子设备上线
- func (a *Gateway) SubDeviceLogin(deviceCode, subDeviceId string) error {
- data := &protocal.DevLogin{
- Action: "devLogin",
- MsgId: 1,
- DeviceCode: deviceCode,
- SubDeviceId: subDeviceId,
- Timestamp: time.Now().Unix(),
- }
- payload, err := json.Marshal(data)
- if err != nil {
- return err
- }
- a.mqttClient.Publish("s", 1, false, payload)
- return nil
- }
- // SubDeviceLogout 子设备下线
- func (a *Gateway) SubDeviceLogout(deviceCode, subDeviceId string) error {
- data := &protocal.DevLogin{
- Action: "devLogout",
- MsgId: 1,
- DeviceCode: deviceCode,
- SubDeviceId: subDeviceId,
- Timestamp: time.Now().Unix(),
- }
- payload, err := json.Marshal(data)
- if err != nil {
- return err
- }
- a.mqttClient.Publish("s", 1, false, payload)
- return nil
- }
|