Browse Source

修复Bug

lijian 3 years ago
parent
commit
f65b9e7fed

+ 2 - 0
pkg/ruleEngine/mailbox.go

@@ -115,10 +115,12 @@ func (m *MailBox) processMailbox() {
 			getQueue = m.normalPriorityMessages
 		}
 		if getQueue == nil {
+			noMoreElement = true
 			break
 		}
 		msg, ok := getQueue.Pop().(protocol.ActorMsg)
 		if !ok {
+			noMoreElement = true
 			break
 		}
 		if msg != nil {

+ 16 - 0
pkg/ruleEngine/nodes/kafka_node.go

@@ -0,0 +1,16 @@
+package nodes
+
+// KafkaNode Kafka Node sends messages to Kafka brokers. Expects messages with any message type. Will send record via Kafka producer to Kafka server
+type KafkaNode struct {
+}
+
+type KafkaNodeConfiguration struct {
+	TopicPattern        string            `json:"topic_pattern"`
+	BootstrapServer     string            `json:"bootstrap_server"`
+	RetryTimes          int               `json:"retry_times"`            // if fails retry times
+	BatchSize           int64             `json:"batch_size"`             // produces batch size in bytes
+	LocallyBufferTime   int64             `json:"locally_buffer_time"`    // time to buffer locally(ms)
+	MaxSizeClientBuffer int64             `json:"max_size_client_buffer"` // client buffer max size in bytes
+	MetaData            map[string]string `json:"meta_data"`
+	AckNumber           int               `json:"ack_number"` // number of ack
+}

+ 0 - 2
services/controller/main.go

@@ -1,7 +1,6 @@
 package main
 
 import (
-	"sparrow/pkg/debug"
 	"sparrow/pkg/rpcs"
 	"sparrow/pkg/server"
 )
@@ -13,7 +12,6 @@ func main() {
 		server.Log.Fatal(err)
 		return
 	}
-	debug.StartHTTPPprof("127.0.0.1:8999")
 	// register a rpc service
 	controller, err := NewController(*confRabbitHost)
 	if err != nil {

BIN
tests/device/device


+ 2 - 2
tests/device/device.go

@@ -171,7 +171,7 @@ func (d *Device) reportStatus(client *MQTT.Client) {
 			Flag:      0,
 			Timestamp: uint64(time.Now().Unix() * 1000),
 		}
-		param := []interface{}{"lijian"}
+		param := []interface{}{uint16(1), uint16(2)}
 		params, err := tlv.MakeTLVs(param)
 		if err != nil {
 			fmt.Println(err)
@@ -179,7 +179,7 @@ func (d *Device) reportStatus(client *MQTT.Client) {
 		}
 		sub := protocol.SubData{
 			Head: protocol.SubDataHead{
-				SubDeviceid: uint16(225),
+				SubDeviceid: uint16(1),
 				PropertyNum: uint16(1),
 				ParamsCount: uint16(len(params)),
 			},

+ 2 - 2
tests/device/main.go

@@ -6,8 +6,8 @@ import (
 )
 
 var (
-	testURL        = flag.String("url", "http://192.168.1.173:8088", "login url")
-	testProductKey = flag.String("productkey", "2e397f5599a3f6f6a5a3c8fcd45437169501b3c6e239042ad5e9b65303561e41ab5519e9d205facbcbe75a2784354501", "product key")
+	testURL        = flag.String("url", "http://106.14.63.46:18100", "login url")
+	testProductKey = flag.String("productkey", "958daf8b3a533f0d9516ac8fd17ef0cb06b439e664787a2a89608a10eeee8eb3c35c82c505d19f8a4417e530de0678fd", "product key")
 	testProtocol   = flag.String("protocol", "mqtt", "access protocol")
 )