提交 724fb5e0 编写于 作者: S Shuduo Sang

[TD-3147] <fix>: support insert interval. stable works.

上级 0311febd
...@@ -3753,6 +3753,7 @@ int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuperTable* ...@@ -3753,6 +3753,7 @@ int generateRowData(char* dataBuf, int maxLen, int64_t timestamp, SSuperTable*
return (-1); return (-1);
} }
} }
dataLen -= 2; dataLen -= 2;
dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, ")"); dataLen += snprintf(dataBuf + dataLen, maxLen - dataLen, ")");
...@@ -3820,7 +3821,7 @@ static void syncWriteForNumberOfTblInOneSql( ...@@ -3820,7 +3821,7 @@ static void syncWriteForNumberOfTblInOneSql(
if (0 == len) { if (0 == len) {
len += snprintf(pstr + len, len += snprintf(pstr + len,
superTblInfo->maxSqlLen - len, superTblInfo->maxSqlLen - len,
"insert into %s.%s%d using %s.%s tags %s values ", "insert into %s.%s%d using %s.%s tags %s values ",
winfo->db_name, winfo->db_name,
superTblInfo->childTblPrefix, superTblInfo->childTblPrefix,
tbl_id, tbl_id,
...@@ -3830,7 +3831,7 @@ static void syncWriteForNumberOfTblInOneSql( ...@@ -3830,7 +3831,7 @@ static void syncWriteForNumberOfTblInOneSql(
} else { } else {
len += snprintf(pstr + len, len += snprintf(pstr + len,
superTblInfo->maxSqlLen - len, superTblInfo->maxSqlLen - len,
" %s.%s%d using %s.%s tags %s values ", " %s.%s%d using %s.%s tags %s values ",
winfo->db_name, winfo->db_name,
superTblInfo->childTblPrefix, superTblInfo->childTblPrefix,
tbl_id, tbl_id,
...@@ -3843,13 +3844,13 @@ static void syncWriteForNumberOfTblInOneSql( ...@@ -3843,13 +3844,13 @@ static void syncWriteForNumberOfTblInOneSql(
if (0 == len) { if (0 == len) {
len += snprintf(pstr + len, len += snprintf(pstr + len,
superTblInfo->maxSqlLen - len, superTblInfo->maxSqlLen - len,
"insert into %s.%s values ", "insert into %s.%s values ",
winfo->db_name, winfo->db_name,
superTblInfo->childTblName + tbl_id * TSDB_TABLE_NAME_LEN); superTblInfo->childTblName + tbl_id * TSDB_TABLE_NAME_LEN);
} else { } else {
len += snprintf(pstr + len, len += snprintf(pstr + len,
superTblInfo->maxSqlLen - len, superTblInfo->maxSqlLen - len,
" %s.%s values ", " %s.%s values ",
winfo->db_name, winfo->db_name,
superTblInfo->childTblName + tbl_id * TSDB_TABLE_NAME_LEN); superTblInfo->childTblName + tbl_id * TSDB_TABLE_NAME_LEN);
} }
...@@ -3857,14 +3858,14 @@ static void syncWriteForNumberOfTblInOneSql( ...@@ -3857,14 +3858,14 @@ static void syncWriteForNumberOfTblInOneSql(
if (0 == len) { if (0 == len) {
len += snprintf(pstr + len, len += snprintf(pstr + len,
superTblInfo->maxSqlLen - len, superTblInfo->maxSqlLen - len,
"insert into %s.%s%d values ", "insert into %s.%s%d values ",
winfo->db_name, winfo->db_name,
superTblInfo->childTblPrefix, superTblInfo->childTblPrefix,
tbl_id); tbl_id);
} else { } else {
len += snprintf(pstr + len, len += snprintf(pstr + len,
superTblInfo->maxSqlLen - len, superTblInfo->maxSqlLen - len,
" %s.%s%d values ", " %s.%s%d values ",
winfo->db_name, winfo->db_name,
superTblInfo->childTblPrefix, superTblInfo->childTblPrefix,
tbl_id); tbl_id);
...@@ -3899,7 +3900,7 @@ static void syncWriteForNumberOfTblInOneSql( ...@@ -3899,7 +3900,7 @@ static void syncWriteForNumberOfTblInOneSql(
} else { } else {
retLen = generateRowData(pstr + len, retLen = generateRowData(pstr + len,
superTblInfo->maxSqlLen - len, superTblInfo->maxSqlLen - len,
tmp_time += superTblInfo->timeStampStep, tmp_time += superTblInfo->timeStampStep,
superTblInfo); superTblInfo);
} }
if (retLen < 0) { if (retLen < 0) {
...@@ -3957,16 +3958,16 @@ send_to_server: ...@@ -3957,16 +3958,16 @@ send_to_server:
if (delay < winfo->minDelay) winfo->minDelay = delay; if (delay < winfo->minDelay) winfo->minDelay = delay;
winfo->cntDelay++; winfo->cntDelay++;
winfo->totalDelay += delay; winfo->totalDelay += delay;
//winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay; winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay;
winfo->totalAffectedRows += affectedRows;
} }
totalAffectedRows += affectedRows;
int64_t currentPrintTime = taosGetTimestampMs(); int64_t currentPrintTime = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) { if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n", printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n",
winfo->threadID, winfo->threadID,
totalRowsInserted, winfo->totalRowsInserted,
totalAffectedRows); winfo->totalAffectedRows);
lastPrintTime = currentPrintTime; lastPrintTime = currentPrintTime;
} }
//int64_t t2 = taosGetTimestampMs(); //int64_t t2 = taosGetTimestampMs();
...@@ -4108,7 +4109,7 @@ static void* syncWrite(void *sarg) { ...@@ -4108,7 +4109,7 @@ static void* syncWrite(void *sarg) {
char *pstr = buffer; char *pstr = buffer;
pstr += sprintf(pstr, pstr += sprintf(pstr,
"insert into %s.%s%d values", "insert into %s.%s%d values ",
winfo->db_name, g_args.tb_prefix, tID); winfo->db_name, g_args.tb_prefix, tID);
int k; int k;
for (k = 0; k < g_args.num_of_RPR;) { for (k = 0; k < g_args.num_of_RPR;) {
...@@ -4146,16 +4147,16 @@ static void* syncWrite(void *sarg) { ...@@ -4146,16 +4147,16 @@ static void* syncWrite(void *sarg) {
int64_t endTs; int64_t endTs;
startTs = taosGetTimestampUs(); startTs = taosGetTimestampUs();
//queryDB(winfo->taos, buffer); //queryDB(winfo->taos, buffer);
if (i > 0 && g_args.insert_interval if (i > 0 && g_args.insert_interval
&& (g_args.insert_interval > (et - st) )) { && (g_args.insert_interval > (et - st) )) {
int sleep_time = g_args.insert_interval - (et -st); int sleep_time = g_args.insert_interval - (et -st);
printf("sleep: %d ms specified by insert_interval\n", sleep_time); printf("sleep: %d ms specified by insert_interval\n", sleep_time);
taosMsleep(sleep_time); // ms taosMsleep(sleep_time); // ms
} }
if (g_args.insert_interval) { if (g_args.insert_interval) {
st = taosGetTimestampMs(); st = taosGetTimestampMs();
} }
verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer); verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
int affectedRows = queryDbExec(winfo->taos, buffer, 1); int affectedRows = queryDbExec(winfo->taos, buffer, 1);
...@@ -4174,13 +4175,13 @@ static void* syncWrite(void *sarg) { ...@@ -4174,13 +4175,13 @@ static void* syncWrite(void *sarg) {
} }
verbosePrint("%s() LN%d: totalaffectedRows:%"PRId64"\n", __func__, __LINE__, winfo->totalAffectedRows); verbosePrint("%s() LN%d: totalaffectedRows:%"PRId64"\n", __func__, __LINE__, winfo->totalAffectedRows);
if (g_args.insert_interval) { if (g_args.insert_interval) {
et = taosGetTimestampMs(); et = taosGetTimestampMs();
} }
if (tblInserted >= g_args.num_of_DPT) { if (tblInserted >= g_args.num_of_DPT) {
break; break;
} }
} // num_of_DPT } // num_of_DPT
} // tId } // tId
...@@ -4194,8 +4195,6 @@ static void* syncWrite(void *sarg) { ...@@ -4194,8 +4195,6 @@ static void* syncWrite(void *sarg) {
static void* syncWriteWithStb(void *sarg) { static void* syncWriteWithStb(void *sarg) {
uint64_t totalRowsInserted = 0;
uint64_t totalAffectedRows = 0;
uint64_t lastPrintTime = taosGetTimestampMs(); uint64_t lastPrintTime = taosGetTimestampMs();
threadInfo *winfo = (threadInfo *)sarg; threadInfo *winfo = (threadInfo *)sarg;
...@@ -4255,14 +4254,20 @@ static void* syncWriteWithStb(void *sarg) { ...@@ -4255,14 +4254,20 @@ static void* syncWriteWithStb(void *sarg) {
uint64_t st = 0; uint64_t st = 0;
uint64_t et = 0; uint64_t et = 0;
debugPrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, winfo->totalRowsInserted = 0;
superTblInfo->insertRows); winfo->totalAffectedRows = 0;
int sampleUsePos;
uint64_t tmp_time;
debugPrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, superTblInfo->insertRows);
for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id; for (uint32_t tID = winfo->start_table_id; tID <= winfo->end_table_id;
tID++) { tID++) {
for (int i = 0; i < superTblInfo->insertRows;) { for (int i = 0; i < superTblInfo->insertRows;) {
int64_t inserted = i; int64_t tblInserted = i;
uint64_t tmp_time = time_counter; tmp_time = time_counter;
if (i > 0 && g_args.insert_interval if (i > 0 && g_args.insert_interval
&& (g_args.insert_interval > (et - st) )) { && (g_args.insert_interval > (et - st) )) {
...@@ -4275,14 +4280,15 @@ static void* syncWriteWithStb(void *sarg) { ...@@ -4275,14 +4280,15 @@ static void* syncWriteWithStb(void *sarg) {
st = taosGetTimestampMs(); st = taosGetTimestampMs();
} }
int sampleUsePos = samplePos; sampleUsePos = samplePos;
verbosePrint("%s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR); verbosePrint("%s() LN%d num_of_RPR=%d\n", __func__, __LINE__, g_args.num_of_RPR);
for (int k = 0; k < g_args.num_of_RPR;) {
int len = 0;
memset(buffer, 0, superTblInfo->maxSqlLen);
char *pstr = buffer;
if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) { memset(buffer, 0, superTblInfo->maxSqlLen);
int len = 0;
char *pstr = buffer;
if (AUTO_CREATE_SUBTBL == superTblInfo->autoCreateTable) {
char* tagsValBuf = NULL; char* tagsValBuf = NULL;
if (0 == superTblInfo->tagSource) { if (0 == superTblInfo->tagSource) {
tagsValBuf = generateTagVaulesForStb(superTblInfo); tagsValBuf = generateTagVaulesForStb(superTblInfo);
...@@ -4305,21 +4311,23 @@ static void* syncWriteWithStb(void *sarg) { ...@@ -4305,21 +4311,23 @@ static void* syncWriteWithStb(void *sarg) {
superTblInfo->sTblName, superTblInfo->sTblName,
tagsValBuf); tagsValBuf);
tmfree(tagsValBuf); tmfree(tagsValBuf);
} else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) { } else if (TBL_ALREADY_EXISTS == superTblInfo->childTblExists) {
len += snprintf(pstr + len, len += snprintf(pstr + len,
superTblInfo->maxSqlLen - len, superTblInfo->maxSqlLen - len,
"insert into %s.%s values", "insert into %s.%s values",
winfo->db_name, winfo->db_name,
superTblInfo->childTblName + tID * TSDB_TABLE_NAME_LEN); superTblInfo->childTblName + tID * TSDB_TABLE_NAME_LEN);
} else { } else {
len += snprintf(pstr + len, len += snprintf(pstr + len,
superTblInfo->maxSqlLen - len, superTblInfo->maxSqlLen - len,
"insert into %s.%s%d values", "insert into %s.%s%d values",
winfo->db_name, winfo->db_name,
superTblInfo->childTblPrefix, superTblInfo->childTblPrefix,
tID); tID);
} }
int k;
for (k = 0; k < g_args.num_of_RPR;) {
int retLen = 0; int retLen = 0;
if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) { if (0 == strncasecmp(superTblInfo->dataSource, "sample", strlen("sample"))) {
retLen = getRowDataFromSample( retLen = getRowDataFromSample(
...@@ -4340,7 +4348,8 @@ static void* syncWriteWithStb(void *sarg) { ...@@ -4340,7 +4348,8 @@ static void* syncWriteWithStb(void *sarg) {
int64_t d = tmp_time - rand() % superTblInfo->disorderRange; int64_t d = tmp_time - rand() % superTblInfo->disorderRange;
retLen = generateRowData( retLen = generateRowData(
pstr + len, pstr + len,
superTblInfo->maxSqlLen - len, d, superTblInfo->maxSqlLen - len,
d,
superTblInfo); superTblInfo);
//printf("disorder rows, rand_num:%d, last ts:%"PRId64" current ts:%"PRId64"\n", rand_num, tmp_time, d); //printf("disorder rows, rand_num:%d, last ts:%"PRId64" current ts:%"PRId64"\n", rand_num, tmp_time, d);
} else { } else {
...@@ -4354,24 +4363,21 @@ static void* syncWriteWithStb(void *sarg) { ...@@ -4354,24 +4363,21 @@ static void* syncWriteWithStb(void *sarg) {
goto free_and_statistics_2; goto free_and_statistics_2;
} }
} }
/* len += retLen;
*/ len += retLen;
inserted++; verbosePrint("%s() LN%d retLen=%d len=%d k=%d buffer=%s\n", __func__, __LINE__, retLen, len, k, buffer);
tblInserted++;
k++; k++;
i++; i++;
totalRowsInserted++;
verbosePrint("%s() LN%d totalInserted=%"PRId64" inserted=%"PRId64"\n", __func__, __LINE__, totalRowsInserted, inserted); if (tblInserted >= superTblInfo->insertRows)
if (inserted > superTblInfo->insertRows)
break; break;
/* if (inserted >= superTblInfo->insertRows }
|| (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128))
break; winfo->totalRowsInserted += k;
*/
if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) { if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) {
//printf("===== sql: %s \n\n", buffer);
//int64_t t1 = taosGetTimestampMs();
int64_t startTs; int64_t startTs;
int64_t endTs; int64_t endTs;
startTs = taosGetTimestampUs(); startTs = taosGetTimestampUs();
...@@ -4388,76 +4394,54 @@ static void* syncWriteWithStb(void *sarg) { ...@@ -4388,76 +4394,54 @@ static void* syncWriteWithStb(void *sarg) {
if (delay < winfo->minDelay) winfo->minDelay = delay; if (delay < winfo->minDelay) winfo->minDelay = delay;
winfo->cntDelay++; winfo->cntDelay++;
winfo->totalDelay += delay; winfo->totalDelay += delay;
//winfo->avgDelay = (double)winfo->totalDelay / winfo->cntDelay;
} }
totalAffectedRows += affectedRows; winfo->totalAffectedRows += affectedRows;
int64_t currentPrintTime = taosGetTimestampMs(); int64_t currentPrintTime = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) { if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n", printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n",
winfo->threadID, winfo->threadID,
totalRowsInserted, winfo->totalRowsInserted,
totalAffectedRows); winfo->totalAffectedRows);
lastPrintTime = currentPrintTime; lastPrintTime = currentPrintTime;
} }
//int64_t t2 = taosGetTimestampMs(); } else {
//printf("taosc insert sql return, Spent %.4f seconds \n", (double)(t2 - t1)/1000.0);
} else {
//int64_t t1 = taosGetTimestampMs();
int retCode = postProceSql(g_Dbs.host, g_Dbs.port, buffer); int retCode = postProceSql(g_Dbs.host, g_Dbs.port, buffer);
//int64_t t2 = taosGetTimestampMs();
//printf("http insert sql return, Spent %ld ms \n", t2 - t1);
if (0 != retCode) { if (0 != retCode) {
printf("========restful return fail, threadID[%d]\n", winfo->threadID); printf("========restful return fail, threadID[%d]\n", winfo->threadID);
goto free_and_statistics_2; goto free_and_statistics_2;
} }
}
if (g_args.insert_interval) {
et = taosGetTimestampMs();
}
/*
if (loop_cnt) {
loop_cnt--;
if ((1 == loop_cnt) && (0 != nrecords_last_req)) {
nrecords_cur_req = nrecords_last_req;
} else if (0 == loop_cnt){
nrecords_cur_req = nrecords_no_last_req;
loop_cnt = loop_cnt_orig;
break;
}
} else {
break;
}
*/
} }
if (g_args.insert_interval) {
et = taosGetTimestampMs();
}
time_counter = tmp_time;
if (tID == winfo->end_table_id) { if (tblInserted >= superTblInfo->insertRows)
break;
} // num_of_DPT
if (tID == winfo->end_table_id) {
if (0 == strncasecmp( if (0 == strncasecmp(
superTblInfo->dataSource, "sample", strlen("sample"))) { superTblInfo->dataSource, "sample", strlen("sample"))) {
samplePos = sampleUsePos; samplePos = sampleUsePos;
} }
i = inserted;
time_counter = tmp_time;
}
} }
//printf("========loop %d childTables duration:%"PRId64 "========inserted rows:%d\n", winfo->end_table_id - winfo->start_table_id, et - st, i); //printf("========loop %d childTables duration:%"PRId64 "========inserted rows:%d\n", winfo->end_table_id - winfo->start_table_id, et - st, i);
} } // tID
free_and_statistics_2: free_and_statistics_2:
tmfree(buffer); tmfree(buffer);
tmfree(sampleDataBuf); tmfree(sampleDataBuf);
tmfclose(fp); tmfclose(fp);
winfo->totalRowsInserted = totalRowsInserted;
winfo->totalAffectedRows = totalAffectedRows;
printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n", printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n",
winfo->threadID, winfo->threadID,
totalRowsInserted, winfo->totalRowsInserted,
totalAffectedRows); winfo->totalAffectedRows);
return NULL; return NULL;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册