client.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. package server
  2. import (
  3. "bth-rs30-gateway/protocol"
  4. "errors"
  5. "fmt"
  6. "github.com/gogf/gf/encoding/gbinary"
  7. "github.com/gogf/gf/frame/g"
  8. "github.com/gogf/gf/net/gtcp"
  9. "github.com/gogf/gf/os/glog"
  10. "io"
  11. "net"
  12. "strings"
  13. "syscall"
  14. "time"
  15. )
  16. type Client struct {
  17. Id string
  18. srv *Server
  19. conn *gtcp.Conn
  20. sendChan chan []byte
  21. closeChan chan struct{}
  22. closeHandler func(id string, c *Client)
  23. isReg bool
  24. }
  25. func NewClient(s *Server, conn *gtcp.Conn) *Client {
  26. return &Client{
  27. srv: s,
  28. conn: conn,
  29. sendChan: make(chan []byte),
  30. closeChan: make(chan struct{}),
  31. }
  32. }
  33. func (c *Client) SetId(id string) {
  34. c.Id = id
  35. }
  36. func (c *Client) SendLoop() {
  37. for {
  38. select {
  39. case buf := <-c.sendChan:
  40. err := c.send(buf)
  41. if err != nil {
  42. glog.Errorf("指令发送失败:%s", err.Error())
  43. continue
  44. }
  45. timer := time.NewTimer(5 * time.Second)
  46. for {
  47. timer.Reset(5 * time.Second)
  48. select {
  49. case <-timer.C:
  50. glog.Error("接收指令超时")
  51. break
  52. default:
  53. if !c.isReg {
  54. id := gbinary.DecodeToString(buf)
  55. glog.Debugf("收到注册包!id:%s", id)
  56. c.SetId(id)
  57. c.isReg = true
  58. break
  59. }
  60. receiveBuf, err := c.conn.Recv(-1)
  61. if err != nil {
  62. c.readError(err)
  63. break
  64. }
  65. glog.Debugf("收到数据:%2X", receiveBuf)
  66. if err := c.decodeAndReport(receiveBuf); err != nil {
  67. glog.Debugf("处理数据失败:%s", err.Error())
  68. break
  69. }
  70. }
  71. break
  72. }
  73. }
  74. }
  75. }
  76. // 收到数据:01 03 04 00 FD 00 8A EA 64
  77. func (c *Client) decodeAndReport(buf []byte) error {
  78. length := len(buf)
  79. var crc crc
  80. crc.reset().pushBytes(buf[0 : length-2])
  81. checksum := uint16(buf[length-1])<<8 | uint16(buf[length-2])
  82. if checksum != crc.value() {
  83. return errors.New(fmt.Sprintf("modbus: response crc '%v' does not match expected '%v'", checksum, crc.value()))
  84. }
  85. if buf[1] == 0x03 && buf[2] == 0x06 {
  86. data := &protocol.Data{}
  87. data.Temperature = float32(gbinary.BeDecodeToUint16(buf[3:5])) * 0.1
  88. data.Humidly = float32(gbinary.BeDecodeToUint16(buf[5:7])) * 0.1
  89. if err := c.srv.ReportStatus(c.Id, data); err != nil {
  90. return err
  91. }
  92. }
  93. return nil
  94. }
  95. func (c *Client) readError(err error) {
  96. defer c.closeConnection()
  97. if err == io.EOF || isErrConnReset(err) {
  98. return
  99. }
  100. glog.Errorf("读取数据发生错误:%s", err.Error())
  101. }
  102. func (c *Client) closeConnection() {
  103. _ = c.conn.Close()
  104. c.conn = nil
  105. close(c.closeChan)
  106. c.SetId("")
  107. c.isReg = false
  108. if c.closeHandler != nil {
  109. c.closeHandler(c.Id, c)
  110. }
  111. }
  // isErrConnReset reports whether err stands for "connection reset by peer".
  //
  // It matches syscall.ECONNRESET anywhere in err's chain, and additionally any
  // *net.OpError in the chain whose inner error text contains the ECONNRESET
  // message. A nil err, or an OpError without an inner error, yields false.
  func isErrConnReset(err error) bool {
  	if errors.Is(err, syscall.ECONNRESET) {
  		return true
  	}
  	var opErr *net.OpError
  	if errors.As(err, &opErr) && opErr.Err != nil {
  		// Fallback for inner errors that only carry the message text.
  		return strings.Contains(opErr.Err.Error(), syscall.ECONNRESET.Error())
  	}
  	return false
  }
  119. func (c *Client) GetSendByte() {
  120. for {
  121. c.sendChan <- []byte{0x01, 0x03, 0x00, 0x00, 0x00, 0x02, 0xC4, 0x0B}
  122. time.Sleep(time.Duration(g.Cfg().GetInt("Server.Frequency")) * time.Second)
  123. }
  124. }
  125. func (c *Client) send(buf []byte) error {
  126. if c.conn == nil {
  127. return nil
  128. }
  129. glog.Debugf("----->%2X", buf)
  130. err := c.conn.Send(buf)
  131. if err != nil {
  132. glog.Error(c.srv.ctx, err)
  133. c.closeConnection()
  134. return err
  135. }
  136. return nil
  137. }