context.go 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. package ruleEngine
  2. import (
  3. "sparrow/pkg/protocol"
  4. "time"
  5. )
  6. type Context interface {
  7. // 向所有基于success类型的关系节点发消息
  8. TellSuccess(msg *protocol.Message)
  9. // 基于某个关系发消息
  10. TellNext(msg *protocol.Message, relationType protocol.RelationType)
  11. // 向当前节点发消息,duration 为延迟时间
  12. TellSelf(msg *protocol.Message, duration time.Duration)
  13. // 发送错误消息消息
  14. TellError(msg *protocol.Message, err error)
  15. // message ack
  16. Ack(msg *protocol.Message)
  17. // transform a message
  18. TransformMessage(msg *protocol.Message, msgType, originator string, metaData map[string]interface{}, data string) *protocol.Message
  19. }
  20. // DefaultContext 默认的上下文
  21. type DefaultContext struct {
  22. nodeCtx *RuleNodeCtx
  23. mainCtx *SystemContext
  24. }
  25. func NewDefaultContext(nodeCtx *RuleNodeCtx, mainCtx *SystemContext) *DefaultContext {
  26. return &DefaultContext{nodeCtx: nodeCtx, mainCtx: mainCtx}
  27. }
  28. func (d *DefaultContext) TellSuccess(msg *protocol.Message) {
  29. d.tellNext(msg, []protocol.RelationType{protocol.Success}, nil)
  30. }
  31. func (d *DefaultContext) TellNext(msg *protocol.Message, relationType protocol.RelationType) {
  32. d.tellNext(msg, []protocol.RelationType{relationType}, nil)
  33. }
  34. func (d *DefaultContext) tellNext(msg *protocol.Message, relationTypes []protocol.RelationType, err error) {
  35. if d.nodeCtx.Self.IsDebug {
  36. // TODO: 输出调试日志
  37. }
  38. msg.GetCallBack().OnProcessingEnd(d.nodeCtx.Self.RuleNodeId)
  39. d.nodeCtx.ChainActor.Tell(
  40. &RuleNodeToRuleChanTellNextMsg{
  41. RuleNodeId: d.nodeCtx.Self.RuleNodeId,
  42. RelationTypes: relationTypes,
  43. Message: msg,
  44. FailureMessage: err,
  45. })
  46. }
  47. func (d *DefaultContext) TellSelf(msg *protocol.Message, duration time.Duration) {
  48. if duration > 0 {
  49. time.AfterFunc(duration, func() {
  50. d.nodeCtx.SelfActor.Tell(&RuleToSelfMsg{Message: msg})
  51. })
  52. }
  53. }
  54. func (d *DefaultContext) TellError(msg *protocol.Message, err error) {
  55. if d.nodeCtx.Self.IsDebug {
  56. // TODO: 处理调试
  57. }
  58. d.nodeCtx.ChainActor.Tell(&RuleNodeToRuleChanTellNextMsg{
  59. RuleNodeId: d.nodeCtx.Self.RuleNodeId,
  60. RelationTypes: []protocol.RelationType{protocol.Failure},
  61. Message: msg,
  62. FailureMessage: err,
  63. })
  64. }
  65. func (d *DefaultContext) Ack(msg *protocol.Message) {
  66. if d.nodeCtx.Self.IsDebug {
  67. // TODO: 处理调试
  68. }
  69. msg.GetCallBack().OnProcessingEnd(d.nodeCtx.Self.RuleNodeId)
  70. msg.GetCallBack().OnSuccess()
  71. }
  72. func (d *DefaultContext) TransformMessage(msg *protocol.Message, msgType, originator string,
  73. metaData map[string]interface{}, data string) *protocol.Message {
  74. return &protocol.Message{
  75. QueueName: msg.QueueName,
  76. Id: msg.Id,
  77. Ts: msg.Ts,
  78. Type: msgType,
  79. Data: data,
  80. RuleChanId: msg.RuleChanId,
  81. RuleNodeId: msg.RuleNodeId,
  82. Callback: msg.Callback,
  83. MetaData: metaData,
  84. Originator: originator,
  85. }
  86. }