client.go 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265
  1. package server
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "github.com/gogf/gf/v2/encoding/gbinary"
  7. "github.com/gogf/gf/v2/net/gtcp"
  8. "github.com/gogf/gf/v2/os/glog"
  9. "github.com/gogf/gf/v2/util/gconv"
  10. "io"
  11. "math"
  12. "net"
  13. "strings"
  14. "syscall"
  15. "time"
  16. "water-system-gateway/protocol"
  17. )
  18. type Client struct {
  19. Id string
  20. ctx context.Context
  21. srv *Server
  22. conn *gtcp.Conn
  23. sendChan chan []byte
  24. closeChan chan struct{}
  25. closeHandler func(id string, c *Client)
  26. regHandler func(id string, c *Client)
  27. isReg bool
  28. }
  29. func NewClient(s *Server, conn *gtcp.Conn) *Client {
  30. return &Client{
  31. srv: s,
  32. ctx: context.Background(),
  33. conn: conn,
  34. sendChan: make(chan []byte),
  35. closeChan: make(chan struct{}),
  36. }
  37. }
  38. func (c *Client) SetId(id string) {
  39. c.Id = id
  40. }
  41. func (c *Client) SendLoop() {
  42. for {
  43. select {
  44. case buf := <-c.sendChan:
  45. err := c.send(buf)
  46. if err != nil {
  47. glog.Errorf(c.ctx, "指令发送失败:%s", err.Error())
  48. continue
  49. }
  50. for {
  51. select {
  52. case <-c.closeChan:
  53. return
  54. case <-time.After(2 * time.Second):
  55. glog.Error(c.ctx, "接收指令超时")
  56. break
  57. default:
  58. receiveBuf, err := c.conn.Recv(-1)
  59. if err != nil {
  60. c.readError(err)
  61. break
  62. }
  63. if !c.isReg {
  64. fmt.Println(receiveBuf)
  65. id := gbinary.DecodeToString(receiveBuf)
  66. glog.Debugf(c.ctx, "收到注册包!id:%s", id)
  67. c.SetId(id)
  68. c.isReg = true
  69. if c.regHandler != nil {
  70. c.regHandler(c.Id, c)
  71. }
  72. continue
  73. }
  74. glog.Debugf(c.ctx, "收到数据:%2X", receiveBuf)
  75. if err := c.decodeAndReport(receiveBuf); err != nil {
  76. glog.Debugf(c.ctx, "处理数据失败:%s", err.Error())
  77. break
  78. }
  79. }
  80. break
  81. }
  82. }
  83. }
  84. }
  85. // 收到数据:01 03 04 00 FD 00 8A EA 64
  86. func (c *Client) decodeAndReport(buf []byte) error {
  87. //length := len(buf)
  88. //var crc crc
  89. //crc.reset().pushBytes(buf[0 : length-2])
  90. //checksum := uint16(buf[length-1])<<8 | uint16(buf[length-2])
  91. //if checksum != crc.value() {
  92. // return errors.New(fmt.Sprintf("modbus: response crc '%v' does not match expected '%v'", checksum, crc.value()))
  93. //}
  94. if len(buf) <= 8 {
  95. return nil
  96. }
  97. if buf[1] == 0x03 {
  98. if buf[2] == 0xBE {
  99. data := &protocol.WaterSystem{}
  100. data.Power1 = gconv.Int(gbinary.BeDecodeToUint16(buf[3:5]))
  101. data.Power2 = gconv.Int(gbinary.BeDecodeToUint16(buf[5:7]))
  102. data.Power3 = gconv.Int(gbinary.BeDecodeToUint16(buf[7:9]))
  103. data.Power4 = gconv.Int(gbinary.BeDecodeToUint16(buf[9:11]))
  104. data.Power5 = gconv.Int(gbinary.BeDecodeToUint16(buf[11:13]))
  105. data.Power6 = gconv.Int(gbinary.BeDecodeToUint16(buf[13:15]))
  106. data.Power7 = gconv.Int(gbinary.BeDecodeToUint16(buf[15:17]))
  107. data.Power8 = gconv.Int(gbinary.BeDecodeToUint16(buf[17:19]))
  108. data.HeaderPower = gconv.Int(gbinary.BeDecodeToUint16(buf[21:23]))
  109. data.InletPressure = gconv.Float32(gbinary.BeDecodeToUint16(buf[23:25])) / 10
  110. data.OutletPressure = gconv.Float32(gbinary.BeDecodeToUint16(buf[25:27])) / 10
  111. data.PressureDifference = gconv.Float32(gbinary.BeDecodeToUint16(buf[27:29])) / 10
  112. data.FaultCode = gconv.Int(gbinary.BeDecodeToUint16(buf[29:31]))
  113. data.FillValve = gconv.Int(gbinary.BeDecodeToUint16(buf[31:33]))
  114. data.BypassValve = gconv.Int(gbinary.BeDecodeToUint16(buf[33:35]))
  115. data.SetInletWaterPre = gconv.Float32(gbinary.BeDecodeToUint16(buf[35:37])) / 10
  116. data.SetMaxDifferencePre = gconv.Float32(gbinary.BeDecodeToUint16(buf[37:39])) / 10
  117. data.SetMinDifferencePre = gconv.Float32(gbinary.BeDecodeToUint16(buf[39:41])) / 10
  118. data.HeaderMode = gconv.Int(gbinary.BeDecodeToUint16(buf[41:43]))
  119. data.PrimaryPumpStatus = gconv.Int(gbinary.BeDecodeToUint16(buf[43:45]))
  120. data.SecondaryPumpStatus = gconv.Int(gbinary.BeDecodeToUint16(buf[45:47]))
  121. data.PrimaryPumpFrequency = gconv.Float32(gbinary.BeDecodeToUint16(buf[47:49])) / 10
  122. data.SecondaryPumpFrequency = gconv.Float32(gbinary.BeDecodeToUint16(buf[49:51])) / 10
  123. data.WetNodeInput1 = gconv.Int(gbinary.BeDecodeToUint16(buf[51:53]))
  124. data.WetNodeInput2 = gconv.Int(gbinary.BeDecodeToUint16(buf[53:55]))
  125. data.WetNodeInput = gconv.Int(gbinary.BeDecodeToUint16(buf[55:57]))
  126. data.DryNodeInput = gconv.Int(gbinary.BeDecodeToUint16(buf[57:59]))
  127. data.PrimaryOutletTemp = gconv.Float32(gbinary.BeDecodeToUint16(buf[59:61])) / 10
  128. data.PrimaryReturnTemp = gconv.Float32(gbinary.BeDecodeToUint16(buf[61:63])) / 10
  129. data.SecondaryOutletTemp = gconv.Float32(gbinary.BeDecodeToUint16(buf[63:65])) / 10
  130. data.SecondaryReturnTemp = gconv.Float32(gbinary.BeDecodeToUint16(buf[65:67])) / 10
  131. data.CollectorPower = gconv.Int(gbinary.BeDecodeToUint16(buf[81:83]))
  132. data.PumpStationPower = gconv.Int(gbinary.BeDecodeToUint16(buf[83:85]))
  133. data.DeviceReset = gconv.Int(gbinary.BeDecodeToUint16(buf[85:87]))
  134. data.CollectorMode = gconv.Int(gbinary.BeDecodeToUint16(buf[87:89]))
  135. data.SetTargetOutletPressure = gconv.Float32(gbinary.BeDecodeToUint16(buf[89:91])) / 10
  136. data.SetMaxiPressureDifference = gconv.Float32(gbinary.BeDecodeToUint16(buf[91:93])) / 10
  137. data.SetMiniPressureDifference = gconv.Float32(gbinary.BeDecodeToUint16(buf[93:95])) / 10
  138. data.SetBypassPipeDiameter = gconv.Int(gbinary.BeDecodeToUint16(buf[95:97]))
  139. data.FillValvePower = gconv.Int(gbinary.BeDecodeToUint16(buf[97:99]))
  140. data.PrimaryPumpTypeConf = gconv.Int(gbinary.BeDecodeToUint16(buf[99:101]))
  141. data.SecondaryPumpTypeConf = gconv.Int(gbinary.BeDecodeToUint16(buf[101:103]))
  142. data.PrimaryPumpControlMethod = gconv.Int(gbinary.BeDecodeToUint16(buf[103:105]))
  143. data.SecondaryPumpControlMethod = gconv.Int(gbinary.BeDecodeToUint16(buf[105:107]))
  144. data.SetPrimaryPumpConstantPressure = gconv.Float32(gbinary.BeDecodeToUint16(buf[107:109])) / 10
  145. data.SetSecondaryPumpConstantPressure = gconv.Float32(gbinary.BeDecodeToUint16(buf[109:111])) / 10
  146. data.SetPrimaryPumpConstantTempDifference = gconv.Float32(gbinary.BeDecodeToUint16(buf[111:113])) / 10
  147. data.SetSecondaryPumpConstantTempDifference = gconv.Float32(gbinary.BeDecodeToUint16(buf[113:115])) / 10
  148. data.PrimaryPumpFrequencyLowerLimit = gconv.Float32(gbinary.BeDecodeToUint16(buf[115:117]))
  149. data.PrimaryPumpFrequencyUpperLimit = gconv.Float32(gconv.Float32(gbinary.BeDecodeToUint16(buf[117:119])))
  150. data.PumpPWMControlTypeConf = gconv.Int(gbinary.BeDecodeToUint16(buf[119:121]))
  151. data.SecondaryPumpFrequencyLowerLimit = gconv.Float32(gbinary.BeDecodeToUint16(buf[121:123]))
  152. data.SecondaryPumpFrequencyUpperLimit = gconv.Float32(gbinary.BeDecodeToUint16(buf[123:125]))
  153. data.AntiJammingFunction = gconv.Int(gbinary.BeDecodeToUint16(buf[125:127]))
  154. data.BypassValvePower = gconv.Int(gbinary.BeDecodeToUint16(buf[127:129]))
  155. data.OperatingMode = gconv.Int(gbinary.BeDecodeToUint16(buf[129:131]))
  156. data.OutdoorMode = gconv.Int(gbinary.BeDecodeToUint16(buf[173:175]))
  157. data.RefrigerationWaterTemp = gconv.Float32(gbinary.BeDecodeToUint16(buf[175:177])) / 10
  158. data.HeatingWaterTemp = gconv.Float32(gbinary.BeDecodeToUint16(buf[177:179])) / 10
  159. data.OutdoorCirculationTemp = gconv.Float32(gbinary.BeDecodeToUint16(buf[151:153]))
  160. data.InletWaterTemp = gconv.Float32(gbinary.BeDecodeToUint16(buf[153:155]))
  161. data.OutletWaterTemp = gconv.Float32(gbinary.BeDecodeToUint16(buf[155:157]))
  162. data.OutdoorPower = gconv.Int(gbinary.BeDecodeToUint16(buf[179:181]))
  163. data.Auxiliary = gconv.Int(gbinary.BeDecodeToUint16(buf[159:161]))
  164. data.FaultMark = gconv.Int(gbinary.BeDecodeToUint16(buf[161:163]))
  165. if err := c.srv.ReportStatus(c.Id, data, "status"); err != nil {
  166. return err
  167. }
  168. }
  169. }
  170. return nil
  171. }
  172. func (c *Client) readError(err error) {
  173. defer c.closeConnection()
  174. if err == io.EOF || isErrConnReset(err) {
  175. return
  176. }
  177. glog.Errorf(c.ctx, "读取数据发生错误:%s", err.Error())
  178. }
  179. func (c *Client) closeConnection() {
  180. _ = c.conn.Close()
  181. c.conn = nil
  182. close(c.closeChan)
  183. c.isReg = false
  184. if c.closeHandler != nil {
  185. c.closeHandler(c.Id, c)
  186. }
  187. }
  188. // 计算温度值, 处理零下的情况
  189. func caleTemperature(data []byte) float32 {
  190. value := gconv.Float32(gbinary.BeDecodeToUint16(data)&0x7FFF) / 10
  191. if value < 1 {
  192. value = float32(math.Floor(float64(value) + 0.5))
  193. }
  194. if gbinary.BeDecodeToUint16(data)&0x8000 == 0x8000 && value != 0 {
  195. value = value * -1
  196. }
  197. return value
  198. }
  199. // isErrConnReset read: connection reset by peer
  200. func isErrConnReset(err error) bool {
  201. var ne *net.OpError
  202. if errors.As(err, &ne) {
  203. return strings.Contains(ne.Err.Error(), syscall.ECONNRESET.Error())
  204. }
  205. return false
  206. }
  207. //func dataBlock(value float32) []byte {
  208. // buffer := &bytes.Buffer{}
  209. // var a uint16
  210. // if value >= 0 {
  211. // a = uint16(value*100) | 0x8000
  212. // } else {
  213. // a = uint16(math.Abs(float64(value))*100) | 0x0000
  214. // }
  215. // buffer.Write(gbinary.BeEncodeUint16(a))
  216. // return buffer.Bytes()
  217. //}
  218. func (c *Client) GetSendByte() {
  219. for {
  220. c.sendChan <- []byte{0x01, 0x03, 0x00, 0x00, 0x00, 0x5F, 0x05, 0xF2}
  221. time.Sleep(10 * time.Second)
  222. }
  223. }
  224. func (c *Client) send(buf []byte) error {
  225. if c.conn == nil {
  226. return nil
  227. }
  228. glog.Debugf(c.ctx, "----->%2X", buf)
  229. err := c.conn.Send(buf)
  230. if err != nil {
  231. glog.Error(c.srv.ctx, err)
  232. c.closeConnection()
  233. return err
  234. }
  235. return nil
  236. }
  237. func (c *Client) WaterSystemControl(address uint16, value interface{}) error {
  238. result, err := WriteMultipleRegisters(address, 1, gbinary.BeEncodeUint16(gconv.Uint16(value)))
  239. if err != nil {
  240. return err
  241. }
  242. c.sendChan <- result
  243. time.Sleep(100 * time.Millisecond)
  244. c.sendChan <- []byte{0x01, 0x03, 0x00, 0x00, 0x00, 0x5F, 0x05, 0xF2}
  245. return nil
  246. }