提交 95c801e6 编写于 作者: H Hui Li

[TD-2735]<feature>support batch create table

上级 1fdc1232
......@@ -173,6 +173,7 @@ typedef struct SSuperTable_S {
int childTblCount;
bool superTblExists; // 0: no, 1: yes
bool childTblExists; // 0: no, 1: yes
int batchCreateTableNum; // 0: no batch, > 0: batch table number in one sql
int8_t autoCreateTable; // 0: create sub table, 1: auto create sub table
char childTblPrefix[MAX_TB_NAME_SIZE];
char dataSource[MAX_TB_NAME_SIZE]; // rand_gen or sample
......@@ -808,13 +809,14 @@ static void init_rand_data() {
static void printfInsertMeta() {
printf("\033[1m\033[40;32m================ insert.json parse result START ================\033[0m\n");
printf("host: \033[33m%s:%u\033[0m\n", g_Dbs.host, g_Dbs.port);
printf("user: \033[33m%s\033[0m\n", g_Dbs.user);
printf("password: \033[33m%s\033[0m\n", g_Dbs.password);
printf("resultFile: \033[33m%s\033[0m\n", g_Dbs.resultFile);
printf("thread count: \033[33m%d\033[0m\n", g_Dbs.threadCount);
printf("database count: \033[33m%d\033[0m\n", g_Dbs.dbCount);
printf("host: \033[33m%s:%u\033[0m\n", g_Dbs.host, g_Dbs.port);
printf("user: \033[33m%s\033[0m\n", g_Dbs.user);
printf("password: \033[33m%s\033[0m\n", g_Dbs.password);
printf("resultFile: \033[33m%s\033[0m\n", g_Dbs.resultFile);
printf("thread num of insert data: \033[33m%d\033[0m\n", g_Dbs.threadCount);
printf("thread num of create table: \033[33m%d\033[0m\n", g_Dbs.threadCountByCreateTbl);
printf("database count: \033[33m%d\033[0m\n", g_Dbs.dbCount);
for (int i = 0; i < g_Dbs.dbCount; i++) {
printf("database[\033[33m%d\033[0m]:\n", i);
printf(" database name: \033[33m%s\033[0m\n", g_Dbs.db[i].dbName);
......@@ -944,11 +946,12 @@ static void printfInsertMeta() {
static void printfInsertMetaToFile(FILE* fp) {
fprintf(fp, "================ insert.json parse result START================\n");
fprintf(fp, "host: %s:%u\n", g_Dbs.host, g_Dbs.port);
fprintf(fp, "user: %s\n", g_Dbs.user);
fprintf(fp, "password: %s\n", g_Dbs.password);
fprintf(fp, "resultFile: %s\n", g_Dbs.resultFile);
fprintf(fp, "thread count: %d\n", g_Dbs.threadCount);
fprintf(fp, "host: %s:%u\n", g_Dbs.host, g_Dbs.port);
fprintf(fp, "user: %s\n", g_Dbs.user);
fprintf(fp, "password: %s\n", g_Dbs.password);
fprintf(fp, "resultFile: %s\n", g_Dbs.resultFile);
fprintf(fp, "thread num of insert data: %d\n", g_Dbs.threadCount);
fprintf(fp, "thread num of create table: %d\n", g_Dbs.threadCountByCreateTbl)
fprintf(fp, "database count: %d\n", g_Dbs.dbCount);
for (int i = 0; i < g_Dbs.dbCount; i++) {
......@@ -1730,19 +1733,27 @@ static int createDatabases() {
void * createTable(void *sarg)
{
char command[BUFFER_SIZE] = "\0";
{
threadInfo *winfo = (threadInfo *)sarg;
SSuperTable* superTblInfo = winfo->superTblInfo;
int64_t lastPrintTime = taosGetTimestampMs();
char* buffer = calloc(superTblInfo->maxSqlLen, 1);
int len = 0;
int batchNum = 0;
//printf("Creating table from %d to %d\n", winfo->start_table_id, winfo->end_table_id);
for (int i = winfo->start_table_id; i <= winfo->end_table_id; i++) {
if (0 == g_Dbs.use_metric) {
snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d %s;", winfo->db_name, superTblInfo->childTblPrefix, i, superTblInfo->colsOfCreatChildTable);
snprintf(buffer, BUFFER_SIZE, "create table if not exists %s.%s%d %s;", winfo->db_name, superTblInfo->childTblPrefix, i, superTblInfo->colsOfCreatChildTable);
} else {
if (0 == len) {
batchNum = 0;
memset(buffer, 0, superTblInfo->maxSqlLen);
len += snprintf(buffer + len, superTblInfo->maxSqlLen - len, "create table ");
}
char* tagsValBuf = NULL;
if (0 == superTblInfo->tagSource) {
tagsValBuf = generateTagVaulesForStb(superTblInfo);
......@@ -1750,13 +1761,22 @@ void * createTable(void *sarg)
tagsValBuf = getTagValueFromTagSample(superTblInfo, i % superTblInfo->tagSampleCount);
}
if (NULL == tagsValBuf) {
free(buffer);
return NULL;
}
snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d using %s.%s tags %s;", winfo->db_name, superTblInfo->childTblPrefix, i, winfo->db_name, superTblInfo->sTblName, tagsValBuf);
len += snprintf(buffer + len, superTblInfo->maxSqlLen - len, "if not exists %s.%s%d using %s.%s tags %s ", winfo->db_name, superTblInfo->childTblPrefix, i, winfo->db_name, superTblInfo->sTblName, tagsValBuf);
free(tagsValBuf);
batchNum++;
if ((batchNum < superTblInfo->batchCreateTableNum) && ((superTblInfo->maxSqlLen - len) >= (superTblInfo->lenOfTagOfOneRow + 256))) {
continue;
}
}
if (0 != queryDbExec(winfo->taos, command, NO_INSERT_TYPE)){
len = 0;
if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE)){
free(buffer);
return NULL;
}
......@@ -1766,7 +1786,12 @@ void * createTable(void *sarg)
lastPrintTime = currentPrintTime;
}
}
if (0 != len) {
(void)queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE);
}
free(buffer);
return NULL;
}
......@@ -2422,6 +2447,16 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
printf("failed to read json, auto_create_table not found");
goto PARSE_OVER;
}
cJSON* batchCreateTbl = cJSON_GetObjectItem(stbInfo, "batch_create_tbl_num");
if (batchCreateTbl && batchCreateTbl->type == cJSON_Number) {
g_Dbs.db[i].superTbls[j].batchCreateTableNum = batchCreateTbl->valueint;
} else if (!batchCreateTbl) {
g_Dbs.db[i].superTbls[j].batchCreateTableNum = 2000;
} else {
printf("failed to read json, batch_create_tbl_num not found");
goto PARSE_OVER;
}
cJSON *childTblExists = cJSON_GetObjectItem(stbInfo, "child_table_exists"); // yes, no
if (childTblExists && childTblExists->type == cJSON_String && childTblExists->valuestring != NULL) {
......@@ -3679,14 +3714,14 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu
b = ntables % threads;
}
TAOS* taos;
if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) {
taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port);
if (NULL == taos) {
printf("connect to server fail, reason: %s\n", taos_errstr(NULL));
exit(-1);
}
}
//TAOS* taos;
//if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) {
// taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port);
// if (NULL == taos) {
// printf("connect to server fail, reason: %s\n", taos_errstr(NULL));
// exit(-1);
// }
//}
int32_t timePrec = TSDB_TIME_PRECISION_MILLI;
if (0 != precision[0]) {
......@@ -3719,7 +3754,12 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu
t_info->start_time = start_time;
if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) {
t_info->taos = taos;
//t_info->taos = taos;
t_info->taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port);
if (NULL == t_info->taos) {
printf("connect to server fail from insert sub thread, reason: %s\n", taos_errstr(NULL));
exit(-1);
}
} else {
t_info->taos = NULL;
#ifdef TD_LOWA_CURL
......@@ -3754,6 +3794,7 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu
threadInfo *t_info = infos + i;
tsem_destroy(&(t_info->lock_sem));
taos_close(t_info->taos);
superTblInfo->totalAffectedRows += t_info->totalAffectedRows;
superTblInfo->totalRowsInserted += t_info->totalRowsInserted;
......@@ -3766,7 +3807,7 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu
double end = getCurrentTime();
taos_close(taos);
//taos_close(taos);
free(pids);
free(infos);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册