浏览代码

更新rest api request 节点

lijian 4 年之前
父节点
当前提交
dbf4608584
共有 4 个文件被更改,包括 35 次插入9 次删除
  1. 4 1
      pkg/ruleEngine/mailbox.go
  2. 11 7
      pkg/ruleEngine/nodes/rest_api_request_node.go
  3. 1 1
      tests/device/main.go
  4. 19 0
      tests/httpserver/main.go

+ 4 - 1
pkg/ruleEngine/mailbox.go

@@ -117,7 +117,10 @@ func (m *MailBox) processMailbox() {
 		if getQueue == nil {
 			break
 		}
-		msg = getQueue.Pop().(protocol.ActorMsg)
+		msg, ok := getQueue.Pop().(protocol.ActorMsg)
+		if !ok {
+			break
+		}
 		if msg != nil {
 			server.Log.Debugf("Going to process message:%s, %v", m.id, msg)
 			if err := m.actor.Process(msg); err != nil {

+ 11 - 7
pkg/ruleEngine/nodes/rest_api_request_node.go

@@ -23,7 +23,7 @@ import (
     "time_out": 5,
     "retry_wait": 1
 }
- */
+*/
 type RestApiRequestNode struct {
 	pool   *grpool.Pool
 	config *RestApiRequestNodeConfig
@@ -32,13 +32,12 @@ type RestApiRequestNode struct {
 
 func (r *RestApiRequestNode) Init(ctx ruleEngine.Context, config string) error {
 	r.pool = grpool.New(10)
-
 	if config == "" {
 		r.config = &RestApiRequestNodeConfig{
-			Url:       "http://localhost/",
+			Url:       "http://localhost:8899/test",
 			Headers:   make(map[string]string),
 			Retry:     1,
-			Method:    "GET",
+			Method:    "POST",
 			TimeOut:   5,
 			RetryWait: 1,
 		}
@@ -59,9 +58,9 @@ func (r *RestApiRequestNode) Init(ctx ruleEngine.Context, config string) error {
 
 func (r *RestApiRequestNode) OnMessage(ctx ruleEngine.Context, message *protocol.Message) error {
 	body := message.Data
-	headers := make(map[string]interface{})
+	headers := make(map[string]string)
 	for k, v := range message.MetaData {
-		headers[k] = v
+		headers[k] = v.(string)
 	}
 	for k, v := range r.config.Headers {
 		headers[k] = v
@@ -70,10 +69,15 @@ func (r *RestApiRequestNode) OnMessage(ctx ruleEngine.Context, message *protocol
 	if err := json.NewEncoder(w).Encode(body); err != nil {
 		return err
 	}
-	req, err := utils.NewRequest(r.config.Method, r.config.Url, w)
+	req, err := utils.NewRequest(r.config.Method, r.config.Url, []byte(body))
 	if err != nil {
+		server.Log.Error(err)
 		return err
 	}
+	for k, v := range headers {
+		req.Header.Add(k, v)
+	}
+	req.Header.Add("Content-Type", "application/json")
 	return r.pool.Add(func() {
 		res, err := r.client.Do(req)
 		if err != nil {

+ 1 - 1
tests/device/main.go

@@ -6,7 +6,7 @@ import (
 )
 
 var (
-	testURL        = flag.String("url", "http://192.168.1.100:8088", "login url")
+	testURL        = flag.String("url", "http://192.168.1.116:8088", "login url")
 	testProductKey = flag.String("productkey", "2e397f5599a3f6f6a5a3c8fcd45437169501b3c6e239042ad5e9b65303561e41ab5519e9d205facbcbe75a2784354501", "product key")
 	testProtocol   = flag.String("protocol", "mqtt", "access protocol")
 )

+ 19 - 0
tests/httpserver/main.go

@@ -0,0 +1,19 @@
+package main
+
+import (
+	"fmt"
+	"io/ioutil"
+	"net/http"
+)
+
+func main() {
+	http.HandleFunc("/test", func(w http.ResponseWriter, r *http.Request) {
+		b, err := ioutil.ReadAll(r.Body)
+		if err != nil {
+			panic(err)
+		}
+		fmt.Println(string(b))
+	})
+
+	http.ListenAndServe("127.0.0.1:8899", nil)
+}