提交 8eb76f2f 编写于 作者: S shenglian zhou

[TD-6495]<enhance>:improve performance test

上级 95667d91
...@@ -8,22 +8,15 @@ ...@@ -8,22 +8,15 @@
#include <time.h> #include <time.h>
#include <unistd.h> #include <unistd.h>
#define MAX_THREAD_LINE_BATCHES 256
int numThreads = 8; int numThreads = 8;
int numSuperTables = 8;
int numChildTables = 4; // per thread, per super table int numSuperTables = 1;
int numRowsPerChildTable = 2048; int numChildTables = 1024;
int numRowsPerChildTable = 16384;
void shuffle(char** lines, size_t n) {
if (n > 1) { int maxLinesPerBatch = 16384;
size_t i;
for (i = 0; i < n - 1; i++) {
size_t j = i + rand() / (RAND_MAX / (n - i) + 1);
char* t = lines[j];
lines[j] = lines[i];
lines[i] = t;
}
}
}
void printThreadId(pthread_t id, char* buf) void printThreadId(pthread_t id, char* buf)
{ {
...@@ -38,10 +31,15 @@ static int64_t getTimeInUs() { ...@@ -38,10 +31,15 @@ static int64_t getTimeInUs() {
return (int64_t)systemTime.tv_sec * 1000000L + (int64_t)systemTime.tv_usec; return (int64_t)systemTime.tv_sec * 1000000L + (int64_t)systemTime.tv_usec;
} }
typedef struct { typedef struct {
TAOS* taos;
char** lines; char** lines;
int numLines; int numLines;
} SThreadLinesBatch;
typedef struct {
TAOS* taos;
int numBatches;
SThreadLinesBatch batches[MAX_THREAD_LINE_BATCHES];
int64_t costTime; int64_t costTime;
} SThreadInsertArgs; } SThreadInsertArgs;
...@@ -49,12 +47,15 @@ static void* insertLines(void* args) { ...@@ -49,12 +47,15 @@ static void* insertLines(void* args) {
SThreadInsertArgs* insertArgs = (SThreadInsertArgs*) args; SThreadInsertArgs* insertArgs = (SThreadInsertArgs*) args;
char tidBuf[32] = {0}; char tidBuf[32] = {0};
printThreadId(pthread_self(), tidBuf); printThreadId(pthread_self(), tidBuf);
printf("%s, thread: 0x%s\n", "begin taos_insert_lines", tidBuf); for (int i = 0; i < insertArgs->numBatches; ++i) {
int64_t begin = getTimeInUs(); SThreadLinesBatch* batch = insertArgs->batches + i;
int32_t code = taos_insert_lines(insertArgs->taos, insertArgs->lines, insertArgs->numLines); printf("%s, thread: 0x%s\n", "begin taos_insert_lines", tidBuf);
int64_t end = getTimeInUs(); int64_t begin = getTimeInUs();
insertArgs->costTime = end-begin; int32_t code = taos_insert_lines(insertArgs->taos, batch->lines, batch->numLines);
printf("code: %d, %s. time used:%"PRId64", thread: 0x%s\n", code, tstrerror(code), end - begin, tidBuf); int64_t end = getTimeInUs();
insertArgs->costTime += end - begin;
printf("code: %d, %s. time used:%"PRId64", thread: 0x%s\n", code, tstrerror(code), end - begin, tidBuf);
}
return NULL; return NULL;
} }
...@@ -71,6 +72,11 @@ int main(int argc, char* argv[]) { ...@@ -71,6 +72,11 @@ int main(int argc, char* argv[]) {
exit(1); exit(1);
} }
if (numThreads * MAX_THREAD_LINE_BATCHES* maxLinesPerBatch < numSuperTables*numChildTables*numRowsPerChildTable) {
printf("too many rows to be handle by threads");
exit(2);
}
char* info = taos_get_server_info(taos); char* info = taos_get_server_info(taos);
printf("server info: %s\n", info); printf("server info: %s\n", info);
info = taos_get_client_info(taos); info = taos_get_client_info(taos);
...@@ -78,7 +84,7 @@ int main(int argc, char* argv[]) { ...@@ -78,7 +84,7 @@ int main(int argc, char* argv[]) {
result = taos_query(taos, "drop database if exists db;"); result = taos_query(taos, "drop database if exists db;");
taos_free_result(result); taos_free_result(result);
usleep(100000); usleep(100000);
result = taos_query(taos, "create database db precision 'ms';"); result = taos_query(taos, "create database db precision 'us';");
taos_free_result(result); taos_free_result(result);
usleep(100000); usleep(100000);
...@@ -93,14 +99,14 @@ int main(int argc, char* argv[]) { ...@@ -93,14 +99,14 @@ int main(int argc, char* argv[]) {
for (int i = 0; i < numSuperTables; i++) { for (int i = 0; i < numSuperTables; i++) {
char* lineStb = calloc(512, 1); char* lineStb = calloc(512, 1);
snprintf(lineStb, 512, lineFormat, i, snprintf(lineStb, 512, lineFormat, i,
numThreads * numSuperTables * numChildTables, numSuperTables * numChildTables,
ts + numThreads * numSuperTables * numChildTables * numRowsPerChildTable); ts + numSuperTables * numChildTables * numRowsPerChildTable);
linesStb[i] = lineStb; linesStb[i] = lineStb;
} }
SThreadInsertArgs args = {0}; SThreadInsertArgs args = {0};
args.taos = taos; args.taos = taos;
args.lines = linesStb; args.batches[0].lines = linesStb;
args.numLines = numSuperTables; args.batches[0].numLines = numSuperTables;
insertLines(&args); insertLines(&args);
for (int i = 0; i < numSuperTables; ++i) { for (int i = 0; i < numSuperTables; ++i) {
free(linesStb[i]); free(linesStb[i]);
...@@ -109,66 +115,70 @@ int main(int argc, char* argv[]) { ...@@ -109,66 +115,70 @@ int main(int argc, char* argv[]) {
} }
printf("generate lines...\n"); printf("generate lines...\n");
char*** linesThread = calloc(numThreads, sizeof(char**)); pthread_t* tids = calloc(numThreads, sizeof(pthread_t));
SThreadInsertArgs* argsThread = calloc(numThreads, sizeof(SThreadInsertArgs));
for (int i = 0; i < numThreads; ++i) { for (int i = 0; i < numThreads; ++i) {
char** lines = calloc(numSuperTables * numChildTables * numRowsPerChildTable, sizeof(char*)); argsThread[i].taos = taos;
linesThread[i] = lines; argsThread[i].numBatches = 0;
} }
for (int t = 0; t < numThreads; ++t) { int64_t totalLines = numSuperTables * numChildTables * numRowsPerChildTable;
int l = 0; int totalBatches = (int) ((totalLines) / maxLinesPerBatch);
char** lines = linesThread[t]; if (totalLines % maxLinesPerBatch != 0) {
for (int i = 0; i < numSuperTables; ++i) { totalBatches += 1;
for (int j = 0; j < numChildTables; ++j) {
for (int k = 0; k < numRowsPerChildTable; ++k) {
int stIdx = i;
int ctIdx = t*numSuperTables*numChildTables + j;
char* line = calloc(512, 1);
snprintf(line, 512, lineFormat, stIdx, ctIdx, ts + 10 * l);
lines[l] = line;
++l;
}
}
}
} }
printf("shuffle lines...\n"); char*** allBatches = calloc(totalBatches, sizeof(char**));
for (int t = 0; t < numThreads; ++t) { for (int i = 0; i < totalBatches; ++i) {
shuffle(linesThread[t], numSuperTables * numChildTables * numRowsPerChildTable); allBatches[i] = calloc(maxLinesPerBatch, sizeof(char*));
int threadNo = i % numThreads;
int batchNo = i / numThreads;
argsThread[threadNo].batches[batchNo].lines = allBatches[i];
argsThread[threadNo].numBatches = batchNo + 1;
}
int l = 0;
for (int i = 0; i < numSuperTables; ++i) {
for (int j = 0; j < numChildTables; ++j) {
for (int k = 0; k < numRowsPerChildTable; ++k) {
int stIdx = i;
int ctIdx = numSuperTables*numChildTables + j;
char* line = calloc(512, 1);
snprintf(line, 512, lineFormat, stIdx, ctIdx, ts + 10 * l);
int batchNo = l / maxLinesPerBatch;
int lineNo = l % maxLinesPerBatch;
allBatches[batchNo][lineNo] = line;
argsThread[batchNo % numThreads].batches[batchNo/numThreads].numLines = lineNo + 1;
++l;
}
}
} }
printf("begin multi-thread insertion...\n"); printf("begin multi-thread insertion...\n");
int64_t begin = taosGetTimestampUs(); int64_t begin = taosGetTimestampUs();
pthread_t* tids = calloc(numThreads, sizeof(pthread_t));
SThreadInsertArgs* argsThread = calloc(numThreads, sizeof(SThreadInsertArgs));
for (int i=0; i < numThreads; ++i) { for (int i=0; i < numThreads; ++i) {
argsThread[i].lines = linesThread[i];
argsThread[i].taos = taos;
argsThread[i].numLines = numSuperTables * numChildTables * numRowsPerChildTable;
pthread_create(tids+i, NULL, insertLines, argsThread+i); pthread_create(tids+i, NULL, insertLines, argsThread+i);
} }
for (int i = 0; i < numThreads; ++i) { for (int i = 0; i < numThreads; ++i) {
pthread_join(tids[i], NULL); pthread_join(tids[i], NULL);
} }
int64_t end = taosGetTimestampUs(); int64_t end = taosGetTimestampUs();
int totalLines = numThreads*numSuperTables*numChildTables*numRowsPerChildTable; uint64_t linesNum = numSuperTables*numChildTables*numRowsPerChildTable;
printf("TOTAL LINES: %d\n", totalLines); printf("TOTAL LINES: %zu\n", linesNum);
printf("THREADS: %d\n", numThreads); printf("THREADS: %d\n", numThreads);
int64_t sumTime = 0;
for (int i=0; i<numThreads; ++i) {
sumTime += argsThread[i].costTime;
}
printf("TIME: %d(ms)\n", (int)(end-begin)/1000); printf("TIME: %d(ms)\n", (int)(end-begin)/1000);
double throughput = (double)(totalLines)/(double)(end-begin) * 1000000; double throughput = (double)(totalLines)/(double)(end-begin) * 1000000;
printf("THROUGHPUT:%d/s\n", (int)throughput); printf("THROUGHPUT:%d/s\n", (int)throughput);
for (int i = 0; i < totalBatches; ++i) {
free(allBatches[i]);
}
free(allBatches);
free(argsThread); free(argsThread);
free(tids); free(tids);
for (int i = 0; i < numThreads; ++i) {
free(linesThread[i]);
}
free(linesThread);
taos_close(taos); taos_close(taos);
return 0; return 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册