main.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. package main
  2. import (
  3. "context"
  4. "log"
  5. "math"
  6. "sparrow/pkg/rpcs"
  7. "sparrow/pkg/server"
  8. "sync"
  9. "time"
  10. "google.golang.org/grpc"
  11. pb "sparrow/services/emqx-agent/protobuf"
  12. )
  13. type Counter struct {
  14. count int64
  15. start int64
  16. duration int64
  17. maxCount int64
  18. lock sync.Mutex
  19. }
  20. func NewCounter(duration int64, maxCount int64) *Counter {
  21. counter := &Counter{}
  22. counter.count = 0
  23. counter.start = time.Now().UnixNano() / 1e6 // 当前毫秒时间戳
  24. if duration == 0 {
  25. counter.duration = math.MaxInt64 // duration传0表示没有时间间隔限制,计数器不刷新
  26. } else {
  27. counter.duration = duration
  28. }
  29. counter.maxCount = maxCount
  30. return counter
  31. }
  32. // Count 计数器计数
  33. // n: 计数值
  34. // refersh: 计数器是否刷新
  35. // limit: 是否达到计数最大值
  36. // num: 计数后计数器的值
  37. func (counter *Counter) Count(n int64) (refresh bool, limit bool, num int64) {
  38. now := time.Now().UnixNano() / 1e6
  39. counter.lock.Lock()
  40. defer counter.lock.Unlock()
  41. if now-counter.start < counter.duration {
  42. counter.count += n
  43. num = counter.count
  44. limit = num > counter.maxCount
  45. } else {
  46. // num = counter.count // 刷新前的最大计数
  47. counter.start = now
  48. counter.count = 0
  49. refresh = true
  50. }
  51. return
  52. }
  53. func (counter *Counter) GetCount() (num int64) {
  54. counter.lock.Lock()
  55. defer counter.lock.Unlock()
  56. return counter.count
  57. }
  58. const (
  59. port = ":9000"
  60. )
  61. var cnter *Counter = NewCounter(0, 100)
  62. // emqttServer is used to implement emqx_exhook_v1.s *emqttServer
  63. type emqttServer struct {
  64. pb.UnimplementedHookProviderServer
  65. }
  66. // HookProviderServer callbacks
  67. func (s *emqttServer) OnProviderLoaded(ctx context.Context, in *pb.ProviderLoadedRequest) (*pb.LoadedResponse, error) {
  68. cnter.Count(1)
  69. hooks := []*pb.HookSpec{
  70. &pb.HookSpec{Name: "client.connect"},
  71. &pb.HookSpec{Name: "client.connack"},
  72. &pb.HookSpec{Name: "client.connected"},
  73. &pb.HookSpec{Name: "client.disconnected"},
  74. &pb.HookSpec{Name: "client.authenticate"},
  75. &pb.HookSpec{Name: "client.authorize"},
  76. &pb.HookSpec{Name: "client.subscribe"},
  77. &pb.HookSpec{Name: "client.unsubscribe"},
  78. &pb.HookSpec{Name: "session.created"},
  79. &pb.HookSpec{Name: "session.subscribed"},
  80. &pb.HookSpec{Name: "session.unsubscribed"},
  81. &pb.HookSpec{Name: "session.resumed"},
  82. &pb.HookSpec{Name: "session.discarded"},
  83. &pb.HookSpec{Name: "session.takenover"},
  84. &pb.HookSpec{Name: "session.terminated"},
  85. &pb.HookSpec{Name: "message.publish"},
  86. &pb.HookSpec{Name: "message.delivered"},
  87. &pb.HookSpec{Name: "message.acked"},
  88. &pb.HookSpec{Name: "message.dropped"},
  89. }
  90. return &pb.LoadedResponse{Hooks: hooks}, nil
  91. }
  92. func (s *emqttServer) OnProviderUnloaded(ctx context.Context, in *pb.ProviderUnloadedRequest) (*pb.EmptySuccess, error) {
  93. cnter.Count(1)
  94. return &pb.EmptySuccess{}, nil
  95. }
  96. func (s *emqttServer) OnClientConnect(ctx context.Context, in *pb.ClientConnectRequest) (*pb.EmptySuccess, error) {
  97. cnter.Count(1)
  98. return &pb.EmptySuccess{}, nil
  99. }
  100. func (s *emqttServer) OnClientConnack(ctx context.Context, in *pb.ClientConnackRequest) (*pb.EmptySuccess, error) {
  101. cnter.Count(1)
  102. return &pb.EmptySuccess{}, nil
  103. }
  104. func (s *emqttServer) OnClientConnected(ctx context.Context, in *pb.ClientConnectedRequest) (*pb.EmptySuccess, error) {
  105. cnter.Count(1)
  106. return &pb.EmptySuccess{}, nil
  107. }
  108. func (s *emqttServer) OnClientDisconnected(ctx context.Context, in *pb.ClientDisconnectedRequest) (*pb.EmptySuccess, error) {
  109. cnter.Count(1)
  110. return &pb.EmptySuccess{}, nil
  111. }
  112. func (s *emqttServer) OnClientAuthenticate(ctx context.Context, in *pb.ClientAuthenticateRequest) (*pb.ValuedResponse, error) {
  113. cnter.Count(1)
  114. reply := &pb.ValuedResponse{}
  115. reply.Type = pb.ValuedResponse_STOP_AND_RETURN
  116. reply.Value = &pb.ValuedResponse_BoolResult{BoolResult: true}
  117. return reply, nil
  118. }
  119. func (s *emqttServer) OnClientAuthorize(ctx context.Context, in *pb.ClientAuthorizeRequest) (*pb.ValuedResponse, error) {
  120. cnter.Count(1)
  121. reply := &pb.ValuedResponse{}
  122. reply.Type = pb.ValuedResponse_STOP_AND_RETURN
  123. reply.Value = &pb.ValuedResponse_BoolResult{BoolResult: true}
  124. return reply, nil
  125. }
  126. func (s *emqttServer) OnClientSubscribe(ctx context.Context, in *pb.ClientSubscribeRequest) (*pb.EmptySuccess, error) {
  127. cnter.Count(1)
  128. return &pb.EmptySuccess{}, nil
  129. }
  130. func (s *emqttServer) OnClientUnsubscribe(ctx context.Context, in *pb.ClientUnsubscribeRequest) (*pb.EmptySuccess, error) {
  131. cnter.Count(1)
  132. return &pb.EmptySuccess{}, nil
  133. }
  134. func (s *emqttServer) OnSessionCreated(ctx context.Context, in *pb.SessionCreatedRequest) (*pb.EmptySuccess, error) {
  135. cnter.Count(1)
  136. return &pb.EmptySuccess{}, nil
  137. }
  138. func (s *emqttServer) OnSessionSubscribed(ctx context.Context, in *pb.SessionSubscribedRequest) (*pb.EmptySuccess, error) {
  139. cnter.Count(1)
  140. return &pb.EmptySuccess{}, nil
  141. }
  142. func (s *emqttServer) OnSessionUnsubscribed(ctx context.Context, in *pb.SessionUnsubscribedRequest) (*pb.EmptySuccess, error) {
  143. cnter.Count(1)
  144. return &pb.EmptySuccess{}, nil
  145. }
  146. func (s *emqttServer) OnSessionResumed(ctx context.Context, in *pb.SessionResumedRequest) (*pb.EmptySuccess, error) {
  147. cnter.Count(1)
  148. return &pb.EmptySuccess{}, nil
  149. }
  150. func (s *emqttServer) OnSessionDiscarded(ctx context.Context, in *pb.SessionDiscardedRequest) (*pb.EmptySuccess, error) {
  151. cnter.Count(1)
  152. return &pb.EmptySuccess{}, nil
  153. }
  154. func (s *emqttServer) OnSessionTakenover(ctx context.Context, in *pb.SessionTakenoverRequest) (*pb.EmptySuccess, error) {
  155. cnter.Count(1)
  156. return &pb.EmptySuccess{}, nil
  157. }
  158. func (s *emqttServer) OnSessionTerminated(ctx context.Context, in *pb.SessionTerminatedRequest) (*pb.EmptySuccess, error) {
  159. cnter.Count(1)
  160. return &pb.EmptySuccess{}, nil
  161. }
  162. func (s *emqttServer) OnMessagePublish(ctx context.Context, in *pb.MessagePublishRequest) (*pb.ValuedResponse, error) {
  163. cnter.Count(1)
  164. in.Message.Payload = []byte("hardcode payload by exhook-svr-go :)")
  165. reply := &pb.ValuedResponse{}
  166. reply.Type = pb.ValuedResponse_STOP_AND_RETURN
  167. reply.Value = &pb.ValuedResponse_Message{Message: in.Message}
  168. return reply, nil
  169. }
  170. //case2: stop publish the `t/d` messages
  171. //func (s *emqttServer) OnMessagePublish(ctx context.Context, in *pb.MessagePublishRequest) (*pb.ValuedResponse, error) {
  172. // cnter.Count(1)
  173. // if in.Message.Topic == "t/d" {
  174. // in.Message.Headers["allow_publish"] = "false"
  175. // in.Message.Payload = []byte("")
  176. // }
  177. // reply := &pb.ValuedResponse{}
  178. // reply.Type = pb.ValuedResponse_STOP_AND_RETURN
  179. // reply.Value = &pb.ValuedResponse_Message{Message: in.Message}
  180. // return reply, nil
  181. //}
  182. func (s *emqttServer) OnMessageDelivered(ctx context.Context, in *pb.MessageDeliveredRequest) (*pb.EmptySuccess, error) {
  183. cnter.Count(1)
  184. return &pb.EmptySuccess{}, nil
  185. }
  186. func (s *emqttServer) OnMessageDropped(ctx context.Context, in *pb.MessageDroppedRequest) (*pb.EmptySuccess, error) {
  187. cnter.Count(1)
  188. return &pb.EmptySuccess{}, nil
  189. }
  190. func (s *emqttServer) OnMessageAcked(ctx context.Context, in *pb.MessageAckedRequest) (*pb.EmptySuccess, error) {
  191. cnter.Count(1)
  192. return &pb.EmptySuccess{}, nil
  193. }
  194. func main() {
  195. // init emqttServer
  196. err := server.Init(rpcs.EmqxAgentServiceName)
  197. if err != nil {
  198. server.Log.Fatal(err)
  199. return
  200. }
  201. s := grpc.NewServer()
  202. pb.RegisterHookProviderServer(s, &emqttServer{})
  203. log.Println("Started gRPC emqttServer on ::9000")
  204. // register a http handler
  205. err = server.RegisterHTTPHandler(s)
  206. if err != nil {
  207. server.Log.Errorf("RegisterHTTPHandler Error: %s", err)
  208. return
  209. }
  210. }