message.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. package protocol
  2. import (
  3. "bytes"
  4. "encoding/gob"
  5. "sparrow/pkg/server"
  6. "sync/atomic"
  7. "time"
  8. )
  9. type Message struct {
  10. QueueName string
  11. Id string
  12. Ts *time.Time
  13. Type string
  14. Data string
  15. RuleChanId string
  16. RuleNodeId string
  17. Callback IMessageCallBack
  18. MetaData map[string]interface{}
  19. Originator string
  20. execCounter int32
  21. }
  22. func (a *Message) TransformMsg(msgType string, ori string, data string) *Message {
  23. return &Message{
  24. QueueName: a.QueueName,
  25. Id: a.Id,
  26. Ts: a.Ts,
  27. Type: msgType,
  28. Data: data,
  29. RuleChanId: a.RuleChanId,
  30. RuleNodeId: a.RuleNodeId,
  31. Callback: a.Callback,
  32. MetaData: a.MetaData,
  33. Originator: ori,
  34. }
  35. }
  36. func (a *Message) GetAndIncrementRuleNodeCounter() int32 {
  37. return atomic.AddInt32(&a.execCounter, 1)
  38. }
  39. func (a *Message) Encode() ([]byte, error) {
  40. var network bytes.Buffer
  41. enc := gob.NewEncoder(&network)
  42. err := enc.Encode(a)
  43. if err != nil {
  44. return nil, err
  45. }
  46. return network.Bytes(), nil
  47. }
  48. func (a *Message) Decode(data []byte) error {
  49. var network bytes.Buffer
  50. network.Write(data)
  51. dec := gob.NewDecoder(&network)
  52. return dec.Decode(a)
  53. }
  54. // IMessageCallBack message call back
  55. type IMessageCallBack interface {
  56. // on success do sth.
  57. OnSuccess()
  58. // on failure do sth.
  59. OnFailure(err error)
  60. // on process start do sth.
  61. OnProcessingStart(ruleNodeInfo *RuleNodeInfo)
  62. // on process end do sth.
  63. OnProcessingEnd(ruleNodeId string)
  64. }
  65. type emptyCallBack struct {
  66. }
  67. func (e emptyCallBack) OnProcessingStart(ruleNodeInfo *RuleNodeInfo) {
  68. }
  69. func (e emptyCallBack) OnProcessingEnd(ruleNodeId string) {
  70. }
  71. var EmptyCallBack = emptyCallBack{}
  72. func (e emptyCallBack) OnSuccess() {
  73. }
  74. func (e emptyCallBack) OnFailure(err error) {
  75. server.Log.Error("消息出错:" + err.Error())
  76. }
  77. // SetCallBack
  78. func (a *Message) SetCallBack(callback IMessageCallBack) {
  79. a.Callback = callback
  80. }
  81. // GetCallBack get message call back
  82. func (a *Message) GetCallBack() IMessageCallBack {
  83. if a.Callback == nil {
  84. return EmptyCallBack
  85. } else {
  86. return a.Callback
  87. }
  88. }
  89. func (a *Message) GetQueueName() string {
  90. if a.QueueName == "" {
  91. return "MAIN"
  92. }
  93. return a.QueueName
  94. }
  95. func (a *Message) CopyWithRuleChainId(ruleChainId string) *Message {
  96. return &Message{
  97. QueueName: a.QueueName,
  98. Id: a.Id,
  99. Ts: a.Ts,
  100. Type: a.Type,
  101. Data: a.Data,
  102. RuleChanId: ruleChainId,
  103. RuleNodeId: a.RuleNodeId,
  104. Callback: a.Callback,
  105. MetaData: a.MetaData,
  106. Originator: a.Originator,
  107. }
  108. }
  109. func (a *Message) CopyWithRuleNodeId(ruleNodeId string) *Message {
  110. return &Message{
  111. QueueName: a.QueueName,
  112. Id: a.Id,
  113. Ts: a.Ts,
  114. Type: a.Type,
  115. Data: a.Data,
  116. RuleChanId: a.RuleChanId,
  117. RuleNodeId: ruleNodeId,
  118. Callback: a.Callback,
  119. MetaData: a.MetaData,
  120. Originator: a.Originator,
  121. }
  122. }