From 9e7ed32f40dd862ad39090184346d9e22a4ccd4b Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Thu, 11 Mar 2021 16:16:54 +0800 Subject: [PATCH] [TD-3192] : support stb limit and offset. refactor. --- src/kit/taosdemo/insert.json | 1 + src/kit/taosdemo/taosdemo.c | 298 +++++++++++++---------- tests/pytest/tools/taosdemoWithMetric.py | 72 ++++++ 3 files changed, 245 insertions(+), 126 deletions(-) create mode 100644 tests/pytest/tools/taosdemoWithMetric.py diff --git a/src/kit/taosdemo/insert.json b/src/kit/taosdemo/insert.json index 4e908444da..5ed744cde9 100644 --- a/src/kit/taosdemo/insert.json +++ b/src/kit/taosdemo/insert.json @@ -11,6 +11,7 @@ "confirm_parameter_prompt": "no", "insert_interval": 0, "num_of_records_per_req": 100, + "max_sql_len": 1024000, "databases": [{ "dbinfo": { "name": "db", diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 957e7b2751..bb5305256f 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -194,6 +194,7 @@ typedef struct SArguments_S { int num_of_threads; int insert_interval; int num_of_RPR; + int max_sql_len; int num_of_tables; int num_of_DPT; int abort; @@ -513,6 +514,7 @@ SArguments g_args = { 10, // num_of_connections/thread 0, // insert_interval 100, // num_of_RPR + TSDB_PAYLOAD_SIZE, // max_sql_len 10000, // num_of_tables 10000, // num_of_DPT 0, // abort @@ -759,6 +761,7 @@ void parse_args(int argc, char *argv[], SArguments *arguments) { } printf("# Insertion interval: %d\n", arguments->insert_interval); printf("# Number of records per req: %d\n", arguments->num_of_RPR); + printf("# Max SQL length: %d\n", arguments->max_sql_len); printf("# Number of Threads: %d\n", arguments->num_of_threads); printf("# Number of Tables: %d\n", arguments->num_of_tables); printf("# Number of Data per Table: %d\n", arguments->num_of_DPT); @@ -1005,6 +1008,7 @@ static int printfInsertMeta() { printf("thread num of create table: \033[33m%d\033[0m\n", g_Dbs.threadCountByCreateTbl); printf("insert interval: \033[33m%d\033[0m\n", g_args.insert_interval); printf("number of records per req: \033[33m%d\033[0m\n", g_args.num_of_RPR); + printf("max sql length: \033[33m%d\033[0m\n", g_args.max_sql_len); printf("database count: \033[33m%d\033[0m\n", g_Dbs.dbCount); for (int i = 0; i < g_Dbs.dbCount; i++) { @@ -1102,25 +1106,37 @@ static int printfInsertMeta() { }else { printf(" multiThreadWriteOneTbl: \033[33myes\033[0m\n"); } - printf(" numberOfTblInOneSql: \033[33m%d\033[0m\n", g_Dbs.db[i].superTbls[j].numberOfTblInOneSql); - printf(" rowsPerTbl: \033[33m%d\033[0m\n", g_Dbs.db[i].superTbls[j].rowsPerTbl); - printf(" disorderRange: \033[33m%d\033[0m\n", g_Dbs.db[i].superTbls[j].disorderRange); - printf(" disorderRatio: \033[33m%d\033[0m\n", g_Dbs.db[i].superTbls[j].disorderRatio); - printf(" maxSqlLen: \033[33m%d\033[0m\n", g_Dbs.db[i].superTbls[j].maxSqlLen); - - printf(" timeStampStep: \033[33m%d\033[0m\n", g_Dbs.db[i].superTbls[j].timeStampStep); - printf(" startTimestamp: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].startTimestamp); - printf(" sampleFormat: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].sampleFormat); - printf(" sampleFile: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].sampleFile); - printf(" tagsFile: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].tagsFile); - - printf(" columnCount: \033[33m%d\033[0m\n ", g_Dbs.db[i].superTbls[j].columnCount); + printf(" numberOfTblInOneSql: \033[33m%d\033[0m\n", + g_Dbs.db[i].superTbls[j].numberOfTblInOneSql); + printf(" rowsPerTbl: \033[33m%d\033[0m\n", + g_Dbs.db[i].superTbls[j].rowsPerTbl); + printf(" disorderRange: \033[33m%d\033[0m\n", + g_Dbs.db[i].superTbls[j].disorderRange); + printf(" disorderRatio: \033[33m%d\033[0m\n", + g_Dbs.db[i].superTbls[j].disorderRatio); + printf(" maxSqlLen: \033[33m%d\033[0m\n", + g_Dbs.db[i].superTbls[j].maxSqlLen); + printf(" timeStampStep: \033[33m%d\033[0m\n", + g_Dbs.db[i].superTbls[j].timeStampStep); + printf(" startTimestamp: \033[33m%s\033[0m\n", + g_Dbs.db[i].superTbls[j].startTimestamp); + printf(" sampleFormat: \033[33m%s\033[0m\n", + g_Dbs.db[i].superTbls[j].sampleFormat); + printf(" sampleFile: \033[33m%s\033[0m\n", + g_Dbs.db[i].superTbls[j].sampleFile); + printf(" tagsFile: \033[33m%s\033[0m\n", + g_Dbs.db[i].superTbls[j].tagsFile); + printf(" columnCount: \033[33m%d\033[0m\n ", + g_Dbs.db[i].superTbls[j].columnCount); for (int k = 0; k < g_Dbs.db[i].superTbls[j].columnCount; k++) { //printf("dataType:%s, dataLen:%d\t", g_Dbs.db[i].superTbls[j].columns[k].dataType, g_Dbs.db[i].superTbls[j].columns[k].dataLen); - if ((0 == strncasecmp(g_Dbs.db[i].superTbls[j].columns[k].dataType, "binary", 6)) - || (0 == strncasecmp(g_Dbs.db[i].superTbls[j].columns[k].dataType, "nchar", 5))) { + if ((0 == strncasecmp(g_Dbs.db[i].superTbls[j].columns[k].dataType, + "binary", 6)) + || (0 == strncasecmp(g_Dbs.db[i].superTbls[j].columns[k].dataType, + "nchar", 5))) { printf("column[\033[33m%d\033[0m]:\033[33m%s(%d)\033[0m ", k, - g_Dbs.db[i].superTbls[j].columns[k].dataType, g_Dbs.db[i].superTbls[j].columns[k].dataLen); + g_Dbs.db[i].superTbls[j].columns[k].dataType, + g_Dbs.db[i].superTbls[j].columns[k].dataLen); } else { printf("column[%d]:\033[33m%s\033[0m ", k, g_Dbs.db[i].superTbls[j].columns[k].dataType); @@ -1135,7 +1151,8 @@ static int printfInsertMeta() { if ((0 == strncasecmp(g_Dbs.db[i].superTbls[j].tags[k].dataType, "binary", 6)) || (0 == strncasecmp(g_Dbs.db[i].superTbls[j].tags[k].dataType, "nchar", 5))) { printf("tag[%d]:\033[33m%s(%d)\033[0m ", k, - g_Dbs.db[i].superTbls[j].tags[k].dataType, g_Dbs.db[i].superTbls[j].tags[k].dataLen); + g_Dbs.db[i].superTbls[j].tags[k].dataType, + g_Dbs.db[i].superTbls[j].tags[k].dataLen); } else { printf("tag[%d]:\033[33m%s\033[0m ", k, g_Dbs.db[i].superTbls[j].tags[k].dataType); @@ -1904,7 +1921,6 @@ static int getAllChildNameOfSuperTable(TAOS * taos, char* dbName, return getChildNameOfSuperTableWithLimitAndOffset(taos, dbName, sTblName, childTblNameOfSuperTbl, childTblCountOfSuperTbl, -1, -1); - } static int getSuperTableFromServer(TAOS * taos, char* dbName, @@ -2392,7 +2408,7 @@ static void createChildTables() { char tblColsBuf[MAX_SQL_SIZE]; int len; - for (int i = 0; i < g_Dbs.dbCount; i++) { + for (int i = 0; i < g_Dbs.dbCount; i++) { if (g_Dbs.db[i].superTblCount > 0) { // with super table for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { @@ -2416,10 +2432,13 @@ static void createChildTables() { int j = 0; while (g_args.datatype[j]) { if ((strncasecmp(g_args.datatype[j], "BINARY", strlen("BINARY")) == 0) - || (strncasecmp(g_args.datatype[j], "NCHAR", strlen("NCHAR")) == 0)) { - len = snprintf(tblColsBuf + len, MAX_SQL_SIZE, ", COL%d %s(60)", j, g_args.datatype[j]); + || (strncasecmp(g_args.datatype[j], + "NCHAR", strlen("NCHAR")) == 0)) { + len = snprintf(tblColsBuf + len, MAX_SQL_SIZE, + ", COL%d %s(60)", j, g_args.datatype[j]); } else { - len = snprintf(tblColsBuf + len, MAX_SQL_SIZE, ", COL%d %s", j, g_args.datatype[j]); + len = snprintf(tblColsBuf + len, MAX_SQL_SIZE, + ", COL%d %s", j, g_args.datatype[j]); } len = strlen(tblColsBuf); j++; @@ -2427,7 +2446,8 @@ static void createChildTables() { len = snprintf(tblColsBuf + len, MAX_SQL_SIZE - len, ")"); - verbosePrint("%s() LN%d: dbName: %s num of tb: %d schema: %s\n", __func__, __LINE__, + verbosePrint("%s() LN%d: dbName: %s num of tb: %d schema: %s\n", + __func__, __LINE__, g_Dbs.db[i].dbName, g_args.num_of_tables, tblColsBuf); startMultiThreadCreateChildTable( tblColsBuf, @@ -2540,19 +2560,29 @@ int readSampleFromJsonFileToMem(SSuperTable * superTblInfo) { /* Read 10000 lines at most. If more than 10000 lines, continue to read after using */ -int readSampleFromCsvFileToMem(FILE *fp, SSuperTable* superTblInfo, char* sampleBuf) { +static int readSampleFromCsvFileToMem( + SSuperTable* superTblInfo) { size_t n = 0; ssize_t readLen = 0; char * line = NULL; int getRows = 0; + + FILE* fp = fopen(superTblInfo->sampleFile, "r"); + if (fp == NULL) { + fprintf(stderr, "Failed to open sample file: %s, reason:%s\n", + superTblInfo->sampleFile, strerror(errno)); + return -1; + } - memset(sampleBuf, 0, MAX_SAMPLES_ONCE_FROM_FILE* superTblInfo->lenOfOneRow); + memset(superTblInfo->sampleDataBuf, 0, + MAX_SAMPLES_ONCE_FROM_FILE * superTblInfo->lenOfOneRow); while (1) { readLen = tgetline(&line, &n, fp); if (-1 == readLen) { if(0 != fseek(fp, 0, SEEK_SET)) { - printf("Failed to fseek file: %s, reason:%s\n", + fprintf(stderr, "Failed to fseek file: %s, reason:%s\n", superTblInfo->sampleFile, strerror(errno)); + fclose(fp); return -1; } continue; @@ -2572,7 +2602,8 @@ int readSampleFromCsvFileToMem(FILE *fp, SSuperTable* superTblInfo, char* sample continue; } - memcpy(sampleBuf + getRows * superTblInfo->lenOfOneRow, line, readLen); + memcpy(superTblInfo->sampleDataBuf + getRows * superTblInfo->lenOfOneRow, + line, readLen); getRows++; if (getRows == MAX_SAMPLES_ONCE_FROM_FILE) { @@ -2580,6 +2611,7 @@ int readSampleFromCsvFileToMem(FILE *fp, SSuperTable* superTblInfo, char* sample } } + fclose(fp); tmfree(line); return 0; } @@ -2810,6 +2842,17 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { goto PARSE_OVER; } + cJSON* maxSqlLen = cJSON_GetObjectItem(root, "max_sql_len"); + if (maxSqlLen && maxSqlLen->type == cJSON_Number) { + g_args.max_sql_len = maxSqlLen->valueint; + } else if (!maxSqlLen) { + g_args.max_sql_len = TSDB_PAYLOAD_SIZE; + } else { + fprintf(stderr, "ERROR: failed to read json, max_sql_len input mistake\n"); + goto PARSE_OVER; + } + + cJSON* numRecPerReq = cJSON_GetObjectItem(root, "num_of_records_per_req"); if (numRecPerReq && numRecPerReq->type == cJSON_Number) { g_args.num_of_RPR = numRecPerReq->valueint; @@ -3774,22 +3817,26 @@ void postFreeResource() { } } -int getRowDataFromSample(char* dataBuf, int maxLen, int64_t timestamp, SSuperTable* superTblInfo, int* sampleUsePos, FILE *fp, char* sampleBuf) { +static int getRowDataFromSample(char* dataBuf, int maxLen, int64_t timestamp, + SSuperTable* superTblInfo, int* sampleUsePos) { if ((*sampleUsePos) == MAX_SAMPLES_ONCE_FROM_FILE) { - int ret = readSampleFromCsvFileToMem(fp, superTblInfo, sampleBuf); + int ret = readSampleFromCsvFileToMem(superTblInfo); if (0 != ret) { + tmfree(superTblInfo->sampleDataBuf); return -1; } *sampleUsePos = 0; } int dataLen = 0; - dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "(%" PRId64 ", ", timestamp); - dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, "%s", sampleBuf + superTblInfo->lenOfOneRow * (*sampleUsePos)); + dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, + "(%" PRId64 ", ", timestamp); + dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, + "%s", superTblInfo->sampleDataBuf + superTblInfo->lenOfOneRow * (*sampleUsePos)); dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, ")"); (*sampleUsePos)++; - + return dataLen; } @@ -3840,7 +3887,7 @@ int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuperTable* } static void syncWriteForNumberOfTblInOneSql( - threadInfo *winfo, FILE *fp, char* sampleDataBuf) { + threadInfo *winfo, char* sampleDataBuf) { SSuperTable* superTblInfo = winfo->superTblInfo; int samplePos = 0; @@ -3960,9 +4007,7 @@ static void syncWriteForNumberOfTblInOneSql( superTblInfo->maxSqlLen - len, tmp_time += superTblInfo->timeStampStep, superTblInfo, - &sampleUsePos, - fp, - sampleDataBuf); + &sampleUsePos); if (retLen < 0) { goto free_and_statistics; } @@ -4159,7 +4204,14 @@ static void* syncWrite(void *sarg) { threadInfo *winfo = (threadInfo *)sarg; - char buffer[BUFFER_SIZE] = "\0"; + char* buffer = calloc(g_args.max_sql_len, 1); + if (NULL == buffer) { + fprintf(stderr, "Failed to calloc %d Bytes, reason:%s\n", + g_args.max_sql_len, + strerror(errno)); + return NULL; + } + char data[MAX_DATA_SIZE]; char **data_type = g_args.datatype; int len_of_binary = g_args.len_of_binary; @@ -4181,11 +4233,13 @@ static void* syncWrite(void *sarg) { winfo->totalAffectedRows = 0; for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) { + int64_t tmp_time = time_counter; + for (int i = 0; i < g_args.num_of_DPT;) { int tblInserted = i; - int64_t tmp_time = time_counter; + memset(buffer, 0, g_args.max_sql_len); char *pstr = buffer; pstr += sprintf(pstr, "insert into %s.%s%d values ", @@ -4264,6 +4318,7 @@ static void* syncWrite(void *sarg) { } // num_of_DPT } // tId + tmfree(buffer); printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n", winfo->threadID, winfo->totalRowsInserted, @@ -4272,16 +4327,8 @@ static void* syncWrite(void *sarg) { return NULL; } - -static void* syncWriteWithStb(void *sarg) { - uint64_t lastPrintTime = taosGetTimestampMs(); - - threadInfo *winfo = (threadInfo *)sarg; - SSuperTable* superTblInfo = winfo->superTblInfo; - - FILE *fp = NULL; +static int prepareSampleData(SSuperTable *superTblInfo) { char* sampleDataBuf = NULL; - int samplePos = 0; // each thread read sample data from csv file if (0 == strncasecmp(superTblInfo->dataSource, @@ -4290,42 +4337,49 @@ static void* syncWriteWithStb(void *sarg) { sampleDataBuf = calloc( superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, 1); if (sampleDataBuf == NULL) { - printf("Failed to calloc %d Bytes, reason:%s\n", + fprintf(stderr, "Failed to calloc %d Bytes, reason:%s\n", superTblInfo->lenOfOneRow * MAX_SAMPLES_ONCE_FROM_FILE, strerror(errno)); - return NULL; + return -1; } - fp = fopen(superTblInfo->sampleFile, "r"); - if (fp == NULL) { - printf("Failed to open sample file: %s, reason:%s\n", - superTblInfo->sampleFile, strerror(errno)); - tmfree(sampleDataBuf); - return NULL; - } - int ret = readSampleFromCsvFileToMem(fp, - superTblInfo, sampleDataBuf); + int ret = readSampleFromCsvFileToMem(superTblInfo); if (0 != ret) { - tmfree(sampleDataBuf); - tmfclose(fp); - return NULL; + tmfree(superTblInfo->sampleDataBuf); + return -1; } } - if (superTblInfo->numberOfTblInOneSql > 0) { - syncWriteForNumberOfTblInOneSql(winfo, fp, sampleDataBuf); - tmfree(sampleDataBuf); - tmfclose(fp); - return NULL; + superTblInfo->sampleDataBuf = sampleDataBuf; + + return 0; +} + +static void* syncWriteWithStb(void *sarg) { + uint64_t lastPrintTime = taosGetTimestampMs(); + + threadInfo *winfo = (threadInfo *)sarg; + SSuperTable* superTblInfo = winfo->superTblInfo; + + int samplePos = 0; + + if (superTblInfo) { + if (0 != prepareSampleData(superTblInfo)) + return NULL; + + if (superTblInfo->numberOfTblInOneSql > 0) { + syncWriteForNumberOfTblInOneSql(winfo, superTblInfo->sampleDataBuf); + tmfree(superTblInfo->sampleDataBuf); + return NULL; + } } char* buffer = calloc(superTblInfo->maxSqlLen, 1); if (NULL == buffer) { - printf("Failed to calloc %d Bytes, reason:%s\n", + fprintf(stderr, "Failed to calloc %d Bytes, reason:%s\n", superTblInfo->maxSqlLen, strerror(errno)); - tmfree(sampleDataBuf); - tmfclose(fp); + tmfree(superTblInfo->sampleDataBuf); return NULL; } @@ -4339,6 +4393,10 @@ static void* syncWriteWithStb(void *sarg) { debugPrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, superTblInfo->insertRows); + if (superTblInfo->childTblLimit ) { + // CBD + } + for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) { int64_t start_time = winfo->start_time; @@ -4413,9 +4471,7 @@ static void* syncWriteWithStb(void *sarg) { superTblInfo->maxSqlLen - len, start_time + superTblInfo->timeStampStep * i, superTblInfo, - &sampleUsePos, - fp, - sampleDataBuf); + &sampleUsePos); if (retLen < 0) { goto free_and_statistics_2; } @@ -4511,8 +4567,7 @@ static void* syncWriteWithStb(void *sarg) { free_and_statistics_2: tmfree(buffer); - tmfree(sampleDataBuf); - tmfclose(fp); + tmfree(superTblInfo->sampleDataBuf); printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n", winfo->threadID, @@ -4602,13 +4657,6 @@ static void startMultiThreadInsertData(int threads, char* db_name, memset(pids, 0, threads * sizeof(pthread_t)); memset(infos, 0, threads * sizeof(threadInfo)); - /* -xxx getAllChildNameOfSuperTable(taos, - g_queryInfo.dbName, - g_queryInfo.subQueryInfo.sTblName, - &g_queryInfo.subQueryInfo.childTblName, - &g_queryInfo.subQueryInfo.childTblCount); - */ int ntables = 0; if (superTblInfo) ntables = superTblInfo->childTblCount; @@ -4642,7 +4690,7 @@ xxx getAllChildNameOfSuperTable(taos, } else if (0 == strncasecmp(precision, "us", 2)) { timePrec = TSDB_TIME_PRECISION_MICRO; } else { - printf("No support precision: %s\n", precision); + fprintf(stderr, "No support precision: %s\n", precision); exit(-1); } } @@ -4651,11 +4699,11 @@ xxx getAllChildNameOfSuperTable(taos, if (superTblInfo) { if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) { start_time = taosGetTimestamp(timePrec); - } else { + } else { if (TSDB_CODE_SUCCESS != taosParseTime( - superTblInfo->startTimestamp, - &start_time, - strlen(superTblInfo->startTimestamp), + superTblInfo->startTimestamp, + &start_time, + strlen(superTblInfo->startTimestamp), timePrec, 0)) { printf("ERROR to parse time!\n"); exit(-1); @@ -4666,7 +4714,7 @@ xxx getAllChildNameOfSuperTable(taos, } double start = getCurrentTime(); - + int last = 0; for (int i = 0; i < threads; i++) { threadInfo *t_info = infos + i; @@ -4681,7 +4729,7 @@ xxx getAllChildNameOfSuperTable(taos, (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5))) { //t_info->taos = taos; t_info->taos = taos_connect( - g_Dbs.host, g_Dbs.user, + g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port); if (NULL == t_info->taos) { printf("connect to server fail from insert sub thread, reason: %s\n", @@ -4710,7 +4758,7 @@ xxx getAllChildNameOfSuperTable(taos, } else { pthread_create(pids + i, NULL, syncWrite, t_info); } - } else { + } else { pthread_create(pids + i, NULL, asyncWrite, t_info); } } @@ -4739,7 +4787,7 @@ xxx getAllChildNameOfSuperTable(taos, totalDelay += t_info->totalDelay; cntDelay += t_info->cntDelay; if (t_info->maxDelay > maxDelay) maxDelay = t_info->maxDelay; - if (t_info->minDelay < minDelay) minDelay = t_info->minDelay; + if (t_info->minDelay < minDelay) minDelay = t_info->minDelay; } cntDelay -= 1; @@ -4750,13 +4798,13 @@ xxx getAllChildNameOfSuperTable(taos, double t = end - start; if (superTblInfo) { - printf("Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s.%s. %2.f records/second\n\n", + printf("Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s.%s. %2.f records/second\n\n", t, superTblInfo->totalRowsInserted, superTblInfo->totalAffectedRows, threads, db_name, superTblInfo->sTblName, superTblInfo->totalRowsInserted / t); fprintf(g_fpOfInsertResult, - "Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s.%s. %2.f records/second\n\n", + "Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s.%s. %2.f records/second\n\n", t, superTblInfo->totalRowsInserted, superTblInfo->totalAffectedRows, threads, db_name, superTblInfo->sTblName, @@ -4771,10 +4819,9 @@ xxx getAllChildNameOfSuperTable(taos, //taos_close(taos); free(pids); - free(infos); + free(infos); } - void *readTable(void *sarg) { #if 1 threadInfo *rinfo = (threadInfo *)sarg; @@ -4933,10 +4980,10 @@ static int insertTestProcess() { if (NULL == g_fpOfInsertResult) { fprintf(stderr, "Failed to open %s for save result\n", g_Dbs.resultFile); return -1; - } { - printfInsertMetaToFile(g_fpOfInsertResult); } + printfInsertMetaToFile(g_fpOfInsertResult); + if (!g_args.answer_yes) { printf("Press enter key to continue\n\n"); (void)getchar(); @@ -4972,27 +5019,27 @@ static int insertTestProcess() { taosMsleep(1000); // create sub threads for inserting data //start = getCurrentTime(); - for (int i = 0; i < g_Dbs.dbCount; i++) { - if (g_Dbs.db[i].superTblCount > 0) { - for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { - SSuperTable* superTblInfo = &g_Dbs.db[i].superTbls[j]; - if (0 == g_Dbs.db[i].superTbls[j].insertRows) { - continue; - } - startMultiThreadInsertData( - g_Dbs.threadCount, - g_Dbs.db[i].dbName, - g_Dbs.db[i].dbCfg.precision, - superTblInfo); - } - } else { - startMultiThreadInsertData( - g_Dbs.threadCount, - g_Dbs.db[i].dbName, - g_Dbs.db[i].dbCfg.precision, - NULL); + for (int i = 0; i < g_Dbs.dbCount; i++) { + if (g_Dbs.db[i].superTblCount > 0) { + for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { + SSuperTable* superTblInfo = &g_Dbs.db[i].superTbls[j]; + if (0 == g_Dbs.db[i].superTbls[j].insertRows) { + continue; } + startMultiThreadInsertData( + g_Dbs.threadCount, + g_Dbs.db[i].dbName, + g_Dbs.db[i].dbCfg.precision, + superTblInfo); + } + } else { + startMultiThreadInsertData( + g_Dbs.threadCount, + g_Dbs.db[i].dbName, + g_Dbs.db[i].dbCfg.precision, + NULL); } + } //end = getCurrentTime(); //int64_t totalRowsInserted = 0; @@ -5056,7 +5103,7 @@ void *superQueryProcess(void *sarg) { return NULL; } -void replaceSubTblName(char* inSql, char* outSql, int tblIndex) { +static void replaceSubTblName(char* inSql, char* outSql, int tblIndex) { char sourceString[32] = "xxxx"; char subTblName[MAX_TB_NAME_SIZE*3]; sprintf(subTblName, "%s.%s", @@ -5078,7 +5125,7 @@ void replaceSubTblName(char* inSql, char* outSql, int tblIndex) { //printf("3: %s\n", outSql); } -void *subQueryProcess(void *sarg) { +static void *subQueryProcess(void *sarg) { char sqlstr[1024]; threadInfo *winfo = (threadInfo *)sarg; int64_t st = 0; @@ -5430,7 +5477,6 @@ static int subscribeTestProcess() { &g_queryInfo.subQueryInfo.childTblCount); } - pthread_t *pids = NULL; threadInfo *infos = NULL; //==== create sub threads for query from super table @@ -5451,7 +5497,7 @@ static int subscribeTestProcess() { pthread_create(pids + i, NULL, superSubscribeProcess, t_info); } } - + //==== create sub threads for query from sub table pthread_t *pidsOfSub = NULL; threadInfo *infosOfSub = NULL; @@ -5466,26 +5512,26 @@ static int subscribeTestProcess() { taos_close(taos); exit(-1); } - + int ntables = g_queryInfo.subQueryInfo.childTblCount; int threads = g_queryInfo.subQueryInfo.threadCnt; - + int a = ntables / threads; if (a < 1) { threads = ntables; a = 1; } - + int b = 0; if (threads != 0) { b = ntables % threads; } - + int last = 0; for (int i = 0; i < threads; i++) { threadInfo *t_info = infosOfSub + i; t_info->threadID = i; - + t_info->start_table_id = last; t_info->end_table_id = i < b ? last + a : last + a - 1; t_info->taos = taos; @@ -5493,7 +5539,7 @@ static int subscribeTestProcess() { } g_queryInfo.subQueryInfo.threadCnt = threads; } - + for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) { pthread_join(pids[i], NULL); } @@ -5601,7 +5647,7 @@ void setParaFromArg(){ "2017-07-14 10:40:00.000", MAX_TB_NAME_SIZE); g_Dbs.db[0].superTbls[0].timeStampStep = 10; - g_Dbs.db[0].superTbls[0].insertRows = g_args.num_of_DPT; + g_Dbs.db[0].superTbls[0].insertRows = g_args.num_of_DPT; g_Dbs.db[0].superTbls[0].maxSqlLen = TSDB_PAYLOAD_SIZE; g_Dbs.db[0].superTbls[0].columnCount = 0; diff --git a/tests/pytest/tools/taosdemoWithMetric.py b/tests/pytest/tools/taosdemoWithMetric.py new file mode 100644 index 0000000000..647d6a37cb --- /dev/null +++ b/tests/pytest/tools/taosdemoWithMetric.py @@ -0,0 +1,72 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import os +from util.log import * +from util.cases import * +from util.sql import * +from util.dnodes import * + + +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + + self.numberOfTables = 100 + self.numberOfRecords = 1000 + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root) - len("/build/bin")] + break + return buildPath + + def run(self): + buildPath = self.getBuildPath() + if (buildPath == ""): + tdLog.exit("taosd not found!") + else: + tdLog.info("taosd found in %s" % buildPath) + binPath = buildPath + "/build/bin/" + os.system("%staosdemo -y -t %d -n %d -x" % + (binPath, self.numberOfTables, self.numberOfRecords)) + + tdSql.query("show databases") + for i in range(18): + print(tdSql.getData(0, i) ) + tdSql.checkData(0, 2, self.numberOfTables) + + tdSql.execute("use test") + tdSql.query( + "select count(*) from test.t%d" % (self.numberOfTables -1)) + tdSql.checkData(0, 0, self.numberOfRecords) + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) -- GitLab