context.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. package ruleEngine
  2. import (
  3. "sparrow/pkg/entities"
  4. "sparrow/pkg/protocol"
  5. "sparrow/pkg/server"
  6. "time"
  7. )
  8. type Context interface {
  9. // 向所有基于success类型的关系节点发消息
  10. TellSuccess(msg *protocol.Message)
  11. // 基于某个关系发消息
  12. TellNext(msg *protocol.Message, relationType protocol.RelationType)
  13. // 向当前节点发消息,duration 为延迟时间
  14. TellSelf(msg *protocol.Message, duration time.Duration)
  15. // 发送错误消息消息
  16. TellError(msg *protocol.Message, err error)
  17. // message ack
  18. Ack(msg *protocol.Message)
  19. // transform a message
  20. TransformMessage(msg *protocol.Message, msgType, originator string, metaData map[string]interface{}, data string) *protocol.Message
  21. GetRuleNodeCtx() *RuleNodeCtx
  22. }
  23. // DefaultContext 默认的上下文
  24. type DefaultContext struct {
  25. nodeCtx *RuleNodeCtx
  26. mainCtx *SystemContext
  27. }
  28. func NewDefaultContext(nodeCtx *RuleNodeCtx, mainCtx *SystemContext) *DefaultContext {
  29. return &DefaultContext{nodeCtx: nodeCtx, mainCtx: mainCtx}
  30. }
  31. func (d *DefaultContext) GetRuleNodeCtx() *RuleNodeCtx {
  32. return d.nodeCtx
  33. }
  34. func (d *DefaultContext) TellSuccess(msg *protocol.Message) {
  35. d.tellNext(msg, []protocol.RelationType{protocol.Success}, nil)
  36. }
  37. func (d *DefaultContext) TellNext(msg *protocol.Message, relationType protocol.RelationType) {
  38. d.tellNext(msg, []protocol.RelationType{relationType}, nil)
  39. }
  40. func (d *DefaultContext) tellNext(msg *protocol.Message, relationTypes []protocol.RelationType, err error) {
  41. if d.nodeCtx.Self.IsDebug {
  42. for _, item := range relationTypes {
  43. if Err := d.mainCtx.PersistDebugOutput(d.nodeCtx.TenantId,
  44. &entities.RuleNodeId{Id: d.nodeCtx.Self.RuleNodeId},
  45. msg,
  46. item.String(),
  47. err); Err != nil {
  48. continue
  49. }
  50. }
  51. }
  52. msg.GetCallBack().OnProcessingEnd(d.nodeCtx.Self.RuleNodeId)
  53. d.nodeCtx.ChainActor.Tell(
  54. &RuleNodeToRuleChanTellNextMsg{
  55. RuleNodeId: d.nodeCtx.Self.RuleNodeId,
  56. RelationTypes: relationTypes,
  57. Message: msg,
  58. FailureMessage: err,
  59. })
  60. }
  61. func (d *DefaultContext) TellSelf(msg *protocol.Message, duration time.Duration) {
  62. if duration > 0 {
  63. time.AfterFunc(duration, func() {
  64. d.nodeCtx.SelfActor.Tell(&RuleToSelfMsg{Message: msg})
  65. })
  66. }
  67. }
  68. func (d *DefaultContext) TellError(msg *protocol.Message, err error) {
  69. if d.nodeCtx.Self.IsDebug {
  70. if err := d.mainCtx.PersistDebugOutput(d.nodeCtx.TenantId,
  71. &entities.RuleNodeId{Id: d.nodeCtx.Self.RuleNodeId},
  72. msg,
  73. protocol.Failure.String(),
  74. err,
  75. ); err != nil {
  76. server.Log.WithField("method", "DefaultContext.TellError").
  77. Error(err)
  78. }
  79. }
  80. d.nodeCtx.ChainActor.Tell(&RuleNodeToRuleChanTellNextMsg{
  81. RuleNodeId: d.nodeCtx.Self.RuleNodeId,
  82. RelationTypes: []protocol.RelationType{protocol.Failure},
  83. Message: msg,
  84. FailureMessage: err,
  85. })
  86. }
  87. func (d *DefaultContext) Ack(msg *protocol.Message) {
  88. if d.nodeCtx.Self.IsDebug {
  89. if err := d.mainCtx.PersistDebugOutput(d.nodeCtx.TenantId,
  90. &entities.RuleNodeId{Id: d.nodeCtx.Self.RuleNodeId},
  91. msg,
  92. "ACK",
  93. nil,
  94. ); err != nil {
  95. server.Log.WithField("method", "DefaultContext.Ack").
  96. Error(err)
  97. }
  98. }
  99. msg.GetCallBack().OnProcessingEnd(d.nodeCtx.Self.RuleNodeId)
  100. msg.GetCallBack().OnSuccess()
  101. }
  102. func (d *DefaultContext) TransformMessage(msg *protocol.Message, msgType, originator string,
  103. metaData map[string]interface{}, data string) *protocol.Message {
  104. return &protocol.Message{
  105. QueueName: msg.QueueName,
  106. Id: msg.Id,
  107. Ts: msg.Ts,
  108. Type: msgType,
  109. Data: data,
  110. RuleChanId: msg.RuleChanId,
  111. RuleNodeId: msg.RuleNodeId,
  112. Callback: msg.Callback,
  113. MetaData: metaData,
  114. Originator: originator,
  115. }
  116. }