package main import ( "context" "github.com/gogf/gf/frame/g" "math" "sparrow/pkg/rpcs" "sparrow/pkg/server" "sparrow/services/emqx-agent/client" "sync" "time" pb "sparrow/services/emqx-agent/protobuf" ) type Counter struct { count int64 start int64 duration int64 maxCount int64 lock sync.Mutex } func NewCounter(duration int64, maxCount int64) *Counter { counter := &Counter{} counter.count = 0 counter.start = time.Now().UnixNano() / 1e6 // 当前毫秒时间戳 if duration == 0 { counter.duration = math.MaxInt64 // duration传0表示没有时间间隔限制,计数器不刷新 } else { counter.duration = duration } counter.maxCount = maxCount return counter } // Count 计数器计数 // n: 计数值 // refersh: 计数器是否刷新 // limit: 是否达到计数最大值 // num: 计数后计数器的值 func (counter *Counter) Count(n int64) (refresh bool, limit bool, num int64) { now := time.Now().UnixNano() / 1e6 counter.lock.Lock() defer counter.lock.Unlock() if now-counter.start < counter.duration { counter.count += n num = counter.count limit = num > counter.maxCount } else { // num = counter.count // 刷新前的最大计数 counter.start = now counter.count = 0 refresh = true } return } func (counter *Counter) GetCount() (num int64) { counter.lock.Lock() defer counter.lock.Unlock() return counter.count } var cnter *Counter = NewCounter(0, 100) // emqttServer is used to implement emqx_exhook_v1.s *emqttServer type emqttServer struct { pb.UnimplementedHookProviderServer } // HookProviderServer callbacks func (s *emqttServer) OnProviderLoaded(ctx context.Context, in *pb.ProviderLoadedRequest) (*pb.LoadedResponse, error) { cnter.Count(1) hooks := []*pb.HookSpec{ &pb.HookSpec{Name: "client.connect"}, &pb.HookSpec{Name: "client.connack"}, &pb.HookSpec{Name: "client.connected"}, &pb.HookSpec{Name: "client.disconnected"}, &pb.HookSpec{Name: "client.authenticate"}, &pb.HookSpec{Name: "client.authorize"}, &pb.HookSpec{Name: "client.subscribe"}, &pb.HookSpec{Name: "client.unsubscribe"}, &pb.HookSpec{Name: "session.created"}, &pb.HookSpec{Name: "session.subscribed"}, &pb.HookSpec{Name: "session.unsubscribed"}, &pb.HookSpec{Name: "session.resumed"}, &pb.HookSpec{Name: "session.discarded"}, &pb.HookSpec{Name: "session.takenover"}, &pb.HookSpec{Name: "session.terminated"}, &pb.HookSpec{Name: "message.publish"}, &pb.HookSpec{Name: "message.delivered"}, &pb.HookSpec{Name: "message.acked"}, &pb.HookSpec{Name: "message.dropped"}, } return &pb.LoadedResponse{Hooks: hooks}, nil } func (s *emqttServer) OnProviderUnloaded(ctx context.Context, in *pb.ProviderUnloadedRequest) (*pb.EmptySuccess, error) { cnter.Count(1) return &pb.EmptySuccess{}, nil } func (s *emqttServer) OnClientConnect(ctx context.Context, in *pb.ClientConnectRequest) (*pb.EmptySuccess, error) { cnter.Count(1) return &pb.EmptySuccess{}, nil } func (s *emqttServer) OnClientConnack(ctx context.Context, in *pb.ClientConnackRequest) (*pb.EmptySuccess, error) { cnter.Count(1) return &pb.EmptySuccess{}, nil } func (s *emqttServer) OnClientConnected(ctx context.Context, in *pb.ClientConnectedRequest) (*pb.EmptySuccess, error) { cnter.Count(1) return &pb.EmptySuccess{}, nil } func (s *emqttServer) OnClientDisconnected(ctx context.Context, in *pb.ClientDisconnectedRequest) (*pb.EmptySuccess, error) { cnter.Count(1) return &pb.EmptySuccess{}, nil } func (s *emqttServer) OnClientAuthenticate(ctx context.Context, in *pb.ClientAuthenticateRequest) (*pb.ValuedResponse, error) { cnter.Count(1) reply := &pb.ValuedResponse{} reply.Type = pb.ValuedResponse_STOP_AND_RETURN reply.Value = &pb.ValuedResponse_BoolResult{BoolResult: true} return reply, nil } func (s *emqttServer) OnClientAuthorize(ctx context.Context, in *pb.ClientAuthorizeRequest) (*pb.ValuedResponse, error) { cnter.Count(1) reply := &pb.ValuedResponse{} reply.Type = pb.ValuedResponse_STOP_AND_RETURN reply.Value = &pb.ValuedResponse_BoolResult{BoolResult: true} return reply, nil } func (s *emqttServer) OnClientSubscribe(ctx context.Context, in *pb.ClientSubscribeRequest) (*pb.EmptySuccess, error) { cnter.Count(1) return &pb.EmptySuccess{}, nil } func (s *emqttServer) OnClientUnsubscribe(ctx context.Context, in *pb.ClientUnsubscribeRequest) (*pb.EmptySuccess, error) { cnter.Count(1) return &pb.EmptySuccess{}, nil } func (s *emqttServer) OnSessionCreated(ctx context.Context, in *pb.SessionCreatedRequest) (*pb.EmptySuccess, error) { cnter.Count(1) return &pb.EmptySuccess{}, nil } func (s *emqttServer) OnSessionSubscribed(ctx context.Context, in *pb.SessionSubscribedRequest) (*pb.EmptySuccess, error) { cnter.Count(1) return &pb.EmptySuccess{}, nil } func (s *emqttServer) OnSessionUnsubscribed(ctx context.Context, in *pb.SessionUnsubscribedRequest) (*pb.EmptySuccess, error) { cnter.Count(1) return &pb.EmptySuccess{}, nil } func (s *emqttServer) OnSessionResumed(ctx context.Context, in *pb.SessionResumedRequest) (*pb.EmptySuccess, error) { cnter.Count(1) return &pb.EmptySuccess{}, nil } func (s *emqttServer) OnSessionDiscarded(ctx context.Context, in *pb.SessionDiscardedRequest) (*pb.EmptySuccess, error) { cnter.Count(1) return &pb.EmptySuccess{}, nil } func (s *emqttServer) OnSessionTakenover(ctx context.Context, in *pb.SessionTakenoverRequest) (*pb.EmptySuccess, error) { cnter.Count(1) return &pb.EmptySuccess{}, nil } func (s *emqttServer) OnSessionTerminated(ctx context.Context, in *pb.SessionTerminatedRequest) (*pb.EmptySuccess, error) { cnter.Count(1) return &pb.EmptySuccess{}, nil } func (s *emqttServer) OnMessagePublish(ctx context.Context, in *pb.MessagePublishRequest) (*pb.ValuedResponse, error) { cnter.Count(1) in.Message.Payload = []byte("hardcode payload by exhook-svr-go :)") reply := &pb.ValuedResponse{} reply.Type = pb.ValuedResponse_STOP_AND_RETURN reply.Value = &pb.ValuedResponse_Message{Message: in.Message} return reply, nil } //case2: stop publish the `t/d` messages //func (s *emqttServer) OnMessagePublish(ctx context.Context, in *pb.MessagePublishRequest) (*pb.ValuedResponse, error) { // cnter.Count(1) // if in.Message.Topic == "t/d" { // in.Message.Headers["allow_publish"] = "false" // in.Message.Payload = []byte("") // } // reply := &pb.ValuedResponse{} // reply.Type = pb.ValuedResponse_STOP_AND_RETURN // reply.Value = &pb.ValuedResponse_Message{Message: in.Message} // return reply, nil //} func (s *emqttServer) OnMessageDelivered(ctx context.Context, in *pb.MessageDeliveredRequest) (*pb.EmptySuccess, error) { cnter.Count(1) return &pb.EmptySuccess{}, nil } func (s *emqttServer) OnMessageDropped(ctx context.Context, in *pb.MessageDroppedRequest) (*pb.EmptySuccess, error) { cnter.Count(1) return &pb.EmptySuccess{}, nil } func (s *emqttServer) OnMessageAcked(ctx context.Context, in *pb.MessageAckedRequest) (*pb.EmptySuccess, error) { cnter.Count(1) return &pb.EmptySuccess{}, nil } func main() { // init emqttServer err := server.Init(rpcs.EmqxAgentServiceName) if err != nil { server.Log.Fatal(err) return } sd, err := NewSubDev(&client.MqttConfig{ ClientId: g.Cfg().GetString("mqtt.client_id"), User: g.Cfg().GetString("mqtt.user"), Password: g.Cfg().GetString("mqtt.password"), Brokers: g.Cfg().GetStrings("mqtt.brokers"), ConnNum: 10, }) if err != nil { panic(err) } agent := NewAgent(sd) err = sd.SubDevMsg(func(ctx context.Context) DevSubHandle { return agent }) if err != nil { panic(err) } err = server.RegisterRPCHandler(agent) // start to run err = server.Run() if err != nil { server.Log.Fatal(err) } }