context.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. package ruleEngine
  2. import (
  3. "sparrow/pkg/protocol"
  4. "time"
  5. )
  6. type Context interface {
  7. // 向所有基于success类型的关系节点发消息
  8. TellSuccess(msg *protocol.Message)
  9. // 基于某个关系发消息
  10. TellNext(msg *protocol.Message, relationType protocol.RelationType)
  11. // 向当前节点发消息,duration 为延迟时间
  12. TellSelf(msg *protocol.Message, duration time.Duration)
  13. // 发送错误消息消息
  14. TellError(msg *protocol.Message, err error)
  15. // message ack
  16. Ack(msg *protocol.Message)
  17. }
  18. // DefaultContext 默认的上下文
  19. type DefaultContext struct {
  20. nodeCtx *RuleNodeCtx
  21. mainCtx *SystemContext
  22. }
  23. func NewDefaultContext(nodeCtx *RuleNodeCtx, mainCtx *SystemContext) *DefaultContext {
  24. return &DefaultContext{nodeCtx: nodeCtx, mainCtx: mainCtx}
  25. }
  26. func (d *DefaultContext) TellSuccess(msg *protocol.Message) {
  27. d.tellNext(msg, []protocol.RelationType{protocol.Success}, nil)
  28. }
  29. func (d *DefaultContext) TellNext(msg *protocol.Message, relationType protocol.RelationType) {
  30. d.tellNext(msg, []protocol.RelationType{relationType}, nil)
  31. }
  32. func (d *DefaultContext) tellNext(msg *protocol.Message, relationTypes []protocol.RelationType, err error) {
  33. if d.nodeCtx.Self.IsDebug {
  34. // TODO: 输出调试日志
  35. }
  36. msg.GetCallBack().OnProcessingEnd(d.nodeCtx.Self.RuleNodeId)
  37. d.nodeCtx.ChainActor.Tell(
  38. &RuleNodeToRuleChanTellNextMsg{
  39. RuleNodeId: d.nodeCtx.Self.RuleNodeId,
  40. RelationTypes: relationTypes,
  41. Message: msg,
  42. FailureMessage: err,
  43. })
  44. }
  45. func (d *DefaultContext) TellSelf(msg *protocol.Message, duration time.Duration) {
  46. if duration > 0 {
  47. time.AfterFunc(duration, func() {
  48. d.nodeCtx.SelfActor.Tell(&RuleToSelfMsg{Message: msg})
  49. })
  50. }
  51. }
  52. func (d *DefaultContext) TellError(msg *protocol.Message, err error) {
  53. if d.nodeCtx.Self.IsDebug {
  54. // TODO: 处理调试
  55. }
  56. d.nodeCtx.ChainActor.Tell(&RuleNodeToRuleChanTellNextMsg{
  57. RuleNodeId: d.nodeCtx.Self.RuleNodeId,
  58. RelationTypes: []protocol.RelationType{protocol.Failure},
  59. Message: msg,
  60. FailureMessage: err,
  61. })
  62. }
  63. func (d *DefaultContext) Ack(msg *protocol.Message) {
  64. if d.nodeCtx.Self.IsDebug {
  65. // TODO: 处理调试
  66. }
  67. msg.GetCallBack().OnProcessingEnd(d.nodeCtx.Self.RuleNodeId)
  68. msg.GetCallBack().OnSuccess()
  69. }