diff --git a/tests/examples/c/schemaless.c b/tests/examples/c/schemaless.c index 830e59880198b0ef2c21cd78448fd6cb80f98b2f..6d1b257d4c495e72103dad9613f6e1bc3738bfd8 100644 --- a/tests/examples/c/schemaless.c +++ b/tests/examples/c/schemaless.c @@ -8,8 +8,9 @@ #include #include +int numThreads = 8; int numSuperTables = 8; -int numChildTables = 4; +int numChildTablesPerThread = 4; int numRowsPerChildTable = 2048; void shuffle(char** lines, size_t n) { @@ -24,12 +25,37 @@ void shuffle(char** lines, size_t n) { } } +void printThreadId(pthread_t id, char* buf) +{ + size_t i; + for (i = sizeof(i); i; --i) + sprintf(buf + strlen(buf), "%02x", *(((unsigned char*) &id) + i - 1)); +} + static int64_t getTimeInUs() { struct timeval systemTime; gettimeofday(&systemTime, NULL); return (int64_t)systemTime.tv_sec * 1000000L + (int64_t)systemTime.tv_usec; } +typedef struct { + TAOS* taos; + char** lines; + int numLines; +} SThreadInsertArgs; + +static void* insertLines(void* args) { + SThreadInsertArgs* insertArgs = (SThreadInsertArgs*) args; + char tidBuf[32] = {0}; + printThreadId(pthread_self(), tidBuf); + printf("%s, thread: 0x%s\n", "begin taos_insert_lines", tidBuf); + int64_t begin = getTimeInUs(); + int32_t code = taos_insert_lines(insertArgs->taos, insertArgs->lines, insertArgs->numLines); + int64_t end = getTimeInUs(); + printf("code: %d, %s. time used:%"PRId64", thread: 0x%s\n", code, tstrerror(code), end - begin, tidBuf); + return NULL; +} + int main(int argc, char* argv[]) { TAOS_RES* result; const char* host = "127.0.0.1"; @@ -60,25 +86,77 @@ int main(int argc, char* argv[]) { int64_t ts = ct * 1000; char* lineFormat = "sta%d,t0=true,t1=127i8,t2=32767i16,t3=%di32,t4=9223372036854775807i64,t9=11.12345f32,t10=22.123456789f64,t11=\"binaryTagValue\",t12=L\"ncharTagValue\" c0=true,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=254u8,c6=32770u16,c7=2147483699u32,c8=9223372036854775899u64,c9=11.12345f32,c10=22.123456789f64,c11=\"binaryValue\",c12=L\"ncharValue\" %lldms"; - char** lines = calloc(numSuperTables * numChildTables * numRowsPerChildTable, sizeof(char*)); - int l = 0; - for (int i = 0; i < numSuperTables; ++i) { - for (int j = 0; j < numChildTables; ++j) { - for (int k = 0; k < numRowsPerChildTable; ++k) { - char* line = calloc(512, 1); - snprintf(line, 512, lineFormat, i, j, ts + 10 * l); - lines[l] = line; - ++l; + { + char** linesStb = calloc(numSuperTables, sizeof(char*)); + for (int i = 0; i < numSuperTables; i++) { + char* lineStb = calloc(512, 1); + snprintf(lineStb, 512, lineFormat, i, + numThreads * numSuperTables * numChildTablesPerThread, + ts + numThreads * numSuperTables * numChildTablesPerThread * numRowsPerChildTable); + linesStb[i] = lineStb; + } + SThreadInsertArgs args = {0}; + args.taos = taos; + args.lines = linesStb; + args.numLines = numSuperTables; + insertLines(&args); + for (int i = 0; i < numSuperTables; ++i) { + free(linesStb[i]); + } + free(linesStb); + } + + printf("generate lines...\n"); + char*** linesThread = calloc(numThreads, sizeof(char**)); + for (int i = 0; i < numThreads; ++i) { + char** lines = calloc(numSuperTables * numChildTablesPerThread * numRowsPerChildTable, sizeof(char*)); + linesThread[i] = lines; + } + + for (int t = 0; t < numThreads; ++t) { + int l = 0; + char** lines = linesThread[t]; + for (int i = 0; i < numSuperTables; ++i) { + for (int j = 0; j < numChildTablesPerThread; ++j) { + for (int k = 0; k < numRowsPerChildTable; ++k) { + int stIdx = i; + int ctIdx = t*numSuperTables*numChildTablesPerThread + j; + char* line = calloc(512, 1); + snprintf(line, 512, lineFormat, stIdx, ctIdx, ts + 10 * l); + lines[l] = line; + ++l; + } } } } - //shuffle(lines, numSuperTables * numChildTables * numRowsPerChildTable); - printf("%s\n", "begin taos_insert_lines"); - int64_t begin = getTimeInUs(); - int32_t code = taos_insert_lines(taos, lines, numSuperTables * numChildTables * numRowsPerChildTable); - int64_t end = getTimeInUs(); - printf("code: %d, %s. time used: %" PRId64 "\n", code, tstrerror(code), end - begin); + printf("shuffle lines...\n"); + for (int t = 0; t < numThreads; ++t) { + shuffle(linesThread[t], numSuperTables * numChildTablesPerThread * numRowsPerChildTable); + } + + printf("begin multi-thread insertion..."); + pthread_t* tids = calloc(numThreads, sizeof(pthread_t)); + SThreadInsertArgs* argsThread = calloc(numThreads, sizeof(SThreadInsertArgs)); + for (int i=0; i < numThreads; ++i) { + argsThread[i].lines = linesThread[i]; + argsThread[i].taos = taos; + argsThread[i].numLines = numSuperTables * numChildTablesPerThread * numRowsPerChildTable; + pthread_create(tids+i, NULL, insertLines, argsThread+i); + } + + + for (int i = 0; i < numThreads; ++i) { + pthread_join(tids[i], NULL); + } + + free(argsThread); + free(tids); + for (int i = 0; i < numThreads; ++i) { + free(linesThread[i]); + } + free(linesThread); + taos_close(taos); return 0; }