|
@@ -2,11 +2,13 @@ package v2
|
|
|
|
|
|
import (
|
|
import (
|
|
"context"
|
|
"context"
|
|
|
|
+ "crypto/tls"
|
|
"encoding/hex"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"encoding/json"
|
|
"errors"
|
|
"errors"
|
|
"fmt"
|
|
"fmt"
|
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
|
|
|
+ "github.com/gogf/gf/container/gmap"
|
|
"github.com/gogf/gf/encoding/gjson"
|
|
"github.com/gogf/gf/encoding/gjson"
|
|
"github.com/gogf/gf/net/ghttp"
|
|
"github.com/gogf/gf/net/ghttp"
|
|
"log"
|
|
"log"
|
|
@@ -23,11 +25,17 @@ type CmdMessage struct {
|
|
Cmd string
|
|
Cmd string
|
|
Params interface{}
|
|
Params interface{}
|
|
}
|
|
}
|
|
|
|
+type CmdCallbackFun func(msg protocal.CloudSend) error
|
|
|
|
|
|
// DeviceReportCommandCb 云平台下发的上报指令回调
|
|
// DeviceReportCommandCb 云平台下发的上报指令回调
|
|
type DeviceReportCommandCb func(deviceCode, subId string) error
|
|
type DeviceReportCommandCb func(deviceCode, subId string) error
|
|
|
|
|
|
func NewGateway(config *config.Config) *Gateway {
|
|
func NewGateway(config *config.Config) *Gateway {
|
|
|
|
+ if config.UseTls {
|
|
|
|
+ if config.CaFile == "" || config.KeyFile == "" {
|
|
|
|
+ panic("use tls: CaFile and CaKey must be provide")
|
|
|
|
+ }
|
|
|
|
+ }
|
|
c := ghttp.NewClient()
|
|
c := ghttp.NewClient()
|
|
c.SetHeader("Content-Type", "application/json")
|
|
c.SetHeader("Content-Type", "application/json")
|
|
if config.Logger == nil {
|
|
if config.Logger == nil {
|
|
@@ -39,6 +47,7 @@ func NewGateway(config *config.Config) *Gateway {
|
|
httpClient: c,
|
|
httpClient: c,
|
|
closeChan: make(chan struct{}),
|
|
closeChan: make(chan struct{}),
|
|
commandMessageChan: make(chan protocal.CloudSend),
|
|
commandMessageChan: make(chan protocal.CloudSend),
|
|
|
|
+ cmdList: gmap.New(true),
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -58,6 +67,7 @@ type Gateway struct {
|
|
closeChan chan struct{}
|
|
closeChan chan struct{}
|
|
commandMessageChan chan protocal.CloudSend
|
|
commandMessageChan chan protocal.CloudSend
|
|
reportCommandCb DeviceReportCommandCb
|
|
reportCommandCb DeviceReportCommandCb
|
|
|
|
+ cmdList *gmap.Map
|
|
}
|
|
}
|
|
|
|
|
|
func (a *Gateway) SetReportCommandCallback(cb DeviceReportCommandCb) {
|
|
func (a *Gateway) SetReportCommandCallback(cb DeviceReportCommandCb) {
|
|
@@ -129,7 +139,13 @@ func (a *Gateway) Authentication() (*schema.DeviceAuthData, error) {
|
|
|
|
|
|
// Connect 接入平台,会阻塞主进程
|
|
// Connect 接入平台,会阻塞主进程
|
|
func (a *Gateway) Connect() {
|
|
func (a *Gateway) Connect() {
|
|
- opts := mqtt.NewClientOptions().AddBroker(fmt.Sprintf("tcp://%s", a.accessAddr))
|
|
|
|
|
|
+ var url string
|
|
|
|
+ if a.config.UseTls {
|
|
|
|
+ url = fmt.Sprintf("ssl://%s", a.accessAddr)
|
|
|
|
+ } else {
|
|
|
|
+ url = fmt.Sprintf("tcp://%s", a.accessAddr)
|
|
|
|
+ }
|
|
|
|
+ opts := mqtt.NewClientOptions().AddBroker(url)
|
|
clientId := fmt.Sprintf("%x", a.deviceId)
|
|
clientId := fmt.Sprintf("%x", a.deviceId)
|
|
opts.SetClientID(clientId)
|
|
opts.SetClientID(clientId)
|
|
opts.SetPassword(hex.EncodeToString(a.accessToken))
|
|
opts.SetPassword(hex.EncodeToString(a.accessToken))
|
|
@@ -137,6 +153,13 @@ func (a *Gateway) Connect() {
|
|
opts.SetOnConnectHandler(func(client mqtt.Client) {
|
|
opts.SetOnConnectHandler(func(client mqtt.Client) {
|
|
a.config.Logger.Trace(context.Background(), "%s", "成功接入平台")
|
|
a.config.Logger.Trace(context.Background(), "%s", "成功接入平台")
|
|
})
|
|
})
|
|
|
|
+ if a.config.UseTls {
|
|
|
|
+ cert, err := tls.LoadX509KeyPair(a.config.CaFile, a.config.KeyFile)
|
|
|
|
+ if err != nil {
|
|
|
|
+ panic(err)
|
|
|
|
+ }
|
|
|
|
+ opts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cert}, InsecureSkipVerify: true})
|
|
|
|
+ }
|
|
opts.SetConnectionLostHandler(func(client mqtt.Client, err error) {
|
|
opts.SetConnectionLostHandler(func(client mqtt.Client, err error) {
|
|
a.config.Logger.Trace(context.Background(), "与平台断开连接[%s]!", err.Error())
|
|
a.config.Logger.Trace(context.Background(), "与平台断开连接[%s]!", err.Error())
|
|
})
|
|
})
|
|
@@ -208,8 +231,15 @@ func (a *Gateway) commandHandler(message mqtt.Message) {
|
|
if err = a.reportCommandCb(msg.DeviceCode, msg.SubDeviceId); err != nil {
|
|
if err = a.reportCommandCb(msg.DeviceCode, msg.SubDeviceId); err != nil {
|
|
panic(err)
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
+ return
|
|
}
|
|
}
|
|
a.config.Logger.Trace(context.Background(), "gateway receiving command:%+v", msg.Data.Cmd)
|
|
a.config.Logger.Trace(context.Background(), "gateway receiving command:%+v", msg.Data.Cmd)
|
|
|
|
+ if a.cmdList.Contains(msg.Data.Cmd) {
|
|
|
|
+ f := a.cmdList.Get(msg.Data.Cmd)
|
|
|
|
+ if err = f.(CmdCallbackFun)(msg); err != nil {
|
|
|
|
+ a.config.Logger.Trace(context.Background(), "执行指令失败:%s", msg.Data.Cmd)
|
|
|
|
+ }
|
|
|
|
+ }
|
|
select {
|
|
select {
|
|
case a.commandMessageChan <- msg:
|
|
case a.commandMessageChan <- msg:
|
|
case <-time.After(5 * time.Second):
|
|
case <-time.After(5 * time.Second):
|
|
@@ -221,6 +251,50 @@ func (a *Gateway) commandHandler(message mqtt.Message) {
|
|
}
|
|
}
|
|
|
|
|
|
// RecvCommand recv a command message from channel
|
|
// RecvCommand recv a command message from channel
|
|
|
|
+// Deprecated
|
|
func (a *Gateway) RecvCommand() <-chan protocal.CloudSend {
|
|
func (a *Gateway) RecvCommand() <-chan protocal.CloudSend {
|
|
return a.commandMessageChan
|
|
return a.commandMessageChan
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+// RegisterCommand 注册指令回调
|
|
|
|
+func (a *Gateway) RegisterCommand(cmd string, f CmdCallbackFun) error {
|
|
|
|
+ if a.cmdList.Contains(cmd) {
|
|
|
|
+ return errors.New("重复注册")
|
|
|
|
+ }
|
|
|
|
+ a.cmdList.Set(cmd, f)
|
|
|
|
+ return nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// SubDeviceLogin 子设备上线
|
|
|
|
+func (a *Gateway) SubDeviceLogin(deviceCode, subDeviceId string) error {
|
|
|
|
+ data := &protocal.DevLogin{
|
|
|
|
+ Action: "devLogin",
|
|
|
|
+ MsgId: 1,
|
|
|
|
+ DeviceCode: deviceCode,
|
|
|
|
+ SubDeviceId: subDeviceId,
|
|
|
|
+ Timestamp: time.Now().Unix(),
|
|
|
|
+ }
|
|
|
|
+ payload, err := json.Marshal(data)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+ a.mqttClient.Publish("s", 1, false, payload)
|
|
|
|
+ return nil
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// SubDeviceLogout 子设备下线
|
|
|
|
+func (a *Gateway) SubDeviceLogout(deviceCode, subDeviceId string) error {
|
|
|
|
+ data := &protocal.DevLogin{
|
|
|
|
+ Action: "devLogout",
|
|
|
|
+ MsgId: 1,
|
|
|
|
+ DeviceCode: deviceCode,
|
|
|
|
+ SubDeviceId: subDeviceId,
|
|
|
|
+ Timestamp: time.Now().Unix(),
|
|
|
|
+ }
|
|
|
|
+ payload, err := json.Marshal(data)
|
|
|
|
+ if err != nil {
|
|
|
|
+ return err
|
|
|
|
+ }
|
|
|
|
+ a.mqttClient.Publish("s", 1, false, payload)
|
|
|
|
+ return nil
|
|
|
|
+}
|