提交 b8bc8759 编写于 作者: sangshuduo's avatar sangshuduo

[TD-3147] <fix>: support insert interval.

上级 9dbde396
......@@ -227,7 +227,7 @@ typedef struct SSuperTable_S {
int disorderRange; // ms or us by database precision
int maxSqlLen; //
int64_t insertRows; // 0: no limit
// int64_t insertRows; // 0: no limit
int timeStampStep;
char startTimestamp[MAX_TB_NAME_SIZE]; //
char sampleFormat[MAX_TB_NAME_SIZE]; // csv, json
......@@ -748,7 +748,7 @@ void parse_args(int argc, char *argv[], SArguments *arguments) {
printf("\n");
}
printf("# Insertion interval: %d\n", arguments->insert_interval);
printf("# Number of Columns per record: %d\n", arguments->num_of_RPR);
printf("# Number of records per req: %d\n", arguments->num_of_RPR);
printf("# Number of Threads: %d\n", arguments->num_of_threads);
printf("# Number of Tables: %d\n", arguments->num_of_tables);
printf("# Number of Data per Table: %d\n", arguments->num_of_DPT);
......@@ -1078,7 +1078,7 @@ static int printfInsertMeta() {
printf(" childTblPrefix: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].childTblPrefix);
printf(" dataSource: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].dataSource);
printf(" insertMode: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].insertMode);
printf(" insertRows: \033[33m%"PRId64"\033[0m\n", g_Dbs.db[i].superTbls[j].insertRows);
// printf(" insertRows: \033[33m%"PRId64"\033[0m\n", g_Dbs.db[i].superTbls[j].insertRows);
if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) {
printf(" multiThreadWriteOneTbl: \033[33mno\033[0m\n");
......@@ -1225,7 +1225,7 @@ static void printfInsertMetaToFile(FILE* fp) {
fprintf(fp, " childTblPrefix: %s\n", g_Dbs.db[i].superTbls[j].childTblPrefix);
fprintf(fp, " dataSource: %s\n", g_Dbs.db[i].superTbls[j].dataSource);
fprintf(fp, " insertMode: %s\n", g_Dbs.db[i].superTbls[j].insertMode);
fprintf(fp, " insertRows: %"PRId64"\n", g_Dbs.db[i].superTbls[j].insertRows);
// fprintf(fp, " insertRows: %"PRId64"\n", g_Dbs.db[i].superTbls[j].insertRows);
if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) {
fprintf(fp, " multiThreadWriteOneTbl: no\n");
......@@ -3223,8 +3223,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
printf("failed to read json, disorderRange not found");
goto PARSE_OVER;
}
/*
cJSON* insertRows = cJSON_GetObjectItem(stbInfo, "insert_rows");
if (insertRows && insertRows->type == cJSON_Number) {
g_Dbs.db[i].superTbls[j].insertRows = insertRows->valueint;
......@@ -3237,7 +3236,7 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
printf("failed to read json, insert_rows not found");
goto PARSE_OVER;
}
*/
if (NO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable
|| (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) {
continue;
......@@ -3781,7 +3780,8 @@ static void syncWriteForNumberOfTblInOneSql(
int64_t st = 0;
int64_t et = 0;
for (int i = 0; i < superTblInfo->insertRows;) {
// for (int i = 0; i < superTblInfo->insertRows;) {
for (int i = 0; i < g_args.num_of_RPR;) {
int32_t tbl_id = 0;
for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; ) {
int inserted = i;
......@@ -3904,7 +3904,8 @@ static void syncWriteForNumberOfTblInOneSql(
k++;
totalRowsInserted++;
if (inserted >= superTblInfo->insertRows ||
// if (inserted >= superTblInfo->insertRows ||
if (inserted >= g_args.num_of_RPR ||
(superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) {
tID = tbl_id + 1;
printf("config rowsPerTbl and numberOfTblInOneSql not match with max_sql_lenth, please reconfig![lenOfOneRow:%d]\n",
......@@ -4235,13 +4236,16 @@ static void* syncWriteWithStb(void *sarg) {
int64_t time_counter = winfo->start_time;
uint64_t st = 0;
uint64_t et = 0;
debugPrint("DEBUG - %s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, superTblInfo->insertRows);
/*
debugPrint("DEBUG - %s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__,
superTblInfo->insertRows);
for (int i = 0; i < superTblInfo->insertRows;) {
*/
for (int i = 0; i < g_args.num_of_RPR;) {
for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) {
uint64_t inserted = i;
for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id;
tID++) {
int64_t inserted = i;
uint64_t tmp_time = time_counter;
if (i > 0 && g_args.insert_interval
......@@ -4341,8 +4345,10 @@ static void* syncWriteWithStb(void *sarg) {
k++;
i++;
totalRowsInserted++;
debugPrint("DEBUG - %s() LN%d totalInserted=%"PRId64" inserted=%"PRId64"\n", __func__, __LINE__, totalRowsInserted, inserted);
if (inserted > superTblInfo->insertRows)
// if (inserted > superTblInfo->insertRows)
if (inserted > g_args.num_of_RPR)
break;
/* if (inserted >= superTblInfo->insertRows
|| (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128))
......@@ -4455,7 +4461,8 @@ void callBack(void *param, TAOS_RES *res, int code) {
char *data = calloc(1, MAX_DATA_SIZE);
char *pstr = buffer;
pstr += sprintf(pstr, "insert into %s.%s%d values", winfo->db_name, winfo->tb_prefix, winfo->start_table_id);
if (winfo->counter >= winfo->superTblInfo->insertRows) {
// if (winfo->counter >= winfo->superTblInfo->insertRows) {
if (winfo->counter >= g_args.num_of_RPR) {
winfo->start_table_id++;
winfo->counter = 0;
}
......@@ -4481,7 +4488,8 @@ void callBack(void *param, TAOS_RES *res, int code) {
pstr += sprintf(pstr, "%s", data);
winfo->counter++;
if (winfo->counter >= winfo->superTblInfo->insertRows) {
// if (winfo->counter >= winfo->superTblInfo->insertRows) {
if (winfo->counter >= g_args.num_of_RPR) {
break;
}
}
......@@ -4700,11 +4708,12 @@ void *readTable(void *sarg) {
}
int num_of_DPT;
if (rinfo->superTblInfo) {
/* if (rinfo->superTblInfo) {
num_of_DPT = rinfo->superTblInfo->insertRows; // nrecords_per_table;
} else {
*/
num_of_DPT = g_args.num_of_DPT;
}
// }
int num_of_tables = rinfo->end_table_id - rinfo->start_table_id + 1;
int totalData = num_of_DPT * num_of_tables;
......@@ -4767,7 +4776,8 @@ void *readMetric(void *sarg) {
return NULL;
}
int num_of_DPT = rinfo->superTblInfo->insertRows;
// int num_of_DPT = rinfo->superTblInfo->insertRows;
int num_of_DPT = g_args.num_of_DPT;
int num_of_tables = rinfo->end_table_id - rinfo->start_table_id + 1;
int totalData = num_of_DPT * num_of_tables;
bool do_aggreFunc = g_Dbs.do_aggreFunc;
......@@ -4886,7 +4896,8 @@ int insertTestProcess() {
if (g_Dbs.db[i].superTblCount > 0) {
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
SSuperTable* superTblInfo = &g_Dbs.db[i].superTbls[j];
if (0 == g_Dbs.db[i].superTbls[j].insertRows) {
// if (0 == g_Dbs.db[i].superTbls[j].insertRows) {
if (0 == g_args.num_of_DPT) {
continue;
}
startMultiThreadInsertData(
......@@ -5511,7 +5522,7 @@ void setParaFromArg(){
"2017-07-14 10:40:00.000", MAX_TB_NAME_SIZE);
g_Dbs.db[0].superTbls[0].timeStampStep = 10;
g_Dbs.db[0].superTbls[0].insertRows = g_args.num_of_DPT;
// g_Dbs.db[0].superTbls[0].insertRows = g_args.num_of_DPT;
g_Dbs.db[0].superTbls[0].maxSqlLen = TSDB_PAYLOAD_SIZE;
g_Dbs.db[0].superTbls[0].columnCount = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册