server.go 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/gogf/gf/encoding/gjson"
  5. "github.com/gogf/gf/os/gcache"
  6. "sparrow/pkg/rpcs"
  7. "sparrow/pkg/server"
  8. "sync"
  9. "time"
  10. )
  11. type Server interface {
  12. // SaveStatus 存储设备状态
  13. SaveStatus(deviceCode string, data *gjson.Json) error
  14. // GetStatus 获取设备状态
  15. GetStatus(deviceCode string) (*gjson.Json, error)
  16. // SaveCommand 保存指令
  17. SaveCommand(deviceCode string, command *gjson.Json) error
  18. // GetCommands 获取指令列表
  19. GetCommands(deviceCode string) ([]map[string]interface{}, error)
  20. // DeleteShadow 删除设备影子
  21. DeleteShadow(deviceCode string) error
  22. }
  23. type ShadowServer struct {
  24. mLock sync.RWMutex
  25. storeService Server
  26. }
  27. func NewShadowServer(storeService Server) *ShadowServer {
  28. return &ShadowServer{
  29. storeService: storeService,
  30. }
  31. }
  32. // DeviceReport 设备上报状态
  33. func (a *ShadowServer) DeviceReport(args *rpcs.ArgsDeviceReport, reply *rpcs.ReplyEmptyResult) error {
  34. server.Log.Debugf("收到设备上报[%s], data:%s", args.DeviceCode, args.Data)
  35. a.mLock.RLock()
  36. defer a.mLock.RUnlock()
  37. return a.storeService.SaveStatus(args.DeviceCode, gjson.New(args.Data))
  38. }
  39. // GetDeviceDesired 获取设备期望状态
  40. func (a *ShadowServer) GetDeviceDesired(deviceId string, reply *rpcs.DeviceDesiredReply) error {
  41. a.mLock.RLock()
  42. defer a.mLock.RUnlock()
  43. j, err := a.storeService.GetStatus(deviceId)
  44. if err != nil {
  45. return err
  46. }
  47. reply.Data = j.GetJson("status.desired").MustToJson()
  48. return nil
  49. }
  50. // GetDeviceReported 获取设备上报的状态
  51. func (a *ShadowServer) GetDeviceReported(deviceId string, reply *gjson.Json) error {
  52. a.mLock.RLock()
  53. defer a.mLock.RUnlock()
  54. j, err := a.storeService.GetStatus(deviceId)
  55. if err != nil {
  56. return err
  57. }
  58. reply = j.GetJson("status.reported")
  59. return nil
  60. }
  61. // UpdateDesired 更新期望值
  62. func (a *ShadowServer) UpdateDesired(args *rpcs.ArgsDeviceReport, reply *rpcs.ReplyEmptyResult) error {
  63. a.mLock.Lock()
  64. defer a.mLock.Unlock()
  65. j, err := a.storeService.GetStatus(args.DeviceCode)
  66. if err != nil {
  67. return err
  68. }
  69. newJ := gjson.New(args.Data)
  70. for k, v := range newJ.Map() {
  71. if err = j.Set(fmt.Sprintf("status.desired.%s", k), v); err != nil {
  72. continue
  73. }
  74. }
  75. return nil
  76. }
  77. type shadowStoreService struct {
  78. store *gcache.Cache
  79. }
  80. func NewShadowStoreService() Server {
  81. return &shadowStoreService{store: gcache.New()}
  82. }
  83. func (s *shadowStoreService) SaveStatus(deviceCode string, data *gjson.Json) error {
  84. doc, err := s.store.GetOrSetFuncLock(deviceCode, func() (interface{}, error) {
  85. d := new(ShadowDoc)
  86. d.DeviceCode = deviceCode
  87. d.Timestamp = time.Now().Unix()
  88. d.Version = 1
  89. return gjson.New(d), nil
  90. }, 0)
  91. if err != nil {
  92. return err
  93. }
  94. if j, ok := doc.(*gjson.Json); ok {
  95. for k, v := range data.Map() {
  96. if err := j.Set(fmt.Sprintf("status.reported.%s", k), v); err != nil {
  97. continue
  98. }
  99. }
  100. }
  101. return nil
  102. }
  103. func (s *shadowStoreService) GetStatus(deviceCode string) (*gjson.Json, error) {
  104. doc, err := s.store.GetOrSetFuncLock(deviceCode, func() (interface{}, error) {
  105. d := new(ShadowDoc)
  106. d.DeviceCode = deviceCode
  107. return gjson.New(d), nil
  108. }, 0)
  109. return doc.(*gjson.Json), err
  110. }
  111. func (s *shadowStoreService) SaveCommand(deviceCode string, command *gjson.Json) error {
  112. doc, err := s.store.GetOrSetFuncLock(deviceCode, func() (interface{}, error) {
  113. d := new(ShadowDoc)
  114. d.DeviceCode = deviceCode
  115. return gjson.New(d), nil
  116. }, 0)
  117. if err != nil {
  118. return err
  119. }
  120. if j, ok := doc.(*gjson.Json); ok {
  121. commands := j.GetMaps("commands")
  122. commands = append(commands, command.Map())
  123. j.Set("commands", commands)
  124. }
  125. return nil
  126. }
  127. func (s *shadowStoreService) GetCommands(deviceCode string) ([]map[string]interface{}, error) {
  128. doc, err := s.store.GetOrSetFuncLock(deviceCode, func() (interface{}, error) {
  129. d := new(ShadowDoc)
  130. d.DeviceCode = deviceCode
  131. return gjson.New(d), nil
  132. }, 0)
  133. if err != nil {
  134. return nil, err
  135. }
  136. if j, ok := doc.(*gjson.Json); ok {
  137. return j.GetMaps("commands"), nil
  138. }
  139. return nil, nil
  140. }
  141. func (s *shadowStoreService) DeleteShadow(deviceCode string) error {
  142. //TODO implement me
  143. panic("implement me")
  144. }