service.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. package main
  2. import (
  3. "context"
  4. "github.com/gogf/gf/os/grpool"
  5. "sparrow/pkg/models"
  6. "sparrow/pkg/protocol"
  7. "sparrow/pkg/queue"
  8. "sparrow/pkg/rpcs"
  9. "sparrow/pkg/ruleEngine"
  10. "sparrow/pkg/server"
  11. )
  12. // ClusterService 集群服务
  13. type ClusterService struct {
  14. producer queue.QueueProducer
  15. }
  16. func (c *ClusterService) PushMessageToRuleEngine(info *queue.TopicPartitionInfo, msgId string, msg *protocol.Message, callback queue.Callback) {
  17. m, err := queue.NewGobQueueMessage(msg)
  18. if err != nil {
  19. return
  20. }
  21. if err := c.producer.Send(info, m, callback); err != nil {
  22. server.Log.Error(err)
  23. }
  24. }
  25. // EventService 事件持久化服务
  26. type EventService struct {
  27. pool *grpool.Pool
  28. }
  29. // NewEventService create
  30. func NewEventService() *EventService {
  31. return &EventService{pool: grpool.New(5)}
  32. }
  33. func (e *EventService) Save(data *models.Event) error {
  34. return server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.CreateEvent", data, &rpcs.ReplyEmptyResult{})
  35. }
  36. func (e *EventService) SaveAsync(data *models.Event) error {
  37. return e.pool.Add(func() {
  38. if err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.CreateEvent", data, &rpcs.ReplyEmptyResult{}); err != nil {
  39. return
  40. }
  41. })
  42. }
  43. // TODO:完成这里
  44. type TenantService struct {
  45. }
  46. func (t *TenantService) FindTenants() ([]*ruleEngine.Tenant, error) {
  47. var arg int
  48. var reply []*models.Vendor
  49. var result []*ruleEngine.Tenant
  50. err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.GetVendors", &arg, &reply)
  51. if err != nil {
  52. return nil, err
  53. }
  54. for _, vendor := range reply {
  55. result = append(result, &ruleEngine.Tenant{
  56. Id: vendor.RecordId,
  57. })
  58. }
  59. return result, nil
  60. }
  61. func (t *TenantService) GetTenant(tId string) (*ruleEngine.Tenant, error) {
  62. var reply *models.Vendor
  63. err := server.RPCCallByName(context.Background(), rpcs.RegistryServerName, "Registry.FindVendor", &tId, &reply)
  64. if err != nil {
  65. return nil, err
  66. }
  67. return &ruleEngine.Tenant{
  68. Id: reply.RecordId,
  69. Name: reply.VendorName,
  70. }, nil
  71. }