Forráskód Böngészése

process connect and disconnect

lijian 1 éve
szülő
commit
50eebe42ea

+ 3 - 4
pkg/mqtt/connection.go

@@ -205,10 +205,9 @@ func (c *Connection) RcvMsgFromClient() {
 				return
 			}
 			args := rpcs.ArgsGetOnline{
-				Id:            device.DeviceIdentifier,
-				ClientIP:      host,
-				AccessRPCHost: server.GetRPCHost(),
-				//HeartbeatInterval: uint32(c.KeepAlive),
+				Id:                device.DeviceIdentifier,
+				ClientIP:          host,
+				AccessRPCHost:     server.GetRPCHost(),
 				HeartbeatInterval: 300,
 			}
 

+ 15 - 0
pkg/online/online.go

@@ -63,6 +63,21 @@ func (mgr *Manager) GetOnline(id string, status Status) error {
 	return nil
 }
 
+func (mgr *Manager) GetOnlineV2(id string, status Status) error {
+	key := KeyPrefix + id
+	buffStr := gconv.String(&status)
+	_, err := mgr.redisClient.Do("SET", key, buffStr)
+	if err != nil {
+		return err
+	}
+	_, err = mgr.redisClient.Do("EXPIRE", key, -1)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
 func (mgr *Manager) SetHeartbeat(id string) error {
 	status, err := mgr.GetStatus(id)
 	if err != nil {

+ 1 - 0
pkg/protocol/emqx_device.go

@@ -5,4 +5,5 @@ type DevConnectStatus struct {
 	Reason     string
 	Action     string
 	DeviceId   string
+	ClientIp   string
 }

+ 8 - 0
services/devicemanager/manager.go

@@ -42,6 +42,14 @@ func (dm *DeviceManager) GetOnline(args rpcs.ArgsGetOnline, reply *rpcs.ReplyGet
 	})
 }
 
+func (dm *DeviceManager) GetOnlineV2(args rpcs.ArgsGetOnline, reply *rpcs.ReplyGetOnline) error {
+	return dm.onlineManager.GetOnlineV2(args.Id, online.Status{
+		ClientIP:          args.ClientIP,
+		AccessRPCHost:     args.AccessRPCHost,
+		HeartbeatInterval: args.HeartbeatInterval,
+	})
+}
+
 func (dm *DeviceManager) HeartBeat(args rpcs.ArgsHeartBeat, reply *rpcs.ReplyHeartBeat) error {
 	return dm.onlineManager.SetHeartbeat(args.Id)
 }

+ 73 - 0
services/emqx-agent/agent.go

@@ -0,0 +1,73 @@
+package main
+
+import (
+	"fmt"
+	"sparrow/pkg/models"
+	"sparrow/pkg/protocol"
+	"sparrow/pkg/rpcs"
+	"sparrow/pkg/server"
+)
+
+type Agent struct {
+}
+
+// Message 收到设备上报消息处理
+func (a *Agent) Message(topic string, payload []byte) error {
+	fmt.Printf("%s, %s\r\n", topic, payload)
+	return nil
+}
+
+// Connected 设备接入时
+func (a *Agent) Connected(status *protocol.DevConnectStatus) error {
+	// 查询设备信息
+	device := &models.Device{}
+	err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", status.DeviceId, device)
+	if err != nil {
+		server.Log.Errorf("device not found %s", status.DeviceId)
+		return nil
+	}
+	args := rpcs.ArgsGetOnline{
+		Id:                status.DeviceId,
+		ClientIP:          status.ClientIp,
+		AccessRPCHost:     server.GetRPCHost(),
+		HeartbeatInterval: 300,
+	}
+	reply := rpcs.ReplyGetOnline{}
+	err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetOnlineV2", args, &reply)
+	if err != nil {
+		server.Log.Errorf("device online error. args: %v, error: %v", args, err)
+	}
+	var cReply rpcs.ReplyEmptyResult
+	var cArgs rpcs.ArgsGetStatus
+	cArgs.VendorId = device.VendorID
+	cArgs.Id = args.Id
+	if err = server.RPCCallByName(nil, rpcs.ControllerName, "Controller.Online", &cArgs, &cReply); err != nil {
+		return err
+	}
+	return nil
+}
+
+// Disconnected 设备断开连接时
+func (a *Agent) Disconnected(status *protocol.DevConnectStatus) error {
+	// 查询设备信息
+	device := &models.Device{}
+	err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByIdentifier", status.DeviceId, device)
+	if err != nil {
+		server.Log.Errorf("device not found %s", status.DeviceId)
+		return nil
+	}
+	args := rpcs.ArgsGetOffline{
+		Id:       status.DeviceId,
+		VendorId: device.VendorID,
+	}
+	reply := rpcs.ReplyGetOffline{}
+	err = server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetOffline", args, &reply)
+	if err != nil {
+		server.Log.Errorf("device offline error. deviceid: %v, error: %v", status.DeviceId, err)
+	}
+	return err
+}
+
+func NewAgent() *Agent {
+	return &Agent{}
+}

+ 0 - 31
services/emqx-agent/emqx-agent.go

@@ -1,31 +0,0 @@
-package main
-
-import (
-	"fmt"
-	"sparrow/pkg/protocol"
-)
-
-type Agent struct {
-}
-
-// Message 收到设备上报消息处理
-func (a *Agent) Message(topic string, payload []byte) error {
-	fmt.Printf("%s, %s\r\n", topic, payload)
-	return nil
-}
-
-// Connected 设备接入时
-func (a *Agent) Connected(status *protocol.DevConnectStatus) error {
-	fmt.Printf("%v\r\n", status)
-	return nil
-}
-
-// Disconnected 设备断开连接时
-func (a *Agent) Disconnected(status *protocol.DevConnectStatus) error {
-	fmt.Printf("%v\r\n", status)
-	return nil
-}
-
-func NewAgent() *Agent {
-	return &Agent{}
-}

+ 17 - 11
services/emqx-agent/sub_dev.go

@@ -4,6 +4,7 @@ import (
 	"context"
 	"encoding/json"
 	mqtt "github.com/eclipse/paho.mqtt.golang"
+	"github.com/gogf/gf/os/grpool"
 	"runtime"
 	"runtime/debug"
 	"sparrow/pkg/protocol"
@@ -42,7 +43,8 @@ type DevSubHandle interface {
 }
 
 type MqttClient struct {
-	client *client.MqttClient
+	client     *client.MqttClient
+	handlePool *grpool.Pool
 }
 
 const (
@@ -67,7 +69,8 @@ func newEmqClient(conf *client.MqttConfig) (SubDev, error) {
 		return nil, err
 	}
 	return &MqttClient{
-		client: mc,
+		client:     mc,
+		handlePool: grpool.New(1000),
 	}, nil
 }
 
@@ -151,6 +154,7 @@ func (d *MqttClient) subConnectStatus(handle Handle) func(ctx context.Context, t
 		status := protocol.DevConnectStatus{
 			DeviceCode: msg.Username,
 			DeviceId:   msg.Clientid,
+			ClientIp:   msg.Ipaddress,
 		}
 		if strings.HasSuffix(topic, "/connected") {
 			status.Action = "LOGIN"
@@ -166,15 +170,17 @@ func (d *MqttClient) subConnectStatus(handle Handle) func(ctx context.Context, t
 func (d *MqttClient) subscribeWithFunc(cli mqtt.Client, topic string,
 	handle func(ctx context.Context, topic string, payload []byte) error) error {
 	return d.client.Subscribe(cli, topic, 1, func(c mqtt.Client, message mqtt.Message) {
-		go func() {
-			ctx, cancel := context.WithTimeout(context.Background(), 50*time.Second)
-			defer cancel()
-			Recover(ctx)
-			err := handle(ctx, message.Topic(), message.Payload())
-			if err != nil {
-				server.Log.Errorf("handle failure err :%s, topic :%v", err, topic)
-			}
-		}()
+		_ = d.handlePool.Add(func() {
+			go func() {
+				ctx, cancel := context.WithTimeout(context.Background(), 50*time.Second)
+				defer cancel()
+				Recover(ctx)
+				err := handle(ctx, message.Topic(), message.Payload())
+				if err != nil {
+					server.Log.Errorf("handle failure err :%s, topic :%v", err, topic)
+				}
+			}()
+		})
 	})
 }