|
@@ -6,6 +6,7 @@ import (
|
|
|
"github.com/gogf/gf/os/grpool"
|
|
|
"github.com/gogf/gf/util/guid"
|
|
|
"sparrow/pkg/actors"
|
|
|
+ "sparrow/pkg/entities"
|
|
|
"sparrow/pkg/klink"
|
|
|
"sparrow/pkg/protocol"
|
|
|
"sparrow/pkg/queue"
|
|
@@ -17,6 +18,7 @@ import (
|
|
|
"time"
|
|
|
)
|
|
|
|
|
|
+//Controller 消息控制器
|
|
|
type Controller struct {
|
|
|
producer queue.QueueProducer
|
|
|
timer *rule.Timer
|
|
@@ -27,6 +29,7 @@ type Controller struct {
|
|
|
pool *grpool.Pool
|
|
|
}
|
|
|
|
|
|
+//NewController 新建消息控制器
|
|
|
func NewController(rabbithost string) (*Controller, error) {
|
|
|
admin := msgQueue.NewRabbitMessageQueueAdmin(&msgQueue.RabbitMqSettings{Host: rabbithost}, nil)
|
|
|
producer := msgQueue.NewRabbitMqProducer(admin, "default")
|
|
@@ -58,6 +61,7 @@ func NewController(rabbithost string) (*Controller, error) {
|
|
|
}, nil
|
|
|
}
|
|
|
|
|
|
+// SetStatus 设置设备状态
|
|
|
func (c *Controller) SetStatus(args rpcs.ArgsSetStatus, reply *rpcs.ReplySetStatus) error {
|
|
|
rpchost, err := getAccessRPCHost(args.DeviceId)
|
|
|
if err != nil {
|
|
@@ -67,6 +71,7 @@ func (c *Controller) SetStatus(args rpcs.ArgsSetStatus, reply *rpcs.ReplySetStat
|
|
|
return server.RPCCallByHost(rpchost, "Access.SetStatus", args, reply)
|
|
|
}
|
|
|
|
|
|
+//GetStatus 获取设备状态
|
|
|
func (c *Controller) GetStatus(args rpcs.ArgsGetStatus, reply *rpcs.ReplyGetStatus) error {
|
|
|
rpchost, err := getAccessRPCHost(args.Id)
|
|
|
if err != nil {
|
|
@@ -76,6 +81,7 @@ func (c *Controller) GetStatus(args rpcs.ArgsGetStatus, reply *rpcs.ReplyGetStat
|
|
|
return server.RPCCallByHost(rpchost, "Access.GetStatus", args, reply)
|
|
|
}
|
|
|
|
|
|
+//Online 设备上线
|
|
|
func (c *Controller) Online(args rpcs.ArgsGetStatus, reply *rpcs.ReplyEmptyResult) error {
|
|
|
data := gjson.New(nil)
|
|
|
_ = data.Set("device_id", args.Id)
|
|
@@ -104,6 +110,7 @@ func (c *Controller) Online(args rpcs.ArgsGetStatus, reply *rpcs.ReplyEmptyResul
|
|
|
return c.producer.Send(tpi, g, nil)
|
|
|
}
|
|
|
|
|
|
+//Offline 设备下线
|
|
|
func (c *Controller) Offline(args rpcs.ArgsGetStatus, reply *rpcs.ReplyEmptyResult) error {
|
|
|
if args.Id == "" || args.VendorId == "" {
|
|
|
return nil
|
|
@@ -135,6 +142,7 @@ func (c *Controller) Offline(args rpcs.ArgsGetStatus, reply *rpcs.ReplyEmptyResu
|
|
|
return c.producer.Send(tpi, g, nil)
|
|
|
}
|
|
|
|
|
|
+//OnStatus 状态上报消息处理
|
|
|
func (c *Controller) OnStatus(args rpcs.ArgsOnStatus, reply *rpcs.ReplyOnStatus) error {
|
|
|
t := time.Unix(int64(args.Timestamp/1000), 0)
|
|
|
data, err := c.processStatusToQueue(args)
|
|
@@ -195,6 +203,7 @@ func (c *Controller) processEventToQueue(args rpcs.ArgsOnEvent) (string, error)
|
|
|
return result.MustToJsonString(), nil
|
|
|
}
|
|
|
|
|
|
+//OnEvent 事件消息处理
|
|
|
func (c *Controller) OnEvent(args rpcs.ArgsOnEvent, reply *rpcs.ReplyOnEvent) error {
|
|
|
t := time.Unix(int64(args.TimeStamp/1000), 0)
|
|
|
data, err := c.processEventToQueue(args)
|
|
@@ -226,6 +235,47 @@ func (c *Controller) OnEvent(args rpcs.ArgsOnEvent, reply *rpcs.ReplyOnEvent) er
|
|
|
return c.producer.Send(tpi, g, nil)
|
|
|
}
|
|
|
|
|
|
+// OnCreateRuleChain 规则链生命周期-创建
|
|
|
+func (c *Controller) CreateRuleChain(args rpcs.ArgsRuleChainAct, reply *rpcs.ReplyEmptyResult) error {
|
|
|
+ if c.actorContext != nil {
|
|
|
+ msg := &ruleEngine.ComponentLifecycleMsg{
|
|
|
+ TenantId: args.VendorId,
|
|
|
+ EntityId: &entities.RuleChainId{Id: args.RuleChainId},
|
|
|
+ EventType: ruleEngine.CREATED,
|
|
|
+ }
|
|
|
+ c.actorContext.AppActor.TellWithHighPriority(msg)
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+// DeleteRuleChain 规则链生命周期-删除
|
|
|
+func (c *Controller) DeleteRuleChain(args rpcs.ArgsRuleChainAct, reply *rpcs.ReplyEmptyResult) error {
|
|
|
+ if c.actorContext != nil {
|
|
|
+ msg := &ruleEngine.ComponentLifecycleMsg{
|
|
|
+ TenantId: args.VendorId,
|
|
|
+ EntityId: &entities.RuleChainId{Id: args.RuleChainId},
|
|
|
+ EventType: ruleEngine.DELETED,
|
|
|
+ }
|
|
|
+ c.actorContext.AppActor.TellWithHighPriority(msg)
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+// UpdateRuleChain 规则链生命周期-更新
|
|
|
+func (c *Controller) UpdateRuleChain(args rpcs.ArgsRuleChainAct, reply *rpcs.ReplyEmptyResult) error {
|
|
|
+ if c.actorContext != nil {
|
|
|
+ msg := &ruleEngine.ComponentLifecycleMsg{
|
|
|
+ TenantId: args.VendorId,
|
|
|
+ EntityId: &entities.RuleChainId{Id: args.RuleChainId},
|
|
|
+ EventType: ruleEngine.UPDATED,
|
|
|
+ }
|
|
|
+ c.actorContext.AppActor.TellWithHighPriority(msg)
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+//SendCommand 下发设备控制指令
|
|
|
func (c *Controller) SendCommand(args rpcs.ArgsSendCommand, reply *rpcs.ReplySendCommand) error {
|
|
|
rpchost, err := getAccessRPCHost(args.DeviceId)
|
|
|
if err != nil {
|
|
@@ -247,6 +297,7 @@ func getAccessRPCHost(deviceid string) (string, error) {
|
|
|
return reply.AccessRPCHost, nil
|
|
|
}
|
|
|
|
|
|
+//ActorSystem actor system
|
|
|
type ActorSystem struct {
|
|
|
rootActor ruleEngine.Ref
|
|
|
}
|