123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398 |
- package main
- import (
- "encoding/hex"
- "encoding/json"
- "fmt"
- "log"
- "net"
- "os"
- "sparrow/pkg/coap"
- "sparrow/pkg/protocol"
- "sparrow/pkg/server"
- "sparrow/pkg/tlv"
- "time"
- MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git"
- )
// commonCmdGetStatus is the command number that commandHandler answers by
// reporting the device status.
const commonCmdGetStatus uint16 = 65528
// DeviceRegisterArgs is the JSON request body that DoRegister posts to the
// device registration endpoint. All three fields are tagged as required.
type DeviceRegisterArgs struct {
	ProductKey string `json:"product_key" binding:"required"`
	DeviceCode string `json:"device_code" binding:"required"`
	Version    string `json:"version" binding:"required"`
}
// DeviceAuthArgs is the JSON request body that DoLogin posts to the device
// authentication endpoint. All three fields are tagged as required.
type DeviceAuthArgs struct {
	DeviceId     int64  `json:"device_id" binding:"required"`
	DeviceSecret string `json:"device_secret" binding:"required"`
	Protocol     string `json:"protocol" binding:"required"`
}
// Common holds the fields shared by every API response in this file: a
// numeric code and a message. It is embedded in the concrete response
// types.
type Common struct {
	Code    int    `json:"code"`
	Message string `json:"message"`
}
// DeviceRegisterData is the "data" member of a registration response.
// DoRegister keeps DeviceId and DeviceSecret; the other fields are decoded
// but not used in this file.
type DeviceRegisterData struct {
	DeviceId         int64  `json:"device_id"`
	DeviceSecret     string `json:"device_secret"`
	DeviceKey        string `json:"device_key"`
	DeviceIdentifier string `json:"device_identifier"`
}
- // DeviceRegisterResponse device register response
- type DeviceRegisterResponse struct {
- Common
- Data DeviceRegisterData `json:"data"`
- }
// DeviceAuthData is the "data" member of an authentication response.
// DoLogin hex-decodes AccessToken and stores AccessAddr as the address the
// device later connects to.
type DeviceAuthData struct {
	AccessToken string `json:"access_token"`
	AccessAddr  string `json:"access_addr"`
}
- // DeviceAuthResponse device auth response
- type DeviceAuthResponse struct {
- Common
- Data DeviceAuthData `json:"data"`
- }
// Device is a simulated device client. The exported fields are supplied by
// the caller (see NewDevice); the unexported ones are filled in by
// DoRegister and DoLogin.
type Device struct {
	// Url is the base URL of the HTTP API; endpoint paths are appended to it.
	Url string

	// Basic device information sent during registration and login.
	ProductKey, DeviceCode, Version, Proto string

	// State obtained from the server.
	id      int64  // device id returned by registration
	secrect string // device secret returned by registration
	token   []byte // hex-decoded access token returned by login
	access  string // access address returned by login
}
- // NewDevice create a device
- func NewDevice(url string, productkey string, code string, version string, proto string) *Device {
- return &Device{
- Url: url,
- ProductKey: productkey,
- DeviceCode: code,
- Version: version,
- Proto: proto,
- }
- }
- // DoRegister device register
- func (d *Device) DoRegister() error {
- args := DeviceRegisterArgs{
- ProductKey: d.ProductKey,
- DeviceCode: d.DeviceCode,
- Version: d.Version,
- }
- regURL := fmt.Sprintf("%v%v", d.Url, "/v1/devices/registration")
- request, err := json.Marshal(args)
- if err != nil {
- return err
- }
- jsonresp, err := SendHttpRequest(regURL, string(request), "POST", nil)
- if err != nil {
- return err
- }
- response := DeviceRegisterResponse{}
- err = json.Unmarshal(jsonresp, &response)
- if err != nil {
- return err
- }
- err = CheckHttpsCode(response)
- if err != nil {
- return err
- }
- d.id = response.Data.DeviceId
- d.secrect = response.Data.DeviceSecret
- return nil
- }
- // DoLogin device log in
- func (d *Device) DoLogin() error {
- args := DeviceAuthArgs{
- DeviceId: d.id,
- DeviceSecret: d.secrect,
- Protocol: d.Proto,
- }
- regURL := fmt.Sprintf("%v%v", d.Url, "/v1/devices/authentication")
- request, err := json.Marshal(args)
- if err != nil {
- return err
- }
- jsonresp, err := SendHttpRequest(regURL, string(request), "POST", nil)
- if err != nil {
- return err
- }
- response := DeviceAuthResponse{}
- err = json.Unmarshal(jsonresp, &response)
- if err != nil {
- return err
- }
- err = CheckHttpsCode(response)
- if err != nil {
- return err
- }
- // ecode hex
- htoken, err := hex.DecodeString(response.Data.AccessToken)
- if err != nil {
- return err
- }
- d.token = htoken
- d.access = response.Data.AccessAddr
- return nil
- }
- func (d *Device) reportStatus(client *MQTT.Client) {
- for {
- time.Sleep(2 * time.Second)
- payloadHead := protocol.DataHead{
- Flag: 0,
- Timestamp: uint64(time.Now().Unix() * 1000),
- }
- param := []interface{}{uint16(1), uint16(2)}
- params, err := tlv.MakeTLVs(param)
- if err != nil {
- fmt.Println(err)
- return
- }
- sub := protocol.SubData{
- Head: protocol.SubDataHead{
- SubDeviceid: uint16(1),
- PropertyNum: uint16(1),
- ParamsCount: uint16(len(params)),
- },
- Params: params,
- }
- status := protocol.Data{
- Head: payloadHead,
- SubData: []protocol.SubData{},
- }
- status.SubData = append(status.SubData, sub)
- payload, err := status.Marshal()
- if err != nil {
- fmt.Println(err)
- return
- }
- client.Publish("s", 1, false, payload)
- }
- }
- func (d *Device) coapReportStatus(conn *net.UDPConn) {
- fmt.Println("coapReportStatus")
- for {
- payloadHead := protocol.DataHead{
- Flag: 0,
- Timestamp: uint64(time.Now().Unix() * 1000),
- }
- param := []interface{}{int64(10)}
- params, err := tlv.MakeTLVs(param)
- if err != nil {
- fmt.Println(err)
- return
- }
- sub := protocol.SubData{
- Head: protocol.SubDataHead{
- SubDeviceid: uint16(1),
- PropertyNum: uint16(1),
- ParamsCount: uint16(len(params)),
- },
- Params: params,
- }
- status := protocol.Data{
- Head: payloadHead,
- SubData: []protocol.SubData{},
- }
- status.SubData = append(status.SubData, sub)
- payload, err := status.Marshal()
- if err != nil {
- fmt.Println(err)
- return
- }
- req := &coap.BaseMessage{
- Code: coap.POST,
- Type: coap.CON,
- Token: d.token,
- MessageID: 2,
- Payload: payload,
- }
- req.SetPathString(fmt.Sprintf("%d/s", d.id))
- reqbytes, _ := req.Encode()
- conn.Write(reqbytes)
- fmt.Println("write end")
- time.Sleep(10 * time.Second)
- }
- }
- func (d *Device) reportEvent(client *MQTT.Client) {
- for {
- time.Sleep(3 * time.Second)
- event := protocol.Event{}
- params, err := tlv.MakeTLVs([]interface{}{"hello event."})
- if err != nil {
- fmt.Println(err)
- return
- }
- event.Params = params
- event.Head.No = 1
- event.Head.SubDeviceid = 1
- event.Head.ParamsCount = uint16(len(params))
- payload, err := event.Marshal()
- if err != nil {
- fmt.Println(err)
- return
- }
- client.Publish("e", 1, false, payload)
- }
- }
- func (d *Device) statusHandler(client *MQTT.Client, msg MQTT.Message) {
- status := protocol.Data{}
- err := status.UnMarshal(msg.Payload())
- if err != nil {
- fmt.Println(err)
- return
- }
- fmt.Println("device receiving status set : ")
- for _, one := range status.SubData {
- fmt.Println("subdeviceid : ", one.Head.SubDeviceid)
- fmt.Println("no : ", one.Head.PropertyNum)
- fmt.Println("params : ", one.Params)
- }
- }
- func (d *Device) commandHandler(client *MQTT.Client, msg MQTT.Message) {
- cmd := protocol.Command{}
- err := cmd.UnMarshal(msg.Payload())
- if err != nil {
- fmt.Println(err)
- return
- }
- switch cmd.Head.No {
- case commonCmdGetStatus:
- d.reportStatus(client)
- default:
- fmt.Printf("received command : %v: %v", cmd.Head.No, cmd.Params)
- }
- }
- func (d *Device) messageHandler(client *MQTT.Client, msg MQTT.Message) {
- fmt.Printf("TOPIC: %s\n", msg.Topic())
- fmt.Printf("MSG: %x\n", msg.Payload())
- msgtype := msg.Topic()
- fmt.Println(msgtype)
- switch msgtype {
- case "c":
- d.commandHandler(client, msg)
- case "s":
- d.statusHandler(client, msg)
- default:
- fmt.Println("unsuported message type :", msgtype)
- }
- }
- // DoAccess device access
- func (d *Device) DoAccess() error {
- if d.Proto == "mqtt" {
- if err := d.doMQTTAccess(); err != nil {
- fmt.Printf("do mqtt access error:%s", err.Error())
- }
- } else if d.Proto == "coap" {
- if err := d.doCoAPAccess(); err != nil {
- fmt.Printf("do coap access error:%s", err.Error())
- }
- }
- return nil
- }
- func (d *Device) doCoAPAccess() error {
- fmt.Printf("get access addr :%s", d.access)
- addr, err := net.ResolveUDPAddr("udp", d.access)
- if err != nil {
- return err
- }
- conn, err := net.DialUDP("udp", nil, addr)
- if err != nil {
- return err
- }
- go d.coapReportStatus(conn)
- <-make(chan int)
- defer conn.Close()
- return nil
- }
- func (d *Device) doMQTTAccess() error {
- logger := log.New(os.Stdout, "", log.LstdFlags)
- MQTT.ERROR = logger
- MQTT.CRITICAL = logger
- MQTT.WARN = logger
- MQTT.DEBUG = logger
- //create a ClientOptions struct setting the broker address, clientid, turn
- //off trace output and set the default message handler
- opts := MQTT.NewClientOptions().AddBroker("tcp://" + d.access)
- clientid := fmt.Sprintf("%x", d.id)
- opts.SetClientID(clientid)
- opts.SetUsername(clientid) // clientid as username
- opts.SetPassword(hex.EncodeToString(d.token))
- opts.SetKeepAlive(30 * time.Second)
- opts.SetDefaultPublishHandler(d.messageHandler)
- //opts.SetTLSConfig(&tls.Config{Certificates: nil, InsecureSkipVerify: true})
- //create and start a client using the above ClientOptions
- c := MQTT.NewClient(opts)
- go func() {
- if token := c.Connect(); token.Wait() && token.Error() != nil {
- server.Log.Error(token.Error())
- return
- }
- }()
- go d.reportStatus(c)
- // we just pause here to wait for messages
- <-make(chan int)
- defer c.Disconnect(250)
- return nil
- }
|