Bläddra i källkod

更新opentracing方式

lijian 6 år sedan
förälder
incheckning
5ebf369a34

+ 3 - 3
pkg/rule/ifttt.go

@@ -19,7 +19,7 @@ func (ift *Ifttt) Check(deviceid uint64, eventid uint16) error {
 		RuleType: "ifttt",
 		DeviceID: int64(deviceid),
 	}
-	err := server.RPCCallByName("registry", "Registry.QueryRules", query, actions)
+	err := server.RPCCallByName(nil, "registry", "Registry.QueryRules", query, actions)
 	if err != nil {
 		server.Log.Warnf("load ifttt rules error : %v", err)
 		return err
@@ -27,14 +27,14 @@ func (ift *Ifttt) Check(deviceid uint64, eventid uint16) error {
 
 	if len(*actions) > 0 {
 		device := &models.Device{}
-		err := server.RPCCallByName("registry", "Registry.FindDeviceById", int64(deviceid), device)
+		err := server.RPCCallByName(nil, "registry", "Registry.FindDeviceById", int64(deviceid), device)
 		if err != nil {
 			server.Log.Errorf("find device error : %v", err)
 			return err
 		}
 
 		product := &models.Product{}
-		err = server.RPCCallByName("registry", "Registry.FindProduct", device.ProductID, product)
+		err = server.RPCCallByName(nil, "registry", "Registry.FindProduct", device.ProductID, product)
 		if err != nil {
 			server.Log.Errorf("find product error : %v", err)
 			return err

+ 4 - 4
pkg/rule/rule_action.go

@@ -20,13 +20,13 @@ func performRuleAction(target string, action string) error {
 
 	identifier := parts[1]
 	device := &models.Device{}
-	err := server.RPCCallByName("registry", "Registry.FindDeviceByIdentifier", identifier, device)
+	err := server.RPCCallByName(nil, "registry", "Registry.FindDeviceByIdentifier", identifier, device)
 	if err != nil {
 		return err
 	}
 
 	product := &models.Product{}
-	err = server.RPCCallByName("registry", "Registry.FindProduct", device.ProductID, product)
+	err = server.RPCCallByName(nil, "registry", "Registry.FindProduct", device.ProductID, product)
 	if err != nil {
 		return err
 	}
@@ -66,7 +66,7 @@ func performRuleAction(target string, action string) error {
 			Params:    command.Params,
 		}
 		cmdreply := rpcs.ReplySendCommand{}
-		err = server.RPCCallByName("controller", "Controller.SendCommand", cmdargs, &cmdreply)
+		err = server.RPCCallByName(nil, "controller", "Controller.SendCommand", cmdargs, &cmdreply)
 		if err != nil {
 			server.Log.Errorf("send device command error: %v", err)
 			return err
@@ -82,7 +82,7 @@ func performRuleAction(target string, action string) error {
 			Status:   status,
 		}
 		statusreply := rpcs.ReplySetStatus{}
-		err = server.RPCCallByName("controller", "Controller.SetStatus", statusargs, &statusreply)
+		err = server.RPCCallByName(nil, "controller", "Controller.SetStatus", statusargs, &statusreply)
 		if err != nil {
 			server.Log.Errorf("set devie status error: %v", err)
 			return err

+ 3 - 2
pkg/rule/timer.go

@@ -5,8 +5,9 @@ import (
 	"fmt"
 	"sparrow/pkg/models"
 	"sparrow/pkg/server"
-	"github.com/robfig/cron"
 	"time"
+
+	"github.com/robfig/cron"
 )
 
 type Timer struct {
@@ -37,7 +38,7 @@ func (t *Timer) refresh() {
 	query := &models.Rule{
 		RuleType: "timer",
 	}
-	err := server.RPCCallByName("registry", "Registry.QueryRules", query, timers)
+	err := server.RPCCallByName(nil, "registry", "Registry.QueryRules", query, timers)
 	if err != nil {
 		server.Log.Warnf("refresh timer rules error : %v", err)
 		return

+ 6 - 2
pkg/server/rpc_client.go

@@ -6,6 +6,8 @@ import (
 	"math/rand"
 	"net/rpc"
 	"time"
+
+	"github.com/opentracing/opentracing-go"
 )
 
 type RPCClient struct {
@@ -40,7 +42,8 @@ func rpcCallWithReconnect(client *rpc.Client, addr string, serverMethod string,
 }
 
 // RPC call with reconnect and retry.
-func (client *RPCClient) Call(severName string, serverMethod string, args interface{}, reply interface{}) error {
+func (client *RPCClient) Call(span opentracing.Span, severName string, serverMethod string, args interface{}, reply interface{}) error {
+	defer span.Finish()
 	addrs, err := serverInstance.svrmgr.GetServerHosts(severName, FlagRPCHost)
 	if err != nil {
 		return err
@@ -60,7 +63,8 @@ func (client *RPCClient) Call(severName string, serverMethod string, args interf
 				continue
 			}
 		}
-
+		span.SetTag("server.method", serverMethod)
+		span.SetTag("server.addr", addr)
 		err = rpcCallWithReconnect(client.clients[mapkey], addr, serverMethod, args, reply)
 		if err != nil {
 			Log.Warnf("RpcCallWithReconnect error : %s", err)

+ 1 - 1
pkg/server/rpc_client_test.go

@@ -22,7 +22,7 @@ func validateRPCClient(t *testing.T) {
 		t.Fatalf("rpc client test faild, want %d, got %d", testRPCArgs.A*testRPCArgs.B, reply)
 	}
 
-	err = RPCCallByName("test", "Arith.Multiply", args, &reply)
+	err = RPCCallByName(nil, "test", "Arith.Multiply", args, &reply)
 	if err != nil {
 		t.Fatal(err)
 	}

+ 14 - 2
pkg/server/server.go

@@ -7,7 +7,10 @@
 package server
 
 import (
+	"context"
+
 	"github.com/opentracing/opentracing-go"
+	"github.com/opentracing/opentracing-go/log"
 
 	// "github.com/vharitonsky/iniflags"
 	"flag"
@@ -186,12 +189,21 @@ func RegisterTimerTask(task TimerTask) error {
 }
 
 // RPCCallByName rpc call by name
-func RPCCallByName(serverName string, serverMethod string, args interface{}, reply interface{}) error {
+func RPCCallByName(ctx context.Context, serverName string, serverMethod string, args interface{}, reply interface{}) error {
 	if serverInstance == nil {
 		return errorf(errServerNotInit)
 	}
+	var span opentracing.Span
 
-	return serverInstance.rpccli.Call(serverName, serverMethod, args, reply)
+	if ctx != nil {
+		sp := opentracing.SpanFromContext(ctx)
+		span = opentracing.StartSpan(serverName, opentracing.ChildOf(sp.Context()))
+	}
+	span.LogFields(
+		log.Object("args", args),
+	)
+	span.SetTag("server.name", serverName)
+	return serverInstance.rpccli.Call(span, serverName, serverMethod, args, reply)
 }
 
 // RPCCallByHost rpc call by host

+ 6 - 6
services/apiprovider/actions.go

@@ -58,7 +58,7 @@ func GetDeviceInfoByKey(params martini.Params, req *http.Request, r render.Rende
 	key := req.URL.Query().Get("device_key")
 	server.Log.Printf("ACTION GetDeviceInfoByKey, key:: %v", key)
 	device := &models.Device{}
-	err := server.RPCCallByName("registry", "Registry.ValidateDevice", key, device)
+	err := server.RPCCallByName(nil, "registry", "Registry.ValidateDevice", key, device)
 	if err != nil {
 		r.JSON(http.StatusOK, renderError(ErrDeviceNotFound, err))
 		return
@@ -80,7 +80,7 @@ func GetDeviceInfoByIdentifier(urlparams martini.Params, r render.Render) {
 	identifier := urlparams["identifier"]
 	server.Log.Printf("ACTION GetDeviceInfoByIdentifier, identifier:: %v", identifier)
 	device := &models.Device{}
-	err := server.RPCCallByName("registry", "Registry.FindDeviceByIdentifier", identifier, device)
+	err := server.RPCCallByName(nil, "registry", "Registry.FindDeviceByIdentifier", identifier, device)
 	if err != nil {
 		r.JSON(http.StatusOK, renderError(ErrDeviceNotFound, err))
 		return
@@ -106,7 +106,7 @@ func GetDeviceCurrentStatus(device *models.Device, config *productconfig.Product
 		Id: uint64(device.ID),
 	}
 	statusreply := rpcs.ReplyGetStatus{}
-	err := server.RPCCallByName("controller", "Controller.GetStatus", statusargs, &statusreply)
+	err := server.RPCCallByName(nil, "controller", "Controller.GetStatus", statusargs, &statusreply)
 	if err != nil {
 		server.Log.Errorf("get devie status error: %v", err)
 		r.JSON(http.StatusOK, renderError(ErrSystemFault, err))
@@ -159,7 +159,7 @@ func SetDeviceStatus(device *models.Device, config *productconfig.ProductConfig,
 		Status:   status,
 	}
 	statusreply := rpcs.ReplySetStatus{}
-	err = server.RPCCallByName("controller", "Controller.SetStatus", statusargs, &statusreply)
+	err = server.RPCCallByName(nil, "controller", "Controller.SetStatus", statusargs, &statusreply)
 	if err != nil {
 		server.Log.Errorf("set devie status error: %v", err)
 		r.JSON(http.StatusOK, renderError(ErrSystemFault, err))
@@ -206,7 +206,7 @@ func SendCommandToDevice(device *models.Device, config *productconfig.ProductCon
 		Params:    command.Params,
 	}
 	cmdreply := rpcs.ReplySendCommand{}
-	err = server.RPCCallByName("controller", "Controller.SendCommand", cmdargs, &cmdreply)
+	err = server.RPCCallByName(nil, "controller", "Controller.SendCommand", cmdargs, &cmdreply)
 	if err != nil {
 		server.Log.Errorf("send devie command error: %v", err)
 		r.JSON(http.StatusOK, renderError(ErrSystemFault, err))
@@ -236,7 +236,7 @@ func AddRule(device *models.Device, req *http.Request, r render.Render) {
 	}
 	reply := &rpcs.ReplyEmptyResult{}
 
-	err = server.RPCCallByName("registry", "Registry.CreateRule", rule, reply)
+	err = server.RPCCallByName(nil, "registry", "Registry.CreateRule", rule, reply)
 	if err != nil {
 		server.Log.Errorf("create device rule error: %v", err)
 		r.JSON(http.StatusOK, renderError(ErrSystemFault, err))

+ 9 - 8
services/apiprovider/middleware.go

@@ -2,15 +2,16 @@ package main
 
 import (
 	"errors"
+	"net/http"
 	"sparrow/pkg/models"
 	"sparrow/pkg/productconfig"
 	"sparrow/pkg/rpcs"
 	"sparrow/pkg/server"
-	"github.com/go-martini/martini"
-	"github.com/martini-contrib/render"
-	"net/http"
 	"strconv"
 	"strings"
+
+	"github.com/go-martini/martini"
+	"github.com/martini-contrib/render"
 )
 
 func checkAppDomain(domain string, identifier string) error {
@@ -74,7 +75,7 @@ func ApplicationAuthOnDeviceIdentifer(context martini.Context, params martini.Pa
 	}
 
 	app := &models.Application{}
-	err := server.RPCCallByName("registry", "Registry.ValidateApplication", key, app)
+	err := server.RPCCallByName(nil, "registry", "Registry.ValidateApplication", key, app)
 	if err != nil {
 		r.JSON(http.StatusOK, renderError(ErrAccessDenied, err))
 		return
@@ -93,7 +94,7 @@ func CheckDeviceOnline(context martini.Context, params martini.Params, req *http
 	identifier := params["identifier"]
 
 	device := &models.Device{}
-	err := server.RPCCallByName("registry", "Registry.FindDeviceByIdentifier", identifier, device)
+	err := server.RPCCallByName(nil, "registry", "Registry.FindDeviceByIdentifier", identifier, device)
 	if err != nil {
 		r.JSON(http.StatusOK, renderError(ErrDeviceNotFound, err))
 		return
@@ -103,7 +104,7 @@ func CheckDeviceOnline(context martini.Context, params martini.Params, req *http
 		Id: uint64(device.ID),
 	}
 	onlinereply := rpcs.ReplyGetDeviceOnlineStatus{}
-	err = server.RPCCallByName("devicemanager", "DeviceManager.GetDeviceOnlineStatus", onlineargs, &onlinereply)
+	err = server.RPCCallByName(nil, "devicemanager", "DeviceManager.GetDeviceOnlineStatus", onlineargs, &onlinereply)
 	if err != nil {
 		server.Log.Errorf("get devie online status error: %v", err)
 		r.JSON(http.StatusOK, renderError(ErrDeviceNotOnline, err))
@@ -118,7 +119,7 @@ func CheckDeviceIdentifier(context martini.Context, params martini.Params, req *
 	identifier := params["identifier"]
 
 	device := &models.Device{}
-	err := server.RPCCallByName("registry", "Registry.FindDeviceByIdentifier", identifier, device)
+	err := server.RPCCallByName(nil, "registry", "Registry.FindDeviceByIdentifier", identifier, device)
 	if err != nil {
 		r.JSON(http.StatusOK, renderError(ErrDeviceNotFound, err))
 		return
@@ -131,7 +132,7 @@ func CheckDeviceIdentifier(context martini.Context, params martini.Params, req *
 func CheckProductConfig(context martini.Context, device *models.Device,
 	params martini.Params, req *http.Request, r render.Render) {
 	product := &models.Product{}
-	err := server.RPCCallByName("registry", "Registry.FindProduct", device.ProductID, product)
+	err := server.RPCCallByName(nil, "registry", "Registry.FindProduct", device.ProductID, product)
 	if err != nil {
 		r.JSON(http.StatusOK, renderError(ErrProductNotFound, err))
 		return

+ 3 - 3
services/apiprovider/notifier.go

@@ -64,7 +64,7 @@ func (n *Notifier) processStatus() error {
 func (n *Notifier) updateApplications() error {
 	for {
 
-		err := server.RPCCallByName("registry", "Registry.GetApplications", 0, &n.apps)
+		err := server.RPCCallByName(nil, "registry", "Registry.GetApplications", 0, &n.apps)
 		if err != nil {
 			server.Log.Errorf("get applications error : %v", err)
 		}
@@ -77,14 +77,14 @@ func (n *Notifier) reportEvent(event rpcs.ArgsOnEvent) error {
 	server.Log.Debugf("reporting event %v", event)
 
 	device := &models.Device{}
-	err := server.RPCCallByName("registry", "Registry.FindDeviceById", int64(event.DeviceId), device)
+	err := server.RPCCallByName(nil, "registry", "Registry.FindDeviceById", int64(event.DeviceId), device)
 	if err != nil {
 		server.Log.Errorf("find device error : %v", err)
 		return err
 	}
 
 	product := &models.Product{}
-	err = server.RPCCallByName("registry", "Registry.FindProduct", device.ProductID, product)
+	err = server.RPCCallByName(nil, "registry", "Registry.FindProduct", device.ProductID, product)
 	if err != nil {
 		server.Log.Errorf("find product error : %v", err)
 		return err

+ 4 - 4
services/coapaccess/coap_provider.go

@@ -18,7 +18,7 @@ func (mp *CoAPProvider) ValidateDeviceToken(deviceid uint64, token []byte) error
 		AccessToken: token,
 	}
 	reply := rpcs.ReplyValidateDeviceAccessToken{}
-	err := server.RPCCallByName("devicemanager", "DeviceManager.ValidateDeviceAccessToken", args, &reply)
+	err := server.RPCCallByName(nil, "devicemanager", "DeviceManager.ValidateDeviceAccessToken", args, &reply)
 	if err != nil {
 		server.Log.Errorf("validate device token error. deviceid : %v, token : %v, error: %v", deviceid, token, err)
 		return err
@@ -27,7 +27,7 @@ func (mp *CoAPProvider) ValidateDeviceToken(deviceid uint64, token []byte) error
 }
 func (mp *CoAPProvider) OnDeviceOnline(args rpcs.ArgsGetOnline) error {
 	reply := rpcs.ReplyGetOnline{}
-	err := server.RPCCallByName("devicemanager", "DeviceManager.GetOnline", args, &reply)
+	err := server.RPCCallByName(nil, "devicemanager", "DeviceManager.GetOnline", args, &reply)
 	if err != nil {
 		server.Log.Errorf("device online error. args: %v, error: %v", args, err)
 	}
@@ -39,7 +39,7 @@ func (mp *CoAPProvider) OnDeviceOffline(deviceid uint64) error {
 		Id: deviceid,
 	}
 	reply := rpcs.ReplyGetOffline{}
-	err := server.RPCCallByName("devicemanager", "DeviceManager.GetOffline", args, &reply)
+	err := server.RPCCallByName(nil, "devicemanager", "DeviceManager.GetOffline", args, &reply)
 	if err != nil {
 		server.Log.Errorf("device offline error. deviceid: %v, error: %v", deviceid, err)
 	}
@@ -51,7 +51,7 @@ func (mp *CoAPProvider) OnDeviceHeartBeat(deviceid uint64) error {
 		Id: deviceid,
 	}
 	reply := rpcs.ReplyHeartBeat{}
-	err := server.RPCCallByName("devicemanager", "DeviceManager.HeartBeat", args, &reply)
+	err := server.RPCCallByName(nil, "devicemanager", "DeviceManager.HeartBeat", args, &reply)
 	if err != nil {
 		server.Log.Errorf("device heartbeat error. deviceid: %v, error: %v", deviceid, err)
 	}

+ 1 - 1
services/controller/controller.go

@@ -131,7 +131,7 @@ func getAccessRPCHost(deviceid uint64) (string, error) {
 		Id: deviceid,
 	}
 	reply := &rpcs.ReplyGetDeviceOnlineStatus{}
-	err := server.RPCCallByName("devicemanager", "DeviceManager.GetDeviceOnlineStatus", args, reply)
+	err := server.RPCCallByName(nil, "devicemanager", "DeviceManager.GetDeviceOnlineStatus", args, reply)
 	if err != nil {
 		return "", err
 	}

+ 0 - 7
services/fileaccess/fileaccess.go

@@ -15,8 +15,6 @@ import (
 	"time"
 
 	"github.com/garyburd/redigo/redis"
-	"github.com/opentracing/opentracing-go"
-	"github.com/opentracing/opentracing-go/ext"
 )
 
 const checkTimeOut = 30 * time.Minute
@@ -157,10 +155,6 @@ func (f *FileAccess) DeleteFile(args *rpcs.ArgsDeleteFile, reply *rpcs.ReplyEmpt
 // MoveFile move a file to new path
 // source:http://192.168.175.60:9000/upload/tmp/2c9d7d85-2266-450a-9d47-28e67703d818.jpeg
 func (f *FileAccess) MoveFile(args *rpcs.ArgsMoveFile, reply *rpcs.ReplyMoveFile) error {
-	spanCtx, _ := opentracing.GlobalTracer().Extract(opentracing.TextMap, opentracing.TextMapCarrier(args.SpanCarrier))
-	span := opentracing.StartSpan("movefile", ext.RPCServerOption(spanCtx))
-	defer span.Finish()
-	span.LogKV("tmpfile", args.Source)
 	// check source file
 	reg := regexp.MustCompile(`tmp/\$*.*`)
 	src := reg.FindString(args.Source)
@@ -201,7 +195,6 @@ func (f *FileAccess) MoveFile(args *rpcs.ArgsMoveFile, reply *rpcs.ReplyMoveFile
 			fpath = fmt.Sprintf("%s/%s/%s/%s", *conDomain, *conStaticPath, args.Target, fileName)
 		}
 		reply.FilePath = fpath
-		span.LogKV("tartgetfile", fpath)
 		//delete src
 		os.Remove(src)
 		return nil

+ 5 - 4
services/httpaccess/actions.go

@@ -1,6 +1,7 @@
 package main
 
 import (
+	"context"
 	"encoding/hex"
 	"errors"
 	"math/rand"
@@ -51,7 +52,7 @@ type DeviceAuthArgs struct {
 // RegisterDevice 设备激活
 func RegisterDevice(args DeviceRegisterArgs, r render.Render) {
 	server.Log.Printf("ACTION RegisterDevice, args:: %v ", args)
-	span := opentracing.StartSpan("/v1/devices/registration")
+	span, ctx := opentracing.StartSpanFromContext(context.Background(), "RegisterDevice")
 	defer span.Finish()
 
 	rpcargs := &rpcs.ArgsDeviceRegister{
@@ -68,7 +69,7 @@ func RegisterDevice(args DeviceRegisterArgs, r render.Render) {
 		opentracing.TextMap,
 		opentracing.TextMapCarrier(rpcargs.SpanCarrier),
 	)
-	err := server.RPCCallByName("registry", "Registry.RegisterDevice", rpcargs, device)
+	err := server.RPCCallByName(ctx, "registry", "Registry.RegisterDevice", rpcargs, device)
 	if err != nil {
 		r.JSON(http.StatusOK, renderError(ErrSystemFault, err))
 		return
@@ -96,7 +97,7 @@ func AuthDevice(args DeviceAuthArgs, r render.Render) {
 	}
 	arg.SpanCarrier = make(map[string]string)
 
-	span := opentracing.StartSpan("/v1/devices/authentication")
+	span, ctx := opentracing.StartSpanFromContext(context.Background(), "AuthDevice")
 	defer span.Finish()
 
 	ext.SpanKindRPCClient.Set(span)
@@ -105,7 +106,7 @@ func AuthDevice(args DeviceAuthArgs, r render.Render) {
 		opentracing.TextMap,
 		opentracing.TextMapCarrier(arg.SpanCarrier),
 	)
-	err := server.RPCCallByName("registry", "Registry.FindDeviceById", arg, device)
+	err := server.RPCCallByName(ctx, "registry", "Registry.FindDeviceById", arg, device)
 	if err != nil {
 		r.JSON(http.StatusOK, renderError(ErrDeviceNotFound, err))
 		return

+ 5 - 9
services/knowoapi/controllers/application.go

@@ -1,6 +1,7 @@
 package controllers
 
 import (
+	"context"
 	"sparrow/pkg/models"
 	"sparrow/pkg/rpcs"
 	"sparrow/pkg/server"
@@ -38,16 +39,11 @@ func (a *AppController) Post() {
 		}
 		args.SpanCarrier = make(map[string]string)
 		reply := &rpcs.ReplyMoveFile{}
-		span := opentracing.StartSpan(a.Ctx.Path())
+		span, ctx := opentracing.StartSpanFromContext(context.Background(), a.Ctx.Path())
 		defer span.Finish()
 		ext.SpanKindRPCClient.Set(span)
 		ext.HTTPMethod.Set(span, a.Ctx.Method())
-		span.Tracer().Inject(
-			span.Context(),
-			opentracing.TextMap,
-			opentracing.TextMapCarrier(args.SpanCarrier),
-		)
-		err := server.RPCCallByName("fileaccess", "FileAccess.MoveFile", args, reply)
+		err := server.RPCCallByName(ctx, "fileaccess", "FileAccess.MoveFile", args, reply)
 		if err != nil {
 			server.Log.Error(err)
 			app.AppIcon = ""
@@ -131,7 +127,7 @@ func (a *AppController) Put() {
 			Target: "application",
 		}
 		reply := &rpcs.ReplyMoveFile{}
-		err := server.RPCCallByName("fileaccess", "FileAccess.MoveFile", args, reply)
+		err := server.RPCCallByName(nil, "fileaccess", "FileAccess.MoveFile", args, reply)
 		if err != nil {
 			server.Log.Error(err)
 			app.AppIcon = ""
@@ -145,7 +141,7 @@ func (a *AppController) Put() {
 			FileName: old.AppIcon,
 		}
 		reply := &rpcs.ReplyEmptyResult{}
-		server.RPCCallByName("fileaccess", "FileAccess.DeleteFile", arg, reply)
+		server.RPCCallByName(nil, "fileaccess", "FileAccess.DeleteFile", arg, reply)
 	}
 	result, err := a.Service.Update(app)
 	if err != nil {

+ 5 - 10
services/knowoapi/controllers/produdct.go

@@ -1,6 +1,7 @@
 package controllers
 
 import (
+	"context"
 	"sparrow/pkg/models"
 	"sparrow/pkg/rpcs"
 	"sparrow/pkg/server"
@@ -38,17 +39,11 @@ func (a *ProductController) Post() {
 		args.SpanCarrier = make(map[string]string)
 		reply := &rpcs.ReplyMoveFile{}
 
-		span := opentracing.StartSpan(a.Ctx.Path())
+		span, ctx := opentracing.StartSpanFromContext(context.Background(), a.Ctx.Path())
 		defer span.Finish()
 		ext.SpanKindRPCClient.Set(span)
 		ext.HTTPMethod.Set(span, a.Ctx.Method())
-		span.Tracer().Inject(
-			span.Context(),
-			opentracing.TextMap,
-			opentracing.TextMapCarrier(args.SpanCarrier),
-		)
-
-		err := server.RPCCallByName("fileaccess", "FileAccess.MoveFile", args, reply)
+		err := server.RPCCallByName(ctx, "fileaccess", "FileAccess.MoveFile", args, reply)
 		if err != nil {
 			server.Log.Error(err)
 			product.ProductImage = ""
@@ -102,7 +97,7 @@ func (a *ProductController) Put() {
 			Target: "product",
 		}
 		reply := &rpcs.ReplyMoveFile{}
-		err := server.RPCCallByName("fileaccess", "FileAccess.MoveFile", args, reply)
+		err := server.RPCCallByName(nil, "fileaccess", "FileAccess.MoveFile", args, reply)
 		if err != nil {
 			server.Log.Error(err)
 			product.ProductImage = ""
@@ -117,7 +112,7 @@ func (a *ProductController) Put() {
 			FileName: old.ProductImage,
 		}
 		reply := &rpcs.ReplyEmptyResult{}
-		server.RPCCallByName("fileaccess", "FileAccess.DeleteFile", arg, reply)
+		server.RPCCallByName(nil, "fileaccess", "FileAccess.DeleteFile", arg, reply)
 	}
 	pro, err := a.Service.Update(product)
 	if err != nil {

+ 6 - 6
services/mqttaccess/mqtt_provider.go

@@ -18,7 +18,7 @@ func (mp *MQTTProvider) ValidateDeviceToken(deviceid uint64, token []byte) error
 		AccessToken: token,
 	}
 	reply := rpcs.ReplyValidateDeviceAccessToken{}
-	err := server.RPCCallByName("devicemanager", "DeviceManager.ValidateDeviceAccessToken", args, &reply)
+	err := server.RPCCallByName(nil, "devicemanager", "DeviceManager.ValidateDeviceAccessToken", args, &reply)
 	if err != nil {
 		server.Log.Errorf("validate device token error. deviceid : %v, token : %v, error: %v", deviceid, token, err)
 		return err
@@ -27,7 +27,7 @@ func (mp *MQTTProvider) ValidateDeviceToken(deviceid uint64, token []byte) error
 }
 func (mp *MQTTProvider) OnDeviceOnline(args rpcs.ArgsGetOnline) error {
 	reply := rpcs.ReplyGetOnline{}
-	err := server.RPCCallByName("devicemanager", "DeviceManager.GetOnline", args, &reply)
+	err := server.RPCCallByName(nil, "devicemanager", "DeviceManager.GetOnline", args, &reply)
 	if err != nil {
 		server.Log.Errorf("device online error. args: %v, error: %v", args, err)
 	}
@@ -39,7 +39,7 @@ func (mp *MQTTProvider) OnDeviceOffline(deviceid uint64) error {
 		Id: deviceid,
 	}
 	reply := rpcs.ReplyGetOffline{}
-	err := server.RPCCallByName("devicemanager", "DeviceManager.GetOffline", args, &reply)
+	err := server.RPCCallByName(nil, "devicemanager", "DeviceManager.GetOffline", args, &reply)
 	if err != nil {
 		server.Log.Errorf("device offline error. deviceid: %v, error: %v", deviceid, err)
 	}
@@ -51,7 +51,7 @@ func (mp *MQTTProvider) OnDeviceHeartBeat(deviceid uint64) error {
 		Id: deviceid,
 	}
 	reply := rpcs.ReplyHeartBeat{}
-	err := server.RPCCallByName("devicemanager", "DeviceManager.HeartBeat", args, &reply)
+	err := server.RPCCallByName(nil, "devicemanager", "DeviceManager.HeartBeat", args, &reply)
 	if err != nil {
 		server.Log.Errorf("device heartbeat error. deviceid: %v, error: %v", deviceid, err)
 	}
@@ -82,7 +82,7 @@ func (mp *MQTTProvider) OnDeviceMessage(deviceid uint64, msgtype string, message
 			Timestamp: data.Head.Timestamp,
 			Subdata:   data.SubData,
 		}
-		err = server.RPCCallByName("controller", "Controller.OnStatus", args, &reply)
+		err = server.RPCCallByName(nil, "controller", "Controller.OnStatus", args, &reply)
 		if err != nil {
 			server.Log.Errorf("device report status error. args: %v, error: %v", args, err)
 			return
@@ -104,7 +104,7 @@ func (mp *MQTTProvider) OnDeviceMessage(deviceid uint64, msgtype string, message
 			Priority:  event.Head.Priority,
 			Params:    event.Params,
 		}
-		err = server.RPCCallByName("controller", "Controller.OnEvent", args, &reply)
+		err = server.RPCCallByName(nil, "controller", "Controller.OnEvent", args, &reply)
 		if err != nil {
 			server.Log.Errorf("device on event error. args: %v, error: %v", args, err)
 			return

+ 1 - 1
tools/pdcfg/application.go

@@ -54,7 +54,7 @@ func addApplication() error {
 
 	reply := &models.Application{}
 
-	err = server.RPCCallByName("registry", "Registry.SaveApplication", &args, reply)
+	err = server.RPCCallByName(nil, "registry", "Registry.SaveApplication", &args, reply)
 	if err != nil {
 		return err
 	}

+ 1 - 1
tools/pdcfg/product.go

@@ -66,7 +66,7 @@ func addProduct() error {
 
 	reply := &models.Product{}
 
-	err = server.RPCCallByName("registry", "Registry.SaveProduct", &args, reply)
+	err = server.RPCCallByName(nil, "registry", "Registry.SaveProduct", &args, reply)
 	if err != nil {
 		return err
 	}

+ 1 - 1
tools/pdcfg/vendor.go

@@ -31,7 +31,7 @@ func addVendor() error {
 
 	reply := &models.Vendor{}
 
-	err = server.RPCCallByName("registry", "Registry.SaveVendor", &args, reply)
+	err = server.RPCCallByName(nil, "registry", "Registry.SaveVendor", &args, reply)
 	if err != nil {
 		return err
 	}