package server import ( "bytes" "errors" "fmt" "github.com/gogf/gf/v2/encoding/gbinary" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/net/gtcp" "github.com/gogf/gf/v2/os/glog" "io" "net" "strings" "syscall" "time" "yx-4g-gateway/address" "yx-4g-gateway/modbus" "yx-4g-gateway/schema" ) const ( FAN_1_OPEN = 1 << 0 FAN_2_OPEN = 1 << 1 FAN_3_OPEN = 1 << 2 FAN_4_OPEN = 1 << 3 FAN_5_OPEN = 1 << 4 ) type Client struct { Id string srv *Server conn *gtcp.Conn sendChan chan []byte closeChan chan struct{} closeHandler func(id string, c *Client) regHandler func(id string, c *Client) isReg bool receiveData []byte } func NewClient(s *Server, conn *gtcp.Conn) *Client { return &Client{ srv: s, conn: conn, sendChan: make(chan []byte), closeChan: make(chan struct{}), receiveData: make([]byte, 0), } } func (c *Client) SetId(id string) { c.Id = id } func (c *Client) SendLoop() { ctx := c.srv.ctx for { select { case buf := <-c.sendChan: err := c.send(buf) if err != nil { g.Log().Errorf(ctx, "指令发送失败:%s", err.Error()) continue } timer := time.NewTimer(5 * time.Second) for { select { case <-c.closeChan: return case <-timer.C: g.Log().Errorf(ctx, "接收指令超时") break default: receiveBuf, err := c.conn.Recv(0) if err != nil { c.readError(err) break } if !c.isReg { id := gbinary.DecodeToString(receiveBuf) g.Log().Debugf(ctx, "收到注册包!id:%s", id) c.SetId(id) c.isReg = true if c.regHandler != nil { c.regHandler(c.Id, c) } continue } if c.Id == "863569064249469" { g.Log().Debugf(ctx, "ID:%s 收到数据:%2X", c.Id, receiveBuf) } c.receiveData = receiveBuf //for v := range receiveBuf { // c.receiveQueue.Push(v) // glog.Debugf("队列数据:%2X", c.receiveQueue) //} //glog.Debugf("receiveQueue长度:%d", c.receiveQueue.Len()) if err := c.readQueue(); err != nil { g.Log().Errorf(ctx, "处理数据失败:%s", err.Error()) break } } break } } } } func (c *Client) readQueue() error { //for i := 0; i < len(c.receiveData)-2; i++ { // if c.receiveData[i] == 0x02 && c.receiveData[i+1] == 0x03 && c.receiveData[i+2] == 0xa8 { // c.receiveData = c.receiveData[i:] // if len(c.receiveData) >= int(c.receiveData[2])+5 { // c.receiveData = c.receiveData[:int(c.receiveData[i+2])+5] // return c.decodeAndReport(c.receiveData) // } else { // return nil // } // // } //} //c.receiveData = []byte{} //return nil if bytes.Equal(c.receiveData[:3], []byte{0x02, 0x03, 0x04}) { if len(c.receiveData) >= int(c.receiveData[2])+5 { return c.decodeAndReport(c.receiveData) } } return nil //if bytes.Equal(c.receiveData[:3], []byte{0x02, 0x03, 0x04}) { // if len(c.receiveData) >= int(c.receiveData[2])+5 { // c.receiveData = c.receiveData[:int(c.receiveData[2]+5)] // if err := c.decodeAndReport(c.receiveData); err != nil { // return err // } // c.receiveData = c.receiveData[:0] // } // return nil //} else { // c.receiveData = c.receiveData[:0] // return nil //} //for { // if c.receiveQueue.Len() > 3 { // glog.Debugf("11111111111111111") // a := FormatInterfaceToByte(c.receiveQueue.Pop()) // b := FormatInterfaceToByte(c.receiveQueue.Pop()) // d := FormatInterfaceToByte(c.receiveQueue.Pop()) // if a == 0x1 && b == 0x03 && d == 0xa8 && len(c.dataByteArray) == 0 { // packageBytes = append(packageBytes, a) // packageBytes = append(packageBytes, b) // packageBytes = append(packageBytes, d) // // } else { // return // } // } else { // return // } // for { // if count == 3 { // glog.Debugf("2222222222222222222") // size := int(packageBytes[2]) + 2 // for { // c.dataByteArray = append(c.dataByteArray, packageBytes...) // for i := 0; i < size; i++ { // if c.receiveQueue.Len() > 0 { // d := c.receiveQueue.Pop() // c.dataByteArray = append(c.dataByteArray, FormatInterfaceToByte(d)) // if i == size-1 { // err := c.decodeAndReport(c.dataByteArray) // glog.Debugf("处理数据失败:%s", err.Error()) // c.dataByteArray = c.dataByteArray[:0] // return // } // } // } // } // count = 0 // } // } //} } func (c *Client) decodeAndReport(buf []byte) error { length := len(buf) var crc modbus.Crc crc.Reset().PushBytes(buf[0 : length-2]) checksum := uint16(buf[length-1])<<8 | uint16(buf[length-2]) if checksum != crc.Value() { return errors.New(fmt.Sprintf("modbus: response crc '%v' does not match expected '%v'", checksum, crc.Value())) } result := buf[3 : length-2] data := make(map[int][]byte) var index, dIndex int var newBuf []byte for _, b := range result { index += 1 newBuf = append(newBuf, b) if index%2 == 0 { data[dIndex] = newBuf dIndex += 1 index = 0 newBuf = make([]byte, 0) } } ret := new(schema.StatusResponse) //ret.Power = gbinary.BeDecodeToInt(data[address.UnitOnOff]) //ret.Mode = gbinary.BeDecodeToUint16(data[address.SetMode]) //ret.FanSpeed = gbinary.BeDecodeToUint16(data[address.SetFanSpeed]) //ret.SetTemp = gbinary.BeDecodeToUint16(data[address.SetTemp]) //ret.NewFan = gbinary.BeDecodeToUint16(data[address.SetNewFan]) //ret.FanSpeed = gbinary.BeDecodeToUint16(data[8:9]) //ret.SetTemp = gbinary.BeDecodeToUint16(data[28:29]) //ret.AirQuality = gbinary.BeDecodeToUint16(data[35]) //ret.CO2 = gbinary.BeDecodeToUint16(data[36]) //ret.Temperature = gbinary.BeDecodeToUint16(data[27]) //ret.Humidity = gbinary.BeDecodeToUint16(data[28]) // //fanStatus := gbinary.BeDecodeToUint16(data[5]) //ret.FanGateOne = int(fanStatus) & 0x80 //ret.FanGateTwo = int(fanStatus) & 0x40 //ret.FanGateThree = int(fanStatus) & 0x20 //ret.FanGateFour = int(fanStatus) & 0x10 //ret.FanGateFive = int(fanStatus) & 0x08 //// //ret.FanGateOneLevel = gbinary.BeDecodeToUint16(data[7]) //ret.FanGateThreeLevel = gbinary.BeDecodeToUint16(data[8]) //ret.FanGateFourLevel = gbinary.BeDecodeToUint16(data[9]) //ret.FanGateFiveLevel = gbinary.BeDecodeToUint16(data[10]) ret.InletValve = gbinary.BeDecodeToUint16(data[0]) ret.HighWater = gbinary.BeDecodeToUint16(data[1]) //ret.AirQuality = gbinary.BeDecodeToUint16(data[2]) //ret.CO2 = gbinary.BeDecodeToUint16(data[3]) //if ret.FanGateOneLevel > 0 { // ret.FanGateOne = 1 // ret.FanGateTwo = 1 //} //if ret.FanGateThreeLevel > 0 { // ret.FanGateThree = 1 //} // //if ret.FanGateFourLevel > 0 { // ret.FanGateFour = 1 //} //if ret.FanGateFiveLevel > 0 { // ret.FanGateFive = 1 //} c.receiveData = []byte{} if c.Id == "863569064249469" { if err := c.srv.ReportStatus(c.Id, ret, "status"); err != nil { return err } } return nil } func (c *Client) readError(err error) { defer c.closeConnection() if err == io.EOF || isErrConnReset(err) { return } g.Log().Errorf(c.srv.ctx, "读取数据发生错误:%s", err.Error()) } func (c *Client) closeConnection() { _ = c.conn.Close() c.conn = nil close(c.closeChan) c.isReg = false if c.closeHandler != nil { c.closeHandler(c.Id, c) } } // isErrConnReset read: connection reset by peer func isErrConnReset(err error) bool { var ne *net.OpError if errors.As(err, &ne) { return strings.Contains(ne.Err.Error(), syscall.ECONNRESET.Error()) } return false } func (c *Client) send(buf []byte) error { if c.conn == nil { return nil } if c.Id == "863569064249469" { g.Log().Debugf(c.srv.ctx, "----->%2X", buf) } err := c.conn.Send(buf) if err != nil { glog.Error(c.srv.ctx, err) c.closeConnection() return err } return nil } // GetStatus 获取机组运行状态 func (c *Client) GetStatus() { //for { // err := c.send([]byte{0x02, 0x03, 0x00, 0x00, 0x00, 0x54, 0x44, 0x06}) // err := c.send([]byte{0x01, 0x03, 0x00, 0x00, 0x00, 0x54, 0x44, 0x35}) // if err != nil { // glog.Debugf("处理数据失败:%s", err.Error()) // } // time.Sleep(10 * time.Second) //} for { c.sendChan <- []byte{0x02, 0x03, 0x00, 0x21, 0x00, 0x02, 0x94, 0x32} //c.sendChan <- []byte{0x02, 0x03, 0x00, 0x00, 0x00, 0x54, 0x44, 0x06} time.Sleep(1 * time.Second) } } func (c *Client) GetSensorStatus() { for { err := c.send([]byte{0x01, 0x03, 0x00, 0x02, 0x00, 0x07, 0x00, 0x00}) if err != nil { g.Log().Errorf(c.srv.ctx, "处理数据失败:%s", err.Error()) } time.Sleep(10 * time.Second) } } func (c *Client) Read() { for { receiveBuf, err := c.conn.Recv(-1) if err != nil { g.Log().Errorf(c.srv.ctx, "处理数据失败:%s", err.Error()) } g.Log().Debugf(c.srv.ctx, "收到数据:%2X", receiveBuf) time.Sleep(1 * time.Second) } } // PowerOn 开机 func (c *Client) PowerOn() error { result, err := modbus.WriteMultipleRegisters(address.UnitOnOff, 1, []byte{0x00, 0x01}) if err != nil { return err } c.sendChan <- result c.sendChan <- []byte{0x02, 0x03, 0x00, 0x00, 0x00, 0x54, 0x44, 0x06} return nil } // PowerOff 关机 func (c *Client) PowerOff() error { result, err := modbus.WriteMultipleRegisters(address.UnitOnOff, 1, []byte{0x00, 0x00}) if err != nil { return err } c.sendChan <- result return nil } // GetPower 获取开关机状态 func (c *Client) GetPower() (err error) { result, err := modbus.ReadHoldingRegisters(address.UnitOnOff, 1) if err != nil { return } c.sendChan <- result c.sendChan <- []byte{0x02, 0x03, 0x00, 0x00, 0x00, 0x54, 0x44, 0x06} return nil } // SetNewFan 设置新风阀模式 func (c *Client) SetNewFan(mode uint16) error { result, err := modbus.WriteMultipleRegisters(address.SetNewFan, 1, gbinary.BeEncode(mode)) if err != nil { return err } c.sendChan <- result c.sendChan <- []byte{0x02, 0x03, 0x00, 0x00, 0x00, 0x54, 0x44, 0x06} return nil } // SetMode 设置模式 func (c *Client) SetMode(mode uint16) error { result, err := modbus.WriteMultipleRegisters(address.SetMode, 1, gbinary.BeEncode(mode)) if err != nil { return err } c.sendChan <- result c.sendChan <- []byte{0x02, 0x03, 0x00, 0x00, 0x00, 0x54, 0x44, 0x06} return nil } // SetFanSpeed 设置风速 func (c *Client) SetFanSpeed(speed uint16) error { result, err := modbus.WriteMultipleRegisters(address.SetFanSpeed, 1, gbinary.BeEncode(speed)) if err != nil { return err } c.sendChan <- result c.sendChan <- []byte{0x02, 0x03, 0x00, 0x00, 0x00, 0x54, 0x44, 0x06} return nil } // SetTemp 设置温度 func (c *Client) SetTemp(temp uint16) error { result, err := modbus.WriteMultipleRegisters(address.SetTemp, 1, gbinary.BeEncode(temp)) if err != nil { return err } c.sendChan <- result c.sendChan <- []byte{0x02, 0x03, 0x00, 0x00, 0x00, 0x54, 0x44, 0x06} return nil } // SleepMode 睡眠模式 func (c *Client) SleepMode(mode uint16) error { result, err := modbus.WriteMultipleRegisters(address.SleepMode, 1, gbinary.BeEncode(mode)) if err != nil { return err } c.sendChan <- result c.sendChan <- []byte{0x02, 0x03, 0x00, 0x00, 0x00, 0x54, 0x44, 0x06} return nil } // SetFanGateThreeLevel 风阀3开度 func (c *Client) SetFanGateThreeLevel(value uint16) error { result, err := modbus.WriteMultipleRegisters(address.SetFanGateThreeLevel, 1, gbinary.BeEncode(value)) if err != nil { return err } c.sendChan <- result c.sendChan <- []byte{0x02, 0x03, 0x00, 0x00, 0x00, 0x54, 0x44, 0x06} return nil } // SetFanGateFourLevel 风阀4开度 func (c *Client) SetFanGateFourLevel(value uint16) error { result, err := modbus.WriteMultipleRegisters(address.SetFanGateFourLevel, 1, gbinary.BeEncode(value)) if err != nil { return err } c.sendChan <- result c.sendChan <- []byte{0x02, 0x03, 0x00, 0x00, 0x00, 0x54, 0x44, 0x06} return nil } // SetFanGateFiveLevel 风阀5开度 func (c *Client) SetFanGateFiveLevel(value uint16) error { result, err := modbus.WriteMultipleRegisters(address.SetFanGateFiveLevel, 1, gbinary.BeEncode(value)) if err != nil { return err } c.sendChan <- result c.sendChan <- []byte{0x02, 0x03, 0x00, 0x00, 0x00, 0x54, 0x44, 0x06} return nil } // SetValvePower 控制风阀开关 func (c *Client) SetValvePower(param *schema.SetValvePowerReq) error { var result []byte var err error if param.Valve1 == 1 { result, err = modbus.WriteMultipleRegisters(address.SetValvePower, 1, gbinary.BeEncode(FAN_1_OPEN)) if err != nil { return err } } if param.Valve2 == 1 { result, err = modbus.WriteMultipleRegisters(address.SetValvePower, 1, gbinary.BeEncode(FAN_2_OPEN)) if err != nil { return err } } if param.Valve3 == 1 { result, err = modbus.WriteMultipleRegisters(address.SetValvePower, 1, gbinary.BeEncode(FAN_3_OPEN)) if err != nil { return err } } if param.Valve4 == 1 { result, err = modbus.WriteMultipleRegisters(address.SetValvePower, 1, gbinary.BeEncode(FAN_4_OPEN)) if err != nil { return err } } if param.Valve5 == 1 { result, err = modbus.WriteMultipleRegisters(address.SetValvePower, 1, gbinary.BeEncode(FAN_5_OPEN)) if err != nil { return err } } c.sendChan <- result return nil } func FormatInterfaceToByte(a interface{}) (result byte) { v, ok := a.(uint8) if ok { result = byte(v) } return result }