controller.go 8.2 KB


  1. package main
  2. import (
  3. "fmt"
  4. "github.com/gogf/gf/encoding/gjson"
  5. "github.com/gogf/gf/os/grpool"
  6. "github.com/gogf/gf/util/guid"
  7. "sparrow/pkg/actors"
  8. "sparrow/pkg/klink"
  9. "sparrow/pkg/protocol"
  10. "sparrow/pkg/queue"
  11. "sparrow/pkg/queue/msgQueue"
  12. "sparrow/pkg/rpcs"
  13. "sparrow/pkg/rule"
  14. "sparrow/pkg/ruleEngine"
  15. "sparrow/pkg/server"
  16. "time"
  17. )
  18. type Controller struct {
  19. producer queue.QueueProducer
  20. timer *rule.Timer
  21. ift *rule.Ifttt
  22. actorContext *ruleEngine.SystemContext
  23. consumer queue.QueueConsumer
  24. cluster *ClusterService
  25. pool *grpool.Pool
  26. }
  27. func NewController(rabbithost string) (*Controller, error) {
  28. admin := msgQueue.NewRabbitMessageQueueAdmin(&msgQueue.RabbitMqSettings{Host: rabbithost}, nil)
  29. producer := msgQueue.NewRabbitMqProducer(admin, "default")
  30. consumer := msgQueue.NewRabbitConsumer(admin, "MAIN")
  31. tp := make([]*queue.TopicPartitionInfo, 0)
  32. tp = append(tp, &queue.TopicPartitionInfo{
  33. Topic: "MAIN",
  34. TenantId: "1ps9djpswi0cds7cofynkso300eql4iu",
  35. Partition: 0,
  36. MyPartition: false,
  37. })
  38. tp = append(tp, &queue.TopicPartitionInfo{
  39. Topic: "MAIN",
  40. TenantId: "1ps9djpswi0cds7cofynkso300eql4sw",
  41. Partition: 0,
  42. MyPartition: false,
  43. })
  44. _ = consumer.SubscribeWithPartitions(tp)
  45. if err := producer.Init(); err != nil {
  46. return nil, err
  47. }
  48. return &Controller{
  49. producer: producer,
  50. consumer: consumer,
  51. cluster: &ClusterService{producer: producer},
  52. pool: grpool.New(),
  53. }, nil
  54. }
  55. func (c *Controller) SetStatus(args rpcs.ArgsSetStatus, reply *rpcs.ReplySetStatus) error {
  56. rpchost, err := getAccessRPCHost(args.DeviceId)
  57. if err != nil {
  58. return err
  59. }
  60. return server.RPCCallByHost(rpchost, "Access.SetStatus", args, reply)
  61. }
  62. func (c *Controller) GetStatus(args rpcs.ArgsGetStatus, reply *rpcs.ReplyGetStatus) error {
  63. rpchost, err := getAccessRPCHost(args.Id)
  64. if err != nil {
  65. return err
  66. }
  67. return server.RPCCallByHost(rpchost, "Access.GetStatus", args, reply)
  68. }
  69. func (c *Controller) Online(args rpcs.ArgsGetStatus, reply *rpcs.ReplyEmptyResult) error {
  70. data := gjson.New(nil)
  71. _ = data.Set("device_id", args.Id)
  72. t := time.Now()
  73. msg := &protocol.Message{
  74. Id: guid.S(),
  75. Ts: &t,
  76. Type: protocol.CONNECT_EVENT,
  77. Data: data.MustToJsonString(),
  78. Callback: nil,
  79. MetaData: map[string]interface{}{
  80. "device_id": args.Id,
  81. "vendor_id": args.VendorId,
  82. },
  83. Originator: "device",
  84. }
  85. tpi := queue.ResolvePartition("RULE_ENGINE",
  86. msg.GetQueueName(),
  87. args.VendorId,
  88. args.Id)
  89. g, err := queue.NewGobQueueMessage(msg)
  90. if err != nil {
  91. return err
  92. }
  93. g.Headers.Put("tenant_id", []byte(args.VendorId))
  94. return c.producer.Send(tpi, g, nil)
  95. }
  96. func (c *Controller) Offline(args rpcs.ArgsGetStatus, reply *rpcs.ReplyEmptyResult) error {
  97. if args.Id == "" || args.VendorId == "" {
  98. return nil
  99. }
  100. data := gjson.New(nil)
  101. _ = data.Set("device_id", args.Id)
  102. t := time.Now()
  103. msg := &protocol.Message{
  104. Id: guid.S(),
  105. Ts: &t,
  106. Type: protocol.DISCONNECT_EVENT,
  107. Data: data.MustToJsonString(),
  108. Callback: nil,
  109. MetaData: map[string]interface{}{
  110. "device_id": args.Id,
  111. "vendor_id": args.VendorId,
  112. },
  113. Originator: "device",
  114. }
  115. tpi := queue.ResolvePartition("RULE_ENGINE",
  116. msg.GetQueueName(),
  117. args.VendorId,
  118. args.Id)
  119. g, err := queue.NewGobQueueMessage(msg)
  120. if err != nil {
  121. return err
  122. }
  123. g.Headers.Put("tenant_id", []byte(args.VendorId))
  124. return c.producer.Send(tpi, g, nil)
  125. }
  126. func (c *Controller) OnStatus(args rpcs.ArgsOnStatus, reply *rpcs.ReplyOnStatus) error {
  127. t := time.Unix(int64(args.Timestamp/1000), 0)
  128. data, err := c.processStatusToQueue(args)
  129. if err != nil {
  130. return err
  131. }
  132. msg := &protocol.Message{
  133. Id: guid.S(),
  134. Ts: &t,
  135. Type: protocol.POST_ATTRIBUTES_REQUEST,
  136. Data: data,
  137. Callback: nil,
  138. MetaData: map[string]interface{}{
  139. "tenant_id": args.VendorId,
  140. "device_id": args.DeviceId,
  141. "sub_device_id": args.SubDeviceId,
  142. },
  143. Originator: "device",
  144. }
  145. tpi := queue.ResolvePartition("RULE_ENGINE",
  146. msg.GetQueueName(),
  147. args.VendorId,
  148. args.DeviceId)
  149. g, err := queue.NewGobQueueMessage(msg)
  150. if err != nil {
  151. return err
  152. }
  153. g.Headers.Put("tenant_id", []byte(args.VendorId))
  154. return c.producer.Send(tpi, g, nil)
  155. }
  156. func (c *Controller) processStatusToQueue(args rpcs.ArgsOnStatus) (string, error) {
  157. result := gjson.New(nil)
  158. j, err := gjson.DecodeToJson(args.SubData)
  159. if err != nil {
  160. return "", err
  161. }
  162. switch args.Action {
  163. case klink.DevSendAction:
  164. params := j.GetMap("params")
  165. if err = result.Set(j.GetString("cmd"), params); err != nil {
  166. return "", err
  167. }
  168. }
  169. return result.MustToJsonString(), nil
  170. }
  171. func (c *Controller) processEventToQueue(args rpcs.ArgsOnEvent) (string, error) {
  172. result := gjson.New(nil)
  173. j, err := gjson.DecodeToJson(args.SubData)
  174. if err != nil {
  175. return "", nil
  176. }
  177. params := j.GetMap("params")
  178. if err = result.Set(j.GetString("cmd"), params); err != nil {
  179. return "", err
  180. }
  181. return result.MustToJsonString(), nil
  182. }
  183. func (c *Controller) OnEvent(args rpcs.ArgsOnEvent, reply *rpcs.ReplyOnEvent) error {
  184. t := time.Unix(int64(args.TimeStamp/1000), 0)
  185. data, err := c.processEventToQueue(args)
  186. if err != nil {
  187. return err
  188. }
  189. msg := &protocol.Message{
  190. Id: guid.S(),
  191. Ts: &t,
  192. Type: protocol.POST_EVENT_REQUEST,
  193. Data: data,
  194. Callback: nil,
  195. MetaData: map[string]interface{}{
  196. "tenant_id": args.VendorId,
  197. "device_id": args.DeviceId,
  198. "sub_device_id": args.SubDeviceId,
  199. },
  200. Originator: "device",
  201. }
  202. tpi := queue.ResolvePartition("RULE_ENGINE",
  203. msg.GetQueueName(),
  204. args.VendorId,
  205. args.DeviceId)
  206. g, err := queue.NewGobQueueMessage(msg)
  207. if err != nil {
  208. return err
  209. }
  210. g.Headers.Put("tenant_id", []byte(args.VendorId))
  211. return c.producer.Send(tpi, g, nil)
  212. }
  213. func (c *Controller) SendCommand(args rpcs.ArgsSendCommand, reply *rpcs.ReplySendCommand) error {
  214. rpchost, err := getAccessRPCHost(args.DeviceId)
  215. if err != nil {
  216. return err
  217. }
  218. return server.RPCCallByHost(rpchost, "Access.SendCommand", args, reply)
  219. }
  220. func getAccessRPCHost(deviceid string) (string, error) {
  221. args := rpcs.ArgsGetDeviceOnlineStatus{
  222. Id: deviceid,
  223. }
  224. reply := &rpcs.ReplyGetDeviceOnlineStatus{}
  225. err := server.RPCCallByName(nil, rpcs.DeviceManagerName, "DeviceManager.GetDeviceOnlineStatus", args, reply)
  226. if err != nil {
  227. return "", err
  228. }
  229. return reply.AccessRPCHost, nil
  230. }
  231. type ActorSystem struct {
  232. rootActor ruleEngine.Ref
  233. }
  234. // 初始化actor system
  235. func (c *Controller) initActorSystem() (*ActorSystem, error) {
  236. system := ruleEngine.NewDefaultActorSystem(&ruleEngine.DefaultActorSystemConfig{
  237. SchedulerPoolSize: 5,
  238. AppDispatcherPoolSize: 4,
  239. TenantDispatcherPoolSize: 4,
  240. RuleEngineDispatcherPoolSize: 4,
  241. })
  242. _ = system.CreateDispatcher(ruleEngine.APP_DISPATCHER_NAME, ruleEngine.NewPoolDispatcher(0))
  243. _ = system.CreateDispatcher(ruleEngine.TENANT_DISPATCHER_NAME, ruleEngine.NewPoolDispatcher(0))
  244. _ = system.CreateDispatcher(ruleEngine.RULE_DISPATCHER_NAME, ruleEngine.NewPoolDispatcher(0))
  245. // init services
  246. tenantService := &TenantService{}
  247. ruleChainService := &RuleChainService{}
  248. actorContext := ruleEngine.NewSystemContext(system, ruleEngine.SystemContextServiceConfig{
  249. ClusterService: c.cluster,
  250. RuleChainService: ruleChainService,
  251. TenantService: tenantService,
  252. EventService: NewEventService(),
  253. })
  254. appActor, err := system.CreateRootActor(ruleEngine.APP_DISPATCHER_NAME,
  255. actors.NewAppActorCreator(actorContext))
  256. if err != nil {
  257. return nil, err
  258. }
  259. actorContext.AppActor = appActor
  260. server.Log.Debugln("actor system initialized")
  261. time.Sleep(time.Second * 1)
  262. appActor.Tell(&ruleEngine.AppInitMsg{})
  263. c.actorContext = actorContext
  264. return &ActorSystem{rootActor: appActor}, nil
  265. }
  266. // 启动mq consumers
  267. func (c *Controller) launchConsumer() {
  268. msgs, err := c.consumer.Pop(100 * time.Millisecond)
  269. if err != nil {
  270. server.Log.Error(err)
  271. }
  272. for {
  273. select {
  274. case msg := <-msgs:
  275. ruleEngineMsg := &protocol.Message{}
  276. if err := ruleEngineMsg.Decode(msg.GetData()); err != nil {
  277. fmt.Println("解析消息失败")
  278. }
  279. tanantId := msg.GetHeaders().Get("tenant_id")
  280. if c.actorContext != nil {
  281. c.actorContext.Tell(&ruleEngine.QueueToRuleEngineMsg{
  282. TenantId: string(tanantId),
  283. Message: ruleEngineMsg,
  284. })
  285. }
  286. }
  287. }
  288. }