From 8e455df2976685e806a688bea521d65cb1c2a3ec Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Tue, 16 Mar 2021 18:51:11 +0800 Subject: [PATCH] Hotfix/sangshuduo/td 3327 taosdemo segfault sampledata (#5458) * [TD-3316] : add testcase for taosdemo limit and offset. check offset 0. * [TD-3316] : add testcase for taosdemo limit and offset. fix sample file import bug. * [TD-3316] : add test case for limit and offset. fix sample data issue. * [TD-3327] : fix taosdemo segfault when import data from sample data file. Co-authored-by: Shuduo Sang --- src/kit/taosdemo/insert.json | 3 +- src/kit/taosdemo/taosdemo.c | 420 ++++-------------- tests/pytest/fulltest.sh | 1 + tests/pytest/pytest_3.sh | 1 + .../pytest/tools/insert-tblimit-tboffset.json | 1 - tests/pytest/tools/sampledata.csv | 3 + tests/pytest/tools/taosdemo-sampledata.json | 39 ++ tests/pytest/tools/taosdemoTestLimitOffset.py | 9 + tests/pytest/tools/taosdemoTestSampleData.py | 68 +++ 9 files changed, 203 insertions(+), 342 deletions(-) create mode 100644 tests/pytest/tools/sampledata.csv create mode 100644 tests/pytest/tools/taosdemo-sampledata.json create mode 100644 tests/pytest/tools/taosdemoTestSampleData.py diff --git a/src/kit/taosdemo/insert.json b/src/kit/taosdemo/insert.json index 208b5fcb74..e6b1895043 100644 --- a/src/kit/taosdemo/insert.json +++ b/src/kit/taosdemo/insert.json @@ -41,8 +41,7 @@ "insert_mode": "taosc", "insert_rows": 100000, "multi_thread_write_one_tbl": "no", - "number_of_tbl_in_one_sql": 0, - "rows_per_tbl": 100, + "rows_per_tbl": 0, "max_sql_len": 1024000, "disorder_ratio": 0, "disorder_range": 1000, diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index ce88ba5bef..6b1cf5f657 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -23,7 +23,6 @@ #ifdef LINUX #include - #include #include #ifndef _ALPINE #include @@ -39,11 +38,11 @@ #include #include #else - #include #include #include #endif +#include #include #include "cJSON.h" @@ -221,7 +220,6 @@ typedef struct SSuperTable_S { int childTblOffset; int multiThreadWriteOneTbl; // 0: no, 1: yes - int numberOfTblInOneSql; // 0/1: one table, > 1: number of tbl int rowsPerTbl; // int disorderRatio; // 0: no disorder, >0: x% int disorderRange; // ms or us by database precision @@ -396,6 +394,8 @@ typedef struct SThreadInfo_S { uint64_t et; int64_t lastTs; + // sample data + int samplePos; // statistics int64_t totalInsertRows; int64_t totalAffectedRows; @@ -1126,8 +1126,6 @@ static int printfInsertMeta() { }else { printf(" multiThreadWriteOneTbl: \033[33myes\033[0m\n"); } - printf(" numberOfTblInOneSql: \033[33m%d\033[0m\n", - g_Dbs.db[i].superTbls[j].numberOfTblInOneSql); printf(" rowsPerTbl: \033[33m%d\033[0m\n", g_Dbs.db[i].superTbls[j].rowsPerTbl); printf(" disorderRange: \033[33m%d\033[0m\n", @@ -1287,7 +1285,6 @@ static void printfInsertMetaToFile(FILE* fp) { }else { fprintf(fp, " multiThreadWriteOneTbl: yes\n"); } - fprintf(fp, " numberOfTblInOneSql: %d\n", g_Dbs.db[i].superTbls[j].numberOfTblInOneSql); fprintf(fp, " rowsPerTbl: %d\n", g_Dbs.db[i].superTbls[j].rowsPerTbl); fprintf(fp, " disorderRange: %d\n", g_Dbs.db[i].superTbls[j].disorderRange); fprintf(fp, " disorderRatio: %d\n", g_Dbs.db[i].superTbls[j].disorderRatio); @@ -2335,7 +2332,8 @@ static int createDatabases() { " fsync %d", g_Dbs.db[i].dbCfg.fsync); } if ((0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "ms", strlen("ms"))) - || (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, "us", strlen("us")))) { + || (0 == strncasecmp(g_Dbs.db[i].dbCfg.precision, + "us", strlen("us")))) { dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen, " precision \'%s\';", g_Dbs.db[i].dbCfg.precision); } @@ -2351,14 +2349,17 @@ static int createDatabases() { debugPrint("%s() %d supertbl count:%d\n", __func__, __LINE__, g_Dbs.db[i].superTblCount); for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { // describe super table, if exists - sprintf(command, "describe %s.%s;", g_Dbs.db[i].dbName, g_Dbs.db[i].superTbls[j].sTblName); + sprintf(command, "describe %s.%s;", g_Dbs.db[i].dbName, + g_Dbs.db[i].superTbls[j].sTblName); verbosePrint("%s() %d command: %s\n", __func__, __LINE__, command); if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) { g_Dbs.db[i].superTbls[j].superTblExists = TBL_NO_EXISTS; - ret = createSuperTable(taos, g_Dbs.db[i].dbName, &g_Dbs.db[i].superTbls[j], g_Dbs.use_metric); + ret = createSuperTable(taos, g_Dbs.db[i].dbName, + &g_Dbs.db[i].superTbls[j], g_Dbs.use_metric); } else { g_Dbs.db[i].superTbls[j].superTblExists = TBL_ALREADY_EXISTS; - ret = getSuperTableFromServer(taos, g_Dbs.db[i].dbName, &g_Dbs.db[i].superTbls[j]); + ret = getSuperTableFromServer(taos, g_Dbs.db[i].dbName, + &g_Dbs.db[i].superTbls[j]); } if (0 != ret) { @@ -2715,6 +2716,8 @@ static int readSampleFromCsvFileToMem( continue; } + verbosePrint("readLen=%ld stb->lenOfOneRow=%d getRows=%d\n", readLen, superTblInfo->lenOfOneRow, getRows); + memcpy(superTblInfo->sampleDataBuf + getRows * superTblInfo->lenOfOneRow, line, readLen); getRows++; @@ -3426,24 +3429,13 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { printf("ERROR: failed to read json, multiThreadWriteOneTbl not found\n"); goto PARSE_OVER; } - - cJSON* numberOfTblInOneSql = cJSON_GetObjectItem(stbInfo, "number_of_tbl_in_one_sql"); - if (numberOfTblInOneSql && numberOfTblInOneSql->type == cJSON_Number) { - g_Dbs.db[i].superTbls[j].numberOfTblInOneSql = numberOfTblInOneSql->valueint; - } else if (!numberOfTblInOneSql) { - g_Dbs.db[i].superTbls[j].numberOfTblInOneSql = 0; - } else { - printf("ERROR: failed to read json, numberOfTblInOneSql not found\n"); - goto PARSE_OVER; - } - cJSON* rowsPerTbl = cJSON_GetObjectItem(stbInfo, "rows_per_tbl"); if (rowsPerTbl && rowsPerTbl->type == cJSON_Number) { g_Dbs.db[i].superTbls[j].rowsPerTbl = rowsPerTbl->valueint; } else if (!rowsPerTbl) { - g_Dbs.db[i].superTbls[j].rowsPerTbl = 1; + g_Dbs.db[i].superTbls[j].rowsPerTbl = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req } else { - printf("ERROR: failed to read json, rowsPerTbl not found\n"); + fprintf(stderr, "ERROR: failed to read json, rowsPerTbl input mistake\n"); goto PARSE_OVER; } @@ -3901,7 +3893,7 @@ PARSE_OVER: return ret; } -void prepareSampleData() { +static void prepareSampleData() { for (int i = 0; i < g_Dbs.dbCount; i++) { for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { //if (0 == strncasecmp(g_Dbs.db[i].superTbls[j].dataSource, "sample", 6)) { @@ -3915,7 +3907,7 @@ void prepareSampleData() { } } -void postFreeResource() { +static void postFreeResource() { tmfclose(g_fpOfInsertResult); for (int i = 0; i < g_Dbs.dbCount; i++) { for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { @@ -3942,16 +3934,18 @@ void postFreeResource() { static int getRowDataFromSample(char* dataBuf, int maxLen, int64_t timestamp, SSuperTable* superTblInfo, int* sampleUsePos) { if ((*sampleUsePos) == MAX_SAMPLES_ONCE_FROM_FILE) { - int ret = readSampleFromCsvFileToMem(superTblInfo); +/* int ret = readSampleFromCsvFileToMem(superTblInfo); if (0 != ret) { tmfree(superTblInfo->sampleDataBuf); superTblInfo->sampleDataBuf = NULL; return -1; } +*/ *sampleUsePos = 0; } int dataLen = 0; + dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "(%" PRId64 ", ", timestamp); dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, @@ -3967,12 +3961,14 @@ static int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuper int dataLen = 0; dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "(%" PRId64 ", ", timestamp); for (int i = 0; i < stbInfo->columnCount; i++) { - if ((0 == strncasecmp(stbInfo->columns[i].dataType, "binary", 6)) || (0 == strncasecmp(stbInfo->columns[i].dataType, "nchar", 5))) { + if ((0 == strncasecmp(stbInfo->columns[i].dataType, "binary", 6)) + || (0 == strncasecmp(stbInfo->columns[i].dataType, "nchar", 5))) { if (stbInfo->columns[i].dataLen > TSDB_MAX_BINARY_LEN) { - printf("binary or nchar length overflow, max size:%u\n", (uint32_t)TSDB_MAX_BINARY_LEN); + printf("binary or nchar length overflow, max size:%u\n", + (uint32_t)TSDB_MAX_BINARY_LEN); return (-1); } - + char* buf = (char*)calloc(stbInfo->columns[i].dataLen+1, 1); if (NULL == buf) { printf("calloc failed! size:%d\n", stbInfo->columns[i].dataLen); @@ -3981,15 +3977,24 @@ static int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuper rand_string(buf, stbInfo->columns[i].dataLen); dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "\'%s\', ", buf); tmfree(buf); - } else if (0 == strncasecmp(stbInfo->columns[i].dataType, "int", 3)) { - dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%d, ", rand_int()); - } else if (0 == strncasecmp(stbInfo->columns[i].dataType, "bigint", 6)) { - dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%"PRId64", ", rand_bigint()); - } else if (0 == strncasecmp(stbInfo->columns[i].dataType, "float", 5)) { - dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%f, ", rand_float()); - } else if (0 == strncasecmp(stbInfo->columns[i].dataType, "double", 6)) { - dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%f, ", rand_double()); - } else if (0 == strncasecmp(stbInfo->columns[i].dataType, "smallint", 8)) { + } else if (0 == strncasecmp(stbInfo->columns[i].dataType, + "int", 3)) { + dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, + "%d, ", rand_int()); + } else if (0 == strncasecmp(stbInfo->columns[i].dataType, + "bigint", 6)) { + dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, + "%"PRId64", ", rand_bigint()); + } else if (0 == strncasecmp(stbInfo->columns[i].dataType, + "float", 5)) { + dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, + "%f, ", rand_float()); + } else if (0 == strncasecmp(stbInfo->columns[i].dataType, + "double", 6)) { + dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, + "%f, ", rand_double()); + } else if (0 == strncasecmp(stbInfo->columns[i].dataType, + "smallint", 8)) { dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%d, ", rand_smallint()); } else if (0 == strncasecmp(stbInfo->columns[i].dataType, "tinyint", 7)) { dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%d, ", rand_tinyint()); @@ -4009,255 +4014,6 @@ static int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuper return dataLen; } -static void syncWriteForNumberOfTblInOneSql( - threadInfo *winfo, char* sampleDataBuf) { - SSuperTable* superTblInfo = winfo->superTblInfo; - - int samplePos = 0; - - //printf("========threadID[%d], table rang: %d - %d \n", winfo->threadID, winfo->start_table_id, winfo->end_table_id); - int64_t lastPrintTime = taosGetTimestampMs(); - - char* buffer = calloc(superTblInfo->maxSqlLen+1, 1); - if (NULL == buffer) { - printf("========calloc size[ %d ] fail!\n", superTblInfo->maxSqlLen); - return; - } - - int32_t numberOfTblInOneSql = superTblInfo->numberOfTblInOneSql; - int32_t tbls = winfo->end_table_id - winfo->start_table_id + 1; - if (numberOfTblInOneSql > tbls) { - numberOfTblInOneSql = tbls; - } - - uint64_t time_counter = winfo->start_time; - int sampleUsePos; - - int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval; - int64_t st = 0; - int64_t et = 0xffffffff; - - int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT; - for (int i = 0; i < insertRows;) { - int32_t tbl_id = 0; - for (int tableSeq = winfo->start_table_id; tableSeq <= winfo->end_table_id; ) { - int64_t start_time = 0; - int inserted = i; - - for (int k = 0; k < g_args.num_of_RPR;) { - int len = 0; - memset(buffer, 0, superTblInfo->maxSqlLen); - char *pstr = buffer; - - int32_t end_tbl_id = tableSeq + numberOfTblInOneSql; - if (end_tbl_id > winfo->end_table_id) { - end_tbl_id = winfo->end_table_id+1; - } - - for (tbl_id = tableSeq ; tbl_id < end_tbl_id; tbl_id++) { - sampleUsePos = samplePos; - if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) { - char* tagsValBuf = NULL; - if (0 == superTblInfo->tagSource) { - tagsValBuf = generateTagVaulesForStb(superTblInfo); - } else { - tagsValBuf = getTagValueFromTagSample( - superTblInfo, tbl_id % superTblInfo->tagSampleCount); - } - if (NULL == tagsValBuf) { - goto free_and_statistics; - } - - if (0 == len) { - len += snprintf(pstr + len, - superTblInfo->maxSqlLen - len, - "insert into %s.%s%d using %s.%s tags %s values ", - winfo->db_name, - superTblInfo->childTblPrefix, - tbl_id, - winfo->db_name, - superTblInfo->sTblName, - tagsValBuf); - } else { - len += snprintf(pstr + len, - superTblInfo->maxSqlLen - len, - " %s.%s%d using %s.%s tags %s values ", - winfo->db_name, - superTblInfo->childTblPrefix, - tbl_id, - winfo->db_name, - superTblInfo->sTblName, - tagsValBuf); - } - tmfree(tagsValBuf); - } else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) { - if (0 == len) { - len += snprintf(pstr + len, - superTblInfo->maxSqlLen - len, - "insert into %s.%s values ", - winfo->db_name, - superTblInfo->childTblName + tbl_id * TSDB_TABLE_NAME_LEN); - } else { - len += snprintf(pstr + len, - superTblInfo->maxSqlLen - len, - " %s.%s values ", - winfo->db_name, - superTblInfo->childTblName + tbl_id * TSDB_TABLE_NAME_LEN); - } - } else { // pre-create child table - if (0 == len) { - len += snprintf(pstr + len, - superTblInfo->maxSqlLen - len, - "insert into %s.%s%d values ", - winfo->db_name, - superTblInfo->childTblPrefix, - tbl_id); - } else { - len += snprintf(pstr + len, - superTblInfo->maxSqlLen - len, - " %s.%s%d values ", - winfo->db_name, - superTblInfo->childTblPrefix, - tbl_id); - } - } - - start_time = time_counter; - for (int j = 0; j < superTblInfo->rowsPerTbl;) { - int retLen = 0; - if (0 == strncasecmp(superTblInfo->dataSource, - "sample", strlen("sample"))) { - retLen = getRowDataFromSample(pstr + len, - superTblInfo->maxSqlLen - len, - start_time += superTblInfo->timeStampStep, - superTblInfo, - &sampleUsePos); - if (retLen < 0) { - goto free_and_statistics; - } - } else if (0 == strncasecmp( - superTblInfo->dataSource, "rand", strlen("rand"))) { - int rand_num = rand_tinyint() % 100; - if (0 != superTblInfo->disorderRatio - && rand_num < superTblInfo->disorderRatio) { - int64_t d = start_time - taosRandom() % superTblInfo->disorderRange; - retLen = generateRowData(pstr + len, - superTblInfo->maxSqlLen - len, - d, - superTblInfo); - } else { - retLen = generateRowData(pstr + len, - superTblInfo->maxSqlLen - len, - start_time += superTblInfo->timeStampStep, - superTblInfo); - } - if (retLen < 0) { - goto free_and_statistics; - } - } - len += retLen; - //inserted++; - j++; - winfo->totalInsertRows++; - - if (inserted >= superTblInfo->insertRows || - (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) { - tableSeq = tbl_id + 1; - printf("config rowsPerTbl and numberOfTblInOneSql not match with max_sql_lenth, please reconfig![lenOfOneRow:%d]\n", - superTblInfo->lenOfOneRow); - goto send_to_server; - } - } - } - - tableSeq = tbl_id; - inserted += superTblInfo->rowsPerTbl; - -send_to_server: - if (insert_interval) { - st = taosGetTimestampUs(); - - if (insert_interval > ((et - st)/1000)) { - int sleep_time = insert_interval - (et -st); - printf("sleep: %d ms insert interval\n", sleep_time); - taosMsleep(sleep_time); // ms - } - } - - if (0 == strncasecmp(superTblInfo->insertMode, - "taosc", - strlen("taosc"))) { - //printf("multi table===== sql: %s \n\n", buffer); - //int64_t t1 = taosGetTimestampMs(); - int64_t startTs; - int64_t endTs; - startTs = taosGetTimestampUs(); - - debugPrint("%s() LN%d buff: %s\n", __func__, __LINE__, buffer); - int affectedRows = queryDbExec( - winfo->taos, buffer, INSERT_TYPE); - - if (0 < affectedRows) { - endTs = taosGetTimestampUs(); - int64_t delay = endTs - startTs; - if (delay > winfo->maxDelay) winfo->maxDelay = delay; - if (delay < winfo->minDelay) winfo->minDelay = delay; - winfo->cntDelay++; - winfo->totalDelay += delay; - winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay; - winfo->totalAffectedRows += affectedRows; - } else { - fprintf(stderr, "queryDbExec() buffer:\n%s\naffected rows is %d", buffer, affectedRows); - goto free_and_statistics; - } - - int64_t currentPrintTime = taosGetTimestampMs(); - if (currentPrintTime - lastPrintTime > 30*1000) { - printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n", - winfo->threadID, - winfo->totalInsertRows, - winfo->totalAffectedRows); - lastPrintTime = currentPrintTime; - } - //int64_t t2 = taosGetTimestampMs(); - //printf("taosc insert sql return, Spent %.4f seconds \n", (double)(t2 - t1)/1000.0); - } else { - //int64_t t1 = taosGetTimestampMs(); - int retCode = postProceSql(g_Dbs.host, g_Dbs.port, buffer); - //int64_t t2 = taosGetTimestampMs(); - //printf("http insert sql return, Spent %ld ms \n", t2 - t1); - - if (0 != retCode) { - printf("========restful return fail, threadID[%d]\n", winfo->threadID); - goto free_and_statistics; - } - } - if (insert_interval) { - et = taosGetTimestampUs(); - } - - break; - } - - if (tableSeq > winfo->end_table_id) { - if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) { - samplePos = sampleUsePos; - } - i = inserted; - time_counter = start_time; - } - } - - //printf("========loop %d childTables duration:%"PRId64 "========inserted rows:%d\n", winfo->end_table_id - winfo->start_table_id, et - st, i); - } - -free_and_statistics: - tmfree(buffer); - printf("====thread[%d] completed total inserted rows: %"PRId64 ", affected rows: %"PRId64 "====\n", - winfo->threadID, winfo->totalInsertRows, winfo->totalAffectedRows); - return; -} - int32_t generateData(char *res, char **data_type, int num_of_cols, int64_t timestamp, int lenOfBinary) { memset(res, 0, MAX_DATA_SIZE); @@ -4319,26 +4075,23 @@ int32_t generateData(char *res, char **data_type, static int prepareSampleDataForSTable(SSuperTable *superTblInfo) { char* sampleDataBuf = NULL; - // each thread read sample data from csv file - if (0 == strncasecmp(superTblInfo->dataSource, - "sample", - strlen("sample"))) { - sampleDataBuf = calloc( + sampleDataBuf = calloc( superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, 1); - if (sampleDataBuf == NULL) { + if (sampleDataBuf == NULL) { fprintf(stderr, "Failed to calloc %d Bytes, reason:%s\n", superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, strerror(errno)); return -1; - } + } - superTblInfo->sampleDataBuf = sampleDataBuf; - int ret = readSampleFromCsvFileToMem(superTblInfo); - if (0 != ret) { + superTblInfo->sampleDataBuf = sampleDataBuf; + int ret = readSampleFromCsvFileToMem(superTblInfo); + + if (0 != ret) { + fprintf(stderr, "read sample from csv file failed.\n"); tmfree(sampleDataBuf); superTblInfo->sampleDataBuf = NULL; return -1; - } } return 0; @@ -4400,7 +4153,8 @@ static int generateDataBuffer(int32_t tableSeq, return -1; } - if (superTblInfo && (superTblInfo->childTblOffset > 0)) { + if (superTblInfo && (superTblInfo->childTblOffset >= 0) + && (superTblInfo->childTblLimit > 0)) { // select tbname from stb limit 1 offset tableSeq getChildNameOfSuperTableWithLimitAndOffset(pThreadInfo->taos, pThreadInfo->db_name, superTblInfo->sTblName, @@ -4467,7 +4221,7 @@ static int generateDataBuffer(int32_t tableSeq, verbosePrint("%s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR); for (k = 0; k < g_args.num_of_RPR;) { if (superTblInfo) { - int retLen = 0; + int retLen = 0; if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) { @@ -4488,27 +4242,26 @@ static int generateDataBuffer(int32_t tableSeq, superTblInfo->maxSqlLen - len, d, superTblInfo); - //printf("disorder rows, rand_num:%d, last ts:%"PRId64" current ts:%"PRId64"\n", rand_num, start_time, d); - } else { - retLen = generateRowData( + } else { + retLen = generateRowData( pstr + len, superTblInfo->maxSqlLen - len, startTime + superTblInfo->timeStampStep * startFrom, superTblInfo); - } + } + } - if (retLen < 0) { - free(pChildTblName); - return -1; - } + if (retLen < 0) { + free(pChildTblName); + return -1; + } - len += retLen; + len += retLen; - if (len >= (superTblInfo->maxSqlLen - 256)) { // reserve for overwrite - k++; - break; - } - } + if (len >= (superTblInfo->maxSqlLen - 256)) { // reserve for overwrite + k++; + break; + } } else { int rand_num = taosRandom() % 100; char data[MAX_DATA_SIZE]; @@ -4529,14 +4282,13 @@ static int generateDataBuffer(int32_t tableSeq, } pstr += sprintf(pstr, " %s", data); - //assert(len + pstr - buffer < BUFFER_SIZE); if (len + pstr - buffer >= (g_args.max_sql_len - 256)) { // too long k++; break; } } - verbosePrint("%s() LN%d len=%d k=%d \nbuffer=%p\n", __func__, __LINE__, len, k, buffer); + verbosePrint("%s() LN%d len=%d k=%d \nbuffer=%s\n", __func__, __LINE__, len, k, buffer); k++; startFrom ++; @@ -4571,21 +4323,7 @@ static void* syncWrite(void *sarg) { strerror(errno)); return NULL; } - - if (superTblInfo) { - if (0 != prepareSampleDataForSTable(superTblInfo)) - return NULL; - - if (superTblInfo->numberOfTblInOneSql > 0) { - syncWriteForNumberOfTblInOneSql(winfo, superTblInfo->sampleDataBuf); - tmfree(superTblInfo->sampleDataBuf); - superTblInfo->sampleDataBuf = NULL; - return NULL; - } - } - int samplePos = 0; - int64_t lastPrintTime = taosGetTimestampMs(); int64_t startTs = taosGetTimestampUs(); int64_t endTs; @@ -4598,7 +4336,7 @@ static void* syncWrite(void *sarg) { winfo->totalInsertRows = 0; winfo->totalAffectedRows = 0; - int sampleUsePos; + winfo->samplePos = 0; for (uint32_t tableSeq = winfo->start_table_id; tableSeq <= winfo->end_table_id; tableSeq ++) { @@ -4612,10 +4350,8 @@ static void* syncWrite(void *sarg) { st = taosGetTimestampUs(); } - sampleUsePos = samplePos; - int generated = generateDataBuffer(tableSeq, winfo, buffer, insertRows, - i, start_time, &sampleUsePos); + i, start_time, &(winfo->samplePos)); if (generated > 0) i += generated; else @@ -4662,16 +4398,12 @@ static void* syncWrite(void *sarg) { if ((tableSeq == winfo->end_table_id) && superTblInfo && (0 == strncasecmp( superTblInfo->dataSource, "sample", strlen("sample")))) { - samplePos = sampleUsePos; + printf("%s() LN%d samplePos=%d\n", __func__, __LINE__, winfo->samplePos); } } // tableSeq free_and_statistics_2: tmfree(buffer); - if (superTblInfo) { - tmfree(superTblInfo->sampleDataBuf); - superTblInfo->sampleDataBuf = NULL; - } printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n", winfo->threadID, @@ -4842,6 +4574,15 @@ static void startMultiThreadInsertData(int threads, char* db_name, else last = 0; + // read sample data from file first + if ((superTblInfo) && (0 == strncasecmp(superTblInfo->dataSource, + "sample", strlen("sample")))) { + if (0 != prepareSampleDataForSTable(superTblInfo)) { + fprintf(stderr, "prepare sample data for stable failed!\n"); + exit(-1); + } + } + for (int i = 0; i < threads; i++) { threadInfo *t_info = infos + i; t_info->threadID = i; @@ -4877,6 +4618,7 @@ static void startMultiThreadInsertData(int threads, char* db_name, t_info->start_time = t_info->start_time + rand_int() % 10000 - rand_tinyint(); } + tsem_init(&(t_info->lock_sem), 0, 0); if (SYNC == g_Dbs.queryMode) { pthread_create(pids + i, NULL, syncWrite, t_info); diff --git a/tests/pytest/fulltest.sh b/tests/pytest/fulltest.sh index d830d96865..a125196b69 100755 --- a/tests/pytest/fulltest.sh +++ b/tests/pytest/fulltest.sh @@ -249,6 +249,7 @@ python3 test.py -f tools/taosdemoTest.py python3 test.py -f tools/taosdemoTest2.py python3 test.py -f tools/taosdemoTestWithoutMetric.py python3 test.py -f tools/taosdemoTestLimitOffset.py +python3 test.py -f tools/taosdemoTestSampleData.py # subscribe python3 test.py -f subscribe/singlemeter.py diff --git a/tests/pytest/pytest_3.sh b/tests/pytest/pytest_3.sh index f2462154bc..a8ffff665d 100755 --- a/tests/pytest/pytest_3.sh +++ b/tests/pytest/pytest_3.sh @@ -82,6 +82,7 @@ python3 test.py -f tools/lowaTest.py python3 test.py -f tools/taosdemoTest.py python3 test.py -f tools/taosdemoTestWithoutMetric.py python3 test.py -f tools/taosdemoTestLimitOffset.py +python3 test.py -f tools/taosdemoTestSampleData.py python3 test.py -f tools/taosdumpTest.py #python3 test.py -f tools/taosdemoTest2.py diff --git a/tests/pytest/tools/insert-tblimit-tboffset.json b/tests/pytest/tools/insert-tblimit-tboffset.json index 5f63660347..f3d3e864ba 100644 --- a/tests/pytest/tools/insert-tblimit-tboffset.json +++ b/tests/pytest/tools/insert-tblimit-tboffset.json @@ -44,7 +44,6 @@ "childtable_offset": 33, "multi_thread_write_one_tbl": "no", "number_of_tbl_in_one_sql": 0, - "rows_per_tbl": 100, "max_sql_len": 1024000, "disorder_ratio": 0, "disorder_range": 1000, diff --git a/tests/pytest/tools/sampledata.csv b/tests/pytest/tools/sampledata.csv new file mode 100644 index 0000000000..01e79c32a8 --- /dev/null +++ b/tests/pytest/tools/sampledata.csv @@ -0,0 +1,3 @@ +1 +2 +3 diff --git a/tests/pytest/tools/taosdemo-sampledata.json b/tests/pytest/tools/taosdemo-sampledata.json new file mode 100644 index 0000000000..473c977773 --- /dev/null +++ b/tests/pytest/tools/taosdemo-sampledata.json @@ -0,0 +1,39 @@ +{ + "filetype": "insert", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "thread_count": 10, + "confirm_parameter_prompt": "no", + "databases": [{ + "dbinfo": { + "name": "db", + "drop": "yes" + }, + "super_tables": [{ + "name": "stb", + "child_table_exists":"no", + "childtable_count": 20, + "childtable_limit": 10, + "childtable_offset": 0, + "childtable_prefix": "t_", + "auto_create_table": "no", + "data_source": "sample", + "insert_mode": "taosc", + "insert_rate": 0, + "insert_rows": 20, + "multi_thread_write_one_tbl": "no", + "number_of_tbl_in_one_sql": 0, + "max_sql_len": 1048000, + "timestamp_step": 1000, + "start_timestamp": "2020-1-1 00:00:00", + "sample_format": "csv", + "sample_file": "./tools/sampledata.csv", + "columns": [{"type": "INT"}], + "tags": [{"type": "INT", "count":1}] + }] + }] + +} diff --git a/tests/pytest/tools/taosdemoTestLimitOffset.py b/tests/pytest/tools/taosdemoTestLimitOffset.py index 6dbe5a7028..69a81b166b 100644 --- a/tests/pytest/tools/taosdemoTestLimitOffset.py +++ b/tests/pytest/tools/taosdemoTestLimitOffset.py @@ -59,6 +59,15 @@ class TDTestCase: tdSql.query("select count(*) from db.stb") tdSql.checkData(0, 0, 33000) + os.system("%staosdemo -f tools/insert-tblimit-tboffset0.json" % binPath) + + tdSql.execute("reset query cache") + tdSql.execute("use db") + tdSql.query("select count(tbname) from db.stb") + tdSql.checkData(0, 0, 100) + tdSql.query("select count(*) from db.stb") + tdSql.checkData(0, 0, 20000) + def stop(self): tdSql.close() tdLog.success("%s successfully executed" % __file__) diff --git a/tests/pytest/tools/taosdemoTestSampleData.py b/tests/pytest/tools/taosdemoTestSampleData.py new file mode 100644 index 0000000000..893c53984d --- /dev/null +++ b/tests/pytest/tools/taosdemoTestSampleData.py @@ -0,0 +1,68 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import os +from util.log import * +from util.cases import * +from util.sql import * +from util.dnodes import * + + +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + + self.numberOfTables = 10000 + self.numberOfRecords = 100 + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root)-len("/build/bin")] + break + return buildPath + + def run(self): + tdSql.prepare() + buildPath = self.getBuildPath() + if (buildPath == ""): + tdLog.exit("taosd not found!") + else: + tdLog.info("taosd found in %s" % buildPath) + binPath = buildPath+ "/build/bin/" + os.system("%staosdemo -f tools/taosdemo-sampledata.json" % binPath) + + tdSql.execute("use db") + tdSql.query("select count(tbname) from db.stb") + tdSql.checkData(0, 0, 20) + tdSql.query("select count(*) from db.stb") + tdSql.checkData(0, 0, 200) + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) -- GitLab