From 3f659d274332c334eec06f43ad5c28be36099404 Mon Sep 17 00:00:00 2001 From: Yang Zhao Date: Mon, 20 Jun 2022 16:09:28 +0800 Subject: [PATCH] fix: shell exit while ws rev is 0 (#13992) * fix: taos shell support ws client * fix: ping/pong * fix: shell ctrl c stop query * fix: shell exit while ws recv 0 * fix: remove magic number * fix: handle error * fix: handle sigpipe * fix: change return error number * fix: remove useless print msg * fix: remove unused variable --- src/kit/shell/inc/shell.h | 1 + src/kit/shell/src/shellEngine.c | 286 ++++++++++++++++---------------- src/kit/shell/src/shellMain.c | 5 + 3 files changed, 149 insertions(+), 143 deletions(-) diff --git a/src/kit/shell/inc/shell.h b/src/kit/shell/inc/shell.h index de34d40a10..c5405e52bd 100644 --- a/src/kit/shell/inc/shell.h +++ b/src/kit/shell/inc/shell.h @@ -33,6 +33,7 @@ #define MAX_COMMAND_SIZE 1048586 #define HISTORY_FILE ".taos_history" #define DEFAULT_RES_SHOW_NUM 100 +#define TEMP_RECV_BUF 1024 typedef struct SShellHistory { char* hist[MAX_HISTORY_SIZE]; diff --git a/src/kit/shell/src/shellEngine.c b/src/kit/shell/src/shellEngine.c index 4c3d60169e..b42c084b20 100644 --- a/src/kit/shell/src/shellEngine.c +++ b/src/kit/shell/src/shellEngine.c @@ -1136,7 +1136,7 @@ int taos_base64_encode(unsigned char *source, size_t sourcelen, char *target, si int parse_cloud_dsn() { if (args.cloudDsn == NULL) { fprintf(stderr, "Cannot read cloud service info\n"); - return 1; + return -1; } else { char *start = strstr(args.cloudDsn, "http://"); if (start != NULL) { @@ -1152,7 +1152,7 @@ int parse_cloud_dsn() { char *port = strstr(args.cloudHost, ":"); if ((port == NULL) || (port + strlen(":")) == NULL) { fprintf(stderr, "Invalid format in TDengine cloud dsn: %s\n", args.cloudDsn); - return 1; + return -1; } char *token = strstr(port + strlen(":"), "?token="); if ((token == NULL) || (token + strlen("?token=")) == NULL || @@ -1272,7 +1272,9 @@ int wsclient_send(char *strdata, WebSocketFrameType frame) { sent += i; } if (i < 0) { - fprintf(stderr, "websocket send data error\n"); + fprintf(stderr, "websocket send data error, please check the server\n"); + free(data); + return -1; } free(data); return 0; @@ -1353,7 +1355,6 @@ int wsclient_conn() { } else { fprintf(stdout, "Successfully connect to %s:%d in restful mode\n\n", args.host, args.port); } - return 0; } else { cJSON *message = cJSON_GetObjectItem(root, "message"); @@ -1406,10 +1407,10 @@ void wsclient_parse_frame(SWSParser * parser, uint8_t * recv_buffer) { } char *wsclient_get_response() { - uint8_t recv_buffer[1024]= {0}; + uint8_t recv_buffer[TEMP_RECV_BUF]= {0}; int received = 0; SWSParser parser; - int bytes = recv(args.socket, recv_buffer + received, 1023, 0); + int bytes = recv(args.socket, recv_buffer + received, TEMP_RECV_BUF - 1, 0); if (bytes <= 0) { fprintf(stderr, "websocket recv failed with bytes: %d\n", bytes); return NULL; @@ -1429,80 +1430,56 @@ char *wsclient_get_response() { pos += bytes; } response[pos] = '\0'; - if (NULL != strstr(response, "unexpected")) { - printf("motherfucker"); - } return response; } -TAOS_FIELD *wsclient_print_header(cJSON *query, int *pcols, int *pprecison) { - TAOS_FIELD *fields = NULL; - cJSON *fields_count = cJSON_GetObjectItem(query, "fields_count"); - if (cJSON_IsNumber(fields_count)) { - *pcols = (int)fields_count->valueint; - fields = calloc((int)fields_count->valueint, sizeof(TAOS_FIELD)); - cJSON *fields_names = cJSON_GetObjectItem(query, "fields_names"); - cJSON *fields_types = cJSON_GetObjectItem(query, "fields_types"); - cJSON *fields_lengths = cJSON_GetObjectItem(query, "fields_lengths"); - if (cJSON_IsArray(fields_names) && cJSON_IsArray(fields_types) && cJSON_IsArray(fields_lengths)) { - for (int i = 0; i < (int)fields_count->valueint; i++) { - strncpy(fields[i].name, cJSON_GetArrayItem(fields_names, i)->valuestring, 65); - fields[i].type = (uint8_t)cJSON_GetArrayItem(fields_types, i)->valueint; - fields[i].bytes = (int16_t)cJSON_GetArrayItem(fields_lengths, i)->valueint; - } - cJSON *precision = cJSON_GetObjectItem(query, "precision"); - if (cJSON_IsNumber(precision)) { - *pprecison = (int)precision->valueint; - int width[TSDB_MAX_COLUMNS]; - for (int col = 0; col < (int)fields_count->valueint; col++) { - width[col] = calcColWidth(fields + col, (int)precision->valueint); - } - printHeader(fields, width, (int)fields_count->valueint); - return fields; - } else { - fprintf(stderr, "Invalid precision key in json\n"); - } - } else { - fprintf(stderr, "Invalid fields_names/fields_types/fields_lengths key in json\n"); - } - } else { - fprintf(stderr, "Invalid fields_count key in json\n"); +int wsclient_fetch_fields(cJSON *query, TAOS_FIELD * fields, int cols) { + cJSON *fields_names = cJSON_GetObjectItem(query, "fields_names"); + cJSON *fields_types = cJSON_GetObjectItem(query, "fields_types"); + cJSON *fields_lengths = cJSON_GetObjectItem(query, "fields_lengths"); + if (!cJSON_IsArray(fields_names) || !cJSON_IsArray(fields_types) || !cJSON_IsArray(fields_lengths)) { + fprintf(stderr, "Invalid or miss 'fields_names'/'fields_types'/'fields_lengths' key in response\n"); + return -1; } - if (fields != NULL) { - free(fields); + for (int i = 0; i < cols; i++) { + cJSON* field_name = cJSON_GetArrayItem(fields_names, i); + cJSON* field_type = cJSON_GetArrayItem(fields_types, i); + cJSON* field_length = cJSON_GetArrayItem(fields_lengths, i); + if (!cJSON_IsString(field_name) || !cJSON_IsNumber(field_type) || !cJSON_IsNumber(field_length)) { + fprintf(stderr, "Invalid or miss 'field_name'/'field_type'/'field_length' in query response"); + return -1; + } + strncpy(fields[i].name, field_name->valuestring, 65); + fields[i].type = (uint8_t)field_type->valueint; + fields[i].bytes = (int16_t)field_length->valueint; } - return NULL; + return 0; } int wsclient_check(cJSON *root, int64_t st, int64_t et) { cJSON *code = cJSON_GetObjectItem(root, "code"); - if (cJSON_IsNumber(code)) { - if (code->valueint == 0) { - return 0; - } else { - cJSON *message = cJSON_GetObjectItem(root, "message"); - if (cJSON_IsString(message)) { - fprintf(stderr, "\nDB error: %s (%.6fs)\n", message->valuestring, (et - st) / 1E6); - } else { - fprintf(stderr, "Invalid message key in json\n"); - } - } - } else { - fprintf(stderr, "Invalid code key in json\n"); + cJSON *message = cJSON_GetObjectItem(root, "message"); + if (!cJSON_IsNumber(code) || !cJSON_IsString(message)) { + fprintf(stderr, "Invalid or miss 'code'/'message' in response\n"); + return -1; } - return -1; + if (code->valueint != 0) { + fprintf(stderr, "\nDB error: %s (%.6fs)\n", message->valuestring, (et - st) / 1E6); + return -1; + } + return 0; } int wsclient_print_data(int rows, TAOS_FIELD *fields, int cols, int64_t id, int precision, int* pshowed_rows) { char* response = wsclient_get_response(); if (response == NULL) { - return 0; + return -1; } if (*(int64_t *)response != id) { fprintf(stderr, "Mismatch id with %"PRId64" expect %"PRId64"\n", *(int64_t *)response, id); free(response); - return 0; + return -1; } int pos; int width[TSDB_MAX_COLUMNS]; @@ -1549,106 +1526,129 @@ void wsclient_query(char *command) { if (wsclient_send_sql(command, WS_QUERY, 0)) { return; } - char *query_buffer = wsclient_get_response(); if (query_buffer == NULL) { return; } cJSON* query = cJSON_Parse(query_buffer); if (query == NULL) { - free(query_buffer); fprintf(stderr, "Failed to parse response into json: %s\n", query_buffer); + free(query_buffer); return; } - et = taosGetTimestampUs(); free(query_buffer); + et = taosGetTimestampUs(); if (wsclient_check(query, st, et)) { - goto OVER; + cJSON_Delete(query); return; } cJSON *is_update = cJSON_GetObjectItem(query, "is_update"); - if (cJSON_IsBool(is_update)) { - if (is_update->valueint) { - cJSON *affected_rows = cJSON_GetObjectItem(query, "affected_rows"); - if (cJSON_IsNumber(affected_rows)) { - et = taosGetTimestampUs(); - printf("Update OK, %d row(s) in set (%.6fs)\n\n", (int)affected_rows->valueint, (et - st) / 1E6); - } else { - fprintf(stderr, "Invalid affected_rows key in json\n"); - } + cJSON *fields_count = cJSON_GetObjectItem(query, "fields_count"); + cJSON *precisionObj = cJSON_GetObjectItem(query, "precision"); + cJSON *id = cJSON_GetObjectItem(query, "id"); + if (!cJSON_IsBool(is_update) || + !cJSON_IsNumber(fields_count) || + !cJSON_IsNumber(precisionObj) || + !cJSON_IsNumber(id)) { + fprintf(stderr, "Invalid or miss 'is_update'/'fields_count'/'precision'/'id' in query response\n"); + cJSON_Delete(query); + return; + } + if (is_update->valueint) { + cJSON *affected_rows = cJSON_GetObjectItem(query, "affected_rows"); + if (cJSON_IsNumber(affected_rows)) { + et = taosGetTimestampUs(); + printf("Update OK, %d row(s) in set (%.6fs)\n\n", (int)affected_rows->valueint, (et - st) / 1E6); } else { - int cols = 0; - int precision = 0; - int64_t total_rows = 0; - int showed_rows = 0; - TAOS_FIELD *fields = wsclient_print_header(query, &cols, &precision); - if (fields != NULL) { - cJSON *id = cJSON_GetObjectItem(query, "id"); - if (cJSON_IsNumber(id)) { - ws_id = id->valueint; - bool completed = false; - while (!completed && !stop_fetch) { - if (wsclient_send_sql(NULL, WS_FETCH, id->valueint) == 0) { - char *fetch_buffer = wsclient_get_response(); - cJSON* fetch = cJSON_Parse(fetch_buffer); - if (fetch != NULL) { - free(fetch_buffer); - if (wsclient_check(fetch, st, et) == 0) { - cJSON *_completed = cJSON_GetObjectItem(fetch, "completed"); - if (cJSON_IsBool(_completed)) { - if (_completed->valueint) { - cJSON_Delete(fetch); - completed = true; - continue; - } - cJSON *rows = cJSON_GetObjectItem(fetch, "rows"); - if (cJSON_IsNumber(rows)) { - total_rows += rows->valueint; - cJSON *lengths = cJSON_GetObjectItem(fetch, "lengths"); - if (cJSON_IsArray(lengths)) { - for (int i = 0; i < cols; i++) { - fields[i].bytes = (int16_t)(cJSON_GetArrayItem(lengths, i)->valueint); - } - if (showed_rows < DEFAULT_RES_SHOW_NUM) { - if (wsclient_send_sql(NULL, WS_FETCH_BLOCK, id->valueint) == 0) { - wsclient_print_data((int)rows->valueint, fields, cols, id->valueint, precision, &showed_rows); - } - } - cJSON_Delete(fetch); - continue; - } else { - fprintf(stderr, "Invalid lengths key in json\n"); - } - } else { - fprintf(stderr, "Invalid rows key in json\n"); - } - } else { - fprintf(stderr, "Invalid completed key in json\n"); - } - } - cJSON_Delete(fetch); - } else { - fprintf(stderr, "failed to parse response into json: %s\n", fetch_buffer); - free(fetch_buffer); - break; - } - } - fprintf(stderr, "err occured in fetch/fetch_block ws actions\n"); - break; - } - et = taosGetTimestampUs(); - printf("Query OK, %" PRId64 " row(s) in set (%.6fs)\n\n", total_rows, (et - st) / 1E6); - stop_fetch = false; - } else { - fprintf(stderr, "Invalid id key in json\n"); - } - free(fields); + fprintf(stderr, "Invalid or miss 'affected_rows' key in response\n"); + } + cJSON_Delete(query); + return; + } + ws_id = id->valueint; + int cols = (int)fields_count->valueint; + int precision = (int)precisionObj->valueint; + int64_t total_rows = 0; + int showed_rows = 0; + bool completed = false; + TAOS_FIELD fields[cols]; + if (wsclient_fetch_fields(query, fields, cols)) { + cJSON_Delete(query); + return; + } + int width[cols]; + for (int i = 0; i < cols; ++i) { + width[i] = calcColWidth(fields + i, precision); + } + printHeader(fields, width, cols); + + cJSON_Delete(query); + + while (!completed && !stop_fetch) { + if (wsclient_send_sql(NULL, WS_FETCH, ws_id)) { + return; + } + char *fetch_buffer = wsclient_get_response(); + if (fetch_buffer == NULL) { + return; + } + cJSON *fetch = cJSON_Parse(fetch_buffer); + if (fetch == NULL) { + fprintf(stderr, "failed to parse response into json: %s\n", fetch_buffer); + free(fetch_buffer); + return; + } + free(fetch_buffer); + if (wsclient_check(fetch, st, et)) { + cJSON_Delete(fetch); + return; + } + cJSON *completedObj = cJSON_GetObjectItem(fetch, "completed"); + cJSON *rows = cJSON_GetObjectItem(fetch, "rows"); + cJSON *lengths = cJSON_GetObjectItem(fetch, "lengths"); + if (!cJSON_IsBool(completedObj) || !cJSON_IsNumber(rows)) { + fprintf(stderr, "Invalid or miss 'completed'/'rows' in fetch response\n"); + cJSON_Delete(fetch); + return; + } + if (completedObj->valueint) { + cJSON_Delete(fetch); + completed = true; + continue; + } + total_rows += rows->valueint; + if (!cJSON_IsArray(lengths)) { + fprintf(stderr, "Invalid or miss 'lengths' in fetch response\n"); + cJSON_Delete(fetch); + return; + } + for (int i = 0; i < cols; i++) { + cJSON* length = cJSON_GetArrayItem(lengths, i); + if (!cJSON_IsNumber(length)) { + fprintf(stderr, "Invalid or miss 'lengths' key in fetch response\n"); + cJSON_Delete(fetch); + return; } + fields[i].bytes = (int16_t)(length->valueint); } + if (showed_rows < DEFAULT_RES_SHOW_NUM) { + if (wsclient_send_sql(NULL, WS_FETCH_BLOCK, ws_id)) { + cJSON_Delete(fetch); + return; + } + if (wsclient_print_data((int)rows->valueint, fields, cols, ws_id, precision, &showed_rows)) { + cJSON_Delete(fetch); + return; + } + cJSON_Delete(fetch); + continue; + } + } + et = taosGetTimestampUs(); + if (stop_fetch) { + printf("Query interrupted, %" PRId64 " row(s) in set (%.6fs)\n\n", total_rows, (et - st) / 1E6); + stop_fetch = false; } else { - fprintf(stderr, "Invalid is_update key in json\n"); + printf("Query OK, %" PRId64 " row(s) in set (%.6fs)\n\n", total_rows, (et - st) / 1E6); } -OVER: - cJSON_Delete(query); - return; } \ No newline at end of file diff --git a/src/kit/shell/src/shellMain.c b/src/kit/shell/src/shellMain.c index 6b45ddff63..bc2a32a4cb 100644 --- a/src/kit/shell/src/shellMain.c +++ b/src/kit/shell/src/shellMain.c @@ -27,6 +27,8 @@ void shellQueryInterruptHandler(int32_t signum, void *sigInfo, void *context) { tsem_post(&cancelSem); } +void shellRestfulSendInterruptHandler(int32_t signum, void *sigInfo, void *context) {} + void *cancelHandler(void *arg) { setThreadName("cancelHandler"); @@ -167,6 +169,9 @@ int main(int argc, char* argv[]) { taosSetSignal(SIGINT, shellQueryInterruptHandler); taosSetSignal(SIGHUP, shellQueryInterruptHandler); taosSetSignal(SIGABRT, shellQueryInterruptHandler); + if (args.restful || args.cloud) { + taosSetSignal(SIGPIPE, shellRestfulSendInterruptHandler); + } /* Get grant information */ shellGetGrantInfo(args.con); -- GitLab