提交 16049c87 编写于 作者: S Shuduo Sang

support query clients.

上级 3fb51717
...@@ -13,8 +13,9 @@ typedef struct { ...@@ -13,8 +13,9 @@ typedef struct {
char sql[256]; char sql[256];
char dataDir[256]; char dataDir[256];
int filesNum; int filesNum;
int writeClients; int clients;
int rowsPerRequest; int rowsPerRequest;
int write;
} ProArgs; } ProArgs;
typedef struct { typedef struct {
...@@ -41,7 +42,7 @@ int main(int argc, char *argv[]) { ...@@ -41,7 +42,7 @@ int main(int argc, char *argv[]) {
statis.totalRows = 0; statis.totalRows = 0;
parseArg(argc, argv); parseArg(argc, argv);
if (arguments.writeClients > 0) { if (arguments.write) {
writeData(); writeData();
} else { } else {
readData(); readData();
...@@ -52,7 +53,7 @@ void parseArg(int argc, char *argv[]) { ...@@ -52,7 +53,7 @@ void parseArg(int argc, char *argv[]) {
strcpy(arguments.sql, "./sqlCmd.txt"); strcpy(arguments.sql, "./sqlCmd.txt");
strcpy(arguments.dataDir, "./testdata"); strcpy(arguments.dataDir, "./testdata");
arguments.filesNum = 2; arguments.filesNum = 2;
arguments.writeClients = 0; arguments.clients = 1;
arguments.rowsPerRequest = 100; arguments.rowsPerRequest = 100;
for (int i = 1; i < argc; ++i) { for (int i = 1; i < argc; ++i) {
...@@ -83,12 +84,12 @@ void parseArg(int argc, char *argv[]) { ...@@ -83,12 +84,12 @@ void parseArg(int argc, char *argv[]) {
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
} }
else if (strcmp(argv[i], "-writeClients") == 0) { else if (strcmp(argv[i], "-clients") == 0) {
if (i < argc - 1) { if (i < argc - 1) {
arguments.writeClients = atoi(argv[++i]); arguments.clients = atoi(argv[++i]);
} }
else { else {
fprintf(stderr, "'-writeClients' requires a parameter, default:%d\n", arguments.writeClients); fprintf(stderr, "'-clients' requires a parameter, default:%d\n", arguments.clients);
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
} }
...@@ -101,6 +102,9 @@ void parseArg(int argc, char *argv[]) { ...@@ -101,6 +102,9 @@ void parseArg(int argc, char *argv[]) {
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
} }
else if (strcmp(argv[i], "-w") == 0) {
arguments.write = 1;
}
} }
} }
...@@ -215,7 +219,7 @@ void writeDataImp(void *param) { ...@@ -215,7 +219,7 @@ void writeDataImp(void *param) {
void writeData() { void writeData() {
printf("write data\n"); printf("write data\n");
printf("---- writeClients: %d\n", arguments.writeClients); printf("---- clients: %d\n", arguments.clients);
printf("---- dataDir: %s\n", arguments.dataDir); printf("---- dataDir: %s\n", arguments.dataDir);
printf("---- numOfFiles: %d\n", arguments.filesNum); printf("---- numOfFiles: %d\n", arguments.filesNum);
printf("---- rowsPerRequest: %d\n", arguments.rowsPerRequest); printf("---- rowsPerRequest: %d\n", arguments.rowsPerRequest);
...@@ -243,12 +247,12 @@ void writeData() { ...@@ -243,12 +247,12 @@ void writeData() {
int64_t st = getTimeStampMs(); int64_t st = getTimeStampMs();
int a = arguments.filesNum / arguments.writeClients; int a = arguments.filesNum / arguments.clients;
int b = arguments.filesNum % arguments.writeClients; int b = arguments.filesNum % arguments.clients;
int last = 0; int last = 0;
ThreadObj *threads = calloc((size_t)arguments.writeClients, sizeof(ThreadObj)); ThreadObj *threads = calloc((size_t)arguments.clients, sizeof(ThreadObj));
for (int i = 0; i < arguments.writeClients; ++i) { for (int i = 0; i < arguments.clients; ++i) {
ThreadObj *pthread = threads + i; ThreadObj *pthread = threads + i;
pthread_attr_t thattr; pthread_attr_t thattr;
pthread->threadId = i + 1; pthread->threadId = i + 1;
...@@ -264,7 +268,7 @@ void writeData() { ...@@ -264,7 +268,7 @@ void writeData() {
pthread_create(&pthread->pid, &thattr, (void *(*)(void *))writeDataImp, pthread); pthread_create(&pthread->pid, &thattr, (void *(*)(void *))writeDataImp, pthread);
} }
for (int i = 0; i < arguments.writeClients; i++) { for (int i = 0; i < arguments.clients; i++) {
pthread_join(threads[i].pid, NULL); pthread_join(threads[i].pid, NULL);
} }
...@@ -272,17 +276,15 @@ void writeData() { ...@@ -272,17 +276,15 @@ void writeData() {
float seconds = (float)elapsed / 1000; float seconds = (float)elapsed / 1000;
float rs = (float)statis.totalRows / seconds; float rs = (float)statis.totalRows / seconds;
free(threads);
printf("---- Spent %f seconds to insert %ld records, speed: %f Rows/Second\n", seconds, statis.totalRows, rs); printf("---- Spent %f seconds to insert %ld records, speed: %f Rows/Second\n", seconds, statis.totalRows, rs);
} }
void readData() { void readDataImp(void *param)
printf("read data\n"); {
printf("---- sql: %s\n", arguments.sql); ThreadObj *pThread = (ThreadObj *)param;
printf("Thread %d\n", pThread->threadId);
void *taos = taos_connect("127.0.0.1", "root", "taosdata", NULL, 0);
if (taos == NULL)
taos_error(taos);
FILE *fp = fopen(arguments.sql, "r"); FILE *fp = fopen(arguments.sql, "r");
if (fp == NULL) { if (fp == NULL) {
printf("failed to open file %s\n", arguments.sql); printf("failed to open file %s\n", arguments.sql);
...@@ -290,6 +292,10 @@ void readData() { ...@@ -290,6 +292,10 @@ void readData() {
} }
printf("open file %s success\n", arguments.sql); printf("open file %s success\n", arguments.sql);
void *taos = taos_connect("127.0.0.1", "root", "taosdata", NULL, 0);
if (taos == NULL)
taos_error(taos);
char *line = NULL; char *line = NULL;
size_t len = 0; size_t len = 0;
while (!feof(fp)) { while (!feof(fp)) {
...@@ -325,9 +331,36 @@ void readData() { ...@@ -325,9 +331,36 @@ void readData() {
int64_t elapsed = getTimeStampMs() - st; int64_t elapsed = getTimeStampMs() - st;
float seconds = (float)elapsed / 1000; float seconds = (float)elapsed / 1000;
printf("---- Spent %f seconds to query: %s", seconds, line); printf("---- Spent %f seconds to retrieve %d records, Thread:%d query: %s\n", seconds, rows, pThread->threadId, line);
} }
fclose(fp); fclose(fp);
} }
void readData() {
printf("read data\n");
printf("---- sql: %s\n", arguments.sql);
printf("---- clients: %d\n", arguments.clients);
void *taos = taos_connect("127.0.0.1", "root", "taosdata", NULL, 0);
if (taos == NULL)
taos_error(taos);
ThreadObj *threads = calloc((size_t)arguments.clients, sizeof(ThreadObj));
for (int i = 0; i < arguments.clients; ++i) {
ThreadObj *pthread = threads + i;
pthread_attr_t thattr;
pthread->threadId = i + 1;
pthread_attr_init(&thattr);
pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
pthread_create(&pthread->pid, &thattr, (void *(*)(void *))readDataImp, pthread);
}
for (int i = 0; i < arguments.clients; i++) {
pthread_join(threads[i].pid, NULL);
}
free(threads);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册