From 8af6479858ab8b9b3a11513d55cd42407b01550d Mon Sep 17 00:00:00 2001 From: Yang Zhao Date: Mon, 18 Jul 2022 20:49:43 +0800 Subject: [PATCH] fix: use taosws lib for websocket interact in taos shell (#14802) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: update taos-tools ” * chore: add taosws-rs as plugin * chore: update taosws-rs * chore: add taosws install/remove in scripts * chore: update taosws-rs * chore: update taosws-rs * fix: _smartly_ build ws lib * chore: enhance CMakeLists.txt * fix: packaging/tools/make_install.sh * chore: update taos-tools * fix: use rust websock lib * feat: add vertical and file dump for shell * test: change test case * test: change test cases * fix: remove test case and add macro * fix: change print message * feat: add timeout and hint for error response * fix: small typo * fix: windows compile * fix: typo * fix: reconnect * fix: do not parse cloud dsn * fix: update taosws-rs * fix: win and mac compile Co-authored-by: Shuduo Sang --- src/kit/shell/inc/shell.h | 53 +- src/kit/shell/src/shellDarwin.c | 53 +- src/kit/shell/src/shellEngine.c | 849 +++++++--------------- src/kit/shell/src/shellLinux.c | 52 +- src/kit/shell/src/shellMain.c | 37 +- src/kit/shell/src/shellWindows.c | 88 +-- src/plugins/taosws-rs | 2 +- tests/develop-test/0-others/taos_shell.py | 18 +- tests/develop-test/fulltest-others.sh | 3 +- tests/parallel_test/cases.task | 3 +- 10 files changed, 361 insertions(+), 797 deletions(-) diff --git a/src/kit/shell/inc/shell.h b/src/kit/shell/inc/shell.h index c5405e52bd..aa5f7f9494 100644 --- a/src/kit/shell/inc/shell.h +++ b/src/kit/shell/inc/shell.h @@ -25,6 +25,9 @@ #include "taos.h" #include "taosdef.h" #include "tsclient.h" +#ifdef WEBSOCKET +#include "taosws.h" +#endif #define MAX_USERNAME_SIZE 64 #define MAX_DBNAME_SIZE 64 @@ -41,18 +44,6 @@ typedef struct SShellHistory { int hend; } SShellHistory; -typedef enum enumWebSocketFrameType { - TEXT_FRAME = 0x81, - PING_FRAME = 0x19, - PONG_FRAME = 0x8A, -} WebSocketFrameType; - -typedef struct SWSParser { - int offset; - int payload_length; - WebSocketFrameType frame; -} SWSParser; - typedef struct SShellArguments { char* host; char* password; @@ -61,11 +52,6 @@ typedef struct SShellArguments { char* database; char* timezone; bool restful; -#ifdef WINDOWS - SOCKET socket; -#else - int socket; -#endif TAOS* con; bool is_raw_time; bool is_use_passwd; @@ -81,21 +67,14 @@ typedef struct SShellArguments { int pktNum; char* pktType; char* netTestRole; - char* cloudDsn; - bool cloud; - char* cloudHost; - char* cloudPort; - char* cloudToken; + char* dsn; +#ifdef WEBSOCKET + WS_TAOS* ws_conn; +#endif + bool cloud; + uint32_t timeout; } SShellArguments; -typedef enum WS_ACTION_TYPE_S { - WS_CONN, - WS_QUERY, - WS_FETCH, - WS_FETCH_BLOCK, - WS_CLOSE, -} WS_ACTION_TYPE; - /**************** Function declarations ****************/ extern void shellParseArgument(int argc, char* argv[], SShellArguments* arguments); extern void shellInit(SShellArguments* args); @@ -105,6 +84,10 @@ extern int regex_match(const char* s, const char* reg, int cflags); int32_t shellReadCommand(TAOS* con, char command[]); int32_t shellRunCommand(TAOS* con, char* command); void shellRunCommandOnServer(TAOS* con, char command[]); +#ifdef WEBSOCKET +void shellRunCommandOnWebsocket(char command[]); +#endif +void printField(const char* val, TAOS_FIELD* field, int width, int32_t length, int precision); void read_history(); void write_history(); void source_file(TAOS* con, char* fptr); @@ -114,15 +97,12 @@ void get_history_path(char* history); void shellCheck(TAOS* con, SShellArguments* args); void cleanup_handler(void* arg); void exitShell(); +#ifdef WEBSOCKET +int shellDumpWebsocket(WS_RES *wres, char *fname, int *error_no, bool vertical); +#endif int shellDumpResult(TAOS_RES* con, char* fname, int* error_no, bool printMode); void shellGetGrantInfo(void* con); int isCommentLine(char* line); -int wsclient_handshake(); -int wsclient_conn(); -void wsclient_query(char* command); -int wsclient_send_sql(char *command, WS_ACTION_TYPE type, int64_t id); -int tcpConnect(char* host, int port); -int parse_cloud_dsn(); /**************** Global variable declarations ****************/ extern char PROMPT_HEADER[]; @@ -135,7 +115,6 @@ extern int get_old_terminal_mode(struct termios* tio); extern void reset_terminal_mode(); extern SShellArguments args; extern int64_t result; -extern int64_t ws_id; extern bool stop_fetch; #endif diff --git a/src/kit/shell/src/shellDarwin.c b/src/kit/shell/src/shellDarwin.c index 7803113a0f..eb7809ebe8 100644 --- a/src/kit/shell/src/shellDarwin.c +++ b/src/kit/shell/src/shellDarwin.c @@ -64,6 +64,8 @@ void printHelp() { printf("%s%s%s\n", indent, indent, "Connect and interact with TDengine use restful."); printf("%s%s\n", indent, "-E"); printf("%s%s%s\n", indent, indent, "The DSN to use when connecting TDengine's cloud services."); + printf("%s%s\n", indent, "-t"); + printf("%s%s%s\n", indent, indent, "The timeout in seconds for websocket interact."); exit(EXIT_SUCCESS); } @@ -204,12 +206,21 @@ void shellParseArgument(int argc, char *argv[], SShellArguments *arguments) { else if (strcmp(argv[i], "-E") == 0) { if (i < argc - 1) { - arguments->cloudDsn = argv[++i]; + arguments->dsn = argv[++i]; } else { fprintf(stderr, "options -E requires an argument\n"); exit(EXIT_FAILURE); } } + + else if (strcmp(argv[i], "-t") == 0) { + if (i < argc -1) { + arguments->timeout = atoi(argv[++i]); + } else { + fprintf(stderr, "options -t requires an argument\n"); + exit(EXIT_FAILURE); + } + } // For temperory command TODO else if (strcmp(argv[i], "--help") == 0) { @@ -221,10 +232,10 @@ void shellParseArgument(int argc, char *argv[], SShellArguments *arguments) { exit(EXIT_FAILURE); } } - if (args.cloudDsn == NULL) { + if (args.dsn == NULL) { if (args.cloud) { - args.cloudDsn = getenv("TDENGINE_CLOUD_DSN"); - if (args.cloudDsn == NULL) { + args.dsn = getenv("TDENGINE_CLOUD_DSN"); + if (args.dsn == NULL) { args.cloud = false; } } @@ -578,37 +589,3 @@ void exitShell() { tcsetattr(0, TCSANOW, &oldtio); exit(EXIT_SUCCESS); } - -int tcpConnect(char* host, int port) { - struct sockaddr_in serv_addr; - if (port == 0) { - port = 6041; - args.port = 6041; - } - if (NULL == host) { - host = "localhost"; - args.host = "localhost"; - } - - struct hostent *server = gethostbyname(host); - if ((server == NULL) || (server->h_addr == NULL)) { - fprintf(stderr, "no such host: %s\n", host); - return -1; - } - memset(&serv_addr, 0, sizeof(struct sockaddr_in)); - serv_addr.sin_family = AF_INET; - serv_addr.sin_port = htons(port); - memcpy(&(serv_addr.sin_addr.s_addr), server->h_addr, server->h_length); - args.socket = socket(AF_INET, SOCK_STREAM, 0); - if (args.socket < 0) { - fprintf(stderr, "failed to create socket\n"); - return -1; - } - int retConn = connect(args.socket, (struct sockaddr *)&serv_addr, sizeof(struct sockaddr)); - if (retConn < 0) { - fprintf(stderr, "failed to connect\n"); - close(args.socket); - return -1; - } - return 0; -} \ No newline at end of file diff --git a/src/kit/shell/src/shellEngine.c b/src/kit/shell/src/shellEngine.c index a9febf4c32..939ef05069 100644 --- a/src/kit/shell/src/shellEngine.c +++ b/src/kit/shell/src/shellEngine.c @@ -71,15 +71,21 @@ void shellInit(SShellArguments *_args) { if (_args->user == NULL) { _args->user = TSDB_DEFAULT_USER; } +#ifdef WEBSOCKET + if (_args->dsn) { - if (_args->restful || _args->cloud) { - if (wsclient_handshake()) { + args.ws_conn = ws_connect_with_dsn(args.dsn); + if (args.ws_conn == NULL) { + fprintf(stderr, "failed to connect %s, reason: %s\n", args.dsn, ws_errstr(NULL)); exit(EXIT_FAILURE); } - if (wsclient_conn()) { - exit(EXIT_FAILURE); + if (_args->restful) { + fprintf(stdout, "successfully connect to %s\n\n", args.dsn); + } else { + fprintf(stdout, "successfully connect to cloud service\n\n"); } } else { +#endif // set options before initializing if (_args->timezone != NULL) { taos_options(TSDB_OPTION_TIMEZONE, _args->timezone); @@ -102,7 +108,9 @@ void shellInit(SShellArguments *_args) { fflush(stdout); exit(EXIT_FAILURE); } +#ifdef WEBSOCKET } +#endif /* Read history TODO : release resources here*/ read_history(); @@ -158,11 +166,15 @@ static int32_t shellRunSingleCommand(TAOS *con, char *command) { // Analyse the command. if (regex_match(command, "^[ \t]*(quit|q|exit)[ \t;]*$", REG_EXTENDED | REG_ICASE)) { +#ifdef WEBSOCKET if (args.restful || args.cloud) { - close(args.socket); + ws_close(args.ws_conn); } else { +#endif taos_close(con); +#ifdef WEBSOCKET } +#endif write_history(); #ifdef WINDOWS exit(EXIT_SUCCESS); @@ -197,8 +209,15 @@ static int32_t shellRunSingleCommand(TAOS *con, char *command) { source_file(con, c_ptr); return 0; } - - shellRunCommandOnServer(con, command); +#ifdef WEBSOCKET + if (args.cloud || args.restful) { + shellRunCommandOnWebsocket(command); + } else { +#endif + shellRunCommandOnServer(con, command); +#ifdef WEBSOCKET + } +#endif return 0; } @@ -256,8 +275,8 @@ void freeResultWithRid(int64_t rid) { taosReleaseRef(tscObjRef, rid); } } - -void shellRunCommandOnServer(TAOS *con, char command[]) { +#ifdef WEBSOCKET +void shellRunCommandOnWebsocket(char command[]) { int64_t st, et; wordexp_t full_path; char * sptr = NULL; @@ -289,11 +308,101 @@ void shellRunCommandOnServer(TAOS *con, char command[]) { printMode = true; // When output to a file, the switch does not work. } - if (args.restful || args.cloud) { - wsclient_query(command); + if (args.ws_conn == NULL) { + args.ws_conn = ws_connect_with_dsn(args.dsn); + if (args.ws_conn == NULL) { + if (args.cloud) { + fprintf(stderr, "failed to connect cloud service, reason: %s\n", ws_errstr(NULL)); + } else { + fprintf(stderr, "failed to connect %s, reason: %s\n", args.host, ws_errstr(NULL)); + } + return; + } + } + + st = taosGetTimestampUs(); + + WS_RES* res = ws_query_timeout(args.ws_conn, command, args.timeout); + int code = ws_errno(res); + if (code != 0) { + et = taosGetTimestampUs(); + fprintf(stderr, "\nDB error: %s (%.6fs)\n", ws_errstr(res), (et - st)/1E6); + if (code == TSDB_CODE_WS_SEND_TIMEOUT || code == TSDB_CODE_WS_RECV_TIMEOUT) { + fprintf(stderr, "Hint: use -t to increase the timeout in seconds\n"); + } else if (code == TSDB_CODE_WS_INTERNAL_ERRO || code == TSDB_CODE_WS_CLOSED) { + fprintf(stderr, "TDengine server is down, will try to reconnect\n"); + args.ws_conn = NULL; + } + ws_free_result(res); + return; + } + + if (regex_match(command, "^\\s*use\\s+[a-zA-Z0-9_]+\\s*;\\s*$", REG_EXTENDED | REG_ICASE)) { + fprintf(stdout, "Database changed.\n\n"); + fflush(stdout); return; } + int numOfRows = 0; + if (ws_is_update_query(res)) { + numOfRows = ws_affected_rows(res); + et = taosGetTimestampUs(); + printf("Query Ok, %d of %d row(s) in database (%.6fs)\n", numOfRows, numOfRows, (et - st)/1E6); + } else { + int error_no = 0; + numOfRows = shellDumpWebsocket(res, fname, &error_no, printMode); + if (numOfRows < 0) { + ws_free_result(res); + return; + } + et = taosGetTimestampUs(); + if (error_no == 0) { + printf("Query OK, %d row(s) in set (%.6fs)\n", numOfRows, (et - st)/1E6); + } else { + printf("Query interrupted, %d row(s) in set (%.6fs)\n", numOfRows, (et - st) / 1E6); + } + } + printf("\n"); + + if (fname != NULL) { + wordfree(&full_path); + } + ws_free_result(res); +} +#endif + +void shellRunCommandOnServer(TAOS *con, char command[]) { + int64_t st, et; + wordexp_t full_path; + char * sptr = NULL; + char * cptr = NULL; + char * fname = NULL; + bool printMode = false; + + if ((sptr = tstrstr(command, ">>", true)) != NULL) { + cptr = tstrstr(command, ";", true); + if (cptr != NULL) { + *cptr = '\0'; + } + + if (wordexp(sptr + 2, &full_path, 0) != 0) { + fprintf(stderr, "ERROR: invalid filename: %s\n", sptr + 2); + return; + } + *sptr = '\0'; + fname = full_path.we_wordv[0]; + } + + if ((sptr = tstrstr(command, "\\G", true)) != NULL) { + cptr = tstrstr(command, ";", true); + if (cptr != NULL) { + *cptr = '\0'; + } + + *sptr = '\0'; + printMode = true; // When output to a file, the switch does not work. + } + st = taosGetTimestampUs(); TAOS_RES* pSql = taos_query_h(con, command, &result); @@ -533,6 +642,60 @@ static void dumpFieldToFile(FILE* fp, const char* val, TAOS_FIELD* field, int32_ break; } } +#ifdef WEBSOCKET +static int dumpWebsocketToFile(const char* fname, WS_RES* wres) { + wordexp_t full_path; + + if (wordexp((char *)fname, &full_path, 0) != 0) { + fprintf(stderr, "ERROR: invalid file name: %s\n", fname); + return -1; + } + + FILE* fp = fopen(full_path.we_wordv[0], "w"); + if (fp == NULL) { + fprintf(stderr, "ERROR: failed to open file: %s\n", full_path.we_wordv[0]); + wordfree(&full_path); + return -1; + } + + wordfree(&full_path); + int numOfRows = 0; + TAOS_FIELD* fields = (TAOS_FIELD*)ws_fetch_fields_v2(wres); + int num_fields = ws_field_count(wres); + int precision = ws_result_precision(wres); + for (int col = 0; col < num_fields; col++) { + if (col > 0) { + fprintf(fp, ","); + } + fprintf(fp, "%s", fields[col].name); + } + fputc('\n', fp); + stop_fetch = false; + while (!stop_fetch) { + int rows = 0; + const void* data = NULL; + ws_fetch_block(wres, &data, &rows); + if (rows == 0) { + break; + } + uint8_t ty; + uint32_t len; + numOfRows += rows; + for (int i = 0; i < rows; i++) { + for (int j = 0; j < num_fields; j++) { + if (j > 0) { + fputc(',', fp); + } + const void *value = ws_get_value_in_block(wres, i, j, &ty, &len); + dumpFieldToFile(fp, (const char*)value, fields + j, len, precision); + } + fputc('\n', fp); + } + } + fclose(fp); + return numOfRows; +} +#endif static int dumpResultToFile(const char* fname, TAOS_RES* tres) { TAOS_ROW row = taos_fetch_row(tres); @@ -658,7 +821,7 @@ static void shellPrintNChar(const char *str, int length, int width) { } -static void printField(const char* val, TAOS_FIELD* field, int width, int32_t length, int precision) { +void printField(const char* val, TAOS_FIELD* field, int width, int32_t length, int precision) { if (val == NULL) { int w = width; if (field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_NCHAR || field->type == TSDB_DATA_TYPE_TIMESTAMP) { @@ -737,6 +900,46 @@ bool isSelectQuery(TAOS_RES* tres) { return false; } +#ifdef WEBSOCKET +static int verticalPrintWebsocket(WS_RES* wres) { + int num_fields = ws_field_count(wres); + TAOS_FIELD* fields = (TAOS_FIELD*)ws_fetch_fields_v2(wres); + int precision = ws_result_precision(wres); + + int maxColNameLen = 0; + for (int col = 0; col < num_fields; col++) { + int len = (int)strlen(fields[col].name); + if (len > maxColNameLen) { + maxColNameLen = len; + } + } + int numOfRows = 0; + stop_fetch = false; + while (!stop_fetch) { + int rows = 0; + const void* data = NULL; + ws_fetch_block(wres, &data, &rows); + if (rows == 0) { + break; + } + uint8_t ty; + uint32_t len; + for (int i = 0; i < rows; i++) { + printf("*************************** %d.row ***************************\n", numOfRows + 1); + for (int j = 0; j < num_fields; j++) { + TAOS_FIELD* field = fields + j; + int padding = (int)(maxColNameLen - strlen(field->name)); + printf("%*.s%s: ", padding, " ", field->name); + const void *value = ws_get_value_in_block(wres, i, j, &ty, &len); + printField((const char*)value, field, 0, len, precision); + putchar('\n'); + } + numOfRows++; + } + } + return numOfRows; +} +#endif static int verticalPrintResult(TAOS_RES* tres) { TAOS_ROW row = taos_fetch_row(tres); @@ -879,6 +1082,45 @@ static void printHeader(TAOS_FIELD* fields, int* width, int num_fields) { putchar('\n'); } +#ifdef WEBSOCKET +static int horizontalPrintWebsocket(WS_RES* wres) { + int num_fields = ws_field_count(wres); + TAOS_FIELD* fields = (TAOS_FIELD*)ws_fetch_fields_v2(wres); + int precision = ws_result_precision(wres); + + int width[TSDB_MAX_COLUMNS]; + for (int col = 0; col < num_fields; col++) { + width[col] = calcColWidth(fields + col, precision); + } + + printHeader(fields, width, num_fields); + + int numOfRows = 0; + stop_fetch = false; + while (!stop_fetch) { + int rows = 0; + const void* data = NULL; + ws_fetch_block(wres, &data, &rows); + if (rows == 0) { + break; + } + numOfRows += rows; + uint8_t ty; + uint32_t len; + for (int i = 0; i < rows; i++) { + for (int j = 0; j < num_fields; j++) { + putchar(' '); + const void *value = ws_get_value_in_block(wres, i, j, &ty, &len); + printField((const char*)value, fields+j, width[j], len, precision); + putchar(' '); + putchar('|'); + } + putchar('\n'); + } + } + return numOfRows; +} +#endif static int horizontalPrintResult(TAOS_RES* tres) { TAOS_ROW row = taos_fetch_row(tres); @@ -934,6 +1176,24 @@ static int horizontalPrintResult(TAOS_RES* tres) { return numOfRows; } +#ifdef WEBSOCKET +int shellDumpWebsocket(WS_RES *wres, char *fname, int *error_no, bool vertical) { + int numOfRows = 0; + if (fname != NULL) { + numOfRows = dumpWebsocketToFile(fname, wres); + } else if (vertical) { + numOfRows = verticalPrintWebsocket(wres); + } else { + numOfRows = horizontalPrintWebsocket(wres); + } + if (stop_fetch) { + *error_no = -1; + } else { + *error_no = ws_errno(wres); + } + return numOfRows; +} +#endif int shellDumpResult(TAOS_RES *tres, char *fname, int *error_no, bool vertical) { int numOfRows = 0; @@ -1087,568 +1347,3 @@ void source_file(TAOS *con, char *fptr) { void shellGetGrantInfo(void *con) { return; } - -void _base64_encode_triple(unsigned char triple[3], char res[4]) { - int tripleValue, i; - - tripleValue = triple[0]; - tripleValue *= 256; - tripleValue += triple[1]; - tripleValue *= 256; - tripleValue += triple[2]; - - for (i = 0; i < 4; i++) { - res[3 - i] = BASE64_CHARS[tripleValue % 64]; - tripleValue /= 64; - } -} - -int taos_base64_encode(unsigned char *source, size_t sourcelen, char *target, size_t targetlen) { - /* check if the result will fit in the target buffer */ - if ((sourcelen + 2) / 3 * 4 > targetlen - 1) return 0; - - /* encode all full triples */ - while (sourcelen >= 3) { - _base64_encode_triple(source, target); - sourcelen -= 3; - source += 3; - target += 4; - } - - /* encode the last one or two characters */ - if (sourcelen > 0) { - unsigned char temp[3]; - memset(temp, 0, sizeof(temp)); - memcpy(temp, source, sourcelen); - _base64_encode_triple(temp, target); - target[3] = '='; - if (sourcelen == 1) target[2] = '='; - - target += 4; - } - - /* terminate the string */ - target[0] = 0; - - return 1; -} - -int parse_cloud_dsn() { - if (args.cloudDsn == NULL) { - fprintf(stderr, "Cannot read cloud service info\n"); - return -1; - } else { - char *start = strstr(args.cloudDsn, "http://"); - if (start != NULL) { - args.cloudHost = start + strlen("http://"); - } else { - start = strstr(args.cloudDsn, "https://"); - if (start != NULL) { - args.cloudHost = start + strlen("https://"); - } else { - args.cloudHost = args.cloudDsn; - } - } - char *port = strstr(args.cloudHost, ":"); - if (port == NULL) { - fprintf(stderr, "Invalid format in TDengine cloud dsn: %s\n", args.cloudDsn); - return -1; - } - char *token = strstr(port + strlen(":"), "?token="); - if ((token == NULL) || - (strlen(token + strlen("?token=")) == 0)) { - fprintf(stderr, "Invalid format in TDengine cloud dsn: %s\n", args.cloudDsn); - return -1; - } - port[0] = '\0'; - args.cloudPort = port + strlen(":"); - token[0] = '\0'; - args.cloudToken = token + strlen("?token="); - } - return 0; -} - -int wsclient_handshake() { - char request_header[1024]; - char recv_buf[1024]; - unsigned char key_nonce[16]; - char websocket_key[256]; - memset(request_header, 0, 1024); - memset(recv_buf, 0, 1024); - srand(time(NULL)); - int i; - for (i = 0; i < 16; i++) { - key_nonce[i] = rand() & 0xff; - } - taos_base64_encode(key_nonce, 16, websocket_key, 256); - if (args.cloud) { - snprintf(request_header, 1024, - "GET /rest/ws?token=%s HTTP/1.1\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nHost: " - "%s:%s\r\nSec-WebSocket-Key: " - "%s\r\nSec-WebSocket-Version: 13\r\n\r\n", - args.cloudToken, args.cloudHost, args.cloudPort, websocket_key); - } else { - snprintf(request_header, 1024, - "GET /rest/ws HTTP/1.1\r\nUpgrade: websocket\r\nConnection: Upgrade\r\nHost: %s:%d\r\nSec-WebSocket-Key: " - "%s\r\nSec-WebSocket-Version: 13\r\n\r\n", - args.host, args.port, websocket_key); - } - - ssize_t n = send(args.socket, request_header, strlen(request_header), 0); - if (n <= 0) { -#ifdef WINDOWS - fprintf(stderr, "send failed with error: %d\n", WSAGetLastError()); -#else - fprintf(stderr, "web socket handshake error\n"); -#endif - return -1; - } - n = recv(args.socket, recv_buf, 1023, 0); - if (NULL == strstr(recv_buf, "HTTP/1.1 101")) { - fprintf(stderr, "web socket handshake failed: %s\n", recv_buf); - } - return 0; -} - -int wsclient_send(char *strdata, WebSocketFrameType frame) { - struct timeval tv; - unsigned char mask[4]; - unsigned int mask_int; - unsigned long long payload_len; - unsigned int payload_len_small; - unsigned int payload_offset = 6; - unsigned int len_size; - // unsigned long long be_payload_len; - unsigned int sent = 0; - int i; - unsigned int frame_size; - char *data; - gettimeofday(&tv, NULL); - srand(tv.tv_usec * tv.tv_sec); - mask_int = rand(); - memcpy(mask, &mask_int, 4); - payload_len = strlen(strdata); - if (payload_len <= 125) { - frame_size = 6 + payload_len; - payload_len_small = payload_len; - } else if (payload_len > 125 && payload_len <= 0xffff) { - frame_size = 8 + payload_len; - payload_len_small = 126; - payload_offset += 2; - } else if (payload_len > 0xffff && payload_len <= 0xffffffffffffffffLL) { - frame_size = 14 + payload_len; - payload_len_small = 127; - payload_offset += 8; - } else { - fprintf(stderr, "websocket send too large data\n"); - return -1; - } - data = (char *)malloc(frame_size); - memset(data, 0, frame_size); - *data = frame; - *(data + 1) = payload_len_small | 0x80; - if (payload_len_small == 126) { - payload_len &= 0xffff; - len_size = 2; - for (i = 0; i < len_size; i++) { - *(data + 2 + i) = *((char *)&payload_len + (len_size - i - 1)); - } - } - if (payload_len_small == 127) { - payload_len &= 0xffffffffffffffffLL; - len_size = 8; - for (i = 0; i < len_size; i++) { - *(data + 2 + i) = *((char *)&payload_len + (len_size - i - 1)); - } - } - for (i = 0; i < 4; i++) *(data + (payload_offset - 4) + i) = mask[i]; - - memcpy(data + payload_offset, strdata, strlen(strdata)); - for (i = 0; i < strlen(strdata); i++) *(data + payload_offset + i) ^= mask[i % 4] & 0xff; - sent = 0; - i = 0; - while (sent < frame_size && i >= 0) { - i = send(args.socket, data + sent, frame_size - sent, 0); - sent += i; - } - if (i < 0) { - fprintf(stderr, "websocket send data error, please check the server\n"); - free(data); - return -1; - } - free(data); - return 0; -} - -int wsclient_send_sql(char *command, WS_ACTION_TYPE type, int64_t id) { - int code = 1; - cJSON *json = cJSON_CreateObject(); - cJSON *_args = cJSON_CreateObject(); - cJSON_AddNumberToObject(_args, "req_id", 1); - switch (type) { - case WS_CONN: - cJSON_AddStringToObject(json, "action", "conn"); - cJSON_AddStringToObject(_args, "user", args.user); - cJSON_AddStringToObject(_args, "password", args.password); - cJSON_AddStringToObject(_args, "db", args.database); - - break; - case WS_QUERY: - cJSON_AddStringToObject(json, "action", "query"); - cJSON_AddStringToObject(_args, "sql", command); - break; - case WS_FETCH: - cJSON_AddStringToObject(json, "action", "fetch"); - cJSON_AddNumberToObject(_args, "id", id); - break; - case WS_FETCH_BLOCK: - cJSON_AddStringToObject(json, "action", "fetch_block"); - cJSON_AddNumberToObject(_args, "id", id); - break; - case WS_CLOSE: - cJSON_AddStringToObject(json, "action", "close"); - cJSON_AddNumberToObject(_args, "id", id); - break; - } - cJSON_AddItemToObject(json, "args", _args); - char *strdata = NULL; - strdata = cJSON_Print(json); - if (wsclient_send(strdata, TEXT_FRAME)) { - goto OVER; - } - code = 0; -OVER: - free(strdata); - cJSON_Delete(json); - return code; -} - -int wsclient_conn() { - if (wsclient_send_sql(NULL, WS_CONN, 0)) { - return -1; - } - char recv_buffer[1024]; - memset(recv_buffer, 0, 1024); - int bytes = recv(args.socket, recv_buffer, 1023, 0); - if (bytes <= 0) { - fprintf(stderr, "failed to receive from socket\n"); - return -1; - } - - char *received_json = strstr(recv_buffer, "{"); - cJSON *root = cJSON_Parse(received_json); - if (root == NULL) { - fprintf(stderr, "fail to parse response into json: %s\n", recv_buffer); - return -1; - } - - cJSON *code = cJSON_GetObjectItem(root, "code"); - if (!cJSON_IsNumber(code)) { - fprintf(stderr, "wrong code key in json: %s\n", received_json); - cJSON_Delete(root); - return -1; - } - if (code->valueint == 0) { - cJSON_Delete(root); - if (args.cloud) { - fprintf(stdout, "Successfully connect to %s:%s in restful mode\n\n", args.cloudHost, args.cloudPort); - } else { - fprintf(stdout, "Successfully connect to %s:%d in restful mode\n\n", args.host, args.port); - } - return 0; - } else { - cJSON *message = cJSON_GetObjectItem(root, "message"); - if (!cJSON_IsString(message)) { - fprintf(stderr, "wrong message key in json: %s\n", received_json); - cJSON_Delete(root); - return -1; - } - fprintf(stderr, "failed to connection, reason: %s\n", message->valuestring); - } - cJSON_Delete(root); - return -1; -} - -void wsclient_parse_frame(SWSParser * parser, uint8_t * recv_buffer) { - unsigned char msg_opcode = recv_buffer[0] & 0x0F; - unsigned char msg_masked = (recv_buffer[1] >> 7) & 0x01; - int payload_length = 0; - int pos = 2; - int length_field = recv_buffer[1] &(~0x80); - unsigned int mask = 0; - if (length_field <= 125) { - payload_length = length_field; - } else if (length_field == 126) { - payload_length = recv_buffer[2]; - for (int i = 0; i < 1; i++) { - payload_length = (payload_length << 8) + recv_buffer[3 + i]; - } - pos += 2; - } else if (length_field == 127) { - payload_length = recv_buffer[2]; - for (int i = 0; i < 7; i++) { - payload_length = (payload_length << 8) + recv_buffer[3 + i]; - } - pos += 8; - } - if (msg_masked) { - mask = *((unsigned int *) (recv_buffer + pos)); - pos += 4; - const uint8_t *c = recv_buffer + pos; - for (int i = 0; i < payload_length; i++) { - recv_buffer[i] = c[i] ^ ((unsigned char *) (&mask))[i % 4]; - } - } - if (msg_opcode == 0x9) { - parser->frame = PING_FRAME; - } - parser->offset = pos; - parser->payload_length = payload_length; -} - -char *wsclient_get_response() { - uint8_t recv_buffer[TEMP_RECV_BUF]= {0}; - int received = 0; - SWSParser parser; - int bytes = recv(args.socket, recv_buffer + received, TEMP_RECV_BUF - 1, 0); - if (bytes <= 0) { - fprintf(stderr, "websocket recv failed with bytes: %d\n", bytes); - return NULL; - } - wsclient_parse_frame(&parser, recv_buffer); - if (parser.frame == PING_FRAME) { - if (wsclient_send("pong", PONG_FRAME)) { - return NULL; - } - return wsclient_get_response(); - } - char* response = calloc(1, parser.payload_length + 1); - int pos = bytes - parser.offset; - memcpy(response, recv_buffer + parser.offset, pos); - while (pos < parser.payload_length) { - bytes = recv(args.socket, response + pos, parser.payload_length - pos, 0); - pos += bytes; - } - response[pos] = '\0'; - return response; -} - -int wsclient_fetch_fields(cJSON *query, TAOS_FIELD * fields, int cols) { - cJSON *fields_names = cJSON_GetObjectItem(query, "fields_names"); - cJSON *fields_types = cJSON_GetObjectItem(query, "fields_types"); - cJSON *fields_lengths = cJSON_GetObjectItem(query, "fields_lengths"); - if (!cJSON_IsArray(fields_names) || !cJSON_IsArray(fields_types) || !cJSON_IsArray(fields_lengths)) { - fprintf(stderr, "Invalid or miss 'fields_names'/'fields_types'/'fields_lengths' key in response\n"); - return -1; - } - for (int i = 0; i < cols; i++) { - cJSON* field_name = cJSON_GetArrayItem(fields_names, i); - cJSON* field_type = cJSON_GetArrayItem(fields_types, i); - cJSON* field_length = cJSON_GetArrayItem(fields_lengths, i); - if (!cJSON_IsString(field_name) || !cJSON_IsNumber(field_type) || !cJSON_IsNumber(field_length)) { - fprintf(stderr, "Invalid or miss 'field_name'/'field_type'/'field_length' in query response"); - return -1; - } - strncpy(fields[i].name, field_name->valuestring, 65); - fields[i].type = (uint8_t)field_type->valueint; - fields[i].bytes = (int16_t)field_length->valueint; - } - return 0; -} - -int wsclient_check(cJSON *root, int64_t st, int64_t et) { - cJSON *code = cJSON_GetObjectItem(root, "code"); - cJSON *message = cJSON_GetObjectItem(root, "message"); - if (!cJSON_IsNumber(code) || !cJSON_IsString(message)) { - fprintf(stderr, "Invalid or miss 'code'/'message' in response\n"); - return -1; - } - if (code->valueint != 0) { - fprintf(stderr, "\nDB error: %s (%.6fs)\n", message->valuestring, (et - st) / 1E6); - return -1; - } - return 0; -} - -int wsclient_print_data(int rows, TAOS_FIELD *fields, int cols, int64_t id, int precision, int* pshowed_rows) { - char* response = wsclient_get_response(); - if (response == NULL) { - return -1; - } - - if (*(int64_t *)response != id) { - fprintf(stderr, "Mismatch id with %"PRId64" expect %"PRId64"\n", *(int64_t *)response, id); - free(response); - return -1; - } - int pos; - int width[TSDB_MAX_COLUMNS]; - for (int c = 0; c < cols; c++) { - width[c] = calcColWidth(fields + c, precision); - } - for (int i = 0; i < rows; i++) { - if (*pshowed_rows == DEFAULT_RES_SHOW_NUM) { - printf("\n"); - printf(" Notice: The result shows only the first %d rows.\n", DEFAULT_RES_SHOW_NUM); - printf("\n"); - printf(" You can use Ctrl+C to stop the underway fetching.\n"); - printf("\n"); - free(response); - return 0; - } - for (int c = 0; c < cols; c++) { - pos = 8; - pos += i * fields[c].bytes; - for (int j = 0; j < c; j++) { - pos += fields[j].bytes * rows; - } - putchar(' '); - int16_t length = 0; - if (fields[c].type == TSDB_DATA_TYPE_NCHAR || fields[c].type == TSDB_DATA_TYPE_BINARY || - fields[c].type == TSDB_DATA_TYPE_JSON) { - length = *(int16_t *)(response + pos); - pos += 2; - } - printField((const char *)(response + pos), fields + c, width[c], (int32_t)length, precision); - putchar(' '); - putchar('|'); - } - putchar('\n'); - *pshowed_rows += 1; - } - free(response); - return 0; -} - -void wsclient_query(char *command) { - int64_t st, et; - st = taosGetTimestampUs(); - if (wsclient_send_sql(command, WS_QUERY, 0)) { - return; - } - char *query_buffer = wsclient_get_response(); - if (query_buffer == NULL) { - return; - } - cJSON* query = cJSON_Parse(query_buffer); - if (query == NULL) { - fprintf(stderr, "Failed to parse response into json: %s\n", query_buffer); - free(query_buffer); - return; - } - free(query_buffer); - et = taosGetTimestampUs(); - if (wsclient_check(query, st, et)) { - cJSON_Delete(query); - return; - } - cJSON *is_update = cJSON_GetObjectItem(query, "is_update"); - cJSON *fields_count = cJSON_GetObjectItem(query, "fields_count"); - cJSON *precisionObj = cJSON_GetObjectItem(query, "precision"); - cJSON *id = cJSON_GetObjectItem(query, "id"); - if (!cJSON_IsBool(is_update) || - !cJSON_IsNumber(fields_count) || - !cJSON_IsNumber(precisionObj) || - !cJSON_IsNumber(id)) { - fprintf(stderr, "Invalid or miss 'is_update'/'fields_count'/'precision'/'id' in query response\n"); - cJSON_Delete(query); - return; - } - if (is_update->valueint) { - cJSON *affected_rows = cJSON_GetObjectItem(query, "affected_rows"); - if (cJSON_IsNumber(affected_rows)) { - et = taosGetTimestampUs(); - printf("Update OK, %d row(s) in set (%.6fs)\n\n", (int)affected_rows->valueint, (et - st) / 1E6); - } else { - fprintf(stderr, "Invalid or miss 'affected_rows' key in response\n"); - } - cJSON_Delete(query); - return; - } - ws_id = id->valueint; - int cols = (int)fields_count->valueint; - int precision = (int)precisionObj->valueint; - int64_t total_rows = 0; - int showed_rows = 0; - bool completed = false; - TAOS_FIELD fields[TSDB_MAX_COLUMNS]; - if (wsclient_fetch_fields(query, fields, cols)) { - cJSON_Delete(query); - return; - } - int width[TSDB_MAX_COLUMNS]; - for (int i = 0; i < cols; ++i) { - width[i] = calcColWidth(fields + i, precision); - } - printHeader(fields, width, cols); - - cJSON_Delete(query); - - while (!completed && !stop_fetch) { - if (wsclient_send_sql(NULL, WS_FETCH, ws_id)) { - return; - } - char *fetch_buffer = wsclient_get_response(); - if (fetch_buffer == NULL) { - return; - } - cJSON *fetch = cJSON_Parse(fetch_buffer); - if (fetch == NULL) { - fprintf(stderr, "failed to parse response into json: %s\n", fetch_buffer); - free(fetch_buffer); - return; - } - free(fetch_buffer); - if (wsclient_check(fetch, st, et)) { - cJSON_Delete(fetch); - return; - } - cJSON *completedObj = cJSON_GetObjectItem(fetch, "completed"); - cJSON *rows = cJSON_GetObjectItem(fetch, "rows"); - cJSON *lengths = cJSON_GetObjectItem(fetch, "lengths"); - if (!cJSON_IsBool(completedObj) || !cJSON_IsNumber(rows)) { - fprintf(stderr, "Invalid or miss 'completed'/'rows' in fetch response\n"); - cJSON_Delete(fetch); - return; - } - if (completedObj->valueint) { - cJSON_Delete(fetch); - completed = true; - continue; - } - total_rows += rows->valueint; - if (!cJSON_IsArray(lengths)) { - fprintf(stderr, "Invalid or miss 'lengths' in fetch response\n"); - cJSON_Delete(fetch); - return; - } - for (int i = 0; i < cols; i++) { - cJSON* length = cJSON_GetArrayItem(lengths, i); - if (!cJSON_IsNumber(length)) { - fprintf(stderr, "Invalid or miss 'lengths' key in fetch response\n"); - cJSON_Delete(fetch); - return; - } - fields[i].bytes = (int16_t)(length->valueint); - } - if (showed_rows < DEFAULT_RES_SHOW_NUM) { - if (wsclient_send_sql(NULL, WS_FETCH_BLOCK, ws_id)) { - cJSON_Delete(fetch); - return; - } - if (wsclient_print_data((int)rows->valueint, fields, cols, ws_id, precision, &showed_rows)) { - cJSON_Delete(fetch); - return; - } - cJSON_Delete(fetch); - continue; - } - } - et = taosGetTimestampUs(); - if (stop_fetch) { - printf("Query interrupted, %" PRId64 " row(s) in set (%.6fs)\n\n", total_rows, (et - st) / 1E6); - stop_fetch = false; - } else { - printf("Query OK, %" PRId64 " row(s) in set (%.6fs)\n\n", total_rows, (et - st) / 1E6); - } -} diff --git a/src/kit/shell/src/shellLinux.c b/src/kit/shell/src/shellLinux.c index 863da2e1a7..7a444a18c8 100644 --- a/src/kit/shell/src/shellLinux.c +++ b/src/kit/shell/src/shellLinux.c @@ -53,7 +53,8 @@ static struct argp_option options[] = { {"pktnum", 'N', "PKTNUM", 0, "Packet numbers used for net test, default is 100."}, {"pkttype", 'S', "PKTTYPE", 0, "Choose packet type used for net test, default is TCP. Only speed test could be either TCP or UDP."}, {"restful", 'R', 0, 0, "Connect and interact with TDengine use restful."}, - {0, 'E', "DSN", 0, "The DSN to use when connecting TDengine's cloud services."}, + {"cloudDsn", 'E', "DSN", 0, "The DSN to use when connecting TDengine's cloud services."}, + {"timeout", 't', "SECONDS", 0, "The timeout seconds for websocket to interact."}, {0}}; static error_t parse_opt(int key, char *arg, struct argp_state *state) { @@ -178,12 +179,19 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { break; case 'E': if (arg) { - arguments->cloudDsn = arg; + arguments->dsn = arg; } else { fprintf(stderr, "Invalid -E option\n"); return -1; } break; + case 't': + if (arg) { + arguments->timeout = atoi(arg); + } else { + fprintf(stderr, "Invalid -t option\n"); + } + break; default: return ARGP_ERR_UNKNOWN; } @@ -237,10 +245,10 @@ void shellParseArgument(int argc, char *argv[], SShellArguments *arguments) { argp_parse(&argp, argc, argv, 0, 0, arguments); - if (args.cloudDsn == NULL) { + if (args.dsn == NULL) { if (args.cloud) { - args.cloudDsn = getenv("TDENGINE_CLOUD_DSN"); - if (args.cloudDsn == NULL) { + args.dsn = getenv("TDENGINE_CLOUD_DSN"); + if (args.dsn == NULL) { args.cloud = false; } } @@ -607,37 +615,3 @@ void exitShell() { taos_cleanup(); exit(EXIT_SUCCESS); } - -int tcpConnect(char* host, int port) { - struct sockaddr_in serv_addr; - if (port == 0) { - port = 6041; - args.port = 6041; - } - if (NULL == host) { - host = "localhost"; - args.host = "localhost"; - } - - struct hostent *server = gethostbyname(host); - if ((server == NULL) || (server->h_addr == NULL)) { - fprintf(stderr, "no such host: %s\n", host); - return -1; - } - memset(&serv_addr, 0, sizeof(struct sockaddr_in)); - serv_addr.sin_family = AF_INET; - serv_addr.sin_port = htons(port); - memcpy(&(serv_addr.sin_addr.s_addr), server->h_addr, server->h_length); - args.socket = socket(AF_INET, SOCK_STREAM, 0); - if (args.socket < 0) { - fprintf(stderr, "failed to create socket\n"); - return -1; - } - int retConn = connect(args.socket, (struct sockaddr *)&serv_addr, sizeof(struct sockaddr)); - if (retConn < 0) { - fprintf(stderr, "failed to connect\n"); - close(args.socket); - return -1; - } - return 0; -} diff --git a/src/kit/shell/src/shellMain.c b/src/kit/shell/src/shellMain.c index 866dd2d6c6..17ef859c2d 100644 --- a/src/kit/shell/src/shellMain.c +++ b/src/kit/shell/src/shellMain.c @@ -39,9 +39,6 @@ void *cancelHandler(void *arg) { } if (args.restful || args.cloud) { stop_fetch = true; - if (wsclient_send_sql(NULL, WS_CLOSE, ws_id)) { - exit(EXIT_FAILURE); - } } #ifdef LINUX int64_t rid = atomic_val_compare_exchange_64(&result, result, 0); @@ -96,11 +93,9 @@ SShellArguments args = {.host = NULL, .pktNum = 100, .pktType = "TCP", .netTestRole = NULL, - .cloudDsn = NULL, .cloud = true, - .cloudHost = NULL, - .cloudPort = NULL, - .cloudToken = NULL, + .dsn = NULL, + .timeout = 10, }; /* @@ -140,17 +135,18 @@ int main(int argc, char* argv[]) { exit(0); } - if (args.cloud) { - if (parse_cloud_dsn()) { - exit(EXIT_FAILURE); - } - if (tcpConnect(args.cloudHost, atoi(args.cloudPort))) { - exit(EXIT_FAILURE); - } - } else if (args.restful) { - if (tcpConnect(args.host, args.port)) { - exit(EXIT_FAILURE); - } + if (args.restful) { + args.dsn = calloc(1, 1024); + + if (args.host == NULL) { + args.host = "localhost"; + } + + if (args.port == 0) { + args.port = 6041; + } + + snprintf(args.dsn, 1024, "ws://%s:%d/rest/ws",args.host, args.port); } /* Initialize the shell */ @@ -169,11 +165,6 @@ int main(int argc, char* argv[]) { taosSetSignal(SIGINT, shellQueryInterruptHandler); taosSetSignal(SIGHUP, shellQueryInterruptHandler); taosSetSignal(SIGABRT, shellQueryInterruptHandler); - if (args.restful || args.cloud) { -#ifdef LINUX - taosSetSignal(SIGPIPE, shellRestfulSendInterruptHandler); -#endif - } /* Get grant information */ shellGetGrantInfo(args.con); diff --git a/src/kit/shell/src/shellWindows.c b/src/kit/shell/src/shellWindows.c index 83d8630eb8..34d0c545d1 100644 --- a/src/kit/shell/src/shellWindows.c +++ b/src/kit/shell/src/shellWindows.c @@ -66,6 +66,8 @@ void printHelp() { printf("%s%s%s\n", indent, indent, "Connect and interact with TDengine use restful."); printf("%s%s\n", indent, "-E"); printf("%s%s%s\n", indent, indent, "The DSN to use when connecting TDengine's cloud services."); + printf("%s%s\n", indent, "-t"); + printf("%s%s%s\n", indent, indent, "The timeout seconds for websocekt to interact."); printf("%s%s\n", indent, "-S"); printf("%s%s%s\n", indent, indent, "Packet type used for net test, default is TCP."); printf("%s%s\n", indent, "-V"); @@ -226,13 +228,22 @@ void shellParseArgument(int argc, char *argv[], SShellArguments *arguments) { else if (strcmp(argv[i], "-E") == 0) { if (i < argc - 1) { - arguments->cloudDsn = argv[++i]; + arguments->dsn = argv[++i]; } else { fprintf(stderr, "options -E requires an argument\n"); exit(EXIT_FAILURE); } } + else if (strcmp(argv[i], "-t") == 0) { + if (i < argc - 1) { + arguments->timeout = atoi(argv[++i]); + } else { + fprintf(stderr, "option -t requires an argument\n"); + exit(EXIT_FAILURE); + } + } + else if (strcmp(argv[i], "-V") == 0) { printVersion(); exit(EXIT_SUCCESS); @@ -247,16 +258,16 @@ void shellParseArgument(int argc, char *argv[], SShellArguments *arguments) { exit(EXIT_FAILURE); } } - if (args.cloudDsn == NULL) { + if (args.dsn == NULL) { if (args.cloud) { - args.cloudDsn = getenv("TDENGINE_CLOUD_DSN"); - if (args.cloudDsn[strlen(args.cloudDsn) - 1] == '\"') { - args.cloudDsn[strlen(args.cloudDsn) - 1] = '\0'; + args.dsn = getenv("TDENGINE_CLOUD_DSN"); + if (args.dsn[strlen(args.dsn) - 1] == '\"') { + args.dsn[strlen(args.dsn) - 1] = '\0'; } - if (args.cloudDsn[0] == '\"') { - args.cloudDsn += 1; + if (args.dsn[0] == '\"') { + args.dsn += 1; } - if (args.cloudDsn == NULL) { + if (args.dsn == NULL) { args.cloud = false; } } @@ -374,64 +385,3 @@ void get_history_path(char *history) { } void exitShell() { exit(EXIT_SUCCESS); } - -int tcpConnect(char* host, int iport) { - int iResult; - WSADATA wsaData; - struct addrinfo *aResult = NULL, - *ptr = NULL, - hints; - if (iport == 0) { - iport = 6041; - args.port = 6041; - } - if (NULL == host) { - host = "localhost"; - args.host = "localhost"; - } - char port[10] = {0}; - - sprintf_s(port, 10, "%d", iport); - - iResult = WSAStartup(MAKEWORD(2,2), &wsaData); - if (iResult != 0) { - printf("WSAStartup failed with error: %d\n", iResult); - return 1; - } - memset(&hints, 0, sizeof(struct addrinfo)); - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - hints.ai_protocol = IPPROTO_TCP; - iResult = getaddrinfo(host, port, &hints, &aResult); - if ( iResult != 0 ) { - printf("getaddrinfo failed with error: %d\n", iResult); - WSACleanup(); - return 1; - } - - for(ptr=aResult; ptr != NULL ; ptr=ptr->ai_next) { - // Create a SOCKET for connecting to server - args.socket = socket(ptr->ai_family, ptr->ai_socktype, - ptr->ai_protocol); - if (args.socket == INVALID_SOCKET) { - printf("socket failed with error: %ld\n", WSAGetLastError()); - WSACleanup(); - return 1; - } - - // Connect to server. - iResult = connect( args.socket, ptr->ai_addr, (int)ptr->ai_addrlen); - if (iResult == SOCKET_ERROR) { - closesocket(args.socket); - args.socket = INVALID_SOCKET; - continue; - } - break; - } - if (args.socket == INVALID_SOCKET) { - printf("Unable to connect to server!\n"); - WSACleanup(); - return 1; - } - return 0; -} \ No newline at end of file diff --git a/src/plugins/taosws-rs b/src/plugins/taosws-rs index c5fded266d..4afe6f4bcf 160000 --- a/src/plugins/taosws-rs +++ b/src/plugins/taosws-rs @@ -1 +1 @@ -Subproject commit c5fded266d3b10508e38bf3285bb7ecf798bc343 +Subproject commit 4afe6f4bcf5ac6eca48e0f41ec73a0aea2940335 diff --git a/tests/develop-test/0-others/taos_shell.py b/tests/develop-test/0-others/taos_shell.py index b1293958c2..5d73884763 100644 --- a/tests/develop-test/0-others/taos_shell.py +++ b/tests/develop-test/0-others/taos_shell.py @@ -69,23 +69,23 @@ class TDTestCase: def run(self): binPath = self.getPath() self.binPath = binPath - self.checkresult("drop database if exists test", "Update OK") - self.checkresult("create database if not exists test", "Update OK") + self.checkresult("drop database if exists test", "Query OK") + self.checkresult("create database if not exists test", "Query OK") self.checkresult("create stable test.stb (ts timestamp, c1 nchar(8), c2 double, c3 int) tags (t1 int)", "Update OK") - self.checkresult("create table test.tb1 using test.stb tags (1)", "Update OK") - self.checkresult("create table test.tb2 using test.stb tags (2)", "Update OK") + self.checkresult("create table test.tb1 using test.stb tags (1)", "Query OK") + self.checkresult("create table test.tb2 using test.stb tags (2)", "Query OK") self.checkresult("select tbname from test.stb", "Query OK, 2 row(s) in set") - self.checkresult("insert into test.tb1 values (now, 'beijing', 1.23, 18)", "Update OK") - self.checkresult("insert into test.tb1 values (now, 'beijing', 1.23, 18)", "Update OK") - self.checkresult("insert into test.tb2 values (now, 'beijing', 1.23, 18)", "Update OK") - self.checkresult("insert into test.tb2 values (now, 'beijing', 1.23, 18)", "Update OK") + self.checkresult("insert into test.tb1 values (now, 'beijing', 1.23, 18)", "Query OK") + self.checkresult("insert into test.tb1 values (now, 'beijing', 1.23, 18)", "Query OK") + self.checkresult("insert into test.tb2 values (now, 'beijing', 1.23, 18)", "Query OK") + self.checkresult("insert into test.tb2 values (now, 'beijing', 1.23, 18)", "Query OK") self.checkresult("select * from test.stb", "Query OK, 4 row(s) in set") taosBenchmark = self.getPath(tool="taosBenchmark") cmd = "%s -n 100 -t 100 -y" %taosBenchmark tdLog.info("%s" % cmd) os.system("%s" % cmd) self.checkresult("select * from test.meters", "Query OK, 10000 row(s) in set") - self.checkresult("select * from test.meters","Notice: The result shows only the first 100 rows") + # self.checkresult("select * from test.meters","Notice: The result shows only the first 100 rows") def stop(self): tdSql.close() diff --git a/tests/develop-test/fulltest-others.sh b/tests/develop-test/fulltest-others.sh index d39b4c1fac..b9e056a67b 100755 --- a/tests/develop-test/fulltest-others.sh +++ b/tests/develop-test/fulltest-others.sh @@ -1,3 +1,2 @@ python3 ./test.py -f 0-others/json_tag.py -python3 ./test.py -f 0-others/TD-12435.py -python3 ./test.py -f 0-others/taos_shell.py \ No newline at end of file +python3 ./test.py -f 0-others/TD-12435.py \ No newline at end of file diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 059d77f18d..a50f781c64 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -714,8 +714,7 @@ 5,,develop-test,python3 ./test.py -f 2-query/timeline_agg_func_groupby.py 5,,develop-test,python3 ./test.py -f 2-query/session_two_stage.py 5,,develop-test,python3 ./test.py -f 2-query/function_histogram.py -5,,develop-test,python3 ./test.py -f 0-others/TD-12435.py -5,,develop-test,python3 ./test.py -f 0-others/taos_shell.py +5,,develop-test,python3 ./test.py -f 0-others/TD-12435.py 5,,develop-test,python3 ./test.py -f 0-others/json_tag.py 5,,develop-test,python3 ./test.py -f 2-query/function_mode.py 5,,develop-test,python3 ./test.py -f 2-query/function_now.py -- GitLab