123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122 |
- package ruleEngine
- import (
- "sparrow/pkg/entities"
- "sparrow/pkg/protocol"
- "sparrow/pkg/server"
- "time"
- )
- type Context interface {
- // 向所有基于success类型的关系节点发消息
- TellSuccess(msg *protocol.Message)
- // 基于某个关系发消息
- TellNext(msg *protocol.Message, relationType protocol.RelationType)
- // 向当前节点发消息,duration 为延迟时间
- TellSelf(msg *protocol.Message, duration time.Duration)
- // 发送错误消息消息
- TellError(msg *protocol.Message, err error)
- // message ack
- Ack(msg *protocol.Message)
- // transform a message
- TransformMessage(msg *protocol.Message, msgType, originator string, metaData map[string]interface{}, data string) *protocol.Message
- }
- // DefaultContext 默认的上下文
- type DefaultContext struct {
- nodeCtx *RuleNodeCtx
- mainCtx *SystemContext
- }
- func NewDefaultContext(nodeCtx *RuleNodeCtx, mainCtx *SystemContext) *DefaultContext {
- return &DefaultContext{nodeCtx: nodeCtx, mainCtx: mainCtx}
- }
- func (d *DefaultContext) TellSuccess(msg *protocol.Message) {
- d.tellNext(msg, []protocol.RelationType{protocol.Success}, nil)
- }
- func (d *DefaultContext) TellNext(msg *protocol.Message, relationType protocol.RelationType) {
- d.tellNext(msg, []protocol.RelationType{relationType}, nil)
- }
- func (d *DefaultContext) tellNext(msg *protocol.Message, relationTypes []protocol.RelationType, err error) {
- if d.nodeCtx.Self.IsDebug {
- for _, item := range relationTypes {
- if Err := d.mainCtx.PersistDebugOutput(d.nodeCtx.TenantId,
- &entities.RuleNodeId{Id: d.nodeCtx.Self.RuleNodeId},
- msg,
- item.String(),
- err); Err != nil {
- continue
- }
- }
- }
- msg.GetCallBack().OnProcessingEnd(d.nodeCtx.Self.RuleNodeId)
- d.nodeCtx.ChainActor.Tell(
- &RuleNodeToRuleChanTellNextMsg{
- RuleNodeId: d.nodeCtx.Self.RuleNodeId,
- RelationTypes: relationTypes,
- Message: msg,
- FailureMessage: err,
- })
- }
- func (d *DefaultContext) TellSelf(msg *protocol.Message, duration time.Duration) {
- if duration > 0 {
- time.AfterFunc(duration, func() {
- d.nodeCtx.SelfActor.Tell(&RuleToSelfMsg{Message: msg})
- })
- }
- }
- func (d *DefaultContext) TellError(msg *protocol.Message, err error) {
- if d.nodeCtx.Self.IsDebug {
- if err := d.mainCtx.PersistDebugOutput(d.nodeCtx.TenantId,
- &entities.RuleNodeId{Id: d.nodeCtx.Self.RuleNodeId},
- msg,
- protocol.Failure.String(),
- err,
- ); err != nil {
- server.Log.WithField("method", "DefaultContext.TellError").
- Error(err)
- }
- }
- d.nodeCtx.ChainActor.Tell(&RuleNodeToRuleChanTellNextMsg{
- RuleNodeId: d.nodeCtx.Self.RuleNodeId,
- RelationTypes: []protocol.RelationType{protocol.Failure},
- Message: msg,
- FailureMessage: err,
- })
- }
- func (d *DefaultContext) Ack(msg *protocol.Message) {
- if d.nodeCtx.Self.IsDebug {
- if err := d.mainCtx.PersistDebugOutput(d.nodeCtx.TenantId,
- &entities.RuleNodeId{Id: d.nodeCtx.Self.RuleNodeId},
- msg,
- "ACK",
- nil,
- ); err != nil {
- server.Log.WithField("method", "DefaultContext.Ack").
- Error(err)
- }
- }
- msg.GetCallBack().OnProcessingEnd(d.nodeCtx.Self.RuleNodeId)
- msg.GetCallBack().OnSuccess()
- }
- func (d *DefaultContext) TransformMessage(msg *protocol.Message, msgType, originator string,
- metaData map[string]interface{}, data string) *protocol.Message {
- return &protocol.Message{
- QueueName: msg.QueueName,
- Id: msg.Id,
- Ts: msg.Ts,
- Type: msgType,
- Data: data,
- RuleChanId: msg.RuleChanId,
- RuleNodeId: msg.RuleNodeId,
- Callback: msg.Callback,
- MetaData: metaData,
- Originator: originator,
- }
- }
|