package main

import (
	"context"
	"fmt"

	"github.com/gogf/gf/os/grpool"

	"sparrow/pkg/models"
	"sparrow/pkg/protocol"
	"sparrow/pkg/queue"
	"sparrow/pkg/rpcs"
	"sparrow/pkg/ruleEngine"
	"sparrow/pkg/server"
)

// ClusterService is the cluster service. It forwards protocol messages to the
// rule engine through a queue producer.
type ClusterService struct {
	producer queue.QueueProducer
}

// PushMessageToRuleEngine gob-encodes msg and sends it through the producer to
// the topic partition described by info, passing callback along to Send.
//
// msgId is accepted but not used by this implementation. The method has no
// return value, so both encoding and send failures are reported via the
// server logger.
func (c *ClusterService) PushMessageToRuleEngine(info *queue.TopicPartitionInfo, msgId string, msg *protocol.Message, callback queue.Callback) {
	m, err := queue.NewGobQueueMessage(msg)
	if err != nil {
		// Log instead of dropping the message silently, matching how the
		// send failure below is handled.
		server.Log.Error(err)
		return
	}
	if err := c.producer.Send(info, m, callback); err != nil {
		server.Log.Error(err)
	}
}

// EventService is the event persistence service. It stores events by calling
// the registry's "Registry.CreateEvent" RPC.
type EventService struct {
	pool *grpool.Pool
}

// NewEventService creates an EventService whose asynchronous saves run on a
// goroutine pool created with grpool.New(5).
func NewEventService() *EventService {
	return &EventService{pool: grpool.New(5)}
}

// Save persists data synchronously via the "Registry.CreateEvent" RPC and
// returns the RPC error, if any.
func (e *EventService) Save(data *models.Event) error {
	// Use a real context rather than nil, consistent with TenantService.
	return server.RPCCallByName(
		context.Background(),
		rpcs.RegistryServerName,
		"Registry.CreateEvent",
		data,
		&rpcs.ReplyEmptyResult{},
	)
}

// SaveAsync submits a job to the pool that persists data via the
// "Registry.CreateEvent" RPC. The returned error is the one from pool.Add
// (i.e. from submitting the job); an RPC failure inside the job cannot be
// returned to the caller and is logged instead.
func (e *EventService) SaveAsync(data *models.Event) error {
	return e.pool.Add(func() {
		err := server.RPCCallByName(
			context.Background(),
			rpcs.RegistryServerName,
			"Registry.CreateEvent",
			data,
			&rpcs.ReplyEmptyResult{},
		)
		if err != nil {
			server.Log.Error(err)
		}
	})
}

// TenantService exposes registry vendors as rule-engine tenants.
//
// TODO: finish this implementation.
type TenantService struct {
}

// FindTenants calls "Registry.GetVendors" and maps every returned vendor to a
// ruleEngine.Tenant. Only the tenant Id (from the vendor's RecordId) is
// populated. The result is nil when the registry returns no vendors.
//
// NOTE(review): GetTenant also fills Name from VendorName; confirm whether
// Name should be populated here as well.
func (t *TenantService) FindTenants() ([]*ruleEngine.Tenant, error) {
	var (
		arg     int
		vendors []*models.Vendor
	)
	if err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.GetVendors", &arg, &vendors); err != nil {
		return nil, err
	}

	var tenants []*ruleEngine.Tenant
	for _, v := range vendors {
		if v == nil {
			// Skip empty entries instead of panicking on a nil dereference.
			continue
		}
		tenants = append(tenants, &ruleEngine.Tenant{
			Id: v.RecordId,
		})
	}
	return tenants, nil
}

// GetTenant calls "Registry.FindVendor" with tId and converts the returned
// vendor into a ruleEngine.Tenant (Id from RecordId, Name from VendorName).
// It returns an error when the RPC fails or when the RPC succeeds but yields
// no vendor.
func (t *TenantService) GetTenant(tId string) (*ruleEngine.Tenant, error) {
	var vendor *models.Vendor
	if err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.FindVendor", &tId, &vendor); err != nil {
		return nil, err
	}
	if vendor == nil {
		// Guard against a successful call that left the reply unset.
		return nil, fmt.Errorf("registry returned no vendor for tenant %q", tId)
	}
	return &ruleEngine.Tenant{
		Id:   vendor.RecordId,
		Name: vendor.VendorName,
	}, nil
}