package nodes import ( "bytes" "encoding/json" "fmt" "github.com/gogf/gf/os/grpool" "net/http" "sparrow/pkg/protocol" "sparrow/pkg/ruleEngine" "sparrow/pkg/server" "sparrow/pkg/utils" "time" ) // RestApiRequestNode 请求外部API节点 type RestApiRequestNode struct { pool *grpool.Pool config *RestApiRequestNodeConfig client *utils.HttpClient } func (r *RestApiRequestNode) Init(ctx ruleEngine.Context, config string) error { r.pool = grpool.New(10) if config == "" { r.config = &RestApiRequestNodeConfig{ Url: "http://localhost/", Headers: make(map[string]string), Retry: 1, Method: "GET", TimeOut: 5, RetryWait: 1, } } else { c := new(RestApiRequestNodeConfig) err := json.Unmarshal([]byte(config), c) if err != nil { return err } r.config = c } client := utils.NewHttpClientWithConfig(time.Duration(r.config.TimeOut)*time.Second, r.config.Retry, time.Duration(r.config.RetryWait)*time.Second) client.SetLogger(server.Log) r.client = client return nil } func (r *RestApiRequestNode) OnMessage(ctx ruleEngine.Context, message *protocol.Message) error { body := message.Data headers := make(map[string]interface{}) for k, v := range message.MetaData { headers[k] = v } for k, v := range r.config.Headers { headers[k] = v } fmt.Printf("%+v\r\n", headers) w := new(bytes.Buffer) if err := json.NewEncoder(w).Encode(body); err != nil { return err } req, err := utils.NewRequest(r.config.Method, r.config.Url, w) if err != nil { return err } return r.pool.Add(func() { res, err := r.client.Do(req) if err != nil { next := r.processError(ctx, message, err) ctx.TellError(next, err) return } if res.Response().StatusCode == http.StatusOK { msg := r.processResponse(ctx, message, res) ctx.TellSuccess(msg) } }) } func (r *RestApiRequestNode) processError(ctx ruleEngine.Context, msg *protocol.Message, err error) *protocol.Message { var metaData map[string]interface{} if msg.MetaData != nil { metaData = msg.MetaData } metaData = make(map[string]interface{}) metaData["error"] = err.Error() return ctx.TransformMessage(msg, msg.Type, msg.Originator, metaData, msg.Data) } func (r *RestApiRequestNode) processResponse(ctx ruleEngine.Context, msg *protocol.Message, res utils.Responser) *protocol.Message { var metaData map[string]interface{} if msg.MetaData != nil { metaData = msg.MetaData } metaData = make(map[string]interface{}) metaData["STATUS"] = res.Response().Status metaData["STATUS_CODE"] = res.Response().StatusCode var data string if body, err := res.String(); err == nil { data = body } return ctx.TransformMessage(msg, msg.Type, msg.Originator, metaData, data) } type RestApiRequestNodeConfig struct { Url string `json:"url"` // 请求的地址 Headers map[string]string `json:"headers"` // 请求头 Retry int `json:"retry"` // 重试次数 Method string `json:"method"` // 请求方法 TimeOut int `json:"time_out"` // 超时时间(秒) RetryWait int `json:"retry_wait"` // 重试等待时间 }