message.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. package protocol
  2. import (
  3. "bytes"
  4. "encoding/gob"
  5. "time"
  6. )
  7. type MessageSerializer interface {
  8. Encode() bytes.Buffer
  9. Decode()
  10. }
  11. type Message struct {
  12. QueueName string
  13. Id string
  14. Ts *time.Time
  15. Type string
  16. Data string
  17. RuleChanId string
  18. RuleNodeId string
  19. Callback IMessageCallBack
  20. MetaData map[string]interface{}
  21. Originator string
  22. }
  23. func (a *Message) Encode() ([]byte, error) {
  24. var network bytes.Buffer
  25. enc := gob.NewEncoder(&network)
  26. err := enc.Encode(a)
  27. if err != nil {
  28. return nil, err
  29. }
  30. return network.Bytes(), nil
  31. }
  32. func (a *Message) Decode(data []byte) error {
  33. var network bytes.Buffer
  34. network.Write(data)
  35. dec := gob.NewDecoder(&network)
  36. return dec.Decode(a)
  37. }
  38. // IMessageCallBack message call back
  39. type IMessageCallBack interface {
  40. // on success do sth.
  41. OnSuccess()
  42. // on failure do sth.
  43. OnFailure(err error)
  44. // on process start do sth.
  45. OnProcessingStart(ruleNodeInfo *RuleNodeInfo)
  46. // on process end do sth.
  47. OnProcessingEnd(ruleNodeId string)
  48. }
  49. type emptyCallBack struct {
  50. }
  51. func (e emptyCallBack) OnProcessingStart(ruleNodeInfo *RuleNodeInfo) {
  52. }
  53. func (e emptyCallBack) OnProcessingEnd(ruleNodeId string) {
  54. }
  55. var EmptyCallBack = emptyCallBack{}
  56. func (e emptyCallBack) OnSuccess() {
  57. }
  58. func (e emptyCallBack) OnFailure(err error) {
  59. }
  60. // SetCallBack
  61. func (a *Message) SetCallBack(callback IMessageCallBack) {
  62. a.Callback = callback
  63. }
  64. // GetCallBack get message call back
  65. func (a *Message) GetCallBack() IMessageCallBack {
  66. if a.Callback == nil {
  67. return EmptyCallBack
  68. } else {
  69. return a.Callback
  70. }
  71. }
  72. func (a *Message) GetQueueName() string {
  73. if a.QueueName == "" {
  74. return "MAIN"
  75. }
  76. return a.QueueName
  77. }
  78. func (a *Message) CopyWithRuleChainId(ruleChainId string) *Message {
  79. return &Message{
  80. QueueName: a.QueueName,
  81. Id: a.Id,
  82. Ts: a.Ts,
  83. Type: a.Type,
  84. Data: a.Data,
  85. RuleChanId: ruleChainId,
  86. RuleNodeId: a.RuleNodeId,
  87. Callback: a.Callback,
  88. MetaData: a.MetaData,
  89. Originator: a.Originator,
  90. }
  91. }
  92. func (a *Message) CopyWithRuleNodeId(ruleNodeId string) *Message {
  93. return &Message{
  94. QueueName: a.QueueName,
  95. Id: a.Id,
  96. Ts: a.Ts,
  97. Type: a.Type,
  98. Data: a.Data,
  99. RuleChanId: a.RuleChanId,
  100. RuleNodeId: ruleNodeId,
  101. Callback: a.Callback,
  102. MetaData: a.MetaData,
  103. Originator: a.Originator,
  104. }
  105. }