lijian 6 tahun lalu
induk
melakukan
15f0f1d381

+ 0 - 82
pkg/coap/broker.go

@@ -1,82 +0,0 @@
-package coap
-
-import "net"
-
-type Broker struct {
-	m   map[string]muxEntry
-	Mgr *Manager
-}
-type muxEntry struct {
-	h       Handler
-	pattern string
-}
-
-func NewBroker() *Broker {
-	mgr := NewManager()
-	return &Broker{
-		Mgr: mgr,
-		m:   make(map[string]muxEntry),
-	}
-}
-
-func pathMatch(pattern string, path string) bool {
-	if len(pattern) == 0 {
-		return false
-	}
-	n := len(pattern)
-	if pattern[n-1] != '/' {
-		return pattern == path
-	}
-	return len(path) >= n && path[0:n] == pattern
-}
-
-func (b *Broker) match(path string) (h Handler, pattern string) {
-	var n = 0
-	for k, v := range b.m {
-		if !pathMatch(k, path) {
-			continue
-		}
-		if h == nil || len(k) > n {
-			n = len(k)
-			h = v.h
-			pattern = v.pattern
-		}
-	}
-	return
-}
-func notFoundHandler(l *net.UDPConn, a *net.UDPAddr, m Message) Message {
-	if m.IsConfirmable() {
-		return &BaseMessage{
-			Type: ACK,
-			Code: NotFound,
-		}
-	}
-	return nil
-}
-func (b *Broker) ServeCOAP(l *net.UDPConn, a *net.UDPAddr, m Message) Message {
-	h, _ := b.match(m.PathString())
-	if h == nil {
-		h, _ = funcHandler(notFoundHandler), ""
-	}
-	return h.ServeCOAP(l, a, m)
-}
-func (b *Broker) Handle(pattern string, handler Handler) {
-	for pattern != "" && pattern[0] == '/' {
-		pattern = pattern[1:]
-	}
-
-	if pattern == "" {
-		panic("http: invalid pattern " + pattern)
-	}
-	if handler == nil {
-		panic("http: nil handler")
-	}
-
-	b.m[pattern] = muxEntry{h: handler, pattern: pattern}
-}
-
-// HandleFunc configures a handler for the given path.
-func (b *Broker) HandleFunc(pattern string,
-	f func(l *net.UDPConn, a *net.UDPAddr, m Message) Message) {
-	b.Handle(pattern, b.Mgr.FuncHandler(f))
-}

+ 78 - 37
pkg/coap/manager.go

@@ -3,6 +3,7 @@ package coap
 import (
 	"net"
 	"sparrow/pkg/server"
+	"sync/atomic"
 	"time"
 )
 
@@ -14,30 +15,23 @@ const (
 	ResponseRandomFactor = 1.5
 	// MaxRetransmit is the maximum number of times a message will
 	// be retransmitted.
-	MaxRetransmit = 4
-	maxPktlen     = 1500
+	MaxRetransmit     = 4
+	maxPktlen         = 1500
+	maxWorkersCount   = 10000
+	idleWorkerTimeout = 10 * time.Second
 )
 
-type Handler interface {
-	ServeCOAP(l *net.UDPConn, a *net.UDPAddr, m Message) Message
-}
 type Manager struct {
-	rh funcHandler
-}
-
-func NewManager() *Manager {
-	return &Manager{}
+	queue        chan *Request
+	Provider     Provider
+	workersCount int32
 }
 
-func (m *Manager) FuncHandler(f func(l *net.UDPConn, a *net.UDPAddr, m Message) Message) Handler {
-	m.rh = f
-	return funcHandler(f)
-}
-
-type funcHandler func(l *net.UDPConn, a *net.UDPAddr, m Message) Message
-
-func (f funcHandler) ServeCOAP(l *net.UDPConn, a *net.UDPAddr, m Message) Message {
-	return f(l, a, m)
+func NewManager(p Provider) *Manager {
+	return &Manager{
+		Provider: p,
+		queue:    make(chan *Request),
+	}
 }
 
 func (m *Manager) Handler(conn *net.UDPConn) {
@@ -53,31 +47,78 @@ func (m *Manager) Handler(conn *net.UDPConn) {
 		}
 		tmp := make([]byte, nr)
 		copy(tmp, buf)
-		go m.handlerPacket(conn, tmp, addr)
+		msg, err := ParseMessage(tmp)
+		if err != nil {
+			server.Log.Error(err)
+		}
+		m.spawnWorker(&Request{
+			Msg:  msg,
+			Addr: addr,
+			Conn: conn,
+		})
 	}
 }
-func (m *Manager) handlerPacket(l *net.UDPConn, data []byte, a *net.UDPAddr) {
-	msg, err := ParseMessage(data)
-	if err != nil {
-		server.Log.Error(err)
+func (m *Manager) worker(w *Request) {
+	m.serve(w)
+	for {
+		count := atomic.LoadInt32(&m.workersCount)
+		if count > maxWorkersCount {
+			return
+		}
+		if atomic.CompareAndSwapInt32(&m.workersCount, count, count+1) {
+			break
+		}
 	}
-	rv := m.rh.ServeCOAP(l, a, msg)
-	if rv != nil {
-		Transmit(l, a, msg)
+	defer atomic.AddInt32(&m.workersCount, -1)
+	inUse := false
+	timeout := time.NewTimer(idleWorkerTimeout)
+	defer timeout.Stop()
+	for m.workerChannelHandler(inUse, timeout) {
 	}
 }
-func Transmit(l *net.UDPConn, a *net.UDPAddr, m Message) error {
-	d, err := m.Encode()
-	if err != nil {
-		return err
+func (m *Manager) workerChannelHandler(inUse bool, timeout *time.Timer) bool {
+	select {
+	case w, ok := <-m.queue:
+		if !ok {
+			return false
+		}
+		inUse = true
+		m.serve(w)
+	case <-timeout.C:
+		if !inUse {
+			return false
+		}
+		inUse = false
+		timeout.Reset(idleWorkerTimeout)
 	}
-
-	if a == nil {
-		_, err = l.Write(d)
-	} else {
-		_, err = l.WriteTo(d, a)
+	return true
+}
+func (m *Manager) serve(w *Request) {
+	msg := w.Msg
+	server.Log.Debugf("get packet:%#v", msg)
+	// check token
+	if msg.IsConfirmable() {
+		token := msg.GetToken()
+		if len(token) != 8 {
+			res := &BaseMessage{
+				Code:      Unauthorized,
+				Type:      ACK,
+				MessageID: msg.GetMessageID(),
+				Token:     msg.GetToken(),
+			}
+			bytes, _ := res.Encode()
+			w.Conn.WriteTo(bytes, w.Addr)
+			server.Log.Debugf("token length error, size :%d", len(token))
+			return
+		}
+	}
+}
+func (m *Manager) spawnWorker(req *Request) {
+	select {
+	case m.queue <- req:
+	default:
+		go m.serve(req)
 	}
-	return err
 }
 
 // Receive a message.

+ 4 - 0
pkg/coap/message.go

@@ -273,6 +273,7 @@ type Message interface {
 	OptionStrings(opid OptionID) []string
 	GetMessageID() uint16
 	GetToken() []byte
+	GetCode() COAPCode
 }
 
 // BaseMessage COAP 消息体
@@ -291,6 +292,9 @@ func (m *BaseMessage) GetToken() []byte {
 func (m *BaseMessage) GetMessageID() uint16 {
 	return m.MessageID
 }
+func (m *BaseMessage) GetCode() COAPCode {
+	return m.Code
+}
 
 func (m *BaseMessage) Encode() ([]byte, error) {
 	tmpbuf := []byte{0, 0}

+ 9 - 0
pkg/coap/request.go

@@ -0,0 +1,9 @@
+package coap
+
+import "net"
+
+type Request struct {
+	Msg  Message
+	Conn *net.UDPConn
+	Addr *net.UDPAddr
+}

+ 2 - 2
pkg/generator/token_gen.go

@@ -5,10 +5,10 @@ import (
 )
 
 const (
-	ranTokendByteLength = 16
+	ranTokendByteLength = 8
 )
 
-// gen random token bytes
+// GenRandomToken gen random token bytes
 func GenRandomToken() ([]byte, error) {
 	ranbuf := make([]byte, ranTokendByteLength)
 	_, err := rand.Read(ranbuf)

+ 13 - 0
pkg/mqtt/utils_test.go

@@ -0,0 +1,13 @@
+package mqtt
+
+import "testing"
+
+func TestClientIdToDeviceId(t *testing.T) {
+	id, err := ClientIdToDeviceId("ff")
+	if err != nil {
+		t.Fatal(err)
+	}
+	if id != 255 {
+		t.Fatalf("result not 255,but %d", id)
+	}
+}

+ 2 - 2
pkg/protocol/structure.go

@@ -7,7 +7,7 @@ import (
 type CommandEventHead struct {
 	Flag        uint8
 	Timestamp   uint64
-	Token       [16]byte
+	Token       [8]byte
 	SubDeviceid uint16
 	No          uint16
 	Priority    uint16
@@ -27,7 +27,7 @@ type Event struct {
 type DataHead struct {
 	Flag      uint8
 	Timestamp uint64
-	Token     [16]byte
+	Token     [8]byte
 }
 
 type Data struct {

+ 1 - 3
pkg/server/server.go

@@ -278,7 +278,7 @@ func Run() error {
 		}
 		err = serverInstance.svrmgr.UpdateServerHosts()
 		if err != nil {
-			Log.Error("UpdateServerHosts error: %s", err)
+			Log.Errorf("UpdateServerHosts error: %s", err)
 		} else {
 			Log.Info("UpdateServerHosts Success")
 		}
@@ -290,6 +290,4 @@ func Run() error {
 
 		time.Sleep(60 * time.Second)
 	}
-
-	return nil
 }

+ 5 - 2
pkg/server/server_manager.go

@@ -13,12 +13,14 @@ import (
 )
 
 const (
-	EtcdServersPrefix    = "/pando/servers/"
+	// EtcdServersPrefix prefix
+	EtcdServersPrefix    = "/knowo/servers/"
 	EtcdServersPrefixCnt = 2
 	EnvTCPProxy          = "TCP_PROXY_ADDR"
 	EnvHTTPProxy         = "HTTP_PROXY_ADDR"
 )
 
+// ServerManager server manager
 type ServerManager struct {
 	serverName string
 	// servername -> hosttype -> hostlist
@@ -27,6 +29,7 @@ type ServerManager struct {
 	etcdHosts  []string
 }
 
+// NewServerManager new server manager
 // etcd hosts is config as http://ip1:port1;http://ip2:port2;http://ip3:port3
 func NewServerManager(name string, etcd string) (*ServerManager, error) {
 	if etcd == "" {
@@ -80,7 +83,7 @@ func (mgr *ServerManager) RegisterServer() error {
 		return err
 	}
 	// print common key info
-	Log.Infof("RegisterServer is done. Metadata is %q\n", response)
+	Log.Infof("RegisterServer is done. Metadata is %v\n", response)
 
 	return nil
 }

+ 4 - 21
services/coapaccess/access.go

@@ -1,34 +1,17 @@
 package main
 
 import (
-	"net"
 	"sparrow/pkg/coap"
-	"sparrow/pkg/server"
 )
 
 type Access struct {
-	CoAPBroker *coap.Broker
+	CoAPManager *coap.Manager
 }
 
 func NewAccess() (*Access, error) {
-	broker := coap.NewBroker()
-	broker.Handle("/topic/s", broker.Mgr.FuncHandler(HandlerDeviceStatus))
+	p := NewCoAPProvider()
+	mgr := coap.NewManager(p)
 	return &Access{
-		CoAPBroker: broker,
+		CoAPManager: mgr,
 	}, nil
 }
-
-func HandlerDeviceStatus(l *net.UDPConn, a *net.UDPAddr, m coap.Message) coap.Message {
-	server.Log.Debugf("Got message in handleA: path=%q: %#v from %v", m.GetMessageID(), m, a)
-	if m.IsConfirmable() {
-		res := &coap.BaseMessage{
-			Type:      coap.ACK,
-			Code:      coap.Content,
-			MessageID: m.GetMessageID(),
-			Token:     m.GetToken(),
-			Payload:   []byte("hello to you!"),
-		}
-		return res
-	}
-	return nil
-}

+ 62 - 0
services/coapaccess/coap_provider.go

@@ -0,0 +1,62 @@
+package main
+
+import (
+	"sparrow/pkg/rpcs"
+	"sparrow/pkg/server"
+)
+
+type CoAPProvider struct {
+}
+
+func NewCoAPProvider() *CoAPProvider {
+	return &CoAPProvider{}
+}
+func (mp *CoAPProvider) ValidateDeviceToken(deviceid uint64, token []byte) error {
+	args := rpcs.ArgsValidateDeviceAccessToken{
+		Id:          deviceid,
+		AccessToken: token,
+	}
+	reply := rpcs.ReplyValidateDeviceAccessToken{}
+	err := server.RPCCallByName("devicemanager", "DeviceManager.ValidateDeviceAccessToken", args, &reply)
+	if err != nil {
+		server.Log.Errorf("validate device token error. deviceid : %v, token : %v, error: %v", deviceid, token, err)
+		return err
+	}
+	return nil
+}
+func (mp *CoAPProvider) OnDeviceOnline(args rpcs.ArgsGetOnline) error {
+	reply := rpcs.ReplyGetOnline{}
+	err := server.RPCCallByName("devicemanager", "DeviceManager.GetOnline", args, &reply)
+	if err != nil {
+		server.Log.Errorf("device online error. args: %v, error: %v", args, err)
+	}
+
+	return err
+}
+func (mp *CoAPProvider) OnDeviceOffline(deviceid uint64) error {
+	args := rpcs.ArgsGetOffline{
+		Id: deviceid,
+	}
+	reply := rpcs.ReplyGetOffline{}
+	err := server.RPCCallByName("devicemanager", "DeviceManager.GetOffline", args, &reply)
+	if err != nil {
+		server.Log.Errorf("device offline error. deviceid: %v, error: %v", deviceid, err)
+	}
+
+	return err
+}
+func (mp *CoAPProvider) OnDeviceHeartBeat(deviceid uint64) error {
+	args := rpcs.ArgsDeviceId{
+		Id: deviceid,
+	}
+	reply := rpcs.ReplyHeartBeat{}
+	err := server.RPCCallByName("devicemanager", "DeviceManager.HeartBeat", args, &reply)
+	if err != nil {
+		server.Log.Errorf("device heartbeat error. deviceid: %v, error: %v", deviceid, err)
+	}
+	return err
+}
+func (mp *CoAPProvider) OnDeviceMessage(deviceid uint64, msgtype string, message []byte) {
+	server.Log.Infof("device {%v} message {%v} : %x", deviceid, msgtype, message)
+
+}

+ 1 - 1
services/coapaccess/main.go

@@ -15,7 +15,7 @@ func main() {
 		server.Log.Fatal(err)
 		return
 	}
-	err = server.RegisterUDPHandler(a.CoAPBroker.Mgr)
+	err = server.RegisterUDPHandler(a.CoAPManager)
 	if err != nil {
 		server.Log.Errorf("Register UDP service Error: %s", err)
 		return

TEMPAT SAMPAH
tests/coaptool/coaptool


+ 40 - 0
tests/coaptool/main.go

@@ -0,0 +1,40 @@
+package main
+
+import (
+	"fmt"
+	"net"
+	"sparrow/pkg/coap"
+)
+
+const url = "127.0.0.1:56883"
+
+func main() {
+	co, err := net.Dial("udp", url)
+	if err != nil {
+		fmt.Printf("dial err %s", err.Error())
+	}
+	token := []byte{
+		0x1,
+		0x2,
+		0x3,
+		0x4,
+		0x5,
+		0x6,
+		0x7,
+		0x8,
+	}
+	req := &coap.BaseMessage{
+		Code:      coap.POST,
+		Type:      coap.CON,
+		MessageID: 2,
+		//Token:     []byte{0, 1, 0x2, 3, 4, 5, 6, 7, 8, 9, 1, 12, 12, 1, 2, 1, 2, 2},
+		Token:   token,
+		Payload: []byte("PAYLOAD"),
+	}
+	//req.AddOption(coap.URIPath, "/topic/s")
+	bytes, err := req.Encode()
+	if err != nil {
+		fmt.Printf("dial err %s", err.Error())
+	}
+	co.Write(bytes)
+}

TEMPAT SAMPAH
tests/device/device