소스 검색

更新fileaccess服务,添加自动清理文件功能

lijian 6 년 전
부모
커밋
798a030605
7개의 변경된 파일98개의 추가작업 그리고 48개의 파일을 삭제
  1. 1 0
      pkg/queue/queue.go
  2. 7 6
      pkg/server/server_manager.go
  3. 8 1
      services/fileaccess/file.go
  4. 36 2
      services/fileaccess/fileaccess.go
  5. 12 8
      services/fileaccess/main.go
  6. BIN
      tests/device/device
  7. 34 31
      tests/device/device.go

+ 1 - 0
pkg/queue/queue.go

@@ -4,6 +4,7 @@ package queue
 import (
 import (
 	"errors"
 	"errors"
 	"sparrow/pkg/serializer"
 	"sparrow/pkg/serializer"
+
 	"github.com/streadway/amqp"
 	"github.com/streadway/amqp"
 )
 )
 
 

+ 7 - 6
pkg/server/server_manager.go

@@ -4,11 +4,12 @@ package server
 
 
 import (
 import (
 	"errors"
 	"errors"
-	"github.com/coreos/etcd/client"
-	"golang.org/x/net/context"
 	"os"
 	"os"
 	"strings"
 	"strings"
 	"time"
 	"time"
+
+	"github.com/coreos/etcd/client"
+	"golang.org/x/net/context"
 )
 )
 
 
 const (
 const (
@@ -38,7 +39,7 @@ func NewServerManager(name string, etcd string) (*ServerManager, error) {
 	}, nil
 	}, nil
 }
 }
 
 
-// register server to etcd
+// RegisterServer register server to etcd
 func (mgr *ServerManager) RegisterServer() error {
 func (mgr *ServerManager) RegisterServer() error {
 	if serverInstance == nil {
 	if serverInstance == nil {
 		return errorf(errServerNotInit)
 		return errorf(errServerNotInit)
@@ -84,7 +85,7 @@ func (mgr *ServerManager) RegisterServer() error {
 	return nil
 	return nil
 }
 }
 
 
-// update server hosts
+// UpdateServerHosts update server hosts
 func (mgr *ServerManager) UpdateServerHosts() error {
 func (mgr *ServerManager) UpdateServerHosts() error {
 	if serverInstance == nil {
 	if serverInstance == nil {
 		return errorf(errServerNotInit)
 		return errorf(errServerNotInit)
@@ -136,12 +137,12 @@ func (mgr *ServerManager) UpdateServerHosts() error {
 
 
 	mgr.mapServers = servers
 	mgr.mapServers = servers
 
 
-	Log.Infof("UpdateServerHosts is done: %v", mgr.mapServers)
+	//Log.Infof("UpdateServerHosts is done: %v", mgr.mapServers)
 	return nil
 	return nil
 
 
 }
 }
 
 
-// get host ips for the server, now return all hosts
+// GetServerHosts get host ips for the server, now return all hosts
 func (mgr *ServerManager) GetServerHosts(serverName string, hostType string) ([]string, error) {
 func (mgr *ServerManager) GetServerHosts(serverName string, hostType string) ([]string, error) {
 	server, ok := mgr.mapServers[serverName]
 	server, ok := mgr.mapServers[serverName]
 	if !ok {
 	if !ok {

+ 8 - 1
services/fileaccess/file.go

@@ -9,11 +9,12 @@ import (
 	"sparrow/pkg/server"
 	"sparrow/pkg/server"
 	"sparrow/pkg/utils"
 	"sparrow/pkg/utils"
 	"strconv"
 	"strconv"
+	"time"
 
 
 	"github.com/kataras/iris"
 	"github.com/kataras/iris"
 )
 )
 
 
-func registerRouter(app *iris.Application) {
+func registerRouter(app *iris.Application, fc *FileAccess) {
 	app.Post("/upload_file", iris.LimitRequestBodySize(int64(*conMaxSize)),
 	app.Post("/upload_file", iris.LimitRequestBodySize(int64(*conMaxSize)),
 		func(ctx iris.Context) {
 		func(ctx iris.Context) {
 			file, info, err := ctx.FormFile("file")
 			file, info, err := ctx.FormFile("file")
@@ -58,6 +59,12 @@ func registerRouter(app *iris.Application) {
 			} else {
 			} else {
 				fileURL = "http://" + *conDomain + "/upload/tmp/" + newname
 				fileURL = "http://" + *conDomain + "/upload/tmp/" + newname
 			}
 			}
+			fc.mu.Lock()
+			fc.tempFiles[newname] = &tempFile{
+				fileName:   newfile,
+				createTime: time.Now(),
+			}
+			fc.mu.Unlock()
 			ctx.JSON(map[string]interface{}{
 			ctx.JSON(map[string]interface{}{
 				"code":    0,
 				"code":    0,
 				"message": "success",
 				"message": "success",

+ 36 - 2
services/fileaccess/fileaccess.go

@@ -10,16 +10,50 @@ import (
 	"sparrow/pkg/server"
 	"sparrow/pkg/server"
 	"sparrow/pkg/utils"
 	"sparrow/pkg/utils"
 	"sync"
 	"sync"
+	"time"
 )
 )
 
 
+const checkTimeOut = 30 * time.Minute
+
 // FileAccess RPC服务
 // FileAccess RPC服务
 type FileAccess struct {
 type FileAccess struct {
-	mu sync.RWMutex
+	mu        sync.RWMutex
+	tempFiles map[string]*tempFile
+}
+
+// 代表一个临时文件
+type tempFile struct {
+	//文件路径
+	fileName string
+	//创建时间
+	createTime time.Time
 }
 }
 
 
 // NewFileAccess create a FileAccessor instance
 // NewFileAccess create a FileAccessor instance
 func NewFileAccess() *FileAccess {
 func NewFileAccess() *FileAccess {
-	return &FileAccess{}
+	return &FileAccess{
+		tempFiles: make(map[string]*tempFile),
+	}
+}
+
+// TODO:  临时解决文案,下个版本把文件信息写到redis中,利用redis的pub/sub机制,自动清理文件
+func (f *FileAccess) checker() {
+	server.Log.Info("开始文件过期检测")
+	for {
+		for k, v := range f.tempFiles {
+			if time.Now().Sub(v.createTime) > checkTimeOut {
+				//delete file
+				f.mu.Lock()
+				delete(f.tempFiles, k)
+				f.mu.Unlock()
+				err := os.Remove(v.fileName)
+				if err != nil {
+					server.Log.Errorf("自动清理文件失败:%v", err)
+				}
+			}
+		}
+		time.Sleep(30 * time.Minute)
+	}
 }
 }
 
 
 // MoveFile move a file to new path
 // MoveFile move a file to new path

+ 12 - 8
services/fileaccess/main.go

@@ -13,6 +13,14 @@ func main() {
 		server.Log.Fatal(err)
 		server.Log.Fatal(err)
 		return
 		return
 	}
 	}
+
+	fileaccess := NewFileAccess()
+	err = server.RegisterRPCHandler(fileaccess)
+	if err != nil {
+		server.Log.Errorf("RegisterRPCHandler Error: %s", err)
+		return
+	}
+
 	app := iris.New()
 	app := iris.New()
 	app.Use(iris.Gzip)
 	app.Use(iris.Gzip)
 	iris.WithPostMaxMemory(int64(*conMaxSize))
 	iris.WithPostMaxMemory(int64(*conMaxSize))
@@ -25,7 +33,7 @@ func main() {
 	app.Use(cors.New(opts))
 	app.Use(cors.New(opts))
 	app.AllowMethods(iris.MethodOptions)
 	app.AllowMethods(iris.MethodOptions)
 	app.StaticWeb("/upload", *conStaticPath)
 	app.StaticWeb("/upload", *conStaticPath)
-	registerRouter(app)
+	registerRouter(app, fileaccess)
 	app.Build()
 	app.Build()
 	// register a http handler
 	// register a http handler
 	err = server.RegisterHTTPHandler(app)
 	err = server.RegisterHTTPHandler(app)
@@ -33,13 +41,9 @@ func main() {
 		server.Log.Errorf("RegisterHTTPHandler Error: %s", err)
 		server.Log.Errorf("RegisterHTTPHandler Error: %s", err)
 		return
 		return
 	}
 	}
-	fileaccess := NewFileAccess()
-	err = server.RegisterRPCHandler(fileaccess)
-	if err != nil {
-		server.Log.Errorf("RegisterRPCHandler Error: %s", err)
-		return
-	}
-	// go
+
+	//
+	go fileaccess.checker()
 	err = server.Run()
 	err = server.Run()
 	if err != nil {
 	if err != nil {
 		server.Log.Fatal(err)
 		server.Log.Fatal(err)

BIN
tests/device/device


+ 34 - 31
tests/device/device.go

@@ -157,39 +157,42 @@ func (d *Device) DoLogin() error {
 
 
 func (d *Device) reportStatus(client *MQTT.Client) {
 func (d *Device) reportStatus(client *MQTT.Client) {
 
 
-	payloadHead := protocol.DataHead{
-		Flag:      0,
-		Timestamp: uint64(time.Now().Unix() * 1000),
-	}
-	param := []interface{}{uint8(1)}
-	params, err := tlv.MakeTLVs(param)
-	if err != nil {
-		fmt.Println(err)
-		return
-	}
-	sub := protocol.SubData{
-		Head: protocol.SubDataHead{
-			SubDeviceid: uint16(1),
-			PropertyNum: uint16(1),
-			ParamsCount: uint16(len(params)),
-		},
-		Params: params,
-	}
+	for {
+		time.Sleep(10 * time.Second)
+		payloadHead := protocol.DataHead{
+			Flag:      0,
+			Timestamp: uint64(time.Now().Unix() * 1000),
+		}
+		param := []interface{}{"li jian"}
+		params, err := tlv.MakeTLVs(param)
+		if err != nil {
+			fmt.Println(err)
+			return
+		}
+		sub := protocol.SubData{
+			Head: protocol.SubDataHead{
+				SubDeviceid: uint16(1),
+				PropertyNum: uint16(1),
+				ParamsCount: uint16(len(params)),
+			},
+			Params: params,
+		}
 
 
-	status := protocol.Data{
-		Head:    payloadHead,
-		SubData: []protocol.SubData{},
-	}
+		status := protocol.Data{
+			Head:    payloadHead,
+			SubData: []protocol.SubData{},
+		}
 
 
-	status.SubData = append(status.SubData, sub)
+		status.SubData = append(status.SubData, sub)
 
 
-	payload, err := status.Marshal()
-	if err != nil {
-		fmt.Println(err)
-		return
-	}
+		payload, err := status.Marshal()
+		if err != nil {
+			fmt.Println(err)
+			return
+		}
 
 
-	client.Publish("s", 1, false, payload)
+		client.Publish("s", 1, false, payload)
+	}
 
 
 }
 }
 
 
@@ -297,8 +300,8 @@ func (d *Device) DoAccess() error {
 	}
 	}
 
 
 	// beigin report event test
 	// beigin report event test
-	go d.reportEvent(c)
-
+	//go d.reportEvent(c)
+	go d.reportStatus(c)
 	// we just pause here to wait for messages
 	// we just pause here to wait for messages
 	<-make(chan int)
 	<-make(chan int)