Browse Source

更新查询电压电流指令

liuxiulin 2 years ago
parent
commit
132216e3e6

+ 4 - 0
protocol/const.go

@@ -26,4 +26,8 @@ const (
 
 	// MessageHeaderSize 消息头大小
 	MessageHeaderSize = 12
+
+	IsPower = 1
+	IsVData = 2
+	IsIData = 3
 )

+ 1 - 1
protocol/dlt645_0x33333433.go

@@ -57,7 +57,7 @@ func (entity *Dlt_0x33333433) Decode(ctx *PacketContext, dataByte []byte, data *
 
 	glog.Debugf("数据读取成功:表号:%s,正向有功总电能:%2f kWh", entity.DeviceID, entity.WH)
 	data.ActivePower = entity.WH
-	ctx.SetReportPower(true)
+	data.DataType = IsPower
 	return err
 }
 

+ 1 - 4
protocol/dlt645_0x33343435.go

@@ -56,10 +56,7 @@ func (entity *Dlt_0x33343435) Decode(ctx *PacketContext, dataByte []byte, data *
 	}
 
 	glog.Debugf("数据读取成功:表号:%s,当前A相电压:%2f", entity.DeviceID, entity.VoltageA)
-
-	ctx.SetAV(entity.VoltageA)
-	ctx.SetViCount()
-
+	data.DataType = IsVData
 	return nil
 }
 

+ 1 - 0
protocol/dlt645_0x33343535.go

@@ -56,6 +56,7 @@ func (entity *Dlt_0x33343535) Decode(ctx *PacketContext, dataByte []byte, data *
 	}
 	glog.Debugf("数据读取成功:表号:%s,当前A相电流:%2f", entity.DeviceID, entity.CurrentA)
 	data.AI = entity.CurrentA
+	data.DataType = IsIData
 	return nil
 }
 

+ 8 - 0
protocol/message.go

@@ -21,8 +21,10 @@ type DHeader struct {
 }
 
 type Data struct {
+	DataType int
 	PowerData
 	VIData
+	IData
 }
 
 type PowerData struct {
@@ -38,6 +40,12 @@ type VIData struct {
 	CI float64 `json:"ci"` // C相电流
 }
 
+type IData struct {
+	AI2 float64 `json:"ai"` // A相电流
+	BI2 float64 `json:"bi"` // B相电流
+	CI2 float64 `json:"ci"` // C相电流
+}
+
 // 协议编码
 func (message *Message) Encode() ([]byte, error) {
 	// 编码消息体

+ 88 - 122
server/client.go

@@ -2,7 +2,6 @@ package server
 
 import (
 	"dlt645-server/protocol"
-	"github.com/gogf/gf/frame/g"
 	"github.com/gogf/gf/net/gtcp"
 	"github.com/gogf/gf/os/glog"
 	"io"
@@ -34,57 +33,73 @@ func NewClient(s *Server, conn *gtcp.Conn) *Client {
 	}
 }
 
-func (c *Client) ReadLoop(ctx *protocol.PacketContext, data *protocol.Data) {
+func (c *Client) SendLoop(ctx *protocol.PacketContext, data *protocol.Data) {
 	defer c.srv.grWG.Done()
+
 	for {
-		buf, err := c.conn.Recv(-1)
-		if err != nil {
-			c.readError(err)
-			return
-		}
-		if len(buf) > 0 {
-			_, err := c.srv.message.Decode(ctx, buf, data)
+		select {
+		case buf := <-c.sendChan:
+			err := c.send(buf)
 			if err != nil {
-				glog.Errorf("解析报文失败:%s", err.Error())
-			}
-			if !c.isReg {
-				c.SetId(ctx.GetId())
-				c.isReg = true
+				if err != nil {
+					glog.Errorf("指令发送失败:%s", err.Error())
+				}
+				continue
 			}
-		}
-	}
-}
+			timer := time.NewTimer(5 * time.Second)
+			for {
+				select {
+				case <-timer.C:
+					glog.Errorf("读取超时:%s", err.Error())
+					continue
+				default:
+					receiveBuf, err := c.conn.Recv(-1)
+					if err != nil {
+						c.readError(err)
+						continue
+					}
+					if len(buf) > 0 {
+						result, err := c.srv.message.Decode(ctx, receiveBuf, data)
+						if err != nil {
+							glog.Errorf("解析报文失败:%s", err.Error())
+						}
+						if !c.isReg {
+							c.SetId(ctx.GetId())
+							c.isReg = true
+							continue
+						}
+						var reportData interface{}
+						switch result.DataType {
+						case protocol.IsPower:
+							reportData = protocol.PowerData{
+								ActivePower: result.ActivePower,
+							}
+						case protocol.IsVData:
+							reportData = protocol.VIData{
+								AV: result.AV,
+								BV: result.BV,
+								CV: result.CV,
+							}
+						case protocol.IsIData:
+							reportData = protocol.IData{
+								AI2: result.AI,
+								BI2: result.BI,
+								CI2: result.CI,
+							}
+						default:
+							continue
+						}
+						if err := c.srv.ReportStatus(c.Id, reportData); err != nil {
+							glog.Errorf("数据上报发送错误:%s", err.Error())
+							continue
+						}
+					}
+				}
 
-func (c *Client) ReportPower(data *protocol.Data) {
-	defer c.srv.grWG.Done()
-	for {
-		reportData := new(protocol.PowerData)
-		reportData.ActivePower = data.ActivePower
-		if err := c.srv.ReportStatus(c.Id, reportData); err != nil {
-			c.readError(err)
-			return
-		}
-		time.Sleep(time.Duration(g.Cfg().GetInt("Server.PowerFrequency")) * time.Second)
-	}
-}
+			}
 
-func (c *Client) ReportVIData(data *protocol.Data) {
-	defer c.srv.grWG.Done()
-	for {
-		reportData := new(protocol.VIData)
-		reportData.AV = data.AV
-		reportData.BV = data.BV
-		reportData.CV = data.CV
-		reportData.AI = data.AI
-		reportData.BI = data.BI
-		reportData.CI = data.CI
-		if err := c.srv.ReportStatus(c.Id, reportData); err != nil {
-			c.readError(err)
-			return
 		}
-		time.Sleep(time.Duration(g.Cfg().GetInt("Server.VIFrequency")) * time.Second)
 	}
-
 }
 
 func (c *Client) SetId(id string) {
@@ -136,86 +151,54 @@ func (c *Client) send(buf []byte) error {
 	return nil
 }
 
-func (c *Client) GetActivePower(ctx *protocol.PacketContext, powerChan chan struct{}) {
+func (c *Client) GetActivePower(ctx *protocol.PacketContext) {
 	defer c.srv.grWG.Done()
 	for {
-		<-powerChan
 		entity := protocol.Dlt_0x33333433{}
-		if ctx.GetReceiveAddress() != nil {
-			sendByte, _ := entity.Encode(ctx)
-			err := c.send(sendByte)
-			if err != nil {
-				glog.Debugf("指令发送失败:%s", err.Error())
-			}
-		}
+		sendByte, _ := entity.Encode(ctx)
+		c.sendChan <- sendByte
+		time.Sleep(10 * time.Second)
 	}
 
 }
 
-func (c *Client) GetAV(ctx *protocol.PacketContext, avChan, aiChan chan struct{}) {
+func (c *Client) GetAV(ctx *protocol.PacketContext) {
 	defer c.srv.grWG.Done()
 	for {
-		<-avChan
 		entity := protocol.Dlt_0x33343435{}
-		if ctx.GetReceiveAddress() != nil {
-			sendByte, _ := entity.Encode(ctx)
-			err := c.send(sendByte)
-			if err != nil {
-				glog.Debugf("指令发送失败:%s", err.Error())
-			}
-		}
-		time.Sleep(2 * time.Second)
-		aiChan <- struct{}{}
+		sendByte, _ := entity.Encode(ctx)
+		c.sendChan <- sendByte
+		time.Sleep(10 * time.Second)
 	}
 }
 
-func (c *Client) GetAI(ctx *protocol.PacketContext, aiChan, bvChan chan struct{}) {
+func (c *Client) GetAI(ctx *protocol.PacketContext) {
 	defer c.srv.grWG.Done()
 	for {
-		<-aiChan
 		entity := protocol.Dlt_0x33343535{}
-		if ctx.GetReceiveAddress() != nil {
-			sendByte, _ := entity.Encode(ctx)
-			err := c.send(sendByte)
-			if err != nil {
-				glog.Debugf("指令发送失败:%s", err.Error())
-			}
-		}
-		time.Sleep(2 * time.Second)
-		bvChan <- struct{}{}
+		sendByte, _ := entity.Encode(ctx)
+		c.sendChan <- sendByte
+		time.Sleep(10 * time.Second)
 	}
 }
 
 func (c *Client) GetBV(ctx *protocol.PacketContext, bvChan, biChan chan struct{}) {
 	defer c.srv.grWG.Done()
 	for {
-		<-bvChan
 		entity := protocol.Dlt_0x33353435{}
-		if ctx.GetReceiveAddress() != nil {
-			sendByte, _ := entity.Encode(ctx)
-			err := c.send(sendByte)
-			if err != nil {
-				glog.Debugf("指令发送失败:%s", err.Error())
-			}
-		}
-		time.Sleep(2 * time.Second)
-		biChan <- struct{}{}
+		sendByte, _ := entity.Encode(ctx)
+		c.sendChan <- sendByte
+		time.Sleep(10 * time.Second)
 	}
 }
+
 func (c *Client) GetBI(ctx *protocol.PacketContext, biChan, cvChan chan struct{}) {
 	defer c.srv.grWG.Done()
 	for {
-		<-biChan
 		entity := protocol.Dlt_0x33353535{}
-		if ctx.GetReceiveAddress() != nil {
-			sendByte, _ := entity.Encode(ctx)
-			err := c.send(sendByte)
-			if err != nil {
-				glog.Debugf("指令发送失败:%s", err.Error())
-			}
-		}
-		time.Sleep(2 * time.Second)
-		cvChan <- struct{}{}
+		sendByte, _ := entity.Encode(ctx)
+		c.sendChan <- sendByte
+		time.Sleep(10 * time.Second)
 	}
 }
 func (c *Client) GetCV(ctx *protocol.PacketContext, cvChan, ciChan chan struct{}) {
@@ -223,15 +206,9 @@ func (c *Client) GetCV(ctx *protocol.PacketContext, cvChan, ciChan chan struct{}
 	for {
 		<-cvChan
 		entity := protocol.Dlt_0x33363435{}
-		if ctx.GetReceiveAddress() != nil {
-			sendByte, _ := entity.Encode(ctx)
-			err := c.send(sendByte)
-			if err != nil {
-				glog.Debugf("指令发送失败:%s", err.Error())
-			}
-		}
-		time.Sleep(2 * time.Second)
-		ciChan <- struct{}{}
+		sendByte, _ := entity.Encode(ctx)
+		c.sendChan <- sendByte
+		time.Sleep(10 * time.Second)
 	}
 }
 func (c *Client) GetCI(ctx *protocol.PacketContext, ciChan, powerChan chan struct{}) {
@@ -239,25 +216,14 @@ func (c *Client) GetCI(ctx *protocol.PacketContext, ciChan, powerChan chan struc
 	for {
 		<-ciChan
 		entity := protocol.Dlt_0x33363535{}
-		if ctx.GetReceiveAddress() != nil {
-			sendByte, _ := entity.Encode(ctx)
-			err := c.send(sendByte)
-			if err != nil {
-				glog.Debugf("指令发送失败:%s", err.Error())
-			}
-		}
-		time.Sleep(time.Duration(g.Cfg().GetInt("Server.PowerFrequency")-5) * time.Second)
-		powerChan <- struct{}{}
+		sendByte, _ := entity.Encode(ctx)
+		c.sendChan <- sendByte
+		time.Sleep(10 * time.Second)
 	}
 }
 
-func (c *Client) SendGetAddress(ctx *protocol.PacketContext, addressChan, avChan chan struct{}) {
-	defer c.srv.grWG.Done()
-	<-addressChan
-	bytea := []byte{0x68, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0x68, 0x13, 0x00, 0xDF, 0x16}
-	_ = c.send(bytea)
-	for {
-		time.Sleep(time.Duration(g.Cfg().GetInt("Server.VIFrequency")) * time.Second)
-		avChan <- struct{}{}
-	}
+func (c *Client) SendGetAddress(ctx *protocol.PacketContext) []byte {
+	entity := new(protocol.Dlt_0x93)
+	sendBuf, _ := entity.Encode(ctx)
+	return sendBuf
 }

+ 5 - 25
server/server.go

@@ -50,35 +50,15 @@ func (s *Server) onClientConnect(conn *gtcp.Conn) {
 		glog.Debugf("客户端断开:%s", id)
 	}
 	ctx := new(protocol.PacketContext)
+	c.sendChan <- c.SendGetAddress(ctx)
 	data := new(protocol.Data)
-	go c.ReadLoop(ctx, data)
+	go c.SendLoop(ctx, data)
 
-	addressChan := make(chan struct{}, 1)
-	avChan := make(chan struct{}, 1)
-	aiChan := make(chan struct{}, 1)
-	bvChan := make(chan struct{}, 1)
-	biChan := make(chan struct{}, 1)
-	cvChan := make(chan struct{}, 1)
-	ciChan := make(chan struct{}, 1)
-	powerChan := make(chan struct{}, 1)
+	go c.GetActivePower(ctx)
 
-	addressChan <- struct{}{}
+	go c.GetAV(ctx)
 
-	go c.SendGetAddress(ctx, addressChan, avChan)
-
-	go c.GetActivePower(ctx, powerChan)
-
-	go c.GetAV(ctx, avChan, aiChan)
-
-	go c.GetAI(ctx, aiChan, bvChan)
-
-	go c.GetBV(ctx, bvChan, biChan)
-
-	go c.GetBI(ctx, biChan, cvChan)
-
-	go c.GetCV(ctx, cvChan, ciChan)
-
-	go c.GetCI(ctx, ciChan, powerChan)
+	go c.GetAI(ctx)
 
 }