package main import ( "encoding/hex" "encoding/json" "fmt" "log" "net" "os" "sparrow/pkg/coap" "sparrow/pkg/protocol" "sparrow/pkg/server" "sparrow/pkg/tlv" "time" MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git" ) const ( commonCmdGetStatus = uint16(65528) ) // DeviceRegisterArgs device register args type DeviceRegisterArgs struct { ProductKey string `json:"product_key" binding:"required"` DeviceCode string `json:"device_code" binding:"required"` Version string `json:"version" binding:"required"` } // DeviceAuthArgs device authentication args type DeviceAuthArgs struct { DeviceId int64 `json:"device_id" binding:"required"` DeviceSecret string `json:"device_secret" binding:"required"` Protocol string `json:"protocol" binding:"required"` } // Common common response fields type Common struct { Code int `json:"code"` Message string `json:"message"` } // DeviceRegisterData device register response data field type DeviceRegisterData struct { DeviceId int64 `json:"device_id"` DeviceSecret string `json:"device_secret"` DeviceKey string `json:"device_key"` DeviceIdentifier string `json:"device_identifier"` } // DeviceRegisterResponse device register response type DeviceRegisterResponse struct { Common Data DeviceRegisterData `json:"data"` } // DeviceAuthData device auth response data field type DeviceAuthData struct { AccessToken string `json:"access_token"` AccessAddr string `json:"access_addr"` } // DeviceAuthResponse device auth response type DeviceAuthResponse struct { Common Data DeviceAuthData `json:"data"` } // Device a device type Device struct { // Url API URL Url string // basic info ProductKey string DeviceCode string Version string Proto string // private things id int64 secrect string token []byte access string } // NewDevice create a device func NewDevice(url string, productkey string, code string, version string, proto string) *Device { return &Device{ Url: url, ProductKey: productkey, DeviceCode: code, Version: version, Proto: proto, } } // DoRegister device register func (d *Device) DoRegister() error { args := DeviceRegisterArgs{ ProductKey: d.ProductKey, DeviceCode: d.DeviceCode, Version: d.Version, } regURL := fmt.Sprintf("%v%v", d.Url, "/v1/devices/registration") request, err := json.Marshal(args) if err != nil { return err } jsonresp, err := SendHttpRequest(regURL, string(request), "POST", nil) if err != nil { return err } response := DeviceRegisterResponse{} err = json.Unmarshal(jsonresp, &response) if err != nil { return err } err = CheckHttpsCode(response) if err != nil { return err } d.id = response.Data.DeviceId d.secrect = response.Data.DeviceSecret return nil } // DoLogin device log in func (d *Device) DoLogin() error { args := DeviceAuthArgs{ DeviceId: d.id, DeviceSecret: d.secrect, Protocol: d.Proto, } regURL := fmt.Sprintf("%v%v", d.Url, "/v1/devices/authentication") request, err := json.Marshal(args) if err != nil { return err } jsonresp, err := SendHttpRequest(regURL, string(request), "POST", nil) if err != nil { return err } response := DeviceAuthResponse{} err = json.Unmarshal(jsonresp, &response) if err != nil { return err } err = CheckHttpsCode(response) if err != nil { return err } // ecode hex htoken, err := hex.DecodeString(response.Data.AccessToken) if err != nil { return err } d.token = htoken d.access = response.Data.AccessAddr return nil } func (d *Device) reportStatus(client *MQTT.Client) { for { time.Sleep(2 * time.Second) payloadHead := protocol.DataHead{ Flag: 0, Timestamp: uint64(time.Now().Unix() * 1000), } param := []interface{}{uint16(1), uint16(2)} params, err := tlv.MakeTLVs(param) if err != nil { fmt.Println(err) return } sub := protocol.SubData{ Head: protocol.SubDataHead{ SubDeviceid: uint16(1), PropertyNum: uint16(1), ParamsCount: uint16(len(params)), }, Params: params, } status := protocol.Data{ Head: payloadHead, SubData: []protocol.SubData{}, } status.SubData = append(status.SubData, sub) payload, err := status.Marshal() if err != nil { fmt.Println(err) return } client.Publish("s", 1, false, payload) } } func (d *Device) coapReportStatus(conn *net.UDPConn) { fmt.Println("coapReportStatus") for { payloadHead := protocol.DataHead{ Flag: 0, Timestamp: uint64(time.Now().Unix() * 1000), } param := []interface{}{int64(10)} params, err := tlv.MakeTLVs(param) if err != nil { fmt.Println(err) return } sub := protocol.SubData{ Head: protocol.SubDataHead{ SubDeviceid: uint16(1), PropertyNum: uint16(1), ParamsCount: uint16(len(params)), }, Params: params, } status := protocol.Data{ Head: payloadHead, SubData: []protocol.SubData{}, } status.SubData = append(status.SubData, sub) payload, err := status.Marshal() if err != nil { fmt.Println(err) return } req := &coap.BaseMessage{ Code: coap.POST, Type: coap.CON, Token: d.token, MessageID: 2, Payload: payload, } req.SetPathString(fmt.Sprintf("%d/s", d.id)) reqbytes, _ := req.Encode() conn.Write(reqbytes) fmt.Println("write end") time.Sleep(10 * time.Second) } } func (d *Device) reportEvent(client *MQTT.Client) { for { time.Sleep(3 * time.Second) event := protocol.Event{} params, err := tlv.MakeTLVs([]interface{}{"hello event."}) if err != nil { fmt.Println(err) return } event.Params = params event.Head.No = 1 event.Head.SubDeviceid = 1 event.Head.ParamsCount = uint16(len(params)) payload, err := event.Marshal() if err != nil { fmt.Println(err) return } client.Publish("e", 1, false, payload) } } func (d *Device) statusHandler(client *MQTT.Client, msg MQTT.Message) { status := protocol.Data{} err := status.UnMarshal(msg.Payload()) if err != nil { fmt.Println(err) return } fmt.Println("device receiving status set : ") for _, one := range status.SubData { fmt.Println("subdeviceid : ", one.Head.SubDeviceid) fmt.Println("no : ", one.Head.PropertyNum) fmt.Println("params : ", one.Params) } } func (d *Device) commandHandler(client *MQTT.Client, msg MQTT.Message) { cmd := protocol.Command{} err := cmd.UnMarshal(msg.Payload()) if err != nil { fmt.Println(err) return } switch cmd.Head.No { case commonCmdGetStatus: d.reportStatus(client) default: fmt.Printf("received command : %v: %v", cmd.Head.No, cmd.Params) } } func (d *Device) messageHandler(client *MQTT.Client, msg MQTT.Message) { fmt.Printf("TOPIC: %s\n", msg.Topic()) fmt.Printf("MSG: %x\n", msg.Payload()) msgtype := msg.Topic() fmt.Println(msgtype) switch msgtype { case "c": d.commandHandler(client, msg) case "s": d.statusHandler(client, msg) default: fmt.Println("unsuported message type :", msgtype) } } // DoAccess device access func (d *Device) DoAccess() error { if d.Proto == "mqtt" { if err := d.doMQTTAccess(); err != nil { fmt.Printf("do mqtt access error:%s", err.Error()) } } else if d.Proto == "coap" { if err := d.doCoAPAccess(); err != nil { fmt.Printf("do coap access error:%s", err.Error()) } } return nil } func (d *Device) doCoAPAccess() error { fmt.Printf("get access addr :%s", d.access) addr, err := net.ResolveUDPAddr("udp", d.access) if err != nil { return err } conn, err := net.DialUDP("udp", nil, addr) if err != nil { return err } go d.coapReportStatus(conn) <-make(chan int) defer conn.Close() return nil } func (d *Device) doMQTTAccess() error { logger := log.New(os.Stdout, "", log.LstdFlags) MQTT.ERROR = logger MQTT.CRITICAL = logger MQTT.WARN = logger MQTT.DEBUG = logger //create a ClientOptions struct setting the broker address, clientid, turn //off trace output and set the default message handler opts := MQTT.NewClientOptions().AddBroker("tcp://" + d.access) clientid := fmt.Sprintf("%x", d.id) opts.SetClientID(clientid) opts.SetUsername(clientid) // clientid as username opts.SetPassword(hex.EncodeToString(d.token)) opts.SetKeepAlive(30 * time.Second) opts.SetDefaultPublishHandler(d.messageHandler) //opts.SetTLSConfig(&tls.Config{Certificates: nil, InsecureSkipVerify: true}) //create and start a client using the above ClientOptions c := MQTT.NewClient(opts) go func() { if token := c.Connect(); token.Wait() && token.Error() != nil { server.Log.Error(token.Error()) return } }() go d.reportStatus(c) // we just pause here to wait for messages <-make(chan int) defer c.Disconnect(250) return nil }