package main import ( "encoding/hex" "encoding/json" "fmt" "github.com/gogf/gf/encoding/gjson" "log" "math/rand" "net" "os" "sparrow/pkg/coap" "sparrow/pkg/klink" "sparrow/pkg/protocol" "sparrow/pkg/tlv" "time" MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git" ) const ( commonCmdGetStatus = uint16(65528) ) // DeviceRegisterArgs device register args type DeviceRegisterArgs struct { ProductKey string `json:"product_key" binding:"required"` DeviceCode string `json:"device_code" binding:"required"` Version string `json:"version" binding:"required"` } // DeviceAuthArgs device authentication args type DeviceAuthArgs struct { DeviceId int64 `json:"device_id" binding:"required"` DeviceSecret string `json:"device_secret" binding:"required"` Protocol string `json:"protocol" binding:"required"` } // Common common response fields type Common struct { Code int `json:"code"` Message string `json:"message"` } // DeviceRegisterData device register response data field type DeviceRegisterData struct { DeviceId int64 `json:"device_id"` DeviceSecret string `json:"device_secret"` DeviceKey string `json:"device_key"` DeviceIdentifier string `json:"device_identifier"` } // DeviceRegisterResponse device register response type DeviceRegisterResponse struct { Common Data DeviceRegisterData `json:"data"` } // DeviceAuthData device auth response data field type DeviceAuthData struct { AccessToken string `json:"access_token"` AccessAddr string `json:"access_addr"` } // DeviceAuthResponse device auth response type DeviceAuthResponse struct { Common Data DeviceAuthData `json:"data"` } // Device a device type Device struct { // Url API URL Url string // basic info ProductKey string DeviceCode string Version string Proto string // private things id int64 secrect string token []byte access string } // NewDevice create a device func NewDevice(url string, productkey string, code string, version string, proto string) *Device { return &Device{ Url: url, ProductKey: productkey, DeviceCode: code, Version: version, Proto: proto, } } // DoRegister device register func (d *Device) DoRegister() error { args := DeviceRegisterArgs{ ProductKey: d.ProductKey, DeviceCode: d.DeviceCode, Version: d.Version, } regURL := fmt.Sprintf("%v%v", d.Url, "/v1/devices/registration") request, err := json.Marshal(args) if err != nil { return err } jsonresp, err := SendHttpRequest(regURL, string(request), "POST", nil) if err != nil { return err } response := DeviceRegisterResponse{} err = json.Unmarshal(jsonresp, &response) if err != nil { return err } err = CheckHttpsCode(response) if err != nil { return err } d.id = response.Data.DeviceId d.secrect = response.Data.DeviceSecret return nil } // DoLogin device log in func (d *Device) DoLogin() error { args := DeviceAuthArgs{ DeviceId: d.id, DeviceSecret: d.secrect, Protocol: d.Proto, } regURL := fmt.Sprintf("%v%v", d.Url, "/v1/devices/authentication") request, err := json.Marshal(args) if err != nil { return err } jsonresp, err := SendHttpRequest(regURL, string(request), "POST", nil) if err != nil { return err } response := DeviceAuthResponse{} err = json.Unmarshal(jsonresp, &response) if err != nil { return err } err = CheckHttpsCode(response) if err != nil { return err } // ecode hex htoken, err := hex.DecodeString(response.Data.AccessToken) if err != nil { return err } d.token = htoken d.access = response.Data.AccessAddr return nil } func (d *Device) reportStatus(client *MQTT.Client) { for { time.Sleep(2 * time.Second) rand.Seed(time.Now().UnixNano()) r := rand.Intn(100) dd := &klink.DevSend{ Action: "devSend", MsgId: 1, SubDeviceId: "1", Timestamp: time.Now().Unix(), DeviceCode: "5566", Data: &klink.DevSendData{ Cmd: "powerOn", Params: gjson.New(fmt.Sprintf(`{"power":%d}`, r)), }, } payload, err := dd.Marshal() if err != nil { continue } client.Publish("s", 1, false, payload) } } func (d *Device) reportStatus2(client *MQTT.Client) { for { time.Sleep(2 * time.Second) rand.Seed(time.Now().UnixNano()) r := rand.Intn(100) dd := &klink.DevSend{ Action: "devSend", MsgId: 1, SubDeviceId: "1", Timestamp: time.Now().Unix(), DeviceCode: "5566", Data: &klink.DevSendData{ Cmd: "status", Params: gjson.New(fmt.Sprintf(`{"fanlevel":%d}`, r)), }, } payload, err := dd.Marshal() if err != nil { continue } client.Publish("s", 1, false, payload) } } func (d *Device) coapReportStatus(conn *net.UDPConn) { fmt.Println("coapReportStatus") for { payloadHead := protocol.DataHead{ Flag: 0, Timestamp: uint64(time.Now().Unix() * 1000), } param := []interface{}{int64(10)} params, err := tlv.MakeTLVs(param) if err != nil { fmt.Println(err) return } sub := protocol.SubData{ Head: protocol.SubDataHead{ SubDeviceid: uint16(1), PropertyNum: uint16(1), ParamsCount: uint16(len(params)), }, Params: params, } status := protocol.Data{ Head: payloadHead, SubData: []protocol.SubData{}, } status.SubData = append(status.SubData, sub) payload, err := status.Marshal() if err != nil { fmt.Println(err) return } req := &coap.BaseMessage{ Code: coap.POST, Type: coap.CON, Token: d.token, MessageID: 2, Payload: payload, } req.SetPathString(fmt.Sprintf("%d/s", d.id)) reqbytes, _ := req.Encode() conn.Write(reqbytes) fmt.Println("write end") time.Sleep(10 * time.Second) } } func (d *Device) reportEvent(client *MQTT.Client) { for { time.Sleep(2 * time.Second) dd := &klink.DevSend{ Action: "devSend", MsgId: 1, SubDeviceId: "1", Timestamp: time.Now().Unix(), DeviceCode: "5566", Data: &klink.DevSendData{ Cmd: "temperatureEvent", Params: gjson.New(`{"temperature":100, "reason": 0}`), }, } payload, err := dd.Marshal() if err != nil { continue } client.Publish("e", 1, false, payload) } } func (d *Device) statusHandler(client *MQTT.Client, msg MQTT.Message) { status := protocol.Data{} err := status.UnMarshal(msg.Payload()) if err != nil { fmt.Println(err) return } fmt.Println("device receiving status set : ") for _, one := range status.SubData { fmt.Println("subdeviceid : ", one.Head.SubDeviceid) fmt.Println("no : ", one.Head.PropertyNum) fmt.Println("params : ", one.Params) } } // 子设备上线 func (d *Device) subDeviceLogin(client *MQTT.Client) { for { dd := &klink.DevLogin{ Action: "devLogin", MsgId: 1, SubDeviceId: "5566-0", Timestamp: time.Now().Unix(), DeviceCode: "5566", } payload, err := dd.Marshal() if err != nil { return } client.Publish("s", 1, false, payload) time.Sleep(3 * time.Second) } } // 子设备上线 func (d *Device) deviceNetConfig(client *MQTT.Client) { j := `{ "action": "devNetConfig", "timestamp": 12312312, "deviceCode":"deviceCode", "md5": "0FE264CC7DC9AFFD7E61E4ABCD1F3354" }` jj := gjson.New(j) client.Publish("s", 1, false, jj.MustToJson()) } func (d *Device) commandHandler(client *MQTT.Client, msg MQTT.Message) { j, err := gjson.DecodeToJson(msg.Payload()) if err != nil { panic("错误的报文格式") } fmt.Printf("%v", j.MustToJsonString()) } func (d *Device) messageHandler(client *MQTT.Client, msg MQTT.Message) { fmt.Printf("TOPIC: %s\n", msg.Topic()) fmt.Printf("MSG: %x\n", msg.Payload()) msgtype := msg.Topic() fmt.Println(msgtype) switch msgtype { case "c": d.commandHandler(client, msg) case "s": d.statusHandler(client, msg) default: fmt.Println("unsuported message type :", msgtype) } } // DoAccess device access func (d *Device) DoAccess() error { if d.Proto == "mqtt" { if err := d.doMQTTAccess(); err != nil { fmt.Printf("do mqtt access error:%s", err.Error()) } } else if d.Proto == "coap" { if err := d.doCoAPAccess(); err != nil { fmt.Printf("do coap access error:%s", err.Error()) } } return nil } func (d *Device) doCoAPAccess() error { fmt.Printf("get access addr :%s", d.access) addr, err := net.ResolveUDPAddr("udp", d.access) if err != nil { return err } conn, err := net.DialUDP("udp", nil, addr) if err != nil { return err } go d.coapReportStatus(conn) <-make(chan int) defer conn.Close() return nil } func (d *Device) doMQTTAccess() error { logger := log.New(os.Stdout, "", log.LstdFlags) //MQTT.ERROR = logger //MQTT.CRITICAL = logger //MQTT.WARN = logger MQTT.DEBUG = logger //create a ClientOptions struct setting the broker address, clientid, turn //off trace output and set the default message handler opts := MQTT.NewClientOptions().AddBroker("tcp://" + d.access) clientid := fmt.Sprintf("%x", d.id) opts.SetClientID(clientid) opts.SetUsername(clientid) // clientid as username opts.SetPassword(hex.EncodeToString(d.token)) opts.SetKeepAlive(30 * time.Second) //// process key files //cert, err := tls.LoadX509KeyPair(*confCAFile, *confKeyFile) //if err != nil { // panic(err) //} //opts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cert}, InsecureSkipVerify: true}) opts.SetDefaultPublishHandler(d.messageHandler) //create and start a client using the above ClientOptions c := MQTT.NewClient(opts) go func() { if token := c.Connect(); token.Wait() && token.Error() != nil { return } }() // 子设备上线 //d.deviceNetConfig(c) //go d.subDeviceLogin(c) //go d.reportStatus(c) //go d.reportStatus2(c) go d.reportEvent(c) // we just pause here to wait for messages <-make(chan int) defer c.Disconnect(250) return nil }