#include // // Created by 李建 on 2023/12/6. // #include "access.h" #include #include #include "esp_log.h" #include #include "../system/miscellaneous_interface.h" #include "mqtt_client.h" #include #include #include "../sub_device/zero_device.h" #include "../channel.h" #include "../framework_compat.h" #include "login.h" static const char *TAG = "ACCESS"; esp_mqtt_client_handle_t client; extern EventGroupHandle_t frame_work_event_group; static bool connected = false; static void log_error_if_nonzero(const char *message, int error_code) { if (error_code != 0) { ESP_LOGE(TAG, "Last error %s: 0x%x", message, error_code); } } // 处理收到的publish 消息 static void mqtt_data_callback(char *topic, char *payload) { cJSON *root = cJSON_Parse(payload); cJSON *data = cJSON_GetObjectItem(root, "data"); cJSON *subDevice = cJSON_GetObjectItemCaseSensitive(root, "subDeviceId"); sparrow_topic_type topicType = SPARROW_TOPIC_TYPE_UNKNOWN; char *str = strtok(topic, "/"); while (str != NULL) { if (strcmp(str, "status") == 0) { topicType = SPARROW_TOPIC_STATUS; break; } else if (strcmp(str, "event") == 0) { topicType = SPARROW_TOPIC_EVENT; break; } else if (strcmp(str, "command") == 0) { topicType = SPARROW_TOPIC_COMMAND; break; } str = strtok(NULL, "/"); } ESP_LOGE(TAG, "type :%d", topicType); if (strcmp(subDevice->valuestring, "01") == 0) { channel_send_to_sub_device(SPARROW_CHANNEL_PORT_0, cJSON_PrintUnformatted(data), topicType); } else { channel_send_to_sub_device(SPARROW_CHANNEL_PORT_1, cJSON_PrintUnformatted(data), topicType); } cJSON_Delete(root); } static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data) { ESP_LOGE(TAG, "Event dispatched from event loop base=%s, event_id=%" PRIi32 "", base, event_id); esp_mqtt_event_handle_t event = event_data; MQTT_t *mqttBuf = handler_args; mqttBuf->event_id = event->event_id; mqttBuf->return_code = event->error_handle->connect_return_code; switch ((esp_mqtt_event_id_t) event_id) { case MQTT_EVENT_CONNECTED: ESP_LOGE(TAG, "MQTT_EVENT_CONNECTED"); connected = true; xTaskNotifyGive(mqttBuf->taskHandle); break; case MQTT_EVENT_DISCONNECTED: ESP_LOGE(TAG, "MQTT_EVENT_DISCONNECTED"); connected = false; xTaskNotifyGive(mqttBuf->taskHandle); break; case MQTT_EVENT_SUBSCRIBED: ESP_LOGE(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id); break; case MQTT_EVENT_UNSUBSCRIBED: ESP_LOGE(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id); break; case MQTT_EVENT_PUBLISHED: ESP_LOGE(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id); break; case MQTT_EVENT_DATA: ESP_LOGE(TAG, "MQTT_EVENT_DATA"); mqtt_data_callback(event->topic, event->data); break; case MQTT_EVENT_ERROR: ESP_LOGE(TAG, "MQTT_EVENT_ERROR"); if (event->error_handle->error_type == MQTT_ERROR_TYPE_TCP_TRANSPORT) { log_error_if_nonzero("reported from esp-tls", event->error_handle->esp_tls_last_esp_err); log_error_if_nonzero("reported from tls stack", event->error_handle->esp_tls_stack_err); log_error_if_nonzero("captured as transport's socket errno", event->error_handle->esp_transport_sock_errno); ESP_LOGE(TAG, "Last errno string (%s)", strerror(event->error_handle->esp_transport_sock_errno)); } xTaskNotifyGive(mqttBuf->taskHandle); break; default: ESP_LOGE(TAG, "Other event id:%d", event->event_id); xTaskNotifyGive(mqttBuf->taskHandle); break; } } /** * 设备上报属性 * @param pub */ void sparrow_publish_status(publish_message_t pub) { // 构造请求协议 char str_device_serial[16]; get_device_serial(str_device_serial); cJSON *root = cJSON_CreateObject(); cJSON_AddStringToObject(root, "action", "devSend"); cJSON_AddNumberToObject(root, "msgId", 0); cJSON_AddStringToObject(root, "deviceCode", str_device_serial); cJSON_AddStringToObject(root, "subDeviceId", ""); cJSON_AddNumberToObject(root, "timestamp", get_sys_time_ms()); cJSON_AddItemToObject(root, "data", pub.data); char *payload = cJSON_PrintUnformatted(root); if (payload) { char topic[300]; sprintf(topic, "%s%s", TOPIC_PUB_STATUS, str_device_serial); if (connected) esp_mqtt_client_publish(client, topic, payload, 0, 1, false); cJSON_Delete(root); } free(payload); } /** * 设备上报事件 * @param pub */ void sparrow_publish_event(publish_message_t pub) { } /** * 设备上报指令响应 * @param pub */ void sparrow_publish_command(publish_message_t pub) { } void device_access_task(void *pv) { ESP_LOGI(TAG, "begin device access cloud..."); char *addr = yx_data_get(DATANAME_ACCESS_ADDR); char *strDeviceId = yx_data_get(DATANAME_DEVICE_ID); char *accessToken = yx_data_get(DATANAME_ACCESS_TOKEN); char uri[128] = ""; sprintf(uri, "mqtt://%s", addr); char device_id_hex[10]; sprintf(device_id_hex, "%X", (int) atol(strDeviceId)); char device_serial[16]; get_device_serial(device_serial); const esp_mqtt_client_config_t mqtt_cfg = { .broker.address.uri = uri, .session.protocol_ver = MQTT_PROTOCOL_V_3_1, .credentials.client_id = device_serial, .credentials.username = device_id_hex, .credentials.authentication.password = accessToken, .session.keepalive = 120, .network.timeout_ms = 30 * 1000, .network.reconnect_timeout_ms = 3 * 1000, .network.disable_auto_reconnect = true // 禁用自动重连 }; MQTT_t mqttBuf = { .taskHandle = xTaskGetCurrentTaskHandle(), }; client = esp_mqtt_client_init(&mqtt_cfg); esp_mqtt_client_register_event(client, MQTT_EVENT_ANY, mqtt_event_handler, &mqttBuf); esp_err_t err = esp_mqtt_client_start(client); if(err != ESP_OK) { esp_mqtt_client_disconnect(client); esp_mqtt_client_destroy(client); xEventGroupSetBits(frame_work_event_group, MQTT_DISCONNECTED); vTaskDelete(NULL); } free(addr); free(strDeviceId); free(accessToken); for (;;) { ulTaskNotifyTake(pdTRUE, portMAX_DELAY); // 如果连接报错而且是没有认证授权,执行登录 if (mqttBuf.event_id == MQTT_EVENT_ERROR) { iotx_event_post(IOT_CONNECT_CLOUD_FAILT); xEventGroupSetBits(frame_work_event_group, MQTT_DISCONNECTED); if (mqttBuf.return_code == MQTT_CONNECTION_REFUSE_NOT_AUTHORIZED) { ESP_LOGE(TAG, "access error: MQTT_CONNECTION_REFUSE_NOT_AUTHORIZED, 3 sec retry"); if (client != NULL) { esp_mqtt_client_disconnect(client); esp_mqtt_client_destroy(client); client = NULL; } vTaskDelay(3000 / portTICK_PERIOD_MS); // 开始登录,如果创建任务失败,直接重启 if (xTaskCreate(&device_login_task, "login task", 1024 * 4, NULL, 5, NULL) != pdPASS) { ESP_LOGE(TAG, "start login task fail"); esp_restart(); // 84:fc:e6:6a:a3:98 } break; } else { ESP_LOGE(TAG, "mqtt error:%d", mqttBuf.return_code); break; } } else if (mqttBuf.event_id == MQTT_EVENT_CONNECTED) { // 订阅主题 char str_device_serial[16]; get_device_serial(str_device_serial); char topic[300]; sprintf(topic, "%s%s", TOPIC_SUB_COMMAND, str_device_serial); esp_mqtt_client_subscribe(client, topic, 1); sprintf(topic, "%s%s", TOPIC_SUB_EVENT, str_device_serial); esp_mqtt_client_subscribe(client, topic, 1); sprintf(topic, "%s%s", TOPIC_SUB_STATUS, str_device_serial); esp_mqtt_client_subscribe(client, topic, 1); iotx_event_post(IOT_CONNECT_CLOUD_SUCCESS); publish_app_version(); xEventGroupSetBits(frame_work_event_group, MQTT_CONNECTED); } else if (mqttBuf.event_id == MQTT_EVENT_DISCONNECTED) { xEventGroupSetBits(frame_work_event_group, MQTT_DISCONNECTED); iotx_event_post(IOT_CONNECT_CLOUD_FAILT); break; } } ESP_LOGI(TAG, "delete access task"); esp_mqtt_client_disconnect(client); esp_mqtt_client_destroy(client); xEventGroupSetBits(frame_work_event_group, MQTT_DISCONNECTED); vTaskDelete(NULL); }