123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081 |
- package main
import (
	"context"
	"fmt"

	"github.com/gogf/gf/os/grpool"

	"sparrow/pkg/models"
	"sparrow/pkg/protocol"
	"sparrow/pkg/queue"
	"sparrow/pkg/rpcs"
	"sparrow/pkg/ruleEngine"
	"sparrow/pkg/server"
)
- // ClusterService 集群服务
- type ClusterService struct {
- producer queue.QueueProducer
- }
- func (c *ClusterService) PushMessageToRuleEngine(info *queue.TopicPartitionInfo, msgId string, msg *protocol.Message, callback queue.Callback) {
- m, err := queue.NewGobQueueMessage(msg)
- if err != nil {
- return
- }
- if err := c.producer.Send(info, m, callback); err != nil {
- server.Log.Error(err)
- }
- }
- // EventService 事件持久化服务
- type EventService struct {
- pool *grpool.Pool
- }
- // NewEventService create
- func NewEventService() *EventService {
- return &EventService{pool: grpool.New(5)}
- }
- func (e *EventService) Save(data *models.Event) error {
- return server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.CreateEvent", data, &rpcs.ReplyEmptyResult{})
- }
- func (e *EventService) SaveAsync(data *models.Event) error {
- return e.pool.Add(func() {
- if err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.CreateEvent", data, &rpcs.ReplyEmptyResult{}); err != nil {
- return
- }
- })
- }
// TenantService exposes registry vendors as rule-engine tenants.
//
// TODO: finish this.
type TenantService struct{}
- func (t *TenantService) FindTenants() ([]*ruleEngine.Tenant, error) {
- var arg int
- var reply []*models.Vendor
- var result []*ruleEngine.Tenant
- err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.GetVendors", &arg, &reply)
- if err != nil {
- return nil, err
- }
- for _, vendor := range reply {
- result = append(result, &ruleEngine.Tenant{
- Id: vendor.RecordId,
- })
- }
- return result, nil
- }
- func (t *TenantService) GetTenant(tId string) (*ruleEngine.Tenant, error) {
- var reply *models.Vendor
- err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.FindVendor", &tId, &reply)
- if err != nil {
- return nil, err
- }
- return &ruleEngine.Tenant{
- Id: reply.RecordId,
- Name: reply.VendorName,
- }, nil
- }
|