package ruleEngine import ( "sparrow/pkg/protocol" "time" ) type Context interface { // 向所有基于success类型的关系节点发消息 TellSuccess(msg *protocol.Message) // 基于某个关系发消息 TellNext(msg *protocol.Message, relationType protocol.RelationType) // 向当前节点发消息,duration 为延迟时间 TellSelf(msg *protocol.Message, duration time.Duration) // 发送错误消息消息 TellError(msg *protocol.Message, err error) // message ack Ack(msg *protocol.Message) // transform a message TransformMessage(msg *protocol.Message, msgType, originator string, metaData map[string]interface{}, data string) *protocol.Message } // DefaultContext 默认的上下文 type DefaultContext struct { nodeCtx *RuleNodeCtx mainCtx *SystemContext } func NewDefaultContext(nodeCtx *RuleNodeCtx, mainCtx *SystemContext) *DefaultContext { return &DefaultContext{nodeCtx: nodeCtx, mainCtx: mainCtx} } func (d *DefaultContext) TellSuccess(msg *protocol.Message) { d.tellNext(msg, []protocol.RelationType{protocol.Success}, nil) } func (d *DefaultContext) TellNext(msg *protocol.Message, relationType protocol.RelationType) { d.tellNext(msg, []protocol.RelationType{relationType}, nil) } func (d *DefaultContext) tellNext(msg *protocol.Message, relationTypes []protocol.RelationType, err error) { if d.nodeCtx.Self.IsDebug { // TODO: 输出调试日志 } msg.GetCallBack().OnProcessingEnd(d.nodeCtx.Self.RuleNodeId) d.nodeCtx.ChainActor.Tell( &RuleNodeToRuleChanTellNextMsg{ RuleNodeId: d.nodeCtx.Self.RuleNodeId, RelationTypes: relationTypes, Message: msg, FailureMessage: err, }) } func (d *DefaultContext) TellSelf(msg *protocol.Message, duration time.Duration) { if duration > 0 { time.AfterFunc(duration, func() { d.nodeCtx.SelfActor.Tell(&RuleToSelfMsg{Message: msg}) }) } } func (d *DefaultContext) TellError(msg *protocol.Message, err error) { if d.nodeCtx.Self.IsDebug { // TODO: 处理调试 } d.nodeCtx.ChainActor.Tell(&RuleNodeToRuleChanTellNextMsg{ RuleNodeId: d.nodeCtx.Self.RuleNodeId, RelationTypes: []protocol.RelationType{protocol.Failure}, Message: msg, FailureMessage: err, }) } func (d *DefaultContext) Ack(msg *protocol.Message) { if d.nodeCtx.Self.IsDebug { // TODO: 处理调试 } msg.GetCallBack().OnProcessingEnd(d.nodeCtx.Self.RuleNodeId) msg.GetCallBack().OnSuccess() } func (d *DefaultContext) TransformMessage(msg *protocol.Message, msgType, originator string, metaData map[string]interface{}, data string) *protocol.Message { return &protocol.Message{ QueueName: msg.QueueName, Id: msg.Id, Ts: msg.Ts, Type: msgType, Data: data, RuleChanId: msg.RuleChanId, RuleNodeId: msg.RuleNodeId, Callback: msg.Callback, MetaData: metaData, Originator: originator, } }