lijian 3 years ago
parent
commit
c24bdac717

+ 2 - 2
pkg/protocol/message.go

@@ -3,7 +3,6 @@ package protocol
 import (
 	"bytes"
 	"encoding/gob"
-	"sparrow/pkg/server"
 	"sync/atomic"
 	"time"
 )
@@ -53,6 +52,7 @@ func (a *Message) Encode() ([]byte, error) {
 
 func (a *Message) Decode(data []byte) error {
 	var network bytes.Buffer
+	defer network.Reset()
 	network.Write(data)
 	dec := gob.NewDecoder(&network)
 	return dec.Decode(a)
@@ -88,7 +88,7 @@ func (e emptyCallBack) OnSuccess() {
 }
 
 func (e emptyCallBack) OnFailure(err error) {
-	server.Log.Error("消息出错:" + err.Error())
+	// server.Log.Error("消息出错:" + err.Error())
 }
 
 // SetCallBack

+ 6 - 6
pkg/queue/msgQueue/rabbitmq.go

@@ -84,11 +84,11 @@ func (r *RabbitMessageQueueAdmin) handleReInit(conn *amqp.Connection) bool {
 		select {
 		case <-r.done:
 			return true
-		case <-r.notifyCloseChan:
-			fmt.Println("Connection closed. Reconnecting...")
+		case err :=<-r.notifyCloseChan:
+			fmt.Println("Connection closed. Reconnecting..."+ err.Error())
 			return false
-		case <-r.notifyChanClose:
-			fmt.Println("Channel closed. Re-running init...")
+		case err :=<-r.notifyChanClose:
+			fmt.Println("Channel closed. Re-running init..." + err.Error())
 		}
 	}
 }
@@ -296,7 +296,7 @@ func (r *RabbitMqConsumer) doPop(duration time.Duration) error {
 			msgs, err := r.admin.ch.Consume(
 				tpc,
 				"",    // consumer
-				false, // auto-ack
+				true, // auto-ack
 				false, // exclusive
 				false, // no-local
 				false, // no-wait
@@ -308,7 +308,7 @@ func (r *RabbitMqConsumer) doPop(duration time.Duration) error {
 			}
 			for d := range msgs {
 				r.recvChan <- d.Body
-				d.Ack(true)
+				// d.Ack(true)
 			}
 		}(topic)
 	}

+ 1 - 6
pkg/ruleEngine/nodes/rest_api_request_node.go

@@ -33,7 +33,7 @@ type RestApiRequestNode struct {
 }
 
 func (r *RestApiRequestNode) Init(ctx ruleEngine.Context, config string) error {
-	r.pool = grpool.New(500)
+	r.pool = grpool.New(10)
 	if config == "" {
 		r.config = &RestApiRequestNodeConfig{
 			Url:       "http://localhost:8899/test",
@@ -95,11 +95,6 @@ func (r *RestApiRequestNode) OnMessage(ctx ruleEngine.Context, message *protocol
 			ctx.TellError(next, err)
 			return
 		}
-		defer func() {
-			if res != nil {
-				res.Close()
-			}
-		}()
 		if res != nil && res.Response() != nil {
 			if res.Response().StatusCode == http.StatusOK {
 				msg := r.processResponse(ctx, message, res)

+ 9 - 4
pkg/ruleEngine/nodes/script_filter_node.go

@@ -43,7 +43,6 @@ func (j *FilterJavascriptNode) Init(ctx ruleEngine.Context, config string) error
 
 func (j *FilterJavascriptNode) OnMessage(ctx ruleEngine.Context, message *protocol.Message) error {
 	body := message.Data
-	fmt.Println(body)
 	metaData, err := json.Marshal(message.MetaData)
 	if err != nil {
 		server.Log.Errorf("metadata marshal error:%s", err.Error())
@@ -51,7 +50,6 @@ func (j *FilterJavascriptNode) OnMessage(ctx ruleEngine.Context, message *protoc
 		ctx.TellError(next, err)
 		return errors.New("metadata marshal error " + err.Error())
 	}
-	fmt.Println(string(metaData))
 	res, err := j.vm.Call("filter", nil, body, string(metaData), message.Type)
 	if err != nil {
 		next := j.processError(ctx, message, err)
@@ -59,8 +57,15 @@ func (j *FilterJavascriptNode) OnMessage(ctx ruleEngine.Context, message *protoc
 		server.Log.Errorf("vm call filter error:%s", err.Error())
 		return err
 	}
-	result := res.IsBoolean()
-	if result {
+	var v bool
+	result, err := res.ToBoolean()
+	if err != nil {
+		v = false
+	} else {
+		v = result
+	}
+
+	if v {
 		ctx.TellNext(message, protocol.True)
 	} else {
 		ctx.TellNext(message, protocol.False)

+ 7 - 3
pkg/ruleEngine/nodes/script_filter_node_test.go

@@ -6,18 +6,22 @@ import (
 )
 
 func TestGenScript(t *testing.T) {
-	result :=generateRuleNodeScript("filter", "return msg.id > 20")
+	result :=generateRuleNodeScript("filter", "return msg.heart_beat !== undefined")
 	t.Log(result)
 	vm :=otto.New()
 	_, err :=vm.Run(result)
 	if err != nil {
 		t.Error(err)
 	}
-	call, err := vm.Call("filter", nil, "{\"id\":100}", "{\"id\":1}", "")
+	call, err := vm.Call("filter", nil, "{\"device_id\":2,\"heart_beat\":{\"device_id\":20274757167}}", "{\"id\":1}", "")
 	if err != nil {
 		t.Error(err)
 	}
-	t.Logf("%+v", call)
+	v, err :=call.ToBoolean()
+	if err != nil {
+		t.Error(err)
+	}
+	t.Logf("%+v", v)
 
 
 }