queue_msg.go 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package queue
  2. import (
  3. "bytes"
  4. "encoding/gob"
  5. "github.com/gogf/gf/util/guid"
  6. "sparrow/pkg/protocol"
  7. )
  8. type QueueMessage interface {
  9. GetKey() string
  10. GetData() []byte
  11. GetHeaders() QueueMsgHeaders
  12. }
  13. type QueueMsgHeaders interface {
  14. Put(key string, value []byte)
  15. Get(key string) []byte
  16. GetData() map[string][]byte
  17. }
  18. // GobQueueMessage 发到消息队列中的消息
  19. type GobQueueMessage struct {
  20. Key string
  21. Value []byte
  22. Headers *DefaultQueueMsgHeader
  23. }
  24. func (g *GobQueueMessage) GetKey() string {
  25. return g.Key
  26. }
  27. func (g *GobQueueMessage) GetData() []byte {
  28. return g.Value
  29. }
  30. func (g *GobQueueMessage) GetHeaders() QueueMsgHeaders {
  31. return g.Headers
  32. }
  33. func NewGobQueueMessage(msg *protocol.Message) (*GobQueueMessage, error) {
  34. b, err := msg.Encode()
  35. if err != nil {
  36. return nil, err
  37. }
  38. return &GobQueueMessage{
  39. Key: guid.S(),
  40. Value: b,
  41. Headers: NewDefaultQueueMsgHeader(),
  42. }, nil
  43. }
  44. func (g *GobQueueMessage) Marshal() ([]byte, error) {
  45. var network bytes.Buffer
  46. enc := gob.NewEncoder(&network)
  47. err := enc.Encode(g)
  48. if err != nil {
  49. return nil, err
  50. }
  51. return network.Bytes(), nil
  52. }
  53. func (g *GobQueueMessage) UnMarshal(data []byte) error {
  54. var network bytes.Buffer
  55. network.Write(data)
  56. dec := gob.NewDecoder(&network)
  57. return dec.Decode(g)
  58. }
  59. type DefaultQueueMsgHeader struct {
  60. Data map[string][]byte
  61. }
  62. func NewDefaultQueueMsgHeader() *DefaultQueueMsgHeader {
  63. return &DefaultQueueMsgHeader{Data: make(map[string][]byte)}
  64. }
  65. func (d *DefaultQueueMsgHeader) Put(key string, value []byte) {
  66. d.Data[key] = value
  67. }
  68. func (d *DefaultQueueMsgHeader) Get(key string) []byte {
  69. if v, ok := d.Data[key]; !ok {
  70. return nil
  71. } else {
  72. return v
  73. }
  74. }
  75. func (d *DefaultQueueMsgHeader) GetData() map[string][]byte {
  76. return d.Data
  77. }