Selaa lähdekoodia

初步实现CoAP协议接入

lijian 6 vuotta sitten
vanhempi
commit
6977d32d24

+ 82 - 0
pkg/coap/broker.go

@@ -0,0 +1,82 @@
+package coap
+
+import "net"
+
+type Broker struct {
+	m   map[string]muxEntry
+	Mgr *Manager
+}
+type muxEntry struct {
+	h       Handler
+	pattern string
+}
+
+func NewBroker() *Broker {
+	mgr := NewManager()
+	return &Broker{
+		Mgr: mgr,
+		m:   make(map[string]muxEntry),
+	}
+}
+
+func pathMatch(pattern string, path string) bool {
+	if len(pattern) == 0 {
+		return false
+	}
+	n := len(pattern)
+	if pattern[n-1] != '/' {
+		return pattern == path
+	}
+	return len(path) >= n && path[0:n] == pattern
+}
+
+func (b *Broker) match(path string) (h Handler, pattern string) {
+	var n = 0
+	for k, v := range b.m {
+		if !pathMatch(k, path) {
+			continue
+		}
+		if h == nil || len(k) > n {
+			n = len(k)
+			h = v.h
+			pattern = v.pattern
+		}
+	}
+	return
+}
+func notFoundHandler(l *net.UDPConn, a *net.UDPAddr, m Message) Message {
+	if m.IsConfirmable() {
+		return &BaseMessage{
+			Type: ACK,
+			Code: NotFound,
+		}
+	}
+	return nil
+}
+func (b *Broker) ServeCOAP(l *net.UDPConn, a *net.UDPAddr, m Message) Message {
+	h, _ := b.match(m.PathString())
+	if h == nil {
+		h, _ = funcHandler(notFoundHandler), ""
+	}
+	return h.ServeCOAP(l, a, m)
+}
+func (b *Broker) Handle(pattern string, handler Handler) {
+	for pattern != "" && pattern[0] == '/' {
+		pattern = pattern[1:]
+	}
+
+	if pattern == "" {
+		panic("http: invalid pattern " + pattern)
+	}
+	if handler == nil {
+		panic("http: nil handler")
+	}
+
+	b.m[pattern] = muxEntry{h: handler, pattern: pattern}
+}
+
+// HandleFunc configures a handler for the given path.
+func (b *Broker) HandleFunc(pattern string,
+	f func(l *net.UDPConn, a *net.UDPAddr, m Message) Message) {
+	b.Handle(pattern, b.Mgr.FuncHandler(f))
+}

+ 9 - 0
pkg/coap/error.go

@@ -0,0 +1,9 @@
+package coap
+
+import "errors"
+
+var (
+	ErrInvalidTokenLen   = errors.New("invalid token length")
+	ErrOptionTooLong     = errors.New("option is too long")
+	ErrOptionGapTooLarge = errors.New("option gap too large")
+)

+ 92 - 0
pkg/coap/manager.go

@@ -0,0 +1,92 @@
+package coap
+
+import (
+	"net"
+	"sparrow/pkg/server"
+	"time"
+)
+
+const (
+	// ResponseTimeout is the amount of time to wait for a
+	// response.
+	ResponseTimeout = time.Second * 2
+	// ResponseRandomFactor is a multiplier for response backoff.
+	ResponseRandomFactor = 1.5
+	// MaxRetransmit is the maximum number of times a message will
+	// be retransmitted.
+	MaxRetransmit = 4
+	maxPktlen     = 1500
+)
+
+type Handler interface {
+	ServeCOAP(l *net.UDPConn, a *net.UDPAddr, m Message) Message
+}
+type Manager struct {
+	rh funcHandler
+}
+
+func NewManager() *Manager {
+	return &Manager{}
+}
+
+func (m *Manager) FuncHandler(f func(l *net.UDPConn, a *net.UDPAddr, m Message) Message) Handler {
+	m.rh = f
+	return funcHandler(f)
+}
+
+type funcHandler func(l *net.UDPConn, a *net.UDPAddr, m Message) Message
+
+func (f funcHandler) ServeCOAP(l *net.UDPConn, a *net.UDPAddr, m Message) Message {
+	return f(l, a, m)
+}
+
+func (m *Manager) Handler(conn *net.UDPConn) {
+	buf := make([]byte, maxPktlen)
+	for {
+		nr, addr, err := conn.ReadFromUDP(buf)
+		if err != nil {
+			if neterr, ok := err.(net.Error); ok && (neterr.Temporary() || neterr.Timeout()) {
+				time.Sleep(5 * time.Millisecond)
+				continue
+			}
+
+		}
+		tmp := make([]byte, nr)
+		copy(tmp, buf)
+		go m.handlerPacket(conn, tmp, addr)
+	}
+}
+func (m *Manager) handlerPacket(l *net.UDPConn, data []byte, a *net.UDPAddr) {
+	msg, err := ParseMessage(data)
+	if err != nil {
+		server.Log.Error(err)
+	}
+	rv := m.rh.ServeCOAP(l, a, msg)
+	if rv != nil {
+		Transmit(l, a, msg)
+	}
+}
+func Transmit(l *net.UDPConn, a *net.UDPAddr, m Message) error {
+	d, err := m.Encode()
+	if err != nil {
+		return err
+	}
+
+	if a == nil {
+		_, err = l.Write(d)
+	} else {
+		_, err = l.WriteTo(d, a)
+	}
+	return err
+}
+
+// Receive a message.
+func Receive(l *net.UDPConn, buf []byte) (Message, error) {
+	l.SetReadDeadline(time.Now().Add(ResponseTimeout))
+
+	nr, _, err := l.ReadFromUDP(buf)
+	if err != nil {
+		return &BaseMessage{}, err
+	}
+	return ParseMessage(buf[:nr])
+}

+ 570 - 0
pkg/coap/message.go

@@ -0,0 +1,570 @@
+package coap
+
+import (
+	"bytes"
+	"encoding/binary"
+	"errors"
+	"fmt"
+	"reflect"
+	"sort"
+	"strconv"
+	"strings"
+)
+
+// COAPType 代表消息类型
+type COAPType uint8
+
+// MaxTokenSize 最大token size
+const MaxTokenSize = 8
+
+const (
+	// CON 需要被确认的请求
+	CON COAPType = 0
+	// NON 不需要被确认的请求
+	NON COAPType = 1
+	// ACK 应答消息,接受到CON消息的响应
+	ACK COAPType = 2
+	// RST 复位消息,当接收者接受到的消息包含一个错误,接受者解析消息或者不再关心发送者发送的内容,那么复位消息将会被发送
+	RST COAPType = 3
+)
+
+// COAPCode 请求方法的类型
+type COAPCode uint8
+
+// request codes
+const (
+	GET COAPCode = iota + 1
+	POST
+	PUT
+	DELETE
+)
+
+// Response codes
+const (
+	Empty                   COAPCode = 0
+	Created                 COAPCode = 65
+	Deleted                 COAPCode = 66
+	Valid                   COAPCode = 67
+	Changed                 COAPCode = 68
+	Content                 COAPCode = 69
+	Continue                COAPCode = 95
+	BadRequest              COAPCode = 128
+	Unauthorized            COAPCode = 129
+	BadOption               COAPCode = 130
+	Forbidden               COAPCode = 131
+	NotFound                COAPCode = 132
+	MethodNotAllowed        COAPCode = 133
+	NotAcceptable           COAPCode = 134
+	RequestEntityIncomplete COAPCode = 136
+	PreconditionFailed      COAPCode = 140
+	RequestEntityTooLarge   COAPCode = 141
+	UnsupportedMediaType    COAPCode = 143
+	InternalServerError     COAPCode = 160
+	NotImplemented          COAPCode = 161
+	BadGateway              COAPCode = 162
+	ServiceUnavailable      COAPCode = 163
+	GatewayTimeout          COAPCode = 164
+	ProxyingNotSupported    COAPCode = 165
+)
+
+// MediaType 请求消息的媒体类型 对应Content-Format
+type MediaType uint16
+
+// Content formats.
+const (
+	TextPlain MediaType = 0  // text/plain;charset=utf-8
+	AppXML    MediaType = 41 // application/xml
+	AppOctets MediaType = 42 // application/octet-stream
+	AppExi    MediaType = 47 // application/exi
+	AppJSON   MediaType = 50 // application/json
+	AppCBOR   MediaType = 60 //application/cbor (RFC 7049)
+)
+
+func (c MediaType) String() string {
+	switch c {
+	case TextPlain:
+		return "text/plain;charset=utf-8"
+	case AppXML:
+		return "application/xml"
+	case AppOctets:
+		return "application/octet-stream"
+	case AppExi:
+		return "application/exi"
+	case AppJSON:
+		return "application/json"
+	case AppCBOR:
+		return "application/cbor (RFC 7049)"
+	}
+	return "Unknown media type: 0x" + strconv.FormatInt(int64(c), 16)
+}
+
+// OptionID Option编号
+type OptionID uint8
+
+// Option IDs.
+const (
+	IfMatch       OptionID = 1
+	URIHost       OptionID = 3
+	ETag          OptionID = 4
+	IfNoneMatch   OptionID = 5
+	Observe       OptionID = 6
+	URIPort       OptionID = 7
+	LocationPath  OptionID = 8
+	URIPath       OptionID = 11
+	ContentFormat OptionID = 12
+	MaxAge        OptionID = 14
+	URIQuery      OptionID = 15
+	Accept        OptionID = 17
+	LocationQuery OptionID = 20
+	Block2        OptionID = 23
+	Block1        OptionID = 27
+	Size2         OptionID = 28
+	ProxyURI      OptionID = 35
+	ProxyScheme   OptionID = 39
+	Size1         OptionID = 60
+)
+
+// Option value format (RFC7252 section 3.2)
+type valueFormat uint8
+
+const (
+	valueUnknown valueFormat = iota
+	valueEmpty
+	valueOpaque
+	valueUint
+	valueString
+)
+
+type optionDef struct {
+	valueFormat valueFormat
+	minLen      int
+	maxLen      int
+}
+
+var coapOptionDefs = map[OptionID]optionDef{
+	IfMatch:       optionDef{valueFormat: valueOpaque, minLen: 0, maxLen: 8},
+	URIHost:       optionDef{valueFormat: valueString, minLen: 1, maxLen: 255},
+	ETag:          optionDef{valueFormat: valueOpaque, minLen: 1, maxLen: 8},
+	IfNoneMatch:   optionDef{valueFormat: valueEmpty, minLen: 0, maxLen: 0},
+	Observe:       optionDef{valueFormat: valueUint, minLen: 0, maxLen: 3},
+	URIPort:       optionDef{valueFormat: valueUint, minLen: 0, maxLen: 2},
+	LocationPath:  optionDef{valueFormat: valueString, minLen: 0, maxLen: 255},
+	URIPath:       optionDef{valueFormat: valueString, minLen: 0, maxLen: 255},
+	ContentFormat: optionDef{valueFormat: valueUint, minLen: 0, maxLen: 2},
+	MaxAge:        optionDef{valueFormat: valueUint, minLen: 0, maxLen: 4},
+	URIQuery:      optionDef{valueFormat: valueString, minLen: 0, maxLen: 255},
+	Accept:        optionDef{valueFormat: valueUint, minLen: 0, maxLen: 2},
+	LocationQuery: optionDef{valueFormat: valueString, minLen: 0, maxLen: 255},
+	Block2:        optionDef{valueFormat: valueUint, minLen: 0, maxLen: 3},
+	Block1:        optionDef{valueFormat: valueUint, minLen: 0, maxLen: 3},
+	Size2:         optionDef{valueFormat: valueUint, minLen: 0, maxLen: 4},
+	ProxyURI:      optionDef{valueFormat: valueString, minLen: 1, maxLen: 1034},
+	ProxyScheme:   optionDef{valueFormat: valueString, minLen: 1, maxLen: 255},
+	Size1:         optionDef{valueFormat: valueUint, minLen: 0, maxLen: 4},
+}
+
+type option struct {
+	ID    OptionID
+	Value interface{}
+}
+
+func encodeInt(v uint32) []byte {
+	switch {
+	case v == 0:
+		return nil
+	case v < 256:
+		return []byte{byte(v)}
+	case v < 65536:
+		rv := []byte{0, 0}
+		binary.BigEndian.PutUint16(rv, uint16(v))
+		return rv
+	case v < 16777216:
+		rv := []byte{0, 0, 0, 0}
+		binary.BigEndian.PutUint32(rv, uint32(v))
+		return rv[1:]
+	default:
+		rv := []byte{0, 0, 0, 0}
+		binary.BigEndian.PutUint32(rv, uint32(v))
+		return rv
+	}
+}
+
+func decodeInt(b []byte) uint32 {
+	tmp := []byte{0, 0, 0, 0}
+	copy(tmp[4-len(b):], b)
+	return binary.BigEndian.Uint32(tmp)
+}
+
+func (o option) toBytes() []byte {
+	var v uint32
+
+	switch i := o.Value.(type) {
+	case string:
+		return []byte(i)
+	case []byte:
+		return i
+	case MediaType:
+		v = uint32(i)
+	case int:
+		v = uint32(i)
+	case int32:
+		v = uint32(i)
+	case uint:
+		v = uint32(i)
+	case uint32:
+		v = i
+	default:
+		panic(fmt.Errorf("invalid type for option %x: %T (%v)",
+			o.ID, o.Value, o.Value))
+	}
+
+	return encodeInt(v)
+}
+
+type options []option
+
+func (o options) Len() int {
+	return len(o)
+}
+
+func (o options) Less(i, j int) bool {
+	if o[i].ID == o[j].ID {
+		return i < j
+	}
+	return o[i].ID < o[j].ID
+}
+
+func (o options) Swap(i, j int) {
+	o[i], o[j] = o[j], o[i]
+}
+
+func (o options) Remove(oid OptionID) options {
+	idx := 0
+	for i := 0; i < len(o); i++ {
+		if o[i].ID != oid {
+			o[idx] = o[i]
+			idx++
+		}
+	}
+	return o[:idx]
+}
+
+const (
+	extoptByteCode   = 13
+	extoptByteAddend = 13
+	extoptWordCode   = 14
+	extoptWordAddend = 269
+	extoptError      = 15
+)
+
+// Message interface
+type Message interface {
+	Encode() ([]byte, error)
+	Decode(data []byte) error
+	AllOptions() options
+	Option(opid OptionID) interface{}
+	Path() []string
+	PathString() string
+	SetPath([]string)
+	SetPathString(s string)
+	AddOption(opid OptionID, val interface{})
+	RemoveOption(opid OptionID)
+	IsConfirmable() bool
+	OptionStrings(opid OptionID) []string
+	GetMessageID() uint16
+	GetToken() []byte
+}
+
+// BaseMessage COAP 消息体
+type BaseMessage struct {
+	Type      COAPType
+	Code      COAPCode
+	MessageID uint16
+	Token     []byte
+	Payload   []byte
+	Opts      options
+}
+
+func (m *BaseMessage) GetToken() []byte {
+	return m.Token
+}
+func (m *BaseMessage) GetMessageID() uint16 {
+	return m.MessageID
+}
+
+func (m *BaseMessage) Encode() ([]byte, error) {
+	tmpbuf := []byte{0, 0}
+	binary.BigEndian.PutUint16(tmpbuf, m.MessageID)
+	buf := bytes.Buffer{}
+	buf.Write([]byte{
+		(1 << 6) | (uint8(m.Type) << 4) | uint8(0xf&len(m.Token)),
+		byte(m.Code),
+		tmpbuf[0], tmpbuf[1],
+	})
+	buf.Write(m.Token)
+	extendOpt := func(opt int) (int, int) {
+		ext := 0
+		if opt >= extoptByteAddend {
+			if opt >= extoptWordAddend {
+				ext = opt - extoptWordAddend
+				opt = extoptWordCode
+			} else {
+				ext = opt - extoptByteAddend
+				opt = extoptByteCode
+			}
+		}
+		return opt, ext
+	}
+
+	writeOptHeader := func(delta, length int) {
+		d, dx := extendOpt(delta)
+		l, lx := extendOpt(length)
+
+		buf.WriteByte(byte(d<<4) | byte(l))
+
+		tmp := []byte{0, 0}
+		writeExt := func(opt, ext int) {
+			switch opt {
+			case extoptByteCode:
+				buf.WriteByte(byte(ext))
+			case extoptWordCode:
+				binary.BigEndian.PutUint16(tmp, uint16(ext))
+				buf.Write(tmp)
+			}
+		}
+
+		writeExt(d, dx)
+		writeExt(l, lx)
+	}
+
+	sort.Stable(&m.Opts)
+
+	prev := 0
+
+	for _, o := range m.Opts {
+		b := o.toBytes()
+		writeOptHeader(int(o.ID)-prev, len(b))
+		buf.Write(b)
+		prev = int(o.ID)
+	}
+
+	if len(m.Payload) > 0 {
+		buf.Write([]byte{0xff})
+	}
+
+	buf.Write(m.Payload)
+
+	return buf.Bytes(), nil
+}
+
+func (m *BaseMessage) Decode(data []byte) error {
+	if len(data) < 4 {
+		return errors.New("short packet")
+	}
+
+	if data[0]>>6 != 1 {
+		return errors.New("invalid version")
+	}
+
+	m.Type = COAPType((data[0] >> 4) & 0x3)
+	tokenLen := int(data[0] & 0xf)
+	if tokenLen > 8 {
+		return ErrInvalidTokenLen
+	}
+
+	m.Code = COAPCode(data[1])
+	m.MessageID = binary.BigEndian.Uint16(data[2:4])
+
+	if tokenLen > 0 {
+		m.Token = make([]byte, tokenLen)
+	}
+	if len(data) < 4+tokenLen {
+		return errors.New("truncated")
+	}
+	copy(m.Token, data[4:4+tokenLen])
+	b := data[4+tokenLen:]
+	prev := 0
+
+	parseExtOpt := func(opt int) (int, error) {
+		switch opt {
+		case extoptByteCode:
+			if len(b) < 1 {
+				return -1, errors.New("truncated")
+			}
+			opt = int(b[0]) + extoptByteAddend
+			b = b[1:]
+		case extoptWordCode:
+			if len(b) < 2 {
+				return -1, errors.New("truncated")
+			}
+			opt = int(binary.BigEndian.Uint16(b[:2])) + extoptWordAddend
+			b = b[2:]
+		}
+		return opt, nil
+	}
+
+	for len(b) > 0 {
+		if b[0] == 0xff {
+			b = b[1:]
+			break
+		}
+
+		delta := int(b[0] >> 4)
+		length := int(b[0] & 0x0f)
+
+		if delta == extoptError || length == extoptError {
+			return errors.New("unexpected extended option marker")
+		}
+
+		b = b[1:]
+
+		delta, err := parseExtOpt(delta)
+		if err != nil {
+			return err
+		}
+		length, err = parseExtOpt(length)
+		if err != nil {
+			return err
+		}
+
+		if len(b) < length {
+			return errors.New("truncated")
+		}
+
+		oid := OptionID(prev + delta)
+		opval := parseOptionValue(oid, b[:length])
+		b = b[length:]
+		prev = int(oid)
+
+		if opval != nil {
+			m.Opts = append(m.Opts, option{ID: oid, Value: opval})
+		}
+	}
+	m.Payload = b
+	return nil
+}
+
+// AllOptions get message opts
+func (m *BaseMessage) AllOptions() options {
+	return m.Opts
+}
+
+// IsConfirmable 如果是CON类型的消息类型,返回true
+func (m *BaseMessage) IsConfirmable() bool {
+	return m.Type == CON
+}
+
+// Option get option by id
+func (m *BaseMessage) Option(o OptionID) interface{} {
+	for _, v := range m.Opts {
+		if o == v.ID {
+			return v.Value
+		}
+	}
+	return nil
+}
+
+// Options 获取所的option value
+func (m *BaseMessage) Options(o OptionID) []interface{} {
+	var rv []interface{}
+
+	for _, v := range m.Opts {
+		if o == v.ID {
+			rv = append(rv, v.Value)
+		}
+	}
+
+	return rv
+}
+
+// OptionStrings get option strings by id
+func (m *BaseMessage) OptionStrings(o OptionID) []string {
+	var rv []string
+	for _, o := range m.Options(o) {
+		rv = append(rv, o.(string))
+	}
+	return rv
+}
+
+// Path 获取URIPath
+func (m *BaseMessage) Path() []string {
+	return m.OptionStrings(URIPath)
+}
+
+// PathString gets a path as a / separated string.
+func (m *BaseMessage) PathString() string {
+	return strings.Join(m.Path(), "/")
+}
+
+// SetPathString sets a path by a / separated string.
+func (m *BaseMessage) SetPathString(s string) {
+	switch s {
+	case "", "/":
+		//root path is not set as option
+		return
+	default:
+		if s[0] == '/' {
+			s = s[1:]
+		}
+		m.SetPath(strings.Split(s, "/"))
+	}
+}
+
+//RemoveOption remove a given opid
+func (m *BaseMessage) RemoveOption(opID OptionID) {
+	m.Opts = m.Opts.Remove(opID)
+}
+
+// AddOption ``
+func (m *BaseMessage) AddOption(opID OptionID, val interface{}) {
+	iv := reflect.ValueOf(val)
+	if (iv.Kind() == reflect.Slice || iv.Kind() == reflect.Array) &&
+		iv.Type().Elem().Kind() == reflect.String {
+		for i := 0; i < iv.Len(); i++ {
+			m.Opts = append(m.Opts, option{opID, iv.Index(i).Interface()})
+		}
+		return
+	}
+	m.Opts = append(m.Opts, option{opID, val})
+}
+
+// SetPath ``
+func (m *BaseMessage) SetPath(s []string) {
+	m.SetOption(URIPath, s)
+}
+
+// SetOption sets an option, discarding any previous value
+func (m *BaseMessage) SetOption(opID OptionID, val interface{}) {
+	m.RemoveOption(opID)
+	m.AddOption(opID, val)
+}
+func parseOptionValue(optionID OptionID, valueBuf []byte) interface{} {
+	def := coapOptionDefs[optionID]
+	if def.valueFormat == valueUnknown {
+		// Skip unrecognized options (RFC7252 section 5.4.1)
+		return nil
+	}
+	if len(valueBuf) < def.minLen || len(valueBuf) > def.maxLen {
+		// Skip options with illegal value length (RFC7252 section 5.4.3)
+		return nil
+	}
+	switch def.valueFormat {
+	case valueUint:
+		intValue := decodeInt(valueBuf)
+		if optionID == ContentFormat || optionID == Accept {
+			return MediaType(intValue)
+		} else {
+			return intValue
+		}
+	case valueString:
+		return string(valueBuf)
+	case valueOpaque, valueEmpty:
+		return valueBuf
+	}
+	// Skip unrecognized options (should never be reached)
+	return nil
+}
+func ParseMessage(data []byte) (Message, error) {
+	rv := &BaseMessage{}
+	return rv, rv.Decode(data)
+}

+ 59 - 0
pkg/coap/message_test.go

@@ -0,0 +1,59 @@
+package coap
+
+import (
+	"bytes"
+	"fmt"
+	"testing"
+)
+
+func TestMediaTypes(t *testing.T) {
+	types := []interface{}{TextPlain, AppXML, AppOctets, AppExi, AppJSON}
+	exp := "coap.MediaType"
+	for _, typ := range types {
+		if got := fmt.Sprintf("%T", typ); got != exp {
+			t.Errorf("Error on %#v, expected %q, was %q", typ, exp, got)
+		}
+	}
+}
+func TestOptionToBytes(t *testing.T) {
+	tests := []struct {
+		in  interface{}
+		exp []byte
+	}{
+		{"", []byte{}},
+		{[]byte{}, []byte{}},
+		{"x", []byte{'x'}},
+		{[]byte{'x'}, []byte{'x'}},
+		{MediaType(3), []byte{0x3}},
+		{3, []byte{0x3}},
+		{838, []byte{0x3, 0x46}},
+		{int32(838), []byte{0x3, 0x46}},
+		{uint(838), []byte{0x3, 0x46}},
+		{uint32(838), []byte{0x3, 0x46}},
+	}
+
+	for _, test := range tests {
+		op := option{Value: test.in}
+		got := op.toBytes()
+		if !bytes.Equal(test.exp, got) {
+			t.Errorf("Error on %T(%v), got %#v, wanted %#v",
+				test.in, test.in, got, test.exp)
+		}
+	}
+}
+func TestMessageConfirmable(t *testing.T) {
+	tests := []struct {
+		m   Message
+		exp bool
+	}{
+		{&BaseMessage{mtype: CON}, true},
+		{&BaseMessage{mtype: NON}, false},
+	}
+
+	for _, test := range tests {
+		got := test.m.IsConfirmable()
+		if got != test.exp {
+			t.Errorf("Expected %v for %v", test.exp, test.m)
+		}
+	}
+}

+ 11 - 0
pkg/coap/provider.go

@@ -0,0 +1,11 @@
+package coap
+
+import "sparrow/pkg/rpcs"
+
+type Provider interface {
+	ValidateDeviceToken(deviceid uint64, token []byte) error
+	OnDeviceOnline(args rpcs.ArgsGetOnline) error
+	OnDeviceOffline(deviceid uint64) error
+	OnDeviceHeartBeat(deviceid uint64) error
+	OnDeviceMessage(deviceid uint64, msgtype string, message []byte)
+}

+ 2 - 0
pkg/server/config.go

@@ -16,9 +16,11 @@ const (
 	FlagRPCHost  = "rpchost"
 	FlagEtcd     = "etcd"
 	FlagLogLevel = "loglevel"
+	FlagUDPHost  = "udphost"
 )
 
 var (
+	confUDPHost = flag.String(FlagUDPHost, "", "udp server listen address, format ip:port")
 	confTCPHost = flag.String(FlagTCPHost, "", "tcp server listen address, format ip:port")
 	confUseTls  = flag.Bool(FlagUseTls, false, "if tcp server uses tls, default false")
 

+ 2 - 0
pkg/server/errors.go

@@ -12,4 +12,6 @@ const (
 	errWrongHostAddr           = "wrong address : %s"
 	errWrongEtcdPath           = "wrong path in etcd: %s"
 	errServerManagerNotInit    = "sever manager not init!"
+	errUDPHandlerNotRegisterd  = "Start UDP server error : udp handler not registerd!"
+	errUDPListenFailed         = "FATAL: udp listen (%s) failed - %s"
 )

+ 39 - 9
pkg/server/server.go

@@ -26,6 +26,7 @@ type Server struct {
 	tcpsvr    *TCPServer  // TCP server
 	httpsvr   *HTTPServer // HTTP server
 	timertask TimerTask   // timer task
+	udpsvr    *UDPServer
 	// functions
 	svrmgr *ServerManager // service registration&discovery manager
 	rpccli *RPCClient     // rpc client
@@ -69,7 +70,7 @@ func Init(name string) error {
 	return nil
 }
 
-// register TCP handler class
+// RegisterTCPHandler register TCP handler class
 func RegisterTCPHandler(handler TCPHandler) error {
 	if serverInstance == nil {
 		return errorf(errServerNotInit)
@@ -81,7 +82,7 @@ func RegisterTCPHandler(handler TCPHandler) error {
 
 		addr, err := fixHostIp(*confTCPHost)
 		if err != nil {
-			return errorf(errWrongHostAddr, confTCPHost)
+			return errorf(errWrongHostAddr, *confTCPHost)
 		}
 
 		serverInstance.tcpsvr = &TCPServer{
@@ -93,7 +94,28 @@ func RegisterTCPHandler(handler TCPHandler) error {
 	return nil
 }
 
-// register HTTP handler class
+// RegisterUDPHandler register UDP handler class
+func RegisterUDPHandler(handler UDPHandler) error {
+	if serverInstance == nil {
+		return errorf(errServerNotInit)
+	}
+	if serverInstance.udpsvr == nil {
+		if *confUDPHost == "" {
+			return errorf(errMissingFlag, FlagUDPHost)
+		}
+		addr, err := fixHostIp(*confUDPHost)
+		if err != nil {
+			return errorf(errWrongHostAddr, *confUDPHost)
+		}
+		serverInstance.udpsvr = &UDPServer{
+			addr:    addr,
+			handler: handler,
+		}
+	}
+	return nil
+}
+
+// RegisterHTTPHandler register HTTP handler class
 func RegisterHTTPHandler(handler http.Handler) error {
 	if serverInstance == nil {
 		return errorf(errServerNotInit)
@@ -117,7 +139,7 @@ func RegisterHTTPHandler(handler http.Handler) error {
 	return nil
 }
 
-// register RPC handler class
+// RegisterRPCHandler register RPC handler class
 func RegisterRPCHandler(rcvr interface{}) error {
 	if serverInstance == nil {
 		return errorf(errServerNotInit)
@@ -150,7 +172,7 @@ func RegisterRPCHandler(rcvr interface{}) error {
 	return nil
 }
 
-// register timer task
+// RegisterTimerTask register timer task
 func RegisterTimerTask(task TimerTask) error {
 	if serverInstance == nil {
 		return errorf(errServerNotInit)
@@ -161,7 +183,7 @@ func RegisterTimerTask(task TimerTask) error {
 	return nil
 }
 
-// rpc call by name
+// RPCCallByName rpc call by name
 func RPCCallByName(serverName string, serverMethod string, args interface{}, reply interface{}) error {
 	if serverInstance == nil {
 		return errorf(errServerNotInit)
@@ -170,7 +192,7 @@ func RPCCallByName(serverName string, serverMethod string, args interface{}, rep
 	return serverInstance.rpccli.Call(serverName, serverMethod, args, reply)
 }
 
-// rpc call by host
+// RPCCallByHost rpc call by host
 func RPCCallByHost(host string, serverMethod string, args interface{}, reply interface{}) error {
 	if serverInstance == nil {
 		return errorf(errServerNotInit)
@@ -179,7 +201,7 @@ func RPCCallByHost(host string, serverMethod string, args interface{}, reply int
 	return serverInstance.rpccli.CallHost(host, serverMethod, args, reply)
 }
 
-// get server's hosts by server name and service type
+// GetServerHosts get server's hosts by server name and service type
 func GetServerHosts(serverName string, hostType string) ([]string, error) {
 	if serverInstance == nil {
 		return nil, errorf(errServerNotInit)
@@ -205,7 +227,7 @@ func GetHTTPHost() string {
 	return serverInstance.httpsvr.addr
 }
 
-// start service
+// Run start service
 func Run() error {
 	if serverInstance == nil {
 		return errorf(errServerNotInit)
@@ -227,6 +249,14 @@ func Run() error {
 		Log.Info("starting http server ... OK")
 	}
 
+	if serverInstance.udpsvr != nil {
+		err := serverInstance.udpsvr.Start()
+		if err != nil {
+			return err
+		}
+		Log.Infof("starting udp server ... OK")
+	}
+
 	if serverInstance.rpcsvr != nil {
 		err := serverInstance.rpcsvr.Start()
 		if err != nil {

+ 3 - 1
pkg/server/tcp_server_test.go

@@ -12,7 +12,7 @@ const (
 	testTCPHost = "localhost:12345"
 )
 
-var testEchoData = "hello pando"
+var testEchoData = "hello"
 
 type testEchoHandler struct{}
 
@@ -45,6 +45,7 @@ func validateTCPServer(t *testing.T, addr string) {
 		t.Fatal(err)
 	}
 	gotData := string(buf[:length])
+	t.Log(gotData)
 	if gotData != testEchoData {
 		t.Errorf("echo server test failed. want: %s, got: %s", testEchoData, gotData)
 	}
@@ -68,6 +69,7 @@ func validateTLSServer(t *testing.T, addr string) {
 		t.Fatal(err)
 	}
 	gotData := string(buf[:length])
+	t.Log(gotData)
 	if gotData != testEchoData {
 		t.Errorf("echo server test failed. want: %s, got: %s", testEchoData, gotData)
 	}

+ 52 - 0
pkg/server/udp_server.go

@@ -0,0 +1,52 @@
+package server
+
+import (
+	"net"
+	"runtime"
+
+	"golang.org/x/net/ipv4"
+	"golang.org/x/net/ipv6"
+)
+
+// UDPHandler udp handler
+type UDPHandler interface {
+	Handler(*net.UDPConn)
+}
+
+// UDPServer udp server
+type UDPServer struct {
+	addr    string
+	handler UDPHandler
+}
+
+// Start udp server start
+func (us *UDPServer) Start() error {
+	if us.handler == nil {
+		return errorf(errUDPHandlerNotRegisterd)
+	}
+	var ln *net.UDPConn
+	var err error
+	a, err := net.ResolveUDPAddr("udp", us.addr)
+	if err != nil {
+		return err
+	}
+	ln, err = net.ListenUDP("udp", a)
+	if err != nil {
+		return errorf(errUDPListenFailed, us.addr, err)
+	}
+	Log.Infof("UDP Server Listen on %s", us.addr)
+	if err := setUDPSocketOptions(ln); err != nil {
+		return err
+	}
+	go us.handler.Handler(ln)
+	return nil
+}
+func setUDPSocketOptions(conn *net.UDPConn) error {
+	if runtime.GOOS == "windows" {
+		return nil
+	}
+	if ip4 := conn.LocalAddr().(*net.UDPAddr).IP.To4(); ip4 != nil {
+		return ipv4.NewPacketConn(conn).SetControlMessage(ipv4.FlagDst|ipv4.FlagInterface, true)
+	}
+	return ipv6.NewPacketConn(conn).SetControlMessage(ipv6.FlagDst|ipv6.FlagInterface, true)
+}

+ 64 - 0
pkg/server/udp_server_test.go

@@ -0,0 +1,64 @@
+package server
+
+import (
+	"fmt"
+	"net"
+	"testing"
+	"time"
+)
+
+const (
+	testUDPHost = "127.0.0.1:5638"
+)
+
+type testUDPEchoHandler struct{}
+
+func (a testUDPEchoHandler) Handler(conn *net.UDPConn) {
+	buf := make([]byte, 1024)
+	for {
+		len, addr, err := conn.ReadFromUDP(buf)
+		if err != nil {
+			fmt.Println(err)
+			continue
+		}
+		strData := string(buf)
+		fmt.Println("Received:", strData)
+		len, err = conn.WriteToUDP(buf[:len], addr)
+		if err != nil {
+			fmt.Println(err)
+		}
+	}
+}
+func validateUDPServer(t *testing.T, addr string) {
+	cli, err := net.Dial("udp", testUDPHost)
+	if err != nil {
+		t.Fatal(err)
+	}
+	_, err = cli.Write([]byte(testEchoData))
+	if err != nil {
+		t.Fatal(err)
+	}
+	buf := make([]byte, 1024)
+	length, err := cli.Read(buf)
+	if err != nil {
+		t.Fatal(err)
+	}
+	gotData := string(buf[:length])
+	if gotData != testEchoData {
+		t.Errorf("echo server test failed. want: %s, got: %s", testEchoData, gotData)
+	}
+}
+func TestUDPServer(t *testing.T) {
+	initLog("test", "debug")
+	h := testUDPEchoHandler{}
+	svr := &UDPServer{
+		addr:    testUDPHost,
+		handler: h,
+	}
+	err := svr.Start()
+	if err != nil {
+		t.Fatal(err)
+	}
+	time.Sleep(time.Millisecond * 100)
+	validateUDPServer(t, testUDPHost)
+}

+ 1 - 0
run.sh

@@ -13,6 +13,7 @@ $GOPATH/bin/controller -etcd http://192.168.175.60:2379 -loglevel debug  -rpchos
 $GOPATH/bin/mqttaccess -etcd http://192.168.175.60:2379 -loglevel debug  -rpchost localhost:20030 -tcphost internal:1883  &
 $GOPATH/bin/knowoapi -etcd http://192.168.175.60:2379 -loglevel debug  -httphost localhost:8889 -dbhost 192.168.175.60 -dbname SparrowCloud -dbport 3306 -dbuser SparrowCloud -dbpass 123456 -aeskey ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOP &
 $GOPATH/bin/fileaccess -etcd http://192.168.175.60:2379 -loglevel debug  -rpchost localhost:20035 -httphost localhost:9000 &
+$GOPATH/bin/coapaccess -etcd http://192.168.175.60:2379 -loglevel debug  -udphost localhost:5683
 exit 0
 
 

+ 34 - 0
services/coapaccess/access.go

@@ -0,0 +1,34 @@
+package main
+
+import (
+	"net"
+	"sparrow/pkg/coap"
+	"sparrow/pkg/server"
+)
+
+type Access struct {
+	CoAPBroker *coap.Broker
+}
+
+func NewAccess() (*Access, error) {
+	broker := coap.NewBroker()
+	broker.Handle("/topic/s", broker.Mgr.FuncHandler(HandlerDeviceStatus))
+	return &Access{
+		CoAPBroker: broker,
+	}, nil
+}
+
+func HandlerDeviceStatus(l *net.UDPConn, a *net.UDPAddr, m coap.Message) coap.Message {
+	server.Log.Debugf("Got message in handleA: path=%q: %#v from %v", m.GetMessageID(), m, a)
+	if m.IsConfirmable() {
+		res := &coap.BaseMessage{
+			Type:      coap.ACK,
+			Code:      coap.Content,
+			MessageID: m.GetMessageID(),
+			Token:     m.GetToken(),
+			Payload:   []byte("hello to you!"),
+		}
+		return res
+	}
+	return nil
+}

+ 28 - 0
services/coapaccess/main.go

@@ -0,0 +1,28 @@
+package main
+
+import (
+	"sparrow/pkg/server"
+)
+
+func main() {
+	err := server.Init("coapaccess")
+	if err != nil {
+		server.Log.Fatal(err)
+		return
+	}
+	a, err := NewAccess()
+	if err != nil {
+		server.Log.Fatal(err)
+		return
+	}
+	err = server.RegisterUDPHandler(a.CoAPBroker.Mgr)
+	if err != nil {
+		server.Log.Errorf("Register UDP service Error: %s", err)
+		return
+	}
+	// start to run
+	err = server.Run()
+	if err != nil {
+		server.Log.Fatal(err)
+	}
+}

+ 4 - 0
services/coapaccess/readme.md

@@ -0,0 +1,4 @@
+## CoAP协议接入
+
+* 支持TCP、UDP接入
+* 实现Observe resources

+ 1 - 0
services/knowoapi/model/role.go

@@ -29,6 +29,7 @@ func (a *Role) Update(role *models.Role) error {
 
 // Delete delete
 func (a *Role) Delete(role *models.Role) error {
+	a.db.Model(&models.User{}).Where("user_role_id = ?", role.ID).Update(map[string]interface{}{"user_role_id": 1})
 	return a.db.Delete(role).Error
 }
 

+ 1 - 1
services/knowoapi/model/role_test.go

@@ -40,7 +40,7 @@ func TestRoleDelete(t *testing.T) {
 		RoleName: "管理员修改",
 		MenuList: "home",
 	}
-	data.ID = 1
+	data.ID = 2
 	err := role.Delete(data)
 	if err != nil {
 		t.Fatal(err)