message.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. package protocol
  2. import (
  3. "bytes"
  4. "encoding/gob"
  5. "sync/atomic"
  6. "time"
  7. )
  8. type Message struct {
  9. QueueName string
  10. Id string
  11. Ts *time.Time
  12. Type string
  13. Data string
  14. RuleChanId string
  15. RuleNodeId string
  16. Callback IMessageCallBack
  17. MetaData map[string]interface{}
  18. Originator string
  19. execCounter int32
  20. AlarmMessage string
  21. }
  22. func (a *Message) TransformMsg(msgType string, ori string, data string) *Message {
  23. return &Message{
  24. QueueName: a.QueueName,
  25. Id: a.Id,
  26. Ts: a.Ts,
  27. Type: msgType,
  28. Data: data,
  29. RuleChanId: a.RuleChanId,
  30. RuleNodeId: a.RuleNodeId,
  31. Callback: a.Callback,
  32. MetaData: a.MetaData,
  33. Originator: ori,
  34. }
  35. }
  36. func (a *Message) GetAndIncrementRuleNodeCounter() int32 {
  37. return atomic.AddInt32(&a.execCounter, 1)
  38. }
  39. func (a *Message) Encode() ([]byte, error) {
  40. var network bytes.Buffer
  41. enc := gob.NewEncoder(&network)
  42. err := enc.Encode(a)
  43. if err != nil {
  44. return nil, err
  45. }
  46. return network.Bytes(), nil
  47. }
  48. func (a *Message) Decode(data []byte) error {
  49. var network bytes.Buffer
  50. defer network.Reset()
  51. network.Write(data)
  52. dec := gob.NewDecoder(&network)
  53. return dec.Decode(a)
  54. }
  55. // IMessageCallBack message call back
  56. type IMessageCallBack interface {
  57. // OnSuccess on success do sth.
  58. OnSuccess()
  59. // OnFailure on failure do sth.
  60. OnFailure(err error)
  61. // OnProcessingStart on process start do sth.
  62. OnProcessingStart(ruleNodeInfo *RuleNodeInfo)
  63. // OnProcessingEnd on process end do sth.
  64. OnProcessingEnd(ruleNodeId string)
  65. }
  66. type emptyCallBack struct {
  67. }
  68. func (e emptyCallBack) OnProcessingStart(ruleNodeInfo *RuleNodeInfo) {
  69. }
  70. func (e emptyCallBack) OnProcessingEnd(ruleNodeId string) {
  71. }
  72. var EmptyCallBack = emptyCallBack{}
  73. func (e emptyCallBack) OnSuccess() {
  74. }
  75. func (e emptyCallBack) OnFailure(err error) {
  76. // server.Log.Error("消息出错:" + err.Error())
  77. }
  78. // SetCallBack
  79. func (a *Message) SetCallBack(callback IMessageCallBack) {
  80. a.Callback = callback
  81. }
  82. // GetCallBack get message call back
  83. func (a *Message) GetCallBack() IMessageCallBack {
  84. if a.Callback == nil {
  85. return EmptyCallBack
  86. } else {
  87. return a.Callback
  88. }
  89. }
  90. func (a *Message) GetQueueName() string {
  91. if a.QueueName == "" {
  92. return "MAIN"
  93. }
  94. return a.QueueName
  95. }
  96. func (a *Message) CopyWithRuleChainId(ruleChainId string) *Message {
  97. return &Message{
  98. QueueName: a.QueueName,
  99. Id: a.Id,
  100. Ts: a.Ts,
  101. Type: a.Type,
  102. Data: a.Data,
  103. RuleChanId: ruleChainId,
  104. RuleNodeId: a.RuleNodeId,
  105. Callback: a.Callback,
  106. MetaData: a.MetaData,
  107. Originator: a.Originator,
  108. }
  109. }
  110. func (a *Message) CopyWithRuleNodeId(ruleNodeId string) *Message {
  111. return &Message{
  112. QueueName: a.QueueName,
  113. Id: a.Id,
  114. Ts: a.Ts,
  115. Type: a.Type,
  116. Data: a.Data,
  117. RuleChanId: a.RuleChanId,
  118. RuleNodeId: ruleNodeId,
  119. Callback: a.Callback,
  120. MetaData: a.MetaData,
  121. Originator: a.Originator,
  122. }
  123. }