Explorar o código

更新测试device工具,支持coap协议

lijian %!s(int64=6) %!d(string=hai) anos
pai
achega
e4a1f9861c
Modificáronse 8 ficheiros con 135 adicións e 39 borrados
  1. 2 1
      README.md
  2. 18 9
      pkg/coap/manager.go
  3. 2 0
      services/httpaccess/actions.go
  4. BIN=BIN
      tests/coaptool/coaptool
  5. 4 4
      tests/coaptool/main.go
  6. BIN=BIN
      tests/device/device
  7. 101 18
      tests/device/device.go
  8. 8 7
      tests/device/main.go

+ 2 - 1
README.md

@@ -9,4 +9,5 @@
 * `httpaccess`: 设备Http API接入服务,实现设备登陆,注册,认证等逻辑 [文档说明](httpaccess.md)
 * `mqttaccess`: MQTT接入服务 [文档说明](mqttaccess.md)
 * `knowoapi`: 开放平台管理中心api接口服务 [文档说明](knowoapi.md)
-* `fileaccess`: 静态文件服务器 [文档说明](./services/fileaccess/readme.markdown)
+* `fileaccess`: 静态文件服务器 [文档说明](./services/fileaccess/readme.markdown)
+* `coapaccess`: CoAP接入服务[文档说明](./services/coapaccess/readme.md)

+ 18 - 9
pkg/coap/manager.go

@@ -4,6 +4,7 @@ import (
 	"net"
 	"sparrow/pkg/rpcs"
 	"sparrow/pkg/server"
+	"strconv"
 	"sync/atomic"
 	"time"
 )
@@ -21,9 +22,9 @@ const (
 	maxWorkersCount   = 10000
 	idleWorkerTimeout = 10 * time.Second
 
-	pubStatusTopic  = "/s"
-	pubEventTopic   = "/e"
-	subCommandTopic = "/c"
+	pubStatusTopic  = "s"
+	pubEventTopic   = "e"
+	subCommandTopic = "c"
 )
 
 type Manager struct {
@@ -98,13 +99,19 @@ func (m *Manager) workerChannelHandler(inUse bool, timeout *time.Timer) bool {
 	}
 	return true
 }
+
+// coap://endpoint/$DEVICE_ID/s
 func (m *Manager) serve(w *Request) {
 	msg := w.Msg
 	server.Log.Debugf("get packet:%#v", msg)
-	if msg.IsConfirmable() {
+	if msg.IsConfirmable() && len(msg.Path()) > 1 {
 		token := msg.GetToken()
 		// TODO:想别的deviceid的
-		deviceid := msg.GetMessageID()
+		deviceid, err := strconv.ParseUint(msg.Path()[0], 10, 0)
+		if err != nil {
+			server.Log.Errorf("device id error:%s", msg.Path()[0])
+			return
+		}
 		if len(token) != 8 {
 			res := &BaseMessage{
 				Code:      Unauthorized,
@@ -114,11 +121,11 @@ func (m *Manager) serve(w *Request) {
 			}
 			bytes, _ := res.Encode()
 			w.Conn.WriteTo(bytes, w.Addr)
-			server.Log.Debugf("token length error, size :%d", len(token))
+			server.Log.Errorf("token length error, size :%d", len(token))
 			return
 		}
 		//check token
-		err := m.Provider.ValidateDeviceToken(uint64(deviceid), token)
+		err = m.Provider.ValidateDeviceToken(deviceid, token)
 		if err != nil {
 			res := &BaseMessage{
 				Code:      Unauthorized,
@@ -132,7 +139,7 @@ func (m *Manager) serve(w *Request) {
 			return
 		}
 		args := rpcs.ArgsGetOnline{
-			Id:                uint64(deviceid),
+			Id:                deviceid,
 			ClientIP:          w.Addr.String(),
 			AccessRPCHost:     server.GetRPCHost(),
 			HeartbeatInterval: 30,
@@ -152,7 +159,7 @@ func (m *Manager) serve(w *Request) {
 			return
 		}
 		server.Log.Infof("device %d, connected to server now host:%s", deviceid, w.Addr.String())
-		topic := msg.Path()[0]
+		topic := msg.Path()[1]
 		switch topic {
 		case pubStatusTopic, pubEventTopic, subCommandTopic:
 			server.Log.Infof("%s, publish status", w.Addr.String())
@@ -184,6 +191,8 @@ func (m *Manager) serve(w *Request) {
 			w.Conn.WriteTo(ackbytes, w.Addr)
 			return
 		}
+	} else {
+		//无效请求
 	}
 }
 

+ 2 - 0
services/httpaccess/actions.go

@@ -101,6 +101,8 @@ func AuthDevice(args DeviceAuthArgs, r render.Render) {
 		hosts, err = server.GetServerHosts(args.Protocol+"access", "httphost")
 	case "mqtt":
 		hosts, err = server.GetServerHosts(args.Protocol+"access", "tcphost")
+	case "coap":
+		hosts, err = server.GetServerHosts(args.Protocol+"access", "udphost")
 	default:
 		err = errors.New("unsuported protocol: " + args.Protocol)
 	}

BIN=BIN
tests/coaptool/coaptool


+ 4 - 4
tests/coaptool/main.go

@@ -25,7 +25,7 @@ func main() {
 		os.Exit(1)
 	}
 	defer conn.Close()
-	token, _ := hex.DecodeString("1b00f92489298929")
+	token, _ := hex.DecodeString("e30680acf77ecd60")
 	payloadHead := protocol.DataHead{
 		Flag:      0,
 		Timestamp: uint64(time.Now().Unix() * 1000),
@@ -78,19 +78,19 @@ func main() {
 		Token:   token,
 		Payload: buf,
 	}
-	req.AddOption(coap.URIPath, "/s")
+	req.SetPathString("/2")
 	bytes, err := req.Encode()
 	if err != nil {
 		fmt.Printf("dial err %s", err.Error())
 	}
 	conn.Write(bytes)
-	msg := make([]byte, 1500)
+	msg := make([]byte, 20)
 
 	_, err = conn.Read(msg)
 	if err != nil {
 		fmt.Println(err)
 	}
 
-	fmt.Println("Response:", string(msg))
+	fmt.Printf("Response:%#v", msg)
 
 }

BIN=BIN
tests/device/device


+ 101 - 18
tests/device/device.go

@@ -5,7 +5,9 @@ import (
 	"encoding/json"
 	"fmt"
 	"log"
+	"net"
 	"os"
+	"sparrow/pkg/coap"
 	"sparrow/pkg/protocol"
 	"sparrow/pkg/tlv"
 	"time"
@@ -17,27 +19,27 @@ const (
 	commonCmdGetStatus = uint16(65528)
 )
 
-// device register args
+// DeviceRegisterArgs device register args
 type DeviceRegisterArgs struct {
 	ProductKey string `json:"product_key"  binding:"required"`
 	DeviceCode string `json:"device_code"  binding:"required"`
 	Version    string `json:"version"  binding:"required"`
 }
 
-// device authentication args
+// DeviceAuthArgs device authentication args
 type DeviceAuthArgs struct {
 	DeviceId     int64  `json:"device_id" binding:"required"`
 	DeviceSecret string `json:"device_secret" binding:"required"`
 	Protocol     string `json:"protocol" binding:"required"`
 }
 
-// common response fields
+// Common common response fields
 type Common struct {
 	Code    int    `json:"code"`
 	Message string `json:"message"`
 }
 
-// device register response data field
+// DeviceRegisterData device register response data field
 type DeviceRegisterData struct {
 	DeviceId         int64  `json:"device_id"`
 	DeviceSecret     string `json:"device_secret"`
@@ -45,32 +47,34 @@ type DeviceRegisterData struct {
 	DeviceIdentifier string `json:"device_identifier"`
 }
 
-// device register response
+// DeviceRegisterResponse device register response
 type DeviceRegisterResponse struct {
 	Common
 	Data DeviceRegisterData `json:"data"`
 }
 
-// device auth response data field
+// DeviceAuthData device auth response data field
 type DeviceAuthData struct {
 	AccessToken string `json:"access_token"`
 	AccessAddr  string `json:"access_addr"`
 }
 
-// device auth response
+// DeviceAuthResponse device auth response
 type DeviceAuthResponse struct {
 	Common
 	Data DeviceAuthData `json:"data"`
 }
 
+// Device a device
 type Device struct {
-	// API URL
+	// Url API URL
 	Url string
 
 	// basic info
 	ProductKey string
 	DeviceCode string
 	Version    string
+	Proto      string
 
 	// private things
 	id      int64
@@ -79,28 +83,31 @@ type Device struct {
 	access  string
 }
 
-func NewDevice(url string, productkey string, code string, version string) *Device {
+// NewDevice create a device
+func NewDevice(url string, productkey string, code string, version string, proto string) *Device {
 
 	return &Device{
 		Url:        url,
 		ProductKey: productkey,
 		DeviceCode: code,
 		Version:    version,
+		Proto:      proto,
 	}
 }
 
+// DoRegister device register
 func (d *Device) DoRegister() error {
 	args := DeviceRegisterArgs{
 		ProductKey: d.ProductKey,
 		DeviceCode: d.DeviceCode,
 		Version:    d.Version,
 	}
-	regUrl := fmt.Sprintf("%v%v", d.Url, "/v1/devices/registration")
+	regURL := fmt.Sprintf("%v%v", d.Url, "/v1/devices/registration")
 	request, err := json.Marshal(args)
 	if err != nil {
 		return err
 	}
-	jsonresp, err := SendHttpRequest(regUrl, string(request), "POST", nil)
+	jsonresp, err := SendHttpRequest(regURL, string(request), "POST", nil)
 	if err != nil {
 		return err
 	}
@@ -120,18 +127,19 @@ func (d *Device) DoRegister() error {
 	return nil
 }
 
+// DoLogin device log in
 func (d *Device) DoLogin() error {
 	args := DeviceAuthArgs{
 		DeviceId:     d.id,
 		DeviceSecret: d.secrect,
-		Protocol:     "mqtt",
+		Protocol:     d.Proto,
 	}
-	regUrl := fmt.Sprintf("%v%v", d.Url, "/v1/devices/authentication")
+	regURL := fmt.Sprintf("%v%v", d.Url, "/v1/devices/authentication")
 	request, err := json.Marshal(args)
 	if err != nil {
 		return err
 	}
-	jsonresp, err := SendHttpRequest(regUrl, string(request), "POST", nil)
+	jsonresp, err := SendHttpRequest(regURL, string(request), "POST", nil)
 	if err != nil {
 		return err
 	}
@@ -163,7 +171,7 @@ func (d *Device) reportStatus(client *MQTT.Client) {
 			Flag:      0,
 			Timestamp: uint64(time.Now().Unix() * 1000),
 		}
-		param := []interface{}{"li jian"}
+		param := []interface{}{1}
 		params, err := tlv.MakeTLVs(param)
 		if err != nil {
 			fmt.Println(err)
@@ -196,6 +204,54 @@ func (d *Device) reportStatus(client *MQTT.Client) {
 
 }
 
+func (d *Device) coapReportStatus(conn *net.UDPConn) {
+
+	for {
+		time.Sleep(10 * time.Second)
+		payloadHead := protocol.DataHead{
+			Flag:      0,
+			Timestamp: uint64(time.Now().Unix() * 1000),
+		}
+		param := []interface{}{1}
+		params, err := tlv.MakeTLVs(param)
+		if err != nil {
+			fmt.Println(err)
+			return
+		}
+		sub := protocol.SubData{
+			Head: protocol.SubDataHead{
+				SubDeviceid: uint16(1),
+				PropertyNum: uint16(1),
+				ParamsCount: uint16(len(params)),
+			},
+			Params: params,
+		}
+
+		status := protocol.Data{
+			Head:    payloadHead,
+			SubData: []protocol.SubData{},
+		}
+
+		status.SubData = append(status.SubData, sub)
+
+		payload, err := status.Marshal()
+		if err != nil {
+			fmt.Println(err)
+			return
+		}
+
+		req := &coap.BaseMessage{
+			Code:    coap.POST,
+			Type:    coap.CON,
+			Token:   d.token,
+			Payload: payload,
+		}
+		req.SetPathString(fmt.Sprintf("%d/s", d.id))
+		reqbytes, _ := req.Encode()
+		conn.Write(reqbytes)
+	}
+}
+
 func (d *Device) reportEvent(client *MQTT.Client) {
 	for {
 		time.Sleep(3 * time.Second)
@@ -275,7 +331,37 @@ func (d *Device) messageHandler(client *MQTT.Client, msg MQTT.Message) {
 	}
 }
 
+// DoAccess device access
 func (d *Device) DoAccess() error {
+	if d.Proto == "mqtt" {
+		if err := d.doMQTTAccess(); err != nil {
+			fmt.Printf("do mqtt access error:%s", err.Error())
+		}
+	} else if d.Proto == "coap" {
+		if err := d.doCoAPAccess(); err != nil {
+			fmt.Printf("do coap access error:%s", err.Error())
+		}
+	}
+	return nil
+}
+
+func (d *Device) doCoAPAccess() error {
+	fmt.Printf("get access addr :%s", d.access)
+	addr, err := net.ResolveUDPAddr("udp", d.access)
+	if err != nil {
+		return err
+	}
+	conn, err := net.DialUDP("udp", nil, addr)
+	if err != nil {
+		return err
+	}
+	defer conn.Close()
+	go d.coapReportStatus(conn)
+	<-make(chan int)
+	return nil
+}
+
+func (d *Device) doMQTTAccess() error {
 	logger := log.New(os.Stdout, "", log.LstdFlags)
 	MQTT.ERROR = logger
 	MQTT.CRITICAL = logger
@@ -298,9 +384,6 @@ func (d *Device) DoAccess() error {
 	if token := c.Connect(); token.Wait() && token.Error() != nil {
 		return token.Error()
 	}
-
-	// beigin report event test
-	//go d.reportEvent(c)
 	go d.reportStatus(c)
 	// we just pause here to wait for messages
 	<-make(chan int)

+ 8 - 7
tests/device/main.go

@@ -6,35 +6,36 @@ import (
 )
 
 var (
-	TestUrl        = flag.String("url", "http://192.168.175.60:8088", "login url")
-	TestProductKey = flag.String("productkey", "99b11b395c84435202692e36dada175c7af9452038a62a40b230b5e18b7d51ff", "product key")
+	testURL        = flag.String("url", "http://192.168.175.60:8088", "login url")
+	testProductKey = flag.String("productkey", "99b11b395c84435202692e36dada175c7af9452038a62a40b230b5e18b7d51ff", "product key")
+	testProtocol   = flag.String("protocol", "mqtt", "access protocol")
 )
 
 func main() {
 	flag.Parse()
 
-	if *TestProductKey == "" {
+	if *testProductKey == "" {
 		fmt.Println("product key not provided. use -productkey flag")
 		return
 	}
 
-	dev := NewDevice(*TestUrl, *TestProductKey, "ffe34e", "version")
+	dev := NewDevice(*testURL, *testProductKey, "ffe34e", "version", *testProtocol)
 
 	err := dev.DoRegister()
 	if err != nil {
-		fmt.Errorf("device register error %s", err)
+		fmt.Printf("device register error %s", err)
 		return
 	}
 
 	err = dev.DoLogin()
 	if err != nil {
-		fmt.Errorf("device login error %s", err)
+		fmt.Printf("device login error %s", err)
 		return
 	}
 
 	err = dev.DoAccess()
 	if err != nil {
-		fmt.Errorf("device access error %s", err)
+		fmt.Printf("device access error %s", err)
 		return
 	}