access.c 9.0 KB


  1. #include <sys/cdefs.h>
  2. //
  3. // Created by 李建 on 2023/12/6.
  4. //
  5. #include "access.h"
  6. #include <stdio.h>
  7. #include <stdlib.h>
  8. #include "esp_log.h"
  9. #include <cJSON.h>
  10. #include "../system/miscellaneous_interface.h"
  11. #include "mqtt_client.h"
  12. #include <string.h>
  13. #include <freertos/event_groups.h>
  14. #include "../sub_device/zero_device.h"
  15. #include "../channel.h"
  16. #include "../framework_compat.h"
  17. #include "login.h"
  /* Tag passed to the ESP_LOG* macros for every log line emitted from this file. */
  static const char *TAG = "ACCESS";
  /* MQTT client handle created by device_access_task() and used by the publish helpers below. */
  esp_mqtt_client_handle_t client;
  /* Event group defined elsewhere; this file sets the MQTT_CONNECTED / MQTT_DISCONNECTED bits on it. */
  extern EventGroupHandle_t frame_work_event_group;
  /* Connection flag maintained by mqtt_event_handler(); sparrow_publish_status() only publishes while it is true. */
  static bool connected = false;
  22. static void log_error_if_nonzero(const char *message, int error_code) {
  23. if (error_code != 0) {
  24. ESP_LOGE(TAG, "Last error %s: 0x%x", message, error_code);
  25. }
  26. }
  27. // 处理收到的publish 消息
  28. static void mqtt_data_callback(char *topic, char *payload) {
  29. cJSON *root = cJSON_Parse(payload);
  30. cJSON *data = cJSON_GetObjectItem(root, "data");
  31. cJSON *subDevice = cJSON_GetObjectItemCaseSensitive(root, "subDeviceId");
  32. sparrow_topic_type topicType = SPARROW_TOPIC_TYPE_UNKNOWN;
  33. char *str = strtok(topic, "/");
  34. while (str != NULL) {
  35. if (strcmp(str, "status") == 0) {
  36. topicType = SPARROW_TOPIC_STATUS;
  37. break;
  38. } else if (strcmp(str, "event") == 0) {
  39. topicType = SPARROW_TOPIC_EVENT;
  40. break;
  41. } else if (strcmp(str, "command") == 0) {
  42. topicType = SPARROW_TOPIC_COMMAND;
  43. break;
  44. }
  45. str = strtok(NULL, "/");
  46. }
  47. ESP_LOGE(TAG, "type :%d", topicType);
  48. if (strcmp(subDevice->valuestring, "01") == 0) {
  49. channel_send_to_sub_device(SPARROW_CHANNEL_PORT_0, cJSON_PrintUnformatted(data), topicType);
  50. } else {
  51. channel_send_to_sub_device(SPARROW_CHANNEL_PORT_1, cJSON_PrintUnformatted(data), topicType);
  52. }
  53. cJSON_Delete(root);
  54. }
  55. static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) {
  56. ESP_LOGE(TAG, "Event dispatched from event loop base=%s, event_id=%" PRIi32 "", base, event_id);
  57. esp_mqtt_event_handle_t event = event_data;
  58. MQTT_t *mqttBuf = handler_args;
  59. mqttBuf->event_id = event->event_id;
  60. mqttBuf->return_code = event->error_handle->connect_return_code;
  61. switch ((esp_mqtt_event_id_t) event_id) {
  62. case MQTT_EVENT_CONNECTED:
  63. ESP_LOGE(TAG, "MQTT_EVENT_CONNECTED");
  64. connected = true;
  65. xTaskNotifyGive(mqttBuf->taskHandle);
  66. break;
  67. case MQTT_EVENT_DISCONNECTED:
  68. ESP_LOGE(TAG, "MQTT_EVENT_DISCONNECTED");
  69. connected = false;
  70. xTaskNotifyGive(mqttBuf->taskHandle);
  71. break;
  72. case MQTT_EVENT_SUBSCRIBED:
  73. ESP_LOGE(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id);
  74. break;
  75. case MQTT_EVENT_UNSUBSCRIBED:
  76. ESP_LOGE(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id);
  77. break;
  78. case MQTT_EVENT_PUBLISHED:
  79. ESP_LOGE(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id);
  80. break;
  81. case MQTT_EVENT_DATA:
  82. ESP_LOGE(TAG, "MQTT_EVENT_DATA");
  83. mqtt_data_callback(event->topic, event->data);
  84. break;
  85. case MQTT_EVENT_ERROR:
  86. ESP_LOGE(TAG, "MQTT_EVENT_ERROR");
  87. if (event->error_handle->error_type == MQTT_ERROR_TYPE_TCP_TRANSPORT) {
  88. log_error_if_nonzero("reported from esp-tls", event->error_handle->esp_tls_last_esp_err);
  89. log_error_if_nonzero("reported from tls stack", event->error_handle->esp_tls_stack_err);
  90. log_error_if_nonzero("captured as transport's socket errno",
  91. event->error_handle->esp_transport_sock_errno);
  92. ESP_LOGE(TAG, "Last errno string (%s)", strerror(event->error_handle->esp_transport_sock_errno));
  93. }
  94. xTaskNotifyGive(mqttBuf->taskHandle);
  95. break;
  96. default:
  97. ESP_LOGE(TAG, "Other event id:%d", event->event_id);
  98. xTaskNotifyGive(mqttBuf->taskHandle);
  99. break;
  100. }
  101. }
  102. /**
  103. * 设备上报属性
  104. * @param pub
  105. */
  106. void sparrow_publish_status(publish_message_t pub) {
  107. // 构造请求协议
  108. char str_device_serial[16];
  109. get_device_serial(str_device_serial);
  110. cJSON *root = cJSON_CreateObject();
  111. cJSON_AddStringToObject(root, "action", "devSend");
  112. cJSON_AddNumberToObject(root, "msgId", 0);
  113. cJSON_AddStringToObject(root, "deviceCode", str_device_serial);
  114. cJSON_AddStringToObject(root, "subDeviceId", "");
  115. cJSON_AddNumberToObject(root, "timestamp", get_sys_time_ms());
  116. cJSON_AddItemToObject(root, "data", pub.data);
  117. char *payload = cJSON_PrintUnformatted(root);
  118. if (payload) {
  119. char topic[300];
  120. sprintf(topic, "%s%s", TOPIC_PUB_STATUS, str_device_serial);
  121. if (connected) esp_mqtt_client_publish(client, topic, payload, 0, 1, false);
  122. cJSON_Delete(root);
  123. }
  124. free(payload);
  125. }
  /**
   * Report a device event to the cloud.
   *
   * NOTE(review): the body is empty, so calling this currently does nothing
   * and pub is not used.
   *
   * @param pub Message to report.
   */
  void sparrow_publish_event(publish_message_t pub) {
  }
  /**
   * Report a device command response to the cloud.
   *
   * NOTE(review): the body is empty, so calling this currently does nothing
   * and pub is not used.
   *
   * @param pub Message to report.
   */
  void sparrow_publish_command(publish_message_t pub) {
  }
  138. void device_access_task(void *pv) {
  139. ESP_LOGI(TAG, "begin device access cloud...");
  140. char *addr = yx_data_get(DATANAME_ACCESS_ADDR);
  141. char *strDeviceId = yx_data_get(DATANAME_DEVICE_ID);
  142. char *accessToken = yx_data_get(DATANAME_ACCESS_TOKEN);
  143. char uri[128] = "";
  144. sprintf(uri, "mqtt://%s", addr);
  145. char device_id_hex[10];
  146. sprintf(device_id_hex, "%X", (int) atol(strDeviceId));
  147. char device_serial[16];
  148. get_device_serial(device_serial);
  149. const esp_mqtt_client_config_t mqtt_cfg = {
  150. .broker.address.uri = uri,
  151. .session.protocol_ver = MQTT_PROTOCOL_V_3_1,
  152. .credentials.client_id = device_serial,
  153. .credentials.username = device_id_hex,
  154. .credentials.authentication.password = accessToken,
  155. .session.keepalive = 120,
  156. .network.timeout_ms = 30 * 1000,
  157. .network.reconnect_timeout_ms = 3 * 1000,
  158. .network.disable_auto_reconnect = true // 禁用自动重连
  159. };
  160. MQTT_t mqttBuf = {
  161. .taskHandle = xTaskGetCurrentTaskHandle(),
  162. };
  163. client = esp_mqtt_client_init(&mqtt_cfg);
  164. esp_mqtt_client_register_event(client, MQTT_EVENT_ANY, mqtt_event_handler, &mqttBuf);
  165. esp_err_t err = esp_mqtt_client_start(client);
  166. if(err != ESP_OK) {
  167. esp_mqtt_client_disconnect(client);
  168. esp_mqtt_client_destroy(client);
  169. xEventGroupSetBits(frame_work_event_group, MQTT_DISCONNECTED);
  170. vTaskDelete(NULL);
  171. }
  172. free(addr);
  173. free(strDeviceId);
  174. free(accessToken);
  175. for (;;) {
  176. ulTaskNotifyTake(pdTRUE, portMAX_DELAY);
  177. // 如果连接报错而且是没有认证授权,执行登录
  178. if (mqttBuf.event_id == MQTT_EVENT_ERROR) {
  179. iotx_event_post(IOT_CONNECT_CLOUD_FAILT);
  180. xEventGroupSetBits(frame_work_event_group, MQTT_DISCONNECTED);
  181. if (mqttBuf.return_code == MQTT_CONNECTION_REFUSE_NOT_AUTHORIZED) {
  182. ESP_LOGE(TAG, "access error: MQTT_CONNECTION_REFUSE_NOT_AUTHORIZED, 3 sec retry");
  183. if (client != NULL) {
  184. esp_mqtt_client_disconnect(client);
  185. esp_mqtt_client_destroy(client);
  186. client = NULL;
  187. }
  188. vTaskDelay(3000 / portTICK_PERIOD_MS);
  189. // 开始登录,如果创建任务失败,直接重启
  190. if (xTaskCreate(&device_login_task, "login task",
  191. 1024 * 4, NULL, 5, NULL) != pdPASS) {
  192. ESP_LOGE(TAG, "start login task fail");
  193. esp_restart();
  194. // 84:fc:e6:6a:a3:98
  195. }
  196. break;
  197. } else {
  198. ESP_LOGE(TAG, "mqtt error:%d", mqttBuf.return_code);
  199. break;
  200. }
  201. } else if (mqttBuf.event_id == MQTT_EVENT_CONNECTED) {
  202. // 订阅主题
  203. char str_device_serial[16];
  204. get_device_serial(str_device_serial);
  205. char topic[300];
  206. sprintf(topic, "%s%s", TOPIC_SUB_COMMAND, str_device_serial);
  207. esp_mqtt_client_subscribe(client, topic, 1);
  208. sprintf(topic, "%s%s", TOPIC_SUB_EVENT, str_device_serial);
  209. esp_mqtt_client_subscribe(client, topic, 1);
  210. sprintf(topic, "%s%s", TOPIC_SUB_STATUS, str_device_serial);
  211. esp_mqtt_client_subscribe(client, topic, 1);
  212. iotx_event_post(IOT_CONNECT_CLOUD_SUCCESS);
  213. publish_app_version();
  214. xEventGroupSetBits(frame_work_event_group, MQTT_CONNECTED);
  215. } else if (mqttBuf.event_id == MQTT_EVENT_DISCONNECTED) {
  216. xEventGroupSetBits(frame_work_event_group, MQTT_DISCONNECTED);
  217. iotx_event_post(IOT_CONNECT_CLOUD_FAILT);
  218. break;
  219. }
  220. }
  221. ESP_LOGI(TAG, "delete access task");
  222. esp_mqtt_client_disconnect(client);
  223. esp_mqtt_client_destroy(client);
  224. xEventGroupSetBits(frame_work_event_group, MQTT_DISCONNECTED);
  225. vTaskDelete(NULL);
  226. }