提交 c23043c0 编写于 作者: S Shuduo Sang

fix child table exists or auto create logic.

上级 6fe3b972
...@@ -240,13 +240,13 @@ typedef struct SColumn_S { ...@@ -240,13 +240,13 @@ typedef struct SColumn_S {
typedef struct SSuperTable_S { typedef struct SSuperTable_S {
char sTblName[MAX_TB_NAME_SIZE+1]; char sTblName[MAX_TB_NAME_SIZE+1];
char dataSource[MAX_TB_NAME_SIZE+1]; // rand_gen or sample
char childTblPrefix[MAX_TB_NAME_SIZE];
char insertMode[MAX_TB_NAME_SIZE]; // taosc, rest
uint16_t childTblExists;
int64_t childTblCount; int64_t childTblCount;
bool childTblExists; // 0: no, 1: yes
uint64_t batchCreateTableNum; // 0: no batch, > 0: batch table number in one sql uint64_t batchCreateTableNum; // 0: no batch, > 0: batch table number in one sql
uint8_t autoCreateTable; // 0: create sub table, 1: auto create sub table uint8_t autoCreateTable; // 0: create sub table, 1: auto create sub table
char childTblPrefix[MAX_TB_NAME_SIZE];
char dataSource[MAX_TB_NAME_SIZE+1]; // rand_gen or sample
char insertMode[MAX_TB_NAME_SIZE]; // taosc, rest
int64_t childTblLimit; int64_t childTblLimit;
uint64_t childTblOffset; uint64_t childTblOffset;
...@@ -793,7 +793,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) { ...@@ -793,7 +793,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
if ((argc == i+1) if ((argc == i+1)
|| (!isStringNumber(argv[i+1]))) { || (!isStringNumber(argv[i+1]))) {
printHelp(); printHelp();
errorPrint("%s", "\n\t-q need a number following!\nQuery mode -- 0: SYNC, 1: ASYNC. Default is SYNC.\n"); errorPrint("%s", "\n\t-q need a number following!\nQuery mode -- 0: SYNC, not-0: ASYNC. Default is SYNC.\n");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
arguments->async_mode = atoi(argv[++i]); arguments->async_mode = atoi(argv[++i]);
...@@ -1417,7 +1417,8 @@ static int printfInsertMeta() { ...@@ -1417,7 +1417,8 @@ static int printfInsertMeta() {
if (PRE_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) { if (PRE_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) {
printf(" autoCreateTable: \033[33m%s\033[0m\n", "no"); printf(" autoCreateTable: \033[33m%s\033[0m\n", "no");
} else if (AUTO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) { } else if (AUTO_CREATE_SUBTBL ==
g_Dbs.db[i].superTbls[j].autoCreateTable) {
printf(" autoCreateTable: \033[33m%s\033[0m\n", "yes"); printf(" autoCreateTable: \033[33m%s\033[0m\n", "yes");
} else { } else {
printf(" autoCreateTable: \033[33m%s\033[0m\n", "error"); printf(" autoCreateTable: \033[33m%s\033[0m\n", "error");
...@@ -3007,60 +3008,61 @@ static void createChildTables() { ...@@ -3007,60 +3008,61 @@ static void createChildTables() {
char tblColsBuf[MAX_SQL_SIZE]; char tblColsBuf[MAX_SQL_SIZE];
int len; int len;
for (int i = 0; i < g_Dbs.dbCount; i++) { for (int i = 0; i < g_Dbs.dbCount; i++) {
if (g_Dbs.use_metric) { if (g_Dbs.use_metric) {
if (g_Dbs.db[i].superTblCount > 0) { if (g_Dbs.db[i].superTblCount > 0) {
// with super table // with super table
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
if (AUTO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) { if ((AUTO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable)
continue; || (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) {
} continue;
}
verbosePrint("%s() LN%d: %s\n", __func__, __LINE__,
g_Dbs.db[i].superTbls[j].colsOfCreateChildTable);
uint64_t startFrom = 0;
g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount;
verbosePrint("%s() LN%d: create %"PRId64" child tables from %"PRIu64"\n",
__func__, __LINE__, g_totalChildTables, startFrom);
startMultiThreadCreateChildTable(
g_Dbs.db[i].superTbls[j].colsOfCreateChildTable,
g_Dbs.threadCountByCreateTbl,
startFrom,
g_Dbs.db[i].superTbls[j].childTblCount,
g_Dbs.db[i].dbName, &(g_Dbs.db[i].superTbls[j]));
}
}
} else {
// normal table
len = snprintf(tblColsBuf, MAX_SQL_SIZE, "(TS TIMESTAMP");
for (int j = 0; j < g_args.num_of_CPR; j++) {
if ((strncasecmp(g_args.datatype[j], "BINARY", strlen("BINARY")) == 0)
|| (strncasecmp(g_args.datatype[j],
"NCHAR", strlen("NCHAR")) == 0)) {
snprintf(tblColsBuf + len, MAX_SQL_SIZE - len,
", COL%d %s(%d)", j, g_args.datatype[j], g_args.len_of_binary);
} else {
snprintf(tblColsBuf + len, MAX_SQL_SIZE - len,
", COL%d %s", j, g_args.datatype[j]);
}
len = strlen(tblColsBuf);
}
verbosePrint("%s() LN%d: %s\n", __func__, __LINE__, snprintf(tblColsBuf + len, MAX_SQL_SIZE - len, ")");
g_Dbs.db[i].superTbls[j].colsOfCreateChildTable);
uint64_t startFrom = 0; verbosePrint("%s() LN%d: dbName: %s num of tb: %"PRId64" schema: %s\n",
g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount; __func__, __LINE__,
g_Dbs.db[i].dbName, g_args.num_of_tables, tblColsBuf);
verbosePrint("%s() LN%d: create %"PRId64" child tables from %"PRIu64"\n", startMultiThreadCreateChildTable(
__func__, __LINE__, g_totalChildTables, startFrom); tblColsBuf,
startMultiThreadCreateChildTable( g_Dbs.threadCountByCreateTbl,
g_Dbs.db[i].superTbls[j].colsOfCreateChildTable, 0,
g_Dbs.threadCountByCreateTbl, g_args.num_of_tables,
startFrom, g_Dbs.db[i].dbName,
g_Dbs.db[i].superTbls[j].childTblCount, NULL);
g_Dbs.db[i].dbName, &(g_Dbs.db[i].superTbls[j]));
} }
}
} else {
// normal table
len = snprintf(tblColsBuf, MAX_SQL_SIZE, "(TS TIMESTAMP");
for (int j = 0; j < g_args.num_of_CPR; j++) {
if ((strncasecmp(g_args.datatype[j], "BINARY", strlen("BINARY")) == 0)
|| (strncasecmp(g_args.datatype[j],
"NCHAR", strlen("NCHAR")) == 0)) {
snprintf(tblColsBuf + len, MAX_SQL_SIZE - len,
", COL%d %s(%d)", j, g_args.datatype[j], g_args.len_of_binary);
} else {
snprintf(tblColsBuf + len, MAX_SQL_SIZE - len,
", COL%d %s", j, g_args.datatype[j]);
}
len = strlen(tblColsBuf);
}
snprintf(tblColsBuf + len, MAX_SQL_SIZE - len, ")");
verbosePrint("%s() LN%d: dbName: %s num of tb: %"PRId64" schema: %s\n",
__func__, __LINE__,
g_Dbs.db[i].dbName, g_args.num_of_tables, tblColsBuf);
startMultiThreadCreateChildTable(
tblColsBuf,
g_Dbs.threadCountByCreateTbl,
0,
g_args.num_of_tables,
g_Dbs.db[i].dbName,
NULL);
} }
}
} }
/* /*
...@@ -3773,19 +3775,19 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { ...@@ -3773,19 +3775,19 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
if (autoCreateTbl if (autoCreateTbl
&& autoCreateTbl->type == cJSON_String && autoCreateTbl->type == cJSON_String
&& autoCreateTbl->valuestring != NULL) { && autoCreateTbl->valuestring != NULL) {
if ((0 == strncasecmp(autoCreateTbl->valuestring, "yes", 3)) if ((0 == strncasecmp(autoCreateTbl->valuestring, "yes", 3))
&& (TBL_ALREADY_EXISTS != g_Dbs.db[i].superTbls[j].childTblExists)) { && (TBL_ALREADY_EXISTS != g_Dbs.db[i].superTbls[j].childTblExists)) {
g_Dbs.db[i].superTbls[j].autoCreateTable = AUTO_CREATE_SUBTBL; g_Dbs.db[i].superTbls[j].autoCreateTable = AUTO_CREATE_SUBTBL;
} else if (0 == strncasecmp(autoCreateTbl->valuestring, "no", 2)) { } else if (0 == strncasecmp(autoCreateTbl->valuestring, "no", 2)) {
g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL; g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL;
} else { } else {
g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL; g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL;
} }
} else if (!autoCreateTbl) { } else if (!autoCreateTbl) {
g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL; g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL;
} else { } else {
printf("ERROR: failed to read json, auto_create_table not found\n"); printf("ERROR: failed to read json, auto_create_table not found\n");
goto PARSE_OVER; goto PARSE_OVER;
} }
cJSON* batchCreateTbl = cJSON_GetObjectItem(stbInfo, "batch_create_tbl_num"); cJSON* batchCreateTbl = cJSON_GetObjectItem(stbInfo, "batch_create_tbl_num");
...@@ -4863,26 +4865,29 @@ static int64_t execInsert(threadInfo *pThreadInfo, char *buffer, uint64_t k) ...@@ -4863,26 +4865,29 @@ static int64_t execInsert(threadInfo *pThreadInfo, char *buffer, uint64_t k)
static void getTableName(char *pTblName, static void getTableName(char *pTblName,
threadInfo* pThreadInfo, uint64_t tableSeq) threadInfo* pThreadInfo, uint64_t tableSeq)
{ {
SSuperTable* superTblInfo = pThreadInfo->superTblInfo; SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
if ((superTblInfo) if (superTblInfo) {
&& (AUTO_CREATE_SUBTBL != superTblInfo->autoCreateTable)) { if (AUTO_CREATE_SUBTBL != superTblInfo->autoCreateTable) {
if (superTblInfo->childTblLimit > 0) { if (superTblInfo->childTblLimit > 0) {
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s", snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s",
superTblInfo->childTblName + superTblInfo->childTblName +
(tableSeq - superTblInfo->childTblOffset) * TSDB_TABLE_NAME_LEN); (tableSeq - superTblInfo->childTblOffset) * TSDB_TABLE_NAME_LEN);
} else {
verbosePrint("[%d] %s() LN%d: from=%"PRIu64" count=%"PRId64" seq=%"PRIu64"\n",
pThreadInfo->threadID, __func__, __LINE__,
pThreadInfo->start_table_from,
pThreadInfo->ntables, tableSeq);
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s",
superTblInfo->childTblName + tableSeq * TSDB_TABLE_NAME_LEN);
}
} else {
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s%"PRIu64"",
superTblInfo->childTblPrefix, tableSeq);
}
} else { } else {
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s%"PRIu64"",
verbosePrint("[%d] %s() LN%d: from=%"PRIu64" count=%"PRId64" seq=%"PRIu64"\n", g_args.tb_prefix, tableSeq);
pThreadInfo->threadID, __func__, __LINE__,
pThreadInfo->start_table_from,
pThreadInfo->ntables, tableSeq);
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s",
superTblInfo->childTblName + tableSeq * TSDB_TABLE_NAME_LEN);
} }
} else {
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s%"PRIu64"",
g_args.tb_prefix, tableSeq);
}
} }
static int64_t generateDataTail( static int64_t generateDataTail(
...@@ -6195,7 +6200,7 @@ static int insertTestProcess() { ...@@ -6195,7 +6200,7 @@ static int insertTestProcess() {
} }
} }
taosMsleep(1000); //taosMsleep(1000);
// create sub threads for inserting data // create sub threads for inserting data
//start = taosGetTimestampMs(); //start = taosGetTimestampMs();
for (int i = 0; i < g_Dbs.dbCount; i++) { for (int i = 0; i < g_Dbs.dbCount; i++) {
...@@ -6839,11 +6844,14 @@ static void *specifiedSubscribe(void *sarg) { ...@@ -6839,11 +6844,14 @@ static void *specifiedSubscribe(void *sarg) {
g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID] = taos_consume( g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID] = taos_consume(
g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID]); g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID]);
if (g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID]) { if (g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID]) {
if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] != 0) { if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0]
!= 0) {
sprintf(pThreadInfo->filePath, "%s-%d", sprintf(pThreadInfo->filePath, "%s-%d",
g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq], g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID); pThreadInfo->threadID);
fetchResult(g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID], pThreadInfo); fetchResult(
g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID],
pThreadInfo);
} }
g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] ++; g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] ++;
...@@ -6856,16 +6864,17 @@ static void *specifiedSubscribe(void *sarg) { ...@@ -6856,16 +6864,17 @@ static void *specifiedSubscribe(void *sarg) {
g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] = 0; g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] = 0;
taos_unsubscribe(g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID], taos_unsubscribe(g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID],
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress); g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID] = subscribeImpl( g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID] =
SPECIFIED_CLASS, subscribeImpl(
pThreadInfo, SPECIFIED_CLASS,
g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq], pThreadInfo,
g_queryInfo.specifiedQueryInfo.topic[pThreadInfo->threadID], g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq],
g_queryInfo.specifiedQueryInfo.subscribeRestart, g_queryInfo.specifiedQueryInfo.topic[pThreadInfo->threadID],
g_queryInfo.specifiedQueryInfo.subscribeInterval); g_queryInfo.specifiedQueryInfo.subscribeRestart,
g_queryInfo.specifiedQueryInfo.subscribeInterval);
if (NULL == g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID]) { if (NULL == g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID]) {
taos_close(pThreadInfo->taos); taos_close(pThreadInfo->taos);
return NULL; return NULL;
} }
} }
} }
...@@ -7078,7 +7087,7 @@ static void setParaFromArg(){ ...@@ -7078,7 +7087,7 @@ static void setParaFromArg(){
g_Dbs.threadCountByCreateTbl = g_args.num_of_threads; g_Dbs.threadCountByCreateTbl = g_args.num_of_threads;
g_Dbs.dbCount = 1; g_Dbs.dbCount = 1;
g_Dbs.db[0].drop = 1; g_Dbs.db[0].drop = true;
tstrncpy(g_Dbs.db[0].dbName, g_args.database, MAX_DB_NAME_SIZE); tstrncpy(g_Dbs.db[0].dbName, g_args.database, MAX_DB_NAME_SIZE);
g_Dbs.db[0].dbCfg.replica = g_args.replica; g_Dbs.db[0].dbCfg.replica = g_args.replica;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册