package main import ( "encoding/json" "fmt" "github.com/gogf/gf/os/grpool" "github.com/gogf/gf/util/guid" "sparrow/pkg/actors" "sparrow/pkg/models" "sparrow/pkg/productconfig" "sparrow/pkg/protocol" "sparrow/pkg/queue" "sparrow/pkg/queue/msgQueue" "sparrow/pkg/rpcs" "sparrow/pkg/rule" "sparrow/pkg/ruleEngine" "sparrow/pkg/server" "time" ) type Controller struct { producer queue.QueueProducer timer *rule.Timer ift *rule.Ifttt actorContext *ruleEngine.SystemContext consumer queue.QueueConsumer cluster *ClusterService pool *grpool.Pool } func NewController(rabbithost string) (*Controller, error) { admin := msgQueue.NewRabbitMessageQueueAdmin(&msgQueue.RabbitMqSettings{Host: rabbithost}, nil) producer := msgQueue.NewRabbitMqProducer(admin, "default") consumer := msgQueue.NewRabbitConsumer(admin, "MAIN") _ = consumer.Subscribe() // timer t := rule.NewTimer() t.Run() // ifttt ttt := rule.NewIfttt() if err := producer.Init(); err != nil { return nil, err } return &Controller{ producer: producer, timer: t, ift: ttt, consumer: consumer, cluster: &ClusterService{producer: producer}, pool: grpool.New(), }, nil } func (c *Controller) SetStatus(args rpcs.ArgsSetStatus, reply *rpcs.ReplySetStatus) error { rpchost, err := getAccessRPCHost(args.DeviceId) if err != nil { return err } return server.RPCCallByHost(rpchost, "Access.SetStatus", args, reply) } func (c *Controller) GetStatus(args rpcs.ArgsGetStatus, reply *rpcs.ReplyGetStatus) error { rpchost, err := getAccessRPCHost(args.Id) if err != nil { return err } return server.RPCCallByHost(rpchost, "Access.GetStatus", args, reply) } func (c *Controller) OnStatus(args rpcs.ArgsOnStatus, reply *rpcs.ReplyOnStatus) error { t := time.Unix(int64(args.Timestamp/1000), 0) data, err := c.processStatusToQueue(args) if err != nil { return err } msg := &protocol.Message{ Id: guid.S(), Ts: &t, Type: protocol.POST_ATTRIBUTES_REQUEST, Data: data, Callback: nil, MetaData: map[string]interface{}{ "tenant_id": args.VendorId, "device_id": args.DeviceId, }, Originator: "device", } tpi := queue.ResolvePartition("RULE_ENGINE", msg.GetQueueName(), args.VendorId, args.DeviceId) g, err := queue.NewGobQueueMessage(msg) if err != nil { return err } return c.producer.Send(tpi, g, nil) } func (c *Controller) processStatusToQueue(args rpcs.ArgsOnStatus) (string, error) { var result string device := &models.Device{} err := server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindDeviceByRecordId", &rpcs.ArgsDeviceAuth{DeviceID: args.DeviceId}, device) if err != nil { server.Log.Errorf("find device error : %v", err) return result, err } product := &models.Product{} err = server.RPCCallByName(nil, rpcs.RegistryServerName, "Registry.FindProduct", &device.ProductID, product) if err != nil { server.Log.Errorf("find product error : %v", err) return result, err } pc, err := productconfig.New(product.ProductConfig) if err != nil { server.Log.Errorf("product config error : %v", err) return result, err } ev := &protocol.Data{} ev.SubData = args.Subdata m, err := pc.StatusToMap(ev.SubData) if err != nil { server.Log.Errorf("gen status json error : %v", err) return result, err } b, err := json.Marshal(&m) if err != nil { server.Log.Errorf("marshal json error : %v", err) } result = string(b) return result, nil } func (c *Controller) OnEvent(args rpcs.ArgsOnEvent, reply *rpcs.ReplyOnEvent) error { go func() { err := c.ift.Check(args.DeviceId, args.No) if err != nil { server.Log.Warnf("perform ifttt rules error : %v", err) } }() return nil } func (c *Controller) SendCommand(args rpcs.ArgsSendCommand, reply *rpcs.ReplySendCommand) error { rpchost, err := getAccessRPCHost(args.DeviceId) if err != nil { return err } return server.RPCCallByHost(rpchost, "Access.SendCommand", args, reply) } func getAccessRPCHost(deviceid string) (string, error) { args := rpcs.ArgsGetDeviceOnlineStatus{ Id: deviceid, } reply := &rpcs.ReplyGetDeviceOnlineStatus{} err := server.RPCCallByName(nil, "devicemanager", "DeviceManager.GetDeviceOnlineStatus", args, reply) if err != nil { return "", err } return reply.AccessRPCHost, nil } type ActorSystem struct { rootActor ruleEngine.Ref } // 初始化actor system func (c *Controller) initActorSystem() (*ActorSystem, error) { system := ruleEngine.NewDefaultActorSystem(&ruleEngine.DefaultActorSystemConfig{ SchedulerPoolSize: 5, AppDispatcherPoolSize: 4, TenantDispatcherPoolSize: 4, RuleEngineDispatcherPoolSize: 4, }) _ = system.CreateDispatcher(ruleEngine.APP_DISPATCHER_NAME, ruleEngine.NewPoolDispatcher(5)) _ = system.CreateDispatcher(ruleEngine.TENANT_DISPATCHER_NAME, ruleEngine.NewPoolDispatcher(4)) _ = system.CreateDispatcher(ruleEngine.RULE_DISPATCHER_NAME, ruleEngine.NewPoolDispatcher(4)) // init services tenantService := &ruleEngine.TestTenantService{} ruleChainService := &ruleEngine.TestRuleChainService{} actorContext := ruleEngine.NewSystemContext(system, ruleEngine.SystemContextServiceConfig{ ClusterService: c.cluster, RuleChainService: ruleChainService, TenantService: tenantService, EventService: NewEventService(), }) appActor, err := system.CreateRootActor(ruleEngine.APP_DISPATCHER_NAME, actors.NewAppActorCreator(actorContext)) if err != nil { return nil, err } actorContext.AppActor = appActor server.Log.Debugln("actor system initialized") time.Sleep(time.Second * 1) appActor.Tell(&ruleEngine.AppInitMsg{}) c.actorContext = actorContext return &ActorSystem{rootActor: appActor}, nil } // 启动mq consumers func (c *Controller) launchConsumer() { msgs, err := c.consumer.Pop(100 * time.Millisecond) if err != nil { server.Log.Error(err) } for { select { case msg := <-msgs: ruleEngineMsg := &protocol.Message{} if err := ruleEngineMsg.Decode(msg.GetData()); err != nil { fmt.Println("解析消息失败") } if c.actorContext != nil { c.actorContext.Tell(&ruleEngine.QueueToRuleEngineMsg{ TenantId: "tenant_1", Message: ruleEngineMsg, }) } } } }