context.go 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. package ruleEngine
  2. import (
  3. "sparrow/pkg/actor"
  4. "sparrow/pkg/protocol"
  5. "time"
  6. )
  7. type Context interface {
  8. // 向所有基于success类型的关系节点发消息
  9. TellSuccess(msg *protocol.Message)
  10. // 基于某个关系发消息
  11. TellNext(msg *protocol.Message, relationType protocol.RelationType)
  12. // 向当前节点发消息,duration 为延迟时间
  13. TellSelf(msg *protocol.Message, duration time.Duration)
  14. // 发送错误消息消息
  15. TellError(msg *protocol.Message, err error)
  16. // message ack
  17. Ack(msg *protocol.Message)
  18. }
  19. // DefaultContext 默认的上下文
  20. type DefaultContext struct {
  21. nodeCtx *RuleNodeCtx
  22. mainCtx *actor.SystemContext
  23. }
  24. func NewDefaultContext(nodeCtx *RuleNodeCtx, mainCtx *actor.SystemContext) *DefaultContext {
  25. return &DefaultContext{nodeCtx: nodeCtx, mainCtx: mainCtx}
  26. }
  27. func (d *DefaultContext) TellSuccess(msg *protocol.Message) {
  28. d.tellNext(msg, []protocol.RelationType{protocol.Success}, nil)
  29. }
  30. func (d *DefaultContext) TellNext(msg *protocol.Message, relationType protocol.RelationType) {
  31. d.tellNext(msg, []protocol.RelationType{relationType}, nil)
  32. }
  33. func (d *DefaultContext) tellNext(msg *protocol.Message, relationTypes []protocol.RelationType, err error) {
  34. if d.nodeCtx.Self.IsDebug {
  35. // TODO: 输出调试日志
  36. }
  37. msg.GetCallBack().OnProcessingEnd(d.nodeCtx.Self.RuleNodeId)
  38. d.nodeCtx.ChainActor.Tell(
  39. &actor.RuleNodeToRuleChanTellNextMsg{
  40. RuleNodeId: d.nodeCtx.Self.RuleNodeId,
  41. RelationTypes: relationTypes,
  42. Message: msg,
  43. FailureMessage: err,
  44. })
  45. }
  46. func (d *DefaultContext) TellSelf(msg *protocol.Message, duration time.Duration) {
  47. if duration > 0 {
  48. time.AfterFunc(duration, func() {
  49. d.nodeCtx.SelfActor.Tell(&actor.RuleToSelfMsg{Message: msg})
  50. })
  51. }
  52. }
  53. func (d *DefaultContext) TellError(msg *protocol.Message, err error) {
  54. if d.nodeCtx.Self.IsDebug {
  55. // TODO: 处理调试
  56. }
  57. d.nodeCtx.ChainActor.Tell(&actor.RuleNodeToRuleChanTellNextMsg{
  58. RuleNodeId: d.nodeCtx.Self.RuleNodeId,
  59. RelationTypes: []protocol.RelationType{protocol.Failure},
  60. Message: msg,
  61. FailureMessage: err,
  62. })
  63. }
  64. func (d *DefaultContext) Ack(msg *protocol.Message) {
  65. if d.nodeCtx.Self.IsDebug {
  66. // TODO: 处理调试
  67. }
  68. msg.GetCallBack().OnProcessingEnd(d.nodeCtx.Self.RuleNodeId)
  69. msg.GetCallBack().OnSuccess()
  70. }