package v2 import ( "context" "crypto/tls" "encoding/hex" "encoding/json" "errors" "fmt" mqtt "github.com/eclipse/paho.mqtt.golang" "github.com/gogf/gf/container/gmap" "github.com/gogf/gf/encoding/gjson" "github.com/gogf/gf/net/ghttp" "log" "os" "sparrow-sdk/config" spErr "sparrow-sdk/errors" "sparrow-sdk/logger" "sparrow-sdk/protocal" "sparrow-sdk/schema" "time" ) type CmdMessage struct { Cmd string Params interface{} } type CmdCallbackFun func(msg protocal.CloudSend) error // DeviceReportCommandCb 云平台下发的上报指令回调 type DeviceReportCommandCb func(deviceCode, subId string) error func NewGateway(config *config.Config) *Gateway { if config.UseTls { if config.CaFile == "" || config.KeyFile == "" { panic("use tls: CaFile and CaKey must be provide") } } c := ghttp.NewClient() c.SetHeader("Content-Type", "application/json") if config.Logger == nil { config.Logger = logger.New(log.New(os.Stdout, "\r\n", log.LstdFlags), logger.Config{Open: config.Debug}) } return &Gateway{ config: config, httpClient: c, closeChan: make(chan struct{}), commandMessageChan: make(chan protocal.CloudSend), cmdList: gmap.New(true), } } type Gateway struct { config *config.Config httpClient *ghttp.Client mqttClient mqtt.Client deviceId int64 deviceKey string deviceSecret string deviceIdentifier string accessToken []byte accessAddr string closeChan chan struct{} commandMessageChan chan protocal.CloudSend reportCommandCb DeviceReportCommandCb cmdList *gmap.Map } func (a *Gateway) SetReportCommandCallback(cb DeviceReportCommandCb) { a.reportCommandCb = cb } // Register 接入网关向平台注册 func (a *Gateway) Register() (*schema.RegisterData, error) { params := &schema.RegisterRequestParams{ ProductKey: a.config.ProductKey, DeviceCode: a.config.DeviceCode, Version: a.config.Version, } resp, err := a.httpClient.Post(a.config.SparrowServer+"/v1/devices/registration", params) if err != nil { a.config.Logger.Trace(context.Background(), "请求服务器失败:%s", err.Error()) return nil, spErr.ErrRegisterToServer } var response schema.RegisterResponse err = json.Unmarshal(resp.ReadAll(), &response) if err != nil { a.config.Logger.Trace(context.Background(), "%s", err.Error()) return nil, spErr.ErrResponseFromServer } if response.Code != 0 { return nil, errors.New(response.Message) } a.config.Logger.Trace(context.Background(), "网关注册结果:%+v", response.Data) a.deviceId = response.Data.DeviceId a.deviceIdentifier = response.Data.DeviceIdentifier a.deviceKey = response.Data.DeviceKey a.deviceSecret = response.Data.DeviceSecret return &response.Data, nil } // Authentication 验证设备,并获取接入服务 func (a *Gateway) Authentication() (*schema.DeviceAuthData, error) { if a.deviceSecret == "" || a.deviceId == 0 || a.deviceKey == "" || a.deviceIdentifier == "" { return nil, spErr.ErrDeviceNotRegister } params := &schema.AuthRequestParams{ DeviceId: a.deviceId, DeviceSecret: a.deviceSecret, Protocol: string(a.config.Protocol), } resp, err := a.httpClient.Post(a.config.SparrowServer+"/v1/devices/authentication", params) if err != nil { a.config.Logger.Trace(context.Background(), "%s", err.Error()) return nil, spErr.ErrAutoToServer } var result schema.LoginResponse err = json.Unmarshal(resp.ReadAll(), &result) if err != nil { a.config.Logger.Trace(context.Background(), "%s", err.Error()) return nil, spErr.ErrResponseFromServer } if result.Code != 0 { return nil, errors.New(result.Message) } a.config.Logger.Trace(context.Background(), "网关认证结果:%+v", result.Data) token, err := hex.DecodeString(result.Data.AccessToken) if err != nil { return nil, err } a.accessToken = token a.accessAddr = result.Data.AccessAddr return &result.Data, nil } // Connect 接入平台,会阻塞主进程 func (a *Gateway) Connect() { var url string if a.config.UseTls { url = fmt.Sprintf("ssl://%s", a.accessAddr) } else { url = fmt.Sprintf("tcp://%s", a.accessAddr) } opts := mqtt.NewClientOptions().AddBroker(url) clientId := fmt.Sprintf("%x", a.deviceId) opts.SetClientID(clientId) opts.SetPassword(hex.EncodeToString(a.accessToken)) opts.SetAutoReconnect(true) opts.SetOnConnectHandler(func(client mqtt.Client) { a.config.Logger.Trace(context.Background(), "%s", "成功接入平台") }) if a.config.UseTls { cert, err := tls.LoadX509KeyPair(a.config.CaFile, a.config.KeyFile) if err != nil { panic(err) } opts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cert}, InsecureSkipVerify: true}) } opts.SetConnectionLostHandler(func(client mqtt.Client, err error) { a.config.Logger.Trace(context.Background(), "与平台断开连接[%s]!", err.Error()) }) opts.SetDefaultPublishHandler(func(client mqtt.Client, message mqtt.Message) { switch message.Topic() { case "c": a.commandHandler(message) //case "s": // a.statusHandler(message) } }) opts.CredentialsProvider = func() (username string, password string) { _, _ = a.Authentication() return clientId, hex.EncodeToString(a.accessToken) } opts.SetKeepAlive(30 * time.Second) c := mqtt.NewClient(opts) a.mqttClient = c go func() { if token := c.Connect(); token.Wait() && token.Error() != nil { a.config.Logger.Trace(context.Background(), "%s", token.Error().Error()) return } }() for { select { case <-a.closeChan: c.Disconnect(250) } } } // Close 关闭 func (a *Gateway) Close() { close(a.closeChan) } // ReportStatus 对应平台v2版本 func (a *Gateway) ReportStatus(subDeviceId, cmd string, params interface{}) error { data := &protocal.DevReport{ Action: "devSend", MsgId: 1, TimeStamp: time.Now().Unix(), SubDeviceId: subDeviceId, DeviceCode: a.config.DeviceCode, Data: &protocal.Data{ Cmd: cmd, Params: params, }, } payload, err := json.Marshal(data) if err != nil { return err } fmt.Println(fmt.Sprintf("payload:%s", string(payload))) a.mqttClient.Publish("s", 1, false, payload) return nil } func (a *Gateway) commandHandler(message mqtt.Message) { j, err := gjson.DecodeToJson(message.Payload()) if err != nil { a.config.Logger.Trace(context.Background(), "error message format :%s", err.Error()) return } var msg protocal.CloudSend if err = j.Struct(&msg); err == nil { if msg.Data.Cmd == "report" && a.reportCommandCb != nil { if err = a.reportCommandCb(msg.DeviceCode, msg.SubDeviceId); err != nil { panic(err) } return } a.config.Logger.Trace(context.Background(), "gateway receiving command:%+v", msg.Data.Cmd) if a.cmdList.Contains(msg.Data.Cmd) { f := a.cmdList.Get(msg.Data.Cmd) if err = f.(CmdCallbackFun)(msg); err != nil { a.config.Logger.Trace(context.Background(), "执行指令失败:%s", msg.Data.Cmd) } } select { case a.commandMessageChan <- msg: case <-time.After(5 * time.Second): a.config.Logger.Trace(context.Background(), "command message write timeout") } } else { return } } // RecvCommand recv a command message from channel // Deprecated func (a *Gateway) RecvCommand() <-chan protocal.CloudSend { return a.commandMessageChan } // RegisterCommand 注册指令回调 func (a *Gateway) RegisterCommand(cmd string, f CmdCallbackFun) error { if a.cmdList.Contains(cmd) { return errors.New("重复注册") } a.cmdList.Set(cmd, f) return nil } // SubDeviceLogin 子设备上线 func (a *Gateway) SubDeviceLogin(deviceCode, subDeviceId string) error { data := &protocal.DevLogin{ Action: "devLogin", MsgId: 1, DeviceCode: deviceCode, SubDeviceId: subDeviceId, Timestamp: time.Now().Unix(), } payload, err := json.Marshal(data) if err != nil { return err } a.mqttClient.Publish("s", 1, false, payload) return nil } // SubDeviceLogout 子设备下线 func (a *Gateway) SubDeviceLogout(deviceCode, subDeviceId string) error { data := &protocal.DevLogin{ Action: "devLogout", MsgId: 1, DeviceCode: deviceCode, SubDeviceId: subDeviceId, Timestamp: time.Now().Unix(), } payload, err := json.Marshal(data) if err != nil { return err } a.mqttClient.Publish("s", 1, false, payload) return nil }