Browse Source

状态上报消息头添加时间戳

liuxiulin 2 years ago
parent
commit
c42bfc5a3d
1 changed files with 10 additions and 9 deletions
  1. 10 9
      services/controller/controller.go

+ 10 - 9
services/controller/controller.go

@@ -18,7 +18,7 @@ import (
 	"time"
 )
 
-//Controller 消息控制器
+// Controller 消息控制器
 type Controller struct {
 	producer     queue.QueueProducer
 	timer        *rule.Timer
@@ -29,7 +29,7 @@ type Controller struct {
 	pool         *grpool.Pool
 }
 
-//NewController 新建消息控制器
+// NewController 新建消息控制器
 func NewController(rabbithost string) (*Controller, error) {
 	admin := msgQueue.NewRabbitMessageQueueAdmin(&msgQueue.RabbitMqSettings{Host: rabbithost}, nil)
 	producer := msgQueue.NewRabbitMqProducer(admin, "default")
@@ -78,7 +78,7 @@ func (c *Controller) SetStatus(args rpcs.ArgsSetStatus, reply *rpcs.ReplySetStat
 	return server.RPCCallByHost(rpchost, "Access.SetStatus", args, reply)
 }
 
-//GetStatus 获取设备状态
+// GetStatus 获取设备状态
 func (c *Controller) GetStatus(args rpcs.ArgsGetStatus, reply *rpcs.ReplyGetStatus) error {
 	rpchost, err := getAccessRPCHost(args.Id)
 	if err != nil {
@@ -88,7 +88,7 @@ func (c *Controller) GetStatus(args rpcs.ArgsGetStatus, reply *rpcs.ReplyGetStat
 	return server.RPCCallByHost(rpchost, "Access.GetStatus", args, reply)
 }
 
-//Online 设备上线
+// Online 设备上线
 func (c *Controller) Online(args rpcs.ArgsGetStatus, reply *rpcs.ReplyEmptyResult) error {
 	data := gjson.New(nil)
 	_ = data.Set("device_id", args.Id)
@@ -117,7 +117,7 @@ func (c *Controller) Online(args rpcs.ArgsGetStatus, reply *rpcs.ReplyEmptyResul
 	return c.producer.Send(tpi, g, nil)
 }
 
-//Offline 设备下线
+// Offline 设备下线
 func (c *Controller) Offline(args rpcs.ArgsGetStatus, reply *rpcs.ReplyEmptyResult) error {
 	if args.Id == "" || args.VendorId == "" {
 		return nil
@@ -149,7 +149,7 @@ func (c *Controller) Offline(args rpcs.ArgsGetStatus, reply *rpcs.ReplyEmptyResu
 	return c.producer.Send(tpi, g, nil)
 }
 
-//OnStatus 状态上报消息处理
+// OnStatus 状态上报消息处理
 func (c *Controller) OnStatus(args rpcs.ArgsOnStatus, reply *rpcs.ReplyOnStatus) error {
 	t := time.Unix(int64(args.Timestamp/1000), 0)
 	data, err := c.processStatusToQueue(args)
@@ -166,6 +166,7 @@ func (c *Controller) OnStatus(args rpcs.ArgsOnStatus, reply *rpcs.ReplyOnStatus)
 			"vendor_id":     args.VendorId,
 			"device_id":     args.DeviceId,
 			"sub_device_id": args.SubDeviceId,
+			"timestamp":     args.Timestamp,
 		},
 		Originator: "device",
 	}
@@ -210,7 +211,7 @@ func (c *Controller) processEventToQueue(args rpcs.ArgsOnEvent) (string, error)
 	return result.MustToJsonString(), nil
 }
 
-//OnEvent 事件消息处理
+// OnEvent 事件消息处理
 func (c *Controller) OnEvent(args rpcs.ArgsOnEvent, reply *rpcs.ReplyOnEvent) error {
 	t := time.Unix(int64(args.TimeStamp/1000), 0)
 	data, err := c.processEventToQueue(args)
@@ -282,7 +283,7 @@ func (c *Controller) UpdateRuleChain(args rpcs.ArgsRuleChainAct, reply *rpcs.Rep
 	return nil
 }
 
-//SendCommand 下发设备控制指令
+// SendCommand 下发设备控制指令
 func (c *Controller) SendCommand(args rpcs.ArgsSendCommand, reply *rpcs.ReplySendCommand) error {
 	rpchost, err := getAccessRPCHost(args.DeviceId)
 	if err != nil {
@@ -304,7 +305,7 @@ func getAccessRPCHost(deviceid string) (string, error) {
 	return reply.AccessRPCHost, nil
 }
 
-//ActorSystem actor system
+// ActorSystem actor system
 type ActorSystem struct {
 	rootActor ruleEngine.Ref
 }