提交 867f443c 编写于 作者: S Shuduo Sang

[TD-3192] <feature>: support stb limit and offset. fix sleep time.

...@@ -232,7 +232,8 @@ typedef struct SSuperTable_S { ...@@ -232,7 +232,8 @@ typedef struct SSuperTable_S {
int disorderRatio; // 0: no disorder, >0: x% int disorderRatio; // 0: no disorder, >0: x%
int disorderRange; // ms or us by database precision int disorderRange; // ms or us by database precision
int maxSqlLen; // int maxSqlLen; //
int insertInterval; // insert interval, will override global insert interval
int64_t insertRows; // 0: no limit int64_t insertRows; // 0: no limit
int timeStampStep; int timeStampStep;
char startTimestamp[MAX_TB_NAME_SIZE]; // char startTimestamp[MAX_TB_NAME_SIZE]; //
...@@ -478,8 +479,8 @@ char *aggreFunc[] = {"*", "count(*)", "avg(col0)", "sum(col0)", ...@@ -478,8 +479,8 @@ char *aggreFunc[] = {"*", "count(*)", "avg(col0)", "sum(col0)",
"max(col0)", "min(col0)", "first(col0)", "last(col0)"}; "max(col0)", "min(col0)", "first(col0)", "last(col0)"};
SArguments g_args = { SArguments g_args = {
NULL, // metaFile NULL, // metaFile
0, // test_mode 0, // test_mode
"127.0.0.1", // host "127.0.0.1", // host
6030, // port 6030, // port
"root", // user "root", // user
...@@ -743,7 +744,8 @@ void parse_args(int argc, char *argv[], SArguments *arguments) { ...@@ -743,7 +744,8 @@ void parse_args(int argc, char *argv[], SArguments *arguments) {
} }
} }
if (arguments->debug_print || arguments->verbose_print) { if (((arguments->debug_print) && (arguments->metaFile == NULL))
|| arguments->verbose_print) {
printf("###################################################################\n"); printf("###################################################################\n");
printf("# meta file: %s\n", arguments->metaFile); printf("# meta file: %s\n", arguments->metaFile);
printf("# Server IP: %s:%hu\n", printf("# Server IP: %s:%hu\n",
...@@ -1263,6 +1265,7 @@ static void printfInsertMetaToFile(FILE* fp) { ...@@ -1263,6 +1265,7 @@ static void printfInsertMetaToFile(FILE* fp) {
fprintf(fp, " dataSource: %s\n", g_Dbs.db[i].superTbls[j].dataSource); fprintf(fp, " dataSource: %s\n", g_Dbs.db[i].superTbls[j].dataSource);
fprintf(fp, " insertMode: %s\n", g_Dbs.db[i].superTbls[j].insertMode); fprintf(fp, " insertMode: %s\n", g_Dbs.db[i].superTbls[j].insertMode);
fprintf(fp, " insertRows: %"PRId64"\n", g_Dbs.db[i].superTbls[j].insertRows); fprintf(fp, " insertRows: %"PRId64"\n", g_Dbs.db[i].superTbls[j].insertRows);
fprintf(fp, " insert interval: %d\n", g_Dbs.db[i].superTbls[j].insertInterval);
if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) { if (0 == g_Dbs.db[i].superTbls[j].multiThreadWriteOneTbl) {
fprintf(fp, " multiThreadWriteOneTbl: no\n"); fprintf(fp, " multiThreadWriteOneTbl: no\n");
...@@ -2913,13 +2916,13 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { ...@@ -2913,13 +2916,13 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
goto PARSE_OVER; goto PARSE_OVER;
} }
cJSON* insertInterval = cJSON_GetObjectItem(root, "insert_interval"); cJSON* gInsertInterval = cJSON_GetObjectItem(root, "insert_interval");
if (insertInterval && insertInterval->type == cJSON_Number) { if (gInsertInterval && gInsertInterval->type == cJSON_Number) {
g_args.insert_interval = insertInterval->valueint; g_args.insert_interval = gInsertInterval->valueint;
} else if (!insertInterval) { } else if (!gInsertInterval) {
g_args.insert_interval = 0; g_args.insert_interval = 0;
} else { } else {
printf("ERROR: failed to read json, insert_interval not found\n"); fprintf(stderr, "ERROR: failed to read json, insert_interval input mistake\n");
goto PARSE_OVER; goto PARSE_OVER;
} }
...@@ -3438,13 +3441,22 @@ static bool getMetaFromInsertJsonFile(cJSON* root) { ...@@ -3438,13 +3441,22 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
cJSON* insertRows = cJSON_GetObjectItem(stbInfo, "insert_rows"); cJSON* insertRows = cJSON_GetObjectItem(stbInfo, "insert_rows");
if (insertRows && insertRows->type == cJSON_Number) { if (insertRows && insertRows->type == cJSON_Number) {
g_Dbs.db[i].superTbls[j].insertRows = insertRows->valueint; g_Dbs.db[i].superTbls[j].insertRows = insertRows->valueint;
//if (0 == g_Dbs.db[i].superTbls[j].insertRows) {
// g_Dbs.db[i].superTbls[j].insertRows = 0x7FFFFFFFFFFFFFFF;
//}
} else if (!insertRows) { } else if (!insertRows) {
g_Dbs.db[i].superTbls[j].insertRows = 0x7FFFFFFFFFFFFFFF; g_Dbs.db[i].superTbls[j].insertRows = 0x7FFFFFFFFFFFFFFF;
} else { } else {
printf("ERROR: failed to read json, insert_rows not found\n"); fprintf(stderr, "failed to read json, insert_rows input mistake");
goto PARSE_OVER;
}
cJSON* insertInterval = cJSON_GetObjectItem(stbInfo, "insert_interval");
if (insertInterval && insertInterval->type == cJSON_Number) {
g_Dbs.db[i].superTbls[j].insertInterval = insertInterval->valueint;
} else if (!insertInterval) {
debugPrint("%s() LN%d: stable insert interval be overrided by global %d.\n",
__func__, __LINE__, g_args.insert_interval);
g_Dbs.db[i].superTbls[j].insertInterval = g_args.insert_interval;
} else {
fprintf(stderr, "failed to read json, insert_interval input mistake");
goto PARSE_OVER; goto PARSE_OVER;
} }
...@@ -3993,8 +4005,9 @@ static void syncWriteForNumberOfTblInOneSql( ...@@ -3993,8 +4005,9 @@ static void syncWriteForNumberOfTblInOneSql(
uint64_t time_counter = winfo->start_time; uint64_t time_counter = winfo->start_time;
int sampleUsePos; int sampleUsePos;
int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
int64_t st = 0; int64_t st = 0;
int64_t et = 0; int64_t et = 0xffffffff;
for (int i = 0; i < superTblInfo->insertRows;) { for (int i = 0; i < superTblInfo->insertRows;) {
int32_t tbl_id = 0; int32_t tbl_id = 0;
for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; ) { for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; ) {
...@@ -4131,14 +4144,14 @@ static void syncWriteForNumberOfTblInOneSql( ...@@ -4131,14 +4144,14 @@ static void syncWriteForNumberOfTblInOneSql(
inserted += superTblInfo->rowsPerTbl; inserted += superTblInfo->rowsPerTbl;
send_to_server: send_to_server:
if (g_args.insert_interval && (g_args.insert_interval > (et - st))) { if (insert_interval) {
int sleep_time = g_args.insert_interval - (et -st); st = taosGetTimestampUs();
printf("sleep: %d ms specified by insert_interval\n", sleep_time);
taosMsleep(sleep_time); // ms
}
if (g_args.insert_interval) { if (insert_interval > ((et - st)/1000)) {
st = taosGetTimestampMs(); int sleep_time = insert_interval - (et -st);
printf("sleep: %d ms insert interval\n", sleep_time);
taosMsleep(sleep_time); // ms
}
} }
if (0 == strncasecmp(superTblInfo->insertMode, if (0 == strncasecmp(superTblInfo->insertMode,
...@@ -4189,8 +4202,8 @@ send_to_server: ...@@ -4189,8 +4202,8 @@ send_to_server:
goto free_and_statistics; goto free_and_statistics;
} }
} }
if (g_args.insert_interval) { if (insert_interval) {
et = taosGetTimestampMs(); et = taosGetTimestampUs();
} }
break; break;
...@@ -4347,8 +4360,9 @@ static void* syncWrite(void *sarg) { ...@@ -4347,8 +4360,9 @@ static void* syncWrite(void *sarg) {
return NULL; return NULL;
} }
int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
uint64_t st = 0; uint64_t st = 0;
uint64_t et = 0; uint64_t et = 0xffffffff;
winfo->totalRowsInserted = 0; winfo->totalRowsInserted = 0;
winfo->totalAffectedRows = 0; winfo->totalAffectedRows = 0;
...@@ -4371,6 +4385,10 @@ static void* syncWrite(void *sarg) { ...@@ -4371,6 +4385,10 @@ static void* syncWrite(void *sarg) {
sampleUsePos = samplePos; sampleUsePos = samplePos;
if (insert_interval) {
st = taosGetTimestampUs();
}
memset(buffer, 0, superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len); memset(buffer, 0, superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len);
char *pstr = buffer; char *pstr = buffer;
...@@ -4502,17 +4520,6 @@ static void* syncWrite(void *sarg) { ...@@ -4502,17 +4520,6 @@ static void* syncWrite(void *sarg) {
winfo->totalRowsInserted += k; winfo->totalRowsInserted += k;
if (g_args.insert_interval) {
st = taosGetTimestampMs();
if (i > 0 && g_args.insert_interval
&& (g_args.insert_interval > (et - st) )) {
int sleep_time = g_args.insert_interval - (et -st);
printf("sleep: %d ms for insert interval\n", sleep_time);
taosMsleep(sleep_time); // ms
}
}
int64_t startTs = taosGetTimestampUs(); int64_t startTs = taosGetTimestampUs();
int64_t endTs; int64_t endTs;
int affectedRows; int affectedRows;
...@@ -4559,12 +4566,20 @@ static void* syncWrite(void *sarg) { ...@@ -4559,12 +4566,20 @@ static void* syncWrite(void *sarg) {
lastPrintTime = currentPrintTime; lastPrintTime = currentPrintTime;
} }
if (g_args.insert_interval) {
et = taosGetTimestampMs();
}
if (prepared >= insertRows) if (prepared >= insertRows)
break; break;
if (insert_interval) {
et = taosGetTimestampUs();
printf("et: %ld ms st: %ld\n", et, st);
if (insert_interval > ((et - st)/1000) ) {
int sleep_time = insert_interval - (et -st)/1000;
printf("sleep: %d ms for insert interval\n", sleep_time);
taosMsleep(sleep_time); // ms
}
}
} // num_of_DPT } // num_of_DPT
if ((tID == winfo->end_table_id) && superTblInfo && if ((tID == winfo->end_table_id) && superTblInfo &&
...@@ -4588,11 +4603,13 @@ free_and_statistics_2: ...@@ -4588,11 +4603,13 @@ free_and_statistics_2:
void callBack(void *param, TAOS_RES *res, int code) { void callBack(void *param, TAOS_RES *res, int code) {
threadInfo* winfo = (threadInfo*)param; threadInfo* winfo = (threadInfo*)param;
SSuperTable* superTblInfo = winfo->superTblInfo;
if (g_args.insert_interval) { int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
winfo->et = taosGetTimestampMs(); if (insert_interval) {
if (winfo->et - winfo->st < 1000) { winfo->et = taosGetTimestampUs();
taosMsleep(1000 - (winfo->et - winfo->st)); // ms if (((winfo->et - winfo->st)/1000) < insert_interval) {
taosMsleep(insert_interval - (winfo->et - winfo->st)/1000); // ms
} }
} }
...@@ -4632,8 +4649,8 @@ void callBack(void *param, TAOS_RES *res, int code) { ...@@ -4632,8 +4649,8 @@ void callBack(void *param, TAOS_RES *res, int code) {
} }
} }
if (g_args.insert_interval) { if (insert_interval) {
winfo->st = taosGetTimestampMs(); winfo->st = taosGetTimestampUs();
} }
taos_query_a(winfo->taos, buffer, callBack, winfo); taos_query_a(winfo->taos, buffer, callBack, winfo);
free(buffer); free(buffer);
...@@ -4644,13 +4661,15 @@ void callBack(void *param, TAOS_RES *res, int code) { ...@@ -4644,13 +4661,15 @@ void callBack(void *param, TAOS_RES *res, int code) {
void *asyncWrite(void *sarg) { void *asyncWrite(void *sarg) {
threadInfo *winfo = (threadInfo *)sarg; threadInfo *winfo = (threadInfo *)sarg;
SSuperTable* superTblInfo = winfo->superTblInfo;
winfo->st = 0; winfo->st = 0;
winfo->et = 0; winfo->et = 0;
winfo->lastTs = winfo->start_time; winfo->lastTs = winfo->start_time;
if (g_args.insert_interval) { int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
winfo->st = taosGetTimestampMs(); if (insert_interval) {
winfo->st = taosGetTimestampUs();
} }
taos_query_a(winfo->taos, "show databases", callBack, winfo); taos_query_a(winfo->taos, "show databases", callBack, winfo);
...@@ -5076,7 +5095,7 @@ void *superQueryProcess(void *sarg) { ...@@ -5076,7 +5095,7 @@ void *superQueryProcess(void *sarg) {
//printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_id, winfo->end_table_id); //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_id, winfo->end_table_id);
} }
st = taosGetTimestampMs(); st = taosGetTimestampUs();
for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) { for (int i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) { if (0 == strncasecmp(g_queryInfo.queryMode, "taosc", 5)) {
int64_t t1 = taosGetTimestampUs(); int64_t t1 = taosGetTimestampUs();
...@@ -5102,7 +5121,7 @@ void *superQueryProcess(void *sarg) { ...@@ -5102,7 +5121,7 @@ void *superQueryProcess(void *sarg) {
} }
} }
} }
et = taosGetTimestampMs(); et = taosGetTimestampUs();
printf("==thread[%"PRId64"] complete all sqls to specify tables once queries duration:%.6fs\n\n", printf("==thread[%"PRId64"] complete all sqls to specify tables once queries duration:%.6fs\n\n",
taosGetSelfPthreadId(), (double)(et - st)/1000.0); taosGetSelfPthreadId(), (double)(et - st)/1000.0);
} }
...@@ -5142,7 +5161,7 @@ static void *subQueryProcess(void *sarg) { ...@@ -5142,7 +5161,7 @@ static void *subQueryProcess(void *sarg) {
//printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_id, winfo->end_table_id); //printf("========sleep duration:%"PRId64 "========inserted rows:%d, table range:%d - %d\n", (1000 - (et - st)), i, winfo->start_table_id, winfo->end_table_id);
} }
st = taosGetTimestampMs(); st = taosGetTimestampUs();
for (int i = winfo->start_table_id; i <= winfo->end_table_id; i++) { for (int i = winfo->start_table_id; i <= winfo->end_table_id; i++) {
for (int j = 0; j < g_queryInfo.subQueryInfo.sqlCount; j++) { for (int j = 0; j < g_queryInfo.subQueryInfo.sqlCount; j++) {
memset(sqlstr,0,sizeof(sqlstr)); memset(sqlstr,0,sizeof(sqlstr));
...@@ -5156,12 +5175,12 @@ static void *subQueryProcess(void *sarg) { ...@@ -5156,12 +5175,12 @@ static void *subQueryProcess(void *sarg) {
selectAndGetResult(winfo->taos, sqlstr, tmpFile); selectAndGetResult(winfo->taos, sqlstr, tmpFile);
} }
} }
et = taosGetTimestampMs(); et = taosGetTimestampUs();
printf("####thread[%"PRId64"] complete all sqls to allocate all sub-tables[%d - %d] once queries duration:%.4fs\n\n", printf("####thread[%"PRId64"] complete all sqls to allocate all sub-tables[%d - %d] once queries duration:%.4fs\n\n",
taosGetSelfPthreadId(), taosGetSelfPthreadId(),
winfo->start_table_id, winfo->start_table_id,
winfo->end_table_id, winfo->end_table_id,
(double)(et - st)/1000.0); (double)(et - st)/1000000.0);
} }
return NULL; return NULL;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册