提交 7f2116fb 编写于 作者: S Shuduo Sang

[TD-3192] <feature>: support stb limit and offset. refactor.

上级 405814f8
...@@ -262,7 +262,7 @@ typedef struct SSuperTable_S { ...@@ -262,7 +262,7 @@ typedef struct SSuperTable_S {
int tagUsePos; int tagUsePos;
// statistics // statistics
int64_t totalRowsInserted; int64_t totalInsertRows;
int64_t totalAffectedRows; int64_t totalAffectedRows;
} SSuperTable; } SSuperTable;
...@@ -332,7 +332,7 @@ typedef struct SDbs_S { ...@@ -332,7 +332,7 @@ typedef struct SDbs_S {
SDataBase db[MAX_DB_COUNT]; SDataBase db[MAX_DB_COUNT];
// statistics // statistics
int64_t totalRowsInserted; int64_t totalInsertRows;
int64_t totalAffectedRows; int64_t totalAffectedRows;
} SDbs; } SDbs;
...@@ -403,7 +403,7 @@ typedef struct SThreadInfo_S { ...@@ -403,7 +403,7 @@ typedef struct SThreadInfo_S {
int64_t lastTs; int64_t lastTs;
// statistics // statistics
int64_t totalRowsInserted; int64_t totalInsertRows;
int64_t totalAffectedRows; int64_t totalAffectedRows;
// insert delay statistics // insert delay statistics
...@@ -3986,8 +3986,6 @@ static void syncWriteForNumberOfTblInOneSql( ...@@ -3986,8 +3986,6 @@ static void syncWriteForNumberOfTblInOneSql(
int samplePos = 0; int samplePos = 0;
//printf("========threadID[%d], table rang: %d - %d \n", winfo->threadID, winfo->start_table_id, winfo->end_table_id); //printf("========threadID[%d], table rang: %d - %d \n", winfo->threadID, winfo->start_table_id, winfo->end_table_id);
int64_t totalRowsInserted = 0;
int64_t totalAffectedRows = 0;
int64_t lastPrintTime = taosGetTimestampMs(); int64_t lastPrintTime = taosGetTimestampMs();
char* buffer = calloc(superTblInfo->maxSqlLen+1, 1); char* buffer = calloc(superTblInfo->maxSqlLen+1, 1);
...@@ -4128,7 +4126,7 @@ static void syncWriteForNumberOfTblInOneSql( ...@@ -4128,7 +4126,7 @@ static void syncWriteForNumberOfTblInOneSql(
len += retLen; len += retLen;
//inserted++; //inserted++;
j++; j++;
totalRowsInserted++; winfo->totalInsertRows++;
if (inserted >= superTblInfo->insertRows || if (inserted >= superTblInfo->insertRows ||
(superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) { (superTblInfo->maxSqlLen - len) < (superTblInfo->lenOfOneRow + 128)) {
...@@ -4185,7 +4183,7 @@ send_to_server: ...@@ -4185,7 +4183,7 @@ send_to_server:
if (currentPrintTime - lastPrintTime > 30*1000) { if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n", printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n",
winfo->threadID, winfo->threadID,
winfo->totalRowsInserted, winfo->totalInsertRows,
winfo->totalAffectedRows); winfo->totalAffectedRows);
lastPrintTime = currentPrintTime; lastPrintTime = currentPrintTime;
} }
...@@ -4223,9 +4221,7 @@ send_to_server: ...@@ -4223,9 +4221,7 @@ send_to_server:
free_and_statistics: free_and_statistics:
tmfree(buffer); tmfree(buffer);
winfo->totalRowsInserted = totalRowsInserted; printf("====thread[%d] completed total inserted rows: %"PRId64 ", affected rows: %"PRId64 "====\n", winfo->threadID, winfo->totalInsertRows, winfo->totalAffectedRows);
winfo->totalAffectedRows = totalAffectedRows;
printf("====thread[%d] completed total inserted rows: %"PRId64 ", affected rows: %"PRId64 "====\n", winfo->threadID, totalRowsInserted, totalAffectedRows);
return; return;
} }
...@@ -4315,6 +4311,39 @@ static int prepareSampleData(SSuperTable *superTblInfo) { ...@@ -4315,6 +4311,39 @@ static int prepareSampleData(SSuperTable *superTblInfo) {
return 0; return 0;
} }
static int execInsert(threadInfo *winfo, char *buffer, int k)
{
int affectedRows;
SSuperTable* superTblInfo = winfo->superTblInfo;
if (superTblInfo) {
if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) {
verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
affectedRows = queryDbExec(winfo->taos, buffer, INSERT_TYPE);
} else {
verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
int retCode = postProceSql(g_Dbs.host, g_Dbs.port, buffer);
if (0 != retCode) {
affectedRows = -1;
printf("========restful return fail, threadID[%d]\n", winfo->threadID);
} else {
affectedRows = k;
}
}
} else {
verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
affectedRows = queryDbExec(winfo->taos, buffer, 1);
}
if (0 > affectedRows){
return affectedRows;
}
return affectedRows;
}
// sync insertion // sync insertion
/* /*
1 thread: 100 tables * 2000 rows/s 1 thread: 100 tables * 2000 rows/s
...@@ -4324,7 +4353,6 @@ static int prepareSampleData(SSuperTable *superTblInfo) { ...@@ -4324,7 +4353,6 @@ static int prepareSampleData(SSuperTable *superTblInfo) {
2 taosinsertdata , 1 thread: 10 tables * 20000 rows/s 2 taosinsertdata , 1 thread: 10 tables * 20000 rows/s
*/ */
static void* syncWrite(void *sarg) { static void* syncWrite(void *sarg) {
uint64_t lastPrintTime = taosGetTimestampMs();
threadInfo *winfo = (threadInfo *)sarg; threadInfo *winfo = (threadInfo *)sarg;
SSuperTable* superTblInfo = winfo->superTblInfo; SSuperTable* superTblInfo = winfo->superTblInfo;
...@@ -4360,11 +4388,15 @@ static void* syncWrite(void *sarg) { ...@@ -4360,11 +4388,15 @@ static void* syncWrite(void *sarg) {
return NULL; return NULL;
} }
int64_t lastPrintTime = taosGetTimestampMs();
int64_t startTs = taosGetTimestampUs();
int64_t endTs;
int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval; int insert_interval = superTblInfo?superTblInfo->insertInterval:g_args.insert_interval;
uint64_t st = 0; uint64_t st = 0;
uint64_t et = 0xffffffff; uint64_t et = 0xffffffff;
winfo->totalRowsInserted = 0; winfo->totalInsertRows = 0;
winfo->totalAffectedRows = 0; winfo->totalAffectedRows = 0;
int sampleUsePos; int sampleUsePos;
...@@ -4383,12 +4415,12 @@ static void* syncWrite(void *sarg) { ...@@ -4383,12 +4415,12 @@ static void* syncWrite(void *sarg) {
for (int64_t i = 0; i < insertRows;) { for (int64_t i = 0; i < insertRows;) {
int64_t prepared = i; int64_t prepared = i;
sampleUsePos = samplePos;
if (insert_interval) { if (insert_interval) {
st = taosGetTimestampUs(); st = taosGetTimestampUs();
} }
sampleUsePos = samplePos;
memset(buffer, 0, superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len); memset(buffer, 0, superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len);
char *pstr = buffer; char *pstr = buffer;
...@@ -4518,35 +4550,12 @@ static void* syncWrite(void *sarg) { ...@@ -4518,35 +4550,12 @@ static void* syncWrite(void *sarg) {
break; break;
} }
winfo->totalRowsInserted += k; int affectedRows = execInsert(winfo, buffer, k);
if (affectedRows < 0)
int64_t startTs = taosGetTimestampUs(); goto free_and_statistics_2;
int64_t endTs;
int affectedRows;
if (superTblInfo) {
if (0 == strncasecmp(superTblInfo->insertMode, "taosc", strlen("taosc"))) {
verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
affectedRows = queryDbExec(winfo->taos, buffer, INSERT_TYPE);
if (0 > affectedRows){
goto free_and_statistics_2;
}
} else {
verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
int retCode = postProceSql(g_Dbs.host, g_Dbs.port, buffer);
if (0 != retCode) { winfo->totalInsertRows += k;
printf("========restful return fail, threadID[%d]\n", winfo->threadID); winfo->totalAffectedRows += affectedRows;
goto free_and_statistics_2;
}
affectedRows = k;
}
} else {
verbosePrint("%s() LN%d %s\n", __func__, __LINE__, buffer);
affectedRows = queryDbExec(winfo->taos, buffer, 1);
}
endTs = taosGetTimestampUs(); endTs = taosGetTimestampUs();
int64_t delay = endTs - startTs; int64_t delay = endTs - startTs;
...@@ -4555,13 +4564,11 @@ static void* syncWrite(void *sarg) { ...@@ -4555,13 +4564,11 @@ static void* syncWrite(void *sarg) {
winfo->cntDelay++; winfo->cntDelay++;
winfo->totalDelay += delay; winfo->totalDelay += delay;
winfo->totalAffectedRows += affectedRows;
int64_t currentPrintTime = taosGetTimestampMs(); int64_t currentPrintTime = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) { if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n", printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n",
winfo->threadID, winfo->threadID,
winfo->totalRowsInserted, winfo->totalInsertRows,
winfo->totalAffectedRows); winfo->totalAffectedRows);
lastPrintTime = currentPrintTime; lastPrintTime = currentPrintTime;
} }
...@@ -4572,14 +4579,12 @@ static void* syncWrite(void *sarg) { ...@@ -4572,14 +4579,12 @@ static void* syncWrite(void *sarg) {
if (insert_interval) { if (insert_interval) {
et = taosGetTimestampUs(); et = taosGetTimestampUs();
printf("et: %ld ms st: %ld\n", et, st);
if (insert_interval > ((et - st)/1000) ) { if (insert_interval > ((et - st)/1000) ) {
int sleep_time = insert_interval - (et -st)/1000; int sleep_time = insert_interval - (et -st)/1000;
printf("sleep: %d ms for insert interval\n", sleep_time); printf("sleep: %d ms for insert interval\n", sleep_time);
taosMsleep(sleep_time); // ms taosMsleep(sleep_time); // ms
} }
} }
} // num_of_DPT } // num_of_DPT
if ((tID == winfo->end_table_id) && superTblInfo && if ((tID == winfo->end_table_id) && superTblInfo &&
...@@ -4596,7 +4601,7 @@ free_and_statistics_2: ...@@ -4596,7 +4601,7 @@ free_and_statistics_2:
printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n", printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n",
winfo->threadID, winfo->threadID,
winfo->totalRowsInserted, winfo->totalInsertRows,
winfo->totalAffectedRows); winfo->totalAffectedRows);
return NULL; return NULL;
} }
...@@ -4806,7 +4811,7 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -4806,7 +4811,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
if (superTblInfo) { if (superTblInfo) {
superTblInfo->totalAffectedRows += t_info->totalAffectedRows; superTblInfo->totalAffectedRows += t_info->totalAffectedRows;
superTblInfo->totalRowsInserted += t_info->totalRowsInserted; superTblInfo->totalInsertRows += t_info->totalInsertRows;
} }
totalDelay += t_info->totalDelay; totalDelay += t_info->totalDelay;
...@@ -4824,16 +4829,16 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -4824,16 +4829,16 @@ static void startMultiThreadInsertData(int threads, char* db_name,
if (superTblInfo) { if (superTblInfo) {
printf("Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s.%s. %2.f records/second\n\n", printf("Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s.%s. %2.f records/second\n\n",
t, superTblInfo->totalRowsInserted, t, superTblInfo->totalInsertRows,
superTblInfo->totalAffectedRows, superTblInfo->totalAffectedRows,
threads, db_name, superTblInfo->sTblName, threads, db_name, superTblInfo->sTblName,
superTblInfo->totalRowsInserted / t); superTblInfo->totalInsertRows / t);
fprintf(g_fpOfInsertResult, fprintf(g_fpOfInsertResult,
"Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s.%s. %2.f records/second\n\n", "Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s.%s. %2.f records/second\n\n",
t, superTblInfo->totalRowsInserted, t, superTblInfo->totalInsertRows,
superTblInfo->totalAffectedRows, superTblInfo->totalAffectedRows,
threads, db_name, superTblInfo->sTblName, threads, db_name, superTblInfo->sTblName,
superTblInfo->totalRowsInserted / t); superTblInfo->totalInsertRows/ t);
} }
printf("insert delay, avg: %10.6fms, max: %10.6fms, min: %10.6fms\n\n", printf("insert delay, avg: %10.6fms, max: %10.6fms, min: %10.6fms\n\n",
...@@ -5067,14 +5072,14 @@ static int insertTestProcess() { ...@@ -5067,14 +5072,14 @@ static int insertTestProcess() {
} }
//end = getCurrentTime(); //end = getCurrentTime();
//int64_t totalRowsInserted = 0; //int64_t totalInsertRows = 0;
//int64_t totalAffectedRows = 0; //int64_t totalAffectedRows = 0;
//for (int i = 0; i < g_Dbs.dbCount; i++) { //for (int i = 0; i < g_Dbs.dbCount; i++) {
// for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) { // for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
// totalRowsInserted += g_Dbs.db[i].superTbls[j].totalRowsInserted; // totalInsertRows+= g_Dbs.db[i].superTbls[j].totalInsertRows;
// totalAffectedRows += g_Dbs.db[i].superTbls[j].totalAffectedRows; // totalAffectedRows += g_Dbs.db[i].superTbls[j].totalAffectedRows;
//} //}
//printf("Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s)\n\n", end - start, totalRowsInserted, totalAffectedRows, g_Dbs.threadCount); //printf("Spent %.4f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s)\n\n", end - start, totalInsertRows, totalAffectedRows, g_Dbs.threadCount);
postFreeResource(); postFreeResource();
return 0; return 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册