未验证 提交 8af64798 编写于 作者: Y Yang Zhao 提交者: GitHub

fix: use taosws lib for websocket interact in taos shell (#14802)

* fix: update taos-tools
”

* chore: add taosws-rs as plugin

* chore: update taosws-rs

* chore: add taosws install/remove in scripts

* chore: update taosws-rs

* chore: update taosws-rs

* fix: _smartly_ build ws lib

* chore: enhance CMakeLists.txt

* fix: packaging/tools/make_install.sh

* chore: update taos-tools

* fix: use rust websock lib

* feat: add vertical and file dump for shell

* test: change test case

* test: change test cases

* fix: remove test case and add macro

* fix: change print message

* feat: add timeout and hint for error response

* fix: small typo

* fix: windows compile

* fix: typo

* fix: reconnect

* fix: do not parse cloud dsn

* fix: update taosws-rs

* fix: win and mac compile
Co-authored-by: sangshuduo's avatarShuduo Sang <sangshuduo@gmail.com>
上级 4226c900
......@@ -25,6 +25,9 @@
#include "taos.h"
#include "taosdef.h"
#include "tsclient.h"
#ifdef WEBSOCKET
#include "taosws.h"
#endif
#define MAX_USERNAME_SIZE 64
#define MAX_DBNAME_SIZE 64
......@@ -41,18 +44,6 @@ typedef struct SShellHistory {
int hend;
} SShellHistory;
typedef enum enumWebSocketFrameType {
TEXT_FRAME = 0x81,
PING_FRAME = 0x19,
PONG_FRAME = 0x8A,
} WebSocketFrameType;
typedef struct SWSParser {
int offset;
int payload_length;
WebSocketFrameType frame;
} SWSParser;
typedef struct SShellArguments {
char* host;
char* password;
......@@ -61,11 +52,6 @@ typedef struct SShellArguments {
char* database;
char* timezone;
bool restful;
#ifdef WINDOWS
SOCKET socket;
#else
int socket;
#endif
TAOS* con;
bool is_raw_time;
bool is_use_passwd;
......@@ -81,21 +67,14 @@ typedef struct SShellArguments {
int pktNum;
char* pktType;
char* netTestRole;
char* cloudDsn;
bool cloud;
char* cloudHost;
char* cloudPort;
char* cloudToken;
char* dsn;
#ifdef WEBSOCKET
WS_TAOS* ws_conn;
#endif
bool cloud;
uint32_t timeout;
} SShellArguments;
typedef enum WS_ACTION_TYPE_S {
WS_CONN,
WS_QUERY,
WS_FETCH,
WS_FETCH_BLOCK,
WS_CLOSE,
} WS_ACTION_TYPE;
/**************** Function declarations ****************/
extern void shellParseArgument(int argc, char* argv[], SShellArguments* arguments);
extern void shellInit(SShellArguments* args);
......@@ -105,6 +84,10 @@ extern int regex_match(const char* s, const char* reg, int cflags);
int32_t shellReadCommand(TAOS* con, char command[]);
int32_t shellRunCommand(TAOS* con, char* command);
void shellRunCommandOnServer(TAOS* con, char command[]);
#ifdef WEBSOCKET
void shellRunCommandOnWebsocket(char command[]);
#endif
void printField(const char* val, TAOS_FIELD* field, int width, int32_t length, int precision);
void read_history();
void write_history();
void source_file(TAOS* con, char* fptr);
......@@ -114,15 +97,12 @@ void get_history_path(char* history);
void shellCheck(TAOS* con, SShellArguments* args);
void cleanup_handler(void* arg);
void exitShell();
#ifdef WEBSOCKET
int shellDumpWebsocket(WS_RES *wres, char *fname, int *error_no, bool vertical);
#endif
int shellDumpResult(TAOS_RES* con, char* fname, int* error_no, bool printMode);
void shellGetGrantInfo(void* con);
int isCommentLine(char* line);
int wsclient_handshake();
int wsclient_conn();
void wsclient_query(char* command);
int wsclient_send_sql(char *command, WS_ACTION_TYPE type, int64_t id);
int tcpConnect(char* host, int port);
int parse_cloud_dsn();
/**************** Global variable declarations ****************/
extern char PROMPT_HEADER[];
......@@ -135,7 +115,6 @@ extern int get_old_terminal_mode(struct termios* tio);
extern void reset_terminal_mode();
extern SShellArguments args;
extern int64_t result;
extern int64_t ws_id;
extern bool stop_fetch;
#endif
......@@ -64,6 +64,8 @@ void printHelp() {
printf("%s%s%s\n", indent, indent, "Connect and interact with TDengine use restful.");
printf("%s%s\n", indent, "-E");
printf("%s%s%s\n", indent, indent, "The DSN to use when connecting TDengine's cloud services.");
printf("%s%s\n", indent, "-t");
printf("%s%s%s\n", indent, indent, "The timeout in seconds for websocket interact.");
exit(EXIT_SUCCESS);
}
......@@ -204,12 +206,21 @@ void shellParseArgument(int argc, char *argv[], SShellArguments *arguments) {
else if (strcmp(argv[i], "-E") == 0) {
if (i < argc - 1) {
arguments->cloudDsn = argv[++i];
arguments->dsn = argv[++i];
} else {
fprintf(stderr, "options -E requires an argument\n");
exit(EXIT_FAILURE);
}
}
else if (strcmp(argv[i], "-t") == 0) {
if (i < argc -1) {
arguments->timeout = atoi(argv[++i]);
} else {
fprintf(stderr, "options -t requires an argument\n");
exit(EXIT_FAILURE);
}
}
// For temperory command TODO
else if (strcmp(argv[i], "--help") == 0) {
......@@ -221,10 +232,10 @@ void shellParseArgument(int argc, char *argv[], SShellArguments *arguments) {
exit(EXIT_FAILURE);
}
}
if (args.cloudDsn == NULL) {
if (args.dsn == NULL) {
if (args.cloud) {
args.cloudDsn = getenv("TDENGINE_CLOUD_DSN");
if (args.cloudDsn == NULL) {
args.dsn = getenv("TDENGINE_CLOUD_DSN");
if (args.dsn == NULL) {
args.cloud = false;
}
}
......@@ -578,37 +589,3 @@ void exitShell() {
tcsetattr(0, TCSANOW, &oldtio);
exit(EXIT_SUCCESS);
}
int tcpConnect(char* host, int port) {
struct sockaddr_in serv_addr;
if (port == 0) {
port = 6041;
args.port = 6041;
}
if (NULL == host) {
host = "localhost";
args.host = "localhost";
}
struct hostent *server = gethostbyname(host);
if ((server == NULL) || (server->h_addr == NULL)) {
fprintf(stderr, "no such host: %s\n", host);
return -1;
}
memset(&serv_addr, 0, sizeof(struct sockaddr_in));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(port);
memcpy(&(serv_addr.sin_addr.s_addr), server->h_addr, server->h_length);
args.socket = socket(AF_INET, SOCK_STREAM, 0);
if (args.socket < 0) {
fprintf(stderr, "failed to create socket\n");
return -1;
}
int retConn = connect(args.socket, (struct sockaddr *)&serv_addr, sizeof(struct sockaddr));
if (retConn < 0) {
fprintf(stderr, "failed to connect\n");
close(args.socket);
return -1;
}
return 0;
}
\ No newline at end of file
此差异已折叠。
......@@ -53,7 +53,8 @@ static struct argp_option options[] = {
{"pktnum", 'N', "PKTNUM", 0, "Packet numbers used for net test, default is 100."},
{"pkttype", 'S', "PKTTYPE", 0, "Choose packet type used for net test, default is TCP. Only speed test could be either TCP or UDP."},
{"restful", 'R', 0, 0, "Connect and interact with TDengine use restful."},
{0, 'E', "DSN", 0, "The DSN to use when connecting TDengine's cloud services."},
{"cloudDsn", 'E', "DSN", 0, "The DSN to use when connecting TDengine's cloud services."},
{"timeout", 't', "SECONDS", 0, "The timeout seconds for websocket to interact."},
{0}};
static error_t parse_opt(int key, char *arg, struct argp_state *state) {
......@@ -178,12 +179,19 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
break;
case 'E':
if (arg) {
arguments->cloudDsn = arg;
arguments->dsn = arg;
} else {
fprintf(stderr, "Invalid -E option\n");
return -1;
}
break;
case 't':
if (arg) {
arguments->timeout = atoi(arg);
} else {
fprintf(stderr, "Invalid -t option\n");
}
break;
default:
return ARGP_ERR_UNKNOWN;
}
......@@ -237,10 +245,10 @@ void shellParseArgument(int argc, char *argv[], SShellArguments *arguments) {
argp_parse(&argp, argc, argv, 0, 0, arguments);
if (args.cloudDsn == NULL) {
if (args.dsn == NULL) {
if (args.cloud) {
args.cloudDsn = getenv("TDENGINE_CLOUD_DSN");
if (args.cloudDsn == NULL) {
args.dsn = getenv("TDENGINE_CLOUD_DSN");
if (args.dsn == NULL) {
args.cloud = false;
}
}
......@@ -607,37 +615,3 @@ void exitShell() {
taos_cleanup();
exit(EXIT_SUCCESS);
}
int tcpConnect(char* host, int port) {
struct sockaddr_in serv_addr;
if (port == 0) {
port = 6041;
args.port = 6041;
}
if (NULL == host) {
host = "localhost";
args.host = "localhost";
}
struct hostent *server = gethostbyname(host);
if ((server == NULL) || (server->h_addr == NULL)) {
fprintf(stderr, "no such host: %s\n", host);
return -1;
}
memset(&serv_addr, 0, sizeof(struct sockaddr_in));
serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(port);
memcpy(&(serv_addr.sin_addr.s_addr), server->h_addr, server->h_length);
args.socket = socket(AF_INET, SOCK_STREAM, 0);
if (args.socket < 0) {
fprintf(stderr, "failed to create socket\n");
return -1;
}
int retConn = connect(args.socket, (struct sockaddr *)&serv_addr, sizeof(struct sockaddr));
if (retConn < 0) {
fprintf(stderr, "failed to connect\n");
close(args.socket);
return -1;
}
return 0;
}
......@@ -39,9 +39,6 @@ void *cancelHandler(void *arg) {
}
if (args.restful || args.cloud) {
stop_fetch = true;
if (wsclient_send_sql(NULL, WS_CLOSE, ws_id)) {
exit(EXIT_FAILURE);
}
}
#ifdef LINUX
int64_t rid = atomic_val_compare_exchange_64(&result, result, 0);
......@@ -96,11 +93,9 @@ SShellArguments args = {.host = NULL,
.pktNum = 100,
.pktType = "TCP",
.netTestRole = NULL,
.cloudDsn = NULL,
.cloud = true,
.cloudHost = NULL,
.cloudPort = NULL,
.cloudToken = NULL,
.dsn = NULL,
.timeout = 10,
};
/*
......@@ -140,17 +135,18 @@ int main(int argc, char* argv[]) {
exit(0);
}
if (args.cloud) {
if (parse_cloud_dsn()) {
exit(EXIT_FAILURE);
}
if (tcpConnect(args.cloudHost, atoi(args.cloudPort))) {
exit(EXIT_FAILURE);
}
} else if (args.restful) {
if (tcpConnect(args.host, args.port)) {
exit(EXIT_FAILURE);
}
if (args.restful) {
args.dsn = calloc(1, 1024);
if (args.host == NULL) {
args.host = "localhost";
}
if (args.port == 0) {
args.port = 6041;
}
snprintf(args.dsn, 1024, "ws://%s:%d/rest/ws",args.host, args.port);
}
/* Initialize the shell */
......@@ -169,11 +165,6 @@ int main(int argc, char* argv[]) {
taosSetSignal(SIGINT, shellQueryInterruptHandler);
taosSetSignal(SIGHUP, shellQueryInterruptHandler);
taosSetSignal(SIGABRT, shellQueryInterruptHandler);
if (args.restful || args.cloud) {
#ifdef LINUX
taosSetSignal(SIGPIPE, shellRestfulSendInterruptHandler);
#endif
}
/* Get grant information */
shellGetGrantInfo(args.con);
......
......@@ -66,6 +66,8 @@ void printHelp() {
printf("%s%s%s\n", indent, indent, "Connect and interact with TDengine use restful.");
printf("%s%s\n", indent, "-E");
printf("%s%s%s\n", indent, indent, "The DSN to use when connecting TDengine's cloud services.");
printf("%s%s\n", indent, "-t");
printf("%s%s%s\n", indent, indent, "The timeout seconds for websocekt to interact.");
printf("%s%s\n", indent, "-S");
printf("%s%s%s\n", indent, indent, "Packet type used for net test, default is TCP.");
printf("%s%s\n", indent, "-V");
......@@ -226,13 +228,22 @@ void shellParseArgument(int argc, char *argv[], SShellArguments *arguments) {
else if (strcmp(argv[i], "-E") == 0) {
if (i < argc - 1) {
arguments->cloudDsn = argv[++i];
arguments->dsn = argv[++i];
} else {
fprintf(stderr, "options -E requires an argument\n");
exit(EXIT_FAILURE);
}
}
else if (strcmp(argv[i], "-t") == 0) {
if (i < argc - 1) {
arguments->timeout = atoi(argv[++i]);
} else {
fprintf(stderr, "option -t requires an argument\n");
exit(EXIT_FAILURE);
}
}
else if (strcmp(argv[i], "-V") == 0) {
printVersion();
exit(EXIT_SUCCESS);
......@@ -247,16 +258,16 @@ void shellParseArgument(int argc, char *argv[], SShellArguments *arguments) {
exit(EXIT_FAILURE);
}
}
if (args.cloudDsn == NULL) {
if (args.dsn == NULL) {
if (args.cloud) {
args.cloudDsn = getenv("TDENGINE_CLOUD_DSN");
if (args.cloudDsn[strlen(args.cloudDsn) - 1] == '\"') {
args.cloudDsn[strlen(args.cloudDsn) - 1] = '\0';
args.dsn = getenv("TDENGINE_CLOUD_DSN");
if (args.dsn[strlen(args.dsn) - 1] == '\"') {
args.dsn[strlen(args.dsn) - 1] = '\0';
}
if (args.cloudDsn[0] == '\"') {
args.cloudDsn += 1;
if (args.dsn[0] == '\"') {
args.dsn += 1;
}
if (args.cloudDsn == NULL) {
if (args.dsn == NULL) {
args.cloud = false;
}
}
......@@ -374,64 +385,3 @@ void get_history_path(char *history) {
}
void exitShell() { exit(EXIT_SUCCESS); }
int tcpConnect(char* host, int iport) {
int iResult;
WSADATA wsaData;
struct addrinfo *aResult = NULL,
*ptr = NULL,
hints;
if (iport == 0) {
iport = 6041;
args.port = 6041;
}
if (NULL == host) {
host = "localhost";
args.host = "localhost";
}
char port[10] = {0};
sprintf_s(port, 10, "%d", iport);
iResult = WSAStartup(MAKEWORD(2,2), &wsaData);
if (iResult != 0) {
printf("WSAStartup failed with error: %d\n", iResult);
return 1;
}
memset(&hints, 0, sizeof(struct addrinfo));
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
iResult = getaddrinfo(host, port, &hints, &aResult);
if ( iResult != 0 ) {
printf("getaddrinfo failed with error: %d\n", iResult);
WSACleanup();
return 1;
}
for(ptr=aResult; ptr != NULL ; ptr=ptr->ai_next) {
// Create a SOCKET for connecting to server
args.socket = socket(ptr->ai_family, ptr->ai_socktype,
ptr->ai_protocol);
if (args.socket == INVALID_SOCKET) {
printf("socket failed with error: %ld\n", WSAGetLastError());
WSACleanup();
return 1;
}
// Connect to server.
iResult = connect( args.socket, ptr->ai_addr, (int)ptr->ai_addrlen);
if (iResult == SOCKET_ERROR) {
closesocket(args.socket);
args.socket = INVALID_SOCKET;
continue;
}
break;
}
if (args.socket == INVALID_SOCKET) {
printf("Unable to connect to server!\n");
WSACleanup();
return 1;
}
return 0;
}
\ No newline at end of file
Subproject commit c5fded266d3b10508e38bf3285bb7ecf798bc343
Subproject commit 4afe6f4bcf5ac6eca48e0f41ec73a0aea2940335
......@@ -69,23 +69,23 @@ class TDTestCase:
def run(self):
binPath = self.getPath()
self.binPath = binPath
self.checkresult("drop database if exists test", "Update OK")
self.checkresult("create database if not exists test", "Update OK")
self.checkresult("drop database if exists test", "Query OK")
self.checkresult("create database if not exists test", "Query OK")
self.checkresult("create stable test.stb (ts timestamp, c1 nchar(8), c2 double, c3 int) tags (t1 int)", "Update OK")
self.checkresult("create table test.tb1 using test.stb tags (1)", "Update OK")
self.checkresult("create table test.tb2 using test.stb tags (2)", "Update OK")
self.checkresult("create table test.tb1 using test.stb tags (1)", "Query OK")
self.checkresult("create table test.tb2 using test.stb tags (2)", "Query OK")
self.checkresult("select tbname from test.stb", "Query OK, 2 row(s) in set")
self.checkresult("insert into test.tb1 values (now, 'beijing', 1.23, 18)", "Update OK")
self.checkresult("insert into test.tb1 values (now, 'beijing', 1.23, 18)", "Update OK")
self.checkresult("insert into test.tb2 values (now, 'beijing', 1.23, 18)", "Update OK")
self.checkresult("insert into test.tb2 values (now, 'beijing', 1.23, 18)", "Update OK")
self.checkresult("insert into test.tb1 values (now, 'beijing', 1.23, 18)", "Query OK")
self.checkresult("insert into test.tb1 values (now, 'beijing', 1.23, 18)", "Query OK")
self.checkresult("insert into test.tb2 values (now, 'beijing', 1.23, 18)", "Query OK")
self.checkresult("insert into test.tb2 values (now, 'beijing', 1.23, 18)", "Query OK")
self.checkresult("select * from test.stb", "Query OK, 4 row(s) in set")
taosBenchmark = self.getPath(tool="taosBenchmark")
cmd = "%s -n 100 -t 100 -y" %taosBenchmark
tdLog.info("%s" % cmd)
os.system("%s" % cmd)
self.checkresult("select * from test.meters", "Query OK, 10000 row(s) in set")
self.checkresult("select * from test.meters","Notice: The result shows only the first 100 rows")
# self.checkresult("select * from test.meters","Notice: The result shows only the first 100 rows")
def stop(self):
tdSql.close()
......
python3 ./test.py -f 0-others/json_tag.py
python3 ./test.py -f 0-others/TD-12435.py
python3 ./test.py -f 0-others/taos_shell.py
\ No newline at end of file
python3 ./test.py -f 0-others/TD-12435.py
\ No newline at end of file
......@@ -714,8 +714,7 @@
5,,develop-test,python3 ./test.py -f 2-query/timeline_agg_func_groupby.py
5,,develop-test,python3 ./test.py -f 2-query/session_two_stage.py
5,,develop-test,python3 ./test.py -f 2-query/function_histogram.py
5,,develop-test,python3 ./test.py -f 0-others/TD-12435.py
5,,develop-test,python3 ./test.py -f 0-others/taos_shell.py
5,,develop-test,python3 ./test.py -f 0-others/TD-12435.py
5,,develop-test,python3 ./test.py -f 0-others/json_tag.py
5,,develop-test,python3 ./test.py -f 2-query/function_mode.py
5,,develop-test,python3 ./test.py -f 2-query/function_now.py
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册