device.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. package main
  2. import (
  3. "encoding/hex"
  4. "encoding/json"
  5. "fmt"
  6. "log"
  7. "os"
  8. "sparrow/pkg/protocol"
  9. "sparrow/pkg/tlv"
  10. "time"
  11. MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git"
  12. )
  13. const (
  14. commonCmdGetStatus = uint16(65528)
  15. )
  16. // device register args
  17. type DeviceRegisterArgs struct {
  18. ProductKey string `json:"product_key" binding:"required"`
  19. DeviceCode string `json:"device_code" binding:"required"`
  20. Version string `json:"version" binding:"required"`
  21. }
  22. // device authentication args
  23. type DeviceAuthArgs struct {
  24. DeviceId int64 `json:"device_id" binding:"required"`
  25. DeviceSecret string `json:"device_secret" binding:"required"`
  26. Protocol string `json:"protocol" binding:"required"`
  27. }
  28. // common response fields
  29. type Common struct {
  30. Code int `json:"code"`
  31. Message string `json:"message"`
  32. }
  33. // device register response data field
  34. type DeviceRegisterData struct {
  35. DeviceId int64 `json:"device_id"`
  36. DeviceSecret string `json:"device_secret"`
  37. DeviceKey string `json:"device_key"`
  38. DeviceIdentifier string `json:"device_identifier"`
  39. }
  40. // device register response
  41. type DeviceRegisterResponse struct {
  42. Common
  43. Data DeviceRegisterData `json:"data"`
  44. }
  45. // device auth response data field
  46. type DeviceAuthData struct {
  47. AccessToken string `json:"access_token"`
  48. AccessAddr string `json:"access_addr"`
  49. }
  50. // device auth response
  51. type DeviceAuthResponse struct {
  52. Common
  53. Data DeviceAuthData `json:"data"`
  54. }
  55. type Device struct {
  56. // API URL
  57. Url string
  58. // basic info
  59. ProductKey string
  60. DeviceCode string
  61. Version string
  62. // private things
  63. id int64
  64. secrect string
  65. token []byte
  66. access string
  67. }
  68. func NewDevice(url string, productkey string, code string, version string) *Device {
  69. return &Device{
  70. Url: url,
  71. ProductKey: productkey,
  72. DeviceCode: code,
  73. Version: version,
  74. }
  75. }
  76. func (d *Device) DoRegister() error {
  77. args := DeviceRegisterArgs{
  78. ProductKey: d.ProductKey,
  79. DeviceCode: d.DeviceCode,
  80. Version: d.Version,
  81. }
  82. regUrl := fmt.Sprintf("%v%v", d.Url, "/v1/devices/registration")
  83. request, err := json.Marshal(args)
  84. if err != nil {
  85. return err
  86. }
  87. jsonresp, err := SendHttpRequest(regUrl, string(request), "POST", nil)
  88. if err != nil {
  89. return err
  90. }
  91. response := DeviceRegisterResponse{}
  92. err = json.Unmarshal(jsonresp, &response)
  93. if err != nil {
  94. return err
  95. }
  96. err = CheckHttpsCode(response)
  97. if err != nil {
  98. return err
  99. }
  100. d.id = response.Data.DeviceId
  101. d.secrect = response.Data.DeviceSecret
  102. return nil
  103. }
  104. func (d *Device) DoLogin() error {
  105. args := DeviceAuthArgs{
  106. DeviceId: d.id,
  107. DeviceSecret: d.secrect,
  108. Protocol: "mqtt",
  109. }
  110. regUrl := fmt.Sprintf("%v%v", d.Url, "/v1/devices/authentication")
  111. request, err := json.Marshal(args)
  112. if err != nil {
  113. return err
  114. }
  115. jsonresp, err := SendHttpRequest(regUrl, string(request), "POST", nil)
  116. if err != nil {
  117. return err
  118. }
  119. response := DeviceAuthResponse{}
  120. err = json.Unmarshal(jsonresp, &response)
  121. if err != nil {
  122. return err
  123. }
  124. err = CheckHttpsCode(response)
  125. if err != nil {
  126. return err
  127. }
  128. // ecode hex
  129. htoken, err := hex.DecodeString(response.Data.AccessToken)
  130. if err != nil {
  131. return err
  132. }
  133. d.token = htoken
  134. d.access = response.Data.AccessAddr
  135. return nil
  136. }
  137. func (d *Device) reportStatus(client *MQTT.Client) {
  138. for {
  139. time.Sleep(10 * time.Second)
  140. payloadHead := protocol.DataHead{
  141. Flag: 0,
  142. Timestamp: uint64(time.Now().Unix() * 1000),
  143. }
  144. param := []interface{}{"li jian"}
  145. params, err := tlv.MakeTLVs(param)
  146. if err != nil {
  147. fmt.Println(err)
  148. return
  149. }
  150. sub := protocol.SubData{
  151. Head: protocol.SubDataHead{
  152. SubDeviceid: uint16(1),
  153. PropertyNum: uint16(1),
  154. ParamsCount: uint16(len(params)),
  155. },
  156. Params: params,
  157. }
  158. status := protocol.Data{
  159. Head: payloadHead,
  160. SubData: []protocol.SubData{},
  161. }
  162. status.SubData = append(status.SubData, sub)
  163. payload, err := status.Marshal()
  164. if err != nil {
  165. fmt.Println(err)
  166. return
  167. }
  168. client.Publish("s", 1, false, payload)
  169. }
  170. }
  171. func (d *Device) reportEvent(client *MQTT.Client) {
  172. for {
  173. time.Sleep(3 * time.Second)
  174. event := protocol.Event{}
  175. params, err := tlv.MakeTLVs([]interface{}{"hello event."})
  176. if err != nil {
  177. fmt.Println(err)
  178. return
  179. }
  180. event.Params = params
  181. event.Head.No = 1
  182. event.Head.SubDeviceid = 1
  183. event.Head.ParamsCount = uint16(len(params))
  184. payload, err := event.Marshal()
  185. if err != nil {
  186. fmt.Println(err)
  187. return
  188. }
  189. client.Publish("e", 1, false, payload)
  190. }
  191. }
  192. func (d *Device) statusHandler(client *MQTT.Client, msg MQTT.Message) {
  193. status := protocol.Data{}
  194. err := status.UnMarshal(msg.Payload())
  195. if err != nil {
  196. fmt.Println(err)
  197. return
  198. }
  199. fmt.Println("device receiving status set : ")
  200. for _, one := range status.SubData {
  201. fmt.Println("subdeviceid : ", one.Head.SubDeviceid)
  202. fmt.Println("no : ", one.Head.PropertyNum)
  203. fmt.Println("params : ", one.Params)
  204. }
  205. }
  206. func (d *Device) commandHandler(client *MQTT.Client, msg MQTT.Message) {
  207. cmd := protocol.Command{}
  208. err := cmd.UnMarshal(msg.Payload())
  209. if err != nil {
  210. fmt.Println(err)
  211. return
  212. }
  213. switch cmd.Head.No {
  214. case commonCmdGetStatus:
  215. d.reportStatus(client)
  216. default:
  217. fmt.Printf("received command : %v: %v", cmd.Head.No, cmd.Params)
  218. }
  219. }
  220. func (d *Device) messageHandler(client *MQTT.Client, msg MQTT.Message) {
  221. fmt.Printf("TOPIC: %s\n", msg.Topic())
  222. fmt.Printf("MSG: %x\n", msg.Payload())
  223. msgtype := msg.Topic()
  224. fmt.Println(msgtype)
  225. switch msgtype {
  226. case "c":
  227. d.commandHandler(client, msg)
  228. case "s":
  229. d.statusHandler(client, msg)
  230. default:
  231. fmt.Println("unsuported message type :", msgtype)
  232. }
  233. }
  234. func (d *Device) DoAccess() error {
  235. logger := log.New(os.Stdout, "", log.LstdFlags)
  236. MQTT.ERROR = logger
  237. MQTT.CRITICAL = logger
  238. MQTT.WARN = logger
  239. MQTT.DEBUG = logger
  240. //create a ClientOptions struct setting the broker address, clientid, turn
  241. //off trace output and set the default message handler
  242. opts := MQTT.NewClientOptions().AddBroker("tcp://" + d.access)
  243. clientid := fmt.Sprintf("%x", d.id)
  244. opts.SetClientID(clientid)
  245. opts.SetUsername(clientid) // clientid as username
  246. opts.SetPassword(hex.EncodeToString(d.token))
  247. opts.SetKeepAlive(30 * time.Second)
  248. opts.SetDefaultPublishHandler(d.messageHandler)
  249. //opts.SetTLSConfig(&tls.Config{Certificates: nil, InsecureSkipVerify: true})
  250. //create and start a client using the above ClientOptions
  251. c := MQTT.NewClient(opts)
  252. if token := c.Connect(); token.Wait() && token.Error() != nil {
  253. return token.Error()
  254. }
  255. // beigin report event test
  256. //go d.reportEvent(c)
  257. go d.reportStatus(c)
  258. // we just pause here to wait for messages
  259. <-make(chan int)
  260. defer c.Disconnect(250)
  261. return nil
  262. }