device.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399
  1. package main
  2. import (
  3. "encoding/hex"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/gogf/gf/encoding/gjson"
  7. "log"
  8. "math/rand"
  9. "net"
  10. "os"
  11. "sparrow/pkg/coap"
  12. "sparrow/pkg/klink"
  13. "sparrow/pkg/protocol"
  14. "sparrow/pkg/server"
  15. "sparrow/pkg/tlv"
  16. "time"
  17. MQTT "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git"
  18. )
  // commonCmdGetStatus is the 16-bit command code 65528 (0xFFF8).
  // NOTE(review): presumably the shared "get status" command id — confirm
  // against the protocol definition; nothing in this file references it.
  const commonCmdGetStatus uint16 = 65528
  // DeviceRegisterArgs is the JSON request body that DoRegister posts to the
  // registration endpoint.
  type DeviceRegisterArgs struct {
  	// ProductKey is sent as "product_key".
  	ProductKey string `json:"product_key" binding:"required"`
  	// DeviceCode is sent as "device_code".
  	DeviceCode string `json:"device_code" binding:"required"`
  	// Version is sent as "version".
  	Version string `json:"version" binding:"required"`
  }
  // DeviceAuthArgs is the JSON request body that DoLogin posts to the
  // authentication endpoint.
  type DeviceAuthArgs struct {
  	// DeviceId is sent as "device_id".
  	DeviceId int64 `json:"device_id" binding:"required"`
  	// DeviceSecret is sent as "device_secret".
  	DeviceSecret string `json:"device_secret" binding:"required"`
  	// Protocol is sent as "protocol"; DoLogin fills it from Device.Proto.
  	Protocol string `json:"protocol" binding:"required"`
  }
  // Common holds the fields shared by every API response in this file; it is
  // embedded in DeviceRegisterResponse and DeviceAuthResponse.
  type Common struct {
  	// Code is decoded from "code".
  	Code int `json:"code"`
  	// Message is decoded from "message".
  	Message string `json:"message"`
  }
  // DeviceRegisterData is the "data" object of a registration response.
  // DoRegister reads DeviceId and DeviceSecret from it; the other fields are
  // decoded but not used in this file.
  type DeviceRegisterData struct {
  	// DeviceId is decoded from "device_id".
  	DeviceId int64 `json:"device_id"`
  	// DeviceSecret is decoded from "device_secret".
  	DeviceSecret string `json:"device_secret"`
  	// DeviceKey is decoded from "device_key".
  	DeviceKey string `json:"device_key"`
  	// DeviceIdentifier is decoded from "device_identifier".
  	DeviceIdentifier string `json:"device_identifier"`
  }
  46. // DeviceRegisterResponse device register response
  47. type DeviceRegisterResponse struct {
  48. Common
  49. Data DeviceRegisterData `json:"data"`
  50. }
  // DeviceAuthData is the "data" object of an authentication response.
  type DeviceAuthData struct {
  	// AccessToken is decoded from "access_token"; DoLogin hex-decodes it.
  	AccessToken string `json:"access_token"`
  	// AccessAddr is decoded from "access_addr"; DoLogin stores it as the
  	// address later dialled by the MQTT/CoAP access functions.
  	AccessAddr string `json:"access_addr"`
  }
  56. // DeviceAuthResponse device auth response
  57. type DeviceAuthResponse struct {
  58. Common
  59. Data DeviceAuthData `json:"data"`
  60. }
  // Device is a simulated device: it registers and logs in over the HTTP API
  // and then reports data over MQTT or CoAP.
  type Device struct {
  	// Url is the base URL of the HTTP API; endpoint paths are appended to it.
  	Url string
  	// Basic info supplied by the caller of NewDevice. Proto selects the
  	// access path in DoAccess ("mqtt" or "coap").
  	ProductKey, DeviceCode, Version, Proto string
  	// Private state filled in by the API calls.
  	id      int64  // device id stored by DoRegister
  	secrect string // device secret stored by DoRegister
  	token   []byte // access token, hex-decoded by DoLogin
  	access  string // access address stored by DoLogin
  }
  76. // NewDevice create a device
  77. func NewDevice(url string, productkey string, code string, version string, proto string) *Device {
  78. return &Device{
  79. Url: url,
  80. ProductKey: productkey,
  81. DeviceCode: code,
  82. Version: version,
  83. Proto: proto,
  84. }
  85. }
  86. // DoRegister device register
  87. func (d *Device) DoRegister() error {
  88. args := DeviceRegisterArgs{
  89. ProductKey: d.ProductKey,
  90. DeviceCode: d.DeviceCode,
  91. Version: d.Version,
  92. }
  93. regURL := fmt.Sprintf("%v%v", d.Url, "/v1/devices/registration")
  94. request, err := json.Marshal(args)
  95. if err != nil {
  96. return err
  97. }
  98. jsonresp, err := SendHttpRequest(regURL, string(request), "POST", nil)
  99. if err != nil {
  100. return err
  101. }
  102. response := DeviceRegisterResponse{}
  103. err = json.Unmarshal(jsonresp, &response)
  104. if err != nil {
  105. return err
  106. }
  107. err = CheckHttpsCode(response)
  108. if err != nil {
  109. return err
  110. }
  111. d.id = response.Data.DeviceId
  112. d.secrect = response.Data.DeviceSecret
  113. return nil
  114. }
  115. // DoLogin device log in
  116. func (d *Device) DoLogin() error {
  117. args := DeviceAuthArgs{
  118. DeviceId: d.id,
  119. DeviceSecret: d.secrect,
  120. Protocol: d.Proto,
  121. }
  122. regURL := fmt.Sprintf("%v%v", d.Url, "/v1/devices/authentication")
  123. request, err := json.Marshal(args)
  124. if err != nil {
  125. return err
  126. }
  127. jsonresp, err := SendHttpRequest(regURL, string(request), "POST", nil)
  128. if err != nil {
  129. return err
  130. }
  131. response := DeviceAuthResponse{}
  132. err = json.Unmarshal(jsonresp, &response)
  133. if err != nil {
  134. return err
  135. }
  136. err = CheckHttpsCode(response)
  137. if err != nil {
  138. return err
  139. }
  140. // ecode hex
  141. htoken, err := hex.DecodeString(response.Data.AccessToken)
  142. if err != nil {
  143. return err
  144. }
  145. d.token = htoken
  146. d.access = response.Data.AccessAddr
  147. return nil
  148. }
  149. func (d *Device) reportStatus(client *MQTT.Client) {
  150. for {
  151. time.Sleep(2 * time.Second)
  152. rand.Seed(time.Now().UnixNano())
  153. r := rand.Intn(100)
  154. dd := &klink.DevSend{
  155. Action: "devSend",
  156. MsgId: 1,
  157. SubDeviceId: "1",
  158. Timestamp: time.Now().Unix(),
  159. DeviceCode: "5566",
  160. Data: &klink.DevSendData{
  161. Cmd: "powerOn",
  162. Params: gjson.New(fmt.Sprintf(`{"power":%d}`, r)),
  163. },
  164. }
  165. payload, err := dd.Marshal()
  166. if err != nil {
  167. continue
  168. }
  169. client.Publish("s", 1, false, payload)
  170. }
  171. }
  172. func (d *Device) reportStatus2(client *MQTT.Client) {
  173. for {
  174. time.Sleep(2 * time.Second)
  175. rand.Seed(time.Now().UnixNano())
  176. r := rand.Intn(100)
  177. dd := &klink.DevSend{
  178. Action: "devSend",
  179. MsgId: 1,
  180. SubDeviceId: "1",
  181. Timestamp: time.Now().Unix(),
  182. DeviceCode: "5566",
  183. Data: &klink.DevSendData{
  184. Cmd: "status",
  185. Params: gjson.New(fmt.Sprintf(`{"fanlevel":%d}`, r)),
  186. },
  187. }
  188. payload, err := dd.Marshal()
  189. if err != nil {
  190. continue
  191. }
  192. client.Publish("s", 1, false, payload)
  193. }
  194. }
  195. func (d *Device) coapReportStatus(conn *net.UDPConn) {
  196. fmt.Println("coapReportStatus")
  197. for {
  198. payloadHead := protocol.DataHead{
  199. Flag: 0,
  200. Timestamp: uint64(time.Now().Unix() * 1000),
  201. }
  202. param := []interface{}{int64(10)}
  203. params, err := tlv.MakeTLVs(param)
  204. if err != nil {
  205. fmt.Println(err)
  206. return
  207. }
  208. sub := protocol.SubData{
  209. Head: protocol.SubDataHead{
  210. SubDeviceid: uint16(1),
  211. PropertyNum: uint16(1),
  212. ParamsCount: uint16(len(params)),
  213. },
  214. Params: params,
  215. }
  216. status := protocol.Data{
  217. Head: payloadHead,
  218. SubData: []protocol.SubData{},
  219. }
  220. status.SubData = append(status.SubData, sub)
  221. payload, err := status.Marshal()
  222. if err != nil {
  223. fmt.Println(err)
  224. return
  225. }
  226. req := &coap.BaseMessage{
  227. Code: coap.POST,
  228. Type: coap.CON,
  229. Token: d.token,
  230. MessageID: 2,
  231. Payload: payload,
  232. }
  233. req.SetPathString(fmt.Sprintf("%d/s", d.id))
  234. reqbytes, _ := req.Encode()
  235. conn.Write(reqbytes)
  236. fmt.Println("write end")
  237. time.Sleep(10 * time.Second)
  238. }
  239. }
  240. func (d *Device) reportEvent(client *MQTT.Client) {
  241. for {
  242. time.Sleep(2 * time.Second)
  243. dd := &klink.DevSend{
  244. Action: "devSend",
  245. MsgId: 1,
  246. SubDeviceId: "1",
  247. Timestamp: time.Now().Unix(),
  248. DeviceCode: "5566",
  249. Data: &klink.DevSendData{
  250. Cmd: "temperatureEvent",
  251. Params: gjson.New(`{"temperature":100, "reason": 0}`),
  252. },
  253. }
  254. payload, err := dd.Marshal()
  255. if err != nil {
  256. continue
  257. }
  258. client.Publish("e", 1, false, payload)
  259. }
  260. }
  261. func (d *Device) statusHandler(client *MQTT.Client, msg MQTT.Message) {
  262. status := protocol.Data{}
  263. err := status.UnMarshal(msg.Payload())
  264. if err != nil {
  265. fmt.Println(err)
  266. return
  267. }
  268. fmt.Println("device receiving status set : ")
  269. for _, one := range status.SubData {
  270. fmt.Println("subdeviceid : ", one.Head.SubDeviceid)
  271. fmt.Println("no : ", one.Head.PropertyNum)
  272. fmt.Println("params : ", one.Params)
  273. }
  274. }
  275. func (d *Device) commandHandler(client *MQTT.Client, msg MQTT.Message) {
  276. j, err := gjson.DecodeToJson(msg.Payload())
  277. if err != nil {
  278. panic("错误的报文格式")
  279. }
  280. fmt.Printf("%v", j.MustToJsonString())
  281. }
  282. func (d *Device) messageHandler(client *MQTT.Client, msg MQTT.Message) {
  283. fmt.Printf("TOPIC: %s\n", msg.Topic())
  284. fmt.Printf("MSG: %x\n", msg.Payload())
  285. msgtype := msg.Topic()
  286. fmt.Println(msgtype)
  287. switch msgtype {
  288. case "c":
  289. d.commandHandler(client, msg)
  290. case "s":
  291. d.statusHandler(client, msg)
  292. default:
  293. fmt.Println("unsuported message type :", msgtype)
  294. }
  295. }
  296. // DoAccess device access
  297. func (d *Device) DoAccess() error {
  298. if d.Proto == "mqtt" {
  299. if err := d.doMQTTAccess(); err != nil {
  300. fmt.Printf("do mqtt access error:%s", err.Error())
  301. }
  302. } else if d.Proto == "coap" {
  303. if err := d.doCoAPAccess(); err != nil {
  304. fmt.Printf("do coap access error:%s", err.Error())
  305. }
  306. }
  307. return nil
  308. }
  309. func (d *Device) doCoAPAccess() error {
  310. fmt.Printf("get access addr :%s", d.access)
  311. addr, err := net.ResolveUDPAddr("udp", d.access)
  312. if err != nil {
  313. return err
  314. }
  315. conn, err := net.DialUDP("udp", nil, addr)
  316. if err != nil {
  317. return err
  318. }
  319. go d.coapReportStatus(conn)
  320. <-make(chan int)
  321. defer conn.Close()
  322. return nil
  323. }
  324. func (d *Device) doMQTTAccess() error {
  325. logger := log.New(os.Stdout, "", log.LstdFlags)
  326. MQTT.ERROR = logger
  327. MQTT.CRITICAL = logger
  328. MQTT.WARN = logger
  329. MQTT.DEBUG = logger
  330. //create a ClientOptions struct setting the broker address, clientid, turn
  331. //off trace output and set the default message handler
  332. opts := MQTT.NewClientOptions().AddBroker("tcp://" + d.access)
  333. clientid := fmt.Sprintf("%x", d.id)
  334. opts.SetClientID(clientid)
  335. opts.SetUsername(clientid) // clientid as username
  336. opts.SetPassword(hex.EncodeToString(d.token))
  337. opts.SetKeepAlive(30 * time.Second)
  338. opts.SetDefaultPublishHandler(d.messageHandler)
  339. //opts.SetTLSConfig(&tls.Config{Certificates: nil, InsecureSkipVerify: true})
  340. //create and start a client using the above ClientOptions
  341. c := MQTT.NewClient(opts)
  342. go func() {
  343. if token := c.Connect(); token.Wait() && token.Error() != nil {
  344. server.Log.Error(token.Error())
  345. return
  346. }
  347. }()
  348. go d.reportStatus(c)
  349. go d.reportStatus2(c)
  350. go d.reportEvent(c)
  351. // we just pause here to wait for messages
  352. <-make(chan int)
  353. defer c.Disconnect(250)
  354. return nil
  355. }