فهرست منبع

增加队列producer及consumer接口及队列消息实现

lijian 4 سال پیش
والد
کامیت
121798ea93
3فایلهای تغییر یافته به همراه78 افزوده شده و 0 حذف شده
  1. 20 0
      pkg/queue/queue_consumer.go
  2. 50 0
      pkg/queue/queue_msg.go
  3. 8 0
      pkg/queue/queue_producer.go

+ 20 - 0
pkg/queue/queue_consumer.go

@@ -0,0 +1,20 @@
+package queue
+
+import "time"
+
+// queue consumer interface
+// for a message queue consumer implements
+type QueueConsumer interface {
+	// get current topic
+	GetTopic() string
+	// subscribe the topic
+	Subscribe() error
+	// subscribe with partitions, partitions is topics
+	SubscribeWithPartitions(partitions []TopicPartitionInfo) error
+	// unsubscribe
+	UnSubscribe()
+	// pop message from queue with time duration
+	Pop(time time.Duration) ([]QueueMessage, error)
+	// commit a message to queue if it is necessary
+	Commit() error
+}

+ 50 - 0
pkg/queue/queue_msg.go

@@ -0,0 +1,50 @@
+package queue
+
+type QueueMessage interface {
+	GetKey() string
+	GetData() []byte
+	GetHeaders() QueueMsgHeaders
+}
+
+type QueueMsgHeaders interface {
+	Put(key string, value []byte)
+	Get(key string) []byte
+	GetData() map[string][]byte
+}
+type DefaultQueueMsg struct {
+	Key     string
+	Data    []byte
+	Headers *DefaultQueueMsgHeader
+}
+
+func (d *DefaultQueueMsg) GetKey() string {
+	return d.Key
+}
+
+func (d *DefaultQueueMsg) GetData() []byte {
+	return d.Data
+}
+
+func (d *DefaultQueueMsg) GetHeaders() QueueMsgHeaders {
+	return d.Headers
+}
+
+type DefaultQueueMsgHeader struct {
+	data map[string][]byte
+}
+
+func (d *DefaultQueueMsgHeader) Put(key string, value []byte) {
+	d.data[key] = value
+}
+
+func (d *DefaultQueueMsgHeader) Get(key string) []byte {
+	if v, ok := d.data[key]; !ok {
+		return nil
+	} else {
+		return v
+	}
+}
+
+func (d *DefaultQueueMsgHeader) GetData() map[string][]byte {
+	return d.data
+}

+ 8 - 0
pkg/queue/queue_producer.go

@@ -0,0 +1,8 @@
+package queue
+
+type RuleEngineQueueProducer interface {
+	Init() error
+	GetDefaultTopic() string
+	Send(info TopicPartitionInfo, msg QueueMessage, callback Callback) error
+	Stop() error
+}