提交 5b232fd9 编写于 作者: H Hongze Cheng

Merge branch 'develop' into feature/TD-1925_new

...@@ -173,6 +173,7 @@ typedef struct SSuperTable_S { ...@@ -173,6 +173,7 @@ typedef struct SSuperTable_S {
int childTblCount; int childTblCount;
bool superTblExists; // 0: no, 1: yes bool superTblExists; // 0: no, 1: yes
bool childTblExists; // 0: no, 1: yes bool childTblExists; // 0: no, 1: yes
int batchCreateTableNum; // 0: no batch, > 0: batch table number in one sql
int8_t autoCreateTable; // 0: create sub table, 1: auto create sub table int8_t autoCreateTable; // 0: create sub table, 1: auto create sub table
char childTblPrefix[MAX_TB_NAME_SIZE]; char childTblPrefix[MAX_TB_NAME_SIZE];
char dataSource[MAX_TB_NAME_SIZE]; // rand_gen or sample char dataSource[MAX_TB_NAME_SIZE]; // rand_gen or sample
...@@ -808,13 +809,14 @@ static void init_rand_data() { ...@@ -808,13 +809,14 @@ static void init_rand_data() {
static void printfInsertMeta() { static void printfInsertMeta() {
printf("\033[1m\033[40;32m================ insert.json parse result START ================\033[0m\n"); printf("\033[1m\033[40;32m================ insert.json parse result START ================\033[0m\n");
printf("host: \033[33m%s:%u\033[0m\n", g_Dbs.host, g_Dbs.port); printf("host: \033[33m%s:%u\033[0m\n", g_Dbs.host, g_Dbs.port);
printf("user: \033[33m%s\033[0m\n", g_Dbs.user); printf("user: \033[33m%s\033[0m\n", g_Dbs.user);
printf("password: \033[33m%s\033[0m\n", g_Dbs.password); printf("password: \033[33m%s\033[0m\n", g_Dbs.password);
printf("resultFile: \033[33m%s\033[0m\n", g_Dbs.resultFile); printf("resultFile: \033[33m%s\033[0m\n", g_Dbs.resultFile);
printf("thread count: \033[33m%d\033[0m\n", g_Dbs.threadCount); printf("thread num of insert data: \033[33m%d\033[0m\n", g_Dbs.threadCount);
printf("thread num of create table: \033[33m%d\033[0m\n", g_Dbs.threadCountByCreateTbl);
printf("database count: \033[33m%d\033[0m\n", g_Dbs.dbCount);
printf("database count: \033[33m%d\033[0m\n", g_Dbs.dbCount);
for (int i = 0; i < g_Dbs.dbCount; i++) { for (int i = 0; i < g_Dbs.dbCount; i++) {
printf("database[\033[33m%d\033[0m]:\n", i); printf("database[\033[33m%d\033[0m]:\n", i);
printf(" database name: \033[33m%s\033[0m\n", g_Dbs.db[i].dbName); printf(" database name: \033[33m%s\033[0m\n", g_Dbs.db[i].dbName);
...@@ -944,11 +946,12 @@ static void printfInsertMeta() { ...@@ -944,11 +946,12 @@ static void printfInsertMeta() {
static void printfInsertMetaToFile(FILE* fp) { static void printfInsertMetaToFile(FILE* fp) {
fprintf(fp, "================ insert.json parse result START================\n"); fprintf(fp, "================ insert.json parse result START================\n");
fprintf(fp, "host: %s:%u\n", g_Dbs.host, g_Dbs.port); fprintf(fp, "host: %s:%u\n", g_Dbs.host, g_Dbs.port);
fprintf(fp, "user: %s\n", g_Dbs.user); fprintf(fp, "user: %s\n", g_Dbs.user);
fprintf(fp, "password: %s\n", g_Dbs.password); fprintf(fp, "password: %s\n", g_Dbs.password);
fprintf(fp, "resultFile: %s\n", g_Dbs.resultFile); fprintf(fp, "resultFile: %s\n", g_Dbs.resultFile);
fprintf(fp, "thread count: %d\n", g_Dbs.threadCount); fprintf(fp, "thread num of insert data: %d\n", g_Dbs.threadCount);
fprintf(fp, "thread num of create table: %d\n", g_Dbs.threadCountByCreateTbl);
fprintf(fp, "database count: %d\n", g_Dbs.dbCount); fprintf(fp, "database count: %d\n", g_Dbs.dbCount);
for (int i = 0; i < g_Dbs.dbCount; i++) { for (int i = 0; i < g_Dbs.dbCount; i++) {
...@@ -1730,19 +1733,27 @@ static int createDatabases() { ...@@ -1730,19 +1733,27 @@ static int createDatabases() {
void * createTable(void *sarg) void * createTable(void *sarg)
{ {
char command[BUFFER_SIZE] = "\0";
threadInfo *winfo = (threadInfo *)sarg; threadInfo *winfo = (threadInfo *)sarg;
SSuperTable* superTblInfo = winfo->superTblInfo; SSuperTable* superTblInfo = winfo->superTblInfo;
int64_t lastPrintTime = taosGetTimestampMs(); int64_t lastPrintTime = taosGetTimestampMs();
char* buffer = calloc(superTblInfo->maxSqlLen, 1);
int len = 0;
int batchNum = 0;
//printf("Creating table from %d to %d\n", winfo->start_table_id, winfo->end_table_id); //printf("Creating table from %d to %d\n", winfo->start_table_id, winfo->end_table_id);
for (int i = winfo->start_table_id; i <= winfo->end_table_id; i++) { for (int i = winfo->start_table_id; i <= winfo->end_table_id; i++) {
if (0 == g_Dbs.use_metric) { if (0 == g_Dbs.use_metric) {
snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d %s;", winfo->db_name, superTblInfo->childTblPrefix, i, superTblInfo->colsOfCreatChildTable); snprintf(buffer, BUFFER_SIZE, "create table if not exists %s.%s%d %s;", winfo->db_name, superTblInfo->childTblPrefix, i, superTblInfo->colsOfCreatChildTable);
} else { } else {
if (0 == len) {
batchNum = 0;
memset(buffer, 0, superTblInfo->maxSqlLen);
len += snprintf(buffer + len, superTblInfo->maxSqlLen - len, "create table ");
}
char* tagsValBuf = NULL; char* tagsValBuf = NULL;
if (0 == superTblInfo->tagSource) { if (0 == superTblInfo->tagSource) {
tagsValBuf = generateTagVaulesForStb(superTblInfo); tagsValBuf = generateTagVaulesForStb(superTblInfo);
...@@ -1750,13 +1761,22 @@ void * createTable(void *sarg) ...@@ -1750,13 +1761,22 @@ void * createTable(void *sarg)
tagsValBuf = getTagValueFromTagSample(superTblInfo, i % superTblInfo->tagSampleCount); tagsValBuf = getTagValueFromTagSample(superTblInfo, i % superTblInfo->tagSampleCount);
} }
if (NULL == tagsValBuf) { if (NULL == tagsValBuf) {
free(buffer);
return NULL; return NULL;
} }
snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d using %s.%s tags %s;", winfo->db_name, superTblInfo->childTblPrefix, i, winfo->db_name, superTblInfo->sTblName, tagsValBuf);
len += snprintf(buffer + len, superTblInfo->maxSqlLen - len, "if not exists %s.%s%d using %s.%s tags %s ", winfo->db_name, superTblInfo->childTblPrefix, i, winfo->db_name, superTblInfo->sTblName, tagsValBuf);
free(tagsValBuf); free(tagsValBuf);
batchNum++;
if ((batchNum < superTblInfo->batchCreateTableNum) && ((superTblInfo->maxSqlLen - len) >= (superTblInfo->lenOfTagOfOneRow + 256))) {
continue;
}
} }
if (0 != queryDbExec(winfo->taos, command, NO_INSERT_TYPE)){ len = 0;
if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE)){
free(buffer);
return NULL; return NULL;
} }
...@@ -1766,7 +1786,12 @@ void * createTable(void *sarg) ...@@ -1766,7 +1786,12 @@ void * createTable(void *sarg)
lastPrintTime = currentPrintTime; lastPrintTime = currentPrintTime;
} }
} }
if (0 != len) {
(void)queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE);
}
free(buffer);
return NULL; return NULL;
} }
...@@ -2422,6 +2447,16 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { ...@@ -2422,6 +2447,16 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
printf("failed to read json, auto_create_table not found"); printf("failed to read json, auto_create_table not found");
goto PARSE_OVER; goto PARSE_OVER;
} }
cJSON* batchCreateTbl = cJSON_GetObjectItem(stbInfo, "batch_create_tbl_num");
if (batchCreateTbl && batchCreateTbl->type == cJSON_Number) {
g_Dbs.db[i].superTbls[j].batchCreateTableNum = batchCreateTbl->valueint;
} else if (!batchCreateTbl) {
g_Dbs.db[i].superTbls[j].batchCreateTableNum = 2000;
} else {
printf("failed to read json, batch_create_tbl_num not found");
goto PARSE_OVER;
}
cJSON *childTblExists = cJSON_GetObjectItem(stbInfo, "child_table_exists"); // yes, no cJSON *childTblExists = cJSON_GetObjectItem(stbInfo, "child_table_exists"); // yes, no
if (childTblExists && childTblExists->type == cJSON_String && childTblExists->valuestring != NULL) { if (childTblExists && childTblExists->type == cJSON_String && childTblExists->valuestring != NULL) {
...@@ -3679,14 +3714,14 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu ...@@ -3679,14 +3714,14 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu
b = ntables % threads; b = ntables % threads;
} }
TAOS* taos; //TAOS* taos;
if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) { //if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) {
taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port); // taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port);
if (NULL == taos) { // if (NULL == taos) {
printf("connect to server fail, reason: %s\n", taos_errstr(NULL)); // printf("connect to server fail, reason: %s\n", taos_errstr(NULL));
exit(-1); // exit(-1);
} // }
} //}
int32_t timePrec = TSDB_TIME_PRECISION_MILLI; int32_t timePrec = TSDB_TIME_PRECISION_MILLI;
if (0 != precision[0]) { if (0 != precision[0]) {
...@@ -3719,7 +3754,12 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu ...@@ -3719,7 +3754,12 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu
t_info->start_time = start_time; t_info->start_time = start_time;
if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) { if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) {
t_info->taos = taos; //t_info->taos = taos;
t_info->taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port);
if (NULL == t_info->taos) {
printf("connect to server fail from insert sub thread, reason: %s\n", taos_errstr(NULL));
exit(-1);
}
} else { } else {
t_info->taos = NULL; t_info->taos = NULL;
#ifdef TD_LOWA_CURL #ifdef TD_LOWA_CURL
...@@ -3754,6 +3794,7 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu ...@@ -3754,6 +3794,7 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu
threadInfo *t_info = infos + i; threadInfo *t_info = infos + i;
tsem_destroy(&(t_info->lock_sem)); tsem_destroy(&(t_info->lock_sem));
taos_close(t_info->taos);
superTblInfo->totalAffectedRows += t_info->totalAffectedRows; superTblInfo->totalAffectedRows += t_info->totalAffectedRows;
superTblInfo->totalRowsInserted += t_info->totalRowsInserted; superTblInfo->totalRowsInserted += t_info->totalRowsInserted;
...@@ -3766,7 +3807,7 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu ...@@ -3766,7 +3807,7 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu
double end = getCurrentTime(); double end = getCurrentTime();
taos_close(taos); //taos_close(taos);
free(pids); free(pids);
free(infos); free(infos);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册