package ruleEngine import ( "encoding/json" "errors" "fmt" "github.com/gogf/gf/os/grpool" "github.com/gogf/gf/util/guid" "golang.org/x/time/rate" "sparrow/pkg/entities" "sparrow/pkg/models" "sparrow/pkg/protocol" "sparrow/pkg/server" "sync" "time" ) // SystemContext actor system context, with some func type SystemContext struct { ActorSystem System AppActor Ref ClusterService ClusterService RuleChainService RuleChainService TenantService TenantService EventService EventService // 调试信息的限流器 debugPerTenantLimits map[string]*rate.Limiter } type SystemContextServiceConfig struct { ClusterService ClusterService RuleChainService RuleChainService TenantService TenantService EventService EventService } func NewSystemContext(sys System, config SystemContextServiceConfig) *SystemContext { if config.TenantService == nil || config.RuleChainService == nil || config.ClusterService == nil || config.EventService == nil { panic("RuleEngine init error: services is not set") } return &SystemContext{ ActorSystem: sys, ClusterService: config.ClusterService, RuleChainService: config.RuleChainService, TenantService: config.TenantService, debugPerTenantLimits: make(map[string]*rate.Limiter), EventService: config.EventService, } } // PersistDebugInput 保存输入调试信息 func (s *SystemContext) PersistDebugInput(tenantId string, entityId entities.EntityId, msg *protocol.Message, relationType string, err error) error { return s.persistDebugAsync(tenantId, entityId, "IN", msg, relationType, err) } // PersistDebugOutput 保存输出调试信息 func (s *SystemContext) PersistDebugOutput(tenantId string, entityId entities.EntityId, msg *protocol.Message, relationType string, err error) error { return s.persistDebugAsync(tenantId, entityId, "OUT", msg, relationType, err) } func (s *SystemContext) persistDebugAsync(tenantId string, id entities.EntityId, eType string, msg *protocol.Message, rType string, errInfo error) error { var limiter *rate.Limiter if v, ok := s.debugPerTenantLimits[tenantId]; !ok { limiter = rate.NewLimiter(10, 200) s.debugPerTenantLimits[tenantId] = limiter } else { limiter = v } if limiter.AllowN(time.Now(), 200) { var errStr string if v, ok := msg.MetaData["error"]; ok { errStr = v.(string) } if errInfo != nil { errStr = errInfo.Error() } buf, err := json.Marshal(msg.MetaData) if err != nil { server.Log.WithField("method", "persistDebugAsync").Error(err) } if s.EventService != nil { if err := s.EventService.SaveAsync(&models.Event{ RecordId: guid.S(), ServerId: server.InternalIP, EventType: eType, EntityType: id.GetEntityType().String(), EntityId: id.GetId(), MessageId: msg.Id, RelationType: rType, Data: msg.Data, MetaData: string(buf), Error: errStr, }); err != nil { return err } } } return nil } func (s *SystemContext) Tell(msg protocol.ActorMsg) { s.AppActor.Tell(msg) } func (s *SystemContext) TellWithHighPriority(msg protocol.ActorMsg) { s.AppActor.TellWithHighPriority(msg) } // System actor system interface type System interface { // CreateDispatcher 创建分发器 CreateDispatcher(name string, dispatcher IDispatcher) error // DestroyDispatcher 销毁分发器 DestroyDispatcher(name string) error // GetActor 获取一个actor ref GetActor(actorId string) Ref // CreateRootActor create root actor CreateRootActor(dispatcherName string, creator Creator) (Ref, error) // CreateChildActor create child actor by parent actor CreateChildActor(dispatcherName string, creator Creator, parentId string) (Ref, error) // Tell tell actor a message Tell(actorId string, msg protocol.ActorMsg) error // TellWithHighPriority tell actor message with high priority TellWithHighPriority(actorId string, msg protocol.ActorMsg) error // StopActorById stop actor by actor id StopActorById(actorId string) error // StopActorByRef stop actor by actor ref StopActorByRef(ref Ref) error // BroadcastToChildren broadcast message to children BroadcastToChildren(parentActorId string, msg protocol.ActorMsg) error } // DefaultActorSystem a default actor system implements System interface type DefaultActorSystem struct { dispatchers map[string]IDispatcher actors map[string]*MailBox parentActors map[string][]string scheduler *grpool.Pool config *DefaultActorSystemConfig mu sync.Mutex } // DefaultActorSystemConfig system config type DefaultActorSystemConfig struct { SchedulerPoolSize int // 系统调度执行池大小 AppDispatcherPoolSize int // 应用调度执行池大小 TenantDispatcherPoolSize int // 租户执行池大小 RuleEngineDispatcherPoolSize int // 规则引擎执行池大小 } func NewDefaultActorSystem(config *DefaultActorSystemConfig) *DefaultActorSystem { return &DefaultActorSystem{ dispatchers: make(map[string]IDispatcher), parentActors: make(map[string][]string), // scheduler: grpool.New(config.SchedulerPoolSize), config: config, actors: make(map[string]*MailBox), } } func (d *DefaultActorSystem) CreateDispatcher(name string, dispatcher IDispatcher) error { if _, ok := d.dispatchers[name]; ok { return errors.New(fmt.Sprintf("dispatcher name :%s is already registered!", name)) } d.dispatchers[name] = dispatcher return nil } func (d *DefaultActorSystem) DestroyDispatcher(name string) error { if _, ok := d.dispatchers[name]; !ok { return errors.New(fmt.Sprintf("dispatcher %s is not registered!", name)) } if err := d.dispatchers[name].Destroy(); err != nil { return err } delete(d.dispatchers, name) return nil } func (d *DefaultActorSystem) GetActor(actorId string) Ref { if ref, ok := d.actors[actorId]; !ok { return nil } else { return ref } } func (d *DefaultActorSystem) CreateRootActor(dispatcherName string, creator Creator) (Ref, error) { return d.creator(dispatcherName, creator, "") } func (d *DefaultActorSystem) Tell(actorId string, msg protocol.ActorMsg) error { return d.tell(actorId, msg, false) } func (d *DefaultActorSystem) TellWithHighPriority(actorId string, msg protocol.ActorMsg) error { return d.tell(actorId, msg, true) } func (d *DefaultActorSystem) tell(actorId string, msg protocol.ActorMsg, isHighPriority bool) error { if mailBox, ok := d.actors[actorId]; ok { if isHighPriority { mailBox.TellWithHighPriority(msg) } else { mailBox.Tell(msg) } } else { return errors.New(fmt.Sprintf("actor with id %s is not registered!", actorId)) } return nil } func (d *DefaultActorSystem) stop(actorId string) error { children := d.parentActors[actorId] if len(children) > 0 { for _, child := range children { _ = d.stop(child) } } if m, found := d.actors[actorId]; found { return m.destroy() } return nil } func (d *DefaultActorSystem) StopActorById(actorId string) error { return d.stop(actorId) } func (d *DefaultActorSystem) StopActorByRef(ref Ref) error { return d.stop(ref.GetActorId()) } func (d *DefaultActorSystem) BroadcastToChildren(parentActorId string, msg protocol.ActorMsg) error { children := d.parentActors[parentActorId] for _, item := range children { if err := d.Tell(item, msg); err != nil { return err } } return nil } func (d *DefaultActorSystem) CreateChildActor(dispatcherName string, creator Creator, parentId string) (Ref, error) { return d.creator(dispatcherName, creator, parentId) } func (d *DefaultActorSystem) creator(name string, creator Creator, parentId string) (Ref, error) { d.mu.Lock() defer d.mu.Unlock() dispatcher := d.dispatchers[name] if dispatcher == nil { return nil, errors.New(fmt.Sprintf("dispatcher %s not found!", name)) } actorId := creator.CreateActorId() var actorMailBox *MailBox actorMailBox = d.actors[actorId] if actorMailBox != nil { server.Log.Debugf("actor with id :%s is already registered!", actorId) } else { server.Log.Debugf("creating actor with id :%s", actorId) actor := creator.CreateActor() var parentActor Ref if parentId != "" { parentActor = d.GetActor(parentId) if parentActor == nil { return nil, errors.New(fmt.Sprintf("Parent actor with id :%s is not registered!", parentId)) } } mailBox := NewMailBox(d, actorId, parentActor, actor, dispatcher) d.actors[actorId] = mailBox if err := mailBox.Init(); err != nil { server.Log.Error(err) return nil, err } actorMailBox = mailBox if parentActor != nil { d.parentActors[parentId] = append(d.parentActors[parentId], actorId) } } return actorMailBox, nil }