client.go 13 KB


  1. package server
  2. import (
  3. "bytes"
  4. "errors"
  5. "fmt"
  6. "github.com/gogf/gf/v2/encoding/gbinary"
  7. "github.com/gogf/gf/v2/frame/g"
  8. "github.com/gogf/gf/v2/net/gtcp"
  9. "github.com/gogf/gf/v2/os/glog"
  10. "io"
  11. "net"
  12. "strings"
  13. "syscall"
  14. "time"
  15. "yx-4g-gateway/address"
  16. "yx-4g-gateway/modbus"
  17. "yx-4g-gateway/schema"
  18. )
  19. const (
  20. FAN_1_OPEN = 1 << 0
  21. FAN_2_OPEN = 1 << 1
  22. FAN_3_OPEN = 1 << 2
  23. FAN_4_OPEN = 1 << 3
  24. FAN_5_OPEN = 1 << 4
  25. )
  26. type Client struct {
  27. Id string
  28. srv *Server
  29. conn *gtcp.Conn
  30. sendChan chan []byte
  31. closeChan chan struct{}
  32. closeHandler func(id string, c *Client)
  33. regHandler func(id string, c *Client)
  34. isReg bool
  35. receiveData []byte
  36. }
  37. func NewClient(s *Server, conn *gtcp.Conn) *Client {
  38. return &Client{
  39. srv: s,
  40. conn: conn,
  41. sendChan: make(chan []byte),
  42. closeChan: make(chan struct{}),
  43. receiveData: make([]byte, 0),
  44. }
  45. }
  46. func (c *Client) SetId(id string) {
  47. c.Id = id
  48. }
  49. func (c *Client) SendLoop() {
  50. ctx := c.srv.ctx
  51. for {
  52. select {
  53. case buf := <-c.sendChan:
  54. err := c.send(buf)
  55. if err != nil {
  56. g.Log().Errorf(ctx, "指令发送失败:%s", err.Error())
  57. continue
  58. }
  59. timer := time.NewTimer(5 * time.Second)
  60. for {
  61. select {
  62. case <-c.closeChan:
  63. return
  64. case <-timer.C:
  65. g.Log().Errorf(ctx, "接收指令超时")
  66. break
  67. default:
  68. receiveBuf, err := c.conn.Recv(0)
  69. if err != nil {
  70. c.readError(err)
  71. break
  72. }
  73. if !c.isReg {
  74. id := gbinary.DecodeToString(receiveBuf)
  75. g.Log().Debugf(ctx, "收到注册包!id:%s", id)
  76. c.SetId(id)
  77. c.isReg = true
  78. if c.regHandler != nil {
  79. c.regHandler(c.Id, c)
  80. }
  81. continue
  82. }
  83. if c.Id == "863569064249469" {
  84. g.Log().Debugf(ctx, "ID:%s 收到数据:%2X", c.Id, receiveBuf)
  85. }
  86. c.receiveData = receiveBuf
  87. //for v := range receiveBuf {
  88. // c.receiveQueue.Push(v)
  89. // glog.Debugf("队列数据:%2X", c.receiveQueue)
  90. //}
  91. //glog.Debugf("receiveQueue长度:%d", c.receiveQueue.Len())
  92. if err := c.readQueue(); err != nil {
  93. g.Log().Errorf(ctx, "处理数据失败:%s", err.Error())
  94. break
  95. }
  96. }
  97. break
  98. }
  99. }
  100. }
  101. }
  102. func (c *Client) readQueue() error {
  103. //for i := 0; i < len(c.receiveData)-2; i++ {
  104. // if c.receiveData[i] == 0x02 && c.receiveData[i+1] == 0x03 && c.receiveData[i+2] == 0xa8 {
  105. // c.receiveData = c.receiveData[i:]
  106. // if len(c.receiveData) >= int(c.receiveData[2])+5 {
  107. // c.receiveData = c.receiveData[:int(c.receiveData[i+2])+5]
  108. // return c.decodeAndReport(c.receiveData)
  109. // } else {
  110. // return nil
  111. // }
  112. //
  113. // }
  114. //}
  115. //c.receiveData = []byte{}
  116. //return nil
  117. if bytes.Equal(c.receiveData[:3], []byte{0x02, 0x03, 0x04}) {
  118. if len(c.receiveData) >= int(c.receiveData[2])+5 {
  119. return c.decodeAndReport(c.receiveData)
  120. }
  121. }
  122. return nil
  123. //if bytes.Equal(c.receiveData[:3], []byte{0x02, 0x03, 0x04}) {
  124. // if len(c.receiveData) >= int(c.receiveData[2])+5 {
  125. // c.receiveData = c.receiveData[:int(c.receiveData[2]+5)]
  126. // if err := c.decodeAndReport(c.receiveData); err != nil {
  127. // return err
  128. // }
  129. // c.receiveData = c.receiveData[:0]
  130. // }
  131. // return nil
  132. //} else {
  133. // c.receiveData = c.receiveData[:0]
  134. // return nil
  135. //}
  136. //for {
  137. // if c.receiveQueue.Len() > 3 {
  138. // glog.Debugf("11111111111111111")
  139. // a := FormatInterfaceToByte(c.receiveQueue.Pop())
  140. // b := FormatInterfaceToByte(c.receiveQueue.Pop())
  141. // d := FormatInterfaceToByte(c.receiveQueue.Pop())
  142. // if a == 0x1 && b == 0x03 && d == 0xa8 && len(c.dataByteArray) == 0 {
  143. // packageBytes = append(packageBytes, a)
  144. // packageBytes = append(packageBytes, b)
  145. // packageBytes = append(packageBytes, d)
  146. //
  147. // } else {
  148. // return
  149. // }
  150. // } else {
  151. // return
  152. // }
  153. // for {
  154. // if count == 3 {
  155. // glog.Debugf("2222222222222222222")
  156. // size := int(packageBytes[2]) + 2
  157. // for {
  158. // c.dataByteArray = append(c.dataByteArray, packageBytes...)
  159. // for i := 0; i < size; i++ {
  160. // if c.receiveQueue.Len() > 0 {
  161. // d := c.receiveQueue.Pop()
  162. // c.dataByteArray = append(c.dataByteArray, FormatInterfaceToByte(d))
  163. // if i == size-1 {
  164. // err := c.decodeAndReport(c.dataByteArray)
  165. // glog.Debugf("处理数据失败:%s", err.Error())
  166. // c.dataByteArray = c.dataByteArray[:0]
  167. // return
  168. // }
  169. // }
  170. // }
  171. // }
  172. // count = 0
  173. // }
  174. // }
  175. //}
  176. }
  177. func (c *Client) decodeAndReport(buf []byte) error {
  178. length := len(buf)
  179. var crc modbus.Crc
  180. crc.Reset().PushBytes(buf[0 : length-2])
  181. checksum := uint16(buf[length-1])<<8 | uint16(buf[length-2])
  182. if checksum != crc.Value() {
  183. return errors.New(fmt.Sprintf("modbus: response crc '%v' does not match expected '%v'", checksum, crc.Value()))
  184. }
  185. result := buf[3 : length-2]
  186. data := make(map[int][]byte)
  187. var index, dIndex int
  188. var newBuf []byte
  189. for _, b := range result {
  190. index += 1
  191. newBuf = append(newBuf, b)
  192. if index%2 == 0 {
  193. data[dIndex] = newBuf
  194. dIndex += 1
  195. index = 0
  196. newBuf = make([]byte, 0)
  197. }
  198. }
  199. ret := new(schema.StatusResponse)
  200. //ret.Power = gbinary.BeDecodeToInt(data[address.UnitOnOff])
  201. //ret.Mode = gbinary.BeDecodeToUint16(data[address.SetMode])
  202. //ret.FanSpeed = gbinary.BeDecodeToUint16(data[address.SetFanSpeed])
  203. //ret.SetTemp = gbinary.BeDecodeToUint16(data[address.SetTemp])
  204. //ret.NewFan = gbinary.BeDecodeToUint16(data[address.SetNewFan])
  205. //ret.FanSpeed = gbinary.BeDecodeToUint16(data[8:9])
  206. //ret.SetTemp = gbinary.BeDecodeToUint16(data[28:29])
  207. //ret.AirQuality = gbinary.BeDecodeToUint16(data[35])
  208. //ret.CO2 = gbinary.BeDecodeToUint16(data[36])
  209. //ret.Temperature = gbinary.BeDecodeToUint16(data[27])
  210. //ret.Humidity = gbinary.BeDecodeToUint16(data[28])
  211. //
  212. //fanStatus := gbinary.BeDecodeToUint16(data[5])
  213. //ret.FanGateOne = int(fanStatus) & 0x80
  214. //ret.FanGateTwo = int(fanStatus) & 0x40
  215. //ret.FanGateThree = int(fanStatus) & 0x20
  216. //ret.FanGateFour = int(fanStatus) & 0x10
  217. //ret.FanGateFive = int(fanStatus) & 0x08
  218. ////
  219. //ret.FanGateOneLevel = gbinary.BeDecodeToUint16(data[7])
  220. //ret.FanGateThreeLevel = gbinary.BeDecodeToUint16(data[8])
  221. //ret.FanGateFourLevel = gbinary.BeDecodeToUint16(data[9])
  222. //ret.FanGateFiveLevel = gbinary.BeDecodeToUint16(data[10])
  223. ret.InletValve = gbinary.BeDecodeToUint16(data[0])
  224. ret.HighWater = gbinary.BeDecodeToUint16(data[1])
  225. //ret.AirQuality = gbinary.BeDecodeToUint16(data[2])
  226. //ret.CO2 = gbinary.BeDecodeToUint16(data[3])
  227. //if ret.FanGateOneLevel > 0 {
  228. // ret.FanGateOne = 1
  229. // ret.FanGateTwo = 1
  230. //}
  231. //if ret.FanGateThreeLevel > 0 {
  232. // ret.FanGateThree = 1
  233. //}
  234. //
  235. //if ret.FanGateFourLevel > 0 {
  236. // ret.FanGateFour = 1
  237. //}
  238. //if ret.FanGateFiveLevel > 0 {
  239. // ret.FanGateFive = 1
  240. //}
  241. c.receiveData = []byte{}
  242. if c.Id == "863569064249469" {
  243. if err := c.srv.ReportStatus(c.Id, ret, "status"); err != nil {
  244. return err
  245. }
  246. }
  247. return nil
  248. }
  249. func (c *Client) readError(err error) {
  250. defer c.closeConnection()
  251. if err == io.EOF || isErrConnReset(err) {
  252. return
  253. }
  254. g.Log().Errorf(c.srv.ctx, "读取数据发生错误:%s", err.Error())
  255. }
  256. func (c *Client) closeConnection() {
  257. _ = c.conn.Close()
  258. c.conn = nil
  259. close(c.closeChan)
  260. c.isReg = false
  261. if c.closeHandler != nil {
  262. c.closeHandler(c.Id, c)
  263. }
  264. }
  265. // isErrConnReset read: connection reset by peer
  266. func isErrConnReset(err error) bool {
  267. var ne *net.OpError
  268. if errors.As(err, &ne) {
  269. return strings.Contains(ne.Err.Error(), syscall.ECONNRESET.Error())
  270. }
  271. return false
  272. }
  273. func (c *Client) send(buf []byte) error {
  274. if c.conn == nil {
  275. return nil
  276. }
  277. if c.Id == "863569064249469" {
  278. g.Log().Debugf(c.srv.ctx, "----->%2X", buf)
  279. }
  280. err := c.conn.Send(buf)
  281. if err != nil {
  282. glog.Error(c.srv.ctx, err)
  283. c.closeConnection()
  284. return err
  285. }
  286. return nil
  287. }
  288. // GetStatus 获取机组运行状态
  289. func (c *Client) GetStatus() {
  290. //for {
  291. // err := c.send([]byte{0x02, 0x03, 0x00, 0x00, 0x00, 0x54, 0x44, 0x06})
  292. // err := c.send([]byte{0x01, 0x03, 0x00, 0x00, 0x00, 0x54, 0x44, 0x35})
  293. // if err != nil {
  294. // glog.Debugf("处理数据失败:%s", err.Error())
  295. // }
  296. // time.Sleep(10 * time.Second)
  297. //}
  298. for {
  299. c.sendChan <- []byte{0x02, 0x03, 0x00, 0x21, 0x00, 0x02, 0x94, 0x32}
  300. //c.sendChan <- []byte{0x02, 0x03, 0x00, 0x00, 0x00, 0x54, 0x44, 0x06}
  301. time.Sleep(1 * time.Second)
  302. }
  303. }
  304. func (c *Client) GetSensorStatus() {
  305. for {
  306. err := c.send([]byte{0x01, 0x03, 0x00, 0x02, 0x00, 0x07, 0x00, 0x00})
  307. if err != nil {
  308. g.Log().Errorf(c.srv.ctx, "处理数据失败:%s", err.Error())
  309. }
  310. time.Sleep(10 * time.Second)
  311. }
  312. }
  313. func (c *Client) Read() {
  314. for {
  315. receiveBuf, err := c.conn.Recv(-1)
  316. if err != nil {
  317. g.Log().Errorf(c.srv.ctx, "处理数据失败:%s", err.Error())
  318. }
  319. g.Log().Debugf(c.srv.ctx, "收到数据:%2X", receiveBuf)
  320. time.Sleep(1 * time.Second)
  321. }
  322. }
  323. // PowerOn 开机
  324. func (c *Client) PowerOn() error {
  325. result, err := modbus.WriteMultipleRegisters(address.UnitOnOff, 1, []byte{0x00, 0x01})
  326. if err != nil {
  327. return err
  328. }
  329. c.sendChan <- result
  330. c.sendChan <- []byte{0x02, 0x03, 0x00, 0x00, 0x00, 0x54, 0x44, 0x06}
  331. return nil
  332. }
  333. // PowerOff 关机
  334. func (c *Client) PowerOff() error {
  335. result, err := modbus.WriteMultipleRegisters(address.UnitOnOff, 1, []byte{0x00, 0x00})
  336. if err != nil {
  337. return err
  338. }
  339. c.sendChan <- result
  340. return nil
  341. }
  342. // GetPower 获取开关机状态
  343. func (c *Client) GetPower() (err error) {
  344. result, err := modbus.ReadHoldingRegisters(address.UnitOnOff, 1)
  345. if err != nil {
  346. return
  347. }
  348. c.sendChan <- result
  349. c.sendChan <- []byte{0x02, 0x03, 0x00, 0x00, 0x00, 0x54, 0x44, 0x06}
  350. return nil
  351. }
  352. // SetNewFan 设置新风阀模式
  353. func (c *Client) SetNewFan(mode uint16) error {
  354. result, err := modbus.WriteMultipleRegisters(address.SetNewFan, 1, gbinary.BeEncode(mode))
  355. if err != nil {
  356. return err
  357. }
  358. c.sendChan <- result
  359. c.sendChan <- []byte{0x02, 0x03, 0x00, 0x00, 0x00, 0x54, 0x44, 0x06}
  360. return nil
  361. }
  362. // SetMode 设置模式
  363. func (c *Client) SetMode(mode uint16) error {
  364. result, err := modbus.WriteMultipleRegisters(address.SetMode, 1, gbinary.BeEncode(mode))
  365. if err != nil {
  366. return err
  367. }
  368. c.sendChan <- result
  369. c.sendChan <- []byte{0x02, 0x03, 0x00, 0x00, 0x00, 0x54, 0x44, 0x06}
  370. return nil
  371. }
  372. // SetFanSpeed 设置风速
  373. func (c *Client) SetFanSpeed(speed uint16) error {
  374. result, err := modbus.WriteMultipleRegisters(address.SetFanSpeed, 1, gbinary.BeEncode(speed))
  375. if err != nil {
  376. return err
  377. }
  378. c.sendChan <- result
  379. c.sendChan <- []byte{0x02, 0x03, 0x00, 0x00, 0x00, 0x54, 0x44, 0x06}
  380. return nil
  381. }
  382. // SetTemp 设置温度
  383. func (c *Client) SetTemp(temp uint16) error {
  384. result, err := modbus.WriteMultipleRegisters(address.SetTemp, 1, gbinary.BeEncode(temp))
  385. if err != nil {
  386. return err
  387. }
  388. c.sendChan <- result
  389. c.sendChan <- []byte{0x02, 0x03, 0x00, 0x00, 0x00, 0x54, 0x44, 0x06}
  390. return nil
  391. }
  392. // SleepMode 睡眠模式
  393. func (c *Client) SleepMode(mode uint16) error {
  394. result, err := modbus.WriteMultipleRegisters(address.SleepMode, 1, gbinary.BeEncode(mode))
  395. if err != nil {
  396. return err
  397. }
  398. c.sendChan <- result
  399. c.sendChan <- []byte{0x02, 0x03, 0x00, 0x00, 0x00, 0x54, 0x44, 0x06}
  400. return nil
  401. }
  402. // SetFanGateThreeLevel 风阀3开度
  403. func (c *Client) SetFanGateThreeLevel(value uint16) error {
  404. result, err := modbus.WriteMultipleRegisters(address.SetFanGateThreeLevel, 1, gbinary.BeEncode(value))
  405. if err != nil {
  406. return err
  407. }
  408. c.sendChan <- result
  409. c.sendChan <- []byte{0x02, 0x03, 0x00, 0x00, 0x00, 0x54, 0x44, 0x06}
  410. return nil
  411. }
  412. // SetFanGateFourLevel 风阀4开度
  413. func (c *Client) SetFanGateFourLevel(value uint16) error {
  414. result, err := modbus.WriteMultipleRegisters(address.SetFanGateFourLevel, 1, gbinary.BeEncode(value))
  415. if err != nil {
  416. return err
  417. }
  418. c.sendChan <- result
  419. c.sendChan <- []byte{0x02, 0x03, 0x00, 0x00, 0x00, 0x54, 0x44, 0x06}
  420. return nil
  421. }
  422. // SetFanGateFiveLevel 风阀5开度
  423. func (c *Client) SetFanGateFiveLevel(value uint16) error {
  424. result, err := modbus.WriteMultipleRegisters(address.SetFanGateFiveLevel, 1, gbinary.BeEncode(value))
  425. if err != nil {
  426. return err
  427. }
  428. c.sendChan <- result
  429. c.sendChan <- []byte{0x02, 0x03, 0x00, 0x00, 0x00, 0x54, 0x44, 0x06}
  430. return nil
  431. }
  432. // SetValvePower 控制风阀开关
  433. func (c *Client) SetValvePower(param *schema.SetValvePowerReq) error {
  434. var result []byte
  435. var err error
  436. if param.Valve1 == 1 {
  437. result, err = modbus.WriteMultipleRegisters(address.SetValvePower, 1, gbinary.BeEncode(FAN_1_OPEN))
  438. if err != nil {
  439. return err
  440. }
  441. }
  442. if param.Valve2 == 1 {
  443. result, err = modbus.WriteMultipleRegisters(address.SetValvePower, 1, gbinary.BeEncode(FAN_2_OPEN))
  444. if err != nil {
  445. return err
  446. }
  447. }
  448. if param.Valve3 == 1 {
  449. result, err = modbus.WriteMultipleRegisters(address.SetValvePower, 1, gbinary.BeEncode(FAN_3_OPEN))
  450. if err != nil {
  451. return err
  452. }
  453. }
  454. if param.Valve4 == 1 {
  455. result, err = modbus.WriteMultipleRegisters(address.SetValvePower, 1, gbinary.BeEncode(FAN_4_OPEN))
  456. if err != nil {
  457. return err
  458. }
  459. }
  460. if param.Valve5 == 1 {
  461. result, err = modbus.WriteMultipleRegisters(address.SetValvePower, 1, gbinary.BeEncode(FAN_5_OPEN))
  462. if err != nil {
  463. return err
  464. }
  465. }
  466. c.sendChan <- result
  467. return nil
  468. }
  469. func FormatInterfaceToByte(a interface{}) (result byte) {
  470. v, ok := a.(uint8)
  471. if ok {
  472. result = byte(v)
  473. }
  474. return result
  475. }