From e4d619538e62446780bf051f3154837f1eea5ea0 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Fri, 12 Mar 2021 17:34:44 +0800 Subject: [PATCH] [TD-3192] : support stb limit and offset. verified. --- src/kit/taosdemo/taosdemo.c | 155 +++++++++++++++++++----------------- 1 file changed, 84 insertions(+), 71 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index b34757432d..d0f98dd66e 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -1982,7 +1982,7 @@ static int getChildNameOfSuperTableWithLimitAndOffset(TAOS * taos, exit(-1); } - int childTblCount = 10000; + int childTblCount = (limit < 0)?10000:limit; int count = 0; childTblName = (char*)calloc(1, childTblCount * TSDB_TABLE_NAME_LEN); char* pTblName = childTblName; @@ -2100,7 +2100,7 @@ static int getSuperTableFromServer(TAOS * taos, char* dbName, static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls, bool use_metric) { char command[BUFFER_SIZE] = "\0"; - + char cols[STRING_LEN] = "\0"; int colIndex; int len = 0; @@ -2525,14 +2525,8 @@ static void createChildTables() { verbosePrint("%s() LN%d: %s\n", __func__, __LINE__, g_Dbs.db[i].superTbls[j].colsOfCreateChildTable); - int startFrom; - if (g_Dbs.db[i].superTbls[j].childTblOffset) { - startFrom = g_Dbs.db[i].superTbls[j].childTblOffset; - g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblLimit; - } else { - startFrom = 0; - g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount; - } + int startFrom = 0; + g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount; verbosePrint("%s() LN%d: create %d child tables from %d\n", __func__, __LINE__, g_totalChildTables, startFrom); @@ -3882,7 +3876,7 @@ PARSE_OVER: return ret; } -void prePareSampleData() { +void prepareSampleData() { for (int i = 0; i < g_Dbs.dbCount; i++) { for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { //if (0 == strncasecmp(g_Dbs.db[i].superTbls[j].dataSource, "sample", 6)) { @@ -4018,7 +4012,7 @@ static void syncWriteForNumberOfTblInOneSql( int64_t et = 0xffffffff; for (int i = 0; i < superTblInfo->insertRows;) { int32_t tbl_id = 0; - for (int tableID = winfo->start_table_id; tableID <= winfo->end_table_id; ) { + for (int tableSeq = winfo->start_table_id; tableSeq <= winfo->end_table_id; ) { int64_t start_time = 0; int inserted = i; @@ -4027,12 +4021,12 @@ static void syncWriteForNumberOfTblInOneSql( memset(buffer, 0, superTblInfo->maxSqlLen); char *pstr = buffer; - int32_t end_tbl_id = tableID + numberOfTblInOneSql; + int32_t end_tbl_id = tableSeq + numberOfTblInOneSql; if (end_tbl_id > winfo->end_table_id) { end_tbl_id = winfo->end_table_id+1; } - for (tbl_id = tableID; tbl_id < end_tbl_id; tbl_id++) { + for (tbl_id = tableSeq ; tbl_id < end_tbl_id; tbl_id++) { sampleUsePos = samplePos; if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) { char* tagsValBuf = NULL; @@ -4140,7 +4134,7 @@ static void syncWriteForNumberOfTblInOneSql( if (inserted >= superTblInfo->insertRows || (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) { - tableID = tbl_id + 1; + tableSeq = tbl_id + 1; printf("config rowsPerTbl and numberOfTblInOneSql not match with max_sql_lenth, please reconfig![lenOfOneRow:%d]\n", superTblInfo->lenOfOneRow); goto send_to_server; @@ -4148,7 +4142,7 @@ static void syncWriteForNumberOfTblInOneSql( } } - tableID = tbl_id; + tableSeq = tbl_id; inserted += superTblInfo->rowsPerTbl; send_to_server: @@ -4217,7 +4211,7 @@ send_to_server: break; } - if (tableID > winfo->end_table_id) { + if (tableSeq > winfo->end_table_id) { if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) { samplePos = sampleUsePos; } @@ -4231,7 +4225,8 @@ send_to_server: free_and_statistics: tmfree(buffer); - printf("====thread[%d] completed total inserted rows: %"PRId64 ", affected rows: %"PRId64 "====\n", winfo->threadID, winfo->totalInsertRows, winfo->totalAffectedRows); + printf("====thread[%d] completed total inserted rows: %"PRId64 ", affected rows: %"PRId64 "====\n", + winfo->threadID, winfo->totalInsertRows, winfo->totalAffectedRows); return; } @@ -4293,7 +4288,7 @@ int32_t generateData(char *res, char **data_type, return (int32_t)(pstr - res); } -static int prepareSampleData(SSuperTable *superTblInfo) { +static int prepareSampleDataForSTable(SSuperTable *superTblInfo) { char* sampleDataBuf = NULL; // each thread read sample data from csv file @@ -4327,11 +4322,10 @@ static int execInsert(threadInfo *winfo, char *buffer, int k) SSuperTable* superTblInfo = winfo->superTblInfo; if (superTblInfo) { - if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) { + if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) { verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); affectedRows = queryDbExec(winfo->taos, buffer, INSERT_TYPE); - - } else { + } else { verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); int retCode = postProceSql(g_Dbs.host, g_Dbs.port, buffer); @@ -4341,7 +4335,7 @@ static int execInsert(threadInfo *winfo, char *buffer, int k) } else { affectedRows = k; } - } + } } else { verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); affectedRows = queryDbExec(winfo->taos, buffer, 1); @@ -4354,7 +4348,8 @@ static int execInsert(threadInfo *winfo, char *buffer, int k) return affectedRows; } -static int generateDataBuffer(int32_t tableID, threadInfo *pThreadInfo, char *buffer, +static int generateDataBuffer(int32_t tableSeq, + threadInfo *pThreadInfo, char *buffer, int64_t insertRows, int64_t startFrom, int64_t startTime, int *pSampleUsePos) { @@ -4372,6 +4367,26 @@ static int generateDataBuffer(int32_t tableID, threadInfo *pThreadInfo, char *bu assert(buffer != NULL); + char *pChildTblName; + int childTblCount; + + if (superTblInfo && (superTblInfo->childTblOffset > 0)) { + // TODO + // select tbname from stb limit 1 offset tableSeq + getChildNameOfSuperTableWithLimitAndOffset(pThreadInfo->taos, + pThreadInfo->db_name, superTblInfo->sTblName, + &pChildTblName, &childTblCount, + 1, tableSeq); + } else { + pChildTblName = calloc(TSDB_TABLE_NAME_LEN, 1); + if (NULL == pChildTblName) { + fprintf(stderr, "failed to alloc memory %d\n", TSDB_TABLE_NAME_LEN); + return -1; + } + snprintf(pChildTblName, TSDB_TABLE_NAME_LEN, "%s%d", + superTblInfo?superTblInfo->childTblPrefix:g_args.tb_prefix, tableSeq); + } + memset(buffer, 0, superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len); char *pstr = buffer; @@ -4384,19 +4399,19 @@ static int generateDataBuffer(int32_t tableID, threadInfo *pThreadInfo, char *bu } else { tagsValBuf = getTagValueFromTagSample( superTblInfo, - tableID % superTblInfo->tagSampleCount); + tableSeq % superTblInfo->tagSampleCount); } if (NULL == tagsValBuf) { fprintf(stderr, "tag buf failed to allocate memory\n"); + free(pChildTblName); return -1; } pstr += snprintf(pstr, superTblInfo->maxSqlLen, - "insert into %s.%s%d using %s.%s tags %s values", + "insert into %s.%s using %s.%s tags %s values", pThreadInfo->db_name, - superTblInfo->childTblPrefix, - tableID, + pChildTblName, pThreadInfo->db_name, superTblInfo->sTblName, tagsValBuf); @@ -4406,22 +4421,20 @@ static int generateDataBuffer(int32_t tableID, threadInfo *pThreadInfo, char *bu superTblInfo->maxSqlLen, "insert into %s.%s values", pThreadInfo->db_name, - superTblInfo->childTblName + tableID * TSDB_TABLE_NAME_LEN); + superTblInfo->childTblName + tableSeq * TSDB_TABLE_NAME_LEN); } else { pstr += snprintf(pstr, (superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len), - "insert into %s.%s%d values", + "insert into %s.%s values", pThreadInfo->db_name, - superTblInfo?superTblInfo->childTblPrefix:g_args.tb_prefix, - tableID); + pChildTblName); } } else { pstr += snprintf(pstr, (superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len), - "insert into %s.%s%d values", + "insert into %s.%s values", pThreadInfo->db_name, - superTblInfo?superTblInfo->childTblPrefix:g_args.tb_prefix, - tableID); + pChildTblName); } int k; @@ -4432,26 +4445,28 @@ static int generateDataBuffer(int32_t tableID, threadInfo *pThreadInfo, char *bu if (superTblInfo) { int retLen = 0; - if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) { - retLen = getRowDataFromSample( + if (0 == strncasecmp(superTblInfo->dataSource, + "sample", strlen("sample"))) { + retLen = getRowDataFromSample( pstr + len, superTblInfo->maxSqlLen - len, startTime + superTblInfo->timeStampStep * startFrom, superTblInfo, pSampleUsePos); - } else if (0 == strncasecmp(superTblInfo->dataSource, "rand", strlen("rand"))) { - int rand_num = rand_tinyint() % 100; - if (0 != superTblInfo->disorderRatio + } else if (0 == strncasecmp(superTblInfo->dataSource, + "rand", strlen("rand"))) { + int rand_num = rand_tinyint() % 100; + if (0 != superTblInfo->disorderRatio && rand_num < superTblInfo->disorderRatio) { - int64_t d = startTime - rand() % superTblInfo->disorderRange; - retLen = generateRowData( + int64_t d = startTime - rand() % superTblInfo->disorderRange; + retLen = generateRowData( pstr + len, superTblInfo->maxSqlLen - len, d, superTblInfo); //printf("disorder rows, rand_num:%d, last ts:%"PRId64" current ts:%"PRId64"\n", rand_num, start_time, d); } else { - retLen = generateRowData( + retLen = generateRowData( pstr + len, superTblInfo->maxSqlLen - len, startTime + superTblInfo->timeStampStep * startFrom, @@ -4459,7 +4474,8 @@ static int generateDataBuffer(int32_t tableID, threadInfo *pThreadInfo, char *bu } if (retLen < 0) { - return -1; + free(pChildTblName); + return -1; } len += retLen; @@ -4518,7 +4534,7 @@ static void* syncWrite(void *sarg) { if (superTblInfo) { - if (0 != prepareSampleData(superTblInfo)) + if (0 != prepareSampleDataForSTable(superTblInfo)) return NULL; if (superTblInfo->numberOfTblInOneSql > 0) { @@ -4553,12 +4569,8 @@ static void* syncWrite(void *sarg) { int sampleUsePos; - if (superTblInfo && superTblInfo->childTblOffset) { - // TODO - } - - for (uint32_t tableID = winfo->start_table_id; tableID <= winfo->end_table_id; - tableID++) { + for (uint32_t tableSeq = winfo->start_table_id; tableSeq <= winfo->end_table_id; + tableSeq ++) { int64_t start_time = winfo->start_time; int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT; @@ -4571,7 +4583,7 @@ static void* syncWrite(void *sarg) { sampleUsePos = samplePos; - int generated = generateDataBuffer(tableID, winfo, buffer, insertRows, + int generated = generateDataBuffer(tableSeq, winfo, buffer, insertRows, i, start_time, &sampleUsePos); if (generated > 0) i += generated; @@ -4615,12 +4627,12 @@ static void* syncWrite(void *sarg) { } } // num_of_DPT - if ((tableID == winfo->end_table_id) && superTblInfo && + if ((tableSeq == winfo->end_table_id) && superTblInfo && (0 == strncasecmp( superTblInfo->dataSource, "sample", strlen("sample")))) { samplePos = sampleUsePos; } - } // tableID + } // tableSeq free_and_statistics_2: tmfree(buffer); @@ -5069,7 +5081,7 @@ static int insertTestProcess() { } // pretreatement - prePareSampleData(); + prepareSampleData(); double start; double end; @@ -5244,10 +5256,10 @@ static int queryTestProcess() { } if (0 != g_queryInfo.subQueryInfo.sqlCount) { - getAllChildNameOfSuperTable(taos, - g_queryInfo.dbName, - g_queryInfo.subQueryInfo.sTblName, - &g_queryInfo.subQueryInfo.childTblName, + getAllChildNameOfSuperTable(taos, + g_queryInfo.dbName, + g_queryInfo.subQueryInfo.sTblName, + &g_queryInfo.subQueryInfo.childTblName, &g_queryInfo.subQueryInfo.childTblCount); } @@ -5264,7 +5276,7 @@ static int queryTestProcess() { threadInfo *infos = NULL; //==== create sub threads for query from specify table if (g_queryInfo.superQueryInfo.sqlCount > 0 && g_queryInfo.superQueryInfo.concurrent > 0) { - + pids = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(pthread_t)); infos = malloc(g_queryInfo.superQueryInfo.concurrent * sizeof(threadInfo)); if ((NULL == pids) || (NULL == infos)) { @@ -5272,14 +5284,14 @@ static int queryTestProcess() { taos_close(taos); exit(-1); } - - for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) { + + for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) { threadInfo *t_info = infos + i; - t_info->threadID = i; + t_info->threadID = i; if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) { t_info->taos = taos; - + char sqlStr[MAX_TB_NAME_SIZE*2]; sprintf(sqlStr, "use %s", g_queryInfo.dbName); verbosePrint("%s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr); @@ -5288,16 +5300,17 @@ static int queryTestProcess() { t_info->taos = NULL; } - pthread_create(pids + i, NULL, superQueryProcess, t_info); - } + pthread_create(pids + i, NULL, superQueryProcess, t_info); + } }else { g_queryInfo.superQueryInfo.concurrent = 0; } - + pthread_t *pidsOfSub = NULL; threadInfo *infosOfSub = NULL; //==== create sub threads for query from all sub table of the super table - if ((g_queryInfo.subQueryInfo.sqlCount > 0) && (g_queryInfo.subQueryInfo.threadCnt > 0)) { + if ((g_queryInfo.subQueryInfo.sqlCount > 0) + && (g_queryInfo.subQueryInfo.threadCnt > 0)) { pidsOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt * sizeof(pthread_t)); infosOfSub = malloc(g_queryInfo.subQueryInfo.threadCnt * sizeof(threadInfo)); if ((NULL == pidsOfSub) || (NULL == infosOfSub)) { @@ -5305,7 +5318,7 @@ static int queryTestProcess() { taos_close(taos); exit(-1); } - + int ntables = g_queryInfo.subQueryInfo.childTblCount; int threads = g_queryInfo.subQueryInfo.threadCnt; @@ -5314,12 +5327,12 @@ static int queryTestProcess() { threads = ntables; a = 1; } - + int b = 0; if (threads != 0) { b = ntables % threads; } - + int last = 0; for (int i = 0; i < threads; i++) { threadInfo *t_info = infosOfSub + i; -- GitLab