main.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. package main
  2. import (
  3. "context"
  4. "github.com/gogf/gf/frame/g"
  5. "math"
  6. "sparrow/pkg/rpcs"
  7. "sparrow/pkg/server"
  8. "sparrow/services/emqx-agent/client"
  9. "sync"
  10. "time"
  11. pb "sparrow/services/emqx-agent/protobuf"
  12. )
  13. type Counter struct {
  14. count int64
  15. start int64
  16. duration int64
  17. maxCount int64
  18. lock sync.Mutex
  19. }
  20. func NewCounter(duration int64, maxCount int64) *Counter {
  21. counter := &Counter{}
  22. counter.count = 0
  23. counter.start = time.Now().UnixNano() / 1e6 // 当前毫秒时间戳
  24. if duration == 0 {
  25. counter.duration = math.MaxInt64 // duration传0表示没有时间间隔限制,计数器不刷新
  26. } else {
  27. counter.duration = duration
  28. }
  29. counter.maxCount = maxCount
  30. return counter
  31. }
  32. // Count 计数器计数
  33. // n: 计数值
  34. // refersh: 计数器是否刷新
  35. // limit: 是否达到计数最大值
  36. // num: 计数后计数器的值
  37. func (counter *Counter) Count(n int64) (refresh bool, limit bool, num int64) {
  38. now := time.Now().UnixNano() / 1e6
  39. counter.lock.Lock()
  40. defer counter.lock.Unlock()
  41. if now-counter.start < counter.duration {
  42. counter.count += n
  43. num = counter.count
  44. limit = num > counter.maxCount
  45. } else {
  46. // num = counter.count // 刷新前的最大计数
  47. counter.start = now
  48. counter.count = 0
  49. refresh = true
  50. }
  51. return
  52. }
  53. func (counter *Counter) GetCount() (num int64) {
  54. counter.lock.Lock()
  55. defer counter.lock.Unlock()
  56. return counter.count
  57. }
  58. var cnter *Counter = NewCounter(0, 100)
  59. // emqttServer is used to implement emqx_exhook_v1.s *emqttServer
  60. type emqttServer struct {
  61. pb.UnimplementedHookProviderServer
  62. }
  63. // HookProviderServer callbacks
  64. func (s *emqttServer) OnProviderLoaded(ctx context.Context, in *pb.ProviderLoadedRequest) (*pb.LoadedResponse, error) {
  65. cnter.Count(1)
  66. hooks := []*pb.HookSpec{
  67. &pb.HookSpec{Name: "client.connect"},
  68. &pb.HookSpec{Name: "client.connack"},
  69. &pb.HookSpec{Name: "client.connected"},
  70. &pb.HookSpec{Name: "client.disconnected"},
  71. &pb.HookSpec{Name: "client.authenticate"},
  72. &pb.HookSpec{Name: "client.authorize"},
  73. &pb.HookSpec{Name: "client.subscribe"},
  74. &pb.HookSpec{Name: "client.unsubscribe"},
  75. &pb.HookSpec{Name: "session.created"},
  76. &pb.HookSpec{Name: "session.subscribed"},
  77. &pb.HookSpec{Name: "session.unsubscribed"},
  78. &pb.HookSpec{Name: "session.resumed"},
  79. &pb.HookSpec{Name: "session.discarded"},
  80. &pb.HookSpec{Name: "session.takenover"},
  81. &pb.HookSpec{Name: "session.terminated"},
  82. &pb.HookSpec{Name: "message.publish"},
  83. &pb.HookSpec{Name: "message.delivered"},
  84. &pb.HookSpec{Name: "message.acked"},
  85. &pb.HookSpec{Name: "message.dropped"},
  86. }
  87. return &pb.LoadedResponse{Hooks: hooks}, nil
  88. }
  89. func (s *emqttServer) OnProviderUnloaded(ctx context.Context, in *pb.ProviderUnloadedRequest) (*pb.EmptySuccess, error) {
  90. cnter.Count(1)
  91. return &pb.EmptySuccess{}, nil
  92. }
  93. func (s *emqttServer) OnClientConnect(ctx context.Context, in *pb.ClientConnectRequest) (*pb.EmptySuccess, error) {
  94. cnter.Count(1)
  95. return &pb.EmptySuccess{}, nil
  96. }
  97. func (s *emqttServer) OnClientConnack(ctx context.Context, in *pb.ClientConnackRequest) (*pb.EmptySuccess, error) {
  98. cnter.Count(1)
  99. return &pb.EmptySuccess{}, nil
  100. }
  101. func (s *emqttServer) OnClientConnected(ctx context.Context, in *pb.ClientConnectedRequest) (*pb.EmptySuccess, error) {
  102. cnter.Count(1)
  103. return &pb.EmptySuccess{}, nil
  104. }
  105. func (s *emqttServer) OnClientDisconnected(ctx context.Context, in *pb.ClientDisconnectedRequest) (*pb.EmptySuccess, error) {
  106. cnter.Count(1)
  107. return &pb.EmptySuccess{}, nil
  108. }
  109. func (s *emqttServer) OnClientAuthenticate(ctx context.Context, in *pb.ClientAuthenticateRequest) (*pb.ValuedResponse, error) {
  110. cnter.Count(1)
  111. reply := &pb.ValuedResponse{}
  112. reply.Type = pb.ValuedResponse_STOP_AND_RETURN
  113. reply.Value = &pb.ValuedResponse_BoolResult{BoolResult: true}
  114. return reply, nil
  115. }
  116. func (s *emqttServer) OnClientAuthorize(ctx context.Context, in *pb.ClientAuthorizeRequest) (*pb.ValuedResponse, error) {
  117. cnter.Count(1)
  118. reply := &pb.ValuedResponse{}
  119. reply.Type = pb.ValuedResponse_STOP_AND_RETURN
  120. reply.Value = &pb.ValuedResponse_BoolResult{BoolResult: true}
  121. return reply, nil
  122. }
  123. func (s *emqttServer) OnClientSubscribe(ctx context.Context, in *pb.ClientSubscribeRequest) (*pb.EmptySuccess, error) {
  124. cnter.Count(1)
  125. return &pb.EmptySuccess{}, nil
  126. }
  127. func (s *emqttServer) OnClientUnsubscribe(ctx context.Context, in *pb.ClientUnsubscribeRequest) (*pb.EmptySuccess, error) {
  128. cnter.Count(1)
  129. return &pb.EmptySuccess{}, nil
  130. }
  131. func (s *emqttServer) OnSessionCreated(ctx context.Context, in *pb.SessionCreatedRequest) (*pb.EmptySuccess, error) {
  132. cnter.Count(1)
  133. return &pb.EmptySuccess{}, nil
  134. }
  135. func (s *emqttServer) OnSessionSubscribed(ctx context.Context, in *pb.SessionSubscribedRequest) (*pb.EmptySuccess, error) {
  136. cnter.Count(1)
  137. return &pb.EmptySuccess{}, nil
  138. }
  139. func (s *emqttServer) OnSessionUnsubscribed(ctx context.Context, in *pb.SessionUnsubscribedRequest) (*pb.EmptySuccess, error) {
  140. cnter.Count(1)
  141. return &pb.EmptySuccess{}, nil
  142. }
  143. func (s *emqttServer) OnSessionResumed(ctx context.Context, in *pb.SessionResumedRequest) (*pb.EmptySuccess, error) {
  144. cnter.Count(1)
  145. return &pb.EmptySuccess{}, nil
  146. }
  147. func (s *emqttServer) OnSessionDiscarded(ctx context.Context, in *pb.SessionDiscardedRequest) (*pb.EmptySuccess, error) {
  148. cnter.Count(1)
  149. return &pb.EmptySuccess{}, nil
  150. }
  151. func (s *emqttServer) OnSessionTakenover(ctx context.Context, in *pb.SessionTakenoverRequest) (*pb.EmptySuccess, error) {
  152. cnter.Count(1)
  153. return &pb.EmptySuccess{}, nil
  154. }
  155. func (s *emqttServer) OnSessionTerminated(ctx context.Context, in *pb.SessionTerminatedRequest) (*pb.EmptySuccess, error) {
  156. cnter.Count(1)
  157. return &pb.EmptySuccess{}, nil
  158. }
  159. func (s *emqttServer) OnMessagePublish(ctx context.Context, in *pb.MessagePublishRequest) (*pb.ValuedResponse, error) {
  160. cnter.Count(1)
  161. in.Message.Payload = []byte("hardcode payload by exhook-svr-go :)")
  162. reply := &pb.ValuedResponse{}
  163. reply.Type = pb.ValuedResponse_STOP_AND_RETURN
  164. reply.Value = &pb.ValuedResponse_Message{Message: in.Message}
  165. return reply, nil
  166. }
// Case 2 (alternative implementation, disabled): block publishing of messages
// on the `t/d` topic instead of rewriting every payload.
//func (s *emqttServer) OnMessagePublish(ctx context.Context, in *pb.MessagePublishRequest) (*pb.ValuedResponse, error) {
//	cnter.Count(1)
//	if in.Message.Topic == "t/d" {
//		in.Message.Headers["allow_publish"] = "false"
//		in.Message.Payload = []byte("")
//	}
//	reply := &pb.ValuedResponse{}
//	reply.Type = pb.ValuedResponse_STOP_AND_RETURN
//	reply.Value = &pb.ValuedResponse_Message{Message: in.Message}
//	return reply, nil
//}
  179. func (s *emqttServer) OnMessageDelivered(ctx context.Context, in *pb.MessageDeliveredRequest) (*pb.EmptySuccess, error) {
  180. cnter.Count(1)
  181. return &pb.EmptySuccess{}, nil
  182. }
  183. func (s *emqttServer) OnMessageDropped(ctx context.Context, in *pb.MessageDroppedRequest) (*pb.EmptySuccess, error) {
  184. cnter.Count(1)
  185. return &pb.EmptySuccess{}, nil
  186. }
  187. func (s *emqttServer) OnMessageAcked(ctx context.Context, in *pb.MessageAckedRequest) (*pb.EmptySuccess, error) {
  188. cnter.Count(1)
  189. return &pb.EmptySuccess{}, nil
  190. }
  191. func main() {
  192. // init emqttServer
  193. err := server.Init(rpcs.EmqxAgentServiceName)
  194. if err != nil {
  195. server.Log.Fatal(err)
  196. return
  197. }
  198. sd, err := NewSubDev(&client.MqttConfig{
  199. ClientId: g.Cfg().GetString("mqtt.client_id"),
  200. User: g.Cfg().GetString("mqtt.user"),
  201. Password: g.Cfg().GetString("mqtt.password"),
  202. Brokers: g.Cfg().GetStrings("mqtt.brokers"),
  203. ConnNum: 10,
  204. })
  205. if err != nil {
  206. panic(err)
  207. }
  208. agent := NewAgent(sd)
  209. err = sd.SubDevMsg(func(ctx context.Context) DevSubHandle {
  210. return agent
  211. })
  212. if err != nil {
  213. panic(err)
  214. }
  215. err = server.RegisterRPCHandler(agent)
  216. // start to run
  217. err = server.Run()
  218. if err != nil {
  219. server.Log.Fatal(err)
  220. }
  221. }