mqtt.c 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731
  1. /* mqtt.c
  2. * Protocol: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
  3. *
  4. * Copyright (c) 2014-2015, Tuan PM <tuanpm at live dot com>
  5. * All rights reserved.
  6. *
  7. * Redistribution and use in source and binary forms, with or without
  8. * modification, are permitted provided that the following conditions are met:
  9. *
  10. * * Redistributions of source code must retain the above copyright notice,
  11. * this list of conditions and the following disclaimer.
  12. * * Redistributions in binary form must reproduce the above copyright
  13. * notice, this list of conditions and the following disclaimer in the
  14. * documentation and/or other materials provided with the distribution.
  15. * * Neither the name of Redis nor the names of its contributors may be used
  16. * to endorse or promote products derived from this software without
  17. * specific prior written permission.
  18. *
  19. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
  20. * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  21. * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  22. * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
  23. * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
  24. * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
  25. * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
  26. * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
  27. * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
  28. * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
  29. * POSSIBILITY OF SUCH DAMAGE.
  30. */
  31. #include "mqtt_msg.h"
  32. #include "debug.h"
  33. #include "mqtt.h"
  34. #include "queue.h"
  35. #include "pando_sys.h"
  36. #include "pando_types.h"
  37. #include "pando_timer.h"
  38. #include "pando_net_tcp.h"
  39. #include "utils.h"
  40. #include "common_functions.h"
  41. #define MQTT_BUF_SIZE 1024
  42. #define MQTT_RECONNECT_TIMEOUT 5
  43. #define MQTT_CONNECT_TIMEOUT 20
  44. #define MQTT_TASK_PRIO 0
  45. #define MQTT_TASK_QUEUE_SIZE 1
  46. #define MQTT_SEND_TIMOUT 5
  47. #ifndef QUEUE_BUFFER_SIZE
  48. #define QUEUE_BUFFER_SIZE 2048
  49. #endif
  50. void MQTT_Task(MQTT_Client * arg);
  51. static void FUNCTION_ATTRIBUTE
  52. deliver_publish(MQTT_Client* client, uint8_t* message, int length)
  53. {
  54. mqtt_event_data_t event_data;
  55. event_data.topic_length = length;
  56. event_data.topic = mqtt_get_publish_topic(message, &event_data.topic_length);
  57. event_data.data_length = length;
  58. event_data.data = mqtt_get_publish_data(message, &event_data.data_length);
  59. if(client->dataCb)
  60. client->dataCb((uint32_t*)client, event_data.topic, event_data.topic_length, event_data.data, event_data.data_length);
  61. }
  62. /**
  63. * @brief Client received callback function.
  64. * @param arg: contain the ip link information
  65. * @param pdata: received data
  66. * @param len: the lenght of received data
  67. * @retval None
  68. */
  69. void FUNCTION_ATTRIBUTE
  70. mqtt_tcpclient_recv(void *arg, struct data_buf *buffer)
  71. {
  72. INFO("TCP: data received %d bytes\r\n", buffer->length);
  73. uint8_t msg_type;
  74. uint8_t msg_qos;
  75. uint16_t msg_id;
  76. struct pando_tcp_conn *pCon = (struct pando_tcp_conn*)arg;
  77. MQTT_Client *client = (MQTT_Client *)pCon->reverse;
  78. if(buffer->length > MQTT_BUF_SIZE || buffer->length == 0)
  79. {
  80. INFO("receive length is invalid.\n");
  81. return;
  82. }
  83. pd_memcpy(client->mqtt_state.in_buffer + client->mqtt_state.message_length_read, buffer->data, buffer->length);
  84. client->mqtt_state.message_length_read += buffer->length;
  85. READPACKET:
  86. if(client->mqtt_state.message_length_read == 1)
  87. {
  88. INFO("not enough data for read package length.\n!");
  89. return;
  90. }
  91. client->mqtt_state.message_length = mqtt_get_total_length(client->mqtt_state.in_buffer, client->mqtt_state.message_length_read);
  92. INFO("message length:%d\n", client->mqtt_state.message_length);
  93. show_package(client->mqtt_state.in_buffer, client->mqtt_state.message_length_read);
  94. if(client->mqtt_state.message_length > client->mqtt_state.message_length_read)
  95. {
  96. INFO("not enough data.\n");
  97. return;
  98. }
  99. msg_type = mqtt_get_type(client->mqtt_state.in_buffer);
  100. msg_qos = mqtt_get_qos(client->mqtt_state.in_buffer);
  101. msg_id = mqtt_get_id(client->mqtt_state.in_buffer, client->mqtt_state.message_length);
  102. INFO("client->connstate:%d, type:%d, Qos:%d, id:%d, message length:%d\n", client->connState, msg_type, msg_qos, \
  103. msg_id, client->mqtt_state.message_length);
  104. switch(client->connState)
  105. {
  106. case MQTT_CONNECT_SENDING:
  107. if(msg_type == MQTT_MSG_TYPE_CONNACK)
  108. {
  109. if(client->mqtt_state.pending_msg_type != MQTT_MSG_TYPE_CONNECT)
  110. {
  111. INFO("MQTT: Invalid packet\r\n");
  112. net_tcp_disconnect(client->pCon);
  113. }
  114. else
  115. {
  116. INFO("MQTT: Connected to %s:%d\r\n", client->host, client->port);
  117. client->connState = MQTT_DATA;
  118. if(client->connectedCb)
  119. client->connectedCb((uint32_t*)client);
  120. }
  121. }
  122. break;
  123. case MQTT_DATA:
  124. switch(msg_type)
  125. {
  126. case MQTT_MSG_TYPE_SUBACK:
  127. if(client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_SUBSCRIBE && client->mqtt_state.pending_msg_id == msg_id)
  128. INFO("MQTT: Subscribe successful\r\n");
  129. break;
  130. case MQTT_MSG_TYPE_UNSUBACK:
  131. if(client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_UNSUBSCRIBE && client->mqtt_state.pending_msg_id == msg_id)
  132. INFO("MQTT: UnSubscribe successful\r\n");
  133. break;
  134. case MQTT_MSG_TYPE_PUBLISH:
  135. if(msg_qos == 1)
  136. client->mqtt_state.outbound_message = mqtt_msg_puback(&client->mqtt_state.mqtt_connection, msg_id);
  137. else if(msg_qos == 2)
  138. client->mqtt_state.outbound_message = mqtt_msg_pubrec(&client->mqtt_state.mqtt_connection, msg_id);
  139. if(msg_qos == 1 || msg_qos == 2)
  140. {
  141. INFO("MQTT: Queue response QoS: %d\r\n", msg_qos);
  142. if(QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data,\
  143. client->mqtt_state.outbound_message->length) == -1)
  144. {
  145. INFO("MQTT: Queue full\r\n");
  146. }
  147. }
  148. show_package(client->mqtt_state.in_buffer, client->mqtt_state.message_length);
  149. deliver_publish(client, client->mqtt_state.in_buffer, client->mqtt_state.message_length);
  150. break;
  151. case MQTT_MSG_TYPE_PUBACK:
  152. if(client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_msg_id == msg_id)
  153. {
  154. INFO("MQTT: received MQTT_MSG_TYPE_PUBACK, finish QoS1 publish\r\n");
  155. }
  156. break;
  157. case MQTT_MSG_TYPE_PUBREC:
  158. client->mqtt_state.outbound_message = mqtt_msg_pubrel(&client->mqtt_state.mqtt_connection, msg_id);
  159. if(QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1)
  160. {
  161. INFO("MQTT: Queue full\r\n");
  162. }
  163. break;
  164. case MQTT_MSG_TYPE_PUBREL:
  165. client->mqtt_state.outbound_message = mqtt_msg_pubcomp(&client->mqtt_state.mqtt_connection, msg_id);
  166. if(QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1)
  167. {
  168. INFO("MQTT: Queue full\r\n");
  169. }
  170. break;
  171. case MQTT_MSG_TYPE_PUBCOMP:
  172. if(client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH && client->mqtt_state.pending_msg_id == msg_id)
  173. {
  174. INFO("MQTT: receive MQTT_MSG_TYPE_PUBCOMP, finish QoS2 publish\r\n");
  175. }
  176. break;
  177. case MQTT_MSG_TYPE_PINGREQ:
  178. client->mqtt_state.outbound_message = mqtt_msg_pingresp(&client->mqtt_state.mqtt_connection);
  179. if(QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1)
  180. {
  181. INFO("MQTT: Queue full\r\n");
  182. }
  183. break;
  184. case MQTT_MSG_TYPE_PINGRESP:
  185. INFO("receive a heart beat response!\n");
  186. client->heart_beat_flag = 1;
  187. break;
  188. }
  189. break;
  190. }
  191. // process package adhesive.
  192. uint16_t remain_length = client->mqtt_state.message_length_read - client->mqtt_state.message_length;
  193. client->mqtt_state.message_length_read = remain_length;
  194. INFO("client->mqtt_state.message_length_read = %d\n", client->mqtt_state.message_length_read);
  195. INFO("client->mqtt_state.message_length = %d\n", client->mqtt_state.message_length);
  196. INFO("the package is\n");
  197. show_package(client->mqtt_state.in_buffer, client->mqtt_state.message_length);
  198. if(remain_length > 0)
  199. {
  200. int i = 0;
  201. for(i = 0; i< remain_length; i++)
  202. {
  203. client->mqtt_state.in_buffer[i] = \
  204. client->mqtt_state.in_buffer[client->mqtt_state.message_length_read - remain_length + i];
  205. }
  206. INFO("Get another published message\r\n");
  207. goto READPACKET;
  208. }
  209. MQTT_Task(client);
  210. }
  211. /**
  212. * @brief Client send over callback function.
  213. * @param arg: contain the ip link information
  214. * @retval None
  215. */
  216. void FUNCTION_ATTRIBUTE
  217. mqtt_tcpclient_sent_cb(void *arg, int8_t error_no)
  218. {
  219. struct pando_tcp_conn *pCon = (struct pando_tcp_conn *)arg;
  220. MQTT_Client* client = (MQTT_Client *)pCon->reverse;
  221. if(error_no == 0)
  222. {
  223. INFO("TCP: Sent OK\r\n");
  224. client->sendTimeout = 0;
  225. if(client->connState == MQTT_DATA && client->mqtt_state.pending_msg_type == MQTT_MSG_TYPE_PUBLISH){
  226. if(client->publishedCb)
  227. client->publishedCb((uint32_t*)client);
  228. }
  229. }
  230. else if(error_no == -1)
  231. {
  232. INFO("TCP: sent failed!");
  233. client->sendTimeout = 0;
  234. client->connState = TCP_RECONNECT_REQ;
  235. }
  236. MQTT_Task(client);
  237. }
  238. void FUNCTION_ATTRIBUTE
  239. mqtt_tcpclient_recon_cb(void *arg, int8_t errType)
  240. {
  241. struct pando_tcp_conn *pCon = (struct pando_tcp_conn *)arg;
  242. MQTT_Client* client = (MQTT_Client *)pCon->reverse;
  243. INFO("TCP: Reconnect to %s:%d\r\n", client->host, client->port);
  244. client->connState = TCP_RECONNECT_REQ;
  245. MQTT_Task(client);
  246. }
  247. void FUNCTION_ATTRIBUTE mqtt_timer(void *arg)
  248. {
  249. MQTT_Client* client = (MQTT_Client*)arg;
  250. struct data_buf buffer;
  251. INFO("%s, %d\n", __func__, client->connState);
  252. if(client->connState == MQTT_DATA){
  253. client->keepAliveTick ++;
  254. if(client->keepAliveTick > client->mqtt_state.connect_info->keepalive){
  255. // check heart beat.
  256. if(client->heart_beat_flag == 0)
  257. {
  258. client->connState = TCP_RECONNECT_REQ;
  259. }
  260. INFO("\r\nMQTT: Send keepalive packet to %s:%d!\r\n", client->host, client->port);
  261. client->mqtt_state.outbound_message = mqtt_msg_pingreq(&client->mqtt_state.mqtt_connection);
  262. client->mqtt_state.pending_msg_type = MQTT_MSG_TYPE_PINGREQ;
  263. client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
  264. client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
  265. client->sendTimeout = MQTT_SEND_TIMOUT;
  266. buffer.length = client->mqtt_state.outbound_message->length;
  267. buffer.data = client->mqtt_state.outbound_message->data;
  268. INFO("MQTT: Sending, type: %d, id: %04X\r\n",client->mqtt_state.pending_msg_type, client->mqtt_state.pending_msg_id);
  269. net_tcp_send(client->pCon, buffer, client->sendTimeout);
  270. client->mqtt_state.outbound_message = NULL;
  271. client->keepAliveTick = 0;
  272. client->heart_beat_flag = 0;
  273. MQTT_Task(client);
  274. }
  275. } else if(client->connState == TCP_RECONNECT_REQ){
  276. INFO("%s, client->reconnectTick:%d\n", __func__, client->reconnectTick);
  277. client->reconnectTick ++;
  278. if(client->reconnectTick > MQTT_RECONNECT_TIMEOUT) {
  279. client->reconnectTick = 0;
  280. client->connState = TCP_RECONNECT;
  281. MQTT_Task(client);
  282. }
  283. }else if(client->connState == TCP_CONNECTING){
  284. INFO("%s, client->connectTick:%d\n", __func__, client->connectTick);
  285. client->connectTick ++;
  286. if(client->connectTick > MQTT_CONNECT_TIMEOUT)
  287. {
  288. client->connState = TCP_CONNECTING_ERROR;
  289. client->connectTick = 0;
  290. MQTT_Task(client);
  291. }
  292. }
  293. if(client->sendTimeout > 0)
  294. client->sendTimeout --;
  295. }
  296. void FUNCTION_ATTRIBUTE
  297. mqtt_tcpclient_discon_cb(void *arg, int8_t errno)
  298. {
  299. struct pando_tcp_conn *pespconn = (struct pando_tcp_conn *)arg;
  300. MQTT_Client* client = (MQTT_Client *)pespconn->reverse;
  301. INFO("TCP: Disconnected callback\r\n");
  302. client->connState = TCP_RECONNECT_REQ;
  303. if(client->disconnectedCb)
  304. client->disconnectedCb((uint32_t*)client);
  305. MQTT_Task(client);
  306. }
  307. /**
  308. * @brief Tcp client connect success callback function.
  309. * @param arg: contain the ip link information
  310. * @retval None
  311. */
  312. void FUNCTION_ATTRIBUTE
  313. mqtt_tcpclient_connect_cb(void *arg, int8_t errno)
  314. {
  315. struct pando_tcp_conn *pCon = (struct pando_tcp_conn *)arg;
  316. MQTT_Client* client = (MQTT_Client *)pCon->reverse;
  317. struct data_buf buffer;
  318. net_tcp_register_disconnected_callback(pCon, mqtt_tcpclient_discon_cb);
  319. net_tcp_register_recv_callback(pCon, mqtt_tcpclient_recv);
  320. net_tcp_register_sent_callback(pCon, mqtt_tcpclient_sent_cb);
  321. INFO("MQTT: Connected to broker %s:%d\r\n", client->host, client->port);
  322. mqtt_msg_init(&client->mqtt_state.mqtt_connection, client->mqtt_state.out_buffer, client->mqtt_state.out_buffer_length);
  323. client->mqtt_state.outbound_message = mqtt_msg_connect(&client->mqtt_state.mqtt_connection, client->mqtt_state.connect_info);
  324. client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
  325. client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
  326. client->sendTimeout = MQTT_SEND_TIMOUT;
  327. buffer.length = client->mqtt_state.outbound_message->length;
  328. buffer.data = client->mqtt_state.outbound_message->data;
  329. INFO("MQTT: Sending, type: %d, id: %04X\r\n",client->mqtt_state.pending_msg_type, client->mqtt_state.pending_msg_id);
  330. net_tcp_send(pCon, buffer, client->sendTimeout);
  331. client->mqtt_state.outbound_message = NULL;
  332. client->connState = MQTT_CONNECT_SENDING;
  333. MQTT_Task(client);
  334. }
  335. /**
  336. * @brief Tcp client connect repeat callback function.
  337. * @param arg: contain the ip link information
  338. * @retval None
  339. */
  340. /**
  341. * @brief MQTT publish function.
  342. * @param client: MQTT_Client reference
  343. * @param topic: string topic will publish to
  344. * @param data: buffer data send point to
  345. * @param data_length: length of data
  346. * @param qos: qos
  347. * @param retain: retain
  348. * @retval TRUE if success queue
  349. */
  350. BOOL FUNCTION_ATTRIBUTE
  351. MQTT_Publish(MQTT_Client *client, const char* topic, const char* data, int data_length, int qos, int retain)
  352. {
  353. uint8_t dataBuffer[MQTT_BUF_SIZE];
  354. uint16_t dataLen;
  355. client->mqtt_state.outbound_message = mqtt_msg_publish(&client->mqtt_state.mqtt_connection,
  356. topic, data, data_length,
  357. qos, retain,
  358. &client->mqtt_state.pending_msg_id);
  359. if(client->mqtt_state.outbound_message->length == 0){
  360. INFO("MQTT: Queuing publish failed\r\n");
  361. return FALSE;
  362. }
  363. INFO("MQTT: queuing publish, length: %d, queue size(%d/%d)\r\n", client->mqtt_state.outbound_message->length, client->msgQueue.rb.fill_cnt, client->msgQueue.rb.size);
  364. while(QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){
  365. INFO("MQTT: Queue full\r\n");
  366. if(QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1) {
  367. INFO("MQTT: Serious buffer error\r\n");
  368. return FALSE;
  369. }
  370. }
  371. MQTT_Task(client);
  372. return TRUE;
  373. }
  374. /**
  375. * @brief MQTT subscibe function.
  376. * @param client: MQTT_Client reference
  377. * @param topic: string topic will subscribe
  378. * @param qos: qos
  379. * @retval TRUE if success queue
  380. */
  381. BOOL FUNCTION_ATTRIBUTE
  382. MQTT_Subscribe(MQTT_Client *client, char* topic, uint8_t qos)
  383. {
  384. uint8_t dataBuffer[MQTT_BUF_SIZE];
  385. uint16_t dataLen;
  386. client->mqtt_state.outbound_message = mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection,
  387. topic, 0,
  388. &client->mqtt_state.pending_msg_id);
  389. INFO("MQTT: queue subscribe, topic\"%s\", id: %d\r\n",topic, client->mqtt_state.pending_msg_id);
  390. while(QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1){
  391. INFO("MQTT: Queue full\r\n");
  392. if(QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1) {
  393. INFO("MQTT: Serious buffer error\r\n");
  394. return FALSE;
  395. }
  396. }
  397. MQTT_Task(client);
  398. return TRUE;
  399. }
  400. static void FUNCTION_ATTRIBUTE
  401. MQTT_exit(MQTT_Client *client)
  402. {
  403. if(client == NULL)
  404. {
  405. return;
  406. }
  407. if(client->host != NULL)
  408. {
  409. pd_free(client->host);
  410. client->host = NULL;
  411. }
  412. if(client->connect_info.password != NULL)
  413. {
  414. pd_free(client->connect_info.password);
  415. client->connect_info.password = NULL;
  416. }
  417. if(client->connect_info.client_id != NULL)
  418. {
  419. pd_free(client->connect_info.client_id);
  420. client->connect_info.client_id = NULL;
  421. }
  422. if(client->connect_info.username != NULL)
  423. {
  424. pd_free(client->connect_info.username);
  425. client->connect_info.username = NULL;
  426. }
  427. if(client->mqtt_state.in_buffer != NULL)
  428. {
  429. pd_free(client->mqtt_state.in_buffer);
  430. client->mqtt_state.in_buffer = NULL;
  431. }
  432. if(client->mqtt_state.out_buffer != NULL)
  433. {
  434. pd_free(client->mqtt_state.out_buffer);
  435. client->mqtt_state.out_buffer = NULL;
  436. }
  437. if(client->msgQueue.buf != NULL)
  438. {
  439. pd_free(client->msgQueue.buf);
  440. client->msgQueue.buf = NULL;
  441. }
  442. INFO("mqtt exit:\n");
  443. if(client->errorCb != NULL)
  444. {
  445. (client->errorCb)((uint32_t*)client);
  446. }
  447. }
  448. void FUNCTION_ATTRIBUTE
  449. MQTT_Task(MQTT_Client *client)
  450. {
  451. INFO("MQTT TASK: state: %d\n", client->connState);
  452. uint8_t dataBuffer[MQTT_BUF_SIZE];
  453. uint16_t dataLen;
  454. struct data_buf buffer;
  455. if(client == NULL)
  456. return;
  457. switch(client->connState){
  458. case TCP_RECONNECT_REQ:
  459. break;
  460. case TCP_RECONNECT:
  461. MQTT_Connect(client);
  462. INFO("TCP: Reconnect to: %s:%d\r\n", client->host, client->port);
  463. client->connState = TCP_CONNECTING;
  464. break;
  465. case MQTT_DATA:
  466. INFO("MQTT TASK DATA\n");
  467. if(QUEUE_IsEmpty(&client->msgQueue) || client->sendTimeout != 0) {
  468. break;
  469. }
  470. if(QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == 0){
  471. INFO("%s, dataLen:%d\n", __func__, dataLen);
  472. client->mqtt_state.pending_msg_type = mqtt_get_type(dataBuffer);
  473. client->mqtt_state.pending_msg_id = mqtt_get_id(dataBuffer, dataLen);
  474. client->sendTimeout = MQTT_SEND_TIMOUT;
  475. buffer.length = dataLen;
  476. buffer.data = dataBuffer;
  477. INFO("MQTT: Sending, type: %d, id: %04X\r\n",client->mqtt_state.pending_msg_type, client->mqtt_state.pending_msg_id);
  478. net_tcp_send(client->pCon, buffer, client->sendTimeout);
  479. client->mqtt_state.outbound_message = NULL;
  480. break;
  481. }
  482. break;
  483. case TCP_CONNECTING_ERROR:
  484. MQTT_Disconnect(client);
  485. MQTT_exit(client);
  486. break;
  487. }
  488. }
  489. /**
  490. * @brief MQTT initialization connection function
  491. * @param client: MQTT_Client reference
  492. * @param host: Domain or IP string
  493. * @param port: Port to connect
  494. * @param security: 1 for ssl, 0 for none
  495. * @retval None
  496. */
  497. void FUNCTION_ATTRIBUTE
  498. MQTT_InitConnection(MQTT_Client *mqttClient, uint8_t* host, uint32_t port, uint8_t security)
  499. {
  500. uint32_t temp;
  501. INFO("MQTT_InitConnection\r\n");
  502. pd_memset(mqttClient, 0, sizeof(MQTT_Client));
  503. temp = pd_strlen(host);
  504. mqttClient->host = (uint8_t*)pd_malloc(temp + 1);
  505. pd_memset(mqttClient->host, 0, temp + 1);
  506. pd_strcpy(mqttClient->host, host);
  507. mqttClient->host[temp] = 0;
  508. mqttClient->port = port;
  509. mqttClient->security = security;
  510. }
  511. /**
  512. * @brief MQTT initialization mqtt client function
  513. * @param client: MQTT_Client reference
  514. * @param clientid: MQTT client id
  515. * @param client_user:MQTT client user
  516. * @param client_pass:MQTT client password
  517. * @param client_pass:MQTT keep alive timer, in second
  518. * @retval None
  519. */
  520. void FUNCTION_ATTRIBUTE
  521. MQTT_InitClient(MQTT_Client *mqttClient, uint8_t* client_id, uint8_t* client_user, uint8_t* client_pass, uint32_t keepAliveTime, uint8_t cleanSession)
  522. {
  523. uint32_t temp;
  524. INFO("MQTT_InitClient\r\n");
  525. pd_memset(&mqttClient->connect_info, 0, sizeof(mqtt_connect_info_t));
  526. temp = pd_strlen(client_id);
  527. mqttClient->connect_info.client_id = (uint8_t*)pd_malloc(temp + 1);
  528. pd_memset(mqttClient->connect_info.client_id, 0, temp + 1);
  529. pd_strcpy(mqttClient->connect_info.client_id, client_id);
  530. mqttClient->connect_info.client_id[temp] = 0;
  531. temp = pd_strlen(client_user);
  532. mqttClient->connect_info.username = (uint8_t*)pd_malloc(temp + 1);
  533. pd_memset(mqttClient->connect_info.username, 0, temp + 1);
  534. pd_strcpy(mqttClient->connect_info.username, client_user);
  535. mqttClient->connect_info.username[temp] = 0;
  536. temp = pd_strlen(client_pass);
  537. mqttClient->connect_info.password = (uint8_t*)pd_malloc(temp + 1);
  538. pd_memset(mqttClient->connect_info.password, 0, temp + 1);
  539. pd_strcpy(mqttClient->connect_info.password, client_pass);
  540. mqttClient->connect_info.password[temp] = 0;
  541. mqttClient->connect_info.keepalive = keepAliveTime;
  542. mqttClient->connect_info.clean_session = cleanSession;
  543. mqttClient->mqtt_state.in_buffer = (uint8_t *)pd_malloc(MQTT_BUF_SIZE);
  544. pd_memset(mqttClient->mqtt_state.in_buffer, 0, MQTT_BUF_SIZE);
  545. mqttClient->mqtt_state.in_buffer_length = MQTT_BUF_SIZE;
  546. mqttClient->mqtt_state.out_buffer = (uint8_t *)pd_malloc(MQTT_BUF_SIZE);
  547. pd_memset(mqttClient->mqtt_state.out_buffer, 0, MQTT_BUF_SIZE);
  548. mqttClient->mqtt_state.out_buffer_length = MQTT_BUF_SIZE;
  549. mqttClient->mqtt_state.connect_info = &mqttClient->connect_info;
  550. mqttClient->mqtt_state.message_length_read = 0;
  551. mqtt_msg_init(&mqttClient->mqtt_state.mqtt_connection, mqttClient->mqtt_state.out_buffer, mqttClient->mqtt_state.out_buffer_length);
  552. QUEUE_Init(&mqttClient->msgQueue, QUEUE_BUFFER_SIZE);
  553. //MQTT_Task(mqttClient);
  554. }
  555. void FUNCTION_ATTRIBUTE
  556. MQTT_InitLWT(MQTT_Client *mqttClient, uint8_t* will_topic, uint8_t* will_msg, uint8_t will_qos, uint8_t will_retain)
  557. {
  558. uint32_t temp;
  559. temp = pd_strlen(will_topic);
  560. mqttClient->connect_info.will_topic = (uint8_t*)pd_malloc(temp + 1);
  561. pd_memset(mqttClient->connect_info.will_topic, 0, temp + 1);
  562. pd_strcpy(mqttClient->connect_info.will_topic, will_topic);
  563. mqttClient->connect_info.will_topic[temp] = 0;
  564. temp = pd_strlen(will_msg);
  565. mqttClient->connect_info.will_message = (uint8_t*)pd_malloc(temp + 1);
  566. pd_memset(mqttClient->connect_info.will_message, 0, temp + 1);
  567. pd_strcpy(mqttClient->connect_info.will_message, will_msg);
  568. mqttClient->connect_info.will_message[temp] = 0;
  569. mqttClient->connect_info.will_qos = will_qos;
  570. mqttClient->connect_info.will_retain = will_retain;
  571. }
  572. /**
  573. * @brief Begin connect to MQTT broker
  574. * @param client: MQTT_Client reference
  575. * @retval None
  576. */
  577. void FUNCTION_ATTRIBUTE
  578. MQTT_Connect(MQTT_Client *mqttClient)
  579. {
  580. MQTT_Disconnect(mqttClient);
  581. mqttClient->pCon = (struct pando_tcp_conn *)pd_malloc(sizeof(struct pando_tcp_conn));
  582. pd_memset(mqttClient->pCon, 0, sizeof(struct pando_tcp_conn));
  583. (mqttClient->pCon)->reverse = mqttClient;
  584. mqttClient->pCon->secure = mqttClient->security;
  585. net_tcp_register_connected_callback(mqttClient->pCon, mqtt_tcpclient_connect_cb);
  586. //no reconnection call back. TODO
  587. mqttClient->keepAliveTick = 0;
  588. mqttClient->reconnectTick = 0;
  589. mqttClient->connectTick = 0;
  590. mqttClient->heart_beat_flag = 1;
  591. mqttClient->mqttTimer.interval = 1000;
  592. mqttClient->mqttTimer.timer_no = 1;
  593. mqttClient->mqttTimer.repeated = 1;
  594. mqttClient->mqttTimer.arg = mqttClient;
  595. mqttClient->mqttTimer.timer_cb = mqtt_timer;
  596. pando_timer_init(&(mqttClient->mqttTimer));
  597. pando_timer_stop(&(mqttClient->mqttTimer));
  598. pando_timer_start(&(mqttClient->mqttTimer));
  599. (mqttClient->pCon)->remote_port = mqttClient->port;
  600. if(UTILS_StrToIP(mqttClient->host, &mqttClient->pCon->remote_ip)) {
  601. INFO("TCP: Connect to ip %s:%d\r\n", mqttClient->host, mqttClient->port);
  602. net_tcp_connect(mqttClient->pCon, mqttClient->sendTimeout);
  603. }
  604. else {
  605. INFO("TCP: Connect to domain %s:%d\r\n", mqttClient->host, mqttClient->port);
  606. //need a host name function. TODO
  607. //espconn_gethostbyname(mqttClient->pCon, mqttClient->host, &mqttClient->ip, mqtt_dns_found);
  608. }
  609. mqttClient->connState = TCP_CONNECTING;
  610. }
  611. void FUNCTION_ATTRIBUTE
  612. MQTT_Disconnect(MQTT_Client *mqttClient)
  613. {
  614. if(mqttClient->pCon)
  615. {
  616. INFO("Free memory\r\n");
  617. net_tcp_disconnect(mqttClient->pCon);
  618. pd_free(mqttClient->pCon);
  619. mqttClient->pCon = NULL;
  620. }
  621. pando_timer_stop(&(mqttClient->mqttTimer));
  622. }
  623. void FUNCTION_ATTRIBUTE
  624. MQTT_OnConnected(MQTT_Client *mqttClient, MqttCallback connectedCb)
  625. {
  626. mqttClient->connectedCb = connectedCb;
  627. }
  628. void FUNCTION_ATTRIBUTE
  629. MQTT_OnConnect_Error(MQTT_Client *mqttClient, MqttCallback error_cb)
  630. {
  631. mqttClient->errorCb= error_cb;
  632. }
  633. void FUNCTION_ATTRIBUTE
  634. MQTT_OnDisconnected(MQTT_Client *mqttClient, MqttCallback disconnectedCb)
  635. {
  636. mqttClient->disconnectedCb = disconnectedCb;
  637. }
  638. void FUNCTION_ATTRIBUTE
  639. MQTT_OnData(MQTT_Client *mqttClient, MqttDataCallback dataCb)
  640. {
  641. mqttClient->dataCb = dataCb;
  642. }
  643. void FUNCTION_ATTRIBUTE
  644. MQTT_OnPublished(MQTT_Client *mqttClient, MqttCallback publishedCb)
  645. {
  646. mqttClient->publishedCb = publishedCb;
  647. }