From ec3c5d5bef753dd623e7f1a8e5d91027bf6a0ebd Mon Sep 17 00:00:00 2001 From: Yang Zhao Date: Thu, 21 Oct 2021 10:54:53 +0800 Subject: [PATCH] [TD-10465]taosdemo support schemaless insertion (#8341) * [TD-10465]taosdemo support schemaless * fix performance output * tasodemo sml interlace * sml interlace * sml interlace * fix memory leak * taosdemo sml interlace * fix core dump --- src/kit/taosdemo/taosdemo.c | 758 ++++++++++++++++++++++++++++++++---- 1 file changed, 676 insertions(+), 82 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 2bc9665d79..caafcd9c9d 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -147,6 +147,7 @@ enum enum_TAOS_INTERFACE { TAOSC_IFACE, REST_IFACE, STMT_IFACE, + SML_IFACE, INTERFACE_BUT }; @@ -504,6 +505,7 @@ typedef struct SThreadInfo_S { uint64_t querySeq; // sequence number of sql command TAOS_SUB* tsub; + char** lines; int sockfd; } threadInfo; @@ -1055,6 +1057,8 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { arguments->iface = REST_IFACE; } else if (0 == strcasecmp(argv[i+1], "stmt")) { arguments->iface = STMT_IFACE; + } else if (0 == strcasecmp(argv[i+1], "sml")) { + arguments->iface = SML_IFACE; } else { errorWrongValue(argv[0], "-I", argv[i+1]); exit(EXIT_FAILURE); @@ -1067,6 +1071,8 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { arguments->iface = REST_IFACE; } else if (0 == strcasecmp((char *)(argv[i] + strlen("--interface=")), "stmt")) { arguments->iface = STMT_IFACE; + } else if (0 == strcasecmp((char *)(argv[i] + strlen("--interface=")), "sml")) { + arguments->iface = SML_IFACE; } else { errorPrintReqArg3(argv[0], "--interface"); exit(EXIT_FAILURE); @@ -1078,6 +1084,8 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { arguments->iface = REST_IFACE; } else if (0 == strcasecmp((char *)(argv[i] + strlen("-I")), "stmt")) { arguments->iface = STMT_IFACE; + } else if (0 == strcasecmp((char *)(argv[i] + strlen("-I")), "sml")) { + arguments->iface = SML_IFACE; } else { errorWrongValue(argv[0], "-I", (char *)(argv[i] + strlen("-I"))); @@ -1094,6 +1102,8 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { arguments->iface = REST_IFACE; } else if (0 == strcasecmp(argv[i+1], "stmt")) { arguments->iface = STMT_IFACE; + } else if (0 == strcasecmp(argv[i+1], "sml")) { + arguments->iface = SML_IFACE; } else { errorWrongValue(argv[0], "--interface", argv[i+1]); exit(EXIT_FAILURE); @@ -2611,7 +2621,8 @@ static int printfInsertMeta() { // first time if no iface specified printf("interface: \033[33m%s\033[0m\n", (g_args.iface==TAOSC_IFACE)?"taosc": - (g_args.iface==REST_IFACE)?"rest":"stmt"); + (g_args.iface==REST_IFACE)?"rest": + (g_args.iface==STMT_IFACE)?"stmt":"sml"); } printf("host: \033[33m%s:%u\033[0m\n", @@ -2737,7 +2748,8 @@ static int printfInsertMeta() { g_Dbs.db[i].superTbls[j].dataSource); printf(" iface: \033[33m%s\033[0m\n", (g_Dbs.db[i].superTbls[j].iface==TAOSC_IFACE)?"taosc": - (g_Dbs.db[i].superTbls[j].iface==REST_IFACE)?"rest":"stmt"); + (g_Dbs.db[i].superTbls[j].iface==REST_IFACE)?"rest": + (g_Dbs.db[i].superTbls[j].iface==STMT_IFACE)?"stmt":"sml"); if (g_Dbs.db[i].superTbls[j].childTblLimit > 0) { printf(" childTblLimit: \033[33m%"PRId64"\033[0m\n", g_Dbs.db[i].superTbls[j].childTblLimit); @@ -2936,7 +2948,8 @@ static void printfInsertMetaToFile(FILE* fp) { g_Dbs.db[i].superTbls[j].dataSource); fprintf(fp, " iface: %s\n", (g_Dbs.db[i].superTbls[j].iface==TAOSC_IFACE)?"taosc": - (g_Dbs.db[i].superTbls[j].iface==REST_IFACE)?"rest":"stmt"); + (g_Dbs.db[i].superTbls[j].iface==REST_IFACE)?"rest": + (g_Dbs.db[i].superTbls[j].iface==STMT_IFACE)?"stmt":"sml"); fprintf(fp, " insertRows: %"PRId64"\n", g_Dbs.db[i].superTbls[j].insertRows); fprintf(fp, " interlace rows: %u\n", @@ -4467,6 +4480,10 @@ int createDatabasesAndStables(char *command) { int validStbCount = 0; for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) { + if (g_Dbs.db[i].superTbls[j].iface == SML_IFACE) { + goto skip; + } + sprintf(command, "describe %s.%s;", g_Dbs.db[i].dbName, g_Dbs.db[i].superTbls[j].stbName); ret = queryDbExec(taos, command, NO_INSERT_TYPE, true); @@ -4488,6 +4505,7 @@ int createDatabasesAndStables(char *command) { continue; } } + skip: validStbCount ++; } g_Dbs.db[i].superTblCount = validStbCount; @@ -5667,6 +5685,9 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { g_Dbs.db[i].superTbls[j].iface= REST_IFACE; } else if (0 == strcasecmp(stbIface->valuestring, "stmt")) { g_Dbs.db[i].superTbls[j].iface= STMT_IFACE; + } else if (0 == strcasecmp(stbIface->valuestring, "sml")) { + g_Dbs.db[i].superTbls[j].iface= SML_IFACE; + g_args.iface = SML_IFACE; } else { errorPrint("failed to read json, insert_mode %s not recognized\n", stbIface->valuestring); @@ -7007,7 +7028,7 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k) { int32_t affectedRows; SSuperTable* stbInfo = pThreadInfo->stbInfo; - + int32_t code; uint16_t iface; if (stbInfo) iface = stbInfo->iface; @@ -7059,7 +7080,19 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k) } affectedRows = k; break; - + case SML_IFACE: + code = taos_schemaless_insert(pThreadInfo->taos, pThreadInfo->lines, k, 0, pThreadInfo->time_precision == TSDB_TIME_PRECISION_MILLI + ? "ms" + : (pThreadInfo->time_precision == TSDB_TIME_PRECISION_MICRO + ? "us" + : "ns")); + if (code) { + errorPrint2("%s() LN%d, failed to execute schemaless insert. reason: %s\n", + __func__, __LINE__, tstrerror(code)); + exit(EXIT_FAILURE); + } + affectedRows = k; + break; default: errorPrint2("%s() LN%d: unknown insert mode: %d\n", __func__, __LINE__, stbInfo->iface); @@ -9545,6 +9578,441 @@ free_of_interlace_stmt: #endif +static void generateSmlHead(char* smlHead, SSuperTable* stbInfo, threadInfo* pThreadInfo, int tbSeq) { + int64_t dataLen = 0; + dataLen += snprintf(smlHead + dataLen, HEAD_BUFF_LEN - dataLen, + "%s,id=%s%" PRIu64 "", stbInfo->stbName, + stbInfo->childTblPrefix, + tbSeq + pThreadInfo->start_table_from); + for (int j = 0; j < stbInfo->tagCount; j++) { + tstrncpy(smlHead + dataLen, ",", 2); + dataLen += 1; + switch (stbInfo->tags[j].data_type) { + case TSDB_DATA_TYPE_TIMESTAMP: + errorPrint2( + "%s() LN%d, Does not support data type %s as tag\n", + __func__, __LINE__, stbInfo->tags[j].dataType); + exit(EXIT_FAILURE); + case TSDB_DATA_TYPE_BOOL: + dataLen += + snprintf(smlHead + dataLen, HEAD_BUFF_LEN - dataLen, + "T%d=%s", j, rand_bool_str()); + break; + case TSDB_DATA_TYPE_TINYINT: + dataLen += + snprintf(smlHead + dataLen, HEAD_BUFF_LEN - dataLen, + "T%d=%si8", j, rand_tinyint_str()); + break; + case TSDB_DATA_TYPE_UTINYINT: + dataLen += + snprintf(smlHead + dataLen, HEAD_BUFF_LEN - dataLen, + "T%d=%su8", j, rand_utinyint_str()); + break; + case TSDB_DATA_TYPE_SMALLINT: + dataLen += + snprintf(smlHead + dataLen, HEAD_BUFF_LEN - dataLen, + "T%d=%si16", j, rand_smallint_str()); + break; + case TSDB_DATA_TYPE_USMALLINT: + dataLen += + snprintf(smlHead + dataLen, HEAD_BUFF_LEN - dataLen, + "T%d=%su16", j, rand_usmallint_str()); + break; + case TSDB_DATA_TYPE_INT: + dataLen += + snprintf(smlHead + dataLen, HEAD_BUFF_LEN - dataLen, + "T%d=%si32", j, rand_int_str()); + break; + case TSDB_DATA_TYPE_UINT: + dataLen += + snprintf(smlHead + dataLen, HEAD_BUFF_LEN - dataLen, + "T%d=%su32", j, rand_uint_str()); + break; + case TSDB_DATA_TYPE_BIGINT: + dataLen += + snprintf(smlHead + dataLen, HEAD_BUFF_LEN - dataLen, + "T%d=%si64", j, rand_bigint_str()); + break; + case TSDB_DATA_TYPE_UBIGINT: + dataLen += + snprintf(smlHead + dataLen, HEAD_BUFF_LEN - dataLen, + "T%d=%su64", j, rand_ubigint_str()); + break; + case TSDB_DATA_TYPE_FLOAT: + dataLen += + snprintf(smlHead + dataLen, HEAD_BUFF_LEN - dataLen, + "T%d=%sf32", j, rand_float_str()); + break; + case TSDB_DATA_TYPE_DOUBLE: + dataLen += + snprintf(smlHead + dataLen, HEAD_BUFF_LEN - dataLen, + "T%d=%sf64", j, rand_double_str()); + break; + case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_NCHAR: + if (stbInfo->tags[j].dataLen > TSDB_MAX_BINARY_LEN) { + errorPrint2( + "binary or nchar length overflow, maxsize:%u\n", + (uint32_t)TSDB_MAX_BINARY_LEN); + exit(EXIT_FAILURE); + } + char *buf = (char *)calloc(stbInfo->tags[j].dataLen + 1, 1); + if (NULL == buf) { + errorPrint2("calloc failed! size:%d\n", + stbInfo->tags[j].dataLen); + exit(EXIT_FAILURE); + } + rand_string(buf, stbInfo->tags[j].dataLen); + if (stbInfo->tags[j].data_type == TSDB_DATA_TYPE_BINARY) { + dataLen += snprintf(smlHead + dataLen, + HEAD_BUFF_LEN - dataLen, + "T%d=\"%s\"", j, buf); + } else { + dataLen += snprintf(smlHead + dataLen, + HEAD_BUFF_LEN - dataLen, + "T%d=L\"%s\"", j, buf); + } + tmfree(buf); + break; + + default: + errorPrint2("%s() LN%d, Unknown data type %s\n", __func__, + __LINE__, stbInfo->tags[j].dataType); + exit(EXIT_FAILURE); + } + } +} + +static void generateSmlTail(char* line, char* smlHead, SSuperTable* stbInfo, + threadInfo* pThreadInfo, int64_t timestamp) { + int dataLen = 0; + dataLen = snprintf(line, BUFFER_SIZE, "%s ", smlHead); + for (uint32_t c = 0; c < stbInfo->columnCount; c++) { + if (c != 0) { + tstrncpy(line + dataLen, ",", 2); + dataLen += 1; + } + switch (stbInfo->columns[c].data_type) { + case TSDB_DATA_TYPE_TIMESTAMP: + errorPrint2( + "%s() LN%d, Does not support data type %s as tag\n", + __func__, __LINE__, stbInfo->columns[c].dataType); + exit(EXIT_FAILURE); + case TSDB_DATA_TYPE_BOOL: + dataLen += snprintf(line + dataLen, + BUFFER_SIZE - dataLen, "c%d=%s", + c, rand_bool_str()); + break; + case TSDB_DATA_TYPE_TINYINT: + dataLen += snprintf(line + dataLen, + BUFFER_SIZE - dataLen, "c%d=%si8", + c, rand_tinyint_str()); + break; + case TSDB_DATA_TYPE_UTINYINT: + dataLen += snprintf(line + dataLen, + BUFFER_SIZE - dataLen, "c%d=%su8", + c, rand_utinyint_str()); + break; + case TSDB_DATA_TYPE_SMALLINT: + dataLen += snprintf( + line + dataLen, BUFFER_SIZE - dataLen, + "c%d=%si16", c, rand_smallint_str()); + break; + case TSDB_DATA_TYPE_USMALLINT: + dataLen += snprintf( + line + dataLen, BUFFER_SIZE - dataLen, + "c%d=%su16", c, rand_usmallint_str()); + break; + case TSDB_DATA_TYPE_INT: + dataLen += snprintf(line + dataLen, + BUFFER_SIZE - dataLen, + "c%d=%si32", c, rand_int_str()); + break; + case TSDB_DATA_TYPE_UINT: + dataLen += snprintf(line + dataLen, + BUFFER_SIZE - dataLen, + "c%d=%su32", c, rand_uint_str()); + break; + case TSDB_DATA_TYPE_BIGINT: + dataLen += snprintf(line + dataLen, + BUFFER_SIZE - dataLen, + "c%d=%si64", c, rand_bigint_str()); + break; + case TSDB_DATA_TYPE_UBIGINT: + dataLen += snprintf(line + dataLen, + BUFFER_SIZE - dataLen, + "c%d=%su64", c, rand_ubigint_str()); + break; + case TSDB_DATA_TYPE_FLOAT: + dataLen += snprintf(line + dataLen, + BUFFER_SIZE - dataLen, + "c%d=%sf32", c, rand_float_str()); + break; + case TSDB_DATA_TYPE_DOUBLE: + dataLen += snprintf(line + dataLen, + BUFFER_SIZE - dataLen, + "c%d=%sf64", c, rand_double_str()); + break; + case TSDB_DATA_TYPE_BINARY: + case TSDB_DATA_TYPE_NCHAR: + if (stbInfo->columns[c].dataLen > TSDB_MAX_BINARY_LEN) { + errorPrint2( + "binary or nchar length overflow, maxsize:%u\n", + (uint32_t)TSDB_MAX_BINARY_LEN); + exit(EXIT_FAILURE); + } + char *buf = + (char *)calloc(stbInfo->columns[c].dataLen + 1, 1); + if (NULL == buf) { + errorPrint2("calloc failed! size:%d\n", + stbInfo->columns[c].dataLen); + exit(EXIT_FAILURE); + } + rand_string(buf, stbInfo->columns[c].dataLen); + if (stbInfo->columns[c].data_type == + TSDB_DATA_TYPE_BINARY) { + dataLen += snprintf(line + dataLen, + BUFFER_SIZE - dataLen, + "c%d=\"%s\"", c, buf); + } else { + dataLen += snprintf(line + dataLen, + BUFFER_SIZE - dataLen, + "c%d=L\"%s\"", c, buf); + } + tmfree(buf); + break; + default: + errorPrint2("%s() LN%d, Unknown data type %s\n", + __func__, __LINE__, + stbInfo->columns[c].dataType); + exit(EXIT_FAILURE); + } + } + dataLen += snprintf(line + dataLen, BUFFER_SIZE - dataLen," %" PRId64 "", timestamp); +} + +static void* syncWriteInterlaceSml(threadInfo *pThreadInfo, uint32_t interlaceRows) { + debugPrint("[%d] %s() LN%d: ### interlace schemaless write\n", + pThreadInfo->threadID, __func__, __LINE__); + int64_t insertRows; + uint64_t maxSqlLen; + int64_t timeStampStep; + uint64_t insert_interval; + + SSuperTable* stbInfo = pThreadInfo->stbInfo; + + if (stbInfo) { + insertRows = stbInfo->insertRows; + maxSqlLen = stbInfo->maxSqlLen; + timeStampStep = stbInfo->timeStampStep; + insert_interval = stbInfo->insertInterval; + } else { + insertRows = g_args.insertRows; + maxSqlLen = g_args.max_sql_len; + timeStampStep = g_args.timestamp_step; + insert_interval = g_args.insert_interval; + } + + debugPrint("[%d] %s() LN%d: start_table_from=%"PRIu64" ntables=%"PRId64" insertRows=%"PRIu64"\n", + pThreadInfo->threadID, __func__, __LINE__, + pThreadInfo->start_table_from, + pThreadInfo->ntables, insertRows); + + if (interlaceRows > g_args.reqPerReq) + interlaceRows = g_args.reqPerReq; + + uint32_t batchPerTbl = interlaceRows; + uint32_t batchPerTblTimes; + + if ((interlaceRows > 0) && (pThreadInfo->ntables > 1)) { + batchPerTblTimes = + g_args.reqPerReq / interlaceRows; + } else { + batchPerTblTimes = 1; + } + + char *smlHead[pThreadInfo->ntables]; + for (int t = 0; t < pThreadInfo->ntables; t++) { + smlHead[t] = (char *)calloc(HEAD_BUFF_LEN, 1); + if (NULL == smlHead[t]) { + errorPrint2("calloc failed! size:%d\n", HEAD_BUFF_LEN); + exit(EXIT_FAILURE); + } + generateSmlHead(smlHead[t], stbInfo, pThreadInfo, t); + + } + + pThreadInfo->totalInsertRows = 0; + pThreadInfo->totalAffectedRows = 0; + + uint64_t st = 0; + uint64_t et = UINT64_MAX; + + uint64_t lastPrintTime = taosGetTimestampMs(); + uint64_t startTs = taosGetTimestampMs(); + uint64_t endTs; + + uint64_t tableSeq = pThreadInfo->start_table_from; + int64_t startTime = pThreadInfo->start_time; + + uint64_t generatedRecPerTbl = 0; + bool flagSleep = true; + uint64_t sleepTimeTotal = 0; + + int percentComplete = 0; + int64_t totalRows = insertRows * pThreadInfo->ntables; + + pThreadInfo->lines = calloc(g_args.reqPerReq, sizeof(char *)); + if (NULL == pThreadInfo->lines) { + errorPrint2("Failed to alloc %"PRIu64" bytes, reason:%s\n", + g_args.reqPerReq * sizeof(char *), + strerror(errno)); + return NULL; + } + + while(pThreadInfo->totalInsertRows < pThreadInfo->ntables * insertRows) { + if ((flagSleep) && (insert_interval)) { + st = taosGetTimestampMs(); + flagSleep = false; + } + + // generate data + + uint32_t recOfBatch = 0; + + for (uint64_t i = 0; i < batchPerTblTimes; i++) { + int64_t timestamp = startTime; + for (int j = recOfBatch; j < recOfBatch + batchPerTbl; j++) { + pThreadInfo->lines[j] = calloc(BUFFER_SIZE, 1); + if (NULL == pThreadInfo->lines[j]) { + errorPrint2("Failed to alloc %d bytes, reason:%s\n", + BUFFER_SIZE, strerror(errno)); + } + generateSmlTail(pThreadInfo->lines[j], smlHead[i], stbInfo, pThreadInfo, timestamp); + timestamp += timeStampStep; + } + tableSeq ++; + recOfBatch += batchPerTbl; + + pThreadInfo->totalInsertRows += batchPerTbl; + + verbosePrint("[%d] %s() LN%d batchPerTbl=%d recOfBatch=%d\n", + pThreadInfo->threadID, __func__, __LINE__, + batchPerTbl, recOfBatch); + + if (tableSeq == pThreadInfo->start_table_from + pThreadInfo->ntables) { + // turn to first table + tableSeq = pThreadInfo->start_table_from; + generatedRecPerTbl += batchPerTbl; + + startTime = pThreadInfo->start_time + + generatedRecPerTbl * timeStampStep; + + flagSleep = true; + if (generatedRecPerTbl >= insertRows) + break; + + int64_t remainRows = insertRows - generatedRecPerTbl; + if ((remainRows > 0) && (batchPerTbl > remainRows)) + batchPerTbl = remainRows; + + if (pThreadInfo->ntables * batchPerTbl < g_args.reqPerReq) + break; + } + + verbosePrint("[%d] %s() LN%d generatedRecPerTbl=%"PRId64" insertRows=%"PRId64"\n", + pThreadInfo->threadID, __func__, __LINE__, + generatedRecPerTbl, insertRows); + + if ((g_args.reqPerReq - recOfBatch) < batchPerTbl) + break; + } + + verbosePrint("[%d] %s() LN%d recOfBatch=%d totalInsertRows=%"PRIu64"\n", + pThreadInfo->threadID, __func__, __LINE__, recOfBatch, + pThreadInfo->totalInsertRows); + verbosePrint("[%d] %s() LN%d, buffer=%s\n", + pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->buffer); + + startTs = taosGetTimestampUs(); + + if (recOfBatch == 0) { + errorPrint2("[%d] %s() LN%d Failed to insert records of batch %d\n", + pThreadInfo->threadID, __func__, __LINE__, + batchPerTbl); + if (batchPerTbl > 0) { + errorPrint("\tIf the batch is %d, the length of the SQL to insert a row must be less then %"PRId64"\n", + batchPerTbl, maxSqlLen / batchPerTbl); + } + errorPrint("\tPlease check if the buffer length(%"PRId64") or batch(%d) is set with proper value!\n", + maxSqlLen, batchPerTbl); + goto free_of_interlace; + } + int64_t affectedRows = execInsert(pThreadInfo, recOfBatch); + + endTs = taosGetTimestampUs(); + uint64_t delay = endTs - startTs; + performancePrint("%s() LN%d, insert execution time is %10.2f ms\n", + __func__, __LINE__, delay / 1000.0); + verbosePrint("[%d] %s() LN%d affectedRows=%"PRId64"\n", + pThreadInfo->threadID, + __func__, __LINE__, affectedRows); + + if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay; + if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay; + pThreadInfo->cntDelay++; + pThreadInfo->totalDelay += delay; + + if (recOfBatch != affectedRows) { + errorPrint2("[%d] %s() LN%d execInsert insert %d, affected rows: %"PRId64"\n%s\n", + pThreadInfo->threadID, __func__, __LINE__, + recOfBatch, affectedRows, pThreadInfo->buffer); + goto free_of_interlace; + } + + pThreadInfo->totalAffectedRows += affectedRows; + + int currentPercent = pThreadInfo->totalAffectedRows * 100 / totalRows; + if (currentPercent > percentComplete ) { + printf("[%d]:%d%%\n", pThreadInfo->threadID, currentPercent); + percentComplete = currentPercent; + } + int64_t currentPrintTime = taosGetTimestampMs(); + if (currentPrintTime - lastPrintTime > 30*1000) { + printf("thread[%d] has currently inserted rows: %"PRIu64 ", affected rows: %"PRIu64 "\n", + pThreadInfo->threadID, + pThreadInfo->totalInsertRows, + pThreadInfo->totalAffectedRows); + lastPrintTime = currentPrintTime; + } + + if ((insert_interval) && flagSleep) { + et = taosGetTimestampMs(); + + if (insert_interval > (et - st) ) { + uint64_t sleepTime = insert_interval - (et -st); + performancePrint("%s() LN%d sleep: %"PRId64" ms for insert interval\n", + __func__, __LINE__, sleepTime); + taosMsleep(sleepTime); // ms + sleepTimeTotal += insert_interval; + } + } + for (int index = 0; index < g_args.reqPerReq; index++) { + free(pThreadInfo->lines[index]); + } + } + if (percentComplete < 100) + printf("[%d]:%d%%\n", pThreadInfo->threadID, percentComplete); + +free_of_interlace: + tmfree(pThreadInfo->lines); + for (int index = 0; index < pThreadInfo->ntables; index++) { + free(smlHead[index]); + } + printStatPerThread(pThreadInfo); + return NULL; +} + // sync write interlace data static void* syncWriteInterlace(threadInfo *pThreadInfo, uint32_t interlaceRows) { debugPrint("[%d] %s() LN%d: ### interlace write\n", @@ -9947,6 +10415,120 @@ free_of_stmt_progressive: printStatPerThread(pThreadInfo); return NULL; } + +static void* syncWriteProgressiveSml(threadInfo *pThreadInfo) { + debugPrint("%s() LN%d: ### sml progressive write\n", __func__, __LINE__); + + SSuperTable* stbInfo = pThreadInfo->stbInfo; + int64_t timeStampStep = + stbInfo?stbInfo->timeStampStep:g_args.timestamp_step; + int64_t insertRows = + (stbInfo)?stbInfo->insertRows:g_args.insertRows; + verbosePrint("%s() LN%d insertRows=%"PRId64"\n", + __func__, __LINE__, insertRows); + + uint64_t lastPrintTime = taosGetTimestampMs(); + + pThreadInfo->totalInsertRows = 0; + pThreadInfo->totalAffectedRows = 0; + + pThreadInfo->samplePos = 0; + + char *smlHead[pThreadInfo->ntables]; + for (int t = 0; t < pThreadInfo->ntables; t++) { + smlHead[t] = (char *)calloc(HEAD_BUFF_LEN, 1); + if (NULL == smlHead[t]) { + errorPrint2("calloc failed! size:%d\n", HEAD_BUFF_LEN); + exit(EXIT_FAILURE); + } + generateSmlHead(smlHead[t], stbInfo, pThreadInfo, t); + + } + int currentPercent = 0; + int percentComplete = 0; + + if (insertRows < g_args.reqPerReq) { + g_args.reqPerReq = insertRows; + } + pThreadInfo->lines = calloc(g_args.reqPerReq, sizeof(char *)); + if (NULL == pThreadInfo->lines) { + errorPrint2("Failed to alloc %"PRIu64" bytes, reason:%s\n", + g_args.reqPerReq * sizeof(char *), + strerror(errno)); + return NULL; + } + + for (uint64_t i = 0; i < pThreadInfo->ntables; i++) { + int64_t timestamp = pThreadInfo->start_time; + + for (uint64_t j = 0; j < insertRows;) { + for (int k = 0; k < g_args.reqPerReq; k++) { + pThreadInfo->lines[k] = calloc(BUFFER_SIZE, 1); + if (NULL == pThreadInfo->lines[k]) { + errorPrint2("Failed to alloc %d bytes, reason:%s\n", + BUFFER_SIZE, strerror(errno)); + } + generateSmlTail(pThreadInfo->lines[k], smlHead[i], stbInfo, pThreadInfo, timestamp); + timestamp += timeStampStep; + j++; + if (j == insertRows) { + break; + } + } + uint64_t startTs = taosGetTimestampUs(); + int32_t affectedRows = execInsert(pThreadInfo, g_args.reqPerReq); + uint64_t endTs = taosGetTimestampUs(); + uint64_t delay = endTs - startTs; + + performancePrint("%s() LN%d, insert execution time is %10.f ms\n", + __func__, __LINE__, delay/1000.0); + verbosePrint("[%d] %s() LN%d affectedRows=%d\n", + pThreadInfo->threadID, + __func__, __LINE__, affectedRows); + + if (delay > pThreadInfo->maxDelay){ + pThreadInfo->maxDelay = delay; + } + if (delay < pThreadInfo->minDelay){ + pThreadInfo->minDelay = delay; + } + pThreadInfo->cntDelay++; + pThreadInfo->totalDelay += delay; + + pThreadInfo->totalAffectedRows += affectedRows; + pThreadInfo->totalInsertRows += g_args.reqPerReq; + currentPercent = + pThreadInfo->totalAffectedRows * g_Dbs.threadCount / insertRows; + if (currentPercent > percentComplete) { + printf("[%d]:%d%%\n", pThreadInfo->threadID, + currentPercent); + percentComplete = currentPercent; + } + + int64_t currentPrintTime = taosGetTimestampMs(); + if (currentPrintTime - lastPrintTime > 30*1000) { + printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n", + pThreadInfo->threadID, + pThreadInfo->totalInsertRows, + pThreadInfo->totalAffectedRows); + lastPrintTime = currentPrintTime; + } + + for (int index = 0; index < g_args.reqPerReq; index++) { + free(pThreadInfo->lines[index]); + } + if (j == insertRows) { + break; + } + } + } + tmfree(pThreadInfo->lines); + for (int index = 0; index < pThreadInfo->ntables; index++) { + free(smlHead[index]); + } + return NULL; +} + // sync insertion progressive data static void* syncWriteProgressive(threadInfo *pThreadInfo) { debugPrint("%s() LN%d: ### progressive write\n", __func__, __LINE__); @@ -10152,6 +10734,8 @@ static void* syncWrite(void *sarg) { #else return syncWriteInterlaceStmt(pThreadInfo, interlaceRows); #endif + } else if (SML_IFACE == stbInfo->iface) { + return syncWriteInterlaceSml(pThreadInfo, interlaceRows); } else { return syncWriteInterlace(pThreadInfo, interlaceRows); } @@ -10161,6 +10745,9 @@ static void* syncWrite(void *sarg) { if (((stbInfo) && (STMT_IFACE == stbInfo->iface)) || (STMT_IFACE == g_args.iface)) { return syncWriteProgressiveStmt(pThreadInfo); + } else if (((stbInfo) && (SML_IFACE == stbInfo->iface)) + || (SML_IFACE == g_args.iface)) { + return syncWriteProgressiveSml(pThreadInfo); } else { return syncWriteProgressive(pThreadInfo); } @@ -10318,7 +10905,7 @@ static void startMultiThreadInsertData(int threads, char* db_name, // read sample data from file first int ret; - if (stbInfo) { + if (stbInfo && stbInfo->iface != SML_IFACE) { ret = prepareSampleForStb(stbInfo); } else { ret = prepareSampleForNtb(); @@ -10341,72 +10928,76 @@ static void startMultiThreadInsertData(int threads, char* db_name, int64_t ntables = 0; uint64_t tableFrom; - + if (stbInfo) { - int64_t limit; - uint64_t offset; + if (stbInfo->iface != SML_IFACE) { + int64_t limit; + uint64_t offset; - if ((NULL != g_args.sqlFile) - && (stbInfo->childTblExists == TBL_NO_EXISTS) - && ((stbInfo->childTblOffset != 0) - || (stbInfo->childTblLimit >= 0))) { - printf("WARNING: offset and limit will not be used since the child tables not exists!\n"); - } + if ((NULL != g_args.sqlFile) + && (stbInfo->childTblExists == TBL_NO_EXISTS) + && ((stbInfo->childTblOffset != 0) + || (stbInfo->childTblLimit >= 0))) { + printf("WARNING: offset and limit will not be used since the child tables not exists!\n"); + } - if (stbInfo->childTblExists == TBL_ALREADY_EXISTS) { - if ((stbInfo->childTblLimit < 0) - || ((stbInfo->childTblOffset - + stbInfo->childTblLimit) - > (stbInfo->childTblCount))) { + if (stbInfo->childTblExists == TBL_ALREADY_EXISTS) { + if ((stbInfo->childTblLimit < 0) + || ((stbInfo->childTblOffset + + stbInfo->childTblLimit) + > (stbInfo->childTblCount))) { - if (stbInfo->childTblCount < stbInfo->childTblOffset) { - printf("WARNING: offset will not be used since the child tables count is less then offset!\n"); + if (stbInfo->childTblCount < stbInfo->childTblOffset) { + printf("WARNING: offset will not be used since the child tables count is less then offset!\n"); - stbInfo->childTblOffset = 0; + stbInfo->childTblOffset = 0; + } + stbInfo->childTblLimit = + stbInfo->childTblCount - stbInfo->childTblOffset; } - stbInfo->childTblLimit = - stbInfo->childTblCount - stbInfo->childTblOffset; + + offset = stbInfo->childTblOffset; + limit = stbInfo->childTblLimit; + } else { + limit = stbInfo->childTblCount; + offset = 0; } - offset = stbInfo->childTblOffset; - limit = stbInfo->childTblLimit; - } else { - limit = stbInfo->childTblCount; - offset = 0; - } + ntables = limit; + tableFrom = offset; - ntables = limit; - tableFrom = offset; + if ((stbInfo->childTblExists != TBL_NO_EXISTS) + && ((stbInfo->childTblOffset + stbInfo->childTblLimit) + > stbInfo->childTblCount)) { + printf("WARNING: specified offset + limit > child table count!\n"); + prompt(); + } - if ((stbInfo->childTblExists != TBL_NO_EXISTS) - && ((stbInfo->childTblOffset + stbInfo->childTblLimit) - > stbInfo->childTblCount)) { - printf("WARNING: specified offset + limit > child table count!\n"); - prompt(); - } + if ((stbInfo->childTblExists != TBL_NO_EXISTS) + && (0 == stbInfo->childTblLimit)) { + printf("WARNING: specified limit = 0, which cannot find table name to insert or query! \n"); + prompt(); + } - if ((stbInfo->childTblExists != TBL_NO_EXISTS) - && (0 == stbInfo->childTblLimit)) { - printf("WARNING: specified limit = 0, which cannot find table name to insert or query! \n"); - prompt(); - } + stbInfo->childTblName = (char*)calloc(1, + limit * TSDB_TABLE_NAME_LEN); + if (stbInfo->childTblName == NULL) { + taos_close(taos0); + errorPrint2("%s() LN%d, alloc memory failed!\n", __func__, __LINE__); + exit(EXIT_FAILURE); + } - stbInfo->childTblName = (char*)calloc(1, - limit * TSDB_TABLE_NAME_LEN); - if (stbInfo->childTblName == NULL) { - taos_close(taos0); - errorPrint2("%s() LN%d, alloc memory failed!\n", __func__, __LINE__); - exit(EXIT_FAILURE); + int64_t childTblCount; + getChildNameOfSuperTableWithLimitAndOffset( + taos0, + db_name, stbInfo->stbName, + &stbInfo->childTblName, &childTblCount, + limit, + offset, stbInfo->escapeChar); + ntables = childTblCount; + } else { + ntables = stbInfo->childTblCount; } - - int64_t childTblCount; - getChildNameOfSuperTableWithLimitAndOffset( - taos0, - db_name, stbInfo->stbName, - &stbInfo->childTblName, &childTblCount, - limit, - offset, stbInfo->escapeChar); - ntables = childTblCount; } else { ntables = g_args.ntables; tableFrom = 0; @@ -10990,33 +11581,34 @@ static int insertTestProcess() { double start; double end; - if (g_totalChildTables > 0) { - fprintf(stderr, - "creating %"PRId64" table(s) with %d thread(s)\n\n", - g_totalChildTables, g_Dbs.threadCountForCreateTbl); - if (g_fpOfInsertResult) { - fprintf(g_fpOfInsertResult, - "creating %"PRId64" table(s) with %d thread(s)\n\n", - g_totalChildTables, g_Dbs.threadCountForCreateTbl); - } + if (g_args.iface != SML_IFACE) { + if (g_totalChildTables > 0) { + fprintf(stderr, + "creating %"PRId64" table(s) with %d thread(s)\n\n", + g_totalChildTables, g_Dbs.threadCountForCreateTbl); + if (g_fpOfInsertResult) { + fprintf(g_fpOfInsertResult, + "creating %"PRId64" table(s) with %d thread(s)\n\n", + g_totalChildTables, g_Dbs.threadCountForCreateTbl); + } - // create child tables - start = taosGetTimestampMs(); - createChildTables(); - end = taosGetTimestampMs(); + // create child tables + start = taosGetTimestampMs(); + createChildTables(); + end = taosGetTimestampMs(); - fprintf(stderr, - "\nSpent %.4f seconds to create %"PRId64" table(s) with %d thread(s), actual %"PRId64" table(s) created\n\n", - (end - start)/1000.0, g_totalChildTables, - g_Dbs.threadCountForCreateTbl, g_actualChildTables); - if (g_fpOfInsertResult) { - fprintf(g_fpOfInsertResult, - "\nSpent %.4f seconds to create %"PRId64" table(s) with %d thread(s), actual %"PRId64" table(s) created\n\n", - (end - start)/1000.0, g_totalChildTables, - g_Dbs.threadCountForCreateTbl, g_actualChildTables); + fprintf(stderr, + "\nSpent %.4f seconds to create %"PRId64" table(s) with %d thread(s), actual %"PRId64" table(s) created\n\n", + (end - start)/1000.0, g_totalChildTables, + g_Dbs.threadCountForCreateTbl, g_actualChildTables); + if (g_fpOfInsertResult) { + fprintf(g_fpOfInsertResult, + "\nSpent %.4f seconds to create %"PRId64" table(s) with %d thread(s), actual %"PRId64" table(s) created\n\n", + (end - start)/1000.0, g_totalChildTables, + g_Dbs.threadCountForCreateTbl, g_actualChildTables); + } } } - // create sub threads for inserting data //start = taosGetTimestampMs(); for (int i = 0; i < g_Dbs.dbCount; i++) { @@ -12069,10 +12661,12 @@ static void setParaFromArg() { tstrncpy(g_Dbs.db[0].superTbls[0].tags[0].dataType, "INT", min(DATATYPE_BUFF_LEN, strlen("INT") + 1)); + g_Dbs.db[0].superTbls[0].tags[0].data_type = TSDB_DATA_TYPE_INT; g_Dbs.db[0].superTbls[0].tags[0].dataLen = 0; tstrncpy(g_Dbs.db[0].superTbls[0].tags[1].dataType, "BINARY", min(DATATYPE_BUFF_LEN, strlen("BINARY") + 1)); + g_Dbs.db[0].superTbls[0].tags[1].data_type = TSDB_DATA_TYPE_BINARY; g_Dbs.db[0].superTbls[0].tags[1].dataLen = g_args.binwidth; g_Dbs.db[0].superTbls[0].tagCount = 2; } else { -- GitLab