package main import ( "context" "github.com/gogf/gf/os/grpool" "sparrow/pkg/models" "sparrow/pkg/protocol" "sparrow/pkg/queue" "sparrow/pkg/rpcs" "sparrow/pkg/ruleEngine" "sparrow/pkg/server" ) // ClusterService 集群服务 type ClusterService struct { producer queue.QueueProducer } func (c *ClusterService) PushMessageToRuleEngine(info *queue.TopicPartitionInfo, msgId string, msg *protocol.Message, callback queue.Callback) { m, err := queue.NewGobQueueMessage(msg) if err != nil { return } if err := c.producer.Send(info, m, callback); err != nil { server.Log.Error(err) } } // EventService 事件持久化服务 type EventService struct { pool *grpool.Pool } // NewEventService create func NewEventService() *EventService { return &EventService{pool: grpool.New(5)} } func (e *EventService) Save(data *models.Event) error { return server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.CreateEvent", data, &rpcs.ReplyEmptyResult{}) } func (e *EventService) SaveAsync(data *models.Event) error { return e.pool.Add(func() { if err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.CreateEvent", data, &rpcs.ReplyEmptyResult{}); err != nil { return } }) } // TenantService 获取租户厂商 type TenantService struct { } func (t *TenantService) FindTenants() ([]*ruleEngine.Tenant, error) { var arg int var reply []*models.Vendor var result []*ruleEngine.Tenant err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.GetVendors", &arg, &reply) if err != nil { return nil, err } for _, vendor := range reply { result = append(result, &ruleEngine.Tenant{ Id: vendor.RecordId, }) } return result, nil } func (t *TenantService) GetTenant(tId string) (*ruleEngine.Tenant, error) { var reply *models.Vendor err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.FindVendor", &tId, &reply) if err != nil { return nil, err } return &ruleEngine.Tenant{ Id: reply.RecordId, Name: reply.VendorName, }, nil } type RuleChainService struct{} func (a *RuleChainService) FindRuleChainById(tenantId, ruleChainId string) (*ruleEngine.RuleChain, error) { var reply *models.RuleChain err := server.RPCCallByName(context.TODO(), rpcs.RegistryServerName, "Registry.FindRuleChainById", &ruleChainId, &reply) if err != nil { return nil, err } return &ruleEngine.RuleChain{ TenantId: reply.VendorID, Name: reply.Name, FirstNodeId: reply.FirstRuleNodeID, IsRoot: reply.Root, IsDebug: reply.DebugModel, Config: reply.Configuration, ChainId: reply.RecordId, }, nil } func (a *RuleChainService) FindRuleNodeById(tenantId, ruleNodeId string) (*ruleEngine.RuleNode, error) { var reply *models.RuleNode err := server.RPCCallByName(context.TODO(), rpcs.RegistryServerName, "Registry.FindRuleNodeById", &ruleNodeId, &reply) if err != nil { return nil, err } return &ruleEngine.RuleNode{ RuleChainId: reply.RuleChainID, Type: reply.Type, Name: reply.Name, IsDebug: reply.DebugModel, Config: reply.Configuration, RuleNodeId: reply.RecordId, }, nil } func (a *RuleChainService) GetRuleChainNodes(tenantId, ruleChainId string) ([]*ruleEngine.RuleNode, error) { var reply []*models.RuleNode var result []*ruleEngine.RuleNode err := server.RPCCallByName(context.TODO(), rpcs.RegistryServerName, "Registry.FindRuleChainNodes", &ruleChainId, &reply) if err != nil { return nil, err } for _, node := range reply { result = append(result, &ruleEngine.RuleNode{ RuleChainId: node.RuleChainID, Type: node.Type, Name: node.Name, IsDebug: node.DebugModel, Config: node.Configuration, RuleNodeId: node.RecordId, }) } return result, nil } func (a *RuleChainService) FindRuleChains(tenantId string) ([]*ruleEngine.RuleChain, error) { var reply []*models.RuleChain var result []*ruleEngine.RuleChain err := server.RPCCallByName(context.TODO(), rpcs.RegistryServerName, "Registry.FindRuleChains", &tenantId, &reply) if err != nil { return nil, err } for _, chain := range reply { result = append(result, &ruleEngine.RuleChain{ TenantId: chain.VendorID, Name: chain.Name, FirstNodeId: chain.FirstRuleNodeID, IsRoot: chain.Root, IsDebug: chain.DebugModel, Config: chain.Configuration, ChainId: chain.RecordId, }) } return result, nil } func (a *RuleChainService) GetRuleNodeRelations(tenantId, nodeId string) ([]*ruleEngine.Relation, error) { var reply []*models.Relation var result []*ruleEngine.Relation err := server.RPCCallByName(context.TODO(), rpcs.RegistryServerName, "Registry.GetRuleNodeRelations", &nodeId, &reply) if err != nil { return nil, err } for _, rel := range reply { result = append(result, &ruleEngine.Relation{ From: rel.FromID, To: rel.ToID, Type: rel.RelationType, RelationTypeGroup: 0, }) } return result, nil }