device.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396
  1. package main
  2. import (
  3. "encoding/hex"
  4. "encoding/json"
  5. "fmt"
  6. "log"
  7. "net"
  8. "os"
  9. "sparrow/pkg/coap"
  10. "sparrow/pkg/protocol"
  11. "sparrow/pkg/tlv"
  12. "time"
  13. MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git"
  14. )
  15. const (
  16. commonCmdGetStatus = uint16(65528)
  17. )
  18. // DeviceRegisterArgs device register args
  19. type DeviceRegisterArgs struct {
  20. ProductKey string `json:"product_key" binding:"required"`
  21. DeviceCode string `json:"device_code" binding:"required"`
  22. Version string `json:"version" binding:"required"`
  23. }
  24. // DeviceAuthArgs device authentication args
  25. type DeviceAuthArgs struct {
  26. DeviceId int64 `json:"device_id" binding:"required"`
  27. DeviceSecret string `json:"device_secret" binding:"required"`
  28. Protocol string `json:"protocol" binding:"required"`
  29. }
  30. // Common common response fields
  31. type Common struct {
  32. Code int `json:"code"`
  33. Message string `json:"message"`
  34. }
  35. // DeviceRegisterData device register response data field
  36. type DeviceRegisterData struct {
  37. DeviceId int64 `json:"device_id"`
  38. DeviceSecret string `json:"device_secret"`
  39. DeviceKey string `json:"device_key"`
  40. DeviceIdentifier string `json:"device_identifier"`
  41. }
  42. // DeviceRegisterResponse device register response
  43. type DeviceRegisterResponse struct {
  44. Common
  45. Data DeviceRegisterData `json:"data"`
  46. }
  47. // DeviceAuthData device auth response data field
  48. type DeviceAuthData struct {
  49. AccessToken string `json:"access_token"`
  50. AccessAddr string `json:"access_addr"`
  51. }
  52. // DeviceAuthResponse device auth response
  53. type DeviceAuthResponse struct {
  54. Common
  55. Data DeviceAuthData `json:"data"`
  56. }
  57. // Device a device
  58. type Device struct {
  59. // Url API URL
  60. Url string
  61. // basic info
  62. ProductKey string
  63. DeviceCode string
  64. Version string
  65. Proto string
  66. // private things
  67. id int64
  68. secrect string
  69. token []byte
  70. access string
  71. }
  72. // NewDevice create a device
  73. func NewDevice(url string, productkey string, code string, version string, proto string) *Device {
  74. return &Device{
  75. Url: url,
  76. ProductKey: productkey,
  77. DeviceCode: code,
  78. Version: version,
  79. Proto: proto,
  80. }
  81. }
  82. // DoRegister device register
  83. func (d *Device) DoRegister() error {
  84. args := DeviceRegisterArgs{
  85. ProductKey: d.ProductKey,
  86. DeviceCode: d.DeviceCode,
  87. Version: d.Version,
  88. }
  89. regURL := fmt.Sprintf("%v%v", d.Url, "/v1/devices/registration")
  90. request, err := json.Marshal(args)
  91. if err != nil {
  92. return err
  93. }
  94. jsonresp, err := SendHttpRequest(regURL, string(request), "POST", nil)
  95. if err != nil {
  96. return err
  97. }
  98. response := DeviceRegisterResponse{}
  99. err = json.Unmarshal(jsonresp, &response)
  100. if err != nil {
  101. return err
  102. }
  103. err = CheckHttpsCode(response)
  104. if err != nil {
  105. return err
  106. }
  107. d.id = response.Data.DeviceId
  108. d.secrect = response.Data.DeviceSecret
  109. return nil
  110. }
  111. // DoLogin device log in
  112. func (d *Device) DoLogin() error {
  113. args := DeviceAuthArgs{
  114. DeviceId: d.id,
  115. DeviceSecret: d.secrect,
  116. Protocol: d.Proto,
  117. }
  118. regURL := fmt.Sprintf("%v%v", d.Url, "/v1/devices/authentication")
  119. request, err := json.Marshal(args)
  120. if err != nil {
  121. return err
  122. }
  123. jsonresp, err := SendHttpRequest(regURL, string(request), "POST", nil)
  124. if err != nil {
  125. return err
  126. }
  127. response := DeviceAuthResponse{}
  128. err = json.Unmarshal(jsonresp, &response)
  129. if err != nil {
  130. return err
  131. }
  132. err = CheckHttpsCode(response)
  133. if err != nil {
  134. return err
  135. }
  136. // ecode hex
  137. htoken, err := hex.DecodeString(response.Data.AccessToken)
  138. if err != nil {
  139. return err
  140. }
  141. d.token = htoken
  142. d.access = response.Data.AccessAddr
  143. return nil
  144. }
  145. func (d *Device) reportStatus(client *MQTT.Client) {
  146. for {
  147. time.Sleep(10 * time.Second)
  148. payloadHead := protocol.DataHead{
  149. Flag: 0,
  150. Timestamp: uint64(time.Now().Unix() * 1000),
  151. }
  152. param := []interface{}{1}
  153. params, err := tlv.MakeTLVs(param)
  154. if err != nil {
  155. fmt.Println(err)
  156. return
  157. }
  158. sub := protocol.SubData{
  159. Head: protocol.SubDataHead{
  160. SubDeviceid: uint16(1),
  161. PropertyNum: uint16(1),
  162. ParamsCount: uint16(len(params)),
  163. },
  164. Params: params,
  165. }
  166. status := protocol.Data{
  167. Head: payloadHead,
  168. SubData: []protocol.SubData{},
  169. }
  170. status.SubData = append(status.SubData, sub)
  171. payload, err := status.Marshal()
  172. if err != nil {
  173. fmt.Println(err)
  174. return
  175. }
  176. client.Publish("s", 1, false, payload)
  177. }
  178. }
  179. func (d *Device) coapReportStatus(conn *net.UDPConn) {
  180. fmt.Println("coapReportStatus")
  181. for {
  182. payloadHead := protocol.DataHead{
  183. Flag: 0,
  184. Timestamp: uint64(time.Now().Unix() * 1000),
  185. }
  186. param := []interface{}{int64(10)}
  187. params, err := tlv.MakeTLVs(param)
  188. if err != nil {
  189. fmt.Println(err)
  190. return
  191. }
  192. sub := protocol.SubData{
  193. Head: protocol.SubDataHead{
  194. SubDeviceid: uint16(1),
  195. PropertyNum: uint16(1),
  196. ParamsCount: uint16(len(params)),
  197. },
  198. Params: params,
  199. }
  200. status := protocol.Data{
  201. Head: payloadHead,
  202. SubData: []protocol.SubData{},
  203. }
  204. status.SubData = append(status.SubData, sub)
  205. payload, err := status.Marshal()
  206. if err != nil {
  207. fmt.Println(err)
  208. return
  209. }
  210. req := &coap.BaseMessage{
  211. Code: coap.POST,
  212. Type: coap.CON,
  213. Token: d.token,
  214. MessageID: 2,
  215. Payload: payload,
  216. }
  217. req.SetPathString(fmt.Sprintf("%d/s", d.id))
  218. reqbytes, _ := req.Encode()
  219. conn.Write(reqbytes)
  220. fmt.Println("write end")
  221. time.Sleep(10 * time.Second)
  222. }
  223. }
  224. func (d *Device) reportEvent(client *MQTT.Client) {
  225. for {
  226. time.Sleep(3 * time.Second)
  227. event := protocol.Event{}
  228. params, err := tlv.MakeTLVs([]interface{}{"hello event."})
  229. if err != nil {
  230. fmt.Println(err)
  231. return
  232. }
  233. event.Params = params
  234. event.Head.No = 1
  235. event.Head.SubDeviceid = 1
  236. event.Head.ParamsCount = uint16(len(params))
  237. payload, err := event.Marshal()
  238. if err != nil {
  239. fmt.Println(err)
  240. return
  241. }
  242. client.Publish("e", 1, false, payload)
  243. }
  244. }
  245. func (d *Device) statusHandler(client *MQTT.Client, msg MQTT.Message) {
  246. status := protocol.Data{}
  247. err := status.UnMarshal(msg.Payload())
  248. if err != nil {
  249. fmt.Println(err)
  250. return
  251. }
  252. fmt.Println("device receiving status set : ")
  253. for _, one := range status.SubData {
  254. fmt.Println("subdeviceid : ", one.Head.SubDeviceid)
  255. fmt.Println("no : ", one.Head.PropertyNum)
  256. fmt.Println("params : ", one.Params)
  257. }
  258. }
  259. func (d *Device) commandHandler(client *MQTT.Client, msg MQTT.Message) {
  260. cmd := protocol.Command{}
  261. err := cmd.UnMarshal(msg.Payload())
  262. if err != nil {
  263. fmt.Println(err)
  264. return
  265. }
  266. switch cmd.Head.No {
  267. case commonCmdGetStatus:
  268. d.reportStatus(client)
  269. default:
  270. fmt.Printf("received command : %v: %v", cmd.Head.No, cmd.Params)
  271. }
  272. }
  273. func (d *Device) messageHandler(client *MQTT.Client, msg MQTT.Message) {
  274. fmt.Printf("TOPIC: %s\n", msg.Topic())
  275. fmt.Printf("MSG: %x\n", msg.Payload())
  276. msgtype := msg.Topic()
  277. fmt.Println(msgtype)
  278. switch msgtype {
  279. case "c":
  280. d.commandHandler(client, msg)
  281. case "s":
  282. d.statusHandler(client, msg)
  283. default:
  284. fmt.Println("unsuported message type :", msgtype)
  285. }
  286. }
  287. // DoAccess device access
  288. func (d *Device) DoAccess() error {
  289. if d.Proto == "mqtt" {
  290. if err := d.doMQTTAccess(); err != nil {
  291. fmt.Printf("do mqtt access error:%s", err.Error())
  292. }
  293. } else if d.Proto == "coap" {
  294. if err := d.doCoAPAccess(); err != nil {
  295. fmt.Printf("do coap access error:%s", err.Error())
  296. }
  297. }
  298. return nil
  299. }
  300. func (d *Device) doCoAPAccess() error {
  301. fmt.Printf("get access addr :%s", d.access)
  302. addr, err := net.ResolveUDPAddr("udp", d.access)
  303. if err != nil {
  304. return err
  305. }
  306. conn, err := net.DialUDP("udp", nil, addr)
  307. if err != nil {
  308. return err
  309. }
  310. go d.coapReportStatus(conn)
  311. <-make(chan int)
  312. defer conn.Close()
  313. return nil
  314. }
  315. func (d *Device) doMQTTAccess() error {
  316. logger := log.New(os.Stdout, "", log.LstdFlags)
  317. MQTT.ERROR = logger
  318. MQTT.CRITICAL = logger
  319. MQTT.WARN = logger
  320. MQTT.DEBUG = logger
  321. //create a ClientOptions struct setting the broker address, clientid, turn
  322. //off trace output and set the default message handler
  323. opts := MQTT.NewClientOptions().AddBroker("tcp://" + d.access)
  324. clientid := fmt.Sprintf("%x", d.id)
  325. opts.SetClientID(clientid)
  326. opts.SetUsername(clientid) // clientid as username
  327. opts.SetPassword(hex.EncodeToString(d.token))
  328. opts.SetKeepAlive(30 * time.Second)
  329. opts.SetDefaultPublishHandler(d.messageHandler)
  330. //opts.SetTLSConfig(&tls.Config{Certificates: nil, InsecureSkipVerify: true})
  331. //create and start a client using the above ClientOptions
  332. c := MQTT.NewClient(opts)
  333. if token := c.Connect(); token.Wait() && token.Error() != nil {
  334. return token.Error()
  335. }
  336. go d.reportStatus(c)
  337. // we just pause here to wait for messages
  338. <-make(chan int)
  339. defer c.Disconnect(250)
  340. return nil
  341. }