service.go 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. package main
  2. import (
  3. "github.com/gogf/gf/os/grpool"
  4. "sparrow/pkg/models"
  5. "sparrow/pkg/protocol"
  6. "sparrow/pkg/queue"
  7. "sparrow/pkg/rpcs"
  8. "sparrow/pkg/server"
  9. )
  10. // ClusterService 集群服务
  11. type ClusterService struct {
  12. producer queue.QueueProducer
  13. }
  14. func (c *ClusterService) PushMessageToRuleEngine(info *queue.TopicPartitionInfo, msgId string, msg *protocol.Message, callback queue.Callback) {
  15. m, err := queue.NewGobQueueMessage(msg)
  16. if err != nil {
  17. return
  18. }
  19. if err := c.producer.Send(info, m, callback); err != nil {
  20. server.Log.Error(err)
  21. }
  22. }
  23. // EventService 事件持久化服务
  24. type EventService struct {
  25. pool *grpool.Pool
  26. }
  27. // NewEventService create
  28. func NewEventService() *EventService {
  29. return &EventService{pool: grpool.New(5)}
  30. }
  31. func (e *EventService) Save(data *models.Event) error {
  32. return server.RPCCallByName(nil, "registry", "Registry.CreateEvent", data, &rpcs.ReplyEmptyResult{})
  33. }
  34. func (e *EventService) SaveAsync(data *models.Event) error {
  35. return e.pool.Add(func() {
  36. if err := server.RPCCallByName(nil, "registry", "Registry.CreateEvent", data, &rpcs.ReplyEmptyResult{}); err != nil {
  37. return
  38. }
  39. })
  40. }
  41. //type TenantService struct {
  42. //
  43. //}
  44. //
  45. //func (t *TenantService) FindTenants() ([]*ruleEngine.Tenant, error) {
  46. // var arg int
  47. // var reply []*models.Vendor
  48. // var result []*ruleEngine.Tenant
  49. // err :=server.RPCCallByName(context.Background(), "registry", "registry.GetVendors", &arg, &reply)
  50. // if err != nil {
  51. // return nil, err
  52. // }
  53. // for _, vendor := range reply {
  54. // result = append(result, &ruleEngine.Tenant{
  55. // Id: vendor.RecordId,
  56. // })
  57. // }
  58. // return result, nil
  59. //}
  60. //
  61. //func (t *TenantService) GetTenant(tId string) (*ruleEngine.Tenant, error) {
  62. // var reply *models.Vendor
  63. // err := server.
  64. //}