device.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399
  1. package main
  2. import (
  3. "encoding/hex"
  4. "encoding/json"
  5. "fmt"
  6. "log"
  7. "net"
  8. "os"
  9. "sparrow/pkg/coap"
  10. "sparrow/pkg/protocol"
  11. "sparrow/pkg/server"
  12. "sparrow/pkg/tlv"
  13. "time"
  14. MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git"
  15. )
  16. const (
  17. commonCmdGetStatus = uint16(65528)
  18. )
  19. // DeviceRegisterArgs device register args
  20. type DeviceRegisterArgs struct {
  21. ProductKey string `json:"product_key" binding:"required"`
  22. DeviceCode string `json:"device_code" binding:"required"`
  23. Version string `json:"version" binding:"required"`
  24. }
  25. // DeviceAuthArgs device authentication args
  26. type DeviceAuthArgs struct {
  27. DeviceId int64 `json:"device_id" binding:"required"`
  28. DeviceSecret string `json:"device_secret" binding:"required"`
  29. Protocol string `json:"protocol" binding:"required"`
  30. }
  31. // Common common response fields
  32. type Common struct {
  33. Code int `json:"code"`
  34. Message string `json:"message"`
  35. }
  36. // DeviceRegisterData device register response data field
  37. type DeviceRegisterData struct {
  38. DeviceId int64 `json:"device_id"`
  39. DeviceSecret string `json:"device_secret"`
  40. DeviceKey string `json:"device_key"`
  41. DeviceIdentifier string `json:"device_identifier"`
  42. }
  43. // DeviceRegisterResponse device register response
  44. type DeviceRegisterResponse struct {
  45. Common
  46. Data DeviceRegisterData `json:"data"`
  47. }
  48. // DeviceAuthData device auth response data field
  49. type DeviceAuthData struct {
  50. AccessToken string `json:"access_token"`
  51. AccessAddr string `json:"access_addr"`
  52. }
  53. // DeviceAuthResponse device auth response
  54. type DeviceAuthResponse struct {
  55. Common
  56. Data DeviceAuthData `json:"data"`
  57. }
  58. // Device a device
  59. type Device struct {
  60. // Url API URL
  61. Url string
  62. // basic info
  63. ProductKey string
  64. DeviceCode string
  65. Version string
  66. Proto string
  67. // private things
  68. id int64
  69. secrect string
  70. token []byte
  71. access string
  72. }
  73. // NewDevice create a device
  74. func NewDevice(url string, productkey string, code string, version string, proto string) *Device {
  75. return &Device{
  76. Url: url,
  77. ProductKey: productkey,
  78. DeviceCode: code,
  79. Version: version,
  80. Proto: proto,
  81. }
  82. }
  83. // DoRegister device register
  84. func (d *Device) DoRegister() error {
  85. args := DeviceRegisterArgs{
  86. ProductKey: d.ProductKey,
  87. DeviceCode: d.DeviceCode,
  88. Version: d.Version,
  89. }
  90. regURL := fmt.Sprintf("%v%v", d.Url, "/v1/devices/registration")
  91. request, err := json.Marshal(args)
  92. if err != nil {
  93. return err
  94. }
  95. jsonresp, err := SendHttpRequest(regURL, string(request), "POST", nil)
  96. if err != nil {
  97. return err
  98. }
  99. response := DeviceRegisterResponse{}
  100. err = json.Unmarshal(jsonresp, &response)
  101. if err != nil {
  102. return err
  103. }
  104. err = CheckHttpsCode(response)
  105. if err != nil {
  106. return err
  107. }
  108. d.id = response.Data.DeviceId
  109. d.secrect = response.Data.DeviceSecret
  110. return nil
  111. }
  112. // DoLogin device log in
  113. func (d *Device) DoLogin() error {
  114. args := DeviceAuthArgs{
  115. DeviceId: d.id,
  116. DeviceSecret: d.secrect,
  117. Protocol: d.Proto,
  118. }
  119. regURL := fmt.Sprintf("%v%v", d.Url, "/v1/devices/authentication")
  120. request, err := json.Marshal(args)
  121. if err != nil {
  122. return err
  123. }
  124. jsonresp, err := SendHttpRequest(regURL, string(request), "POST", nil)
  125. if err != nil {
  126. return err
  127. }
  128. response := DeviceAuthResponse{}
  129. err = json.Unmarshal(jsonresp, &response)
  130. if err != nil {
  131. return err
  132. }
  133. err = CheckHttpsCode(response)
  134. if err != nil {
  135. return err
  136. }
  137. // ecode hex
  138. htoken, err := hex.DecodeString(response.Data.AccessToken)
  139. if err != nil {
  140. return err
  141. }
  142. d.token = htoken
  143. d.access = response.Data.AccessAddr
  144. return nil
  145. }
  146. func (d *Device) reportStatus(client *MQTT.Client) {
  147. for {
  148. time.Sleep(2 * time.Second)
  149. payloadHead := protocol.DataHead{
  150. Flag: 0,
  151. Timestamp: uint64(time.Now().Unix() * 1000),
  152. }
  153. param := []interface{}{"lijian"}
  154. params, err := tlv.MakeTLVs(param)
  155. if err != nil {
  156. fmt.Println(err)
  157. return
  158. }
  159. sub := protocol.SubData{
  160. Head: protocol.SubDataHead{
  161. SubDeviceid: uint16(225),
  162. PropertyNum: uint16(1),
  163. ParamsCount: uint16(len(params)),
  164. ExternalDeviceId: [8]byte{0x01},
  165. },
  166. Params: params,
  167. }
  168. status := protocol.Data{
  169. Head: payloadHead,
  170. SubData: []protocol.SubData{},
  171. }
  172. status.SubData = append(status.SubData, sub)
  173. payload, err := status.Marshal()
  174. if err != nil {
  175. fmt.Println(err)
  176. return
  177. }
  178. client.Publish("s", 1, false, payload)
  179. }
  180. }
  181. func (d *Device) coapReportStatus(conn *net.UDPConn) {
  182. fmt.Println("coapReportStatus")
  183. for {
  184. payloadHead := protocol.DataHead{
  185. Flag: 0,
  186. Timestamp: uint64(time.Now().Unix() * 1000),
  187. }
  188. param := []interface{}{int64(10)}
  189. params, err := tlv.MakeTLVs(param)
  190. if err != nil {
  191. fmt.Println(err)
  192. return
  193. }
  194. sub := protocol.SubData{
  195. Head: protocol.SubDataHead{
  196. SubDeviceid: uint16(1),
  197. PropertyNum: uint16(1),
  198. ParamsCount: uint16(len(params)),
  199. },
  200. Params: params,
  201. }
  202. status := protocol.Data{
  203. Head: payloadHead,
  204. SubData: []protocol.SubData{},
  205. }
  206. status.SubData = append(status.SubData, sub)
  207. payload, err := status.Marshal()
  208. if err != nil {
  209. fmt.Println(err)
  210. return
  211. }
  212. req := &coap.BaseMessage{
  213. Code: coap.POST,
  214. Type: coap.CON,
  215. Token: d.token,
  216. MessageID: 2,
  217. Payload: payload,
  218. }
  219. req.SetPathString(fmt.Sprintf("%d/s", d.id))
  220. reqbytes, _ := req.Encode()
  221. conn.Write(reqbytes)
  222. fmt.Println("write end")
  223. time.Sleep(10 * time.Second)
  224. }
  225. }
  226. func (d *Device) reportEvent(client *MQTT.Client) {
  227. for {
  228. time.Sleep(3 * time.Second)
  229. event := protocol.Event{}
  230. params, err := tlv.MakeTLVs([]interface{}{"hello event."})
  231. if err != nil {
  232. fmt.Println(err)
  233. return
  234. }
  235. event.Params = params
  236. event.Head.No = 1
  237. event.Head.SubDeviceid = 1
  238. event.Head.ParamsCount = uint16(len(params))
  239. payload, err := event.Marshal()
  240. if err != nil {
  241. fmt.Println(err)
  242. return
  243. }
  244. client.Publish("e", 1, false, payload)
  245. }
  246. }
  247. func (d *Device) statusHandler(client *MQTT.Client, msg MQTT.Message) {
  248. status := protocol.Data{}
  249. err := status.UnMarshal(msg.Payload())
  250. if err != nil {
  251. fmt.Println(err)
  252. return
  253. }
  254. fmt.Println("device receiving status set : ")
  255. for _, one := range status.SubData {
  256. fmt.Println("subdeviceid : ", one.Head.SubDeviceid)
  257. fmt.Println("no : ", one.Head.PropertyNum)
  258. fmt.Println("params : ", one.Params)
  259. }
  260. }
  261. func (d *Device) commandHandler(client *MQTT.Client, msg MQTT.Message) {
  262. cmd := protocol.Command{}
  263. err := cmd.UnMarshal(msg.Payload())
  264. if err != nil {
  265. fmt.Println(err)
  266. return
  267. }
  268. switch cmd.Head.No {
  269. case commonCmdGetStatus:
  270. d.reportStatus(client)
  271. default:
  272. fmt.Printf("received command : %v: %v", cmd.Head.No, cmd.Params)
  273. }
  274. }
  275. func (d *Device) messageHandler(client *MQTT.Client, msg MQTT.Message) {
  276. fmt.Printf("TOPIC: %s\n", msg.Topic())
  277. fmt.Printf("MSG: %x\n", msg.Payload())
  278. msgtype := msg.Topic()
  279. fmt.Println(msgtype)
  280. switch msgtype {
  281. case "c":
  282. d.commandHandler(client, msg)
  283. case "s":
  284. d.statusHandler(client, msg)
  285. default:
  286. fmt.Println("unsuported message type :", msgtype)
  287. }
  288. }
  289. // DoAccess device access
  290. func (d *Device) DoAccess() error {
  291. if d.Proto == "mqtt" {
  292. if err := d.doMQTTAccess(); err != nil {
  293. fmt.Printf("do mqtt access error:%s", err.Error())
  294. }
  295. } else if d.Proto == "coap" {
  296. if err := d.doCoAPAccess(); err != nil {
  297. fmt.Printf("do coap access error:%s", err.Error())
  298. }
  299. }
  300. return nil
  301. }
  302. func (d *Device) doCoAPAccess() error {
  303. fmt.Printf("get access addr :%s", d.access)
  304. addr, err := net.ResolveUDPAddr("udp", d.access)
  305. if err != nil {
  306. return err
  307. }
  308. conn, err := net.DialUDP("udp", nil, addr)
  309. if err != nil {
  310. return err
  311. }
  312. go d.coapReportStatus(conn)
  313. <-make(chan int)
  314. defer conn.Close()
  315. return nil
  316. }
  317. func (d *Device) doMQTTAccess() error {
  318. logger := log.New(os.Stdout, "", log.LstdFlags)
  319. MQTT.ERROR = logger
  320. MQTT.CRITICAL = logger
  321. MQTT.WARN = logger
  322. MQTT.DEBUG = logger
  323. //create a ClientOptions struct setting the broker address, clientid, turn
  324. //off trace output and set the default message handler
  325. opts := MQTT.NewClientOptions().AddBroker("tcp://" + d.access)
  326. clientid := fmt.Sprintf("%x", d.id)
  327. opts.SetClientID(clientid)
  328. opts.SetUsername(clientid) // clientid as username
  329. opts.SetPassword(hex.EncodeToString(d.token))
  330. opts.SetKeepAlive(30 * time.Second)
  331. opts.SetDefaultPublishHandler(d.messageHandler)
  332. //opts.SetTLSConfig(&tls.Config{Certificates: nil, InsecureSkipVerify: true})
  333. //create and start a client using the above ClientOptions
  334. c := MQTT.NewClient(opts)
  335. go func() {
  336. if token := c.Connect(); token.Wait() && token.Error() != nil {
  337. server.Log.Error(token.Error())
  338. return
  339. }
  340. }()
  341. go d.reportStatus(c)
  342. // we just pause here to wait for messages
  343. <-make(chan int)
  344. defer c.Disconnect(250)
  345. return nil
  346. }