提交 471dc330 编写于 作者: sangshuduo's avatar sangshuduo

[TD-3147] <fix>: support insert interval. refactor.

上级 17dc360f
......@@ -458,9 +458,9 @@ void resetAfterAnsiEscape(void) {
}
#endif
static int createDatabases();
static void createChildTables();
static int queryDbExec(TAOS *taos, char *command, int type);
int createDatabases();
void createChildTables();
int queryDbExec(TAOS *taos, char *command, int type);
/* ************ Global variables ************ */
......@@ -774,7 +774,7 @@ void tmfree(char *buf) {
}
}
static int queryDbExec(TAOS *taos, char *command, int type) {
int queryDbExec(TAOS *taos, char *command, int type) {
int i;
TAOS_RES *res = NULL;
int32_t code = -1;
......@@ -1921,10 +1921,14 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls,
char* dataType = superTbls->columns[colIndex].dataType;
if (strcasecmp(dataType, "BINARY") == 0) {
len += snprintf(cols + len, STRING_LEN - len, ", col%d %s(%d)", colIndex, "BINARY", superTbls->columns[colIndex].dataLen);
len += snprintf(cols + len, STRING_LEN - len,
", col%d %s(%d)", colIndex, "BINARY",
superTbls->columns[colIndex].dataLen);
lenOfOneRow += superTbls->columns[colIndex].dataLen + 3;
} else if (strcasecmp(dataType, "NCHAR") == 0) {
len += snprintf(cols + len, STRING_LEN - len, ", col%d %s(%d)", colIndex, "NCHAR", superTbls->columns[colIndex].dataLen);
len += snprintf(cols + len, STRING_LEN - len,
", col%d %s(%d)", colIndex, "NCHAR",
superTbls->columns[colIndex].dataLen);
lenOfOneRow += superTbls->columns[colIndex].dataLen + 3;
} else if (strcasecmp(dataType, "INT") == 0) {
len += snprintf(cols + len, STRING_LEN - len, ", col%d %s", colIndex, "INT");
......@@ -2028,65 +2032,77 @@ static int createSuperTable(TAOS * taos, char* dbName, SSuperTable* superTbls,
}
static int createDatabases() {
int createDatabases() {
TAOS * taos = NULL;
int ret = 0;
taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, NULL, g_Dbs.port);
if (taos == NULL) {
fprintf(stderr, "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL));
exit(-1);
return -1;
}
char command[BUFFER_SIZE] = "\0";
for (int i = 0; i < g_Dbs.dbCount; i++) {
if (g_Dbs.db[i].drop) {
sprintf(command, "drop database if exists %s;", g_Dbs.db[i].dbName);
debugPrint("DEBUG %s() %d \n", __func__, __LINE__);
debugPrint("DEBUG %s() %d \n", __func__, __LINE__);
if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) {
taos_close(taos);
return -1;
}
}
int dataLen = 0;
dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen, "create database if not exists %s ", g_Dbs.db[i].dbName);
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, "create database if not exists %s ", g_Dbs.db[i].dbName);
if (g_Dbs.db[i].dbCfg.blocks > 0) {
dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen, "blocks %d ", g_Dbs.db[i].dbCfg.blocks);
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, "blocks %d ", g_Dbs.db[i].dbCfg.blocks);
}
if (g_Dbs.db[i].dbCfg.cache > 0) {
dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen, "cache %d ", g_Dbs.db[i].dbCfg.cache);
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, "cache %d ", g_Dbs.db[i].dbCfg.cache);
}
if (g_Dbs.db[i].dbCfg.days > 0) {
dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen, "days %d ", g_Dbs.db[i].dbCfg.days);
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, "days %d ", g_Dbs.db[i].dbCfg.days);
}
if (g_Dbs.db[i].dbCfg.keep > 0) {
dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen, "keep %d ", g_Dbs.db[i].dbCfg.keep);
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, "keep %d ", g_Dbs.db[i].dbCfg.keep);
}
if (g_Dbs.db[i].dbCfg.replica > 0) {
dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen, "replica %d ", g_Dbs.db[i].dbCfg.replica);
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, "replica %d ", g_Dbs.db[i].dbCfg.replica);
}
if (g_Dbs.db[i].dbCfg.update > 0) {
dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen, "update %d ", g_Dbs.db[i].dbCfg.update);
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, "update %d ", g_Dbs.db[i].dbCfg.update);
}
//if (g_Dbs.db[i].dbCfg.maxtablesPerVnode > 0) {
// dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen, "tables %d ", g_Dbs.db[i].dbCfg.maxtablesPerVnode);
// dataLen += snprintf(command + dataLen,
// BUFFER_SIZE - dataLen, "tables %d ", g_Dbs.db[i].dbCfg.maxtablesPerVnode);
//}
if (g_Dbs.db[i].dbCfg.minRows > 0) {
dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen, "minrows %d ", g_Dbs.db[i].dbCfg.minRows);
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, "minrows %d ", g_Dbs.db[i].dbCfg.minRows);
}
if (g_Dbs.db[i].dbCfg.maxRows > 0) {
dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen, "maxrows %d ", g_Dbs.db[i].dbCfg.maxRows);
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, "maxrows %d ", g_Dbs.db[i].dbCfg.maxRows);
}
if (g_Dbs.db[i].dbCfg.comp > 0) {
dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen, "comp %d ", g_Dbs.db[i].dbCfg.comp);
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, "comp %d ", g_Dbs.db[i].dbCfg.comp);
}
if (g_Dbs.db[i].dbCfg.walLevel > 0) {
dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen, "wal %d ", g_Dbs.db[i].dbCfg.walLevel);
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, "wal %d ", g_Dbs.db[i].dbCfg.walLevel);
}
if (g_Dbs.db[i].dbCfg.cacheLast > 0) {
dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen, "cachelast %d ", g_Dbs.db[i].dbCfg.cacheLast);
dataLen += snprintf(command + dataLen,
BUFFER_SIZE - dataLen, "cachelast %d ", g_Dbs.db[i].dbCfg.cacheLast);
}
if (g_Dbs.db[i].dbCfg.fsync > 0) {
dataLen += snprintf(command + dataLen, BUFFER_SIZE - dataLen, "fsync %d ", g_Dbs.db[i].dbCfg.fsync);
......@@ -2100,6 +2116,7 @@ static int createDatabases() {
debugPrint("DEBUG %s() %d \n", __func__, __LINE__);
if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) {
taos_close(taos);
printf("\ncreate database %s failed!\n\n", g_Dbs.db[i].dbName);
return -1;
}
printf("\ncreate database %s success!\n\n", g_Dbs.db[i].dbName);
......@@ -2108,7 +2125,7 @@ static int createDatabases() {
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
// describe super table, if exists
sprintf(command, "describe %s.%s;", g_Dbs.db[i].dbName, g_Dbs.db[i].superTbls[j].sTblName);
debugPrint("DEBUG %s() %d \n", __func__, __LINE__);
debugPrint("DEBUG %s() %d \n", __func__, __LINE__);
if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) {
g_Dbs.db[i].superTbls[j].superTblExists = TBL_NO_EXISTS;
ret = createSuperTable(taos, g_Dbs.db[i].dbName, &g_Dbs.db[i].superTbls[j], g_Dbs.use_metric);
......@@ -2118,8 +2135,11 @@ static int createDatabases() {
}
if (0 != ret) {
printf("\ncreate super table %d failed!\n\n", j);
taos_close(taos);
return -1;
} else {
printf("\ncreate super table %d success!\n\n", j);
}
}
}
......@@ -2135,14 +2155,24 @@ void * createTable(void *sarg)
int64_t lastPrintTime = taosGetTimestampMs();
char* buffer = calloc(superTblInfo->maxSqlLen, 1);
int buff_len;
if (superTblInfo)
buff_len = superTblInfo->maxSqlLen;
else
buff_len = BUFFER_SIZE;
char *buffer = calloc(superTblInfo->maxSqlLen, 1);
if (buffer == NULL) {
fprintf(stderr, "Memory allocated failed!");
exit(-1);
}
int len = 0;
int batchNum = 0;
//printf("Creating table from %d to %d\n", winfo->start_table_id, winfo->end_table_id);
for (int i = winfo->start_table_id; i <= winfo->end_table_id; i++) {
if (0 == g_Dbs.use_metric) {
snprintf(buffer, BUFFER_SIZE,
snprintf(buffer, buff_len,
"create table if not exists %s.%s%d %s;",
winfo->db_name,
superTblInfo->childTblPrefix, i,
......@@ -2150,11 +2180,11 @@ void * createTable(void *sarg)
} else {
if (0 == len) {
batchNum = 0;
memset(buffer, 0, superTblInfo->maxSqlLen);
memset(buffer, 0, buff_len);
len += snprintf(buffer + len,
superTblInfo->maxSqlLen - len, "create table ");
buff_len - len, "create table ");
}
char* tagsValBuf = NULL;
if (0 == superTblInfo->tagSource) {
tagsValBuf = generateTagVaulesForStb(superTblInfo);
......@@ -2208,7 +2238,7 @@ void * createTable(void *sarg)
return NULL;
}
void startMultiThreadCreateChildTable(
int startMultiThreadCreateChildTable(
char* cols, int threads, int ntables,
char* db_name, SSuperTable* superTblInfo) {
pthread_t *pids = malloc(threads * sizeof(pthread_t));
......@@ -2231,7 +2261,7 @@ void startMultiThreadCreateChildTable(
int b = 0;
b = ntables % threads;
int last = 0;
for (int i = 0; i < threads; i++) {
threadInfo *t_info = infos + i;
......@@ -2244,6 +2274,10 @@ void startMultiThreadCreateChildTable(
g_Dbs.password,
db_name,
g_Dbs.port);
if (t_info->taos == NULL) {
fprintf(stderr, "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL));
return -1;
}
t_info->start_table_id = last;
t_info->end_table_id = i < b ? last + a : last + a - 1;
last = t_info->end_table_id + 1;
......@@ -2264,23 +2298,35 @@ void startMultiThreadCreateChildTable(
free(pids);
free(infos);
return 0;
}
static void createChildTables() {
void createChildTables() {
for (int i = 0; i < g_Dbs.dbCount; i++) {
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
if ((AUTO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable)
if (g_Dbs.db[i].superTblCount > 0) {
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
if ((AUTO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable)
|| (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) {
continue;
}
startMultiThreadCreateChildTable(
continue;
}
startMultiThreadCreateChildTable(
g_Dbs.db[i].superTbls[j].colsOfCreatChildTable,
g_Dbs.threadCountByCreateTbl,
g_Dbs.db[i].superTbls[j].childTblCount,
g_Dbs.db[i].dbName, &(g_Dbs.db[i].superTbls[j]));
g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount;
}
g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount;
}
} else {
startMultiThreadCreateChildTable(
g_Dbs.db[i].superTbls[j].colsOfCreatChildTable,
g_Dbs.threadCountByCreateTbl,
g_args.num_of_DPT,
g_Dbs.db[i].dbName,
NULL);
}
}
}
......@@ -4524,14 +4570,6 @@ void *readMetric(void *sarg) {
int insertTestProcess() {
debugPrint("DEBUG - %d result file: %s\n", __LINE__, g_Dbs.resultFile);
g_fpOfInsertResult = fopen(g_Dbs.resultFile, "a");
if (NULL == g_fpOfInsertResult) {
fprintf(stderr, "Failed to open %s for save result\n", g_Dbs.resultFile);
return 1;
};
setupForAnsiEscape();
int ret = printfInsertMeta();
resetAfterAnsiEscape();
......@@ -4539,7 +4577,14 @@ int insertTestProcess() {
if (ret == -1)
exit(EXIT_FAILURE);
printfInsertMetaToFile(g_fpOfInsertResult);
debugPrint("DEBUG - %d result file: %s\n", __LINE__, g_Dbs.resultFile);
g_fpOfInsertResult = fopen(g_Dbs.resultFile, "a");
if (NULL == g_fpOfInsertResult) {
fprintf(stderr, "Failed to open %s for save result\n", g_Dbs.resultFile);
return -1;
} {
printfInsertMetaToFile(g_fpOfInsertResult);
}
if (!g_args.answer_yes) {
printf("Press enter key to continue\n\n");
......@@ -4549,7 +4594,10 @@ int insertTestProcess() {
init_rand_data();
// create database and super tables
(void)createDatabases();
if( createDatabases() != 0) {
fclose(g_fpOfInsertResult);
return -1;
}
// pretreatement
prePareSampleData();
......@@ -4561,6 +4609,7 @@ int insertTestProcess() {
start = getCurrentTime();
createChildTables();
end = getCurrentTime();
if (g_totalChildTables > 0) {
printf("Spent %.4f seconds to create %d tables with %d thread(s)\n\n",
end - start, g_totalChildTables, g_Dbs.threadCount);
......@@ -4570,7 +4619,6 @@ int insertTestProcess() {
}
taosMsleep(1000);
// create sub threads for inserting data
//start = getCurrentTime();
for (int i = 0; i < g_Dbs.dbCount; i++) {
......@@ -4604,36 +4652,8 @@ int insertTestProcess() {
// totalAffectedRows += g_Dbs.db[i].superTbls[j].totalAffectedRows;
//}
//printf("Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s)\n\n", end - start, totalRowsInserted, totalAffectedRows, g_Dbs.threadCount);
if (NULL == g_args.metaFile && false == g_Dbs.insert_only) {
// query data
pthread_t read_id;
threadInfo *rInfo = malloc(sizeof(threadInfo));
rInfo->start_time = 1500000000000; // 2017-07-14 10:40:00.000
rInfo->start_table_id = 0;
rInfo->end_table_id = g_Dbs.db[0].superTbls[0].childTblCount - 1;
//rInfo->do_aggreFunc = g_Dbs.do_aggreFunc;
//rInfo->nrecords_per_table = g_Dbs.db[0].superTbls[0].insertRows;
rInfo->superTblInfo = &g_Dbs.db[0].superTbls[0];
rInfo->taos = taos_connect(
g_Dbs.host,
g_Dbs.user,
g_Dbs.password,
g_Dbs.db[0].dbName,
g_Dbs.port);
strcpy(rInfo->tb_prefix, g_Dbs.db[0].superTbls[0].childTblPrefix);
strcpy(rInfo->fp, g_Dbs.resultFile);
if (!g_Dbs.use_metric) {
pthread_create(&read_id, NULL, readTable, rInfo);
} else {
pthread_create(&read_id, NULL, readMetric, rInfo);
}
pthread_join(read_id, NULL);
taos_close(rInfo->taos);
}
postFreeResource();
return 0;
}
......@@ -5259,6 +5279,7 @@ void setParaFromArg(){
g_Dbs.db[0].superTbls[0].tags[1].dataLen = g_args.len_of_binary;
g_Dbs.db[0].superTbls[0].tagCount = 2;
} else {
g_Dbs.threadCountByCreateTbl = 1;
g_Dbs.db[0].superTbls[0].tagCount = 0;
}
......@@ -5306,7 +5327,7 @@ void querySqlFile(TAOS* taos, char* sqlFile)
printf("failed to open file %s, reason:%s\n", sqlFile, strerror(errno));
return;
}
int read_len = 0;
char * cmd = calloc(1, MAX_SQL_SIZE);
size_t cmd_len = 0;
......@@ -5314,7 +5335,7 @@ void querySqlFile(TAOS* taos, char* sqlFile)
size_t line_len = 0;
double t = getCurrentTime();
while ((read_len = tgetline(&line, &line_len, fp)) != -1) {
if (read_len >= MAX_SQL_SIZE) continue;
line[--read_len] = '\0';
......@@ -5346,23 +5367,12 @@ void querySqlFile(TAOS* taos, char* sqlFile)
return;
}
int main(int argc, char *argv[]) {
parse_args(argc, argv, &g_args);
debugPrint("DEBUG - meta file: %s\n", g_args.metaFile);
if (g_args.metaFile) {
initOfInsertMeta();
initOfQueryMeta();
if (false == getInfoFromJsonFile(g_args.metaFile)) {
printf("Failed to read %s\n", g_args.metaFile);
return 1;
}
void testMetaFile() {
if (INSERT_MODE == g_args.test_mode) {
if (g_Dbs.cfgDir[0]) taos_options(TSDB_OPTION_CONFIGDIR, g_Dbs.cfgDir);
(void)insertTestProcess();
insertTestProcess();
} else if (QUERY_MODE == g_args.test_mode) {
if (g_queryInfo.cfgDir[0])
taos_options(TSDB_OPTION_CONFIGDIR, g_queryInfo.cfgDir);
......@@ -5374,46 +5384,48 @@ int main(int argc, char *argv[]) {
} else {
;
}
} else {
memset(&g_Dbs, 0, sizeof(SDbs));
g_args.test_mode = INSERT_MODE;
setParaFromArg();
}
if (NULL != g_args.sqlFile) {
TAOS* qtaos = taos_connect(
g_Dbs.host,
g_Dbs.user,
g_Dbs.password,
g_Dbs.db[0].dbName,
g_Dbs.port);
querySqlFile(qtaos, g_args.sqlFile);
taos_close(qtaos);
return 0;
}
void testCmdLine() {
(void)insertTestProcess();
if (g_Dbs.insert_only) return 0;
g_args.test_mode = INSERT_MODE;
insertTestProcess();
if (g_Dbs.insert_only)
return;
// select
if (false == g_Dbs.insert_only) {
// query data
pthread_t read_id;
threadInfo *rInfo = malloc(sizeof(threadInfo));
rInfo->start_time = 1500000000000; // 2017-07-14 10:40:00.000
rInfo->start_table_id = 0;
rInfo->end_table_id = g_Dbs.db[0].superTbls[0].childTblCount - 1;
//rInfo->do_aggreFunc = g_Dbs.do_aggreFunc;
//rInfo->nrecords_per_table = g_Dbs.db[0].superTbls[0].insertRows;
rInfo->superTblInfo = &g_Dbs.db[0].superTbls[0];
if (g_args.use_metric) {
rInfo->end_table_id = g_Dbs.db[0].superTbls[0].childTblCount - 1;
rInfo->superTblInfo = &g_Dbs.db[0].superTbls[0];
strcpy(rInfo->tb_prefix,
g_Dbs.db[0].superTbls[0].childTblPrefix);
} else {
rInfo->end_table_id = g_args.num_of_tables -1;
strcpy(rInfo->tb_prefix, g_args.tb_prefix);
}
rInfo->taos = taos_connect(
g_Dbs.host,
g_Dbs.user,
g_Dbs.password,
g_Dbs.db[0].dbName,
g_Dbs.port);
strcpy(rInfo->tb_prefix,
g_Dbs.db[0].superTbls[0].childTblPrefix);
if (rInfo->taos == NULL) {
fprintf(stderr, "Failed to connect to TDengine, reason:%s\n", taos_errstr(NULL));
free(rInfo);
exit(-1);
}
strcpy(rInfo->fp, g_Dbs.resultFile);
if (!g_Dbs.use_metric) {
......@@ -5425,9 +5437,42 @@ int main(int argc, char *argv[]) {
taos_close(rInfo->taos);
free(rInfo);
}
}
int main(int argc, char *argv[]) {
parse_args(argc, argv, &g_args);
debugPrint("DEBUG - meta file: %s\n", g_args.metaFile);
if (g_args.metaFile) {
initOfInsertMeta();
initOfQueryMeta();
if (false == getInfoFromJsonFile(g_args.metaFile)) {
printf("Failed to read %s\n", g_args.metaFile);
return 1;
}
testMetaFile();
} else {
memset(&g_Dbs, 0, sizeof(SDbs));
setParaFromArg();
if (NULL != g_args.sqlFile) {
TAOS* qtaos = taos_connect(
g_Dbs.host,
g_Dbs.user,
g_Dbs.password,
g_Dbs.db[0].dbName,
g_Dbs.port);
querySqlFile(qtaos, g_args.sqlFile);
taos_close(qtaos);
} else {
testCmdLine();
}
}
taos_cleanup();
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册