server.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. package server
  2. import (
  3. "context"
  4. "dlt645-server/protocol"
  5. "fmt"
  6. "github.com/gogf/gf/container/gmap"
  7. "github.com/gogf/gf/net/gtcp"
  8. "github.com/gogf/gf/os/glog"
  9. gatewayV2 "sparrow-sdk/v2"
  10. "sync"
  11. )
  12. type Server struct {
  13. closeChan chan struct{}
  14. server *gtcp.Server
  15. ctx context.Context
  16. addr string
  17. port int
  18. clients *gmap.HashMap
  19. grMu sync.Mutex
  20. grWG sync.WaitGroup
  21. grRunning bool
  22. message protocol.Message
  23. gateWay *gatewayV2.Gateway
  24. }
  25. func NewServer(ctx context.Context, addr string, port int, gw *gatewayV2.Gateway) *Server {
  26. return &Server{
  27. closeChan: make(chan struct{}),
  28. ctx: ctx,
  29. addr: addr,
  30. port: port,
  31. gateWay: gw,
  32. }
  33. }
  34. func (s *Server) Start() error {
  35. glog.Printf("服务端启动[%s:%d]", s.addr, s.port)
  36. server := gtcp.NewServer(fmt.Sprintf("%s:%d", s.addr, s.port), s.handleConnect)
  37. s.server = server
  38. s.grMu.Lock()
  39. s.grRunning = true
  40. s.grMu.Unlock()
  41. return s.server.Run()
  42. }
  43. func (s *Server) Stop() {
  44. s.server.Close()
  45. }
  46. func (s *Server) handleConnect(conn *gtcp.Conn) {
  47. s.startGoRoutine(func() {
  48. s.createClient(conn)
  49. s.grWG.Done()
  50. })
  51. }
  52. func (s *Server) startGoRoutine(f func()) {
  53. s.grMu.Lock()
  54. if s.grRunning {
  55. s.grWG.Add(1)
  56. go f()
  57. }
  58. s.grMu.Unlock()
  59. }
  60. func (s *Server) createClient(conn *gtcp.Conn) *Client {
  61. c := &Client{
  62. srv: s,
  63. done: make(chan struct{}),
  64. nc: conn,
  65. closeHandler: func(id string, c *Client) {
  66. glog.Debugf("客户端断开:%s", id)
  67. },
  68. }
  69. glog.Info("client connect created")
  70. s.startGoRoutine(func() {
  71. c.ReadLoop()
  72. })
  73. return c
  74. }
  75. func (s *Server) removeClient(gatewayId uint16) {
  76. s.clients.Remove(gatewayId)
  77. }
  78. func (s *Server) ReportStatus(subId string, data interface{}) error {
  79. return s.gateWay.ReportStatus(subId, "status", data)
  80. }