Browse Source

完善初始化consumer

lijian 2 years ago
parent
commit
c70e94695e

+ 3 - 2
pkg/rpcs/access.go

@@ -11,8 +11,9 @@ type ArgsSetStatus struct {
 type ReplySetStatus ReplyEmptyResult
 
 type ArgsGetStatus struct {
-	Id       string
-	VendorId string
+	Id          string
+	VendorId    string
+	SubDeviceId string
 }
 type ReplyGetStatus struct {
 	Status []protocol.SubData

+ 1 - 5
services/apiprovider/actions.go

@@ -123,12 +123,8 @@ func GetDeviceCurrentStatus(device *models.Device, config *productconfig.Product
 		Id: device.DeviceIdentifier,
 	}
 	statusreply := rpcs.ReplyGetStatus{}
-	//opentracing
-	span, ctx := opentracing.StartSpanFromContext(context.Background(), "GetDeviceCurrentStatus")
-	defer span.Finish()
-	ext.SpanKindRPCClient.Set(span)
 
-	err := server.RPCCallByName(ctx, rpcs.ControllerName, "Controller.GetStatus", statusargs, &statusreply)
+	err := server.RPCCallByName(context.Background(), rpcs.ControllerName, "Controller.GetStatus", statusargs, &statusreply)
 	if err != nil {
 		server.Log.Errorf("get device status error: %v", err)
 		r.JSON(http.StatusOK, renderError(ErrSystemFault, err))

+ 22 - 15
services/controller/controller.go

@@ -34,21 +34,9 @@ func NewController(rabbithost string) (*Controller, error) {
 	admin := msgQueue.NewRabbitMessageQueueAdmin(&msgQueue.RabbitMqSettings{Host: rabbithost}, nil)
 	producer := msgQueue.NewRabbitMqProducer(admin, "default")
 	consumer := msgQueue.NewRabbitConsumer(admin, "MAIN")
-	tp := make([]*queue.TopicPartitionInfo, 0)
-	tp = append(tp, &queue.TopicPartitionInfo{
-		Topic:       "MAIN",
-		TenantId:    "1ps9djpswi0cds7cofynkso300eql4iu",
-		Partition:   0,
-		MyPartition: false,
-	})
-	tp = append(tp, &queue.TopicPartitionInfo{
-		Topic:       "MAIN",
-		TenantId:    "1ps9djpswi0cds7cofynkso300eql4sw",
-		Partition:   0,
-		MyPartition: false,
-	})
-	_ = consumer.SubscribeWithPartitions(tp)
-
+	if err := initConsumers(consumer); err != nil {
+		return nil, err
+	}
 	if err := producer.Init(); err != nil {
 		return nil, err
 	}
@@ -61,6 +49,25 @@ func NewController(rabbithost string) (*Controller, error) {
 	}, nil
 }
 
+func initConsumers(consumer *msgQueue.RabbitMqConsumer) error {
+	tService := &TenantService{}
+	tp := make([]*queue.TopicPartitionInfo, 0)
+
+	tenants, err := tService.FindTenants()
+	if err != nil {
+		return err
+	}
+	for _, t := range tenants {
+		tp = append(tp, &queue.TopicPartitionInfo{
+			Topic:       "MAIN",
+			TenantId:    t.Id,
+			Partition:   0,
+			MyPartition: false,
+		})
+	}
+	return consumer.SubscribeWithPartitions(tp)
+}
+
 // SetStatus 设置设备状态
 func (c *Controller) SetStatus(args rpcs.ArgsSetStatus, reply *rpcs.ReplySetStatus) error {
 	rpchost, err := getAccessRPCHost(args.DeviceId)

+ 4 - 3
services/mqttaccess/access.go

@@ -47,9 +47,10 @@ func (a *Access) GetStatus(args rpcs.ArgsGetStatus, reply *rpcs.ReplyGetStatus)
 	server.Log.Infof("Access Get Status: %v", args)
 	// first send a get status command
 	cmdArgs := rpcs.ArgsSendCommand{
-		DeviceId: args.Id,
-		WaitTime: 0,
-		Cmd:      "report",
+		DeviceId:  args.Id,
+		WaitTime:  0,
+		SubDevice: args.SubDeviceId,
+		Cmd:       "report",
 	}
 	cmdReply := rpcs.ReplySendCommand{}
 	return a.SendCommand(cmdArgs, &cmdReply)