service.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. package main
  2. import (
  3. "context"
  4. "github.com/gogf/gf/os/grpool"
  5. "sparrow/pkg/models"
  6. "sparrow/pkg/protocol"
  7. "sparrow/pkg/queue"
  8. "sparrow/pkg/rpcs"
  9. "sparrow/pkg/ruleEngine"
  10. "sparrow/pkg/server"
  11. )
// ClusterService is the cluster service. It wraps the queue producer that
// PushMessageToRuleEngine sends messages through.
type ClusterService struct {
	producer queue.QueueProducer // producer used to send queue messages
}
  16. func (c *ClusterService) PushMessageToRuleEngine(info *queue.TopicPartitionInfo, msgId string, msg *protocol.Message, callback queue.Callback) {
  17. m, err := queue.NewGobQueueMessage(msg)
  18. if err != nil {
  19. return
  20. }
  21. if err := c.producer.Send(info, m, callback); err != nil {
  22. server.Log.Error(err)
  23. }
  24. }
// EventService is the event persistence service.
type EventService struct {
	pool *grpool.Pool // goroutine pool that runs the jobs added by SaveAsync
}
  29. // NewEventService create
  30. func NewEventService() *EventService {
  31. return &EventService{pool: grpool.New(5)}
  32. }
  33. func (e *EventService) Save(data *models.Event) error {
  34. return server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.CreateEvent", data, &rpcs.ReplyEmptyResult{})
  35. }
  36. func (e *EventService) SaveAsync(data *models.Event) error {
  37. return e.pool.Add(func() {
  38. if err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.CreateEvent", data, &rpcs.ReplyEmptyResult{}); err != nil {
  39. return
  40. }
  41. })
  42. }
// TenantService retrieves tenants, which it builds from the vendor records
// returned by the registry server.
type TenantService struct {
}
  46. func (t *TenantService) FindTenants() ([]*ruleEngine.Tenant, error) {
  47. var arg int
  48. var reply []*models.Vendor
  49. var result []*ruleEngine.Tenant
  50. err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.GetVendors", &arg, &reply)
  51. if err != nil {
  52. return nil, err
  53. }
  54. for _, vendor := range reply {
  55. result = append(result, &ruleEngine.Tenant{
  56. Id: vendor.RecordId,
  57. })
  58. }
  59. return result, nil
  60. }
  61. func (t *TenantService) GetTenant(tId string) (*ruleEngine.Tenant, error) {
  62. var reply *models.Vendor
  63. err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.FindVendor", &tId, &reply)
  64. if err != nil {
  65. return nil, err
  66. }
  67. return &ruleEngine.Tenant{
  68. Id: reply.RecordId,
  69. Name: reply.VendorName,
  70. }, nil
  71. }
// RuleChainService looks up rule chains, rule nodes and rule node relations
// through RPC calls to the registry server.
type RuleChainService struct{}
  73. func (a *RuleChainService) FindRuleChainById(tenantId, ruleChainId string) (*ruleEngine.RuleChain, error) {
  74. var reply *models.RuleChain
  75. err := server.RPCCallByName(context.TODO(), rpcs.RegistryServerName, "Registry.FindRuleChainById", &ruleChainId, &reply)
  76. if err != nil {
  77. return nil, err
  78. }
  79. return &ruleEngine.RuleChain{
  80. TenantId: reply.VendorID,
  81. Name: reply.Name,
  82. FirstNodeId: reply.FirstRuleNodeID,
  83. IsRoot: reply.Root,
  84. IsDebug: reply.DebugModel,
  85. Config: reply.Configuration,
  86. ChainId: reply.RecordId,
  87. }, nil
  88. }
  89. func (a *RuleChainService) FindRuleNodeById(tenantId, ruleNodeId string) (*ruleEngine.RuleNode, error) {
  90. var reply *models.RuleNode
  91. err := server.RPCCallByName(context.TODO(), rpcs.RegistryServerName, "Registry.FindRuleNodeById", &ruleNodeId, &reply)
  92. if err != nil {
  93. return nil, err
  94. }
  95. return &ruleEngine.RuleNode{
  96. RuleChainId: reply.RuleChainID,
  97. Type: reply.Type,
  98. Name: reply.Name,
  99. IsDebug: reply.DebugModel,
  100. Config: reply.Configuration,
  101. RuleNodeId: reply.RecordId,
  102. }, nil
  103. }
  104. func (a *RuleChainService) GetRuleChainNodes(tenantId, ruleChainId string) ([]*ruleEngine.RuleNode, error) {
  105. var reply []*models.RuleNode
  106. var result []*ruleEngine.RuleNode
  107. err := server.RPCCallByName(context.TODO(), rpcs.RegistryServerName, "Registry.FindRuleChainNodes", &ruleChainId, &reply)
  108. if err != nil {
  109. return nil, err
  110. }
  111. for _, node := range reply {
  112. result = append(result, &ruleEngine.RuleNode{
  113. RuleChainId: node.RuleChainID,
  114. Type: node.Type,
  115. Name: node.Name,
  116. IsDebug: node.DebugModel,
  117. Config: node.Configuration,
  118. RuleNodeId: node.RecordId,
  119. })
  120. }
  121. return result, nil
  122. }
  123. func (a *RuleChainService) FindRuleChains(tenantId string) ([]*ruleEngine.RuleChain, error) {
  124. var reply []*models.RuleChain
  125. var result []*ruleEngine.RuleChain
  126. err := server.RPCCallByName(context.TODO(), rpcs.RegistryServerName, "Registry.FindRuleChains", &tenantId, &reply)
  127. if err != nil {
  128. return nil, err
  129. }
  130. for _, chain := range reply {
  131. result = append(result, &ruleEngine.RuleChain{
  132. TenantId: chain.VendorID,
  133. Name: chain.Name,
  134. FirstNodeId: chain.FirstRuleNodeID,
  135. IsRoot: chain.Root,
  136. IsDebug: chain.DebugModel,
  137. Config: chain.Configuration,
  138. ChainId: chain.RecordId,
  139. })
  140. }
  141. return result, nil
  142. }
  143. func (a *RuleChainService) GetRuleNodeRelations(tenantId, nodeId string) ([]*ruleEngine.Relation, error) {
  144. var reply []*models.Relation
  145. var result []*ruleEngine.Relation
  146. err := server.RPCCallByName(context.TODO(), rpcs.RegistryServerName, "Registry.GetRuleNodeRelations", &nodeId, &reply)
  147. if err != nil {
  148. return nil, err
  149. }
  150. for _, rel := range reply {
  151. result = append(result, &ruleEngine.Relation{
  152. From: rel.FromID,
  153. To: rel.ToID,
  154. Type: rel.RelationType,
  155. RelationTypeGroup: 0,
  156. })
  157. }
  158. return result, nil
  159. }