server.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. package server
  2. import (
  3. "context"
  4. "dlt645-server/protocol"
  5. "fmt"
  6. "github.com/gogf/gf/container/gmap"
  7. "github.com/gogf/gf/net/gtcp"
  8. "github.com/gogf/gf/os/glog"
  9. gatewayV2 "sparrow-sdk/v2"
  10. "sync"
  11. "time"
  12. )
  13. type Server struct {
  14. closeChan chan struct{}
  15. server *gtcp.Server
  16. ctx context.Context
  17. addr string
  18. port int
  19. clients *gmap.HashMap
  20. grMu sync.Mutex
  21. grWG sync.WaitGroup
  22. grRunning bool
  23. message protocol.Message
  24. gateWay *gatewayV2.Gateway
  25. }
  26. func NewServer(ctx context.Context, addr string, port int, gw *gatewayV2.Gateway) *Server {
  27. return &Server{
  28. closeChan: make(chan struct{}),
  29. ctx: ctx,
  30. addr: addr,
  31. port: port,
  32. gateWay: gw,
  33. }
  34. }
  35. func (s *Server) Start() error {
  36. glog.Printf("服务端启动[%s:%d]", s.addr, s.port)
  37. server := gtcp.NewServer(fmt.Sprintf("%s:%d", s.addr, s.port), s.handleConnect)
  38. s.server = server
  39. s.grMu.Lock()
  40. s.grRunning = true
  41. s.grMu.Unlock()
  42. return s.server.Run()
  43. }
  44. func (s *Server) Stop() {
  45. s.server.Close()
  46. }
  47. func (s *Server) handleConnect(conn *gtcp.Conn) {
  48. s.startGoRoutine(func() {
  49. s.createClient(conn)
  50. s.grWG.Done()
  51. })
  52. }
  53. func (s *Server) startGoRoutine(f func()) {
  54. s.grMu.Lock()
  55. if s.grRunning {
  56. s.grWG.Add(1)
  57. go f()
  58. }
  59. s.grMu.Unlock()
  60. }
  61. func (s *Server) createClient(conn *gtcp.Conn) *Client {
  62. c := &Client{
  63. srv: s,
  64. done: make(chan struct{}),
  65. conn: conn,
  66. closeHandler: func(id string, c *Client) {
  67. glog.Debugf("客户端断开:%s", id)
  68. },
  69. }
  70. ctx := new(protocol.PacketContext)
  71. addressChan := make(chan struct{}, 1)
  72. avChan := make(chan struct{}, 1)
  73. aiChan := make(chan struct{}, 1)
  74. bvChan := make(chan struct{}, 1)
  75. biChan := make(chan struct{}, 1)
  76. cvChan := make(chan struct{}, 1)
  77. ciChan := make(chan struct{}, 1)
  78. powerChan := make(chan struct{}, 1)
  79. addressChan <- struct{}{}
  80. s.startGoRoutine(func() {
  81. go c.SendGetAddress(ctx, addressChan, avChan)
  82. })
  83. s.startGoRoutine(func() {
  84. c.GetActivePower(ctx, powerChan)
  85. })
  86. s.startGoRoutine(func() {
  87. c.ReadLoop(ctx)
  88. })
  89. time.Sleep(1 * time.Second)
  90. s.startGoRoutine(func() {
  91. c.GetAV(ctx, avChan, aiChan)
  92. })
  93. time.Sleep(1 * time.Second)
  94. s.startGoRoutine(func() {
  95. c.GetAI(ctx, aiChan, bvChan)
  96. })
  97. time.Sleep(1 * time.Second)
  98. s.startGoRoutine(func() {
  99. c.GetBV(ctx, bvChan, biChan)
  100. })
  101. time.Sleep(1 * time.Second)
  102. s.startGoRoutine(func() {
  103. c.GetBI(ctx, biChan, cvChan)
  104. })
  105. time.Sleep(1 * time.Second)
  106. s.startGoRoutine(func() {
  107. c.GetCV(ctx, cvChan, ciChan)
  108. })
  109. time.Sleep(1 * time.Second)
  110. s.startGoRoutine(func() {
  111. c.GetCI(ctx, ciChan, powerChan)
  112. })
  113. return c
  114. }
  115. func (s *Server) removeClient(gatewayId uint16) {
  116. s.clients.Remove(gatewayId)
  117. }
  118. func (s *Server) ReportStatus(subId string, data interface{}) error {
  119. return s.gateWay.ReportStatus(subId, "status", data)
  120. }