client.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. package server
  2. import (
  3. "errors"
  4. "fmt"
  5. "github.com/gogf/gf/encoding/gbinary"
  6. "github.com/gogf/gf/net/gtcp"
  7. "github.com/gogf/gf/os/glog"
  8. "github.com/gogf/gf/util/gconv"
  9. "io"
  10. "math"
  11. "net"
  12. "pt100-gateway/protocol"
  13. "strings"
  14. "syscall"
  15. "time"
  16. )
  17. type Client struct {
  18. Id string
  19. srv *Server
  20. conn *gtcp.Conn
  21. sendChan chan []byte
  22. closeChan chan struct{}
  23. closeHandler func(id string, c *Client)
  24. regHandler func(id string, c *Client)
  25. isReg bool
  26. }
  27. func NewClient(s *Server, conn *gtcp.Conn) *Client {
  28. return &Client{
  29. srv: s,
  30. conn: conn,
  31. sendChan: make(chan []byte),
  32. closeChan: make(chan struct{}),
  33. }
  34. }
  35. func (c *Client) SetId(id string) {
  36. c.Id = id
  37. }
  38. func (c *Client) SendLoop() {
  39. for {
  40. select {
  41. case buf := <-c.sendChan:
  42. err := c.send(buf)
  43. if err != nil {
  44. glog.Errorf("指令发送失败:%s", err.Error())
  45. continue
  46. }
  47. timer := time.NewTimer(5 * time.Second)
  48. for {
  49. select {
  50. case <-c.closeChan:
  51. return
  52. case <-timer.C:
  53. glog.Error("接收指令超时")
  54. break
  55. default:
  56. receiveBuf, err := c.conn.Recv(-1)
  57. if err != nil {
  58. c.readError(err)
  59. break
  60. }
  61. if !c.isReg {
  62. id := gbinary.DecodeToString(receiveBuf)
  63. glog.Debugf("收到注册包!id:%s", id)
  64. c.SetId(id)
  65. c.isReg = true
  66. if c.regHandler != nil {
  67. c.regHandler(c.Id, c)
  68. }
  69. continue
  70. }
  71. glog.Debugf("收到数据:%2X", receiveBuf)
  72. if err := c.decodeAndReport(receiveBuf); err != nil {
  73. glog.Debugf("处理数据失败:%s", err.Error())
  74. break
  75. }
  76. }
  77. break
  78. }
  79. }
  80. }
  81. }
  82. // decodeAndReport 收到数据:01 03 10 01 05 01 09 FF FF 01 06 FF FF 01 03 FF FF FF FF E7 EB
  83. func (c *Client) decodeAndReport(buf []byte) error {
  84. length := len(buf)
  85. var crc crc
  86. crc.reset().pushBytes(buf[0 : length-2])
  87. checksum := uint16(buf[length-1])<<8 | uint16(buf[length-2])
  88. if checksum != crc.value() {
  89. return errors.New(fmt.Sprintf("modbus: response crc '%v' does not match expected '%v'", checksum, crc.value()))
  90. }
  91. if buf[1] == 0x03 {
  92. data := &protocol.Data{}
  93. data.Address = gconv.String(gbinary.BeDecodeToInt(buf[0:1]))
  94. data.Tem1 = caleTemperature(buf[3:5])
  95. data.Tem2 = caleTemperature(buf[5:7])
  96. data.Tem3 = caleTemperature(buf[7:9])
  97. data.Tem4 = caleTemperature(buf[9:11])
  98. data.Tem5 = caleTemperature(buf[11:13])
  99. data.Tem6 = caleTemperature(buf[13:15])
  100. data.Tem7 = caleTemperature(buf[15:17])
  101. data.Tem8 = caleTemperature(buf[17:19])
  102. if err := c.srv.ReportStatus(c.Id, data, "status"); err != nil {
  103. return err
  104. }
  105. }
  106. return nil
  107. }
  108. func caleTemperature(buf []byte) float32 {
  109. if gbinary.BeDecodeToUint16(buf) == 0xFFFF {
  110. return 0
  111. }
  112. value := gconv.Float32(gbinary.BeDecodeToUint16(buf)&0x7FFF) / 10
  113. if value < 1 {
  114. value = float32(math.Floor(float64(value) + 0.5))
  115. }
  116. if gbinary.BeDecodeToUint16(buf)&0x8000 == 0x8000 {
  117. value = value * -1
  118. }
  119. return value
  120. }
  121. func (c *Client) readError(err error) {
  122. defer c.closeConnection()
  123. if err == io.EOF || isErrConnReset(err) {
  124. return
  125. }
  126. glog.Errorf("读取数据发生错误:%s", err.Error())
  127. }
  128. func (c *Client) closeConnection() {
  129. _ = c.conn.Close()
  130. c.conn = nil
  131. close(c.closeChan)
  132. c.isReg = false
  133. if c.closeHandler != nil {
  134. c.closeHandler(c.Id, c)
  135. }
  136. }
  137. // isErrConnReset read: connection reset by peer
  138. func isErrConnReset(err error) bool {
  139. if ne, ok := err.(*net.OpError); ok {
  140. return strings.Contains(ne.Err.Error(), syscall.ECONNRESET.Error())
  141. }
  142. return false
  143. }
  144. func (c *Client) GetSendByte() {
  145. for {
  146. c.sendChan <- []byte{0x01, 0x03, 0x00, 0x00, 0x00, 0x08, 0x44, 0x0C}
  147. time.Sleep(10 * time.Second)
  148. }
  149. }
  150. func (c *Client) send(buf []byte) error {
  151. if c.conn == nil {
  152. return nil
  153. }
  154. glog.Debugf("----->%2X", buf)
  155. err := c.conn.Send(buf)
  156. if err != nil {
  157. glog.Error(c.srv.ctx, err)
  158. c.closeConnection()
  159. return err
  160. }
  161. return nil
  162. }