client.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. package server
import (
	"errors"
	"io"
	"net"
	"strings"
	"syscall"
	"time"

	"dlt645-server/protocol"
	"github.com/gogf/gf/frame/g"
	"github.com/gogf/gf/net/gtcp"
	"github.com/gogf/gf/os/glog"
)
  13. type Client struct {
  14. Id string
  15. Address []byte
  16. srv *Server
  17. conn *gtcp.Conn
  18. sendChan chan []byte
  19. closeChan chan struct{}
  20. closeHandler func(id string, c *Client)
  21. lastHeartBeat time.Time
  22. done chan struct{}
  23. gatewayId uint16
  24. isReg bool
  25. }
  26. func (c *Client) ReadLoop(ctx *protocol.PacketContext) {
  27. defer c.srv.grWG.Done()
  28. for {
  29. buf, err := c.conn.Recv(-1)
  30. if err != nil {
  31. c.readError(err)
  32. return
  33. }
  34. if len(buf) > 0 {
  35. result, err := c.srv.message.Decode(ctx, buf)
  36. if err != nil {
  37. glog.Errorf("解析报文失败:%s", err.Error())
  38. }
  39. if !c.isReg {
  40. c.SetId(ctx.GetId())
  41. c.isReg = true
  42. }
  43. if ctx.ReportPower {
  44. data := new(protocol.PowerData)
  45. data.ActivePower = result.ActivePower
  46. if err := c.srv.ReportStatus(c.Id, data); err != nil {
  47. c.readError(err)
  48. return
  49. }
  50. ctx.SetReportPower(false)
  51. }
  52. if ctx.ReportVI {
  53. data := new(protocol.VIData)
  54. data.AV = ctx.GetVIData().AV
  55. data.BV = ctx.GetVIData().BV
  56. data.CV = ctx.GetVIData().CV
  57. data.AI = ctx.GetVIData().AI
  58. data.BI = ctx.GetVIData().BI
  59. data.CI = ctx.GetVIData().CI
  60. if err := c.srv.ReportStatus(c.Id, data); err != nil {
  61. c.readError(err)
  62. return
  63. }
  64. ctx.SetReportVI(false)
  65. }
  66. }
  67. }
  68. }
  69. func (c *Client) SetId(id string) {
  70. c.Id = id
  71. }
  72. func (c *Client) SetAddress(address []byte) {
  73. c.Address = address
  74. }
  75. func (c *Client) readError(err error) {
  76. defer c.closeConnection()
  77. if err == io.EOF || isErrConnReset(err) {
  78. return
  79. }
  80. glog.Errorf("读取数据发生错误:%s", err.Error())
  81. }
  82. func (c *Client) closeConnection() {
  83. _ = c.conn.Close()
  84. c.conn = nil
  85. close(c.done)
  86. c.SetId("")
  87. c.isReg = false
  88. if c.closeHandler != nil {
  89. c.closeHandler(c.Id, c)
  90. }
  91. }
  92. // isErrConnReset read: connection reset by peer
  93. func isErrConnReset(err error) bool {
  94. if ne, ok := err.(*net.OpError); ok {
  95. return strings.Contains(ne.Err.Error(), syscall.ECONNRESET.Error())
  96. }
  97. return false
  98. }
  99. func (c *Client) send(buf []byte) error {
  100. if c.conn == nil {
  101. return nil
  102. }
  103. err := c.conn.Send(buf)
  104. if err != nil {
  105. glog.Error(err)
  106. c.closeConnection()
  107. return err
  108. }
  109. glog.Debugf("指令发送成功:%2X", buf)
  110. return nil
  111. }
  112. func (c *Client) GetActivePower(ctx *protocol.PacketContext, powerChan chan struct{}) {
  113. defer c.srv.grWG.Done()
  114. for {
  115. <-powerChan
  116. entity := protocol.Dlt_0x33333433{}
  117. if ctx.GetReceiveAddress() != nil {
  118. sendByte, _ := entity.Encode(ctx)
  119. err := c.send(sendByte)
  120. if err != nil {
  121. glog.Debugf("指令发送失败:%s", err.Error())
  122. }
  123. }
  124. }
  125. }
  126. func (c *Client) GetAV(ctx *protocol.PacketContext, avChan, aiChan chan struct{}) {
  127. defer c.srv.grWG.Done()
  128. for {
  129. <-avChan
  130. entity := protocol.Dlt_0x33343435{}
  131. if ctx.GetReceiveAddress() != nil {
  132. sendByte, _ := entity.Encode(ctx)
  133. err := c.send(sendByte)
  134. if err != nil {
  135. glog.Debugf("指令发送失败:%s", err.Error())
  136. }
  137. }
  138. time.Sleep(1 * time.Second)
  139. aiChan <- struct{}{}
  140. }
  141. }
  142. func (c *Client) GetAI(ctx *protocol.PacketContext, aiChan, bvChan chan struct{}) {
  143. defer c.srv.grWG.Done()
  144. for {
  145. <-aiChan
  146. entity := protocol.Dlt_0x33343535{}
  147. if ctx.GetReceiveAddress() != nil {
  148. sendByte, _ := entity.Encode(ctx)
  149. err := c.send(sendByte)
  150. if err != nil {
  151. glog.Debugf("指令发送失败:%s", err.Error())
  152. }
  153. }
  154. time.Sleep(1 * time.Second)
  155. bvChan <- struct{}{}
  156. }
  157. }
  158. func (c *Client) GetBV(ctx *protocol.PacketContext, bvChan, biChan chan struct{}) {
  159. defer c.srv.grWG.Done()
  160. for {
  161. <-bvChan
  162. entity := protocol.Dlt_0x33353435{}
  163. if ctx.GetReceiveAddress() != nil {
  164. sendByte, _ := entity.Encode(ctx)
  165. err := c.send(sendByte)
  166. if err != nil {
  167. glog.Debugf("指令发送失败:%s", err.Error())
  168. }
  169. }
  170. time.Sleep(1 * time.Second)
  171. biChan <- struct{}{}
  172. }
  173. }
  174. func (c *Client) GetBI(ctx *protocol.PacketContext, biChan, cvChan chan struct{}) {
  175. defer c.srv.grWG.Done()
  176. for {
  177. <-biChan
  178. entity := protocol.Dlt_0x33353535{}
  179. if ctx.GetReceiveAddress() != nil {
  180. sendByte, _ := entity.Encode(ctx)
  181. err := c.send(sendByte)
  182. if err != nil {
  183. glog.Debugf("指令发送失败:%s", err.Error())
  184. }
  185. }
  186. time.Sleep(1 * time.Second)
  187. cvChan <- struct{}{}
  188. }
  189. }
  190. func (c *Client) GetCV(ctx *protocol.PacketContext, cvChan, ciChan chan struct{}) {
  191. defer c.srv.grWG.Done()
  192. for {
  193. <-cvChan
  194. entity := protocol.Dlt_0x33363435{}
  195. if ctx.GetReceiveAddress() != nil {
  196. sendByte, _ := entity.Encode(ctx)
  197. err := c.send(sendByte)
  198. if err != nil {
  199. glog.Debugf("指令发送失败:%s", err.Error())
  200. }
  201. }
  202. time.Sleep(1 * time.Second)
  203. ciChan <- struct{}{}
  204. }
  205. }
  206. func (c *Client) GetCI(ctx *protocol.PacketContext, ciChan, powerChan chan struct{}) {
  207. defer c.srv.grWG.Done()
  208. for {
  209. <-ciChan
  210. entity := protocol.Dlt_0x33363535{}
  211. if ctx.GetReceiveAddress() != nil {
  212. sendByte, _ := entity.Encode(ctx)
  213. err := c.send(sendByte)
  214. if err != nil {
  215. glog.Debugf("指令发送失败:%s", err.Error())
  216. }
  217. }
  218. time.Sleep(time.Duration(g.Cfg().GetInt("Server.PowerFrequency")-5) * time.Second)
  219. powerChan <- struct{}{}
  220. }
  221. }
  222. func (c *Client) SendGetAddress(ctx *protocol.PacketContext, addressChan, avChan chan struct{}) {
  223. defer c.srv.grWG.Done()
  224. <-addressChan
  225. bytea := []byte{0x68, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0x68, 0x13, 0x00, 0xDF, 0x16}
  226. _ = c.send(bytea)
  227. for {
  228. time.Sleep(time.Duration(g.Cfg().GetInt("Server.VIFrequency")) * time.Second)
  229. avChan <- struct{}{}
  230. }
  231. }