queue_msg.go 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. package queue
  2. import (
  3. "bytes"
  4. "encoding/gob"
  5. "github.com/gogf/gf/util/guid"
  6. "sparrow/pkg/protocol"
  7. )
  8. type QueueMessage interface {
  9. GetKey() string
  10. GetData() []byte
  11. GetHeaders() QueueMsgHeaders
  12. }
  13. type QueueMsgHeaders interface {
  14. Put(key string, value []byte)
  15. Get(key string) []byte
  16. GetData() map[string][]byte
  17. }
  18. // GobQueueMessage 发到消息队列中的消息
  19. type GobQueueMessage struct {
  20. Key string
  21. Value []byte
  22. Headers *DefaultQueueMsgHeader
  23. }
  24. func (g *GobQueueMessage) GetKey() string {
  25. return g.Key
  26. }
  27. func (g *GobQueueMessage) GetData() []byte {
  28. return g.Value
  29. }
  30. func (g *GobQueueMessage) GetHeaders() QueueMsgHeaders {
  31. return g.Headers
  32. }
  33. func NewGobQueueMessage(msg *protocol.Message) (*GobQueueMessage, error) {
  34. b, err := msg.Encode()
  35. if err != nil {
  36. return nil, err
  37. }
  38. return &GobQueueMessage{
  39. Key: guid.S(),
  40. Value: b,
  41. Headers: new(DefaultQueueMsgHeader),
  42. }, nil
  43. }
  44. func (g *GobQueueMessage) Marshal() ([]byte, error) {
  45. var network bytes.Buffer
  46. enc := gob.NewEncoder(&network)
  47. err := enc.Encode(g)
  48. if err != nil {
  49. return nil, err
  50. }
  51. return network.Bytes(), nil
  52. }
  53. func (g *GobQueueMessage) UnMarshal(data []byte) error {
  54. var network bytes.Buffer
  55. network.Write(data)
  56. dec := gob.NewDecoder(&network)
  57. return dec.Decode(g)
  58. }
  59. type DefaultQueueMsgHeader struct {
  60. Data map[string][]byte
  61. }
  62. func (d *DefaultQueueMsgHeader) Put(key string, value []byte) {
  63. d.Data[key] = value
  64. }
  65. func (d *DefaultQueueMsgHeader) Get(key string) []byte {
  66. if v, ok := d.Data[key]; !ok {
  67. return nil
  68. } else {
  69. return v
  70. }
  71. }
  72. func (d *DefaultQueueMsgHeader) GetData() map[string][]byte {
  73. return d.Data
  74. }