message.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. package protocol
  2. import (
  3. "bytes"
  4. "encoding/gob"
  5. "sync/atomic"
  6. "time"
  7. )
// Message is the unit of data passed between rule nodes. It can be serialized
// with Encode and restored with Decode (encoding/gob).
type Message struct {
	// QueueName names the queue the message belongs to. GetQueueName
	// substitutes "MAIN" when this is empty.
	QueueName string
	// Id identifies the message; it is carried over unchanged by
	// TransformMsg and the CopyWith* helpers.
	Id string
	// Ts is the message timestamp. It is a pointer, so derived messages
	// share the same time.Time value.
	Ts *time.Time
	// Type is the message type; TransformMsg replaces it.
	Type string
	// Data is the message payload; TransformMsg replaces it.
	Data string
	// RuleChanId is set by CopyWithRuleChainId (presumably the ID of the
	// rule chain currently handling the message — confirm with callers).
	RuleChanId string
	// RuleNodeId is set by CopyWithRuleNodeId (presumably the ID of the
	// rule node currently handling the message — confirm with callers).
	RuleNodeId string
	// Callback receives processing notifications. It may be nil; use
	// GetCallBack to obtain a non-nil value.
	Callback IMessageCallBack
	// MetaData holds arbitrary key/value pairs. Derived messages share this
	// map rather than receiving a copy.
	MetaData map[string]interface{}
	// Originator is the message originator; TransformMsg replaces it.
	Originator string
	// execCounter is incremented atomically by
	// GetAndIncrementRuleNodeCounter. It is unexported, so it is neither
	// gob-encoded nor carried over to derived messages.
	execCounter int32
}
  21. func (a *Message) TransformMsg(msgType string, ori string, data string) *Message {
  22. return &Message{
  23. QueueName: a.QueueName,
  24. Id: a.Id,
  25. Ts: a.Ts,
  26. Type: msgType,
  27. Data: data,
  28. RuleChanId: a.RuleChanId,
  29. RuleNodeId: a.RuleNodeId,
  30. Callback: a.Callback,
  31. MetaData: a.MetaData,
  32. Originator: ori,
  33. }
  34. }
// GetAndIncrementRuleNodeCounter atomically adds 1 to the message's internal
// execution counter and returns the counter's new value.
//
// Note that, despite the "GetAnd..." name, atomic.AddInt32 returns the value
// after the addition, so the first call on a fresh message yields 1, not 0.
func (a *Message) GetAndIncrementRuleNodeCounter() int32 {
	return atomic.AddInt32(&a.execCounter, 1)
}
  38. func (a *Message) Encode() ([]byte, error) {
  39. var network bytes.Buffer
  40. enc := gob.NewEncoder(&network)
  41. err := enc.Encode(a)
  42. if err != nil {
  43. return nil, err
  44. }
  45. return network.Bytes(), nil
  46. }
  47. func (a *Message) Decode(data []byte) error {
  48. var network bytes.Buffer
  49. defer network.Reset()
  50. network.Write(data)
  51. dec := gob.NewDecoder(&network)
  52. return dec.Decode(a)
  53. }
// IMessageCallBack is the set of notifications a Message can deliver while it
// is being processed. Message.GetCallBack returns a no-op implementation when
// no callback has been set.
type IMessageCallBack interface {
	// OnSuccess is the notification for successful handling of the message.
	OnSuccess()
	// OnFailure is the notification for failed handling; err carries the cause.
	OnFailure(err error)
	// OnProcessingStart is the notification that processing has started for
	// the rule node described by ruleNodeInfo.
	OnProcessingStart(ruleNodeInfo *RuleNodeInfo)
	// OnProcessingEnd is the notification that processing has ended for the
	// rule node identified by ruleNodeId.
	OnProcessingEnd(ruleNodeId string)
}
  65. type emptyCallBack struct {
  66. }
  67. func (e emptyCallBack) OnProcessingStart(ruleNodeInfo *RuleNodeInfo) {
  68. }
  69. func (e emptyCallBack) OnProcessingEnd(ruleNodeId string) {
  70. }
  71. var EmptyCallBack = emptyCallBack{}
  72. func (e emptyCallBack) OnSuccess() {
  73. }
  74. func (e emptyCallBack) OnFailure(err error) {
  75. // server.Log.Error("消息出错:" + err.Error())
  76. }
// SetCallBack stores callback on the message, replacing any previous one.
// Passing nil clears it, after which GetCallBack returns EmptyCallBack.
func (a *Message) SetCallBack(callback IMessageCallBack) {
	a.Callback = callback
}
  81. // GetCallBack get message call back
  82. func (a *Message) GetCallBack() IMessageCallBack {
  83. if a.Callback == nil {
  84. return EmptyCallBack
  85. } else {
  86. return a.Callback
  87. }
  88. }
  89. func (a *Message) GetQueueName() string {
  90. if a.QueueName == "" {
  91. return "MAIN"
  92. }
  93. return a.QueueName
  94. }
  95. func (a *Message) CopyWithRuleChainId(ruleChainId string) *Message {
  96. return &Message{
  97. QueueName: a.QueueName,
  98. Id: a.Id,
  99. Ts: a.Ts,
  100. Type: a.Type,
  101. Data: a.Data,
  102. RuleChanId: ruleChainId,
  103. RuleNodeId: a.RuleNodeId,
  104. Callback: a.Callback,
  105. MetaData: a.MetaData,
  106. Originator: a.Originator,
  107. }
  108. }
  109. func (a *Message) CopyWithRuleNodeId(ruleNodeId string) *Message {
  110. return &Message{
  111. QueueName: a.QueueName,
  112. Id: a.Id,
  113. Ts: a.Ts,
  114. Type: a.Type,
  115. Data: a.Data,
  116. RuleChanId: a.RuleChanId,
  117. RuleNodeId: ruleNodeId,
  118. Callback: a.Callback,
  119. MetaData: a.MetaData,
  120. Originator: a.Originator,
  121. }
  122. }