package main import ( "fmt" "github.com/gogf/gf/encoding/gjson" "github.com/gogf/gf/os/gcache" "sparrow/pkg/rpcs" "sparrow/pkg/server" "sync" "time" ) type Server interface { // SaveStatus 存储设备状态 SaveStatus(deviceCode string, data *gjson.Json) error // GetStatus 获取设备状态 GetStatus(deviceCode string) (*gjson.Json, error) // SaveCommand 保存指令 SaveCommand(deviceCode string, command *gjson.Json) error // GetCommands 获取指令列表 GetCommands(deviceCode string) ([]map[string]interface{}, error) // DeleteShadow 删除设备影子 DeleteShadow(deviceCode string) error } type ShadowServer struct { mLock sync.RWMutex storeService Server } func NewShadowServer(storeService Server) *ShadowServer { return &ShadowServer{ storeService: storeService, } } // DeviceReport 设备上报状态 func (a *ShadowServer) DeviceReport(args *rpcs.ArgsDeviceReport, reply *rpcs.ReplyEmptyResult) error { server.Log.Debugf("收到设备上报[%s], data:%s", args.DeviceCode, args.Data) a.mLock.RLock() defer a.mLock.RUnlock() return a.storeService.SaveStatus(args.DeviceCode, gjson.New(args.Data)) } // GetDeviceDesired 获取设备期望状态 func (a *ShadowServer) GetDeviceDesired(deviceId string, reply *rpcs.DeviceDesiredReply) error { a.mLock.RLock() defer a.mLock.RUnlock() j, err := a.storeService.GetStatus(deviceId) if err != nil { return err } reply.Data = j.GetJson("status.desired").MustToJson() return nil } // GetDeviceReported 获取设备上报的状态 func (a *ShadowServer) GetDeviceReported(deviceId string, reply *gjson.Json) error { a.mLock.RLock() defer a.mLock.RUnlock() j, err := a.storeService.GetStatus(deviceId) if err != nil { return err } reply = j.GetJson("status.reported") return nil } // UpdateDesired 更新期望值 func (a *ShadowServer) UpdateDesired(args *rpcs.ArgsDeviceReport, reply *rpcs.ReplyEmptyResult) error { a.mLock.Lock() defer a.mLock.Unlock() j, err := a.storeService.GetStatus(args.DeviceCode) if err != nil { return err } newJ := gjson.New(args.Data) for k, v := range newJ.Map() { if err = j.Set(fmt.Sprintf("status.desired.%s", k), v); err != nil { continue } } return nil } type shadowStoreService struct { store *gcache.Cache } func NewShadowStoreService() Server { return &shadowStoreService{store: gcache.New()} } func (s *shadowStoreService) SaveStatus(deviceCode string, data *gjson.Json) error { doc, err := s.store.GetOrSetFuncLock(deviceCode, func() (interface{}, error) { d := new(ShadowDoc) d.DeviceCode = deviceCode d.Timestamp = time.Now().Unix() d.Version = 1 return gjson.New(d), nil }, 0) if err != nil { return err } if j, ok := doc.(*gjson.Json); ok { for k, v := range data.Map() { if err := j.Set(fmt.Sprintf("status.reported.%s", k), v); err != nil { continue } } } return nil } func (s *shadowStoreService) GetStatus(deviceCode string) (*gjson.Json, error) { doc, err := s.store.GetOrSetFuncLock(deviceCode, func() (interface{}, error) { d := new(ShadowDoc) d.DeviceCode = deviceCode return gjson.New(d), nil }, 0) return doc.(*gjson.Json), err } func (s *shadowStoreService) SaveCommand(deviceCode string, command *gjson.Json) error { doc, err := s.store.GetOrSetFuncLock(deviceCode, func() (interface{}, error) { d := new(ShadowDoc) d.DeviceCode = deviceCode return gjson.New(d), nil }, 0) if err != nil { return err } if j, ok := doc.(*gjson.Json); ok { commands := j.GetMaps("commands") commands = append(commands, command.Map()) j.Set("commands", commands) } return nil } func (s *shadowStoreService) GetCommands(deviceCode string) ([]map[string]interface{}, error) { doc, err := s.store.GetOrSetFuncLock(deviceCode, func() (interface{}, error) { d := new(ShadowDoc) d.DeviceCode = deviceCode return gjson.New(d), nil }, 0) if err != nil { return nil, err } if j, ok := doc.(*gjson.Json); ok { return j.GetMaps("commands"), nil } return nil, nil } func (s *shadowStoreService) DeleteShadow(deviceCode string) error { //TODO implement me panic("implement me") }