提交 6fc5933b 编写于 作者: A Alex Duan

feat(shell): autotab reset shellEngine.c

上级 ef242d0a
...@@ -137,7 +137,7 @@ void shellInit(SShellArguments *_args) { ...@@ -137,7 +137,7 @@ void shellInit(SShellArguments *_args) {
exit(EXIT_SUCCESS); exit(EXIT_SUCCESS);
} }
#endif #endif
return; return;
} }
...@@ -262,22 +262,41 @@ void shellRunCommandOnServer(TAOS *con, char command[]) { ...@@ -262,22 +262,41 @@ void shellRunCommandOnServer(TAOS *con, char command[]) {
int64_t st, et; int64_t st, et;
wordexp_t full_path; wordexp_t full_path;
char * sptr = NULL; char * sptr = NULL;
char * tmp = NULL;
char * cptr = NULL; char * cptr = NULL;
char * fname = NULL; char * fname = NULL;
bool printMode = false; bool printMode = false;
int match;
sptr = command;
while ((sptr = tstrstr(sptr, ">>", true)) != NULL) {
// find the last ">>" if any
tmp = sptr;
sptr += 2;
}
sptr = tmp;
if (sptr != NULL) {
// select ... where col >> n op m ...;
match = regex_match(sptr + 2, "^\\s*.{1,}\\s*[\\>|\\<|\\<=|\\>=|=|!=]\\s*.{1,};\\s*$", REG_EXTENDED | REG_ICASE);
if (match == 0) {
// select col >> n from ...;
match = regex_match(sptr + 2, "^\\s*.{1,}\\s{1,}.{1,};\\s*$", REG_EXTENDED | REG_ICASE);
if (match == 0) {
cptr = tstrstr(command, ";", true);
if (cptr != NULL) {
*cptr = '\0';
}
if ((sptr = tstrstr(command, ">>", true)) != NULL) { if (wordexp(sptr + 2, &full_path, 0) != 0) {
cptr = tstrstr(command, ";", true); fprintf(stderr, "ERROR: invalid filename: %s\n", sptr + 2);
if (cptr != NULL) { return;
*cptr = '\0'; }
} *sptr = '\0';
fname = full_path.we_wordv[0];
if (wordexp(sptr + 2, &full_path, 0) != 0) { }
fprintf(stderr, "ERROR: invalid filename: %s\n", sptr + 2);
return;
} }
*sptr = '\0';
fname = full_path.we_wordv[0];
} }
if ((sptr = tstrstr(command, "\\G", true)) != NULL) { if ((sptr = tstrstr(command, "\\G", true)) != NULL) {
...@@ -305,12 +324,12 @@ void shellRunCommandOnServer(TAOS *con, char command[]) { ...@@ -305,12 +324,12 @@ void shellRunCommandOnServer(TAOS *con, char command[]) {
int64_t oresult = atomic_load_64(&result); int64_t oresult = atomic_load_64(&result);
if (regex_match(command, "^\\s*use\\s+[a-zA-Z0-9_]+\\s*;\\s*$", REG_EXTENDED | REG_ICASE)) { if (regex_match(command, "^\\s*use\\s+([a-zA-Z0-9_]+|`.+`)\\s*;\\s*$", REG_EXTENDED | REG_ICASE)) {
fprintf(stdout, "Database changed.\n\n"); fprintf(stdout, "Database changed.\n\n");
fflush(stdout); fflush(stdout);
// call back auto tab module // call back auto tab module
callbackAutoTab(command, pSql, true); callbackAutoTab(command, pSql, true);
atomic_store_64(&result, 0); atomic_store_64(&result, 0);
freeResultWithRid(oresult); freeResultWithRid(oresult);
...@@ -346,14 +365,13 @@ void shellRunCommandOnServer(TAOS *con, char command[]) { ...@@ -346,14 +365,13 @@ void shellRunCommandOnServer(TAOS *con, char command[]) {
} else { } else {
printf("Query interrupted (%s), %d row(s) in set (%.6fs)\n", taos_errstr(pSql), numOfRows, (et - st) / 1E6); printf("Query interrupted (%s), %d row(s) in set (%.6fs)\n", taos_errstr(pSql), numOfRows, (et - st) / 1E6);
} }
} else { } else {
int num_rows_affacted = taos_affected_rows(pSql); int num_rows_affacted = taos_affected_rows(pSql);
et = taosGetTimestampUs(); et = taosGetTimestampUs();
printf("Query OK, %d of %d row(s) in database (%.6fs)\n", num_rows_affacted, num_rows_affacted, (et - st) / 1E6); printf("Query OK, %d of %d row(s) in database (%.6fs)\n", num_rows_affacted, num_rows_affacted, (et - st) / 1E6);
// call auto tab // call auto tab
callbackAutoTab(command, pSql, false); callbackAutoTab(command, pSql, false);
} }
printf("\n"); printf("\n");
...@@ -1144,7 +1162,7 @@ int taos_base64_encode(unsigned char *source, size_t sourcelen, char *target, si ...@@ -1144,7 +1162,7 @@ int taos_base64_encode(unsigned char *source, size_t sourcelen, char *target, si
int parse_cloud_dsn() { int parse_cloud_dsn() {
if (args.cloudDsn == NULL) { if (args.cloudDsn == NULL) {
fprintf(stderr, "Cannot read cloud service info\n"); fprintf(stderr, "Cannot read cloud service info\n");
return -1; return 1;
} else { } else {
char *start = strstr(args.cloudDsn, "http://"); char *start = strstr(args.cloudDsn, "http://");
if (start != NULL) { if (start != NULL) {
...@@ -1160,7 +1178,7 @@ int parse_cloud_dsn() { ...@@ -1160,7 +1178,7 @@ int parse_cloud_dsn() {
char *port = strstr(args.cloudHost, ":"); char *port = strstr(args.cloudHost, ":");
if ((port == NULL) || (port + strlen(":")) == NULL) { if ((port == NULL) || (port + strlen(":")) == NULL) {
fprintf(stderr, "Invalid format in TDengine cloud dsn: %s\n", args.cloudDsn); fprintf(stderr, "Invalid format in TDengine cloud dsn: %s\n", args.cloudDsn);
return -1; return 1;
} }
char *token = strstr(port + strlen(":"), "?token="); char *token = strstr(port + strlen(":"), "?token=");
if ((token == NULL) || (token + strlen("?token=")) == NULL || if ((token == NULL) || (token + strlen("?token=")) == NULL ||
...@@ -1218,11 +1236,12 @@ int wsclient_handshake() { ...@@ -1218,11 +1236,12 @@ int wsclient_handshake() {
return 0; return 0;
} }
int wsclient_send(char *strdata, WebSocketFrameType frame) { int wsclient_send(char *strdata) {
struct timeval tv; struct timeval tv;
unsigned char mask[4]; unsigned char mask[4];
unsigned int mask_int; unsigned int mask_int;
unsigned long long payload_len; unsigned long long payload_len;
unsigned char finNopcode;
unsigned int payload_len_small; unsigned int payload_len_small;
unsigned int payload_offset = 6; unsigned int payload_offset = 6;
unsigned int len_size; unsigned int len_size;
...@@ -1236,6 +1255,7 @@ int wsclient_send(char *strdata, WebSocketFrameType frame) { ...@@ -1236,6 +1255,7 @@ int wsclient_send(char *strdata, WebSocketFrameType frame) {
mask_int = rand(); mask_int = rand();
memcpy(mask, &mask_int, 4); memcpy(mask, &mask_int, 4);
payload_len = strlen(strdata); payload_len = strlen(strdata);
finNopcode = 0x81;
if (payload_len <= 125) { if (payload_len <= 125) {
frame_size = 6 + payload_len; frame_size = 6 + payload_len;
payload_len_small = payload_len; payload_len_small = payload_len;
...@@ -1253,7 +1273,7 @@ int wsclient_send(char *strdata, WebSocketFrameType frame) { ...@@ -1253,7 +1273,7 @@ int wsclient_send(char *strdata, WebSocketFrameType frame) {
} }
data = (char *)malloc(frame_size); data = (char *)malloc(frame_size);
memset(data, 0, frame_size); memset(data, 0, frame_size);
*data = frame; *data = finNopcode;
*(data + 1) = payload_len_small | 0x80; *(data + 1) = payload_len_small | 0x80;
if (payload_len_small == 126) { if (payload_len_small == 126) {
payload_len &= 0xffff; payload_len &= 0xffff;
...@@ -1280,16 +1300,13 @@ int wsclient_send(char *strdata, WebSocketFrameType frame) { ...@@ -1280,16 +1300,13 @@ int wsclient_send(char *strdata, WebSocketFrameType frame) {
sent += i; sent += i;
} }
if (i < 0) { if (i < 0) {
fprintf(stderr, "websocket send data error, please check the server\n"); fprintf(stderr, "websocket send data error\n");
free(data);
return -1;
} }
free(data); free(data);
return 0; return 0;
} }
int wsclient_send_sql(char *command, WS_ACTION_TYPE type, int64_t id) { int wsclient_send_sql(char *command, WS_ACTION_TYPE type, int id) {
int code = 1;
cJSON *json = cJSON_CreateObject(); cJSON *json = cJSON_CreateObject();
cJSON *_args = cJSON_CreateObject(); cJSON *_args = cJSON_CreateObject();
cJSON_AddNumberToObject(_args, "req_id", 1); cJSON_AddNumberToObject(_args, "req_id", 1);
...@@ -1313,22 +1330,15 @@ int wsclient_send_sql(char *command, WS_ACTION_TYPE type, int64_t id) { ...@@ -1313,22 +1330,15 @@ int wsclient_send_sql(char *command, WS_ACTION_TYPE type, int64_t id) {
cJSON_AddStringToObject(json, "action", "fetch_block"); cJSON_AddStringToObject(json, "action", "fetch_block");
cJSON_AddNumberToObject(_args, "id", id); cJSON_AddNumberToObject(_args, "id", id);
break; break;
case WS_CLOSE:
cJSON_AddStringToObject(json, "action", "close");
cJSON_AddNumberToObject(_args, "id", id);
break;
} }
cJSON_AddItemToObject(json, "args", _args); cJSON_AddItemToObject(json, "args", _args);
char *strdata = NULL; char *strdata = NULL;
strdata = cJSON_Print(json); strdata = cJSON_Print(json);
if (wsclient_send(strdata, TEXT_FRAME)) { if (wsclient_send(strdata)) {
goto OVER; free(strdata);
} return -1;
code = 0; }
OVER: return 0;
free(strdata);
cJSON_Delete(json);
return code;
} }
int wsclient_conn() { int wsclient_conn() {
...@@ -1363,6 +1373,7 @@ int wsclient_conn() { ...@@ -1363,6 +1373,7 @@ int wsclient_conn() {
} else { } else {
fprintf(stdout, "Successfully connect to %s:%d in restful mode\n\n", args.host, args.port); fprintf(stdout, "Successfully connect to %s:%d in restful mode\n\n", args.host, args.port);
} }
return 0; return 0;
} else { } else {
cJSON *message = cJSON_GetObjectItem(root, "message"); cJSON *message = cJSON_GetObjectItem(root, "message");
...@@ -1377,135 +1388,143 @@ int wsclient_conn() { ...@@ -1377,135 +1388,143 @@ int wsclient_conn() {
return -1; return -1;
} }
void wsclient_parse_frame(SWSParser * parser, uint8_t * recv_buffer) { cJSON *wsclient_parse_response() {
unsigned char msg_opcode = recv_buffer[0] & 0x0F; char *recv_buffer = calloc(1, 4096);
unsigned char msg_masked = (recv_buffer[1] >> 7) & 0x01; int start = 0;
int payload_length = 0; bool found = false;
int pos = 2; int received = 0;
int length_field = recv_buffer[1] &(~0x80); int bytes;
unsigned int mask = 0; int recv_length = 4095;
if (length_field <= 125) { do {
payload_length = length_field; bytes = recv(args.socket, recv_buffer + received, recv_length - received, 0);
} else if (length_field == 126) { if (bytes == -1) {
payload_length = recv_buffer[2]; free(recv_buffer);
for (int i = 0; i < 1; i++) { fprintf(stderr, "websocket recv failed with bytes: %d\n", bytes);
payload_length = (payload_length << 8) + recv_buffer[3 + i]; return NULL;
} }
pos += 2;
} else if (length_field == 127) { if (!found) {
payload_length = recv_buffer[2]; for (; start < recv_length - received; start++) {
for (int i = 0; i < 7; i++) { if ((recv_buffer + start)[0] == '{') {
payload_length = (payload_length << 8) + recv_buffer[3 + i]; found = true;
break;
}
}
} }
pos += 8; if (NULL != strstr(recv_buffer + start, "}")) {
} break;
if (msg_masked) {
mask = *((unsigned int *) (recv_buffer + pos));
pos += 4;
const uint8_t *c = recv_buffer + pos;
for (int i = 0; i < payload_length; i++) {
recv_buffer[i] = c[i] ^ ((unsigned char *) (&mask))[i % 4];
} }
} received += bytes;
if (msg_opcode == 0x9) { if (received >= recv_length) {
parser->frame = PING_FRAME; recv_length += 4096;
} recv_buffer = realloc(recv_buffer + start, recv_length);
parser->offset = pos;
parser->payload_length = payload_length;
}
char *wsclient_get_response() {
uint8_t recv_buffer[TEMP_RECV_BUF]= {0};
int received = 0;
SWSParser parser;
int bytes = recv(args.socket, recv_buffer + received, TEMP_RECV_BUF - 1, 0);
if (bytes <= 0) {
fprintf(stderr, "websocket recv failed with bytes: %d\n", bytes);
return NULL;
}
wsclient_parse_frame(&parser, recv_buffer);
if (parser.frame == PING_FRAME) {
if (wsclient_send("pong", PONG_FRAME)) {
return NULL;
} }
return wsclient_get_response(); } while (1);
} cJSON *res = cJSON_Parse(recv_buffer + start);
char* response = calloc(1, parser.payload_length + 1); if (res == NULL) {
int pos = bytes - parser.offset; fprintf(stderr, "fail to parse response into json: %s\n", recv_buffer + start);
memcpy(response, recv_buffer + parser.offset, pos); free(recv_buffer);
while (pos < parser.payload_length) { return NULL;
bytes = recv(args.socket, response + pos, parser.payload_length - pos, 0);
pos += bytes;
} }
response[pos] = '\0'; return res;
return response;
} }
int wsclient_fetch_fields(cJSON *query, TAOS_FIELD * fields, int cols) { TAOS_FIELD *wsclient_print_header(cJSON *query, int *pcols, int *pprecison) {
cJSON *fields_names = cJSON_GetObjectItem(query, "fields_names"); TAOS_FIELD *fields = NULL;
cJSON *fields_types = cJSON_GetObjectItem(query, "fields_types"); cJSON *fields_count = cJSON_GetObjectItem(query, "fields_count");
cJSON *fields_lengths = cJSON_GetObjectItem(query, "fields_lengths"); if (cJSON_IsNumber(fields_count)) {
if (!cJSON_IsArray(fields_names) || !cJSON_IsArray(fields_types) || !cJSON_IsArray(fields_lengths)) { *pcols = (int)fields_count->valueint;
fprintf(stderr, "Invalid or miss 'fields_names'/'fields_types'/'fields_lengths' key in response\n"); fields = calloc((int)fields_count->valueint, sizeof(TAOS_FIELD));
return -1; cJSON *fields_names = cJSON_GetObjectItem(query, "fields_names");
} cJSON *fields_types = cJSON_GetObjectItem(query, "fields_types");
for (int i = 0; i < cols; i++) { cJSON *fields_lengths = cJSON_GetObjectItem(query, "fields_lengths");
cJSON* field_name = cJSON_GetArrayItem(fields_names, i); if (cJSON_IsArray(fields_names) && cJSON_IsArray(fields_types) && cJSON_IsArray(fields_lengths)) {
cJSON* field_type = cJSON_GetArrayItem(fields_types, i); for (int i = 0; i < (int)fields_count->valueint; i++) {
cJSON* field_length = cJSON_GetArrayItem(fields_lengths, i); strncpy(fields[i].name, cJSON_GetArrayItem(fields_names, i)->valuestring, 65);
if (!cJSON_IsString(field_name) || !cJSON_IsNumber(field_type) || !cJSON_IsNumber(field_length)) { fields[i].type = (uint8_t)cJSON_GetArrayItem(fields_types, i)->valueint;
fprintf(stderr, "Invalid or miss 'field_name'/'field_type'/'field_length' in query response"); fields[i].bytes = (int16_t)cJSON_GetArrayItem(fields_lengths, i)->valueint;
return -1; }
cJSON *precision = cJSON_GetObjectItem(query, "precision");
if (cJSON_IsNumber(precision)) {
*pprecison = (int)precision->valueint;
int width[TSDB_MAX_COLUMNS];
for (int col = 0; col < (int)fields_count->valueint; col++) {
width[col] = calcColWidth(fields + col, (int)precision->valueint);
}
printHeader(fields, width, (int)fields_count->valueint);
return fields;
} else {
fprintf(stderr, "Invalid precision key in json\n");
}
} else {
fprintf(stderr, "Invalid fields_names/fields_types/fields_lengths key in json\n");
} }
strncpy(fields[i].name, field_name->valuestring, 65); } else {
fields[i].type = (uint8_t)field_type->valueint; fprintf(stderr, "Invalid fields_count key in json\n");
fields[i].bytes = (int16_t)field_length->valueint;
} }
return 0; if (fields != NULL) {
free(fields);
}
return NULL;
} }
int wsclient_check(cJSON *root, int64_t st, int64_t et) { int wsclient_check(cJSON *root, int64_t st, int64_t et) {
cJSON *code = cJSON_GetObjectItem(root, "code"); cJSON *code = cJSON_GetObjectItem(root, "code");
cJSON *message = cJSON_GetObjectItem(root, "message"); if (cJSON_IsNumber(code)) {
if (!cJSON_IsNumber(code) || !cJSON_IsString(message)) { if (code->valueint == 0) {
fprintf(stderr, "Invalid or miss 'code'/'message' in response\n"); return 0;
return -1; } else {
} cJSON *message = cJSON_GetObjectItem(root, "message");
if (code->valueint != 0) { if (cJSON_IsString(message)) {
fprintf(stderr, "\nDB error: %s (%.6fs)\n", message->valuestring, (et - st) / 1E6); fprintf(stderr, "\nDB error: %s (%.6fs)\n", message->valuestring, (et - st) / 1E6);
return -1; } else {
fprintf(stderr, "Invalid message key in json\n");
}
}
} else {
fprintf(stderr, "Invalid code key in json\n");
} }
return 0; return -1;
} }
int wsclient_print_data(int rows, TAOS_FIELD *fields, int cols, int64_t id, int precision, int* pshowed_rows) { int wsclient_print_data(int rows, TAOS_FIELD *fields, int cols, int64_t id, int precision, int* pshowed_rows) {
char* response = wsclient_get_response(); char *recv_buffer = calloc(1, 4096);
if (response == NULL) { int col_length = 0;
return -1; for (int i = 0; i < cols; i++) {
col_length += fields[i].bytes;
} }
int total_recv_len = col_length * rows + 12;
int received = 0;
int recv_length = 4095;
int start = 0;
int pos;
do {
int bytes = recv(args.socket, recv_buffer + received, recv_length - received, 0);
received += bytes;
if (received >= recv_length) {
recv_length += 4096;
recv_buffer = realloc(recv_buffer, recv_length);
}
} while (received < total_recv_len);
if (*(int64_t *)response != id) { while (1) {
fprintf(stderr, "Mismatch id with %"PRId64" expect %"PRId64"\n", *(int64_t *)response, id); if (*(int64_t *)(recv_buffer + start) == id) {
free(response); break;
return -1; }
start++;
} }
int pos; start += 8;
int width[TSDB_MAX_COLUMNS]; int width[TSDB_MAX_COLUMNS];
for (int c = 0; c < cols; c++) { for (int c = 0; c < cols; c++) {
width[c] = calcColWidth(fields + c, precision); width[c] = calcColWidth(fields + c, precision);
} }
for (int i = 0; i < rows; i++) { for (int i = 0; i < rows; i++) {
if (*pshowed_rows == DEFAULT_RES_SHOW_NUM) { if (*pshowed_rows == DEFAULT_RES_SHOW_NUM) {
printf("\n"); free(recv_buffer);
printf(" Notice: The result shows only the first %d rows.\n", DEFAULT_RES_SHOW_NUM);
printf("\n");
printf(" You can use Ctrl+C to stop the underway fetching.\n");
printf("\n");
free(response);
return 0; return 0;
} }
for (int c = 0; c < cols; c++) { for (int c = 0; c < cols; c++) {
pos = 8; pos = start;
pos += i * fields[c].bytes; pos += i * fields[c].bytes;
for (int j = 0; j < c; j++) { for (int j = 0; j < c; j++) {
pos += fields[j].bytes * rows; pos += fields[j].bytes * rows;
...@@ -1514,17 +1533,17 @@ int wsclient_print_data(int rows, TAOS_FIELD *fields, int cols, int64_t id, int ...@@ -1514,17 +1533,17 @@ int wsclient_print_data(int rows, TAOS_FIELD *fields, int cols, int64_t id, int
int16_t length = 0; int16_t length = 0;
if (fields[c].type == TSDB_DATA_TYPE_NCHAR || fields[c].type == TSDB_DATA_TYPE_BINARY || if (fields[c].type == TSDB_DATA_TYPE_NCHAR || fields[c].type == TSDB_DATA_TYPE_BINARY ||
fields[c].type == TSDB_DATA_TYPE_JSON) { fields[c].type == TSDB_DATA_TYPE_JSON) {
length = *(int16_t *)(response + pos); length = *(int16_t *)(recv_buffer + pos);
pos += 2; pos += 2;
} }
printField((const char *)(response + pos), fields + c, width[c], (int32_t)length, precision); printField((const char *)(recv_buffer + pos), fields + c, width[c], (int32_t)length, precision);
putchar(' '); putchar(' ');
putchar('|'); putchar('|');
} }
putchar('\n'); putchar('\n');
*pshowed_rows += 1; *pshowed_rows += 1;
} }
free(response); free(recv_buffer);
return 0; return 0;
} }
...@@ -1534,129 +1553,90 @@ void wsclient_query(char *command) { ...@@ -1534,129 +1553,90 @@ void wsclient_query(char *command) {
if (wsclient_send_sql(command, WS_QUERY, 0)) { if (wsclient_send_sql(command, WS_QUERY, 0)) {
return; return;
} }
char *query_buffer = wsclient_get_response();
if (query_buffer == NULL) { et = taosGetTimestampUs();
return; cJSON *query = wsclient_parse_response();
}
cJSON* query = cJSON_Parse(query_buffer);
if (query == NULL) { if (query == NULL) {
fprintf(stderr, "Failed to parse response into json: %s\n", query_buffer);
free(query_buffer);
return; return;
} }
free(query_buffer);
et = taosGetTimestampUs();
if (wsclient_check(query, st, et)) { if (wsclient_check(query, st, et)) {
cJSON_Delete(query);
return; return;
} }
cJSON *is_update = cJSON_GetObjectItem(query, "is_update"); cJSON *is_update = cJSON_GetObjectItem(query, "is_update");
cJSON *fields_count = cJSON_GetObjectItem(query, "fields_count"); if (cJSON_IsBool(is_update)) {
cJSON *precisionObj = cJSON_GetObjectItem(query, "precision"); if (is_update->valueint) {
cJSON *id = cJSON_GetObjectItem(query, "id"); cJSON *affected_rows = cJSON_GetObjectItem(query, "affected_rows");
if (!cJSON_IsBool(is_update) || if (cJSON_IsNumber(affected_rows)) {
!cJSON_IsNumber(fields_count) || printf("Update OK, %d row(s) in set (%.6fs)\n\n", (int)affected_rows->valueint, (et - st) / 1E6);
!cJSON_IsNumber(precisionObj) || } else {
!cJSON_IsNumber(id)) { fprintf(stderr, "Invalid affected_rows key in json\n");
fprintf(stderr, "Invalid or miss 'is_update'/'fields_count'/'precision'/'id' in query response\n");
cJSON_Delete(query);
return;
}
if (is_update->valueint) {
cJSON *affected_rows = cJSON_GetObjectItem(query, "affected_rows");
if (cJSON_IsNumber(affected_rows)) {
et = taosGetTimestampUs();
printf("Update OK, %d row(s) in set (%.6fs)\n\n", (int)affected_rows->valueint, (et - st) / 1E6);
} else {
fprintf(stderr, "Invalid or miss 'affected_rows' key in response\n");
}
cJSON_Delete(query);
return;
}
ws_id = id->valueint;
int cols = (int)fields_count->valueint;
int precision = (int)precisionObj->valueint;
int64_t total_rows = 0;
int showed_rows = 0;
bool completed = false;
TAOS_FIELD fields[TSDB_MAX_COLUMNS];
if (wsclient_fetch_fields(query, fields, cols)) {
cJSON_Delete(query);
return;
}
int width[TSDB_MAX_COLUMNS];
for (int i = 0; i < cols; ++i) {
width[i] = calcColWidth(fields + i, precision);
}
printHeader(fields, width, cols);
cJSON_Delete(query);
while (!completed && !stop_fetch) {
if (wsclient_send_sql(NULL, WS_FETCH, ws_id)) {
return;
}
char *fetch_buffer = wsclient_get_response();
if (fetch_buffer == NULL) {
return;
}
cJSON *fetch = cJSON_Parse(fetch_buffer);
if (fetch == NULL) {
fprintf(stderr, "failed to parse response into json: %s\n", fetch_buffer);
free(fetch_buffer);
return;
}
free(fetch_buffer);
if (wsclient_check(fetch, st, et)) {
cJSON_Delete(fetch);
return;
}
cJSON *completedObj = cJSON_GetObjectItem(fetch, "completed");
cJSON *rows = cJSON_GetObjectItem(fetch, "rows");
cJSON *lengths = cJSON_GetObjectItem(fetch, "lengths");
if (!cJSON_IsBool(completedObj) || !cJSON_IsNumber(rows)) {
fprintf(stderr, "Invalid or miss 'completed'/'rows' in fetch response\n");
cJSON_Delete(fetch);
return;
}
if (completedObj->valueint) {
cJSON_Delete(fetch);
completed = true;
continue;
}
total_rows += rows->valueint;
if (!cJSON_IsArray(lengths)) {
fprintf(stderr, "Invalid or miss 'lengths' in fetch response\n");
cJSON_Delete(fetch);
return;
}
for (int i = 0; i < cols; i++) {
cJSON* length = cJSON_GetArrayItem(lengths, i);
if (!cJSON_IsNumber(length)) {
fprintf(stderr, "Invalid or miss 'lengths' key in fetch response\n");
cJSON_Delete(fetch);
return;
}
fields[i].bytes = (int16_t)(length->valueint);
}
if (showed_rows < DEFAULT_RES_SHOW_NUM) {
if (wsclient_send_sql(NULL, WS_FETCH_BLOCK, ws_id)) {
cJSON_Delete(fetch);
return;
} }
if (wsclient_print_data((int)rows->valueint, fields, cols, ws_id, precision, &showed_rows)) { } else {
cJSON_Delete(fetch); int cols = 0;
return; int precision = 0;
int64_t total_rows = 0;
int showed_rows = 0;
TAOS_FIELD *fields = wsclient_print_header(query, &cols, &precision);
if (fields != NULL) {
cJSON *id = cJSON_GetObjectItem(query, "id");
if (cJSON_IsNumber(id)) {
bool completed = false;
while (!completed) {
if (wsclient_send_sql(NULL, WS_FETCH, (int)id->valueint) == 0) {
cJSON *fetch = wsclient_parse_response();
if (fetch != NULL) {
if (wsclient_check(fetch, st, et) == 0) {
cJSON *_completed = cJSON_GetObjectItem(fetch, "completed");
if (cJSON_IsBool(_completed)) {
if (_completed->valueint) {
completed = true;
continue;
}
cJSON *rows = cJSON_GetObjectItem(fetch, "rows");
if (cJSON_IsNumber(rows)) {
total_rows += rows->valueint;
cJSON *lengths = cJSON_GetObjectItem(fetch, "lengths");
if (cJSON_IsArray(lengths)) {
for (int i = 0; i < cols; i++) {
fields[i].bytes = (int16_t)(cJSON_GetArrayItem(lengths, i)->valueint);
}
if (showed_rows < DEFAULT_RES_SHOW_NUM) {
if (wsclient_send_sql(NULL, WS_FETCH_BLOCK, (int)id->valueint) == 0) {
wsclient_print_data((int)rows->valueint, fields, cols, id->valueint, precision, &showed_rows);
}
}
continue;
} else {
fprintf(stderr, "Invalid lengths key in json\n");
}
} else {
fprintf(stderr, "Invalid rows key in json\n");
}
} else {
fprintf(stderr, "Invalid completed key in json\n");
}
}
}
}
fprintf(stderr, "err occured in fetch/fetch_block ws actions\n");
break;
}
if (showed_rows == DEFAULT_RES_SHOW_NUM) {
printf("\n");
printf(" Notice: The result shows only the first %d rows.\n", DEFAULT_RES_SHOW_NUM);
printf("\n");
}
printf("Query OK, %" PRId64 " row(s) in set (%.6fs)\n\n", total_rows, (et - st) / 1E6);
} else {
fprintf(stderr, "Invalid id key in json\n");
}
free(fields);
} }
cJSON_Delete(fetch);
continue;
} }
}
et = taosGetTimestampUs();
if (stop_fetch) {
printf("Query interrupted, %" PRId64 " row(s) in set (%.6fs)\n\n", total_rows, (et - st) / 1E6);
stop_fetch = false;
} else { } else {
printf("Query OK, %" PRId64 " row(s) in set (%.6fs)\n\n", total_rows, (et - st) / 1E6); fprintf(stderr, "Invalid is_update key in json\n");
} }
cJSON_Delete(query);
return;
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册