123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175 |
- package main
- import (
- "context"
- "github.com/gogf/gf/os/grpool"
- "sparrow/pkg/models"
- "sparrow/pkg/protocol"
- "sparrow/pkg/queue"
- "sparrow/pkg/rpcs"
- "sparrow/pkg/ruleEngine"
- "sparrow/pkg/server"
- )
- // ClusterService 集群服务
- type ClusterService struct {
- producer queue.QueueProducer
- }
- func (c *ClusterService) PushMessageToRuleEngine(info *queue.TopicPartitionInfo, msgId string, msg *protocol.Message, callback queue.Callback) {
- m, err := queue.NewGobQueueMessage(msg)
- if err != nil {
- return
- }
- if err := c.producer.Send(info, m, callback); err != nil {
- server.Log.Error(err)
- }
- }
- // EventService 事件持久化服务
- type EventService struct {
- pool *grpool.Pool
- }
- // NewEventService create
- func NewEventService() *EventService {
- return &EventService{pool: grpool.New(5)}
- }
- func (e *EventService) Save(data *models.Event) error {
- return server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.CreateEvent", data, &rpcs.ReplyEmptyResult{})
- }
- func (e *EventService) SaveAsync(data *models.Event) error {
- return e.pool.Add(func() {
- if err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.CreateEvent", data, &rpcs.ReplyEmptyResult{}); err != nil {
- return
- }
- })
- }
- // TenantService 获取租户厂商
- type TenantService struct {
- }
- func (t *TenantService) FindTenants() ([]*ruleEngine.Tenant, error) {
- var arg int
- var reply []*models.Vendor
- var result []*ruleEngine.Tenant
- err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.GetVendors", &arg, &reply)
- if err != nil {
- return nil, err
- }
- for _, vendor := range reply {
- result = append(result, &ruleEngine.Tenant{
- Id: vendor.RecordId,
- })
- }
- return result, nil
- }
- func (t *TenantService) GetTenant(tId string) (*ruleEngine.Tenant, error) {
- var reply *models.Vendor
- err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.FindVendor", &tId, &reply)
- if err != nil {
- return nil, err
- }
- return &ruleEngine.Tenant{
- Id: reply.RecordId,
- Name: reply.VendorName,
- }, nil
- }
- type RuleChainService struct{}
- func (a *RuleChainService) FindRuleChainById(tenantId, ruleChainId string) (*ruleEngine.RuleChain, error) {
- var reply *models.RuleChain
- err := server.RPCCallByName(context.TODO(), rpcs.RegistryServerName, "Registry.FindRuleChainById", &ruleChainId, &reply)
- if err != nil {
- return nil, err
- }
- return &ruleEngine.RuleChain{
- TenantId: reply.VendorID,
- Name: reply.Name,
- FirstNodeId: reply.FirstRuleNodeID,
- IsRoot: reply.Root,
- IsDebug: reply.DebugModel,
- Config: reply.Configuration,
- ChainId: reply.RecordId,
- }, nil
- }
- func (a *RuleChainService) FindRuleNodeById(tenantId, ruleNodeId string) (*ruleEngine.RuleNode, error) {
- var reply *models.RuleNode
- err := server.RPCCallByName(context.TODO(), rpcs.RegistryServerName, "Registry.FindRuleNodeById", &ruleNodeId, &reply)
- if err != nil {
- return nil, err
- }
- return &ruleEngine.RuleNode{
- RuleChainId: reply.RuleChainID,
- Type: reply.Type,
- Name: reply.Name,
- IsDebug: reply.DebugModel,
- Config: reply.Configuration,
- RuleNodeId: reply.RecordId,
- }, nil
- }
- func (a *RuleChainService) GetRuleChainNodes(tenantId, ruleChainId string) ([]*ruleEngine.RuleNode, error) {
- var reply []*models.RuleNode
- var result []*ruleEngine.RuleNode
- err := server.RPCCallByName(context.TODO(), rpcs.RegistryServerName, "Registry.FindRuleChainNodes", &ruleChainId, &reply)
- if err != nil {
- return nil, err
- }
- for _, node := range reply {
- result = append(result, &ruleEngine.RuleNode{
- RuleChainId: node.RuleChainID,
- Type: node.Type,
- Name: node.Name,
- IsDebug: node.DebugModel,
- Config: node.Configuration,
- RuleNodeId: node.RecordId,
- })
- }
- return result, nil
- }
- func (a *RuleChainService) FindRuleChains(tenantId string) ([]*ruleEngine.RuleChain, error) {
- var reply []*models.RuleChain
- var result []*ruleEngine.RuleChain
- err := server.RPCCallByName(context.TODO(), rpcs.RegistryServerName, "Registry.FindRuleChains", &tenantId, &reply)
- if err != nil {
- return nil, err
- }
- for _, chain := range reply {
- result = append(result, &ruleEngine.RuleChain{
- TenantId: chain.VendorID,
- Name: chain.Name,
- FirstNodeId: chain.FirstRuleNodeID,
- IsRoot: chain.Root,
- IsDebug: chain.DebugModel,
- Config: chain.Configuration,
- ChainId: chain.RecordId,
- })
- }
- return result, nil
- }
- func (a *RuleChainService) GetRuleNodeRelations(tenantId, nodeId string) ([]*ruleEngine.Relation, error) {
- var reply []*models.Relation
- var result []*ruleEngine.Relation
- err := server.RPCCallByName(context.TODO(), rpcs.RegistryServerName, "Registry.GetRuleNodeRelations", &nodeId, &reply)
- if err != nil {
- return nil, err
- }
- for _, rel := range reply {
- result = append(result, &ruleEngine.Relation{
- From: rel.FromID,
- To: rel.ToID,
- Type: rel.RelationType,
- RelationTypeGroup: 0,
- })
- }
- return result, nil
- }
|