context.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. package ruleEngine
  2. import (
  3. "sparrow/pkg/entities"
  4. "sparrow/pkg/protocol"
  5. "sparrow/pkg/server"
  6. "time"
  7. )
  8. type Context interface {
  9. // 向所有基于success类型的关系节点发消息
  10. TellSuccess(msg *protocol.Message)
  11. // 基于某个关系发消息
  12. TellNext(msg *protocol.Message, relationType protocol.RelationType)
  13. // 向当前节点发消息,duration 为延迟时间
  14. TellSelf(msg *protocol.Message, duration time.Duration)
  15. // 发送错误消息消息
  16. TellError(msg *protocol.Message, err error)
  17. // message ack
  18. Ack(msg *protocol.Message)
  19. // transform a message
  20. TransformMessage(msg *protocol.Message, msgType, originator string, metaData map[string]interface{}, data string) *protocol.Message
  21. }
  22. // DefaultContext 默认的上下文
  23. type DefaultContext struct {
  24. nodeCtx *RuleNodeCtx
  25. mainCtx *SystemContext
  26. }
  27. func NewDefaultContext(nodeCtx *RuleNodeCtx, mainCtx *SystemContext) *DefaultContext {
  28. return &DefaultContext{nodeCtx: nodeCtx, mainCtx: mainCtx}
  29. }
  30. func (d *DefaultContext) TellSuccess(msg *protocol.Message) {
  31. d.tellNext(msg, []protocol.RelationType{protocol.Success}, nil)
  32. }
  33. func (d *DefaultContext) TellNext(msg *protocol.Message, relationType protocol.RelationType) {
  34. d.tellNext(msg, []protocol.RelationType{relationType}, nil)
  35. }
  36. func (d *DefaultContext) tellNext(msg *protocol.Message, relationTypes []protocol.RelationType, err error) {
  37. if d.nodeCtx.Self.IsDebug {
  38. for _, item := range relationTypes {
  39. if Err := d.mainCtx.PersistDebugOutput(d.nodeCtx.TenantId,
  40. &entities.RuleNodeId{Id: d.nodeCtx.Self.RuleNodeId},
  41. msg,
  42. item.String(),
  43. err); Err != nil {
  44. continue
  45. }
  46. }
  47. }
  48. msg.GetCallBack().OnProcessingEnd(d.nodeCtx.Self.RuleNodeId)
  49. d.nodeCtx.ChainActor.Tell(
  50. &RuleNodeToRuleChanTellNextMsg{
  51. RuleNodeId: d.nodeCtx.Self.RuleNodeId,
  52. RelationTypes: relationTypes,
  53. Message: msg,
  54. FailureMessage: err,
  55. })
  56. }
  57. func (d *DefaultContext) TellSelf(msg *protocol.Message, duration time.Duration) {
  58. if duration > 0 {
  59. time.AfterFunc(duration, func() {
  60. d.nodeCtx.SelfActor.Tell(&RuleToSelfMsg{Message: msg})
  61. })
  62. }
  63. }
  64. func (d *DefaultContext) TellError(msg *protocol.Message, err error) {
  65. if d.nodeCtx.Self.IsDebug {
  66. if err := d.mainCtx.PersistDebugOutput(d.nodeCtx.TenantId,
  67. &entities.RuleNodeId{Id: d.nodeCtx.Self.RuleNodeId},
  68. msg,
  69. protocol.Failure.String(),
  70. err,
  71. ); err != nil {
  72. server.Log.WithField("method", "DefaultContext.TellError").
  73. Error(err)
  74. }
  75. }
  76. d.nodeCtx.ChainActor.Tell(&RuleNodeToRuleChanTellNextMsg{
  77. RuleNodeId: d.nodeCtx.Self.RuleNodeId,
  78. RelationTypes: []protocol.RelationType{protocol.Failure},
  79. Message: msg,
  80. FailureMessage: err,
  81. })
  82. }
  83. func (d *DefaultContext) Ack(msg *protocol.Message) {
  84. if d.nodeCtx.Self.IsDebug {
  85. if err := d.mainCtx.PersistDebugOutput(d.nodeCtx.TenantId,
  86. &entities.RuleNodeId{Id: d.nodeCtx.Self.RuleNodeId},
  87. msg,
  88. "ACK",
  89. nil,
  90. ); err != nil {
  91. server.Log.WithField("method", "DefaultContext.Ack").
  92. Error(err)
  93. }
  94. }
  95. msg.GetCallBack().OnProcessingEnd(d.nodeCtx.Self.RuleNodeId)
  96. msg.GetCallBack().OnSuccess()
  97. }
  98. func (d *DefaultContext) TransformMessage(msg *protocol.Message, msgType, originator string,
  99. metaData map[string]interface{}, data string) *protocol.Message {
  100. return &protocol.Message{
  101. QueueName: msg.QueueName,
  102. Id: msg.Id,
  103. Ts: msg.Ts,
  104. Type: msgType,
  105. Data: data,
  106. RuleChanId: msg.RuleChanId,
  107. RuleNodeId: msg.RuleNodeId,
  108. Callback: msg.Callback,
  109. MetaData: metaData,
  110. Originator: originator,
  111. }
  112. }