package queue import ( "bytes" "encoding/gob" "github.com/gogf/gf/util/guid" "sparrow/pkg/protocol" ) type QueueMessage interface { GetKey() string GetData() []byte GetHeaders() QueueMsgHeaders } type QueueMsgHeaders interface { Put(key string, value []byte) Get(key string) []byte GetData() map[string][]byte } // GobQueueMessage 发到消息队列中的消息 type GobQueueMessage struct { Key string Value []byte Headers *DefaultQueueMsgHeader } func (g *GobQueueMessage) GetKey() string { return g.Key } func (g *GobQueueMessage) GetData() []byte { return g.Value } func (g *GobQueueMessage) GetHeaders() QueueMsgHeaders { return g.Headers } func NewGobQueueMessage(msg *protocol.Message) (*GobQueueMessage, error) { b, err := msg.Encode() if err != nil { return nil, err } return &GobQueueMessage{ Key: guid.S(), Value: b, Headers: new(DefaultQueueMsgHeader), }, nil } func (g *GobQueueMessage) Marshal() ([]byte, error) { var network bytes.Buffer enc := gob.NewEncoder(&network) err := enc.Encode(g) if err != nil { return nil, err } return network.Bytes(), nil } func (g *GobQueueMessage) UnMarshal(data []byte) error { var network bytes.Buffer network.Write(data) dec := gob.NewDecoder(&network) return dec.Decode(g) } type DefaultQueueMsgHeader struct { Data map[string][]byte } func (d *DefaultQueueMsgHeader) Put(key string, value []byte) { d.Data[key] = value } func (d *DefaultQueueMsgHeader) Get(key string) []byte { if v, ok := d.Data[key]; !ok { return nil } else { return v } } func (d *DefaultQueueMsgHeader) GetData() map[string][]byte { return d.Data }