Selaa lähdekoodia

增加opentracing服务追踪

lijian 6 vuotta sitten
vanhempi
commit
6d77d9342a

+ 5 - 7
pkg/cache/cache.go

@@ -1,8 +1,6 @@
 package cache
 
-
-
-//return status of chache
+// CacheStatus return status of chache
 type CacheStatus struct {
 	Gets        int64
 	Hits        int64
@@ -10,10 +8,10 @@ type CacheStatus struct {
 	CurrentSize int
 }
 
-//this is a interface which defines some common functions
-type Cache interface{
+// Cache this is a interface which defines some common functions
+type Cache interface {
 	Set(key string, value interface{})
 	Get(key string) (interface{}, bool)
 	Delete(key string)
-	Status()(*CacheStatus)
-}
+	Status() *CacheStatus
+}

+ 7 - 5
pkg/cache/memcache.go

@@ -24,7 +24,7 @@ type entry struct {
 	value interface{}
 }
 
-// If maxItemSize is zero, the cache has no limit.
+// NewMemCache If maxItemSize is zero, the cache has no limit.
 //if maxItemSize is not zero, when cache's size beyond maxItemSize,start to swap
 func NewMemCache(maxItemSize int) *MemCache {
 	return &MemCache{
@@ -34,8 +34,8 @@ func NewMemCache(maxItemSize int) *MemCache {
 	}
 }
 
-//return the status of cache
-func (c *MemCache) Status() *CacheStatus{
+// Status return the status of cache
+func (c *MemCache) Status() *CacheStatus {
 	c.mutex.RLock()
 	defer c.mutex.RUnlock()
 	return &CacheStatus{
@@ -46,7 +46,7 @@ func (c *MemCache) Status() *CacheStatus{
 	}
 }
 
-//get value with key
+//Get value with key
 func (c *MemCache) Get(key string) (interface{}, bool) {
 	c.mutex.RLock()
 	defer c.mutex.RUnlock()
@@ -58,7 +58,7 @@ func (c *MemCache) Get(key string) (interface{}, bool) {
 	return nil, false
 }
 
-//set a value with key
+//Set a value with key
 func (c *MemCache) Set(key string, value interface{}) {
 	c.mutex.Lock()
 	defer c.mutex.Unlock()
@@ -80,6 +80,7 @@ func (c *MemCache) Set(key string, value interface{}) {
 	}
 }
 
+// Delete a value with key
 func (c *MemCache) Delete(key string) {
 	c.mutex.Lock()
 	defer c.mutex.Unlock()
@@ -94,6 +95,7 @@ func (c *MemCache) Delete(key string) {
 	}
 }
 
+// RemoveOldest remove oldest key
 func (c *MemCache) RemoveOldest() {
 	if c.cache == nil {
 		return

+ 4 - 6
pkg/cache/memcache_test.go

@@ -25,25 +25,23 @@ var getTests = []struct {
 }
 
 func TestSet(t *testing.T) {
-	var cache Cache
-	cache = NewMemCache(0)
+	cache := NewMemCache(0)
 	values := []string{"test1", "test2", "test3"}
 	key := "key1"
 	for _, v := range values {
 		cache.Set(key, v)
 		val, ok := cache.Get(key)
-		if !ok{
+		if !ok {
 			t.Fatalf("expect key:%v ,value:%v", key, v)
 		} else if ok && val != v {
-			t.Fatalf("expect value:%v, get value:%v", key, v, val)
+			t.Fatalf("key :%v,expect value:%v, get value:%v", key, v, val)
 		}
 		t.Logf("value:%v ", val)
 	}
 }
 
 func TestGet(t *testing.T) {
-	var cache Cache
-	cache = NewMemCache(0)
+	cache := NewMemCache(0)
 	for _, tt := range getTests {
 		cache.Set(tt.keyToAdd, 1234)
 		val, ok := cache.Get(tt.keyToGet)

+ 1 - 0
pkg/coap/error.go

@@ -2,6 +2,7 @@ package coap
 
 import "errors"
 
+// define errors
 var (
 	ErrInvalidTokenLen   = errors.New("invalid token length")
 	ErrOptionTooLong     = errors.New("option is too long")

+ 4 - 1
pkg/coap/manager.go

@@ -27,12 +27,14 @@ const (
 	subCommandTopic = "c"
 )
 
+// Manager manager
 type Manager struct {
 	queue        chan *Request
 	Provider     Provider
 	workersCount int32
 }
 
+// NewManager new manager
 func NewManager(p Provider) *Manager {
 	return &Manager{
 		Provider: p,
@@ -40,6 +42,7 @@ func NewManager(p Provider) *Manager {
 	}
 }
 
+// Handler udp handler
 func (m *Manager) Handler(conn *net.UDPConn) {
 	buf := make([]byte, maxPktlen)
 	for {
@@ -103,7 +106,7 @@ func (m *Manager) workerChannelHandler(inUse bool, timeout *time.Timer) bool {
 // coap://endpoint/$DEVICE_ID/s
 func (m *Manager) serve(w *Request) {
 	msg := w.Msg
-	server.Log.Debugf("get packet:%#v", msg)
+	server.Log.Debugf("get packet:%#v, workers count :%d", msg, m.workersCount)
 	if msg.IsConfirmable() && len(msg.Path()) > 1 {
 		token := msg.GetToken()
 		// TODO:想别的deviceid的

+ 13 - 8
pkg/coap/message.go

@@ -261,7 +261,6 @@ const (
 type Message interface {
 	Encode() ([]byte, error)
 	Decode(data []byte) error
-	AllOptions() options
 	Option(opid OptionID) interface{}
 	Path() []string
 	PathString() string
@@ -287,16 +286,22 @@ type BaseMessage struct {
 	Opts      options
 }
 
+// GetToken get token
 func (m *BaseMessage) GetToken() []byte {
 	return m.Token
 }
+
+// GetMessageID get message id
 func (m *BaseMessage) GetMessageID() uint16 {
 	return m.MessageID
 }
+
+// GetCode get code
 func (m *BaseMessage) GetCode() COAPCode {
 	return m.Code
 }
 
+// Encode 消息打包
 func (m *BaseMessage) Encode() ([]byte, error) {
 	tmpbuf := []byte{0, 0}
 	binary.BigEndian.PutUint16(tmpbuf, m.MessageID)
@@ -362,6 +367,7 @@ func (m *BaseMessage) Encode() ([]byte, error) {
 	return buf.Bytes(), nil
 }
 
+// Decode 消息解包
 func (m *BaseMessage) Decode(data []byte) error {
 	if len(data) < 4 {
 		return errors.New("short packet")
@@ -449,15 +455,12 @@ func (m *BaseMessage) Decode(data []byte) error {
 	return nil
 }
 
-// AllOptions get message opts
-func (m *BaseMessage) AllOptions() options {
-	return m.Opts
-}
-
 // IsConfirmable 如果是CON类型的消息类型,返回true
 func (m *BaseMessage) IsConfirmable() bool {
 	return m.Type == CON
 }
+
+// GetPayload get payload
 func (m *BaseMessage) GetPayload() []byte {
 	return m.Payload
 }
@@ -561,9 +564,9 @@ func parseOptionValue(optionID OptionID, valueBuf []byte) interface{} {
 		intValue := decodeInt(valueBuf)
 		if optionID == ContentFormat || optionID == Accept {
 			return MediaType(intValue)
-		} else {
-			return intValue
 		}
+		return intValue
+
 	case valueString:
 		return string(valueBuf)
 	case valueOpaque, valueEmpty:
@@ -572,6 +575,8 @@ func parseOptionValue(optionID OptionID, valueBuf []byte) interface{} {
 	// Skip unrecognized options (should never be reached)
 	return nil
 }
+
+// ParseMessage parse []byte to message
 func ParseMessage(data []byte) (Message, error) {
 	rv := &BaseMessage{}
 	return rv, rv.Decode(data)

+ 1 - 0
pkg/coap/provider.go

@@ -2,6 +2,7 @@ package coap
 
 import "sparrow/pkg/rpcs"
 
+// Provider 处理设备业务逻辑
 type Provider interface {
 	ValidateDeviceToken(deviceid uint64, token []byte) error
 	OnDeviceOnline(args rpcs.ArgsGetOnline) error

+ 1 - 0
pkg/coap/request.go

@@ -2,6 +2,7 @@ package coap
 
 import "net"
 
+// Request a client request
 type Request struct {
 	Msg  Message
 	Conn *net.UDPConn

+ 7 - 4
pkg/generator/key_gen.go

@@ -15,6 +15,7 @@ const (
 	maxEncodeLen = 32
 )
 
+// KeyGenerator key generator
 type KeyGenerator struct {
 	AESKey string
 }
@@ -55,16 +56,18 @@ func decryptAESCFB(msg, key []byte) ([]byte, error) {
 	return msg, nil
 }
 
+// NewKeyGenerator create a key generator
 func NewKeyGenerator(key string) (*KeyGenerator, error) {
 	l := len(key)
 	if l != 16 && l != 24 && l != 32 {
-		return nil, errors.New("invalid aes key length, should be 16, 24 or 32 bytes.")
+		return nil, errors.New("invalid aes key length, should be 16, 24 or 32 bytes")
 	}
 	return &KeyGenerator{
 		AESKey: key,
 	}, nil
 }
 
+// GenRandomKey get random key
 func (g *KeyGenerator) GenRandomKey(id int64) (string, error) {
 	buf := make([]byte, maxEncodeLen-binary.Size(id)-aes.BlockSize)
 	if _, err := io.ReadFull(rand.Reader, buf); err != nil {
@@ -84,8 +87,8 @@ func (g *KeyGenerator) GenRandomKey(id int64) (string, error) {
 	return hex.EncodeToString(binkey), nil
 }
 
-// get id from encrypt strings
-func (g *KeyGenerator) DecodeIdFromRandomKey(encrypted string) (int64, error) {
+// DecodeIDFromRandomKey get id from encrypt strings
+func (g *KeyGenerator) DecodeIDFromRandomKey(encrypted string) (int64, error) {
 	buf, err := hex.DecodeString(encrypted)
 	if err != nil {
 		return 0, err
@@ -99,7 +102,7 @@ func (g *KeyGenerator) DecodeIdFromRandomKey(encrypted string) (int64, error) {
 	var id int64
 
 	if len(raw) > maxEncodeLen || len(raw) < maxEncodeLen-aes.BlockSize-binary.Size(id) {
-		return 0, errors.New("invalid key format.")
+		return 0, errors.New("invalid key format")
 	}
 
 	binbuf := bytes.NewBuffer(raw[maxEncodeLen-aes.BlockSize-binary.Size(id):])

+ 1 - 1
pkg/generator/password_gen.go

@@ -9,7 +9,7 @@ const (
 	ranPasswordByteLength = 24
 )
 
-// gen random base64 encoded password
+// GenRandomPassword gen random base64 encoded password
 func GenRandomPassword() (string, error) {
 	ranbuf := make([]byte, ranPasswordByteLength)
 	_, err := rand.Read(ranbuf)

+ 1 - 1
pkg/models/application.go

@@ -1,4 +1,3 @@
-// application is app who will use the cloud api
 package models
 
 import (
@@ -8,6 +7,7 @@ import (
 )
 
 // Application 代指一个应用程序
+// application is app who will use the cloud api
 type Application struct {
 	gorm.Model
 	// App-Key for api

+ 5 - 0
pkg/mongo/recorder.go

@@ -5,12 +5,14 @@ import (
 	"labix.org/v2/mgo/bson"
 )
 
+// Recorder record device data
 type Recorder struct {
 	session    *mgo.Session
 	set        string
 	collection string
 }
 
+// NewRecorder create a recorder
 func NewRecorder(host string, set string, collection string) (*Recorder, error) {
 	sess, err := mgo.Dial(host)
 	if err != nil {
@@ -26,6 +28,7 @@ func NewRecorder(host string, set string, collection string) (*Recorder, error)
 	}, nil
 }
 
+// Insert insert data
 func (r *Recorder) Insert(args interface{}) error {
 	dbHandler := r.session.DB(r.set).C(r.collection)
 
@@ -37,6 +40,7 @@ func (r *Recorder) Insert(args interface{}) error {
 	return nil
 }
 
+// FindLatest find latest device data
 func (r *Recorder) FindLatest(deviceid uint64, record interface{}) error {
 	dbHandler := r.session.DB(r.set).C(r.collection)
 	err := dbHandler.Find(bson.M{
@@ -47,6 +51,7 @@ func (r *Recorder) FindLatest(deviceid uint64, record interface{}) error {
 	return err
 }
 
+// FindByTimestamp find by timestmp
 func (r *Recorder) FindByTimestamp(deviceid uint64, start uint64, end uint64, records interface{}) error {
 	dbHandler := r.session.DB(r.set).C(r.collection)
 	err := dbHandler.Find(bson.M{

+ 6 - 2
pkg/mongo/recorder_test.go

@@ -1,10 +1,10 @@
 package mongo
 
 import (
+	"reflect"
 	"sparrow/pkg/protocol"
 	"sparrow/pkg/rpcs"
 	"sparrow/pkg/tlv"
-	"reflect"
 	"testing"
 	"time"
 )
@@ -24,7 +24,11 @@ func TestRecorder(t *testing.T) {
 	timestamp := uint64(time.Now().Unix() * 1000)
 
 	subdata := protocol.SubData{
-		Head:   protocol.SubDataHead{1, 2, 3},
+		Head: protocol.SubDataHead{
+			SubDeviceid: 1,
+			PropertyNum: 2,
+			ParamsCount: 3,
+		},
 		Params: tlvs,
 	}
 

+ 5 - 0
pkg/mqtt/broker.go

@@ -5,10 +5,12 @@ import (
 	"time"
 )
 
+// Broker a mqtt broker
 type Broker struct {
 	mgr *Manager
 }
 
+// NewBroker create new broker
 func NewBroker(p Provider) *Broker {
 	// manager
 	mgr := NewManager(p)
@@ -18,10 +20,12 @@ func NewBroker(p Provider) *Broker {
 	return handler
 }
 
+// Handle tcp conn handle
 func (b *Broker) Handle(conn net.Conn) {
 	b.mgr.NewConn(conn)
 }
 
+// SendMessageToDevice send message to device
 func (b *Broker) SendMessageToDevice(deviceid uint64, msgtype string, message []byte, timeout time.Duration) error {
 	msg := &Publish{}
 	msg.Header.QosLevel = QosAtLeastOnce
@@ -30,6 +34,7 @@ func (b *Broker) SendMessageToDevice(deviceid uint64, msgtype string, message []
 	return b.mgr.PublishMessage2Device(deviceid, msg, timeout)
 }
 
+// GetToken get device token with device id
 func (b *Broker) GetToken(deviceid uint64) ([]byte, error) {
 	return b.mgr.GetToken(deviceid)
 }

+ 38 - 33
pkg/mqtt/connection.go

@@ -3,35 +3,39 @@ package mqtt
 import (
 	"encoding/hex"
 	"errors"
+	"net"
 	"sparrow/pkg/rpcs"
 	"sparrow/pkg/server"
-	"net"
 	"time"
 )
 
+// const def
 const (
 	SendChanLen      = 16
 	defaultKeepAlive = 30
 )
 
+// ResponseType response type
 type ResponseType struct {
 	SendTime    uint8
 	PublishType uint8
 	DataType    string
 }
 
+// Connection client connection
 type Connection struct {
 	Mgr             *Manager
-	DeviceId        uint64
+	DeviceID        uint64
 	Conn            net.Conn
 	SendChan        chan Message
-	MessageId       uint16
+	MessageID       uint16
 	MessageWaitChan map[uint16]chan error
 	KeepAlive       uint16
 	LastHbTime      int64
 	Token           []byte
 }
 
+// NewConnection create a connection
 func NewConnection(conn net.Conn, mgr *Manager) *Connection {
 	sendchan := make(chan Message, SendChanLen)
 	c := &Connection{
@@ -48,6 +52,7 @@ func NewConnection(conn net.Conn, mgr *Manager) *Connection {
 	return c
 }
 
+// Submit submit a message to send chan
 func (c *Connection) Submit(msg Message) {
 	if c.Conn != nil {
 		c.SendChan <- msg
@@ -59,8 +64,8 @@ func (c *Connection) Publish(msg Message, timeout time.Duration) error {
 	server.Log.Debugf("publishing message : %v, timeout %v", msg, timeout)
 
 	message := msg.(*Publish)
-	message.MessageId = c.MessageId
-	c.MessageId++
+	message.MessageID = c.MessageID
+	c.MessageID++
 	c.Submit(message)
 
 	ch := make(chan error)
@@ -70,15 +75,15 @@ func (c *Connection) Publish(msg Message, timeout time.Duration) error {
 		return nil
 	}
 
-	c.MessageWaitChan[message.MessageId] = ch
+	c.MessageWaitChan[message.MessageID] = ch
 	// wait for timeout and
 	go func() {
 		timer := time.NewTimer(timeout)
 		<-timer.C
-		waitCh, exist := c.MessageWaitChan[message.MessageId]
+		waitCh, exist := c.MessageWaitChan[message.MessageID]
 		if exist {
 			waitCh <- errors.New("timeout pushlishing message.")
-			delete(c.MessageWaitChan, message.MessageId)
+			delete(c.MessageWaitChan, message.MessageID)
 			close(waitCh)
 		}
 	}()
@@ -87,18 +92,18 @@ func (c *Connection) Publish(msg Message, timeout time.Duration) error {
 	return err
 }
 
-func (c *Connection) confirmPublish(messageid uint16) {
-	waitCh, exist := c.MessageWaitChan[messageid]
+func (c *Connection) confirmPublish(MessageID uint16) {
+	waitCh, exist := c.MessageWaitChan[MessageID]
 	if exist {
 		waitCh <- nil
-		delete(c.MessageWaitChan, messageid)
+		delete(c.MessageWaitChan, MessageID)
 		close(waitCh)
 	}
 }
 
 func (c *Connection) ValidateToken(token []byte) error {
 
-	err := c.Mgr.Provider.ValidateDeviceToken(c.DeviceId, token)
+	err := c.Mgr.Provider.ValidateDeviceToken(c.DeviceID, token)
 	if err != nil {
 		return err
 	}
@@ -109,12 +114,12 @@ func (c *Connection) ValidateToken(token []byte) error {
 }
 
 func (c *Connection) Close() {
-	deviceid := c.DeviceId
-	server.Log.Infof("closing connection of device %v", deviceid)
+	DeviceID := c.DeviceID
+	server.Log.Infof("closing connection of device %v", DeviceID)
 	if c.Conn != nil {
 		c.Conn.Close()
 		c.Conn = nil
-		c.Mgr.Provider.OnDeviceOffline(deviceid)
+		c.Mgr.Provider.OnDeviceOffline(DeviceID)
 	}
 	if c.SendChan != nil {
 		close(c.SendChan)
@@ -148,20 +153,20 @@ func (c *Connection) RcvMsgFromClient() {
 				ret = RetCodeUnacceptableProtocolVersion
 			}
 
-			if len(msg.ClientId) < 1 || len(msg.ClientId) > 23 {
-				server.Log.Warn("invalid clientid length: %d", len(msg.ClientId))
+			if len(msg.ClientID) < 1 || len(msg.ClientID) > 23 {
+				server.Log.Warn("invalid ClientID length: %d", len(msg.ClientID))
 				ret = RetCodeIdentifierRejected
 				c.Close()
 				return
 			}
 
-			deviceid, err := ClientIdToDeviceId(msg.ClientId)
+			DeviceID, err := ClientIDToDeviceID(msg.ClientID)
 			if err != nil {
 				server.Log.Warn("invalid Identify: %d", ret)
 				c.Close()
 				return
 			}
-			c.DeviceId = deviceid
+			c.DeviceID = DeviceID
 
 			token, err := hex.DecodeString(msg.Password)
 			if err != nil {
@@ -185,13 +190,13 @@ func (c *Connection) RcvMsgFromClient() {
 			}
 
 			args := rpcs.ArgsGetOnline{
-				Id:                c.DeviceId,
+				Id:                c.DeviceID,
 				ClientIP:          host,
 				AccessRPCHost:     server.GetRPCHost(),
 				HeartbeatInterval: uint32(c.KeepAlive),
 			}
 
-			c.Mgr.AddConn(c.DeviceId, c)
+			c.Mgr.AddConn(c.DeviceID, c)
 			connack := &ConnAck{
 				ReturnCode: ret,
 			}
@@ -206,23 +211,23 @@ func (c *Connection) RcvMsgFromClient() {
 				return
 			}
 
-			server.Log.Infof("device %d, connected to server now, host: %s", c.DeviceId, host)
+			server.Log.Infof("device %d, connected to server now, host: %s", c.DeviceID, host)
 
 		case *Publish:
 			server.Log.Infof("%s, publish topic: %s", host, msg.TopicName)
 
-			c.Mgr.PublishMessage2Server(c.DeviceId, msg)
+			c.Mgr.PublishMessage2Server(c.DeviceID, msg)
 			if msg.QosLevel.IsAtLeastOnce() {
 				server.Log.Infof("publish ack send now")
-				publishack := &PubAck{MessageId: msg.MessageId}
+				publishack := &PubAck{MessageID: msg.MessageID}
 				c.Submit(publishack)
 			} else if msg.QosLevel.IsExactlyOnce() {
 				server.Log.Infof("publish Rec send now")
-				publishRec := &PubRec{MessageId: msg.MessageId}
+				publishRec := &PubRec{MessageID: msg.MessageID}
 				c.Submit(publishRec)
 			}
 
-			err := c.Mgr.Provider.OnDeviceHeartBeat(c.DeviceId)
+			err := c.Mgr.Provider.OnDeviceHeartBeat(c.DeviceID)
 			if err != nil {
 				server.Log.Warnf("%s, heartbeat set error %s, close now...", host, err)
 				c.Close()
@@ -231,8 +236,8 @@ func (c *Connection) RcvMsgFromClient() {
 
 		case *PubAck:
 			server.Log.Infof("%s, comes publish ack", host)
-			c.confirmPublish(msg.MessageId)
-			err := c.Mgr.Provider.OnDeviceHeartBeat(c.DeviceId)
+			c.confirmPublish(msg.MessageID)
+			err := c.Mgr.Provider.OnDeviceHeartBeat(c.DeviceID)
 			if err != nil {
 				server.Log.Warnf("%s, heartbeat set error %s, close now...", host, err)
 				c.Close()
@@ -241,18 +246,18 @@ func (c *Connection) RcvMsgFromClient() {
 
 		case *PubRec:
 			server.Log.Infof("%s, comes publish rec", host)
-			publishRel := &PubRel{MessageId: msg.MessageId}
+			publishRel := &PubRel{MessageID: msg.MessageID}
 			c.Submit(publishRel)
 
 		case *PubRel:
 			server.Log.Infof("%s, comes publish rel", host)
-			publishCom := &PubComp{MessageId: msg.MessageId}
+			publishCom := &PubComp{MessageID: msg.MessageID}
 			c.Submit(publishCom)
 
 		case *PubComp:
 			server.Log.Infof("%s, comes publish comp", host)
-			c.confirmPublish(msg.MessageId)
-			err := c.Mgr.Provider.OnDeviceHeartBeat(c.DeviceId)
+			c.confirmPublish(msg.MessageID)
+			err := c.Mgr.Provider.OnDeviceHeartBeat(c.DeviceID)
 			if err != nil {
 				server.Log.Warnf("%s, heartbeat set error %s, close now...", host, err)
 				c.Close()
@@ -262,7 +267,7 @@ func (c *Connection) RcvMsgFromClient() {
 		case *PingReq:
 			server.Log.Infof("%s, ping req comes", host)
 			pingrsp := &PingResp{}
-			err := c.Mgr.Provider.OnDeviceHeartBeat(c.DeviceId)
+			err := c.Mgr.Provider.OnDeviceHeartBeat(c.DeviceID)
 			if err != nil {
 				server.Log.Warnf("%s, heartbeat set error %s, close now...", host, err)
 				c.Close()

+ 10 - 10
pkg/mqtt/manager.go

@@ -1,8 +1,8 @@
 package mqtt
 
 import (
-	"sparrow/pkg/server"
 	"net"
+	"sparrow/pkg/server"
 	"sync"
 	"time"
 )
@@ -49,34 +49,34 @@ func (m *Manager) DelConn(id uint64) {
 	m.CxtMutex.Unlock()
 }
 
-func (m *Manager) GetToken(deviceid uint64) ([]byte, error) {
+func (m *Manager) GetToken(DeviceID uint64) ([]byte, error) {
 	m.CxtMutex.RLock()
-	con, exist := m.IdToConn[deviceid]
+	con, exist := m.IdToConn[DeviceID]
 	m.CxtMutex.RUnlock()
 	if !exist {
-		return nil, errorf("device not exist: %v[%v]", deviceid, deviceid)
+		return nil, errorf("device not exist: %v[%v]", DeviceID, DeviceID)
 	}
 
 	return con.Token, nil
 }
 
-func (m *Manager) PublishMessage2Device(deviceid uint64, msg *Publish, timeout time.Duration) error {
+func (m *Manager) PublishMessage2Device(DeviceID uint64, msg *Publish, timeout time.Duration) error {
 	m.CxtMutex.RLock()
-	con, exist := m.IdToConn[deviceid]
+	con, exist := m.IdToConn[DeviceID]
 	m.CxtMutex.RUnlock()
 	if !exist {
-		return errorf("device not exist: %v", deviceid)
+		return errorf("device not exist: %v", DeviceID)
 	}
 
 	return con.Publish(msg, timeout)
 }
 
-func (m *Manager) PublishMessage2Server(deviceid uint64, msg *Publish) error {
+func (m *Manager) PublishMessage2Server(DeviceID uint64, msg *Publish) error {
 	topic := msg.TopicName
 
 	payload := msg.Payload.(BytesPayload)
 
-	m.Provider.OnDeviceMessage(deviceid, topic, payload)
+	m.Provider.OnDeviceMessage(DeviceID, topic, payload)
 	return nil
 }
 
@@ -93,7 +93,7 @@ func (m *Manager) CleanWorker() {
 			if uint16(curTime-con.LastHbTime) > uint16(3*con.KeepAlive/2) {
 				server.Log.Infof("connection %v inactive , removing", con)
 				con.Close()
-				delete(m.IdToConn, con.DeviceId)
+				delete(m.IdToConn, con.DeviceID)
 			}
 		}
 

+ 35 - 35
pkg/mqtt/message.go

@@ -172,7 +172,7 @@ type Connect struct {
 	CleanSession               bool
 	WillQos                    TagQosLevel
 	KeepAliveTimer             uint16
-	ClientId                   string
+	ClientID                   string
 	WillTopic, WillMessage     string
 	UsernameFlag, PasswordFlag bool
 	Username, Password         string
@@ -196,7 +196,7 @@ func (msg *Connect) Encode(w io.Writer) (err error) {
 	setUint8(msg.ProtocolVersion, buf)
 	buf.WriteByte(flags)
 	setUint16(msg.KeepAliveTimer, buf)
-	setString(msg.ClientId, buf)
+	setString(msg.ClientID, buf)
 	if msg.WillFlag {
 		setString(msg.WillTopic, buf)
 		setString(msg.WillMessage, buf)
@@ -228,7 +228,7 @@ func (msg *Connect) Decode(r io.Reader, hdr Header, packetRemaining int32) (err
 	if err != nil {
 		return err
 	}
-	clientId, err := getString(r, &packetRemaining)
+	ClientID, err := getString(r, &packetRemaining)
 	if err != nil {
 		return err
 	}
@@ -244,7 +244,7 @@ func (msg *Connect) Decode(r io.Reader, hdr Header, packetRemaining int32) (err
 		WillFlag:        flags&0x04 > 0,
 		CleanSession:    flags&0x02 > 0,
 		KeepAliveTimer:  keepAliveTimer,
-		ClientId:        clientId,
+		ClientID:        ClientID,
 	}
 
 	if msg.WillFlag {
@@ -320,7 +320,7 @@ func (msg *ConnAck) Decode(r io.Reader, hdr Header, packetRemaining int32) (err
 type Publish struct {
 	Header
 	TopicName string
-	MessageId uint16
+	MessageID uint16
 	Payload   Payload
 }
 
@@ -329,7 +329,7 @@ func (msg *Publish) Encode(w io.Writer) (err error) {
 
 	setString(msg.TopicName, buf)
 	if msg.Header.QosLevel.HasId() {
-		setUint16(msg.MessageId, buf)
+		setUint16(msg.MessageID, buf)
 	}
 
 	if err = msg.Payload.WritePayload(buf); err != nil {
@@ -351,7 +351,7 @@ func (msg *Publish) Decode(r io.Reader, hdr Header, packetRemaining int32) (err
 		return err
 	}
 	if msg.Header.QosLevel.HasId() {
-		msg.MessageId, err = getUint16(r, &packetRemaining)
+		msg.MessageID, err = getUint16(r, &packetRemaining)
 		if err != nil {
 			return err
 		}
@@ -366,67 +366,67 @@ func (msg *Publish) Decode(r io.Reader, hdr Header, packetRemaining int32) (err
 // PubAck represents an MQTT PUBACK message.
 type PubAck struct {
 	Header
-	MessageId uint16
+	MessageID uint16
 }
 
 func (msg *PubAck) Encode(w io.Writer) error {
-	return encodeAckCommon(w, &msg.Header, msg.MessageId, MsgPubAck)
+	return encodeAckCommon(w, &msg.Header, msg.MessageID, MsgPubAck)
 }
 
 func (msg *PubAck) Decode(r io.Reader, hdr Header, packetRemaining int32) (err error) {
 	msg.Header = hdr
-	return decodeAckCommon(r, packetRemaining, &msg.MessageId)
+	return decodeAckCommon(r, packetRemaining, &msg.MessageID)
 }
 
 // PubRec represents an MQTT PUBREC message.
 type PubRec struct {
 	Header
-	MessageId uint16
+	MessageID uint16
 }
 
 func (msg *PubRec) Encode(w io.Writer) error {
-	return encodeAckCommon(w, &msg.Header, msg.MessageId, MsgPubRec)
+	return encodeAckCommon(w, &msg.Header, msg.MessageID, MsgPubRec)
 }
 
 func (msg *PubRec) Decode(r io.Reader, hdr Header, packetRemaining int32) (err error) {
 	msg.Header = hdr
-	return decodeAckCommon(r, packetRemaining, &msg.MessageId)
+	return decodeAckCommon(r, packetRemaining, &msg.MessageID)
 }
 
 // PubRel represents an MQTT PUBREL message.
 type PubRel struct {
 	Header
-	MessageId uint16
+	MessageID uint16
 }
 
 func (msg *PubRel) Encode(w io.Writer) error {
-	return encodeAckCommon(w, &msg.Header, msg.MessageId, MsgPubRel)
+	return encodeAckCommon(w, &msg.Header, msg.MessageID, MsgPubRel)
 }
 
 func (msg *PubRel) Decode(r io.Reader, hdr Header, packetRemaining int32) (err error) {
 	msg.Header = hdr
-	return decodeAckCommon(r, packetRemaining, &msg.MessageId)
+	return decodeAckCommon(r, packetRemaining, &msg.MessageID)
 }
 
 // PubComp represents an MQTT PUBCOMP message.
 type PubComp struct {
 	Header
-	MessageId uint16
+	MessageID uint16
 }
 
 func (msg *PubComp) Encode(w io.Writer) error {
-	return encodeAckCommon(w, &msg.Header, msg.MessageId, MsgPubComp)
+	return encodeAckCommon(w, &msg.Header, msg.MessageID, MsgPubComp)
 }
 
 func (msg *PubComp) Decode(r io.Reader, hdr Header, packetRemaining int32) (err error) {
 	msg.Header = hdr
-	return decodeAckCommon(r, packetRemaining, &msg.MessageId)
+	return decodeAckCommon(r, packetRemaining, &msg.MessageID)
 }
 
 // Subscribe represents an MQTT SUBSCRIBE message.
 type Subscribe struct {
 	Header
-	MessageId uint16
+	MessageID uint16
 	Topics    []TopicQos
 }
 
@@ -438,7 +438,7 @@ type TopicQos struct {
 func (msg *Subscribe) Encode(w io.Writer) (err error) {
 	buf := new(bytes.Buffer)
 	if msg.Header.QosLevel.HasId() {
-		setUint16(msg.MessageId, buf)
+		setUint16(msg.MessageID, buf)
 	}
 	for _, topicSub := range msg.Topics {
 		setString(topicSub.Topic, buf)
@@ -452,7 +452,7 @@ func (msg *Subscribe) Decode(r io.Reader, hdr Header, packetRemaining int32) (er
 	msg.Header = hdr
 
 	if msg.Header.QosLevel.HasId() {
-		msg.MessageId, err = getUint16(r, &packetRemaining)
+		msg.MessageID, err = getUint16(r, &packetRemaining)
 		if err != nil {
 			return err
 		}
@@ -480,13 +480,13 @@ func (msg *Subscribe) Decode(r io.Reader, hdr Header, packetRemaining int32) (er
 // SubAck represents an MQTT SUBACK message.
 type SubAck struct {
 	Header
-	MessageId uint16
+	MessageID uint16
 	TopicsQos []TagQosLevel
 }
 
 func (msg *SubAck) Encode(w io.Writer) (err error) {
 	buf := new(bytes.Buffer)
-	setUint16(msg.MessageId, buf)
+	setUint16(msg.MessageID, buf)
 	for i := 0; i < len(msg.TopicsQos); i += 1 {
 		setUint8(uint8(msg.TopicsQos[i]), buf)
 	}
@@ -497,7 +497,7 @@ func (msg *SubAck) Encode(w io.Writer) (err error) {
 func (msg *SubAck) Decode(r io.Reader, hdr Header, packetRemaining int32) (err error) {
 	msg.Header = hdr
 
-	msg.MessageId, err = getUint16(r, &packetRemaining)
+	msg.MessageID, err = getUint16(r, &packetRemaining)
 	if err != nil {
 		return err
 	}
@@ -518,14 +518,14 @@ func (msg *SubAck) Decode(r io.Reader, hdr Header, packetRemaining int32) (err e
 // Unsubscribe represents an MQTT UNSUBSCRIBE message.
 type Unsubscribe struct {
 	Header
-	MessageId uint16
+	MessageID uint16
 	Topics    []string
 }
 
 func (msg *Unsubscribe) Encode(w io.Writer) (err error) {
 	buf := new(bytes.Buffer)
 	if msg.Header.QosLevel.HasId() {
-		setUint16(msg.MessageId, buf)
+		setUint16(msg.MessageID, buf)
 	}
 	for _, topic := range msg.Topics {
 		setString(topic, buf)
@@ -538,7 +538,7 @@ func (msg *Unsubscribe) Decode(r io.Reader, hdr Header, packetRemaining int32) (
 	msg.Header = hdr
 
 	if msg.Header.QosLevel.HasId() {
-		msg.MessageId, err = getUint16(r, &packetRemaining)
+		msg.MessageID, err = getUint16(r, &packetRemaining)
 		if err != nil {
 			return err
 		}
@@ -559,16 +559,16 @@ func (msg *Unsubscribe) Decode(r io.Reader, hdr Header, packetRemaining int32) (
 // UnsubAck represents an MQTT UNSUBACK message.
 type UnsubAck struct {
 	Header
-	MessageId uint16
+	MessageID uint16
 }
 
 func (msg *UnsubAck) Encode(w io.Writer) error {
-	return encodeAckCommon(w, &msg.Header, msg.MessageId, MsgUnsubAck)
+	return encodeAckCommon(w, &msg.Header, msg.MessageID, MsgUnsubAck)
 }
 
 func (msg *UnsubAck) Decode(r io.Reader, hdr Header, packetRemaining int32) (err error) {
 	msg.Header = hdr
-	return decodeAckCommon(r, packetRemaining, &msg.MessageId)
+	return decodeAckCommon(r, packetRemaining, &msg.MessageID)
 }
 
 // PingReq represents an MQTT PINGREQ message.
@@ -619,14 +619,14 @@ func (msg *Disconnect) Decode(r io.Reader, hdr Header, packetRemaining int32) er
 	return nil
 }
 
-func encodeAckCommon(w io.Writer, hdr *Header, messageId uint16, msgType TagMessageType) error {
+func encodeAckCommon(w io.Writer, hdr *Header, MessageID uint16, msgType TagMessageType) error {
 	buf := new(bytes.Buffer)
-	setUint16(messageId, buf)
+	setUint16(MessageID, buf)
 	return writeMessage(w, msgType, hdr, buf, 0)
 }
 
-func decodeAckCommon(r io.Reader, packetRemaining int32, messageId *uint16) (err error) {
-	*messageId, err = getUint16(r, &packetRemaining)
+func decodeAckCommon(r io.Reader, packetRemaining int32, MessageID *uint16) (err error) {
+	*MessageID, err = getUint16(r, &packetRemaining)
 	if err != nil {
 		return err
 	}

+ 1 - 1
pkg/mqtt/utils.go

@@ -28,7 +28,7 @@ func DeviceIdToClientId(deviceid uint64) string {
 	return strconv.FormatUint(deviceid, 16)
 }
 
-func ClientIdToDeviceId(identify string) (uint64, error) {
+func ClientIDToDeviceID(identify string) (uint64, error) {
 	deviceId, err := strconv.ParseUint(identify, 16, 64)
 	if err != nil {
 		return uint64(0), err

+ 4 - 0
pkg/rpcs/common.go

@@ -5,3 +5,7 @@ type ArgsDeviceId struct {
 }
 
 type ReplyEmptyResult struct{}
+
+type ArgsWithSpan struct {
+	SpanCarrier map[string]string
+}

+ 2 - 0
pkg/rpcs/fileaccess.go

@@ -2,6 +2,7 @@ package rpcs
 
 // ArgsMoveFile move file rpc args
 type ArgsMoveFile struct {
+	ArgsWithSpan
 	Source string
 	Target string
 }
@@ -13,5 +14,6 @@ type ReplyMoveFile struct {
 
 // ArgsDeleteFile 删除文件参数
 type ArgsDeleteFile struct {
+	ArgsWithSpan
 	FileName string
 }

+ 10 - 0
pkg/rpcs/registry.go

@@ -2,14 +2,22 @@ package rpcs
 
 // ArgsDeviceRegister device register args
 type ArgsDeviceRegister struct {
+	ArgsWithSpan
 	ProductKey    string
 	DeviceCode    string
 	DeviceVersion string
 	ModuleName    string
 }
 
+// ArgsDeviceAuth device auth
+type ArgsDeviceAuth struct {
+	ArgsWithSpan
+	DeviceID int64
+}
+
 // ArgsDeviceUpdate device update args
 type ArgsDeviceUpdate struct {
+	ArgsWithSpan
 	DeviceIdentifier  string
 	DeviceName        string
 	DeviceDescription string
@@ -17,6 +25,7 @@ type ArgsDeviceUpdate struct {
 
 // ArgsProductList get products list args
 type ArgsProductList struct {
+	ArgsWithSpan
 	ArgsPage
 	VendorID    uint
 	ProductName string
@@ -24,6 +33,7 @@ type ArgsProductList struct {
 
 // ArgsPage page params
 type ArgsPage struct {
+	ArgsWithSpan
 	Pi int
 	Ps int
 }

+ 7 - 2
pkg/server/server.go

@@ -7,10 +7,13 @@
 package server
 
 import (
+	"github.com/opentracing/opentracing-go"
+
 	// "github.com/vharitonsky/iniflags"
 	"flag"
 	"net/http"
 	"net/rpc"
+	"sparrow/pkg/tracing"
 	"time"
 )
 
@@ -63,7 +66,6 @@ func Init(name string) error {
 		if err != nil {
 			return err
 		}
-
 		Log.Infof("server %s init success.", name)
 
 	}
@@ -264,7 +266,10 @@ func Run() error {
 		}
 		Log.Info("starting rpc server ... OK")
 	}
-
+	tracer, closer := tracing.Init(serverInstance.name)
+	// opentracing
+	defer closer.Close()
+	opentracing.InitGlobalTracer(tracer)
 	Log.Info("sever launch successfully!")
 
 	// loop to do something

+ 106 - 96
pkg/tlv/tlv.go

@@ -8,35 +8,37 @@ import (
 	"io"
 )
 
+// 定义数据类型
 const (
-	TLV_FLOAT64 = 1
-	TLV_FLOAT32 = 2
-	TLV_INT8    = 3
-	TLV_INT16   = 4
-	TLV_INT32   = 5
-	TLV_INT64   = 6
-	TLV_UINT8   = 7
-	TLV_UINT16  = 8
-	TLV_UINT32  = 9
-	TLV_UINT64  = 10
-	TLV_BYTES   = 11
-	TLV_STRING  = 12
-	TLV_BOOL    = 13
+	TLVFLOAT64 = 1
+	TLVFLOAT32 = 2
+	TLVINT8    = 3
+	TLVINT16   = 4
+	TLVINT32   = 5
+	TLVINT64   = 6
+	TLVUINT8   = 7
+	TLVUINT16  = 8
+	TLVUINT32  = 9
+	TLVUINT64  = 10
+	TLVBYTES   = 11
+	TLVSTRING  = 12
+	TLVBOOL    = 13
 )
 
+// TLV type length value
 type TLV struct {
 	Tag   uint16
 	Value []byte
 }
 
-func Uint16ToByte(value uint16) []byte {
+func uint16ToByte(value uint16) []byte {
 	buf := bytes.NewBuffer([]byte{})
 	binary.Write(buf, binary.BigEndian, value)
 
 	return buf.Bytes()
 }
 
-func ByteToUint16(buf []byte) uint16 {
+func byteToUint16(buf []byte) uint16 {
 	tmpBuf := bytes.NewBuffer(buf)
 	var value uint16
 	binary.Read(tmpBuf, binary.BigEndian, &value)
@@ -44,6 +46,7 @@ func ByteToUint16(buf []byte) uint16 {
 	return value
 }
 
+// ToBinary make tlv to []byte
 func (tlv *TLV) ToBinary() []byte {
 	buf := new(bytes.Buffer)
 	binary.Write(buf, binary.BigEndian, &tlv.Tag)
@@ -52,34 +55,35 @@ func (tlv *TLV) ToBinary() []byte {
 	return buf.Bytes()
 }
 
+// Length get tlv length
 func (tlv *TLV) Length() int {
 	length := int(0)
 	switch tlv.Tag {
-	case TLV_FLOAT64:
+	case TLVFLOAT64:
 		length = 8
-	case TLV_INT64:
+	case TLVINT64:
 		length = 8
-	case TLV_UINT64:
+	case TLVUINT64:
 		length = 8
-	case TLV_FLOAT32:
+	case TLVFLOAT32:
 		length = 4
-	case TLV_INT32:
+	case TLVINT32:
 		length = 4
-	case TLV_UINT32:
+	case TLVUINT32:
 		length = 4
-	case TLV_INT16:
+	case TLVINT16:
 		length = 2
-	case TLV_UINT16:
+	case TLVUINT16:
 		length = 2
-	case TLV_INT8:
+	case TLVINT8:
 		length = 1
-	case TLV_UINT8:
+	case TLVUINT8:
 		length = 1
-	case TLV_BYTES:
-		length = int(ByteToUint16(tlv.Value[0:2]))
+	case TLVBYTES:
+		length = int(byteToUint16(tlv.Value[0:2]))
 		length += 2
-	case TLV_STRING:
-		length = int(ByteToUint16(tlv.Value[0:2]))
+	case TLVSTRING:
+		length = int(byteToUint16(tlv.Value[0:2]))
 		length += 2
 	default:
 		length = 0
@@ -90,124 +94,126 @@ func (tlv *TLV) Length() int {
 	return length
 }
 
+// FromBinary read from binary
 func (tlv *TLV) FromBinary(r io.Reader) error {
 	binary.Read(r, binary.BigEndian, &tlv.Tag)
 	length := uint16(0)
 	switch tlv.Tag {
-	case TLV_FLOAT64:
+	case TLVFLOAT64:
 		length = 8
 		tlv.Value = make([]byte, length)
 		binary.Read(r, binary.BigEndian, &tlv.Value)
-	case TLV_INT64:
+	case TLVINT64:
 		length = 8
 		tlv.Value = make([]byte, length)
 		binary.Read(r, binary.BigEndian, &tlv.Value)
-	case TLV_UINT64:
+	case TLVUINT64:
 		length = 8
 		tlv.Value = make([]byte, length)
 		binary.Read(r, binary.BigEndian, &tlv.Value)
-	case TLV_FLOAT32:
+	case TLVFLOAT32:
 		length = 4
 		tlv.Value = make([]byte, length)
 		binary.Read(r, binary.BigEndian, &tlv.Value)
-	case TLV_INT32:
+	case TLVINT32:
 		length = 4
 		tlv.Value = make([]byte, length)
 		binary.Read(r, binary.BigEndian, &tlv.Value)
-	case TLV_UINT32:
+	case TLVUINT32:
 		length = 4
 		tlv.Value = make([]byte, length)
 		binary.Read(r, binary.BigEndian, &tlv.Value)
-	case TLV_INT16:
+	case TLVINT16:
 		length = 2
 		tlv.Value = make([]byte, length)
 		binary.Read(r, binary.BigEndian, &tlv.Value)
-	case TLV_UINT16:
+	case TLVUINT16:
 		length = 2
 		tlv.Value = make([]byte, length)
 		binary.Read(r, binary.BigEndian, &tlv.Value)
-	case TLV_INT8:
+	case TLVINT8:
 		length = 1
 		tlv.Value = make([]byte, length)
 		binary.Read(r, binary.BigEndian, &tlv.Value)
-	case TLV_UINT8:
+	case TLVUINT8:
 		length = 1
 		tlv.Value = make([]byte, length)
 		binary.Read(r, binary.BigEndian, &tlv.Value)
-	case TLV_BYTES:
+	case TLVBYTES:
 		binary.Read(r, binary.BigEndian, &length)
 		tlv.Value = make([]byte, length+2)
-		copy(tlv.Value[0:2], Uint16ToByte(length))
+		copy(tlv.Value[0:2], uint16ToByte(length))
 		binary.Read(r, binary.BigEndian, tlv.Value[2:])
-	case TLV_STRING:
+	case TLVSTRING:
 		binary.Read(r, binary.BigEndian, &length)
 		tlv.Value = make([]byte, length+2)
-		copy(tlv.Value[0:2], Uint16ToByte(length))
+		copy(tlv.Value[0:2], uint16ToByte(length))
 		binary.Read(r, binary.BigEndian, tlv.Value[2:])
 	default:
-		return errors.New(fmt.Sprintf("unsuport value: %d", tlv.Tag))
+		return fmt.Errorf("unsuport value: %d", tlv.Tag)
 	}
 
 	return nil
 }
 
+// MakeTLV make a tlv pointer
 func MakeTLV(a interface{}) (*TLV, error) {
 	var tag uint16
 	var length uint16
 	buf := new(bytes.Buffer)
-	switch a.(type) {
+	switch a := a.(type) {
 	case float64:
-		tag = TLV_FLOAT64
+		tag = TLVFLOAT64
 		length = 8
-		binary.Write(buf, binary.BigEndian, a.(float64))
+		binary.Write(buf, binary.BigEndian, a)
 	case float32:
-		tag = TLV_FLOAT32
+		tag = TLVFLOAT32
 		length = 4
-		binary.Write(buf, binary.BigEndian, a.(float32))
+		binary.Write(buf, binary.BigEndian, a)
 	case int8:
-		tag = TLV_INT8
+		tag = TLVINT8
 		length = 1
-		binary.Write(buf, binary.BigEndian, a.(int8))
+		binary.Write(buf, binary.BigEndian, a)
 	case int16:
-		tag = TLV_INT16
+		tag = TLVINT16
 		length = 2
-		binary.Write(buf, binary.BigEndian, a.(int16))
+		binary.Write(buf, binary.BigEndian, a)
 	case int32:
-		tag = TLV_INT32
+		tag = TLVINT32
 		length = 4
-		binary.Write(buf, binary.BigEndian, a.(int32))
+		binary.Write(buf, binary.BigEndian, a)
 	case int64:
-		tag = TLV_INT64
+		tag = TLVINT64
 		length = 8
-		binary.Write(buf, binary.BigEndian, a.(int64))
+		binary.Write(buf, binary.BigEndian, a)
 	case uint8:
-		tag = TLV_UINT8
+		tag = TLVUINT8
 		length = 1
-		binary.Write(buf, binary.BigEndian, a.(uint8))
+		binary.Write(buf, binary.BigEndian, a)
 	case uint16:
-		tag = TLV_UINT16
+		tag = TLVUINT16
 		length = 2
-		binary.Write(buf, binary.BigEndian, a.(uint16))
+		binary.Write(buf, binary.BigEndian, a)
 	case uint32:
-		tag = TLV_UINT32
+		tag = TLVUINT32
 		length = 4
-		binary.Write(buf, binary.BigEndian, a.(uint32))
+		binary.Write(buf, binary.BigEndian, a)
 	case uint64:
-		tag = TLV_UINT64
+		tag = TLVUINT64
 		length = 8
-		binary.Write(buf, binary.BigEndian, a.(uint64))
+		binary.Write(buf, binary.BigEndian, a)
 	case []byte:
-		tag = TLV_BYTES
-		length = uint16(len(a.([]byte)))
+		tag = TLVBYTES
+		length = uint16(len(a))
 		binary.Write(buf, binary.BigEndian, length)
-		binary.Write(buf, binary.BigEndian, a.([]byte))
+		binary.Write(buf, binary.BigEndian, a)
 	case string:
-		tag = TLV_STRING
-		length = uint16(len(a.(string)))
+		tag = TLVSTRING
+		length = uint16(len(a))
 		binary.Write(buf, binary.BigEndian, length)
-		binary.Write(buf, binary.BigEndian, []byte(a.(string)))
+		binary.Write(buf, binary.BigEndian, []byte(a))
 	default:
-		return nil, errors.New(fmt.Sprintf("unsuport value: %v", a))
+		return nil, fmt.Errorf("unsuport value: %v", a)
 	}
 
 	tlv := TLV{
@@ -222,6 +228,7 @@ func MakeTLV(a interface{}) (*TLV, error) {
 	return &tlv, nil
 }
 
+// ReadTLV read from tlv pointer
 func ReadTLV(tlv *TLV) (interface{}, error) {
 	tag := tlv.Tag
 	length := uint16(0)
@@ -229,47 +236,47 @@ func ReadTLV(tlv *TLV) (interface{}, error) {
 
 	buffer := bytes.NewReader(value)
 	switch tag {
-	case TLV_FLOAT64:
+	case TLVFLOAT64:
 		retvar := float64(0.0)
 		err := binary.Read(buffer, binary.BigEndian, &retvar)
 		return retvar, err
-	case TLV_FLOAT32:
+	case TLVFLOAT32:
 		retvar := float32(0.0)
 		err := binary.Read(buffer, binary.BigEndian, &retvar)
 		return retvar, err
-	case TLV_INT8:
+	case TLVINT8:
 		retvar := int8(0)
 		err := binary.Read(buffer, binary.BigEndian, &retvar)
 		return retvar, err
-	case TLV_INT16:
+	case TLVINT16:
 		retvar := int16(0)
 		err := binary.Read(buffer, binary.BigEndian, &retvar)
 		return retvar, err
-	case TLV_INT32:
+	case TLVINT32:
 		retvar := int32(0)
 		err := binary.Read(buffer, binary.BigEndian, &retvar)
 		return retvar, err
-	case TLV_INT64:
+	case TLVINT64:
 		retvar := int64(0)
 		err := binary.Read(buffer, binary.BigEndian, &retvar)
 		return retvar, err
-	case TLV_UINT8:
+	case TLVUINT8:
 		retvar := uint8(0)
 		err := binary.Read(buffer, binary.BigEndian, &retvar)
 		return retvar, err
-	case TLV_UINT16:
+	case TLVUINT16:
 		retvar := uint16(0)
 		err := binary.Read(buffer, binary.BigEndian, &retvar)
 		return retvar, err
-	case TLV_UINT32:
+	case TLVUINT32:
 		retvar := uint32(0)
 		err := binary.Read(buffer, binary.BigEndian, &retvar)
 		return retvar, err
-	case TLV_UINT64:
+	case TLVUINT64:
 		retvar := uint64(0)
 		err := binary.Read(buffer, binary.BigEndian, &retvar)
 		return retvar, err
-	case TLV_BYTES:
+	case TLVBYTES:
 		err := binary.Read(buffer, binary.BigEndian, &length)
 		if err != nil {
 			return []byte{}, err
@@ -277,7 +284,7 @@ func ReadTLV(tlv *TLV) (interface{}, error) {
 		retvar := make([]byte, length)
 		err = binary.Read(buffer, binary.BigEndian, &retvar)
 		return retvar, err
-	case TLV_STRING:
+	case TLVSTRING:
 		err := binary.Read(buffer, binary.BigEndian, &length)
 		if err != nil {
 			return string([]byte{}), err
@@ -290,6 +297,7 @@ func ReadTLV(tlv *TLV) (interface{}, error) {
 	}
 }
 
+// MakeTLVs ``
 func MakeTLVs(a []interface{}) ([]TLV, error) {
 	tlvs := []TLV{}
 	for _, one := range a {
@@ -302,6 +310,7 @@ func MakeTLVs(a []interface{}) ([]TLV, error) {
 	return tlvs, nil
 }
 
+// ReadTLVs ``
 func ReadTLVs(tlvs []TLV) ([]interface{}, error) {
 	values := []interface{}{}
 	for _, tlv := range tlvs {
@@ -314,31 +323,32 @@ func ReadTLVs(tlvs []TLV) ([]interface{}, error) {
 	return values, nil
 }
 
+// CastTLV cast tlv
 func CastTLV(value interface{}, valueType int32) interface{} {
 	switch valueType {
-	case TLV_FLOAT64:
+	case TLVFLOAT64:
 		return float64(value.(float64))
-	case TLV_FLOAT32:
+	case TLVFLOAT32:
 		return float32(value.(float64))
-	case TLV_INT8:
+	case TLVINT8:
 		return int8(value.(float64))
-	case TLV_INT16:
+	case TLVINT16:
 		return int16(value.(float64))
-	case TLV_INT32:
+	case TLVINT32:
 		return int32(value.(float64))
-	case TLV_INT64:
+	case TLVINT64:
 		return int64(value.(float64))
-	case TLV_UINT8:
+	case TLVUINT8:
 		return uint8(value.(float64))
-	case TLV_UINT16:
+	case TLVUINT16:
 		return uint16(value.(float64))
-	case TLV_UINT32:
+	case TLVUINT32:
 		return uint32(value.(float64))
-	case TLV_UINT64:
+	case TLVUINT64:
 		return uint64(value.(float64))
-	case TLV_BYTES:
+	case TLVBYTES:
 		return []byte(value.(string))
-	case TLV_STRING:
+	case TLVSTRING:
 		return value.(string)
 	default:
 		return nil

+ 27 - 0
pkg/tracing/tracer.go

@@ -0,0 +1,27 @@
+package tracing
+
+import (
+	"fmt"
+	"io"
+
+	opentracing "github.com/opentracing/opentracing-go"
+	jaeger "github.com/uber/jaeger-client-go"
+	config "github.com/uber/jaeger-client-go/config"
+)
+
+func Init(service string) (opentracing.Tracer, io.Closer) {
+	cfg := &config.Configuration{
+		Sampler: &config.SamplerConfig{
+			Type:  "const",
+			Param: 1,
+		},
+		Reporter: &config.ReporterConfig{
+			LogSpans: true,
+		},
+	}
+	tracer, closer, err := cfg.New(service, config.Logger(jaeger.StdLogger))
+	if err != nil {
+		panic(fmt.Sprintf("ERROR: cannot init Jaeger: %v\n", err))
+	}
+	return tracer, closer
+}

+ 7 - 0
services/fileaccess/fileaccess.go

@@ -15,6 +15,8 @@ import (
 	"time"
 
 	"github.com/garyburd/redigo/redis"
+	"github.com/opentracing/opentracing-go"
+	"github.com/opentracing/opentracing-go/ext"
 )
 
 const checkTimeOut = 30 * time.Minute
@@ -155,6 +157,10 @@ func (f *FileAccess) DeleteFile(args *rpcs.ArgsDeleteFile, reply *rpcs.ReplyEmpt
 // MoveFile move a file to new path
 // source:http://192.168.175.60:9000/upload/tmp/2c9d7d85-2266-450a-9d47-28e67703d818.jpeg
 func (f *FileAccess) MoveFile(args *rpcs.ArgsMoveFile, reply *rpcs.ReplyMoveFile) error {
+	spanCtx, _ := opentracing.GlobalTracer().Extract(opentracing.TextMap, opentracing.TextMapCarrier(args.SpanCarrier))
+	span := opentracing.StartSpan("movefile", ext.RPCServerOption(spanCtx))
+	defer span.Finish()
+	span.LogKV("tmpfile", args.Source)
 	// check source file
 	reg := regexp.MustCompile(`tmp/\$*.*`)
 	src := reg.FindString(args.Source)
@@ -195,6 +201,7 @@ func (f *FileAccess) MoveFile(args *rpcs.ArgsMoveFile, reply *rpcs.ReplyMoveFile
 			fpath = fmt.Sprintf("%s/%s/%s/%s", *conDomain, *conStaticPath, args.Target, fileName)
 		}
 		reply.FilePath = fpath
+		span.LogKV("tartgetfile", fpath)
 		//delete src
 		os.Remove(src)
 		return nil

+ 30 - 1
services/httpaccess/actions.go

@@ -10,6 +10,10 @@ import (
 	"sparrow/pkg/server"
 	"sparrow/pkg/token"
 
+	"github.com/opentracing/opentracing-go/ext"
+
+	"github.com/opentracing/opentracing-go"
+
 	"github.com/martini-contrib/render"
 )
 
@@ -47,13 +51,23 @@ type DeviceAuthArgs struct {
 // RegisterDevice 设备激活
 func RegisterDevice(args DeviceRegisterArgs, r render.Render) {
 	server.Log.Printf("ACTION RegisterDevice, args:: %v ", args)
+	span := opentracing.StartSpan("/v1/devices/registration")
+	defer span.Finish()
+
 	rpcargs := &rpcs.ArgsDeviceRegister{
 		ProductKey:    args.ProductKey,
 		DeviceCode:    args.DeviceCode,
 		DeviceVersion: args.Version,
 		ModuleName:    args.ModuleName,
 	}
+	rpcargs.SpanCarrier = make(map[string]string)
 	device := &models.Device{}
+	ext.SpanKindRPCClient.Set(span)
+	span.Tracer().Inject(
+		span.Context(),
+		opentracing.TextMap,
+		opentracing.TextMapCarrier(rpcargs.SpanCarrier),
+	)
 	err := server.RPCCallByName("registry", "Registry.RegisterDevice", rpcargs, device)
 	if err != nil {
 		r.JSON(http.StatusOK, renderError(ErrSystemFault, err))
@@ -76,7 +90,22 @@ func RegisterDevice(args DeviceRegisterArgs, r render.Render) {
 func AuthDevice(args DeviceAuthArgs, r render.Render) {
 	server.Log.Printf("ACTION AuthDevice, args:: %v", args)
 	device := &models.Device{}
-	err := server.RPCCallByName("registry", "Registry.FindDeviceById", int64(args.DeviceId), device)
+
+	arg := &rpcs.ArgsDeviceAuth{
+		DeviceID: args.DeviceId,
+	}
+	arg.SpanCarrier = make(map[string]string)
+
+	span := opentracing.StartSpan("/v1/devices/authentication")
+	defer span.Finish()
+
+	ext.SpanKindRPCClient.Set(span)
+	span.Tracer().Inject(
+		span.Context(),
+		opentracing.TextMap,
+		opentracing.TextMapCarrier(arg.SpanCarrier),
+	)
+	err := server.RPCCallByName("registry", "Registry.FindDeviceById", arg, device)
 	if err != nil {
 		r.JSON(http.StatusOK, renderError(ErrDeviceNotFound, err))
 		return

+ 14 - 0
services/knowoapi/controllers/application.go

@@ -7,6 +7,10 @@ import (
 	"sparrow/services/knowoapi/services"
 	"strings"
 
+	"github.com/opentracing/opentracing-go/ext"
+
+	"github.com/opentracing/opentracing-go"
+
 	"github.com/kataras/iris"
 )
 
@@ -32,7 +36,17 @@ func (a *AppController) Post() {
 			Source: app.AppIcon,
 			Target: "application",
 		}
+		args.SpanCarrier = make(map[string]string)
 		reply := &rpcs.ReplyMoveFile{}
+		span := opentracing.StartSpan(a.Ctx.Path())
+		defer span.Finish()
+		ext.SpanKindRPCClient.Set(span)
+		ext.HTTPMethod.Set(span, a.Ctx.Method())
+		span.Tracer().Inject(
+			span.Context(),
+			opentracing.TextMap,
+			opentracing.TextMapCarrier(args.SpanCarrier),
+		)
 		err := server.RPCCallByName("fileaccess", "FileAccess.MoveFile", args, reply)
 		if err != nil {
 			server.Log.Error(err)

+ 14 - 0
services/knowoapi/controllers/produdct.go

@@ -8,6 +8,8 @@ import (
 	"strings"
 
 	"github.com/kataras/iris"
+	opentracing "github.com/opentracing/opentracing-go"
+	"github.com/opentracing/opentracing-go/ext"
 )
 
 // ProductController 产品API
@@ -33,7 +35,19 @@ func (a *ProductController) Post() {
 			Source: product.ProductImage,
 			Target: "product",
 		}
+		args.SpanCarrier = make(map[string]string)
 		reply := &rpcs.ReplyMoveFile{}
+
+		span := opentracing.StartSpan(a.Ctx.Path())
+		defer span.Finish()
+		ext.SpanKindRPCClient.Set(span)
+		ext.HTTPMethod.Set(span, a.Ctx.Method())
+		span.Tracer().Inject(
+			span.Context(),
+			opentracing.TextMap,
+			opentracing.TextMapCarrier(args.SpanCarrier),
+		)
+
 		err := server.RPCCallByName("fileaccess", "FileAccess.MoveFile", args, reply)
 		if err != nil {
 			server.Log.Error(err)

+ 11 - 1
services/knowoapi/router.go

@@ -6,6 +6,8 @@ import (
 	"sparrow/services/knowoapi/model"
 	"sparrow/services/knowoapi/services"
 
+	"github.com/opentracing/opentracing-go"
+
 	jwt "github.com/dgrijalva/jwt-go"
 	jwtmiddleware "github.com/iris-contrib/middleware/jwt"
 	"github.com/kataras/iris"
@@ -49,7 +51,15 @@ func registerRouters(srv *iris.Application, models *model.All, gen *generator.Ke
 	alertService := services.NewAlertService(models)
 	deviceService := services.NewDeviceService(models)
 	roleService := services.NewRoleService(models)
-	v1router := srv.Party("/api/v1")
+	v1router := srv.Party("/api/v1", func(ctx iris.Context) {
+		span := opentracing.StartSpan(ctx.Path())
+		defer span.Finish()
+		span.SetTag("http.method", ctx.Method())
+		span.SetTag("http.url", ctx.Path())
+		span.SetTag("http.status_code", ctx.GetStatusCode())
+		ctx.Values().Set("span", span)
+		ctx.Next()
+	})
 
 	// 登陆,注册
 	loginAPI := mvc.New(v1router.Party("/"))

+ 2 - 2
services/registry/product.go

@@ -91,7 +91,7 @@ func (r *Registry) FindProduct(id int32, reply *models.Product) error {
 	return nil
 }
 
-// ValidProduct try to validate the given product key.
+// ValidateProduct try to validate the given product key.
 // if success, it will reply the corresponding product
 func (r *Registry) ValidateProduct(key string, reply *models.Product) error {
 	db, err := getDB()
@@ -99,7 +99,7 @@ func (r *Registry) ValidateProduct(key string, reply *models.Product) error {
 		return err
 	}
 
-	id, err := r.keygen.DecodeIdFromRandomKey(key)
+	id, err := r.keygen.DecodeIDFromRandomKey(key)
 	server.Log.Debug(id)
 	if err != nil {
 		return err

+ 23 - 8
services/registry/registry.go

@@ -7,6 +7,9 @@ import (
 	"sparrow/pkg/generator"
 	"sparrow/pkg/models"
 	"sparrow/pkg/rpcs"
+
+	opentracing "github.com/opentracing/opentracing-go"
+	"github.com/opentracing/opentracing-go/ext"
 )
 
 const (
@@ -163,7 +166,7 @@ func (r *Registry) ValidateApplication(key string, reply *models.Application) er
 		return err
 	}
 
-	id, err := r.keygen.DecodeIdFromRandomKey(key)
+	id, err := r.keygen.DecodeIDFromRandomKey(key)
 	if err != nil {
 		return err
 	}
@@ -264,6 +267,10 @@ func (r *Registry) FindApplication(id int32, reply *models.Application) error {
 // if the device has already been registered,
 // the registration will success return the registered device before.
 func (r *Registry) RegisterDevice(args *rpcs.ArgsDeviceRegister, reply *models.Device) error {
+	spanCtx, _ := opentracing.GlobalTracer().Extract(opentracing.TextMap, opentracing.TextMapCarrier(args.SpanCarrier))
+	span := opentracing.StartSpan("RegisterDevice", ext.RPCServerOption(spanCtx))
+	defer span.Finish()
+	span.SetTag("args", args)
 	db, err := getDB()
 	if err != nil {
 		return err
@@ -319,9 +326,10 @@ func (r *Registry) RegisterDevice(args *rpcs.ArgsDeviceRegister, reply *models.D
 		if err != nil {
 			return err
 		}
-
 	}
-
+	span.LogKV("device_identifier", reply.DeviceIdentifier)
+	span.LogKV("device_key", reply.DeviceKey)
+	span.LogKV("device_secret", reply.DeviceSecret)
 	return nil
 }
 
@@ -353,29 +361,36 @@ func (r *Registry) FindDeviceByIdentifier(identifier string, reply *models.Devic
 }
 
 // FindDeviceById will find the device with given id
-func (r *Registry) FindDeviceById(id int64, reply *models.Device) error {
+func (r *Registry) FindDeviceById(args *rpcs.ArgsDeviceAuth, reply *models.Device) error {
+	spanCtx, _ := opentracing.GlobalTracer().Extract(opentracing.TextMap, opentracing.TextMapCarrier(args.SpanCarrier))
+	span := opentracing.StartSpan("FindDeviceById", ext.RPCServerOption(spanCtx))
+	defer span.Finish()
+	span.SetTag("args", args)
 	db, err := getDB()
 	if err != nil {
 		return err
 	}
 	d := &models.Device{}
-	d.ID = uint(id)
+	d.ID = uint(args.DeviceID)
 	err = db.Where(d).First(reply).Error
 
 	if err != nil {
 		return err
 	}
+	span.LogKV("device", reply)
 	return nil
 }
 
 // ValidateDevice will validate a device key and return the model if success.
 func (r *Registry) ValidateDevice(key string, device *models.Device) error {
-	id, err := r.keygen.DecodeIdFromRandomKey(key)
+	id, err := r.keygen.DecodeIDFromRandomKey(key)
 	if err != nil {
 		return err
 	}
-
-	err = r.FindDeviceById(id, device)
+	args := rpcs.ArgsDeviceAuth{
+		DeviceID: id,
+	}
+	err = r.FindDeviceById(&args, device)
 	if err != nil {
 		return err
 	}

BIN
tests/device/device


+ 10 - 8
tests/device/device.go

@@ -205,14 +205,13 @@ func (d *Device) reportStatus(client *MQTT.Client) {
 }
 
 func (d *Device) coapReportStatus(conn *net.UDPConn) {
-
+	fmt.Println("coapReportStatus")
 	for {
-		time.Sleep(10 * time.Second)
 		payloadHead := protocol.DataHead{
 			Flag:      0,
 			Timestamp: uint64(time.Now().Unix() * 1000),
 		}
-		param := []interface{}{1}
+		param := []interface{}{int64(10)}
 		params, err := tlv.MakeTLVs(param)
 		if err != nil {
 			fmt.Println(err)
@@ -241,14 +240,17 @@ func (d *Device) coapReportStatus(conn *net.UDPConn) {
 		}
 
 		req := &coap.BaseMessage{
-			Code:    coap.POST,
-			Type:    coap.CON,
-			Token:   d.token,
-			Payload: payload,
+			Code:      coap.POST,
+			Type:      coap.CON,
+			Token:     d.token,
+			MessageID: 2,
+			Payload:   payload,
 		}
 		req.SetPathString(fmt.Sprintf("%d/s", d.id))
 		reqbytes, _ := req.Encode()
 		conn.Write(reqbytes)
+		fmt.Println("write end")
+		time.Sleep(10 * time.Second)
 	}
 }
 
@@ -355,9 +357,9 @@ func (d *Device) doCoAPAccess() error {
 	if err != nil {
 		return err
 	}
-	defer conn.Close()
 	go d.coapReportStatus(conn)
 	<-make(chan int)
+	defer conn.Close()
 	return nil
 }