123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159 |
- package main
- import (
- "fmt"
- "github.com/gogf/gf/encoding/gjson"
- "github.com/gogf/gf/os/gcache"
- "sparrow/pkg/rpcs"
- "sparrow/pkg/server"
- "sync"
- "time"
- )
- type Server interface {
- // SaveStatus 存储设备状态
- SaveStatus(deviceCode string, data *gjson.Json) error
- // GetStatus 获取设备状态
- GetStatus(deviceCode string) (*gjson.Json, error)
- // SaveCommand 保存指令
- SaveCommand(deviceCode string, command *gjson.Json) error
- // GetCommands 获取指令列表
- GetCommands(deviceCode string) ([]map[string]interface{}, error)
- // DeleteShadow 删除设备影子
- DeleteShadow(deviceCode string) error
- }
- type ShadowServer struct {
- mLock sync.RWMutex
- storeService Server
- }
- func NewShadowServer(storeService Server) *ShadowServer {
- return &ShadowServer{
- storeService: storeService,
- }
- }
- // DeviceReport 设备上报状态
- func (a *ShadowServer) DeviceReport(args *rpcs.ArgsDeviceReport, reply *rpcs.ReplyEmptyResult) error {
- server.Log.Debugf("收到设备上报[%s], data:%s", args.DeviceCode, args.Data)
- a.mLock.RLock()
- defer a.mLock.RUnlock()
- return a.storeService.SaveStatus(args.DeviceCode, gjson.New(args.Data))
- }
- // GetDeviceDesired 获取设备期望状态
- func (a *ShadowServer) GetDeviceDesired(deviceId string, reply *rpcs.DeviceDesiredReply) error {
- a.mLock.RLock()
- defer a.mLock.RUnlock()
- j, err := a.storeService.GetStatus(deviceId)
- if err != nil {
- return err
- }
- reply.Data = j.GetJson("status.desired").MustToJson()
- return nil
- }
- // GetDeviceReported 获取设备上报的状态
- func (a *ShadowServer) GetDeviceReported(deviceId string, reply *gjson.Json) error {
- a.mLock.RLock()
- defer a.mLock.RUnlock()
- j, err := a.storeService.GetStatus(deviceId)
- if err != nil {
- return err
- }
- reply = j.GetJson("status.reported")
- return nil
- }
- // UpdateDesired 更新期望值
- func (a *ShadowServer) UpdateDesired(args *rpcs.ArgsDeviceReport, reply *rpcs.ReplyEmptyResult) error {
- a.mLock.Lock()
- defer a.mLock.Unlock()
- j, err := a.storeService.GetStatus(args.DeviceCode)
- if err != nil {
- return err
- }
- newJ := gjson.New(args.Data)
- for k, v := range newJ.Map() {
- if err = j.Set(fmt.Sprintf("status.desired.%s", k), v); err != nil {
- continue
- }
- }
- return nil
- }
- type shadowStoreService struct {
- store *gcache.Cache
- }
- func NewShadowStoreService() Server {
- return &shadowStoreService{store: gcache.New()}
- }
- func (s *shadowStoreService) SaveStatus(deviceCode string, data *gjson.Json) error {
- doc, err := s.store.GetOrSetFuncLock(deviceCode, func() (interface{}, error) {
- d := new(ShadowDoc)
- d.DeviceCode = deviceCode
- d.Timestamp = time.Now().Unix()
- d.Version = 1
- return gjson.New(d), nil
- }, 0)
- if err != nil {
- return err
- }
- if j, ok := doc.(*gjson.Json); ok {
- for k, v := range data.Map() {
- if err := j.Set(fmt.Sprintf("status.reported.%s", k), v); err != nil {
- continue
- }
- }
- }
- return nil
- }
- func (s *shadowStoreService) GetStatus(deviceCode string) (*gjson.Json, error) {
- doc, err := s.store.GetOrSetFuncLock(deviceCode, func() (interface{}, error) {
- d := new(ShadowDoc)
- d.DeviceCode = deviceCode
- return gjson.New(d), nil
- }, 0)
- return doc.(*gjson.Json), err
- }
- func (s *shadowStoreService) SaveCommand(deviceCode string, command *gjson.Json) error {
- doc, err := s.store.GetOrSetFuncLock(deviceCode, func() (interface{}, error) {
- d := new(ShadowDoc)
- d.DeviceCode = deviceCode
- return gjson.New(d), nil
- }, 0)
- if err != nil {
- return err
- }
- if j, ok := doc.(*gjson.Json); ok {
- commands := j.GetMaps("commands")
- commands = append(commands, command.Map())
- j.Set("commands", commands)
- }
- return nil
- }
- func (s *shadowStoreService) GetCommands(deviceCode string) ([]map[string]interface{}, error) {
- doc, err := s.store.GetOrSetFuncLock(deviceCode, func() (interface{}, error) {
- d := new(ShadowDoc)
- d.DeviceCode = deviceCode
- return gjson.New(d), nil
- }, 0)
- if err != nil {
- return nil, err
- }
- if j, ok := doc.(*gjson.Json); ok {
- return j.GetMaps("commands"), nil
- }
- return nil, nil
- }
- func (s *shadowStoreService) DeleteShadow(deviceCode string) error {
- //TODO implement me
- panic("implement me")
- }
|