提交 42e8a73f 编写于 作者: S Shuduo Sang

[TD-3147] <fix>: support insert interval. stb case passed.

上级 08b272c6
......@@ -2061,7 +2061,7 @@ static int createDatabases() {
for (int i = 0; i < g_Dbs.dbCount; i++) {
if (g_Dbs.db[i].drop) {
sprintf(command, "drop database if exists %s;", g_Dbs.db[i].dbName);
debugPrint("DEBUG %s() %d \n", __func__, __LINE__);
debugPrint("DEBUG %s() %d command: %s\n", __func__, __LINE__, command);
if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) {
taos_close(taos);
return -1;
......@@ -2129,7 +2129,7 @@ static int createDatabases() {
"precision \'%s\';", g_Dbs.db[i].dbCfg.precision);
}
debugPrint("DEBUG %s() %d \n", __func__, __LINE__);
debugPrint("DEBUG %s() %d command: %s\n", __func__, __LINE__, command);
if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) {
taos_close(taos);
printf("\ncreate database %s failed!\n\n", g_Dbs.db[i].dbName);
......@@ -2141,7 +2141,7 @@ static int createDatabases() {
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
// describe super table, if exists
sprintf(command, "describe %s.%s;", g_Dbs.db[i].dbName, g_Dbs.db[i].superTbls[j].sTblName);
debugPrint("DEBUG %s() %d \n", __func__, __LINE__);
debugPrint("DEBUG %s() %d command: %s\n", __func__, __LINE__, command);
if (0 != queryDbExec(taos, command, NO_INSERT_TYPE)) {
g_Dbs.db[i].superTbls[j].superTblExists = TBL_NO_EXISTS;
ret = createSuperTable(taos, g_Dbs.db[i].dbName, &g_Dbs.db[i].superTbls[j], g_Dbs.use_metric);
......@@ -2244,7 +2244,7 @@ static void* createTable(void *sarg)
}
if (0 != len) {
debugPrint("DEBUG %s() %d \n", __func__, __LINE__);
debugPrint("DEBUG %s() %d buffer: %s\n", __func__, __LINE__, buffer);
(void)queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE);
}
......@@ -3777,14 +3777,6 @@ static void syncWriteForNumberOfTblInOneSql(
int64_t st = 0;
int64_t et = 0;
for (int i = 0; i < superTblInfo->insertRows;) {
if (g_args.insert_interval && (g_args.insert_interval > (et - st))) {
taosMsleep(g_args.insert_interval - (et - st)); // ms
}
if (g_args.insert_interval) {
st = taosGetTimestampMs();
}
int32_t tbl_id = 0;
for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; ) {
int inserted = i;
......@@ -3921,6 +3913,16 @@ static void syncWriteForNumberOfTblInOneSql(
inserted += superTblInfo->rowsPerTbl;
send_to_server:
if (g_args.insert_interval && (g_args.insert_interval > (et - st))) {
int sleep_time = g_args.insert_interval - (et -st);
debugPrint("DEBUG sleep: %d ms\n", sleep_time);
taosMsleep(sleep_time); // ms
}
if (g_args.insert_interval) {
st = taosGetTimestampMs();
}
if (0 == strncasecmp(superTblInfo->insertMode,
"taosc",
strlen("taosc"))) {
......@@ -3930,9 +3932,10 @@ send_to_server:
int64_t endTs;
startTs = taosGetTimestampUs();
debugPrint("DEBUG %s() %d \n", __func__, __LINE__);
debugPrint("DEBUG %s() LN%d buff: %s\n", __func__, __LINE__, buffer);
int affectedRows = queryDbExec(
winfo->taos, buffer, INSERT_TYPE);
if (0 > affectedRows) {
goto free_and_statistics;
} else {
......@@ -3967,7 +3970,10 @@ send_to_server:
goto free_and_statistics;
}
}
if (g_args.insert_interval) {
et = taosGetTimestampMs();
}
break;
}
......@@ -3980,13 +3986,10 @@ send_to_server:
}
}
if (g_args.insert_interval) {
et = taosGetTimestampMs();
}
//printf("========loop %d childTables duration:%"PRId64 "========inserted rows:%d\n", winfo->end_table_id - winfo->start_table_id, et - st, i);
}
free_and_statistics:
free_and_statistics:
tmfree(buffer);
winfo->totalRowsInserted = totalRowsInserted;
winfo->totalAffectedRows = totalAffectedRows;
......@@ -4080,24 +4083,33 @@ static void* syncWrite(void *sarg) {
srand((uint32_t)time(NULL));
int64_t time_counter = winfo->start_time;
uint64_t st = 0;
uint64_t et = 0;
for (int i = 0; i < g_args.num_of_DPT;) {
for (int tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) {
int inserted = i;
int64_t tmp_time = time_counter;
char *pstr = buffer;
pstr += sprintf(pstr, "insert into %s.%s%d values", winfo->db_name, g_args.tb_prefix, tID);
pstr += sprintf(pstr,
"insert into %s.%s%d values",
winfo->db_name, g_args.tb_prefix, tID);
int k;
for (k = 0; k < g_args.num_of_RPR;) {
int rand_num = rand() % 100;
int len = -1;
if ((g_args.disorderRatio != 0) && (rand_num < g_args.disorderRange)) {
if ((g_args.disorderRatio != 0)
&& (rand_num < g_args.disorderRange)) {
int64_t d = tmp_time - rand() % 1000000 + rand_num;
len = generateData(data, data_type, ncols_per_record, d, len_of_binary);
len = generateData(data, data_type,
ncols_per_record, d, len_of_binary);
} else {
len = generateData(data, data_type, ncols_per_record, tmp_time += 1000, len_of_binary);
len = generateData(data, data_type,
ncols_per_record, tmp_time += 1000, len_of_binary);
}
//assert(len + pstr - buffer < BUFFER_SIZE);
......@@ -4118,24 +4130,41 @@ static void* syncWrite(void *sarg) {
int64_t endTs;
startTs = taosGetTimestampUs();
//queryDB(winfo->taos, buffer);
if (i > 0 && g_args.insert_interval
&& (g_args.insert_interval > (et - st) )) {
int sleep_time = g_args.insert_interval - (et -st);
debugPrint("DEBUG sleep: %d ms\n", sleep_time);
taosMsleep(sleep_time); // ms
}
if (g_args.insert_interval) {
st = taosGetTimestampMs();
}
debugPrint("DEBUG - %s() LN%d %s\n", __func__, __LINE__, buffer);
int affectedRows = queryDbExec(winfo->taos, buffer, 1);
if (0 <= affectedRows){
endTs = taosGetTimestampUs();
int64_t delay = endTs - startTs;
if (delay > winfo->maxDelay) winfo->maxDelay = delay;
if (delay < winfo->minDelay) winfo->minDelay = delay;
if (delay > winfo->maxDelay)
winfo->maxDelay = delay;
if (delay < winfo->minDelay)
winfo->minDelay = delay;
winfo->cntDelay++;
winfo->totalDelay += delay;
//winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay;
}
}
if (g_args.insert_interval) {
et = taosGetTimestampMs();
}
if (tID == winfo->end_table_id) {
i = inserted;
time_counter = tmp_time;
}
}
}
return NULL;
}
......@@ -4199,19 +4228,12 @@ static void* syncWriteWithStb(void *sarg) {
return NULL;
}
uint64_t time_counter = winfo->start_time;
int64_t time_counter = winfo->start_time;
uint64_t st = 0;
uint64_t et = 0;
debugPrint("DEBUG - %s() LN%d insertRows=%ld\n", __func__, __LINE__, superTblInfo->insertRows);
for (int i = 0; i < superTblInfo->insertRows;) {
if (i > 0 && g_args.insert_interval
&& (g_args.insert_interval > (et - st) )) {
taosMsleep(g_args.insert_interval - (et - st)); // ms
}
if (g_args.insert_interval) {
st = taosGetTimestampMs();
}
for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id; tID++) {
uint64_t inserted = i;
......@@ -4219,8 +4241,8 @@ static void* syncWriteWithStb(void *sarg) {
int sampleUsePos = samplePos;
int k = 0;
while (1)
{
debugPrint("DEBUG - %s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR);
for (k = 0; k < g_args.num_of_RPR;) {
int len = 0;
memset(buffer, 0, superTblInfo->maxSqlLen);
char *pstr = buffer;
......@@ -4263,9 +4285,8 @@ static void* syncWriteWithStb(void *sarg) {
tID);
}
for (k = 0; k < g_args.num_of_RPR;) {
int retLen = 0;
if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) {
int retLen = 0;
if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) {
retLen = getRowDataFromSample(
pstr + len,
superTblInfo->maxSqlLen - len,
......@@ -4277,7 +4298,7 @@ static void* syncWriteWithStb(void *sarg) {
if (retLen < 0) {
goto free_and_statistics_2;
}
} else if (0 == strncasecmp(superTblInfo->dataSource, "rand", strlen("rand"))) {
} else if (0 == strncasecmp(superTblInfo->dataSource, "rand", strlen("rand"))) {
int rand_num = rand_tinyint() % 100;
if (0 != superTblInfo->disorderRatio
&& rand_num < superTblInfo->disorderRatio) {
......@@ -4287,7 +4308,7 @@ static void* syncWriteWithStb(void *sarg) {
superTblInfo->maxSqlLen - len, d,
superTblInfo);
//printf("disorder rows, rand_num:%d, last ts:%"PRId64" current ts:%"PRId64"\n", rand_num, tmp_time, d);
} else {
} else {
retLen = generateRowData(
pstr + len,
superTblInfo->maxSqlLen - len,
......@@ -4297,17 +4318,31 @@ static void* syncWriteWithStb(void *sarg) {
if (retLen < 0) {
goto free_and_statistics_2;
}
}
len += retLen;
inserted++;
k++;
totalRowsInserted++;
}
/* len += retLen;
*/
inserted++;
k++;
totalRowsInserted++;
if (inserted >= superTblInfo->insertRows
debugPrint("DEBUG %s() LN%d inserted=%ld k=%d totalRowsInserted=%ld superTblInfo->insertRows=%ld\n", __func__, __LINE__, inserted, k, totalRowsInserted, superTblInfo->insertRows);
if (inserted > superTblInfo->insertRows)
break;
/* if (inserted >= superTblInfo->insertRows
|| (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128))
break;
*/
if (i > 0 && g_args.insert_interval
&& (g_args.insert_interval > (et - st) )) {
int sleep_time = g_args.insert_interval - (et -st);
debugPrint("DEBUG sleep: %d ms\n", sleep_time);
taosMsleep(sleep_time); // ms
}
if (g_args.insert_interval) {
st = taosGetTimestampMs();
}
if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) {
//printf("===== sql: %s \n\n", buffer);
//int64_t t1 = taosGetTimestampMs();
......@@ -4317,6 +4352,7 @@ static void* syncWriteWithStb(void *sarg) {
debugPrint("DEBUG %s() LN%d %s\n", __func__, __LINE__, buffer);
int affectedRows = queryDbExec(winfo->taos, buffer, INSERT_TYPE);
if (0 > affectedRows){
goto free_and_statistics_2;
} else {
......@@ -4351,6 +4387,23 @@ static void* syncWriteWithStb(void *sarg) {
goto free_and_statistics_2;
}
}
if (g_args.insert_interval) {
et = taosGetTimestampMs();
}
/*
if (loop_cnt) {
loop_cnt--;
if ((1 == loop_cnt) && (0 != nrecords_last_req)) {
nrecords_cur_req = nrecords_last_req;
} else if (0 == loop_cnt){
nrecords_cur_req = nrecords_no_last_req;
loop_cnt = loop_cnt_orig;
break;
}
} else {
break;
}
*/
}
if (tID == winfo->end_table_id) {
......@@ -4358,14 +4411,12 @@ static void* syncWriteWithStb(void *sarg) {
superTblInfo->dataSource, "sample", strlen("sample"))) {
samplePos = sampleUsePos;
}
i = inserted;
time_counter = tmp_time;
}
}
if (g_args.insert_interval) {
et = taosGetTimestampMs();
}
//printf("========loop %d childTables duration:%"PRId64 "========inserted rows:%d\n", winfo->end_table_id - winfo->start_table_id, et - st, i);
}
......@@ -4502,7 +4553,6 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision,
}
}
int64_t start_time;
if (superTblInfo) {
if (0 == strncasecmp(superTblInfo->startTimestamp, "now", 3)) {
......@@ -4530,20 +4580,23 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision,
t_info->start_time = start_time;
t_info->minDelay = INT16_MAX;
if ((NULL == superTblInfo) || (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5))) {
if ((NULL == superTblInfo) ||
(0 == strncasecmp(superTblInfo->insertMode, "taosc", 5))) {
//t_info->taos = taos;
t_info->taos = taos_connect(
g_Dbs.host, g_Dbs.user,
g_Dbs.password, db_name, g_Dbs.port);
if (NULL == t_info->taos) {
printf("connect to server fail from insert sub thread, reason: %s\n", taos_errstr(NULL));
printf("connect to server fail from insert sub thread, reason: %s\n",
taos_errstr(NULL));
exit(-1);
}
} else {
t_info->taos = NULL;
}
if ((NULL == superTblInfo) || (0 == superTblInfo->multiThreadWriteOneTbl)) {
if ((NULL == superTblInfo)
|| (0 == superTblInfo->multiThreadWriteOneTbl)) {
t_info->start_table_id = last;
t_info->end_table_id = i < b ? last + a : last + a - 1;
last = t_info->end_table_id + 1;
......@@ -5012,7 +5065,7 @@ static int queryTestProcess() {
char sqlStr[MAX_TB_NAME_SIZE*2];
sprintf(sqlStr, "use %s", g_queryInfo.dbName);
debugPrint("DEBUG %s() %d \n", __func__, __LINE__);
debugPrint("DEBUG %s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr);
(void)queryDbExec(t_info->taos, sqlStr, NO_INSERT_TYPE);
} else {
t_info->taos = NULL;
......@@ -5123,7 +5176,7 @@ void *subSubscribeProcess(void *sarg) {
char sqlStr[MAX_TB_NAME_SIZE*2];
sprintf(sqlStr, "use %s", g_queryInfo.dbName);
debugPrint("DEBUG %s() %d \n", __func__, __LINE__);
debugPrint("DEBUG %s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr);
if (0 != queryDbExec(winfo->taos, sqlStr, NO_INSERT_TYPE)){
return NULL;
}
......@@ -5189,7 +5242,7 @@ void *superSubscribeProcess(void *sarg) {
char sqlStr[MAX_TB_NAME_SIZE*2];
sprintf(sqlStr, "use %s", g_queryInfo.dbName);
debugPrint("DEBUG %s() %d \n", __func__, __LINE__);
debugPrint("DEBUG %s() %d sqlStr: %s\n", __func__, __LINE__, sqlStr);
if (0 != queryDbExec(winfo->taos, sqlStr, NO_INSERT_TYPE)) {
return NULL;
}
......@@ -5554,7 +5607,7 @@ void querySqlFile(TAOS* taos, char* sqlFile)
}
memcpy(cmd + cmd_len, line, read_len);
debugPrint("DEBUG %s() LN%d %s\n", __func__, __LINE__, cmd);
debugPrint("DEBUG %s() LN%d cmd: %s\n", __func__, __LINE__, cmd);
queryDbExec(taos, cmd, NO_INSERT_TYPE);
memset(cmd, 0, MAX_SQL_SIZE);
cmd_len = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册