message.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. package protocol
  2. import (
  3. "bytes"
  4. "encoding/gob"
  5. "sync/atomic"
  6. "time"
  7. )
  8. type MessageSerializer interface {
  9. Encode() bytes.Buffer
  10. Decode()
  11. }
  12. type Message struct {
  13. QueueName string
  14. Id string
  15. Ts *time.Time
  16. Type string
  17. Data string
  18. RuleChanId string
  19. RuleNodeId string
  20. Callback IMessageCallBack
  21. MetaData map[string]interface{}
  22. Originator string
  23. execCounter int32
  24. }
  25. func (a *Message) GetAndIncrementRuleNodeCounter() int32 {
  26. return atomic.AddInt32(&a.execCounter, 1)
  27. }
  28. func (a *Message) Encode() ([]byte, error) {
  29. var network bytes.Buffer
  30. enc := gob.NewEncoder(&network)
  31. err := enc.Encode(a)
  32. if err != nil {
  33. return nil, err
  34. }
  35. return network.Bytes(), nil
  36. }
  37. func (a *Message) Decode(data []byte) error {
  38. var network bytes.Buffer
  39. network.Write(data)
  40. dec := gob.NewDecoder(&network)
  41. return dec.Decode(a)
  42. }
  43. // IMessageCallBack message call back
  44. type IMessageCallBack interface {
  45. // on success do sth.
  46. OnSuccess()
  47. // on failure do sth.
  48. OnFailure(err error)
  49. // on process start do sth.
  50. OnProcessingStart(ruleNodeInfo *RuleNodeInfo)
  51. // on process end do sth.
  52. OnProcessingEnd(ruleNodeId string)
  53. }
  54. type emptyCallBack struct {
  55. }
  56. func (e emptyCallBack) OnProcessingStart(ruleNodeInfo *RuleNodeInfo) {
  57. }
  58. func (e emptyCallBack) OnProcessingEnd(ruleNodeId string) {
  59. }
  60. var EmptyCallBack = emptyCallBack{}
  61. func (e emptyCallBack) OnSuccess() {
  62. }
  63. func (e emptyCallBack) OnFailure(err error) {
  64. }
  65. // SetCallBack
  66. func (a *Message) SetCallBack(callback IMessageCallBack) {
  67. a.Callback = callback
  68. }
  69. // GetCallBack get message call back
  70. func (a *Message) GetCallBack() IMessageCallBack {
  71. if a.Callback == nil {
  72. return EmptyCallBack
  73. } else {
  74. return a.Callback
  75. }
  76. }
  77. func (a *Message) GetQueueName() string {
  78. if a.QueueName == "" {
  79. return "MAIN"
  80. }
  81. return a.QueueName
  82. }
  83. func (a *Message) CopyWithRuleChainId(ruleChainId string) *Message {
  84. return &Message{
  85. QueueName: a.QueueName,
  86. Id: a.Id,
  87. Ts: a.Ts,
  88. Type: a.Type,
  89. Data: a.Data,
  90. RuleChanId: ruleChainId,
  91. RuleNodeId: a.RuleNodeId,
  92. Callback: a.Callback,
  93. MetaData: a.MetaData,
  94. Originator: a.Originator,
  95. }
  96. }
  97. func (a *Message) CopyWithRuleNodeId(ruleNodeId string) *Message {
  98. return &Message{
  99. QueueName: a.QueueName,
  100. Id: a.Id,
  101. Ts: a.Ts,
  102. Type: a.Type,
  103. Data: a.Data,
  104. RuleChanId: a.RuleChanId,
  105. RuleNodeId: ruleNodeId,
  106. Callback: a.Callback,
  107. MetaData: a.MetaData,
  108. Originator: a.Originator,
  109. }
  110. }