未验证 提交 3f659d27 编写于 作者: Y Yang Zhao 提交者: GitHub

fix: shell exit while ws rev is 0 (#13992)

* fix: taos shell support ws client

* fix: ping/pong

* fix: shell ctrl c stop query

* fix: shell exit while ws recv 0

* fix: remove magic number

* fix: handle error

* fix: handle sigpipe

* fix: change return error number

* fix: remove useless print msg

* fix: remove unused variable
上级 e165a862
......@@ -33,6 +33,7 @@
#define MAX_COMMAND_SIZE 1048586
#define HISTORY_FILE ".taos_history"
#define DEFAULT_RES_SHOW_NUM 100
#define TEMP_RECV_BUF 1024
typedef struct SShellHistory {
char* hist[MAX_HISTORY_SIZE];
......
......@@ -1136,7 +1136,7 @@ int taos_base64_encode(unsigned char *source, size_t sourcelen, char *target, si
int parse_cloud_dsn() {
if (args.cloudDsn == NULL) {
fprintf(stderr, "Cannot read cloud service info\n");
return 1;
return -1;
} else {
char *start = strstr(args.cloudDsn, "http://");
if (start != NULL) {
......@@ -1152,7 +1152,7 @@ int parse_cloud_dsn() {
char *port = strstr(args.cloudHost, ":");
if ((port == NULL) || (port + strlen(":")) == NULL) {
fprintf(stderr, "Invalid format in TDengine cloud dsn: %s\n", args.cloudDsn);
return 1;
return -1;
}
char *token = strstr(port + strlen(":"), "?token=");
if ((token == NULL) || (token + strlen("?token=")) == NULL ||
......@@ -1272,7 +1272,9 @@ int wsclient_send(char *strdata, WebSocketFrameType frame) {
sent += i;
}
if (i < 0) {
fprintf(stderr, "websocket send data error\n");
fprintf(stderr, "websocket send data error, please check the server\n");
free(data);
return -1;
}
free(data);
return 0;
......@@ -1353,7 +1355,6 @@ int wsclient_conn() {
} else {
fprintf(stdout, "Successfully connect to %s:%d in restful mode\n\n", args.host, args.port);
}
return 0;
} else {
cJSON *message = cJSON_GetObjectItem(root, "message");
......@@ -1406,10 +1407,10 @@ void wsclient_parse_frame(SWSParser * parser, uint8_t * recv_buffer) {
}
char *wsclient_get_response() {
uint8_t recv_buffer[1024]= {0};
uint8_t recv_buffer[TEMP_RECV_BUF]= {0};
int received = 0;
SWSParser parser;
int bytes = recv(args.socket, recv_buffer + received, 1023, 0);
int bytes = recv(args.socket, recv_buffer + received, TEMP_RECV_BUF - 1, 0);
if (bytes <= 0) {
fprintf(stderr, "websocket recv failed with bytes: %d\n", bytes);
return NULL;
......@@ -1429,80 +1430,56 @@ char *wsclient_get_response() {
pos += bytes;
}
response[pos] = '\0';
if (NULL != strstr(response, "unexpected")) {
printf("motherfucker");
}
return response;
}
TAOS_FIELD *wsclient_print_header(cJSON *query, int *pcols, int *pprecison) {
TAOS_FIELD *fields = NULL;
cJSON *fields_count = cJSON_GetObjectItem(query, "fields_count");
if (cJSON_IsNumber(fields_count)) {
*pcols = (int)fields_count->valueint;
fields = calloc((int)fields_count->valueint, sizeof(TAOS_FIELD));
cJSON *fields_names = cJSON_GetObjectItem(query, "fields_names");
cJSON *fields_types = cJSON_GetObjectItem(query, "fields_types");
cJSON *fields_lengths = cJSON_GetObjectItem(query, "fields_lengths");
if (cJSON_IsArray(fields_names) && cJSON_IsArray(fields_types) && cJSON_IsArray(fields_lengths)) {
for (int i = 0; i < (int)fields_count->valueint; i++) {
strncpy(fields[i].name, cJSON_GetArrayItem(fields_names, i)->valuestring, 65);
fields[i].type = (uint8_t)cJSON_GetArrayItem(fields_types, i)->valueint;
fields[i].bytes = (int16_t)cJSON_GetArrayItem(fields_lengths, i)->valueint;
}
cJSON *precision = cJSON_GetObjectItem(query, "precision");
if (cJSON_IsNumber(precision)) {
*pprecison = (int)precision->valueint;
int width[TSDB_MAX_COLUMNS];
for (int col = 0; col < (int)fields_count->valueint; col++) {
width[col] = calcColWidth(fields + col, (int)precision->valueint);
}
printHeader(fields, width, (int)fields_count->valueint);
return fields;
} else {
fprintf(stderr, "Invalid precision key in json\n");
}
} else {
fprintf(stderr, "Invalid fields_names/fields_types/fields_lengths key in json\n");
}
} else {
fprintf(stderr, "Invalid fields_count key in json\n");
int wsclient_fetch_fields(cJSON *query, TAOS_FIELD * fields, int cols) {
cJSON *fields_names = cJSON_GetObjectItem(query, "fields_names");
cJSON *fields_types = cJSON_GetObjectItem(query, "fields_types");
cJSON *fields_lengths = cJSON_GetObjectItem(query, "fields_lengths");
if (!cJSON_IsArray(fields_names) || !cJSON_IsArray(fields_types) || !cJSON_IsArray(fields_lengths)) {
fprintf(stderr, "Invalid or miss 'fields_names'/'fields_types'/'fields_lengths' key in response\n");
return -1;
}
if (fields != NULL) {
free(fields);
for (int i = 0; i < cols; i++) {
cJSON* field_name = cJSON_GetArrayItem(fields_names, i);
cJSON* field_type = cJSON_GetArrayItem(fields_types, i);
cJSON* field_length = cJSON_GetArrayItem(fields_lengths, i);
if (!cJSON_IsString(field_name) || !cJSON_IsNumber(field_type) || !cJSON_IsNumber(field_length)) {
fprintf(stderr, "Invalid or miss 'field_name'/'field_type'/'field_length' in query response");
return -1;
}
strncpy(fields[i].name, field_name->valuestring, 65);
fields[i].type = (uint8_t)field_type->valueint;
fields[i].bytes = (int16_t)field_length->valueint;
}
return NULL;
return 0;
}
int wsclient_check(cJSON *root, int64_t st, int64_t et) {
cJSON *code = cJSON_GetObjectItem(root, "code");
if (cJSON_IsNumber(code)) {
if (code->valueint == 0) {
return 0;
} else {
cJSON *message = cJSON_GetObjectItem(root, "message");
if (cJSON_IsString(message)) {
fprintf(stderr, "\nDB error: %s (%.6fs)\n", message->valuestring, (et - st) / 1E6);
} else {
fprintf(stderr, "Invalid message key in json\n");
}
}
} else {
fprintf(stderr, "Invalid code key in json\n");
cJSON *message = cJSON_GetObjectItem(root, "message");
if (!cJSON_IsNumber(code) || !cJSON_IsString(message)) {
fprintf(stderr, "Invalid or miss 'code'/'message' in response\n");
return -1;
}
return -1;
if (code->valueint != 0) {
fprintf(stderr, "\nDB error: %s (%.6fs)\n", message->valuestring, (et - st) / 1E6);
return -1;
}
return 0;
}
int wsclient_print_data(int rows, TAOS_FIELD *fields, int cols, int64_t id, int precision, int* pshowed_rows) {
char* response = wsclient_get_response();
if (response == NULL) {
return 0;
return -1;
}
if (*(int64_t *)response != id) {
fprintf(stderr, "Mismatch id with %"PRId64" expect %"PRId64"\n", *(int64_t *)response, id);
free(response);
return 0;
return -1;
}
int pos;
int width[TSDB_MAX_COLUMNS];
......@@ -1549,106 +1526,129 @@ void wsclient_query(char *command) {
if (wsclient_send_sql(command, WS_QUERY, 0)) {
return;
}
char *query_buffer = wsclient_get_response();
if (query_buffer == NULL) {
return;
}
cJSON* query = cJSON_Parse(query_buffer);
if (query == NULL) {
free(query_buffer);
fprintf(stderr, "Failed to parse response into json: %s\n", query_buffer);
free(query_buffer);
return;
}
et = taosGetTimestampUs();
free(query_buffer);
et = taosGetTimestampUs();
if (wsclient_check(query, st, et)) {
goto OVER;
cJSON_Delete(query);
return;
}
cJSON *is_update = cJSON_GetObjectItem(query, "is_update");
if (cJSON_IsBool(is_update)) {
if (is_update->valueint) {
cJSON *affected_rows = cJSON_GetObjectItem(query, "affected_rows");
if (cJSON_IsNumber(affected_rows)) {
et = taosGetTimestampUs();
printf("Update OK, %d row(s) in set (%.6fs)\n\n", (int)affected_rows->valueint, (et - st) / 1E6);
} else {
fprintf(stderr, "Invalid affected_rows key in json\n");
}
cJSON *fields_count = cJSON_GetObjectItem(query, "fields_count");
cJSON *precisionObj = cJSON_GetObjectItem(query, "precision");
cJSON *id = cJSON_GetObjectItem(query, "id");
if (!cJSON_IsBool(is_update) ||
!cJSON_IsNumber(fields_count) ||
!cJSON_IsNumber(precisionObj) ||
!cJSON_IsNumber(id)) {
fprintf(stderr, "Invalid or miss 'is_update'/'fields_count'/'precision'/'id' in query response\n");
cJSON_Delete(query);
return;
}
if (is_update->valueint) {
cJSON *affected_rows = cJSON_GetObjectItem(query, "affected_rows");
if (cJSON_IsNumber(affected_rows)) {
et = taosGetTimestampUs();
printf("Update OK, %d row(s) in set (%.6fs)\n\n", (int)affected_rows->valueint, (et - st) / 1E6);
} else {
int cols = 0;
int precision = 0;
int64_t total_rows = 0;
int showed_rows = 0;
TAOS_FIELD *fields = wsclient_print_header(query, &cols, &precision);
if (fields != NULL) {
cJSON *id = cJSON_GetObjectItem(query, "id");
if (cJSON_IsNumber(id)) {
ws_id = id->valueint;
bool completed = false;
while (!completed && !stop_fetch) {
if (wsclient_send_sql(NULL, WS_FETCH, id->valueint) == 0) {
char *fetch_buffer = wsclient_get_response();
cJSON* fetch = cJSON_Parse(fetch_buffer);
if (fetch != NULL) {
free(fetch_buffer);
if (wsclient_check(fetch, st, et) == 0) {
cJSON *_completed = cJSON_GetObjectItem(fetch, "completed");
if (cJSON_IsBool(_completed)) {
if (_completed->valueint) {
cJSON_Delete(fetch);
completed = true;
continue;
}
cJSON *rows = cJSON_GetObjectItem(fetch, "rows");
if (cJSON_IsNumber(rows)) {
total_rows += rows->valueint;
cJSON *lengths = cJSON_GetObjectItem(fetch, "lengths");
if (cJSON_IsArray(lengths)) {
for (int i = 0; i < cols; i++) {
fields[i].bytes = (int16_t)(cJSON_GetArrayItem(lengths, i)->valueint);
}
if (showed_rows < DEFAULT_RES_SHOW_NUM) {
if (wsclient_send_sql(NULL, WS_FETCH_BLOCK, id->valueint) == 0) {
wsclient_print_data((int)rows->valueint, fields, cols, id->valueint, precision, &showed_rows);
}
}
cJSON_Delete(fetch);
continue;
} else {
fprintf(stderr, "Invalid lengths key in json\n");
}
} else {
fprintf(stderr, "Invalid rows key in json\n");
}
} else {
fprintf(stderr, "Invalid completed key in json\n");
}
}
cJSON_Delete(fetch);
} else {
fprintf(stderr, "failed to parse response into json: %s\n", fetch_buffer);
free(fetch_buffer);
break;
}
}
fprintf(stderr, "err occured in fetch/fetch_block ws actions\n");
break;
}
et = taosGetTimestampUs();
printf("Query OK, %" PRId64 " row(s) in set (%.6fs)\n\n", total_rows, (et - st) / 1E6);
stop_fetch = false;
} else {
fprintf(stderr, "Invalid id key in json\n");
}
free(fields);
fprintf(stderr, "Invalid or miss 'affected_rows' key in response\n");
}
cJSON_Delete(query);
return;
}
ws_id = id->valueint;
int cols = (int)fields_count->valueint;
int precision = (int)precisionObj->valueint;
int64_t total_rows = 0;
int showed_rows = 0;
bool completed = false;
TAOS_FIELD fields[cols];
if (wsclient_fetch_fields(query, fields, cols)) {
cJSON_Delete(query);
return;
}
int width[cols];
for (int i = 0; i < cols; ++i) {
width[i] = calcColWidth(fields + i, precision);
}
printHeader(fields, width, cols);
cJSON_Delete(query);
while (!completed && !stop_fetch) {
if (wsclient_send_sql(NULL, WS_FETCH, ws_id)) {
return;
}
char *fetch_buffer = wsclient_get_response();
if (fetch_buffer == NULL) {
return;
}
cJSON *fetch = cJSON_Parse(fetch_buffer);
if (fetch == NULL) {
fprintf(stderr, "failed to parse response into json: %s\n", fetch_buffer);
free(fetch_buffer);
return;
}
free(fetch_buffer);
if (wsclient_check(fetch, st, et)) {
cJSON_Delete(fetch);
return;
}
cJSON *completedObj = cJSON_GetObjectItem(fetch, "completed");
cJSON *rows = cJSON_GetObjectItem(fetch, "rows");
cJSON *lengths = cJSON_GetObjectItem(fetch, "lengths");
if (!cJSON_IsBool(completedObj) || !cJSON_IsNumber(rows)) {
fprintf(stderr, "Invalid or miss 'completed'/'rows' in fetch response\n");
cJSON_Delete(fetch);
return;
}
if (completedObj->valueint) {
cJSON_Delete(fetch);
completed = true;
continue;
}
total_rows += rows->valueint;
if (!cJSON_IsArray(lengths)) {
fprintf(stderr, "Invalid or miss 'lengths' in fetch response\n");
cJSON_Delete(fetch);
return;
}
for (int i = 0; i < cols; i++) {
cJSON* length = cJSON_GetArrayItem(lengths, i);
if (!cJSON_IsNumber(length)) {
fprintf(stderr, "Invalid or miss 'lengths' key in fetch response\n");
cJSON_Delete(fetch);
return;
}
fields[i].bytes = (int16_t)(length->valueint);
}
if (showed_rows < DEFAULT_RES_SHOW_NUM) {
if (wsclient_send_sql(NULL, WS_FETCH_BLOCK, ws_id)) {
cJSON_Delete(fetch);
return;
}
if (wsclient_print_data((int)rows->valueint, fields, cols, ws_id, precision, &showed_rows)) {
cJSON_Delete(fetch);
return;
}
cJSON_Delete(fetch);
continue;
}
}
et = taosGetTimestampUs();
if (stop_fetch) {
printf("Query interrupted, %" PRId64 " row(s) in set (%.6fs)\n\n", total_rows, (et - st) / 1E6);
stop_fetch = false;
} else {
fprintf(stderr, "Invalid is_update key in json\n");
printf("Query OK, %" PRId64 " row(s) in set (%.6fs)\n\n", total_rows, (et - st) / 1E6);
}
OVER:
cJSON_Delete(query);
return;
}
\ No newline at end of file
......@@ -27,6 +27,8 @@ void shellQueryInterruptHandler(int32_t signum, void *sigInfo, void *context) {
tsem_post(&cancelSem);
}
void shellRestfulSendInterruptHandler(int32_t signum, void *sigInfo, void *context) {}
void *cancelHandler(void *arg) {
setThreadName("cancelHandler");
......@@ -167,6 +169,9 @@ int main(int argc, char* argv[]) {
taosSetSignal(SIGINT, shellQueryInterruptHandler);
taosSetSignal(SIGHUP, shellQueryInterruptHandler);
taosSetSignal(SIGABRT, shellQueryInterruptHandler);
if (args.restful || args.cloud) {
taosSetSignal(SIGPIPE, shellRestfulSendInterruptHandler);
}
/* Get grant information */
shellGetGrantInfo(args.con);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册