Browse Source

优化redis连接池的泄露问题

lijian 3 years ago
parent
commit
2e2d398ab2

+ 4 - 2
pkg/online/online.go

@@ -35,6 +35,7 @@ func (mgr *Manager) GetStatus(id string) (*Status, error) {
 	if err != nil {
 		return nil, err
 	}
+	defer conn.Close()
 
 	status := &Status{}
 	// get status from redis
@@ -56,6 +57,7 @@ func (mgr *Manager) GetOnline(id string, status Status) error {
 	if err != nil {
 		return err
 	}
+	defer conn.Close()
 	// serialize and store the device's online status info in redis
 	bufferStr, err := serializer.Struct2String(status)
 	if err != nil {
@@ -88,7 +90,7 @@ func (mgr *Manager) SetHeartbeat(id string) error {
 	if err != nil {
 		return err
 	}
-
+	defer conn.Close()
 	_, err = conn.Do("EXPIRE", key, status.HeartbeatInterval+status.HeartbeatInterval/2)
 	if err != nil {
 		return err
@@ -103,7 +105,7 @@ func (mgr *Manager) GetOffline(id string) error {
 	if err != nil {
 		return err
 	}
-
+	defer conn.Close()
 	_, err = conn.Do("DEL", key)
 	if err != nil {
 		return err

+ 1 - 12
pkg/ruleEngine/nodes/rest_api_request_node.go

@@ -1,7 +1,6 @@
 package nodes
 
 import (
-	"bytes"
 	"encoding/json"
 	"github.com/gogf/gf/os/grpool"
 	"net/http"
@@ -55,11 +54,6 @@ func (r *RestApiRequestNode) Init(ctx ruleEngine.Context, config string) error {
 		r.config.Retry, time.Duration(r.config.RetryWait)*time.Second)
 	client.SetLogger(server.Log)
 	r.client = client
-	r.bufPool = sync.Pool{
-		New: func() interface{} {
-			return new(bytes.Buffer)
-		},
-	}
 	return nil
 }
 
@@ -72,12 +66,6 @@ func (r *RestApiRequestNode) OnMessage(ctx ruleEngine.Context, message *protocol
 	for k, v := range r.config.Headers {
 		headers[k] = v
 	}
-	//w := r.bufPool.Get().(*bytes.Buffer)
-	//if err := json.NewEncoder(w).Encode(body); err != nil {
-	//	return err
-	//}
-	//w.Reset()
-	//r.bufPool.Put(w)
 	req, err := utils.NewRequest(r.config.Method, r.config.Url, []byte(body))
 	if err != nil {
 		server.Log.Error(err)
@@ -96,6 +84,7 @@ func (r *RestApiRequestNode) OnMessage(ctx ruleEngine.Context, message *protocol
 			return
 		}
 		if res != nil && res.Response() != nil {
+			defer res.Close()
 			if res.Response().StatusCode == http.StatusOK {
 				msg := r.processResponse(ctx, message, res)
 				ctx.TellSuccess(msg)

+ 5 - 5
pkg/token/token.go

@@ -35,7 +35,7 @@ func (helper *Helper) GenerateToken(recordId string) ([]byte, error) {
 	if err != nil {
 		return nil, err
 	}
-
+	defer conn.Close()
 	key := DeviceTokenKeyPrefix + recordId
 
 	_, err = conn.Do("SET", key, token)
@@ -58,7 +58,7 @@ func (helper *Helper) ValidateToken(id string, token []byte) error {
 	if err != nil {
 		return err
 	}
-
+	defer conn.Close()
 	readToken, err := conn.Do("GET", key)
 	if err != nil {
 		return err
@@ -83,7 +83,7 @@ func (helper *Helper) ClearToken(id string) error {
 	if err != nil {
 		return err
 	}
-
+	defer conn.Close()
 	_, err = conn.Do("DEL", key)
 	if err != nil {
 		return err
@@ -102,7 +102,7 @@ func (helper *Helper) GenerateAppToken(id string, key string) ([]byte, error) {
 	if err != nil {
 		return nil, err
 	}
-
+	defer conn.Close()
 	//key := ApplicationTokenKeyPrefix + strconv.FormatUint(id, 10)
 
 	_, err = conn.Do("SET", key, token)
@@ -123,7 +123,7 @@ func (helper *Helper) ValidateAppToken(key string, token []byte) error {
 	if err != nil {
 		return err
 	}
-
+	defer conn.Close()
 	readToken, err := conn.Do("GET", key)
 	if err != nil {
 		return err

+ 1 - 0
services/fileaccess/fileaccess.go

@@ -118,6 +118,7 @@ func (f *FileAccess) getTempFileFromRedis() error {
 			f.mu.Unlock()
 		}
 	}
+	conn.Close()
 	return nil
 
 }