From 16049c8720d46a7f4a3bb1eed2f3f67c4035dde2 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Sat, 18 Jul 2020 16:59:58 +0800 Subject: [PATCH] support query clients. --- tests/comparisonTest/tdengine/q5.txt | 1 + tests/comparisonTest/tdengine/tdengineTest.c | 75 ++++++++++++++------ 2 files changed, 55 insertions(+), 21 deletions(-) create mode 100644 tests/comparisonTest/tdengine/q5.txt diff --git a/tests/comparisonTest/tdengine/q5.txt b/tests/comparisonTest/tdengine/q5.txt new file mode 100644 index 0000000000..5f36955dd6 --- /dev/null +++ b/tests/comparisonTest/tdengine/q5.txt @@ -0,0 +1 @@ +select * from db.devices; diff --git a/tests/comparisonTest/tdengine/tdengineTest.c b/tests/comparisonTest/tdengine/tdengineTest.c index de92526337..3d78a3d0a5 100644 --- a/tests/comparisonTest/tdengine/tdengineTest.c +++ b/tests/comparisonTest/tdengine/tdengineTest.c @@ -13,8 +13,9 @@ typedef struct { char sql[256]; char dataDir[256]; int filesNum; - int writeClients; + int clients; int rowsPerRequest; + int write; } ProArgs; typedef struct { @@ -41,7 +42,7 @@ int main(int argc, char *argv[]) { statis.totalRows = 0; parseArg(argc, argv); - if (arguments.writeClients > 0) { + if (arguments.write) { writeData(); } else { readData(); @@ -52,7 +53,7 @@ void parseArg(int argc, char *argv[]) { strcpy(arguments.sql, "./sqlCmd.txt"); strcpy(arguments.dataDir, "./testdata"); arguments.filesNum = 2; - arguments.writeClients = 0; + arguments.clients = 1; arguments.rowsPerRequest = 100; for (int i = 1; i < argc; ++i) { @@ -83,12 +84,12 @@ void parseArg(int argc, char *argv[]) { exit(EXIT_FAILURE); } } - else if (strcmp(argv[i], "-writeClients") == 0) { + else if (strcmp(argv[i], "-clients") == 0) { if (i < argc - 1) { - arguments.writeClients = atoi(argv[++i]); + arguments.clients = atoi(argv[++i]); } else { - fprintf(stderr, "'-writeClients' requires a parameter, default:%d\n", arguments.writeClients); + fprintf(stderr, "'-clients' requires a parameter, default:%d\n", arguments.clients); exit(EXIT_FAILURE); } } @@ -101,6 +102,9 @@ void parseArg(int argc, char *argv[]) { exit(EXIT_FAILURE); } } + else if (strcmp(argv[i], "-w") == 0) { + arguments.write = 1; + } } } @@ -215,7 +219,7 @@ void writeDataImp(void *param) { void writeData() { printf("write data\n"); - printf("---- writeClients: %d\n", arguments.writeClients); + printf("---- clients: %d\n", arguments.clients); printf("---- dataDir: %s\n", arguments.dataDir); printf("---- numOfFiles: %d\n", arguments.filesNum); printf("---- rowsPerRequest: %d\n", arguments.rowsPerRequest); @@ -243,12 +247,12 @@ void writeData() { int64_t st = getTimeStampMs(); - int a = arguments.filesNum / arguments.writeClients; - int b = arguments.filesNum % arguments.writeClients; + int a = arguments.filesNum / arguments.clients; + int b = arguments.filesNum % arguments.clients; int last = 0; - ThreadObj *threads = calloc((size_t)arguments.writeClients, sizeof(ThreadObj)); - for (int i = 0; i < arguments.writeClients; ++i) { + ThreadObj *threads = calloc((size_t)arguments.clients, sizeof(ThreadObj)); + for (int i = 0; i < arguments.clients; ++i) { ThreadObj *pthread = threads + i; pthread_attr_t thattr; pthread->threadId = i + 1; @@ -264,7 +268,7 @@ void writeData() { pthread_create(&pthread->pid, &thattr, (void *(*)(void *))writeDataImp, pthread); } - for (int i = 0; i < arguments.writeClients; i++) { + for (int i = 0; i < arguments.clients; i++) { pthread_join(threads[i].pid, NULL); } @@ -272,17 +276,15 @@ void writeData() { float seconds = (float)elapsed / 1000; float rs = (float)statis.totalRows / seconds; + free(threads); + printf("---- Spent %f seconds to insert %ld records, speed: %f Rows/Second\n", seconds, statis.totalRows, rs); } -void readData() { - printf("read data\n"); - printf("---- sql: %s\n", arguments.sql); - - void *taos = taos_connect("127.0.0.1", "root", "taosdata", NULL, 0); - if (taos == NULL) - taos_error(taos); - +void readDataImp(void *param) +{ + ThreadObj *pThread = (ThreadObj *)param; + printf("Thread %d\n", pThread->threadId); FILE *fp = fopen(arguments.sql, "r"); if (fp == NULL) { printf("failed to open file %s\n", arguments.sql); @@ -290,6 +292,10 @@ void readData() { } printf("open file %s success\n", arguments.sql); + void *taos = taos_connect("127.0.0.1", "root", "taosdata", NULL, 0); + if (taos == NULL) + taos_error(taos); + char *line = NULL; size_t len = 0; while (!feof(fp)) { @@ -325,9 +331,36 @@ void readData() { int64_t elapsed = getTimeStampMs() - st; float seconds = (float)elapsed / 1000; - printf("---- Spent %f seconds to query: %s", seconds, line); + printf("---- Spent %f seconds to retrieve %d records, Thread:%d query: %s\n", seconds, rows, pThread->threadId, line); } fclose(fp); } +void readData() { + printf("read data\n"); + printf("---- sql: %s\n", arguments.sql); + printf("---- clients: %d\n", arguments.clients); + + void *taos = taos_connect("127.0.0.1", "root", "taosdata", NULL, 0); + if (taos == NULL) + taos_error(taos); + + ThreadObj *threads = calloc((size_t)arguments.clients, sizeof(ThreadObj)); + + for (int i = 0; i < arguments.clients; ++i) { + ThreadObj *pthread = threads + i; + pthread_attr_t thattr; + pthread->threadId = i + 1; + pthread_attr_init(&thattr); + pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE); + pthread_create(&pthread->pid, &thattr, (void *(*)(void *))readDataImp, pthread); + } + + for (int i = 0; i < arguments.clients; i++) { + pthread_join(threads[i].pid, NULL); + } + + free(threads); +} + -- GitLab