瀏覽代碼

增加上线离线类型

lijian 2 年之前
父節點
當前提交
c71977f22c

+ 24 - 0
pkg/klink/klink.go

@@ -23,6 +23,18 @@ type DevLogin struct {
 	Timestamp   int64  `json:"timestamp"`
 }
 
+func (d *DevLogin) Marshal() ([]byte, error) {
+	return gjson.New(d).ToJson()
+}
+
+func (d *DevLogin) UnMarshal(bytes []byte) error {
+	j, err := gjson.DecodeToJson(bytes)
+	if err != nil {
+		return err
+	}
+	return j.Struct(d)
+}
+
 // DevLogout 子设备下线
 type DevLogout struct {
 	Action      string `json:"action"`
@@ -33,6 +45,18 @@ type DevLogout struct {
 	Timestamp   int64  `json:"timestamp"`
 }
 
+func (d *DevLogout) Marshal() ([]byte, error) {
+	return gjson.New(d).ToJson()
+}
+
+func (d *DevLogout) UnMarshal(bytes []byte) error {
+	j, err := gjson.DecodeToJson(bytes)
+	if err != nil {
+		return err
+	}
+	return j.Struct(d)
+}
+
 // DevSend 设备上报数据帧
 type DevSend struct {
 	Action      string       `json:"action"`

+ 2 - 2
pkg/mqtt/connection.go

@@ -125,7 +125,7 @@ func (c *Connection) Close() {
 	if c.Conn != nil {
 		c.Conn.Close()
 		c.Conn = nil
-		c.Mgr.Provider.OnDeviceOffline(DeviceID)
+		c.Mgr.Provider.OnDeviceOffline(c.DeviceCode, c.VendorId)
 	}
 	if c.SendChan != nil {
 		close(c.SendChan)
@@ -217,7 +217,7 @@ func (c *Connection) RcvMsgFromClient() {
 			c.Submit(connack)
 			c.KeepAlive = msg.KeepAliveTimer
 
-			err = c.Mgr.Provider.OnDeviceOnline(args)
+			err = c.Mgr.Provider.OnDeviceOnline(args, c.VendorId)
 			if err != nil {
 				server.Log.Errorf("device online error : %v", err)
 				c.Close()

+ 2 - 2
pkg/mqtt/provider.go

@@ -7,8 +7,8 @@ import (
 
 type Provider interface {
 	ValidateDeviceToken(deviceid string, token []byte) error
-	OnDeviceOnline(args rpcs.ArgsGetOnline) error
-	OnDeviceOffline(deviceid string) error
+	OnDeviceOnline(args rpcs.ArgsGetOnline, VendorId string) error
+	OnDeviceOffline(deviceid string, vendorId string) error
 	OnDeviceHeartBeat(deviceid string) error
 	OnDeviceMessage(deviceid, vendorId string, msgtype string, message *gjson.Json)
 }

+ 4 - 1
pkg/rpcs/access.go

@@ -10,7 +10,10 @@ type ArgsSetStatus struct {
 }
 type ReplySetStatus ReplyEmptyResult
 
-type ArgsGetStatus ArgsDeviceId
+type ArgsGetStatus struct {
+	Id       string
+	VendorId string
+}
 type ReplyGetStatus struct {
 	Status []protocol.SubData
 }

+ 4 - 1
pkg/rpcs/devicemanager.go

@@ -23,7 +23,10 @@ type ArgsGetOnline struct {
 }
 type ReplyGetOnline ReplyEmptyResult
 
-type ArgsGetOffline ArgsDeviceId
+type ArgsGetOffline struct {
+	Id       string
+	VendorId string
+}
 type ReplyGetOffline ReplyEmptyResult
 
 type ArgsHeartBeat struct {

+ 56 - 9
services/controller/controller.go

@@ -76,21 +76,68 @@ func (c *Controller) GetStatus(args rpcs.ArgsGetStatus, reply *rpcs.ReplyGetStat
 	return server.RPCCallByHost(rpchost, "Access.GetStatus", args, reply)
 }
 
+func (c *Controller) Online(args rpcs.ArgsGetStatus, reply *rpcs.ReplyEmptyResult) error {
+	data := gjson.New(nil)
+	_ = data.Set("device_id", args.Id)
+	t := time.Now()
+	msg := &protocol.Message{
+		Id:       guid.S(),
+		Ts:       &t,
+		Type:     protocol.CONNECT_EVENT,
+		Data:     data.MustToJsonString(),
+		Callback: nil,
+		MetaData: map[string]interface{}{
+			"device_id": args.Id,
+			"vendor_id": args.VendorId,
+		},
+		Originator: "device",
+	}
+	tpi := queue.ResolvePartition("RULE_ENGINE",
+		msg.GetQueueName(),
+		args.VendorId,
+		args.Id)
+	g, err := queue.NewGobQueueMessage(msg)
+	if err != nil {
+		return err
+	}
+	g.Headers.Put("tenant_id", []byte(args.VendorId))
+	return c.producer.Send(tpi, g, nil)
+}
+
+func (c *Controller) Offline(args rpcs.ArgsGetStatus, reply *rpcs.ReplyEmptyResult) error {
+	data := gjson.New(nil)
+	_ = data.Set("device_id", args.Id)
+	t := time.Now()
+	msg := &protocol.Message{
+		Id:       guid.S(),
+		Ts:       &t,
+		Type:     protocol.DISCONNECT_EVENT,
+		Data:     data.MustToJsonString(),
+		Callback: nil,
+		MetaData: map[string]interface{}{
+			"device_id": args.Id,
+			"vendor_id": args.VendorId,
+		},
+		Originator: "device",
+	}
+	tpi := queue.ResolvePartition("RULE_ENGINE",
+		msg.GetQueueName(),
+		args.VendorId,
+		args.Id)
+	g, err := queue.NewGobQueueMessage(msg)
+	if err != nil {
+		return err
+	}
+	g.Headers.Put("tenant_id", []byte(args.VendorId))
+	return c.producer.Send(tpi, g, nil)
+}
+
 func (c *Controller) OnStatus(args rpcs.ArgsOnStatus, reply *rpcs.ReplyOnStatus) error {
 	t := time.Unix(int64(args.Timestamp/1000), 0)
 	data, err := c.processStatusToQueue(args)
 	if err != nil {
 		return err
 	}
-	//reportArgs := &rpcs.ArgsDeviceReport{
-	//	DeviceCode: args.DeviceId,
-	//	Data:       data,
-	//}
-	//var reportReply *rpcs.ReplyEmptyResult
-	//if err = server.RPCCallByName(context.Background(), rpcs.ShadowServiceName,
-	//	"ShadowServer.DeviceReport", reportArgs, reportReply); err != nil {
-	//	server.Log.Error(err)
-	//}
 	msg := &protocol.Message{
 		Id:       guid.S(),
 		Ts:       &t,

+ 22 - 24
services/mqttaccess/mqtt_provider.go

@@ -26,40 +26,38 @@ func (mp *MQTTProvider) ValidateDeviceToken(deviceid string, token []byte) error
 	}
 	return nil
 }
-func (mp *MQTTProvider) OnDeviceOnline(args rpcs.ArgsGetOnline) error {
+func (mp *MQTTProvider) OnDeviceOnline(args rpcs.ArgsGetOnline, VendorId string) error {
 	reply := rpcs.ReplyGetOnline{}
 	err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetOnline", args, &reply)
 	if err != nil {
 		server.Log.Errorf("device online error. args: %v, error: %v", args, err)
 	}
-	//// 拉取期望状态
-	//var desired *rpcs.DeviceDesiredReply
-	//if err = server.RPCCallByName(nil, rpcs.ShadowServiceName,
-	//	"ShadowServer.GetDeviceDesired", args.Id, &desired); err != nil {
-	//	return err
-	//}
-	//j, err := gjson.DecodeToJson(desired.Data)
-	//if err != nil {
-	//	server.Log.Errorf("不正确的期望值格式%s", desired.Data)
-	//} else {
-	//	if j != nil {
-	//		// TODO: 向设备同步期望值
-	//
-	//	}
-	//}
-
+	var cReply rpcs.ReplyEmptyResult
+	var cArgs rpcs.ArgsGetStatus
+	cArgs.VendorId = VendorId
+	cArgs.Id = args.Id
+	if err = server.RPCCallByName(nil, rpcs.ControllerName, "Controller.Online", &cArgs, &cReply); err != nil {
+		return err
+	}
 	return err
 }
-func (mp *MQTTProvider) OnDeviceOffline(deviceid string) error {
+func (mp *MQTTProvider) OnDeviceOffline(deviceid string, vendorId string) error {
 	args := rpcs.ArgsGetOffline{
-		Id: deviceid,
+		Id:       deviceid,
+		VendorId: vendorId,
 	}
 	reply := rpcs.ReplyGetOffline{}
 	err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetOffline", args, &reply)
 	if err != nil {
 		server.Log.Errorf("device offline error. deviceid: %v, error: %v", deviceid, err)
 	}
-
+	var cReply rpcs.ReplyEmptyResult
+	var cArgs rpcs.ArgsGetStatus
+	cArgs.VendorId = vendorId
+	cArgs.Id = args.Id
+	if err = server.RPCCallByName(nil, rpcs.ControllerName, "Controller.Offline", &cArgs, &cReply); err != nil {
+		return err
+	}
 	return err
 }
 func (mp *MQTTProvider) OnDeviceHeartBeat(deviceid string) error {
@@ -82,10 +80,10 @@ func (mp *MQTTProvider) OnDeviceMessage(deviceid, vendorId string, msgtype strin
 			switch act {
 			case klink.DevSendAction:
 				processReportStatus(deviceid, vendorId, message)
-			case klink.DevLoginAction:
-				_ = processDevLogin(deviceid)
-			case klink.DevLogoutAction:
-				_ = processDevLogout(deviceid)
+				//case klink.DevLoginAction:
+				//	_ = processDevLogin(deviceid)
+				//case klink.DevLogoutAction:
+				//	_ = processDevLogout(deviceid)
 			}
 		}
 	case "e":

+ 26 - 5
tests/device/device.go

@@ -307,6 +307,25 @@ func (d *Device) statusHandler(client *MQTT.Client, msg MQTT.Message) {
 	}
 }
 
+// 子设备上线
+func (d *Device) subDeviceLogin(client *MQTT.Client) {
+	for {
+		dd := &klink.DevLogin{
+			Action:      "devLogin",
+			MsgId:       1,
+			SubDeviceId: "5566-0",
+			Timestamp:   time.Now().Unix(),
+			DeviceCode:  "5566",
+		}
+		payload, err := dd.Marshal()
+		if err != nil {
+			return
+		}
+		client.Publish("s", 1, false, payload)
+		time.Sleep(3 * time.Second)
+	}
+}
+
 func (d *Device) commandHandler(client *MQTT.Client, msg MQTT.Message) {
 	j, err := gjson.DecodeToJson(msg.Payload())
 	if err != nil {
@@ -363,9 +382,9 @@ func (d *Device) doCoAPAccess() error {
 
 func (d *Device) doMQTTAccess() error {
 	logger := log.New(os.Stdout, "", log.LstdFlags)
-	MQTT.ERROR = logger
-	MQTT.CRITICAL = logger
-	MQTT.WARN = logger
+	//MQTT.ERROR = logger
+	//MQTT.CRITICAL = logger
+	//MQTT.WARN = logger
 	MQTT.DEBUG = logger
 
 	//create a ClientOptions struct setting the broker address, clientid, turn
@@ -387,9 +406,11 @@ func (d *Device) doMQTTAccess() error {
 			return
 		}
 	}()
+	// 子设备上线
+	go d.subDeviceLogin(c)
 	go d.reportStatus(c)
-	go d.reportStatus2(c)
-	go d.reportEvent(c)
+	//go d.reportStatus2(c)
+	//go d.reportEvent(c)
 	// we just pause here to wait for messages
 	<-make(chan int)
 

+ 2 - 11
tests/device/main.go

@@ -3,19 +3,16 @@ package main
 import (
 	"flag"
 	"fmt"
-	"github.com/gogf/gf/frame/g"
-	"github.com/gogf/gf/net/ghttp"
 )
 
 var (
-	testURL        = flag.String("url", "http://192.168.0.224:18100", "login url")
+	testURL        = flag.String("url", "http://192.168.0.160:8088", "login url")
 	testProductKey = flag.String("productkey", "958daf8b3a533f0d9516ac8fd17ef0cb06b439e664787a2a89608a10eeee8eb3c35c82c505d19f8a4417e530de0678fd", "product key")
 	testProtocol   = flag.String("protocol", "mqtt", "access protocol")
 )
 
 func main() {
 	flag.Parse()
-
 	if *testProductKey == "" {
 		fmt.Println("product key not provided. use -productkey flag")
 		return
@@ -39,11 +36,5 @@ func main() {
 		fmt.Printf("device access error %s", err)
 		return
 	}
-	go func() {
-		s := g.Server()
-		s.BindHandler("/", func(r *ghttp.Request) {
-			r.Response.Write("哈喽世界!")
-		})
-		s.Run()
-	}()
+
 }