123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990 |
- package queue
- import (
- "bytes"
- "encoding/gob"
- "github.com/gogf/gf/util/guid"
- "sparrow/pkg/protocol"
- )
- type QueueMessage interface {
- GetKey() string
- GetData() []byte
- GetHeaders() QueueMsgHeaders
- }
- type QueueMsgHeaders interface {
- Put(key string, value []byte)
- Get(key string) []byte
- GetData() map[string][]byte
- }
- // GobQueueMessage 发到消息队列中的消息
- type GobQueueMessage struct {
- Key string
- Value []byte
- Headers *DefaultQueueMsgHeader
- }
- func (g *GobQueueMessage) GetKey() string {
- return g.Key
- }
- func (g *GobQueueMessage) GetData() []byte {
- return g.Value
- }
- func (g *GobQueueMessage) GetHeaders() QueueMsgHeaders {
- return g.Headers
- }
- func NewGobQueueMessage(msg *protocol.Message) (*GobQueueMessage, error) {
- b, err := msg.Encode()
- if err != nil {
- return nil, err
- }
- return &GobQueueMessage{
- Key: guid.S(),
- Value: b,
- Headers: new(DefaultQueueMsgHeader),
- }, nil
- }
- func (g *GobQueueMessage) Marshal() ([]byte, error) {
- var network bytes.Buffer
- enc := gob.NewEncoder(&network)
- err := enc.Encode(g)
- if err != nil {
- return nil, err
- }
- return network.Bytes(), nil
- }
- func (g *GobQueueMessage) UnMarshal(data []byte) error {
- var network bytes.Buffer
- network.Write(data)
- dec := gob.NewDecoder(&network)
- return dec.Decode(g)
- }
- type DefaultQueueMsgHeader struct {
- Data map[string][]byte
- }
- func (d *DefaultQueueMsgHeader) Put(key string, value []byte) {
- d.Data[key] = value
- }
- func (d *DefaultQueueMsgHeader) Get(key string) []byte {
- if v, ok := d.Data[key]; !ok {
- return nil
- } else {
- return v
- }
- }
- func (d *DefaultQueueMsgHeader) GetData() map[string][]byte {
- return d.Data
- }
|