package main import ( "github.com/gogf/gf/os/grpool" "sparrow/pkg/models" "sparrow/pkg/protocol" "sparrow/pkg/queue" "sparrow/pkg/rpcs" "sparrow/pkg/server" ) // ClusterService 集群服务 type ClusterService struct { producer queue.QueueProducer } func (c *ClusterService) PushMessageToRuleEngine(info *queue.TopicPartitionInfo, msgId string, msg *protocol.Message, callback queue.Callback) { m, err := queue.NewGobQueueMessage(msg) if err != nil { return } if err := c.producer.Send(info, m, callback); err != nil { server.Log.Error(err) } } // EventService 事件持久化服务 type EventService struct { pool *grpool.Pool } // NewEventService create func NewEventService() *EventService { return &EventService{pool: grpool.New(5)} } func (e *EventService) Save(data *models.Event) error { return server.RPCCallByName(nil, "registry", "Registry.CreateEvent", data, &rpcs.ReplyEmptyResult{}) } func (e *EventService) SaveAsync(data *models.Event) error { return e.pool.Add(func() { if err := server.RPCCallByName(nil, "registry", "Registry.CreateEvent", data, &rpcs.ReplyEmptyResult{}); err != nil { return } }) } //type TenantService struct { // //} // //func (t *TenantService) FindTenants() ([]*ruleEngine.Tenant, error) { // var arg int // var reply []*models.Vendor // var result []*ruleEngine.Tenant // err :=server.RPCCallByName(context.Background(), "registry", "registry.GetVendors", &arg, &reply) // if err != nil { // return nil, err // } // for _, vendor := range reply { // result = append(result, &ruleEngine.Tenant{ // Id: vendor.RecordId, // }) // } // return result, nil //} // //func (t *TenantService) GetTenant(tId string) (*ruleEngine.Tenant, error) { // var reply *models.Vendor // err := server. //}