client.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. package server
  2. import (
  3. "encoding/binary"
  4. "errors"
  5. "fmt"
  6. "github.com/gogf/gf/encoding/gbinary"
  7. "github.com/gogf/gf/net/gtcp"
  8. "github.com/gogf/gf/os/glog"
  9. "github.com/gogf/gf/util/gconv"
  10. "io"
  11. "net"
  12. "pt100-gateway/protocol"
  13. "strconv"
  14. "strings"
  15. "syscall"
  16. "time"
  17. )
  18. type Client struct {
  19. Id string
  20. srv *Server
  21. conn *gtcp.Conn
  22. sendChan chan []byte
  23. closeChan chan struct{}
  24. closeHandler func(id string, c *Client)
  25. regHandler func(id string, c *Client)
  26. isReg bool
  27. }
  28. func NewClient(s *Server, conn *gtcp.Conn) *Client {
  29. return &Client{
  30. srv: s,
  31. conn: conn,
  32. sendChan: make(chan []byte),
  33. closeChan: make(chan struct{}),
  34. }
  35. }
  36. func (c *Client) SetId(id string) {
  37. c.Id = id
  38. }
  39. func (c *Client) SendLoop() {
  40. for {
  41. select {
  42. case buf := <-c.sendChan:
  43. err := c.send(buf)
  44. if err != nil {
  45. glog.Errorf("指令发送失败:%s", err.Error())
  46. continue
  47. }
  48. timer := time.NewTimer(5 * time.Second)
  49. for {
  50. select {
  51. case <-c.closeChan:
  52. return
  53. case <-timer.C:
  54. glog.Error("接收指令超时")
  55. break
  56. default:
  57. receiveBuf, err := c.conn.Recv(-1)
  58. if err != nil {
  59. c.readError(err)
  60. break
  61. }
  62. if !c.isReg {
  63. id := gbinary.DecodeToString(receiveBuf)
  64. glog.Debugf("收到注册包!id:%s", id)
  65. c.SetId(id)
  66. c.isReg = true
  67. if c.regHandler != nil {
  68. c.regHandler(c.Id, c)
  69. }
  70. continue
  71. }
  72. glog.Debugf("收到数据:%2X", receiveBuf)
  73. if err := c.decodeAndReport(receiveBuf); err != nil {
  74. glog.Debugf("处理数据失败:%s", err.Error())
  75. break
  76. }
  77. }
  78. break
  79. }
  80. }
  81. }
  82. }
  83. // decodeAndReport 收到数据:01 03 10 01 05 01 09 FF FF 01 06 FF FF 01 03 FF FF FF FF E7 EB
  84. func (c *Client) decodeAndReport(buf []byte) error {
  85. length := len(buf)
  86. var crc crc
  87. crc.reset().pushBytes(buf[0 : length-2])
  88. checksum := uint16(buf[length-1])<<8 | uint16(buf[length-2])
  89. if checksum != crc.value() {
  90. return errors.New(fmt.Sprintf("modbus: response crc '%v' does not match expected '%v'", checksum, crc.value()))
  91. }
  92. if buf[1] == 0x03 {
  93. data := &protocol.Data{}
  94. data.Address = gconv.String(gbinary.BeDecodeToInt(buf[0:1]))
  95. data.Tem1 = decodeData(buf[3:5])
  96. data.Tem2 = decodeData(buf[5:7])
  97. data.Tem3 = decodeData(buf[7:9])
  98. data.Tem4 = decodeData(buf[9:11])
  99. data.Tem5 = decodeData(buf[11:13])
  100. data.Tem6 = decodeData(buf[13:15])
  101. data.Tem7 = decodeData(buf[15:17])
  102. data.Tem8 = decodeData(buf[17:19])
  103. if err := c.srv.ReportStatus(c.Id, data, "status"); err != nil {
  104. return err
  105. }
  106. }
  107. return nil
  108. }
  109. func decodeData(buf []byte) float32 {
  110. if gbinary.BeDecodeToUint16(buf) == 0xFFFF {
  111. return 0
  112. }
  113. value := gconv.Float32(gbinary.BeDecodeToUint16(buf)&0x7FFF) / 10
  114. if gbinary.BeDecodeToUint16(buf)&0x8000 == 0x8000 {
  115. value = -value
  116. }
  117. return value
  118. }
  119. func (c *Client) readError(err error) {
  120. defer c.closeConnection()
  121. if err == io.EOF || isErrConnReset(err) {
  122. return
  123. }
  124. glog.Errorf("读取数据发生错误:%s", err.Error())
  125. }
  126. func (c *Client) closeConnection() {
  127. _ = c.conn.Close()
  128. c.conn = nil
  129. close(c.closeChan)
  130. c.isReg = false
  131. if c.closeHandler != nil {
  132. c.closeHandler(c.Id, c)
  133. }
  134. }
  135. // 计算温度值, 处理零下的情况
  136. func caleTemperature(data []byte) int {
  137. var ym uint16
  138. var isBlowZero bool
  139. var result int
  140. bm := binary.BigEndian.Uint16(data)
  141. var bitNum = len(data) * 8
  142. f := "%." + strconv.Itoa(bitNum) + "b"
  143. bmStr := fmt.Sprintf(f, bm)
  144. if string(bmStr[0]) == "1" { // blow zero
  145. ym = ^bm + 1
  146. isBlowZero = true
  147. } else {
  148. ym = bm
  149. }
  150. result = int(ym)
  151. if isBlowZero {
  152. result = int(ym) * -1
  153. }
  154. return result
  155. }
  156. // isErrConnReset read: connection reset by peer
  157. func isErrConnReset(err error) bool {
  158. if ne, ok := err.(*net.OpError); ok {
  159. return strings.Contains(ne.Err.Error(), syscall.ECONNRESET.Error())
  160. }
  161. return false
  162. }
  163. func (c *Client) GetSendByte() {
  164. for {
  165. c.sendChan <- []byte{0x01, 0x03, 0x00, 0x00, 0x00, 0x08, 0x44, 0x0C}
  166. time.Sleep(10 * time.Second)
  167. }
  168. }
  169. func (c *Client) send(buf []byte) error {
  170. if c.conn == nil {
  171. return nil
  172. }
  173. glog.Debugf("----->%2X", buf)
  174. err := c.conn.Send(buf)
  175. if err != nil {
  176. glog.Error(c.srv.ctx, err)
  177. c.closeConnection()
  178. return err
  179. }
  180. return nil
  181. }