12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879 |
- package ruleEngine
- import (
- "sparrow/pkg/actor"
- "sparrow/pkg/protocol"
- "time"
- )
- type Context interface {
- // 向所有基于success类型的关系节点发消息
- TellSuccess(msg *protocol.Message)
- // 基于某个关系发消息
- TellNext(msg *protocol.Message, relationType protocol.RelationType)
- // 向当前节点发消息,duration 为延迟时间
- TellSelf(msg *protocol.Message, duration time.Duration)
- // 发送错误消息消息
- TellError(msg *protocol.Message, err error)
- // message ack
- Ack(msg *protocol.Message)
- }
- // DefaultContext 默认的上下文
- type DefaultContext struct {
- nodeCtx *RuleNodeCtx
- mainCtx *actor.SystemContext
- }
- func NewDefaultContext(nodeCtx *RuleNodeCtx, mainCtx *actor.SystemContext) *DefaultContext {
- return &DefaultContext{nodeCtx: nodeCtx, mainCtx: mainCtx}
- }
- func (d *DefaultContext) TellSuccess(msg *protocol.Message) {
- d.tellNext(msg, []protocol.RelationType{protocol.Success}, nil)
- }
- func (d *DefaultContext) TellNext(msg *protocol.Message, relationType protocol.RelationType) {
- d.tellNext(msg, []protocol.RelationType{relationType}, nil)
- }
- func (d *DefaultContext) tellNext(msg *protocol.Message, relationTypes []protocol.RelationType, err error) {
- if d.nodeCtx.Self.IsDebug {
- // TODO: 输出调试日志
- }
- msg.GetCallBack().OnProcessingEnd(d.nodeCtx.Self.RuleNodeId)
- d.nodeCtx.ChainActor.Tell(
- &actor.RuleNodeToRuleChanTellNextMsg{
- RuleNodeId: d.nodeCtx.Self.RuleNodeId,
- RelationTypes: relationTypes,
- Message: msg,
- FailureMessage: err,
- })
- }
- func (d *DefaultContext) TellSelf(msg *protocol.Message, duration time.Duration) {
- if duration > 0 {
- time.AfterFunc(duration, func() {
- d.nodeCtx.SelfActor.Tell(&actor.RuleToSelfMsg{Message: msg})
- })
- }
- }
- func (d *DefaultContext) TellError(msg *protocol.Message, err error) {
- if d.nodeCtx.Self.IsDebug {
- // TODO: 处理调试
- }
- d.nodeCtx.ChainActor.Tell(&actor.RuleNodeToRuleChanTellNextMsg{
- RuleNodeId: d.nodeCtx.Self.RuleNodeId,
- RelationTypes: []protocol.RelationType{protocol.Failure},
- Message: msg,
- FailureMessage: err,
- })
- }
- func (d *DefaultContext) Ack(msg *protocol.Message) {
- if d.nodeCtx.Self.IsDebug {
- // TODO: 处理调试
- }
- msg.GetCallBack().OnProcessingEnd(d.nodeCtx.Self.RuleNodeId)
- msg.GetCallBack().OnSuccess()
- }
|