123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256 |
- package main
- import (
- "context"
- "github.com/gogf/gf/frame/g"
- "math"
- "sparrow/pkg/rpcs"
- "sparrow/pkg/server"
- "sparrow/services/emqx-agent/client"
- "sync"
- "time"
- pb "sparrow/services/emqx-agent/protobuf"
- )
- type Counter struct {
- count int64
- start int64
- duration int64
- maxCount int64
- lock sync.Mutex
- }
- func NewCounter(duration int64, maxCount int64) *Counter {
- counter := &Counter{}
- counter.count = 0
- counter.start = time.Now().UnixNano() / 1e6 // 当前毫秒时间戳
- if duration == 0 {
- counter.duration = math.MaxInt64 // duration传0表示没有时间间隔限制,计数器不刷新
- } else {
- counter.duration = duration
- }
- counter.maxCount = maxCount
- return counter
- }
- // Count 计数器计数
- // n: 计数值
- // refersh: 计数器是否刷新
- // limit: 是否达到计数最大值
- // num: 计数后计数器的值
- func (counter *Counter) Count(n int64) (refresh bool, limit bool, num int64) {
- now := time.Now().UnixNano() / 1e6
- counter.lock.Lock()
- defer counter.lock.Unlock()
- if now-counter.start < counter.duration {
- counter.count += n
- num = counter.count
- limit = num > counter.maxCount
- } else {
- // num = counter.count // 刷新前的最大计数
- counter.start = now
- counter.count = 0
- refresh = true
- }
- return
- }
- func (counter *Counter) GetCount() (num int64) {
- counter.lock.Lock()
- defer counter.lock.Unlock()
- return counter.count
- }
- var cnter *Counter = NewCounter(0, 100)
- // emqttServer is used to implement emqx_exhook_v1.s *emqttServer
- type emqttServer struct {
- pb.UnimplementedHookProviderServer
- }
- // HookProviderServer callbacks
- func (s *emqttServer) OnProviderLoaded(ctx context.Context, in *pb.ProviderLoadedRequest) (*pb.LoadedResponse, error) {
- cnter.Count(1)
- hooks := []*pb.HookSpec{
- &pb.HookSpec{Name: "client.connect"},
- &pb.HookSpec{Name: "client.connack"},
- &pb.HookSpec{Name: "client.connected"},
- &pb.HookSpec{Name: "client.disconnected"},
- &pb.HookSpec{Name: "client.authenticate"},
- &pb.HookSpec{Name: "client.authorize"},
- &pb.HookSpec{Name: "client.subscribe"},
- &pb.HookSpec{Name: "client.unsubscribe"},
- &pb.HookSpec{Name: "session.created"},
- &pb.HookSpec{Name: "session.subscribed"},
- &pb.HookSpec{Name: "session.unsubscribed"},
- &pb.HookSpec{Name: "session.resumed"},
- &pb.HookSpec{Name: "session.discarded"},
- &pb.HookSpec{Name: "session.takenover"},
- &pb.HookSpec{Name: "session.terminated"},
- &pb.HookSpec{Name: "message.publish"},
- &pb.HookSpec{Name: "message.delivered"},
- &pb.HookSpec{Name: "message.acked"},
- &pb.HookSpec{Name: "message.dropped"},
- }
- return &pb.LoadedResponse{Hooks: hooks}, nil
- }
- func (s *emqttServer) OnProviderUnloaded(ctx context.Context, in *pb.ProviderUnloadedRequest) (*pb.EmptySuccess, error) {
- cnter.Count(1)
- return &pb.EmptySuccess{}, nil
- }
- func (s *emqttServer) OnClientConnect(ctx context.Context, in *pb.ClientConnectRequest) (*pb.EmptySuccess, error) {
- cnter.Count(1)
- return &pb.EmptySuccess{}, nil
- }
- func (s *emqttServer) OnClientConnack(ctx context.Context, in *pb.ClientConnackRequest) (*pb.EmptySuccess, error) {
- cnter.Count(1)
- return &pb.EmptySuccess{}, nil
- }
- func (s *emqttServer) OnClientConnected(ctx context.Context, in *pb.ClientConnectedRequest) (*pb.EmptySuccess, error) {
- cnter.Count(1)
- return &pb.EmptySuccess{}, nil
- }
- func (s *emqttServer) OnClientDisconnected(ctx context.Context, in *pb.ClientDisconnectedRequest) (*pb.EmptySuccess, error) {
- cnter.Count(1)
- return &pb.EmptySuccess{}, nil
- }
- func (s *emqttServer) OnClientAuthenticate(ctx context.Context, in *pb.ClientAuthenticateRequest) (*pb.ValuedResponse, error) {
- cnter.Count(1)
- reply := &pb.ValuedResponse{}
- reply.Type = pb.ValuedResponse_STOP_AND_RETURN
- reply.Value = &pb.ValuedResponse_BoolResult{BoolResult: true}
- return reply, nil
- }
- func (s *emqttServer) OnClientAuthorize(ctx context.Context, in *pb.ClientAuthorizeRequest) (*pb.ValuedResponse, error) {
- cnter.Count(1)
- reply := &pb.ValuedResponse{}
- reply.Type = pb.ValuedResponse_STOP_AND_RETURN
- reply.Value = &pb.ValuedResponse_BoolResult{BoolResult: true}
- return reply, nil
- }
- func (s *emqttServer) OnClientSubscribe(ctx context.Context, in *pb.ClientSubscribeRequest) (*pb.EmptySuccess, error) {
- cnter.Count(1)
- return &pb.EmptySuccess{}, nil
- }
- func (s *emqttServer) OnClientUnsubscribe(ctx context.Context, in *pb.ClientUnsubscribeRequest) (*pb.EmptySuccess, error) {
- cnter.Count(1)
- return &pb.EmptySuccess{}, nil
- }
- func (s *emqttServer) OnSessionCreated(ctx context.Context, in *pb.SessionCreatedRequest) (*pb.EmptySuccess, error) {
- cnter.Count(1)
- return &pb.EmptySuccess{}, nil
- }
- func (s *emqttServer) OnSessionSubscribed(ctx context.Context, in *pb.SessionSubscribedRequest) (*pb.EmptySuccess, error) {
- cnter.Count(1)
- return &pb.EmptySuccess{}, nil
- }
- func (s *emqttServer) OnSessionUnsubscribed(ctx context.Context, in *pb.SessionUnsubscribedRequest) (*pb.EmptySuccess, error) {
- cnter.Count(1)
- return &pb.EmptySuccess{}, nil
- }
- func (s *emqttServer) OnSessionResumed(ctx context.Context, in *pb.SessionResumedRequest) (*pb.EmptySuccess, error) {
- cnter.Count(1)
- return &pb.EmptySuccess{}, nil
- }
- func (s *emqttServer) OnSessionDiscarded(ctx context.Context, in *pb.SessionDiscardedRequest) (*pb.EmptySuccess, error) {
- cnter.Count(1)
- return &pb.EmptySuccess{}, nil
- }
- func (s *emqttServer) OnSessionTakenover(ctx context.Context, in *pb.SessionTakenoverRequest) (*pb.EmptySuccess, error) {
- cnter.Count(1)
- return &pb.EmptySuccess{}, nil
- }
- func (s *emqttServer) OnSessionTerminated(ctx context.Context, in *pb.SessionTerminatedRequest) (*pb.EmptySuccess, error) {
- cnter.Count(1)
- return &pb.EmptySuccess{}, nil
- }
- func (s *emqttServer) OnMessagePublish(ctx context.Context, in *pb.MessagePublishRequest) (*pb.ValuedResponse, error) {
- cnter.Count(1)
- in.Message.Payload = []byte("hardcode payload by exhook-svr-go :)")
- reply := &pb.ValuedResponse{}
- reply.Type = pb.ValuedResponse_STOP_AND_RETURN
- reply.Value = &pb.ValuedResponse_Message{Message: in.Message}
- return reply, nil
- }
- //case2: stop publish the `t/d` messages
- //func (s *emqttServer) OnMessagePublish(ctx context.Context, in *pb.MessagePublishRequest) (*pb.ValuedResponse, error) {
- // cnter.Count(1)
- // if in.Message.Topic == "t/d" {
- // in.Message.Headers["allow_publish"] = "false"
- // in.Message.Payload = []byte("")
- // }
- // reply := &pb.ValuedResponse{}
- // reply.Type = pb.ValuedResponse_STOP_AND_RETURN
- // reply.Value = &pb.ValuedResponse_Message{Message: in.Message}
- // return reply, nil
- //}
- func (s *emqttServer) OnMessageDelivered(ctx context.Context, in *pb.MessageDeliveredRequest) (*pb.EmptySuccess, error) {
- cnter.Count(1)
- return &pb.EmptySuccess{}, nil
- }
- func (s *emqttServer) OnMessageDropped(ctx context.Context, in *pb.MessageDroppedRequest) (*pb.EmptySuccess, error) {
- cnter.Count(1)
- return &pb.EmptySuccess{}, nil
- }
- func (s *emqttServer) OnMessageAcked(ctx context.Context, in *pb.MessageAckedRequest) (*pb.EmptySuccess, error) {
- cnter.Count(1)
- return &pb.EmptySuccess{}, nil
- }
- func main() {
- // init emqttServer
- err := server.Init(rpcs.EmqxAgentServiceName)
- if err != nil {
- server.Log.Fatal(err)
- return
- }
- sd, err := NewSubDev(&client.MqttConfig{
- ClientId: g.Cfg().GetString("mqtt.client_id"),
- User: g.Cfg().GetString("mqtt.user"),
- Password: g.Cfg().GetString("mqtt.password"),
- Brokers: g.Cfg().GetStrings("mqtt.brokers"),
- ConnNum: 10,
- })
- if err != nil {
- panic(err)
- }
- agent := NewAgent(sd)
- err = sd.SubDevMsg(func(ctx context.Context) DevSubHandle {
- return agent
- })
- if err != nil {
- panic(err)
- }
- err = server.RegisterRPCHandler(agent)
- // start to run
- err = server.Run()
- if err != nil {
- server.Log.Fatal(err)
- }
- }
|