package ruleEngine import ( "sparrow/pkg/entities" "sparrow/pkg/protocol" "sparrow/pkg/server" "time" ) type Context interface { // 向所有基于success类型的关系节点发消息 TellSuccess(msg *protocol.Message) // 基于某个关系发消息 TellNext(msg *protocol.Message, relationType protocol.RelationType) // 向当前节点发消息,duration 为延迟时间 TellSelf(msg *protocol.Message, duration time.Duration) // 发送错误消息消息 TellError(msg *protocol.Message, err error) // message ack Ack(msg *protocol.Message) // transform a message TransformMessage(msg *protocol.Message, msgType, originator string, metaData map[string]interface{}, data string) *protocol.Message GetRuleNodeCtx() *RuleNodeCtx } // DefaultContext 默认的上下文 type DefaultContext struct { nodeCtx *RuleNodeCtx mainCtx *SystemContext } func NewDefaultContext(nodeCtx *RuleNodeCtx, mainCtx *SystemContext) *DefaultContext { return &DefaultContext{nodeCtx: nodeCtx, mainCtx: mainCtx} } func (d *DefaultContext) GetRuleNodeCtx() *RuleNodeCtx { return d.nodeCtx } func (d *DefaultContext) TellSuccess(msg *protocol.Message) { d.tellNext(msg, []protocol.RelationType{protocol.Success}, nil) } func (d *DefaultContext) TellNext(msg *protocol.Message, relationType protocol.RelationType) { d.tellNext(msg, []protocol.RelationType{relationType}, nil) } func (d *DefaultContext) tellNext(msg *protocol.Message, relationTypes []protocol.RelationType, err error) { if d.nodeCtx.Self.IsDebug { for _, item := range relationTypes { if Err := d.mainCtx.PersistDebugOutput(d.nodeCtx.TenantId, &entities.RuleNodeId{Id: d.nodeCtx.Self.RuleNodeId}, msg, item.String(), err); Err != nil { continue } } } msg.GetCallBack().OnProcessingEnd(d.nodeCtx.Self.RuleNodeId) d.nodeCtx.ChainActor.Tell( &RuleNodeToRuleChanTellNextMsg{ RuleNodeId: d.nodeCtx.Self.RuleNodeId, RelationTypes: relationTypes, Message: msg, FailureMessage: err, }) } func (d *DefaultContext) TellSelf(msg *protocol.Message, duration time.Duration) { if duration > 0 { time.AfterFunc(duration, func() { d.nodeCtx.SelfActor.Tell(&RuleToSelfMsg{Message: msg}) }) } } func (d *DefaultContext) TellError(msg *protocol.Message, err error) { if d.nodeCtx.Self.IsDebug { if err := d.mainCtx.PersistDebugOutput(d.nodeCtx.TenantId, &entities.RuleNodeId{Id: d.nodeCtx.Self.RuleNodeId}, msg, protocol.Failure.String(), err, ); err != nil { server.Log.WithField("method", "DefaultContext.TellError"). Error(err) } } d.nodeCtx.ChainActor.Tell(&RuleNodeToRuleChanTellNextMsg{ RuleNodeId: d.nodeCtx.Self.RuleNodeId, RelationTypes: []protocol.RelationType{protocol.Failure}, Message: msg, FailureMessage: err, }) } func (d *DefaultContext) Ack(msg *protocol.Message) { if d.nodeCtx.Self.IsDebug { if err := d.mainCtx.PersistDebugOutput(d.nodeCtx.TenantId, &entities.RuleNodeId{Id: d.nodeCtx.Self.RuleNodeId}, msg, "ACK", nil, ); err != nil { server.Log.WithField("method", "DefaultContext.Ack"). Error(err) } } msg.GetCallBack().OnProcessingEnd(d.nodeCtx.Self.RuleNodeId) msg.GetCallBack().OnSuccess() } func (d *DefaultContext) TransformMessage(msg *protocol.Message, msgType, originator string, metaData map[string]interface{}, data string) *protocol.Message { return &protocol.Message{ QueueName: msg.QueueName, Id: msg.Id, Ts: msg.Ts, Type: msgType, Data: data, RuleChanId: msg.RuleChanId, RuleNodeId: msg.RuleNodeId, Callback: msg.Callback, MetaData: metaData, Originator: originator, } }