From 8eb76f2f34d95a0f5100a147c3d26f569300b9b6 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Fri, 3 Sep 2021 15:18:17 +0800 Subject: [PATCH] [TD-6495]:improve performance test --- tests/examples/c/schemaless.c | 140 ++++++++++++++++++---------------- 1 file changed, 75 insertions(+), 65 deletions(-) diff --git a/tests/examples/c/schemaless.c b/tests/examples/c/schemaless.c index 43e9e16978..a254e3b786 100644 --- a/tests/examples/c/schemaless.c +++ b/tests/examples/c/schemaless.c @@ -8,22 +8,15 @@ #include #include +#define MAX_THREAD_LINE_BATCHES 256 + int numThreads = 8; -int numSuperTables = 8; -int numChildTables = 4; // per thread, per super table -int numRowsPerChildTable = 2048; - -void shuffle(char** lines, size_t n) { - if (n > 1) { - size_t i; - for (i = 0; i < n - 1; i++) { - size_t j = i + rand() / (RAND_MAX / (n - i) + 1); - char* t = lines[j]; - lines[j] = lines[i]; - lines[i] = t; - } - } -} + +int numSuperTables = 1; +int numChildTables = 1024; +int numRowsPerChildTable = 16384; + +int maxLinesPerBatch = 16384; void printThreadId(pthread_t id, char* buf) { @@ -38,10 +31,15 @@ static int64_t getTimeInUs() { return (int64_t)systemTime.tv_sec * 1000000L + (int64_t)systemTime.tv_usec; } -typedef struct { - TAOS* taos; +typedef struct { char** lines; int numLines; +} SThreadLinesBatch; + +typedef struct { + TAOS* taos; + int numBatches; + SThreadLinesBatch batches[MAX_THREAD_LINE_BATCHES]; int64_t costTime; } SThreadInsertArgs; @@ -49,12 +47,15 @@ static void* insertLines(void* args) { SThreadInsertArgs* insertArgs = (SThreadInsertArgs*) args; char tidBuf[32] = {0}; printThreadId(pthread_self(), tidBuf); - printf("%s, thread: 0x%s\n", "begin taos_insert_lines", tidBuf); - int64_t begin = getTimeInUs(); - int32_t code = taos_insert_lines(insertArgs->taos, insertArgs->lines, insertArgs->numLines); - int64_t end = getTimeInUs(); - insertArgs->costTime = end-begin; - printf("code: %d, %s. time used:%"PRId64", thread: 0x%s\n", code, tstrerror(code), end - begin, tidBuf); + for (int i = 0; i < insertArgs->numBatches; ++i) { + SThreadLinesBatch* batch = insertArgs->batches + i; + printf("%s, thread: 0x%s\n", "begin taos_insert_lines", tidBuf); + int64_t begin = getTimeInUs(); + int32_t code = taos_insert_lines(insertArgs->taos, batch->lines, batch->numLines); + int64_t end = getTimeInUs(); + insertArgs->costTime += end - begin; + printf("code: %d, %s. time used:%"PRId64", thread: 0x%s\n", code, tstrerror(code), end - begin, tidBuf); + } return NULL; } @@ -71,6 +72,11 @@ int main(int argc, char* argv[]) { exit(1); } + if (numThreads * MAX_THREAD_LINE_BATCHES* maxLinesPerBatch < numSuperTables*numChildTables*numRowsPerChildTable) { + printf("too many rows to be handle by threads"); + exit(2); + } + char* info = taos_get_server_info(taos); printf("server info: %s\n", info); info = taos_get_client_info(taos); @@ -78,7 +84,7 @@ int main(int argc, char* argv[]) { result = taos_query(taos, "drop database if exists db;"); taos_free_result(result); usleep(100000); - result = taos_query(taos, "create database db precision 'ms';"); + result = taos_query(taos, "create database db precision 'us';"); taos_free_result(result); usleep(100000); @@ -93,14 +99,14 @@ int main(int argc, char* argv[]) { for (int i = 0; i < numSuperTables; i++) { char* lineStb = calloc(512, 1); snprintf(lineStb, 512, lineFormat, i, - numThreads * numSuperTables * numChildTables, - ts + numThreads * numSuperTables * numChildTables * numRowsPerChildTable); + numSuperTables * numChildTables, + ts + numSuperTables * numChildTables * numRowsPerChildTable); linesStb[i] = lineStb; } SThreadInsertArgs args = {0}; args.taos = taos; - args.lines = linesStb; - args.numLines = numSuperTables; + args.batches[0].lines = linesStb; + args.batches[0].numLines = numSuperTables; insertLines(&args); for (int i = 0; i < numSuperTables; ++i) { free(linesStb[i]); @@ -109,66 +115,70 @@ int main(int argc, char* argv[]) { } printf("generate lines...\n"); - char*** linesThread = calloc(numThreads, sizeof(char**)); + pthread_t* tids = calloc(numThreads, sizeof(pthread_t)); + SThreadInsertArgs* argsThread = calloc(numThreads, sizeof(SThreadInsertArgs)); for (int i = 0; i < numThreads; ++i) { - char** lines = calloc(numSuperTables * numChildTables * numRowsPerChildTable, sizeof(char*)); - linesThread[i] = lines; + argsThread[i].taos = taos; + argsThread[i].numBatches = 0; } - for (int t = 0; t < numThreads; ++t) { - int l = 0; - char** lines = linesThread[t]; - for (int i = 0; i < numSuperTables; ++i) { - for (int j = 0; j < numChildTables; ++j) { - for (int k = 0; k < numRowsPerChildTable; ++k) { - int stIdx = i; - int ctIdx = t*numSuperTables*numChildTables + j; - char* line = calloc(512, 1); - snprintf(line, 512, lineFormat, stIdx, ctIdx, ts + 10 * l); - lines[l] = line; - ++l; - } - } - } + int64_t totalLines = numSuperTables * numChildTables * numRowsPerChildTable; + int totalBatches = (int) ((totalLines) / maxLinesPerBatch); + if (totalLines % maxLinesPerBatch != 0) { + totalBatches += 1; } - printf("shuffle lines...\n"); - for (int t = 0; t < numThreads; ++t) { - shuffle(linesThread[t], numSuperTables * numChildTables * numRowsPerChildTable); + char*** allBatches = calloc(totalBatches, sizeof(char**)); + for (int i = 0; i < totalBatches; ++i) { + allBatches[i] = calloc(maxLinesPerBatch, sizeof(char*)); + int threadNo = i % numThreads; + int batchNo = i / numThreads; + argsThread[threadNo].batches[batchNo].lines = allBatches[i]; + argsThread[threadNo].numBatches = batchNo + 1; + } + + int l = 0; + for (int i = 0; i < numSuperTables; ++i) { + for (int j = 0; j < numChildTables; ++j) { + for (int k = 0; k < numRowsPerChildTable; ++k) { + int stIdx = i; + int ctIdx = numSuperTables*numChildTables + j; + char* line = calloc(512, 1); + snprintf(line, 512, lineFormat, stIdx, ctIdx, ts + 10 * l); + int batchNo = l / maxLinesPerBatch; + int lineNo = l % maxLinesPerBatch; + allBatches[batchNo][lineNo] = line; + argsThread[batchNo % numThreads].batches[batchNo/numThreads].numLines = lineNo + 1; + ++l; + } + } } printf("begin multi-thread insertion...\n"); int64_t begin = taosGetTimestampUs(); - pthread_t* tids = calloc(numThreads, sizeof(pthread_t)); - SThreadInsertArgs* argsThread = calloc(numThreads, sizeof(SThreadInsertArgs)); + for (int i=0; i < numThreads; ++i) { - argsThread[i].lines = linesThread[i]; - argsThread[i].taos = taos; - argsThread[i].numLines = numSuperTables * numChildTables * numRowsPerChildTable; pthread_create(tids+i, NULL, insertLines, argsThread+i); } - for (int i = 0; i < numThreads; ++i) { pthread_join(tids[i], NULL); } int64_t end = taosGetTimestampUs(); - int totalLines = numThreads*numSuperTables*numChildTables*numRowsPerChildTable; - printf("TOTAL LINES: %d\n", totalLines); + uint64_t linesNum = numSuperTables*numChildTables*numRowsPerChildTable; + printf("TOTAL LINES: %zu\n", linesNum); printf("THREADS: %d\n", numThreads); - int64_t sumTime = 0; - for (int i=0; i