未验证 提交 49b674c3 编写于 作者: sangshuduo's avatar sangshuduo 提交者: GitHub

[TD-3433] <fix>: fix child table count issue. (#5552)

Co-authored-by: NShuduo Sang <sdsang@taosdata.com>
上级 96fec551
...@@ -2541,6 +2541,7 @@ static void* createTable(void *sarg) ...@@ -2541,6 +2541,7 @@ static void* createTable(void *sarg)
static int startMultiThreadCreateChildTable( static int startMultiThreadCreateChildTable(
char* cols, int threads, int startFrom, int ntables, char* cols, int threads, int startFrom, int ntables,
char* db_name, SSuperTable* superTblInfo) { char* db_name, SSuperTable* superTblInfo) {
pthread_t *pids = malloc(threads * sizeof(pthread_t)); pthread_t *pids = malloc(threads * sizeof(pthread_t));
threadInfo *infos = malloc(threads * sizeof(threadInfo)); threadInfo *infos = malloc(threads * sizeof(threadInfo));
...@@ -2625,12 +2626,12 @@ static void createChildTables() { ...@@ -2625,12 +2626,12 @@ static void createChildTables() {
g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount; g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount;
verbosePrint("%s() LN%d: create %d child tables from %d\n", __func__, __LINE__, verbosePrint("%s() LN%d: create %d child tables from %d\n", __func__, __LINE__,
g_totalChildTables, startFrom); g_totalChildTables, startFrom);
startMultiThreadCreateChildTable( startMultiThreadCreateChildTable(
g_Dbs.db[i].superTbls[j].colsOfCreateChildTable, g_Dbs.db[i].superTbls[j].colsOfCreateChildTable,
g_Dbs.threadCountByCreateTbl, g_Dbs.threadCountByCreateTbl,
startFrom, startFrom,
g_totalChildTables, g_Dbs.db[i].superTbls[j].childTblCount,
g_Dbs.db[i].dbName, &(g_Dbs.db[i].superTbls[j])); g_Dbs.db[i].dbName, &(g_Dbs.db[i].superTbls[j]));
} }
} else { } else {
...@@ -5314,16 +5315,16 @@ static int insertTestProcess() { ...@@ -5314,16 +5315,16 @@ static int insertTestProcess() {
continue; continue;
} }
startMultiThreadInsertData( startMultiThreadInsertData(
g_Dbs.threadCount, g_Dbs.threadCount,
g_Dbs.db[i].dbName, g_Dbs.db[i].dbName,
g_Dbs.db[i].dbCfg.precision, g_Dbs.db[i].dbCfg.precision,
superTblInfo); superTblInfo);
} }
} else { } else {
startMultiThreadInsertData( startMultiThreadInsertData(
g_Dbs.threadCount, g_Dbs.threadCount,
g_Dbs.db[i].dbName, g_Dbs.db[i].dbName,
g_Dbs.db[i].dbCfg.precision, g_Dbs.db[i].dbCfg.precision,
NULL); NULL);
} }
} }
...@@ -5343,7 +5344,7 @@ static int insertTestProcess() { ...@@ -5343,7 +5344,7 @@ static int insertTestProcess() {
} }
static void *superQueryProcess(void *sarg) { static void *superQueryProcess(void *sarg) {
threadInfo *winfo = (threadInfo *)sarg; threadInfo *winfo = (threadInfo *)sarg;
//char sqlStr[MAX_TB_NAME_SIZE*2]; //char sqlStr[MAX_TB_NAME_SIZE*2];
//sprintf(sqlStr, "use %s", g_queryInfo.dbName); //sprintf(sqlStr, "use %s", g_queryInfo.dbName);
...@@ -5352,39 +5353,41 @@ static void *superQueryProcess(void *sarg) { ...@@ -5352,39 +5353,41 @@ static void *superQueryProcess(void *sarg) {
int64_t st = 0; int64_t st = 0;
int64_t et = 0; int64_t et = 0;
while (1) { while (1) {
if (g_queryInfo.superQueryInfo.rate && (et - st) < (int64_t)g_queryInfo.superQueryInfo.rate*1000) { if (g_queryInfo.superQueryInfo.rate && (et - st) <
(int64_t)g_queryInfo.superQueryInfo.rate*1000) {
taosMsleep(g_queryInfo.superQueryInfo.rate*1000 - (et - st)); // ms taosMsleep(g_queryInfo.superQueryInfo.rate*1000 - (et - st)); // ms
//printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_from, winfo->end_table_to); //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_from, winfo->end_table_to);
} }
st = taosGetTimestampUs(); st = taosGetTimestampUs();
for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) { if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) {
int64_t t1 = taosGetTimestampUs(); int64_t t1 = taosGetTimestampUs();
char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
if (g_queryInfo.superQueryInfo.result[i][0] != 0) { if (g_queryInfo.superQueryInfo.result[i][0] != 0) {
sprintf(tmpFile, "%s-%d", g_queryInfo.superQueryInfo.result[i], winfo->threadID); sprintf(tmpFile, "%s-%d",
g_queryInfo.superQueryInfo.result[i], winfo->threadID);
} }
selectAndGetResult(winfo->taos, g_queryInfo.superQueryInfo.sql[i], tmpFile); selectAndGetResult(winfo->taos, g_queryInfo.superQueryInfo.sql[i], tmpFile);
int64_t t2 = taosGetTimestampUs(); int64_t t2 = taosGetTimestampUs();
printf("=[taosc] thread[%"PRId64"] complete one sql, Spent %f s\n", printf("=[taosc] thread[%"PRId64"] complete one sql, Spent %f s\n",
taosGetSelfPthreadId(), (t2 - t1)/1000000.0); taosGetSelfPthreadId(), (t2 - t1)/1000000.0);
} else { } else {
int64_t t1 = taosGetTimestampUs(); int64_t t1 = taosGetTimestampUs();
int retCode = postProceSql(g_queryInfo.host, int retCode = postProceSql(g_queryInfo.host,
g_queryInfo.port, g_queryInfo.superQueryInfo.sql[i]); g_queryInfo.port, g_queryInfo.superQueryInfo.sql[i]);
int64_t t2 = taosGetTimestampUs(); int64_t t2 = taosGetTimestampUs();
printf("=[restful] thread[%"PRId64"] complete one sql, Spent %f s\n", printf("=[restful] thread[%"PRId64"] complete one sql, Spent %f s\n",
taosGetSelfPthreadId(), (t2 - t1)/1000000.0); taosGetSelfPthreadId(), (t2 - t1)/1000000.0);
if (0 != retCode) { if (0 != retCode) {
printf("====restful return fail, threadID[%d]\n", winfo->threadID); printf("====restful return fail, threadID[%d]\n", winfo->threadID);
return NULL; return NULL;
} }
} }
} }
et = taosGetTimestampUs(); et = taosGetTimestampUs();
printf("==thread[%"PRId64"] complete all sqls to specify tables once queries duration:%.6fs\n\n", printf("==thread[%"PRId64"] complete all sqls to specify tables once queries duration:%.6fs\n\n",
taosGetSelfPthreadId(), (double)(et - st)/1000.0); taosGetSelfPthreadId(), (double)(et - st)/1000.0);
} }
return NULL; return NULL;
...@@ -5393,28 +5396,28 @@ static void *superQueryProcess(void *sarg) { ...@@ -5393,28 +5396,28 @@ static void *superQueryProcess(void *sarg) {
static void replaceSubTblName(char* inSql, char* outSql, int tblIndex) { static void replaceSubTblName(char* inSql, char* outSql, int tblIndex) {
char sourceString[32] = "xxxx"; char sourceString[32] = "xxxx";
char subTblName[MAX_TB_NAME_SIZE*3]; char subTblName[MAX_TB_NAME_SIZE*3];
sprintf(subTblName, "%s.%s", sprintf(subTblName, "%s.%s",
g_queryInfo.dbName, g_queryInfo.dbName,
g_queryInfo.subQueryInfo.childTblName + tblIndex*TSDB_TABLE_NAME_LEN); g_queryInfo.subQueryInfo.childTblName + tblIndex*TSDB_TABLE_NAME_LEN);
//printf("inSql: %s\n", inSql); //printf("inSql: %s\n", inSql);
char* pos = strstr(inSql, sourceString); char* pos = strstr(inSql, sourceString);
if (0 == pos) { if (0 == pos) {
return; return;
} }
tstrncpy(outSql, inSql, pos - inSql + 1); tstrncpy(outSql, inSql, pos - inSql + 1);
//printf("1: %s\n", outSql); //printf("1: %s\n", outSql);
strcat(outSql, subTblName); strcat(outSql, subTblName);
//printf("2: %s\n", outSql); //printf("2: %s\n", outSql);
strcat(outSql, pos+strlen(sourceString)); strcat(outSql, pos+strlen(sourceString));
//printf("3: %s\n", outSql); //printf("3: %s\n", outSql);
} }
static void *subQueryProcess(void *sarg) { static void *subQueryProcess(void *sarg) {
char sqlstr[1024]; char sqlstr[1024];
threadInfo *winfo = (threadInfo *)sarg; threadInfo *winfo = (threadInfo *)sarg;
int64_t st = 0; int64_t st = 0;
int64_t et = (int64_t)g_queryInfo.subQueryInfo.rate*1000; int64_t et = (int64_t)g_queryInfo.subQueryInfo.rate*1000;
while (1) { while (1) {
...@@ -5431,29 +5434,29 @@ static void *subQueryProcess(void *sarg) { ...@@ -5431,29 +5434,29 @@ static void *subQueryProcess(void *sarg) {
replaceSubTblName(g_queryInfo.subQueryInfo.sql[j], sqlstr, i); replaceSubTblName(g_queryInfo.subQueryInfo.sql[j], sqlstr, i);
char tmpFile[MAX_FILE_NAME_LEN*2] = {0}; char tmpFile[MAX_FILE_NAME_LEN*2] = {0};
if (g_queryInfo.subQueryInfo.result[i][0] != 0) { if (g_queryInfo.subQueryInfo.result[i][0] != 0) {
sprintf(tmpFile, "%s-%d", sprintf(tmpFile, "%s-%d",
g_queryInfo.subQueryInfo.result[i], g_queryInfo.subQueryInfo.result[i],
winfo->threadID); winfo->threadID);
} }
selectAndGetResult(winfo->taos, sqlstr, tmpFile); selectAndGetResult(winfo->taos, sqlstr, tmpFile);
} }
} }
et = taosGetTimestampUs(); et = taosGetTimestampUs();
printf("####thread[%"PRId64"] complete all sqls to allocate all sub-tables[%d - %d] once queries duration:%.4fs\n\n", printf("####thread[%"PRId64"] complete all sqls to allocate all sub-tables[%d - %d] once queries duration:%.4fs\n\n",
taosGetSelfPthreadId(), taosGetSelfPthreadId(),
winfo->start_table_from, winfo->start_table_from,
winfo->end_table_to, winfo->end_table_to,
(double)(et - st)/1000000.0); (double)(et - st)/1000000.0);
} }
return NULL; return NULL;
} }
static int queryTestProcess() { static int queryTestProcess() {
TAOS * taos = NULL; TAOS * taos = NULL;
taos = taos_connect(g_queryInfo.host, taos = taos_connect(g_queryInfo.host,
g_queryInfo.user, g_queryInfo.user,
g_queryInfo.password, g_queryInfo.password,
NULL, NULL,
g_queryInfo.port); g_queryInfo.port);
if (taos == NULL) { if (taos == NULL) {
errorPrint( "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL)); errorPrint( "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL));
...@@ -5466,7 +5469,7 @@ static int queryTestProcess() { ...@@ -5466,7 +5469,7 @@ static int queryTestProcess() {
g_queryInfo.subQueryInfo.sTblName, g_queryInfo.subQueryInfo.sTblName,
&g_queryInfo.subQueryInfo.childTblName, &g_queryInfo.subQueryInfo.childTblName,
&g_queryInfo.subQueryInfo.childTblCount); &g_queryInfo.subQueryInfo.childTblCount);
} }
printfQueryMeta(); printfQueryMeta();
...@@ -5563,7 +5566,7 @@ static int queryTestProcess() { ...@@ -5563,7 +5566,7 @@ static int queryTestProcess() {
for (int i = 0; i < threads; i++) { for (int i = 0; i < threads; i++) {
threadInfo *t_info = infosOfSub + i; threadInfo *t_info = infosOfSub + i;
t_info->threadID = i; t_info->threadID = i;
t_info->start_table_from = startFrom; t_info->start_table_from = startFrom;
t_info->ntables = i<b?a+1:a; t_info->ntables = i<b?a+1:a;
t_info->end_table_to = i < b ? startFrom + a : startFrom + a - 1; t_info->end_table_to = i < b ? startFrom + a : startFrom + a - 1;
...@@ -5575,7 +5578,7 @@ static int queryTestProcess() { ...@@ -5575,7 +5578,7 @@ static int queryTestProcess() {
g_queryInfo.subQueryInfo.threadCnt = threads; g_queryInfo.subQueryInfo.threadCnt = threads;
} else { } else {
g_queryInfo.subQueryInfo.threadCnt = 0; g_queryInfo.subQueryInfo.threadCnt = 0;
} }
for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) { for (int i = 0; i < g_queryInfo.superQueryInfo.concurrent; i++) {
pthread_join(pids[i], NULL); pthread_join(pids[i], NULL);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册