main.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. package main
  2. import (
  3. "context"
  4. "github.com/gogf/gf/frame/g"
  5. "math"
  6. "sparrow/pkg/rpcs"
  7. "sparrow/pkg/server"
  8. "sparrow/services/emqx-agent/client"
  9. "sync"
  10. "time"
  11. "google.golang.org/grpc"
  12. pb "sparrow/services/emqx-agent/protobuf"
  13. )
  // Counter is a mutex-guarded event counter with an optional refresh window.
  //
  // Events accumulate in count. Once at least duration milliseconds have
  // elapsed since start, the next Count call opens a new window.
  type Counter struct {
  	count    int64      // events accumulated in the current window
  	start    int64      // start of the current window, Unix time in milliseconds
  	duration int64      // window length in milliseconds
  	maxCount int64      // threshold above which Count reports limit
  	lock     sync.Mutex // held by Count and GetCount while touching count and start
  }

  // NewCounter returns a Counter whose first window starts at the current time.
  //
  // duration is the window length in milliseconds; passing 0 means there is no
  // window limit and the counter never refreshes. maxCount is the threshold
  // above which Count reports limit.
  func NewCounter(duration int64, maxCount int64) *Counter {
  	if duration == 0 {
  		// No elapsed time can reach MaxInt64, so the window never expires.
  		duration = math.MaxInt64
  	}
  	return &Counter{
  		start:    time.Now().UnixNano() / int64(time.Millisecond),
  		duration: duration,
  		maxCount: maxCount,
  	}
  }

  // Count adds n to the counter.
  //
  // When the current window has expired, a new window is opened first and n
  // becomes its first contribution, so the event that triggers a refresh is
  // still counted.
  //
  // Results:
  //   - refresh: whether this call opened a new window
  //   - limit: whether the count now exceeds maxCount
  //   - num: the counter value after counting
  func (c *Counter) Count(n int64) (refresh bool, limit bool, num int64) {
  	c.lock.Lock()
  	defer c.lock.Unlock()

  	// Read the clock while holding the lock so the timestamp cannot be older
  	// than a window start written by a goroutine that held the lock before us.
  	now := time.Now().UnixNano() / int64(time.Millisecond)
  	if now-c.start >= c.duration {
  		c.start = now
  		c.count = 0
  		refresh = true
  	}

  	c.count += n
  	num = c.count
  	limit = num > c.maxCount
  	return refresh, limit, num
  }

  // GetCount returns the number of events accumulated in the current window.
  func (c *Counter) GetCount() (num int64) {
  	c.lock.Lock()
  	defer c.lock.Unlock()
  	return c.count
  }
  59. var cnter *Counter = NewCounter(0, 100)
  60. // emqttServer is used to implement emqx_exhook_v1.s *emqttServer
  61. type emqttServer struct {
  62. pb.UnimplementedHookProviderServer
  63. }
  64. // HookProviderServer callbacks
  65. func (s *emqttServer) OnProviderLoaded(ctx context.Context, in *pb.ProviderLoadedRequest) (*pb.LoadedResponse, error) {
  66. cnter.Count(1)
  67. hooks := []*pb.HookSpec{
  68. &pb.HookSpec{Name: "client.connect"},
  69. &pb.HookSpec{Name: "client.connack"},
  70. &pb.HookSpec{Name: "client.connected"},
  71. &pb.HookSpec{Name: "client.disconnected"},
  72. &pb.HookSpec{Name: "client.authenticate"},
  73. &pb.HookSpec{Name: "client.authorize"},
  74. &pb.HookSpec{Name: "client.subscribe"},
  75. &pb.HookSpec{Name: "client.unsubscribe"},
  76. &pb.HookSpec{Name: "session.created"},
  77. &pb.HookSpec{Name: "session.subscribed"},
  78. &pb.HookSpec{Name: "session.unsubscribed"},
  79. &pb.HookSpec{Name: "session.resumed"},
  80. &pb.HookSpec{Name: "session.discarded"},
  81. &pb.HookSpec{Name: "session.takenover"},
  82. &pb.HookSpec{Name: "session.terminated"},
  83. &pb.HookSpec{Name: "message.publish"},
  84. &pb.HookSpec{Name: "message.delivered"},
  85. &pb.HookSpec{Name: "message.acked"},
  86. &pb.HookSpec{Name: "message.dropped"},
  87. }
  88. return &pb.LoadedResponse{Hooks: hooks}, nil
  89. }
  90. func (s *emqttServer) OnProviderUnloaded(ctx context.Context, in *pb.ProviderUnloadedRequest) (*pb.EmptySuccess, error) {
  91. cnter.Count(1)
  92. return &pb.EmptySuccess{}, nil
  93. }
  94. func (s *emqttServer) OnClientConnect(ctx context.Context, in *pb.ClientConnectRequest) (*pb.EmptySuccess, error) {
  95. cnter.Count(1)
  96. return &pb.EmptySuccess{}, nil
  97. }
  98. func (s *emqttServer) OnClientConnack(ctx context.Context, in *pb.ClientConnackRequest) (*pb.EmptySuccess, error) {
  99. cnter.Count(1)
  100. return &pb.EmptySuccess{}, nil
  101. }
  102. func (s *emqttServer) OnClientConnected(ctx context.Context, in *pb.ClientConnectedRequest) (*pb.EmptySuccess, error) {
  103. cnter.Count(1)
  104. return &pb.EmptySuccess{}, nil
  105. }
  106. func (s *emqttServer) OnClientDisconnected(ctx context.Context, in *pb.ClientDisconnectedRequest) (*pb.EmptySuccess, error) {
  107. cnter.Count(1)
  108. return &pb.EmptySuccess{}, nil
  109. }
  110. func (s *emqttServer) OnClientAuthenticate(ctx context.Context, in *pb.ClientAuthenticateRequest) (*pb.ValuedResponse, error) {
  111. cnter.Count(1)
  112. reply := &pb.ValuedResponse{}
  113. reply.Type = pb.ValuedResponse_STOP_AND_RETURN
  114. reply.Value = &pb.ValuedResponse_BoolResult{BoolResult: true}
  115. return reply, nil
  116. }
  117. func (s *emqttServer) OnClientAuthorize(ctx context.Context, in *pb.ClientAuthorizeRequest) (*pb.ValuedResponse, error) {
  118. cnter.Count(1)
  119. reply := &pb.ValuedResponse{}
  120. reply.Type = pb.ValuedResponse_STOP_AND_RETURN
  121. reply.Value = &pb.ValuedResponse_BoolResult{BoolResult: true}
  122. return reply, nil
  123. }
  124. func (s *emqttServer) OnClientSubscribe(ctx context.Context, in *pb.ClientSubscribeRequest) (*pb.EmptySuccess, error) {
  125. cnter.Count(1)
  126. return &pb.EmptySuccess{}, nil
  127. }
  128. func (s *emqttServer) OnClientUnsubscribe(ctx context.Context, in *pb.ClientUnsubscribeRequest) (*pb.EmptySuccess, error) {
  129. cnter.Count(1)
  130. return &pb.EmptySuccess{}, nil
  131. }
  132. func (s *emqttServer) OnSessionCreated(ctx context.Context, in *pb.SessionCreatedRequest) (*pb.EmptySuccess, error) {
  133. cnter.Count(1)
  134. return &pb.EmptySuccess{}, nil
  135. }
  136. func (s *emqttServer) OnSessionSubscribed(ctx context.Context, in *pb.SessionSubscribedRequest) (*pb.EmptySuccess, error) {
  137. cnter.Count(1)
  138. return &pb.EmptySuccess{}, nil
  139. }
  140. func (s *emqttServer) OnSessionUnsubscribed(ctx context.Context, in *pb.SessionUnsubscribedRequest) (*pb.EmptySuccess, error) {
  141. cnter.Count(1)
  142. return &pb.EmptySuccess{}, nil
  143. }
  144. func (s *emqttServer) OnSessionResumed(ctx context.Context, in *pb.SessionResumedRequest) (*pb.EmptySuccess, error) {
  145. cnter.Count(1)
  146. return &pb.EmptySuccess{}, nil
  147. }
  148. func (s *emqttServer) OnSessionDiscarded(ctx context.Context, in *pb.SessionDiscardedRequest) (*pb.EmptySuccess, error) {
  149. cnter.Count(1)
  150. return &pb.EmptySuccess{}, nil
  151. }
  152. func (s *emqttServer) OnSessionTakenover(ctx context.Context, in *pb.SessionTakenoverRequest) (*pb.EmptySuccess, error) {
  153. cnter.Count(1)
  154. return &pb.EmptySuccess{}, nil
  155. }
  156. func (s *emqttServer) OnSessionTerminated(ctx context.Context, in *pb.SessionTerminatedRequest) (*pb.EmptySuccess, error) {
  157. cnter.Count(1)
  158. return &pb.EmptySuccess{}, nil
  159. }
  160. func (s *emqttServer) OnMessagePublish(ctx context.Context, in *pb.MessagePublishRequest) (*pb.ValuedResponse, error) {
  161. cnter.Count(1)
  162. in.Message.Payload = []byte("hardcode payload by exhook-svr-go :)")
  163. reply := &pb.ValuedResponse{}
  164. reply.Type = pb.ValuedResponse_STOP_AND_RETURN
  165. reply.Value = &pb.ValuedResponse_Message{Message: in.Message}
  166. return reply, nil
  167. }
  // case2: stop publishing the `t/d` messages
  169. //func (s *emqttServer) OnMessagePublish(ctx context.Context, in *pb.MessagePublishRequest) (*pb.ValuedResponse, error) {
  170. // cnter.Count(1)
  171. // if in.Message.Topic == "t/d" {
  172. // in.Message.Headers["allow_publish"] = "false"
  173. // in.Message.Payload = []byte("")
  174. // }
  175. // reply := &pb.ValuedResponse{}
  176. // reply.Type = pb.ValuedResponse_STOP_AND_RETURN
  177. // reply.Value = &pb.ValuedResponse_Message{Message: in.Message}
  178. // return reply, nil
  179. //}
  180. func (s *emqttServer) OnMessageDelivered(ctx context.Context, in *pb.MessageDeliveredRequest) (*pb.EmptySuccess, error) {
  181. cnter.Count(1)
  182. return &pb.EmptySuccess{}, nil
  183. }
  184. func (s *emqttServer) OnMessageDropped(ctx context.Context, in *pb.MessageDroppedRequest) (*pb.EmptySuccess, error) {
  185. cnter.Count(1)
  186. return &pb.EmptySuccess{}, nil
  187. }
  188. func (s *emqttServer) OnMessageAcked(ctx context.Context, in *pb.MessageAckedRequest) (*pb.EmptySuccess, error) {
  189. cnter.Count(1)
  190. return &pb.EmptySuccess{}, nil
  191. }
  192. func main() {
  193. // init emqttServer
  194. err := server.Init(rpcs.EmqxAgentServiceName)
  195. if err != nil {
  196. server.Log.Fatal(err)
  197. return
  198. }
  199. sd, err := NewSubDev(&client.MqttConfig{
  200. ClientId: g.Cfg().GetString("mqtt.client_id"),
  201. User: g.Cfg().GetString("mqtt.user"),
  202. Password: g.Cfg().GetString("mqtt.password"),
  203. Brokers: g.Cfg().GetStrings("mqtt.brokers"),
  204. ConnNum: 10,
  205. })
  206. if err != nil {
  207. panic(err)
  208. }
  209. agent := NewAgent(sd)
  210. err = sd.SubDevMsg(func(ctx context.Context) DevSubHandle {
  211. return agent
  212. })
  213. if err != nil {
  214. panic(err)
  215. }
  216. s := grpc.NewServer()
  217. pb.RegisterHookProviderServer(s, &emqttServer{})
  218. err = server.RegisterHTTPHandler(s)
  219. if err != nil {
  220. server.Log.Errorf("RegisterHTTPHandler Error: %s", err)
  221. return
  222. }
  223. err = server.RegisterRPCHandler(agent)
  224. // start to run
  225. err = server.Run()
  226. if err != nil {
  227. server.Log.Fatal(err)
  228. }
  229. }