瀏覽代碼

重新实现coapx协议,没有实现订阅

lijian 6 年之前
父節點
當前提交
e560a7316c
共有 7 個文件被更改,包括 169 次插入25 次删除
  1. 75 1
      pkg/coap/manager.go
  2. 4 0
      pkg/coap/message.go
  3. 10 7
      pkg/token/token_test.go
  4. 2 2
      run.sh
  5. 8 1
      services/coapaccess/coap_provider.go
  6. 二進制
      tests/coaptool/coaptool
  7. 70 14
      tests/coaptool/main.go

+ 75 - 1
pkg/coap/manager.go

@@ -2,6 +2,7 @@ package coap
 
 import (
 	"net"
+	"sparrow/pkg/rpcs"
 	"sparrow/pkg/server"
 	"sync/atomic"
 	"time"
@@ -19,6 +20,10 @@ const (
 	maxPktlen         = 1500
 	maxWorkersCount   = 10000
 	idleWorkerTimeout = 10 * time.Second
+
+	pubStatusTopic  = "/s"
+	pubEventTopic   = "/e"
+	subCommandTopic = "/c"
 )
 
 type Manager struct {
@@ -96,9 +101,10 @@ func (m *Manager) workerChannelHandler(inUse bool, timeout *time.Timer) bool {
 func (m *Manager) serve(w *Request) {
 	msg := w.Msg
 	server.Log.Debugf("get packet:%#v", msg)
-	// check token
 	if msg.IsConfirmable() {
 		token := msg.GetToken()
+		// TODO:想别的deviceid的
+		deviceid := msg.GetMessageID()
 		if len(token) != 8 {
 			res := &BaseMessage{
 				Code:      Unauthorized,
@@ -111,8 +117,76 @@ func (m *Manager) serve(w *Request) {
 			server.Log.Debugf("token length error, size :%d", len(token))
 			return
 		}
+		//check token
+		err := m.Provider.ValidateDeviceToken(uint64(deviceid), token)
+		if err != nil {
+			res := &BaseMessage{
+				Code:      Unauthorized,
+				Type:      ACK,
+				MessageID: msg.GetMessageID(),
+				Token:     msg.GetToken(),
+			}
+			bytes, _ := res.Encode()
+			w.Conn.WriteTo(bytes, w.Addr)
+			server.Log.Warnf("device %d token not validate, token :%v", deviceid, token)
+			return
+		}
+		args := rpcs.ArgsGetOnline{
+			Id:                uint64(deviceid),
+			ClientIP:          w.Addr.String(),
+			AccessRPCHost:     server.GetRPCHost(),
+			HeartbeatInterval: 30,
+		}
+
+		ack := &BaseMessage{
+			Code:      Changed,
+			Type:      ACK,
+			MessageID: msg.GetMessageID(),
+			Token:     msg.GetToken(),
+		}
+		ackbytes, _ := ack.Encode()
+		w.Conn.WriteTo(ackbytes, w.Addr)
+		err = m.Provider.OnDeviceOnline(args)
+		if err != nil {
+			server.Log.Warnf("device online error :%v", err)
+			return
+		}
+		server.Log.Infof("device %d, connected to server now host:%s", deviceid, w.Addr.String())
+		topic := msg.Path()[0]
+		switch topic {
+		case pubStatusTopic, pubEventTopic, subCommandTopic:
+			server.Log.Infof("%s, publish status", w.Addr.String())
+			m.Provider.OnDeviceMessage(uint64(deviceid), topic, msg.GetPayload())
+			err := m.Provider.OnDeviceHeartBeat(uint64(deviceid))
+			if err != nil {
+				server.Log.Warnf("heartbeat set error:%s", w.Addr.String())
+				return
+			}
+			//pub ack
+			ack := &BaseMessage{
+				Code:      Created,
+				Type:      ACK,
+				MessageID: msg.GetMessageID(),
+				Token:     msg.GetToken(),
+			}
+			ackbytes, _ := ack.Encode()
+			w.Conn.WriteTo(ackbytes, w.Addr)
+		default:
+			//无效主题
+			server.Log.Errorf("unknown msg type:%s", topic)
+			ack := &BaseMessage{
+				Code:      BadRequest,
+				Type:      ACK,
+				MessageID: msg.GetMessageID(),
+				Token:     msg.GetToken(),
+			}
+			ackbytes, _ := ack.Encode()
+			w.Conn.WriteTo(ackbytes, w.Addr)
+			return
+		}
 	}
 }
+
 func (m *Manager) spawnWorker(req *Request) {
 	select {
 	case m.queue <- req:

+ 4 - 0
pkg/coap/message.go

@@ -274,6 +274,7 @@ type Message interface {
 	GetMessageID() uint16
 	GetToken() []byte
 	GetCode() COAPCode
+	GetPayload() []byte
 }
 
 // BaseMessage COAP 消息体
@@ -457,6 +458,9 @@ func (m *BaseMessage) AllOptions() options {
 func (m *BaseMessage) IsConfirmable() bool {
 	return m.Type == CON
 }
+func (m *BaseMessage) GetPayload() []byte {
+	return m.Payload
+}
 
 // Option get option by id
 func (m *BaseMessage) Option(o OptionID) interface{} {

+ 10 - 7
pkg/token/token_test.go

@@ -1,27 +1,30 @@
 package token
 
 import (
+	"encoding/hex"
 	"testing"
 )
 
 func TestTokenHelper(t *testing.T) {
-	helper := NewHelper("localhost:6379")
+	helper := NewHelper("192.168.175.60:6379")
 
-	testid := uint64(123)
+	testid := uint64(2)
 
 	token, err := helper.GenerateToken(testid)
 	if err != nil {
 		t.Error(err)
 	}
-
+	encodedStr := hex.EncodeToString(token)
+	de, _ := hex.DecodeString(encodedStr)
+	t.Log(encodedStr, de, token)
 	err = helper.ValidateToken(testid, token)
 	if err != nil {
 		t.Error(err)
 	}
 
-	err = helper.ClearToken(testid)
-	if err != nil {
-		t.Error(err)
-	}
+	// err = helper.ClearToken(testid)
+	// if err != nil {
+	// 	t.Error(err)
+	// }
 
 }

+ 2 - 2
run.sh

@@ -1,6 +1,6 @@
 export GOPATH=/Users/terrence/go
 
-sudo killall -9 httpaccess registry apiprovider devicemanager controller mqttaccess knowoapi fileaccess
+sudo killall -9 httpaccess registry apiprovider devicemanager controller mqttaccess knowoapi fileaccess coapaccess
 
 # start services
 #$GOPATH/bin/httpaccess -etcd http://localhost:2379 -httphost internal:443 -loglevel debug -usehttps -keyfile $GOPATH/src/github.com/PandoCloud/pando-cloud/pkg/server/testdata/key.pem -cafile $GOPATH/src/github.com/PandoCloud/pando-cloud/pkg/server/testdata/cert.pem &
@@ -13,7 +13,7 @@ $GOPATH/bin/controller -etcd http://192.168.175.60:2379 -loglevel debug  -rpchos
 $GOPATH/bin/mqttaccess -etcd http://192.168.175.60:2379 -loglevel debug  -rpchost localhost:20030 -tcphost internal:1883  &
 $GOPATH/bin/knowoapi -etcd http://192.168.175.60:2379 -loglevel debug  -httphost localhost:8889 -dbhost 192.168.175.60 -dbname SparrowCloud -dbport 3306 -dbuser SparrowCloud -dbpass 123456 -aeskey ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOP &
 $GOPATH/bin/fileaccess -etcd http://192.168.175.60:2379 -loglevel debug  -rpchost localhost:20035 -httphost localhost:9000 &
-$GOPATH/bin/coapaccess -etcd http://192.168.175.60:2379 -loglevel debug  -udphost localhost:5683 &
+$GOPATH/bin/coapaccess -etcd http://192.168.175.60:2379 -loglevel debug  -udphost localhost:56883 &
 exit 0
 
 

+ 8 - 1
services/coapaccess/coap_provider.go

@@ -1,6 +1,7 @@
 package main
 
 import (
+	"sparrow/pkg/protocol"
 	"sparrow/pkg/rpcs"
 	"sparrow/pkg/server"
 )
@@ -58,5 +59,11 @@ func (mp *CoAPProvider) OnDeviceHeartBeat(deviceid uint64) error {
 }
 func (mp *CoAPProvider) OnDeviceMessage(deviceid uint64, msgtype string, message []byte) {
 	server.Log.Infof("device {%v} message {%v} : %x", deviceid, msgtype, message)
-
+	data := &protocol.Data{}
+	err := data.UnMarshal(message)
+	if err != nil {
+		server.Log.Errorf("unmarshal data error : %v", err)
+		return
+	}
+	server.Log.Debugf("data %#v", data.SubData)
 }

二進制
tests/coaptool/coaptool


+ 70 - 14
tests/coaptool/main.go

@@ -1,27 +1,74 @@
 package main
 
 import (
+	"encoding/hex"
 	"fmt"
 	"net"
+	"os"
 	"sparrow/pkg/coap"
+	"sparrow/pkg/protocol"
+	"sparrow/pkg/tlv"
+	"time"
 )
 
 const url = "127.0.0.1:56883"
 
 func main() {
-	co, err := net.Dial("udp", url)
+	addr, err := net.ResolveUDPAddr("udp", url)
 	if err != nil {
-		fmt.Printf("dial err %s", err.Error())
+		fmt.Println("Can't resolve address: ", err)
+		os.Exit(1)
+	}
+	conn, err := net.DialUDP("udp", nil, addr)
+	if err != nil {
+		fmt.Println("Can't dial: ", err)
+		os.Exit(1)
+	}
+	defer conn.Close()
+	token, _ := hex.DecodeString("1b00f92489298929")
+	payloadHead := protocol.DataHead{
+		Flag:      0,
+		Timestamp: uint64(time.Now().Unix() * 1000),
+	}
+	param1 := []interface{}{uint32(3), float32(1.2), int64(10)}
+	params1, err := tlv.MakeTLVs(param1)
+	if err != nil {
+		fmt.Println(err)
+	}
+	sub1 := protocol.SubData{
+		Head: protocol.SubDataHead{
+			SubDeviceid: uint16(1),
+			PropertyNum: uint16(1),
+			ParamsCount: uint16(len(params1)),
+		},
+		Params: params1,
 	}
-	token := []byte{
-		0x1,
-		0x2,
-		0x3,
-		0x4,
-		0x5,
-		0x6,
-		0x7,
-		0x8,
+	param2 := []interface{}{uint32(4), int64(11)}
+	params2, err := tlv.MakeTLVs(param2)
+	if err != nil {
+		fmt.Println(err)
+	}
+	sub2 := protocol.SubData{
+		Head: protocol.SubDataHead{
+			SubDeviceid: uint16(1),
+			PropertyNum: uint16(2),
+			ParamsCount: uint16(len(params2)),
+		},
+		Params: params2,
+	}
+
+	payload := &protocol.Data{
+		Head:    payloadHead,
+		SubData: []protocol.SubData{},
+	}
+	payload.SubData = append(payload.SubData, sub1)
+	payload.SubData = append(payload.SubData, sub2)
+
+	buf, err := payload.Marshal()
+	if err != nil {
+		if err != nil {
+			fmt.Println(err)
+		}
 	}
 	req := &coap.BaseMessage{
 		Code:      coap.POST,
@@ -29,12 +76,21 @@ func main() {
 		MessageID: 2,
 		//Token:     []byte{0, 1, 0x2, 3, 4, 5, 6, 7, 8, 9, 1, 12, 12, 1, 2, 1, 2, 2},
 		Token:   token,
-		Payload: []byte("PAYLOAD"),
+		Payload: buf,
 	}
-	//req.AddOption(coap.URIPath, "/topic/s")
+	req.AddOption(coap.URIPath, "/s")
 	bytes, err := req.Encode()
 	if err != nil {
 		fmt.Printf("dial err %s", err.Error())
 	}
-	co.Write(bytes)
+	conn.Write(bytes)
+	msg := make([]byte, 1500)
+
+	_, err = conn.Read(msg)
+	if err != nil {
+		fmt.Println(err)
+	}
+
+	fmt.Println("Response:", string(msg))
+
 }