package main import ( "fmt" "github.com/gogf/gf/encoding/gjson" "github.com/gogf/gf/os/grpool" "github.com/gogf/gf/util/guid" "sparrow/pkg/actors" "sparrow/pkg/klink" "sparrow/pkg/protocol" "sparrow/pkg/queue" "sparrow/pkg/queue/msgQueue" "sparrow/pkg/rpcs" "sparrow/pkg/rule" "sparrow/pkg/ruleEngine" "sparrow/pkg/server" "time" ) type Controller struct { producer queue.QueueProducer timer *rule.Timer ift *rule.Ifttt actorContext *ruleEngine.SystemContext consumer queue.QueueConsumer cluster *ClusterService pool *grpool.Pool } func NewController(rabbithost string) (*Controller, error) { admin := msgQueue.NewRabbitMessageQueueAdmin(&msgQueue.RabbitMqSettings{Host: rabbithost}, nil) producer := msgQueue.NewRabbitMqProducer(admin, "default") consumer := msgQueue.NewRabbitConsumer(admin, "MAIN") tp := make([]*queue.TopicPartitionInfo, 0) tp = append(tp, &queue.TopicPartitionInfo{ Topic: "MAIN", TenantId: "1ps9djpswi0cds7cofynkso300eql4iu", Partition: 0, MyPartition: false, }) tp = append(tp, &queue.TopicPartitionInfo{ Topic: "MAIN", TenantId: "1ps9djpswi0cds7cofynkso300eql4sw", Partition: 0, MyPartition: false, }) _ = consumer.SubscribeWithPartitions(tp) if err := producer.Init(); err != nil { return nil, err } return &Controller{ producer: producer, consumer: consumer, cluster: &ClusterService{producer: producer}, pool: grpool.New(), }, nil } func (c *Controller) SetStatus(args rpcs.ArgsSetStatus, reply *rpcs.ReplySetStatus) error { rpchost, err := getAccessRPCHost(args.DeviceId) if err != nil { return err } return server.RPCCallByHost(rpchost, "Access.SetStatus", args, reply) } func (c *Controller) GetStatus(args rpcs.ArgsGetStatus, reply *rpcs.ReplyGetStatus) error { rpchost, err := getAccessRPCHost(args.Id) if err != nil { return err } return server.RPCCallByHost(rpchost, "Access.GetStatus", args, reply) } func (c *Controller) Online(args rpcs.ArgsGetStatus, reply *rpcs.ReplyEmptyResult) error { data := gjson.New(nil) _ = data.Set("device_id", args.Id) t := time.Now() msg := &protocol.Message{ Id: guid.S(), Ts: &t, Type: protocol.CONNECT_EVENT, Data: data.MustToJsonString(), Callback: nil, MetaData: map[string]interface{}{ "device_id": args.Id, "vendor_id": args.VendorId, }, Originator: "device", } tpi := queue.ResolvePartition("RULE_ENGINE", msg.GetQueueName(), args.VendorId, args.Id) g, err := queue.NewGobQueueMessage(msg) if err != nil { return err } g.Headers.Put("tenant_id", []byte(args.VendorId)) return c.producer.Send(tpi, g, nil) } func (c *Controller) Offline(args rpcs.ArgsGetStatus, reply *rpcs.ReplyEmptyResult) error { data := gjson.New(nil) _ = data.Set("device_id", args.Id) t := time.Now() msg := &protocol.Message{ Id: guid.S(), Ts: &t, Type: protocol.DISCONNECT_EVENT, Data: data.MustToJsonString(), Callback: nil, MetaData: map[string]interface{}{ "device_id": args.Id, "vendor_id": args.VendorId, }, Originator: "device", } tpi := queue.ResolvePartition("RULE_ENGINE", msg.GetQueueName(), args.VendorId, args.Id) g, err := queue.NewGobQueueMessage(msg) if err != nil { return err } g.Headers.Put("tenant_id", []byte(args.VendorId)) return c.producer.Send(tpi, g, nil) } func (c *Controller) OnStatus(args rpcs.ArgsOnStatus, reply *rpcs.ReplyOnStatus) error { t := time.Unix(int64(args.Timestamp/1000), 0) data, err := c.processStatusToQueue(args) if err != nil { return err } msg := &protocol.Message{ Id: guid.S(), Ts: &t, Type: protocol.POST_ATTRIBUTES_REQUEST, Data: data, Callback: nil, MetaData: map[string]interface{}{ "tenant_id": args.VendorId, "device_id": args.DeviceId, "sub_device_id": args.SubDeviceId, }, Originator: "device", } tpi := queue.ResolvePartition("RULE_ENGINE", msg.GetQueueName(), args.VendorId, args.DeviceId) g, err := queue.NewGobQueueMessage(msg) if err != nil { return err } g.Headers.Put("tenant_id", []byte(args.VendorId)) return c.producer.Send(tpi, g, nil) } func (c *Controller) processStatusToQueue(args rpcs.ArgsOnStatus) (string, error) { result := gjson.New(nil) j, err := gjson.DecodeToJson(args.SubData) if err != nil { return "", err } switch args.Action { case klink.DevSendAction: params := j.GetMap("params") if err = result.Set(j.GetString("cmd"), params); err != nil { return "", err } } return result.MustToJsonString(), nil } func (c *Controller) processEventToQueue(args rpcs.ArgsOnEvent) (string, error) { result := gjson.New(nil) j, err := gjson.DecodeToJson(args.SubData) if err != nil { return "", nil } params := j.GetMap("params") if err = result.Set(j.GetString("cmd"), params); err != nil { return "", err } return result.MustToJsonString(), nil } func (c *Controller) OnEvent(args rpcs.ArgsOnEvent, reply *rpcs.ReplyOnEvent) error { t := time.Unix(int64(args.TimeStamp/1000), 0) data, err := c.processEventToQueue(args) if err != nil { return err } msg := &protocol.Message{ Id: guid.S(), Ts: &t, Type: protocol.POST_EVENT_REQUEST, Data: data, Callback: nil, MetaData: map[string]interface{}{ "tenant_id": args.VendorId, "device_id": args.DeviceId, "sub_device_id": args.SubDeviceId, }, Originator: "device", } tpi := queue.ResolvePartition("RULE_ENGINE", msg.GetQueueName(), args.VendorId, args.DeviceId) g, err := queue.NewGobQueueMessage(msg) if err != nil { return err } g.Headers.Put("tenant_id", []byte(args.VendorId)) return c.producer.Send(tpi, g, nil) } func (c *Controller) SendCommand(args rpcs.ArgsSendCommand, reply *rpcs.ReplySendCommand) error { rpchost, err := getAccessRPCHost(args.DeviceId) if err != nil { return err } return server.RPCCallByHost(rpchost, "Access.SendCommand", args, reply) } func getAccessRPCHost(deviceid string) (string, error) { args := rpcs.ArgsGetDeviceOnlineStatus{ Id: deviceid, } reply := &rpcs.ReplyGetDeviceOnlineStatus{} err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetDeviceOnlineStatus", args, reply) if err != nil { return "", err } return reply.AccessRPCHost, nil } type ActorSystem struct { rootActor ruleEngine.Ref } // 初始化actor system func (c *Controller) initActorSystem() (*ActorSystem, error) { system := ruleEngine.NewDefaultActorSystem(&ruleEngine.DefaultActorSystemConfig{ SchedulerPoolSize: 5, AppDispatcherPoolSize: 4, TenantDispatcherPoolSize: 4, RuleEngineDispatcherPoolSize: 4, }) _ = system.CreateDispatcher(ruleEngine.APP_DISPATCHER_NAME, ruleEngine.NewPoolDispatcher(0)) _ = system.CreateDispatcher(ruleEngine.TENANT_DISPATCHER_NAME, ruleEngine.NewPoolDispatcher(0)) _ = system.CreateDispatcher(ruleEngine.RULE_DISPATCHER_NAME, ruleEngine.NewPoolDispatcher(0)) // init services tenantService := &TenantService{} ruleChainService := &RuleChainService{} actorContext := ruleEngine.NewSystemContext(system, ruleEngine.SystemContextServiceConfig{ ClusterService: c.cluster, RuleChainService: ruleChainService, TenantService: tenantService, EventService: NewEventService(), }) appActor, err := system.CreateRootActor(ruleEngine.APP_DISPATCHER_NAME, actors.NewAppActorCreator(actorContext)) if err != nil { return nil, err } actorContext.AppActor = appActor server.Log.Debugln("actor system initialized") time.Sleep(time.Second * 1) appActor.Tell(&ruleEngine.AppInitMsg{}) c.actorContext = actorContext return &ActorSystem{rootActor: appActor}, nil } // 启动mq consumers func (c *Controller) launchConsumer() { msgs, err := c.consumer.Pop(100 * time.Millisecond) if err != nil { server.Log.Error(err) } for { select { case msg := <-msgs: ruleEngineMsg := &protocol.Message{} if err := ruleEngineMsg.Decode(msg.GetData()); err != nil { fmt.Println("解析消息失败") } tanantId := msg.GetHeaders().Get("tenant_id") if c.actorContext != nil { c.actorContext.Tell(&ruleEngine.QueueToRuleEngineMsg{ TenantId: string(tanantId), Message: ruleEngineMsg, }) } } } }