提交 e891ecc8 编写于 作者: S Shuduo Sang

[TD-3147] <fix>: change insert rate to insert interval.

上级 069b9888
...@@ -266,7 +266,7 @@ typedef struct SSuperTable_S { ...@@ -266,7 +266,7 @@ typedef struct SSuperTable_S {
char childTblPrefix[MAX_TB_NAME_SIZE]; char childTblPrefix[MAX_TB_NAME_SIZE];
char dataSource[MAX_TB_NAME_SIZE]; // rand_gen or sample char dataSource[MAX_TB_NAME_SIZE]; // rand_gen or sample
char insertMode[MAX_TB_NAME_SIZE]; // taosc, restful char insertMode[MAX_TB_NAME_SIZE]; // taosc, restful
int insertRate; // 0: unlimit > 0 rows/s int insertInterval; // interval time between insert twice
int multiThreadWriteOneTbl; // 0: no, 1: yes int multiThreadWriteOneTbl; // 0: no, 1: yes
int numberOfTblInOneSql; // 0/1: one table, > 1: number of tbl int numberOfTblInOneSql; // 0/1: one table, > 1: number of tbl
...@@ -1015,7 +1015,7 @@ static int printfInsertMeta() { ...@@ -1015,7 +1015,7 @@ static int printfInsertMeta() {
printf(" childTblPrefix: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].childTblPrefix); printf(" childTblPrefix: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].childTblPrefix);
printf(" dataSource: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].dataSource); printf(" dataSource: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].dataSource);
printf(" insertMode: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].insertMode); printf(" insertMode: \033[33m%s\033[0m\n", g_Dbs.db[i].superTbls[j].insertMode);
printf(" insertRate: \033[33m%d\033[0m\n", g_Dbs.db[i].superTbls[j].insertRate); printf(" insertInterval: \033[33m%d\033[0m\n", g_Dbs.db[i].superTbls[j].insertInterval);
printf(" insertRows: \033[33m%"PRId64"\033[0m\n", g_Dbs.db[i].superTbls[j].insertRows); printf(" insertRows: \033[33m%"PRId64"\033[0m\n", g_Dbs.db[i].superTbls[j].insertRows);
if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) { if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) {
...@@ -1153,7 +1153,7 @@ static void printfInsertMetaToFile(FILE* fp) { ...@@ -1153,7 +1153,7 @@ static void printfInsertMetaToFile(FILE* fp) {
fprintf(fp, " childTblPrefix: %s\n", g_Dbs.db[i].superTbls[j].childTblPrefix); fprintf(fp, " childTblPrefix: %s\n", g_Dbs.db[i].superTbls[j].childTblPrefix);
fprintf(fp, " dataSource: %s\n", g_Dbs.db[i].superTbls[j].dataSource); fprintf(fp, " dataSource: %s\n", g_Dbs.db[i].superTbls[j].dataSource);
fprintf(fp, " insertMode: %s\n", g_Dbs.db[i].superTbls[j].insertMode); fprintf(fp, " insertMode: %s\n", g_Dbs.db[i].superTbls[j].insertMode);
fprintf(fp, " insertRate: %d\n", g_Dbs.db[i].superTbls[j].insertRate); fprintf(fp, " insertInterval: %d\n", g_Dbs.db[i].superTbls[j].insertInterval);
fprintf(fp, " insertRows: %"PRId64"\n", g_Dbs.db[i].superTbls[j].insertRows); fprintf(fp, " insertRows: %"PRId64"\n", g_Dbs.db[i].superTbls[j].insertRows);
if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) { if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) {
...@@ -2997,11 +2997,11 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { ...@@ -2997,11 +2997,11 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
goto PARSE_OVER; goto PARSE_OVER;
} }
cJSON* insertRate = cJSON_GetObjectItem(stbInfo, "insert_rate"); cJSON* insertInterval = cJSON_GetObjectItem(stbInfo, "insert_interval");
if (insertRate && insertRate->type == cJSON_Number) { if (insertInterval && insertInterval->type == cJSON_Number) {
g_Dbs.db[i].superTbls[j].insertRate = insertRate->valueint; g_Dbs.db[i].superTbls[j].insertInterval = insertInterval->valueint;
} else if (!insertRate) { } else if (!insertInterval) {
g_Dbs.db[i].superTbls[j].insertRate = 0; g_Dbs.db[i].superTbls[j].insertInterval = 0;
} else { } else {
printf("failed to read json, insert_rate not found"); printf("failed to read json, insert_rate not found");
goto PARSE_OVER; goto PARSE_OVER;
...@@ -3558,12 +3558,11 @@ void syncWriteForNumberOfTblInOneSql(threadInfo *winfo, FILE *fp, char* sampleDa ...@@ -3558,12 +3558,11 @@ void syncWriteForNumberOfTblInOneSql(threadInfo *winfo, FILE *fp, char* sampleDa
int64_t st = 0; int64_t st = 0;
int64_t et = 0; int64_t et = 0;
for (int i = 0; i < superTblInfo->insertRows;) { for (int i = 0; i < superTblInfo->insertRows;) {
if (superTblInfo->insertRate && (et - st) < 1000) { if (superTblInfo->insertInterval && (superTblInfo->insertInterval > (et - st))) {
taosMsleep(1000 - (et - st)); // ms taosMsleep(superTblInfo->insertInterval - (et - st)); // ms
//printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_id, winfo->end_table_id);
} }
if (superTblInfo->insertRate) { if (superTblInfo->insertInterval) {
st = taosGetTimestampMs(); st = taosGetTimestampMs();
} }
...@@ -3709,7 +3708,7 @@ void syncWriteForNumberOfTblInOneSql(threadInfo *winfo, FILE *fp, char* sampleDa ...@@ -3709,7 +3708,7 @@ void syncWriteForNumberOfTblInOneSql(threadInfo *winfo, FILE *fp, char* sampleDa
} }
} }
if (superTblInfo->insertRate) { if (superTblInfo->insertInterval) {
et = taosGetTimestampMs(); et = taosGetTimestampMs();
} }
//printf("========loop %d childTables duration:%"PRId64 "========inserted rows:%d\n", winfo->end_table_id - winfo->start_table_id, et - st, i); //printf("========loop %d childTables duration:%"PRId64 "========inserted rows:%d\n", winfo->end_table_id - winfo->start_table_id, et - st, i);
...@@ -3817,12 +3816,11 @@ void *syncWrite(void *sarg) { ...@@ -3817,12 +3816,11 @@ void *syncWrite(void *sarg) {
int64_t st = 0; int64_t st = 0;
int64_t et = 0; int64_t et = 0;
for (int i = 0; i < superTblInfo->insertRows;) { for (int i = 0; i < superTblInfo->insertRows;) {
if (superTblInfo->insertRate && (et - st) < 1000) { if (superTblInfo->insertInterval && (superTblInfo->insertInterval > (et - st) )) {
taosMsleep(1000 - (et - st)); // ms taosMsleep(superTblInfo->insertInterval - (et - st)); // ms
//printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_id, winfo->end_table_id);
} }
if (superTblInfo->insertRate) { if (superTblInfo->insertInterval) {
st = taosGetTimestampMs(); st = taosGetTimestampMs();
} }
...@@ -3950,7 +3948,7 @@ void *syncWrite(void *sarg) { ...@@ -3950,7 +3948,7 @@ void *syncWrite(void *sarg) {
} }
} }
if (superTblInfo->insertRate) { if (superTblInfo->insertInterval) {
et = taosGetTimestampMs(); et = taosGetTimestampMs();
} }
//printf("========loop %d childTables duration:%"PRId64 "========inserted rows:%d\n", winfo->end_table_id - winfo->start_table_id, et - st, i); //printf("========loop %d childTables duration:%"PRId64 "========inserted rows:%d\n", winfo->end_table_id - winfo->start_table_id, et - st, i);
...@@ -3971,7 +3969,7 @@ void *syncWrite(void *sarg) { ...@@ -3971,7 +3969,7 @@ void *syncWrite(void *sarg) {
void callBack(void *param, TAOS_RES *res, int code) { void callBack(void *param, TAOS_RES *res, int code) {
threadInfo* winfo = (threadInfo*)param; threadInfo* winfo = (threadInfo*)param;
if (winfo->superTblInfo->insertRate) { if (winfo->superTblInfo->insertInterval) {
winfo->et = taosGetTimestampMs(); winfo->et = taosGetTimestampMs();
if (winfo->et - winfo->st < 1000) { if (winfo->et - winfo->st < 1000) {
taosMsleep(1000 - (winfo->et - winfo->st)); // ms taosMsleep(1000 - (winfo->et - winfo->st)); // ms
...@@ -4013,7 +4011,7 @@ void callBack(void *param, TAOS_RES *res, int code) { ...@@ -4013,7 +4011,7 @@ void callBack(void *param, TAOS_RES *res, int code) {
} }
} }
if (winfo->superTblInfo->insertRate) { if (winfo->superTblInfo->insertInterval) {
winfo->st = taosGetTimestampMs(); winfo->st = taosGetTimestampMs();
} }
taos_query_a(winfo->taos, buffer, callBack, winfo); taos_query_a(winfo->taos, buffer, callBack, winfo);
...@@ -4033,9 +4031,9 @@ void *asyncWrite(void *sarg) { ...@@ -4033,9 +4031,9 @@ void *asyncWrite(void *sarg) {
// winfo->nrecords_per_request = (winfo->superTblInfo->maxSqlLen - 1280) / winfo->superTblInfo->lenOfOneRow; // winfo->nrecords_per_request = (winfo->superTblInfo->maxSqlLen - 1280) / winfo->superTblInfo->lenOfOneRow;
//} //}
if (0 != winfo->superTblInfo->insertRate) { if (0 != winfo->superTblInfo->insertInterval) {
if (winfo->nrecords_per_request >= winfo->superTblInfo->insertRate) { if (winfo->nrecords_per_request >= winfo->superTblInfo->insertInterval) {
winfo->nrecords_per_request = winfo->superTblInfo->insertRate; winfo->nrecords_per_request = winfo->superTblInfo->insertInterval;
} }
} }
...@@ -4055,7 +4053,7 @@ void *asyncWrite(void *sarg) { ...@@ -4055,7 +4053,7 @@ void *asyncWrite(void *sarg) {
winfo->et = 0; winfo->et = 0;
winfo->lastTs = winfo->start_time; winfo->lastTs = winfo->start_time;
if (winfo->superTblInfo->insertRate) { if (winfo->superTblInfo->insertInterval) {
winfo->st = taosGetTimestampMs(); winfo->st = taosGetTimestampMs();
} }
taos_query_a(winfo->taos, "show databases", callBack, winfo); taos_query_a(winfo->taos, "show databases", callBack, winfo);
...@@ -4951,7 +4949,7 @@ void setParaFromArg(){ ...@@ -4951,7 +4949,7 @@ void setParaFromArg(){
g_Dbs.db[0].superTbls[0].autoCreateTable = PRE_CREATE_SUBTBL; g_Dbs.db[0].superTbls[0].autoCreateTable = PRE_CREATE_SUBTBL;
g_Dbs.db[0].superTbls[0].superTblExists = TBL_NO_EXISTS; g_Dbs.db[0].superTbls[0].superTblExists = TBL_NO_EXISTS;
g_Dbs.db[0].superTbls[0].childTblExists = TBL_NO_EXISTS; g_Dbs.db[0].superTbls[0].childTblExists = TBL_NO_EXISTS;
g_Dbs.db[0].superTbls[0].insertRate = 0; g_Dbs.db[0].superTbls[0].insertInterval = 0;
g_Dbs.db[0].superTbls[0].disorderRange = g_args.disorderRange; g_Dbs.db[0].superTbls[0].disorderRange = g_args.disorderRange;
g_Dbs.db[0].superTbls[0].disorderRatio = g_args.disorderRatio; g_Dbs.db[0].superTbls[0].disorderRatio = g_args.disorderRatio;
tstrncpy(g_Dbs.db[0].superTbls[0].childTblPrefix, g_args.tb_prefix, MAX_TB_NAME_SIZE); tstrncpy(g_Dbs.db[0].superTbls[0].childTblPrefix, g_args.tb_prefix, MAX_TB_NAME_SIZE);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册