device.go 9.6 KB


  1. package main
  2. import (
  3. "encoding/hex"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/gogf/gf/encoding/gjson"
  7. "log"
  8. "math/rand"
  9. "net"
  10. "os"
  11. "sparrow/pkg/coap"
  12. "sparrow/pkg/klink"
  13. "sparrow/pkg/protocol"
  14. "sparrow/pkg/tlv"
  15. "time"
  16. MQTT "github.com/eclipse/paho.mqtt.golang"
  17. )
  18. const (
  19. commonCmdGetStatus = uint16(65528)
  20. )
  21. // DeviceRegisterArgs device register args
  22. type DeviceRegisterArgs struct {
  23. ProductKey string `json:"product_key" binding:"required"`
  24. DeviceCode string `json:"device_code" binding:"required"`
  25. Version string `json:"version" binding:"required"`
  26. }
  27. // DeviceAuthArgs device authentication args
  28. type DeviceAuthArgs struct {
  29. DeviceId int64 `json:"device_id" binding:"required"`
  30. DeviceSecret string `json:"device_secret" binding:"required"`
  31. Protocol string `json:"protocol" binding:"required"`
  32. }
  33. // Common common response fields
  34. type Common struct {
  35. Code int `json:"code"`
  36. Message string `json:"message"`
  37. }
  38. // DeviceRegisterData device register response data field
  39. type DeviceRegisterData struct {
  40. DeviceId int64 `json:"device_id"`
  41. DeviceSecret string `json:"device_secret"`
  42. DeviceKey string `json:"device_key"`
  43. DeviceIdentifier string `json:"device_identifier"`
  44. }
  45. // DeviceRegisterResponse device register response
  46. type DeviceRegisterResponse struct {
  47. Common
  48. Data DeviceRegisterData `json:"data"`
  49. }
  50. // DeviceAuthData device auth response data field
  51. type DeviceAuthData struct {
  52. AccessToken string `json:"access_token"`
  53. AccessAddr string `json:"access_addr"`
  54. }
  55. // DeviceAuthResponse device auth response
  56. type DeviceAuthResponse struct {
  57. Common
  58. Data DeviceAuthData `json:"data"`
  59. }
  60. // Device a device
  61. type Device struct {
  62. // Url API URL
  63. Url string
  64. // basic info
  65. ProductKey string
  66. DeviceCode string
  67. Version string
  68. Proto string
  69. // private things
  70. id int64
  71. secrect string
  72. token []byte
  73. access string
  74. }
  75. // NewDevice create a device
  76. func NewDevice(url string, productkey string, code string, version string, proto string) *Device {
  77. return &Device{
  78. Url: url,
  79. ProductKey: productkey,
  80. DeviceCode: code,
  81. Version: version,
  82. Proto: proto,
  83. }
  84. }
  85. // DoRegister device register
  86. func (d *Device) DoRegister() error {
  87. args := DeviceRegisterArgs{
  88. ProductKey: d.ProductKey,
  89. DeviceCode: d.DeviceCode,
  90. Version: d.Version,
  91. }
  92. regURL := fmt.Sprintf("%v%v", d.Url, "/v1/devices/registration")
  93. request, err := json.Marshal(args)
  94. if err != nil {
  95. return err
  96. }
  97. jsonresp, err := SendHttpRequest(regURL, string(request), "POST", nil)
  98. if err != nil {
  99. return err
  100. }
  101. response := DeviceRegisterResponse{}
  102. err = json.Unmarshal(jsonresp, &response)
  103. if err != nil {
  104. return err
  105. }
  106. err = CheckHttpsCode(response)
  107. if err != nil {
  108. return err
  109. }
  110. d.id = response.Data.DeviceId
  111. d.secrect = response.Data.DeviceSecret
  112. return nil
  113. }
  114. // DoLogin device log in
  115. func (d *Device) DoLogin() error {
  116. args := DeviceAuthArgs{
  117. DeviceId: d.id,
  118. DeviceSecret: d.secrect,
  119. Protocol: d.Proto,
  120. }
  121. regURL := fmt.Sprintf("%v%v", d.Url, "/v1/devices/authentication")
  122. request, err := json.Marshal(args)
  123. if err != nil {
  124. return err
  125. }
  126. jsonresp, err := SendHttpRequest(regURL, string(request), "POST", nil)
  127. if err != nil {
  128. return err
  129. }
  130. response := DeviceAuthResponse{}
  131. err = json.Unmarshal(jsonresp, &response)
  132. if err != nil {
  133. return err
  134. }
  135. err = CheckHttpsCode(response)
  136. if err != nil {
  137. return err
  138. }
  139. // ecode hex
  140. htoken, err := hex.DecodeString(response.Data.AccessToken)
  141. if err != nil {
  142. return err
  143. }
  144. d.token = htoken
  145. d.access = response.Data.AccessAddr
  146. return nil
  147. }
  148. func (d *Device) reportStatus(client MQTT.Client) {
  149. for {
  150. time.Sleep(2 * time.Second)
  151. rand.Seed(time.Now().UnixNano())
  152. r := rand.Intn(100)
  153. dd := &klink.DevSend{
  154. Action: "devSend",
  155. MsgId: 1,
  156. SubDeviceId: "1",
  157. Timestamp: time.Now().Unix(),
  158. DeviceCode: "5566",
  159. Data: &klink.DevSendData{
  160. Cmd: "powerOn",
  161. Params: gjson.New(fmt.Sprintf(`{"power":%d}`, r)),
  162. },
  163. }
  164. payload, err := dd.Marshal()
  165. if err != nil {
  166. continue
  167. }
  168. client.Publish("s", 1, false, payload)
  169. }
  170. }
  171. func (d *Device) reportStatus2(client MQTT.Client) {
  172. for {
  173. time.Sleep(2 * time.Second)
  174. rand.Seed(time.Now().UnixNano())
  175. r := rand.Intn(100)
  176. dd := &klink.DevSend{
  177. Action: "devSend",
  178. MsgId: 1,
  179. SubDeviceId: "1",
  180. Timestamp: time.Now().Unix(),
  181. DeviceCode: "5566",
  182. Data: &klink.DevSendData{
  183. Cmd: "status",
  184. Params: gjson.New(fmt.Sprintf(`{"fanlevel":%d}`, r)),
  185. },
  186. }
  187. payload, err := dd.Marshal()
  188. if err != nil {
  189. continue
  190. }
  191. client.Publish("s", 1, false, payload)
  192. }
  193. }
  194. func (d *Device) coapReportStatus(conn *net.UDPConn) {
  195. fmt.Println("coapReportStatus")
  196. for {
  197. payloadHead := protocol.DataHead{
  198. Flag: 0,
  199. Timestamp: uint64(time.Now().Unix() * 1000),
  200. }
  201. param := []interface{}{int64(10)}
  202. params, err := tlv.MakeTLVs(param)
  203. if err != nil {
  204. fmt.Println(err)
  205. return
  206. }
  207. sub := protocol.SubData{
  208. Head: protocol.SubDataHead{
  209. SubDeviceid: uint16(1),
  210. PropertyNum: uint16(1),
  211. ParamsCount: uint16(len(params)),
  212. },
  213. Params: params,
  214. }
  215. status := protocol.Data{
  216. Head: payloadHead,
  217. SubData: []protocol.SubData{},
  218. }
  219. status.SubData = append(status.SubData, sub)
  220. payload, err := status.Marshal()
  221. if err != nil {
  222. fmt.Println(err)
  223. return
  224. }
  225. req := &coap.BaseMessage{
  226. Code: coap.POST,
  227. Type: coap.CON,
  228. Token: d.token,
  229. MessageID: 2,
  230. Payload: payload,
  231. }
  232. req.SetPathString(fmt.Sprintf("%d/s", d.id))
  233. reqbytes, _ := req.Encode()
  234. conn.Write(reqbytes)
  235. fmt.Println("write end")
  236. time.Sleep(10 * time.Second)
  237. }
  238. }
  239. func (d *Device) reportEvent(client MQTT.Client) {
  240. for {
  241. time.Sleep(2 * time.Second)
  242. dd := &klink.DevSend{
  243. Action: "devSend",
  244. MsgId: 1,
  245. SubDeviceId: "1",
  246. Timestamp: time.Now().Unix(),
  247. DeviceCode: "5566",
  248. Data: &klink.DevSendData{
  249. Cmd: "temperatureEvent",
  250. Params: gjson.New(`{"temperature":100, "reason": 0}`),
  251. },
  252. }
  253. payload, err := dd.Marshal()
  254. if err != nil {
  255. continue
  256. }
  257. client.Publish("e", 1, false, payload)
  258. }
  259. }
  260. func (d *Device) statusHandler(client MQTT.Client, msg MQTT.Message) {
  261. status := protocol.Data{}
  262. err := status.UnMarshal(msg.Payload())
  263. if err != nil {
  264. fmt.Println(err)
  265. return
  266. }
  267. fmt.Println("device receiving status set : ")
  268. for _, one := range status.SubData {
  269. fmt.Println("subdeviceid : ", one.Head.SubDeviceid)
  270. fmt.Println("no : ", one.Head.PropertyNum)
  271. fmt.Println("params : ", one.Params)
  272. }
  273. }
  274. // 子设备上线
  275. func (d *Device) subDeviceLogin(client MQTT.Client) {
  276. for {
  277. dd := &klink.DevLogin{
  278. Action: "devLogin",
  279. MsgId: 1,
  280. SubDeviceId: "5566-0",
  281. Timestamp: time.Now().Unix(),
  282. DeviceCode: "5566",
  283. }
  284. payload, err := dd.Marshal()
  285. if err != nil {
  286. return
  287. }
  288. client.Publish("s", 1, false, payload)
  289. time.Sleep(3 * time.Second)
  290. }
  291. }
  292. // 子设备上线
  293. func (d *Device) deviceNetConfig(client MQTT.Client) {
  294. j := `{
  295. "action": "devNetConfig",
  296. "timestamp": 12312312,
  297. "deviceCode":"deviceCode",
  298. "md5": "0FE264CC7DC9AFFD7E61E4ABCD1F3354"
  299. }`
  300. jj := gjson.New(j)
  301. client.Publish("s", 1, false, jj.MustToJson())
  302. }
  303. func (d *Device) commandHandler(client MQTT.Client, msg MQTT.Message) {
  304. j, err := gjson.DecodeToJson(msg.Payload())
  305. if err != nil {
  306. panic("错误的报文格式")
  307. }
  308. fmt.Printf("%v", j.MustToJsonString())
  309. }
  310. func (d *Device) messageHandler(client MQTT.Client, msg MQTT.Message) {
  311. fmt.Printf("TOPIC: %s\n", msg.Topic())
  312. fmt.Printf("MSG: %x\n", msg.Payload())
  313. msgtype := msg.Topic()
  314. fmt.Println(msgtype)
  315. switch msgtype {
  316. case "c":
  317. d.commandHandler(client, msg)
  318. case "s":
  319. d.statusHandler(client, msg)
  320. default:
  321. fmt.Println("unsuported message type :", msgtype)
  322. }
  323. }
  324. // DoAccess device access
  325. func (d *Device) DoAccess() error {
  326. if d.Proto == "mqtt" {
  327. if err := d.doMQTTAccess(); err != nil {
  328. fmt.Printf("do mqtt access error:%s", err.Error())
  329. }
  330. } else if d.Proto == "coap" {
  331. if err := d.doCoAPAccess(); err != nil {
  332. fmt.Printf("do coap access error:%s", err.Error())
  333. }
  334. }
  335. return nil
  336. }
  337. func (d *Device) doCoAPAccess() error {
  338. fmt.Printf("get access addr :%s", d.access)
  339. addr, err := net.ResolveUDPAddr("udp", d.access)
  340. if err != nil {
  341. return err
  342. }
  343. conn, err := net.DialUDP("udp", nil, addr)
  344. if err != nil {
  345. return err
  346. }
  347. go d.coapReportStatus(conn)
  348. <-make(chan int)
  349. defer conn.Close()
  350. return nil
  351. }
  352. func (d *Device) doMQTTAccess() error {
  353. logger := log.New(os.Stdout, "", log.LstdFlags)
  354. //MQTT.ERROR = logger
  355. //MQTT.CRITICAL = logger
  356. //MQTT.WARN = logger
  357. MQTT.DEBUG = logger
  358. //create a ClientOptions struct setting the broker address, clientid, turn
  359. //off trace output and set the default message handler
  360. opts := MQTT.NewClientOptions().AddBroker("tcp://" + d.access)
  361. clientid := fmt.Sprintf("%x", d.id)
  362. opts.SetClientID(clientid)
  363. opts.SetUsername(clientid) // clientid as username
  364. opts.SetPassword(hex.EncodeToString(d.token))
  365. opts.SetKeepAlive(30 * time.Second)
  366. //// process key files
  367. //cert, err := tls.LoadX509KeyPair(*confCAFile, *confKeyFile)
  368. //if err != nil {
  369. // panic(err)
  370. //}
  371. //opts.SetTLSConfig(&tls.Config{Certificates: []tls.Certificate{cert}, InsecureSkipVerify: true})
  372. opts.SetDefaultPublishHandler(d.messageHandler)
  373. //create and start a client using the above ClientOptions
  374. c := MQTT.NewClient(opts)
  375. go func() {
  376. if token := c.Connect(); token.Wait() && token.Error() != nil {
  377. return
  378. }
  379. }()
  380. // 子设备上线
  381. //d.deviceNetConfig(c)
  382. //go d.subDeviceLogin(c)
  383. //go d.reportStatus(c)
  384. //go d.reportStatus2(c)
  385. go d.reportEvent(c)
  386. // we just pause here to wait for messages
  387. <-make(chan int)
  388. defer c.Disconnect(250)
  389. return nil
  390. }