Ver código fonte

支持event 转发,优化http节点

lijian 3 anos atrás
pai
commit
b5426086ab

+ 1 - 1
pkg/generator/key_gen_test.go

@@ -9,7 +9,7 @@ func TestKeyGen(t *testing.T) {
 	if err == nil {
 		t.Error("should return error when key length is invalid")
 	}
-	 testid := "1ps9djpivy0cdrkuofw86083000jfcet"
+	testid := "1ps9djpivy0cdrkuofw86083000j1234"
 	generator, err = NewKeyGenerator("ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOP")
 	if err != nil {
 		t.Fatal(err)

+ 15 - 11
pkg/productconfig/productconfig.go

@@ -145,27 +145,31 @@ func (config *ProductConfig) StatusToMap(status []protocol.SubData) (map[string]
 }
 
 // EventToMap event ot map
-func (config *ProductConfig) EventToMap(event *protocol.Event) (map[string][]interface{}, error) {
-	result := make(map[string][]interface{})
-	name := ""
-	for _, ev := range config.Events {
-		if ev.No == int(event.Head.No) {
-			name = ev.Name
-		}
-	}
+func (config *ProductConfig) EventToMap(event *protocol.Event) (map[string]interface{}, error) {
+	result := make(map[string]interface{})
 	val, err := tlv.ReadTLVs(event.Params)
 	if err != nil {
 		return nil, err
 	}
+	label := ""
+	values := make(map[string]interface{})
+	for _, obj := range config.Events {
+		if obj.No == int(event.Head.No) {
+			label = obj.Name
+			for k, v := range obj.Params {
+				values[v.Name] = val[k]
+			}
+		}
+	}
 
-	result[name] = val
-
+	result[label] = values
+	result["device_id"] = event.Head.SubDeviceid
 	return result, nil
 }
 
 // MapToStatus map to status
 func (config *ProductConfig) MapToStatus(data map[string]interface{}) ([]protocol.SubData, error) {
-	result := []protocol.SubData{}
+	var result []protocol.SubData
 
 	for label, one := range data {
 		params, ok := one.([]interface{})

+ 4 - 4
pkg/productconfig/productconfig_test.go

@@ -2,9 +2,9 @@ package productconfig
 
 import (
 	"encoding/json"
+	"reflect"
 	"sparrow/pkg/protocol"
 	"sparrow/pkg/tlv"
-	"reflect"
 	"testing"
 )
 
@@ -58,11 +58,11 @@ func testStatus(c *ProductConfig, t *testing.T) {
 	if err != nil {
 		t.Error(err)
 	}
-	str, err :=json.Marshal(res)
+	str, err := json.Marshal(res)
 	t.Log(string(str))
 
 	m := make(map[string]interface{})
-	m["location"] = []interface{}{float64(1)}
+	m["location"] = []interface{}{float64(1), float64(1)}
 	_, err = c.MapToStatus(m)
 	if err != nil {
 		t.Error(err)
@@ -92,7 +92,7 @@ func testEvent(c *ProductConfig, t *testing.T) {
 	}
 
 	got := string(result)
-
+	t.Log(got)
 	if got != want {
 		t.Errorf("event to map error: want: %v, got : %v", want, got)
 	}

+ 4 - 4
pkg/queue/msgQueue/rabbitmq.go

@@ -159,7 +159,7 @@ func (r *RabbitMqConsumer) Subscribe() error {
 	defer r.mu.Unlock()
 	r.partitions = append(r.partitions, &queue.TopicPartitionInfo{
 		Topic:       r.topic,
-		TenantId:    "1ps9djpswi0cds7cofynkso300eql4iu",
+		TenantId:    "*",
 		Partition:   0,
 		MyPartition: true,
 	})
@@ -224,9 +224,9 @@ func (r *RabbitMqConsumer) doPop(duration time.Duration) error {
 		return errors.New("ch and conn is not init")
 	}
 	for _, topic := range r.topics {
-		go func() {
+		go func(tpc string) {
 			msgs, err := r.admin.ch.Consume(
-				topic,
+				tpc,
 				"",    // consumer
 				false, // auto-ack
 				false, // exclusive
@@ -242,7 +242,7 @@ func (r *RabbitMqConsumer) doPop(duration time.Duration) error {
 				r.recvChan <- d.Body
 				d.Ack(true)
 			}
-		}()
+		}(topic)
 	}
 	return nil
 }

+ 5 - 4
pkg/ruleEngine/nodes/rest_api_request_node.go

@@ -26,9 +26,9 @@ import (
 }
 */
 type RestApiRequestNode struct {
-	pool   *grpool.Pool
-	config *RestApiRequestNodeConfig
-	client *utils.HttpClient
+	pool    *grpool.Pool
+	config  *RestApiRequestNodeConfig
+	client  *utils.HttpClient
 	bufPool sync.Pool
 }
 
@@ -56,7 +56,7 @@ func (r *RestApiRequestNode) Init(ctx ruleEngine.Context, config string) error {
 	client.SetLogger(server.Log)
 	r.client = client
 	r.bufPool = sync.Pool{
-		New: func() interface{}{
+		New: func() interface{} {
 			return new(bytes.Buffer)
 		},
 	}
@@ -95,6 +95,7 @@ func (r *RestApiRequestNode) OnMessage(ctx ruleEngine.Context, message *protocol
 			ctx.TellError(next, err)
 			return
 		}
+		defer res.Close()
 		if res != nil && res.Response() != nil {
 			if res.Response().StatusCode == http.StatusOK {
 				msg := r.processResponse(ctx, message, res)

+ 3 - 3
pkg/utils/http_client.go

@@ -318,9 +318,9 @@ func NewHttpClient() *HttpClient {
 		httpClient: &http.Client{
 			Timeout: defaultRequestTimeout,
 			Transport: &http.Transport{
-				MaxIdleConns:           0,
-				MaxIdleConnsPerHost:    0,
-				MaxConnsPerHost:        0,
+				MaxIdleConns:        0,
+				MaxIdleConnsPerHost: 1,
+				MaxConnsPerHost:     1,
 			},
 		},
 		retryWaitTime: defaultRetryWaitTime,

+ 4 - 4
services/apiprovider/notifier.go

@@ -19,10 +19,10 @@ const (
 
 // report structure
 type ReportPack struct {
-	Tag        string                   `json:"tag"`
-	Identifier string                   `json:"identifier"`
-	TimeStamp  int64                    `json:"timestamp"`
-	Data       map[string][]interface{} `json:"data"`
+	Tag        string                 `json:"tag"`
+	Identifier string                 `json:"identifier"`
+	TimeStamp  int64                  `json:"timestamp"`
+	Data       map[string]interface{} `json:"data"`
 }
 
 var notifier *Notifier

+ 56 - 10
services/controller/controller.go

@@ -32,21 +32,27 @@ func NewController(rabbithost string) (*Controller, error) {
 	admin := msgQueue.NewRabbitMessageQueueAdmin(&msgQueue.RabbitMqSettings{Host: rabbithost}, nil)
 	producer := msgQueue.NewRabbitMqProducer(admin, "default")
 	consumer := msgQueue.NewRabbitConsumer(admin, "MAIN")
-	_ = consumer.Subscribe()
-	//// timer
-	//t := rule.NewTimer()
-	//t.Run()
-	//
-	//// ifttt
-	//ttt := rule.NewIfttt()
+	tp := make([]*queue.TopicPartitionInfo, 0)
+	tp = append(tp, &queue.TopicPartitionInfo{
+		Topic:       "MAIN",
+		TenantId:    "1ps9djpswi0cds7cofynkso300eql4iu",
+		Partition:   0,
+		MyPartition: false,
+	})
+	tp = append(tp, &queue.TopicPartitionInfo{
+		Topic:       "MAIN",
+		TenantId:    "1ps9djpswi0cds7cofynkso300eql4sw",
+		Partition:   0,
+		MyPartition: false,
+	})
+	_ = consumer.SubscribeWithPartitions(tp)
+
 	if err := producer.Init(); err != nil {
 		return nil, err
 	}
 
 	return &Controller{
 		producer: producer,
-		// timer:    t,
-		// ift:      ttt,
 		consumer: consumer,
 		cluster:  &ClusterService{producer: producer},
 		pool:     grpool.New(),
@@ -137,6 +143,47 @@ func (c *Controller) processStatusToQueue(args rpcs.ArgsOnStatus) (string, error
 	return result, nil
 }
 
+func (c *Controller) processEventToQueue(args rpcs.ArgsOnEvent) (string, error) {
+	var result string
+	device := &models.Device{}
+	err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByRecordId", &rpcs.ArgsDeviceAuth{DeviceID: args.DeviceId}, device)
+	if err != nil {
+		server.Log.Errorf("find device error : %v", err)
+		return result, err
+	}
+
+	product := &models.Product{}
+	err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindProduct", &device.ProductID, product)
+	if err != nil {
+		server.Log.Errorf("find product error : %v", err)
+		return result, err
+	}
+
+	pc, err := productconfig.New(product.ProductConfig)
+	if err != nil {
+		server.Log.Errorf("product config error : %v", err)
+		return result, err
+	}
+	ev := &protocol.Event{}
+	ev.Head.SubDeviceid = args.SubDevice
+	ev.Head.Priority = args.Priority
+	ev.Head.No = args.No
+	ev.Head.Timestamp = args.TimeStamp
+	ev.Head.ParamsCount = uint16(len(args.Params))
+	ev.Params = args.Params
+	m, err := pc.EventToMap(ev)
+	if err != nil {
+		server.Log.Errorf("gen status json error : %v", err)
+		return result, err
+	}
+	b, err := json.Marshal(&m)
+	if err != nil {
+		server.Log.Errorf("marshal json error : %v", err)
+	}
+	result = string(b)
+	return result, nil
+}
+
 func (c *Controller) OnEvent(args rpcs.ArgsOnEvent, reply *rpcs.ReplyOnEvent) error {
 	go func() {
 		err := c.ift.Check(args.DeviceId, args.No)
@@ -223,7 +270,6 @@ func (c *Controller) launchConsumer() {
 				fmt.Println("解析消息失败")
 			}
 			tanantId := msg.GetHeaders().Get("tanant_id")
-			server.Log.Debugf("tanant_id:%s", tanantId)
 			if c.actorContext != nil {
 				c.actorContext.Tell(&ruleEngine.QueueToRuleEngineMsg{
 					TenantId: string(tanantId),

+ 1 - 1
tools/pdcfg/run.sh

@@ -1 +1 @@
-./pdcfg -etcd http://127.0.0.1:18153
+./pdcfg -etcd http://106.14.63.46:18153