提交 d30a7df7 编写于 作者: L liuyq-617

Merge branch 'master' into test/TD-4059

...@@ -7,27 +7,19 @@ platform: ...@@ -7,27 +7,19 @@ platform:
arch: amd64 arch: amd64
steps: steps:
- name: build
image: gcc
commands:
- apt-get update
- apt-get install -y cmake build-essential
- mkdir debug
- cd debug
- cmake ..
- make
when:
branch:
- develop
- master
- name: smoke_test - name: smoke_test
image: python:3.8 image: python:3.8
commands: commands:
- apt-get update
- apt-get install -y cmake build-essential gcc
- pip3 install psutil - pip3 install psutil
- pip3 install guppy3 - pip3 install guppy3
- pip3 install src/connector/python/linux/python3/ - pip3 install src/connector/python/linux/python3/
- cd tests - mkdir debug
- cd debug
- cmake ..
- make
- cd ../tests
- ./test-all.sh smoke - ./test-all.sh smoke
when: when:
branch: branch:
......
...@@ -4672,7 +4672,7 @@ static int prepareSampleDataForSTable(SSuperTable *superTblInfo) { ...@@ -4672,7 +4672,7 @@ static int prepareSampleDataForSTable(SSuperTable *superTblInfo) {
return 0; return 0;
} }
static int64_t execInsert(threadInfo *pThreadInfo, char *buffer, int k) static int64_t execInsert(threadInfo *pThreadInfo, char *buffer, uint64_t k)
{ {
int affectedRows; int affectedRows;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo; SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
...@@ -4744,7 +4744,7 @@ static int64_t generateDataTail( ...@@ -4744,7 +4744,7 @@ static int64_t generateDataTail(
verbosePrint("%s() LN%d batch=%"PRIu64"\n", __func__, __LINE__, batch); verbosePrint("%s() LN%d batch=%"PRIu64"\n", __func__, __LINE__, batch);
uint64_t k = 0; int64_t k = 0;
for (k = 0; k < batch;) { for (k = 0; k < batch;) {
char data[MAX_DATA_SIZE]; char data[MAX_DATA_SIZE];
memset(data, 0, MAX_DATA_SIZE); memset(data, 0, MAX_DATA_SIZE);
...@@ -4959,7 +4959,7 @@ static int64_t generateInterlaceDataBuffer( ...@@ -4959,7 +4959,7 @@ static int64_t generateInterlaceDataBuffer(
return k; return k;
} }
static int generateProgressiveDataBuffer( static int64_t generateProgressiveDataBuffer(
char *tableName, char *tableName,
int64_t tableSeq, int64_t tableSeq,
threadInfo *pThreadInfo, char *buffer, threadInfo *pThreadInfo, char *buffer,
...@@ -5004,12 +5004,21 @@ static int generateProgressiveDataBuffer( ...@@ -5004,12 +5004,21 @@ static int generateProgressiveDataBuffer(
return k; return k;
} }
static void printStatPerThread(threadInfo *pThreadInfo)
{
fprintf(stderr, "====thread[%d] completed total inserted rows: %"PRIu64 ", total affected rows: %"PRIu64". %.2f records/second====\n",
pThreadInfo->threadID,
pThreadInfo->totalInsertRows,
pThreadInfo->totalAffectedRows,
(double)(pThreadInfo->totalAffectedRows / (pThreadInfo->totalDelay/1000.0)));
}
static void* syncWriteInterlace(threadInfo *pThreadInfo) { static void* syncWriteInterlace(threadInfo *pThreadInfo) {
debugPrint("[%d] %s() LN%d: ### interlace write\n", debugPrint("[%d] %s() LN%d: ### interlace write\n",
pThreadInfo->threadID, __func__, __LINE__); pThreadInfo->threadID, __func__, __LINE__);
int64_t insertRows; uint64_t insertRows;
int64_t interlaceRows; uint64_t interlaceRows;
SSuperTable* superTblInfo = pThreadInfo->superTblInfo; SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
...@@ -5078,9 +5087,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -5078,9 +5087,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
assert(pThreadInfo->ntables > 0); assert(pThreadInfo->ntables > 0);
int64_t batchPerTbl = interlaceRows; uint64_t batchPerTbl = interlaceRows;
uint64_t batchPerTblTimes;
int64_t batchPerTblTimes;
if ((interlaceRows > 0) && (pThreadInfo->ntables > 1)) { if ((interlaceRows > 0) && (pThreadInfo->ntables > 1)) {
batchPerTblTimes = batchPerTblTimes =
g_args.num_of_RPR / interlaceRows; g_args.num_of_RPR / interlaceRows;
...@@ -5088,9 +5097,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -5088,9 +5097,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
batchPerTblTimes = 1; batchPerTblTimes = 1;
} }
int64_t generatedRecPerTbl = 0; uint64_t generatedRecPerTbl = 0;
bool flagSleep = true; bool flagSleep = true;
int64_t sleepTimeTotal = 0; uint64_t sleepTimeTotal = 0;
char *strInsertInto = "insert into "; char *strInsertInto = "insert into ";
int nInsertBufLen = strlen(strInsertInto); int nInsertBufLen = strlen(strInsertInto);
...@@ -5110,9 +5119,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -5110,9 +5119,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
pstr += len; pstr += len;
remainderBufLen -= len; remainderBufLen -= len;
int64_t recOfBatch = 0; uint64_t recOfBatch = 0;
for (int64_t i = 0; i < batchPerTblTimes; i ++) { for (uint64_t i = 0; i < batchPerTblTimes; i ++) {
getTableName(tableName, pThreadInfo, tableSeq); getTableName(tableName, pThreadInfo, tableSeq);
if (0 == strlen(tableName)) { if (0 == strlen(tableName)) {
errorPrint("[%d] %s() LN%d, getTableName return null\n", errorPrint("[%d] %s() LN%d, getTableName return null\n",
...@@ -5130,10 +5139,12 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -5130,10 +5139,12 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
startTime, startTime,
&remainderBufLen); &remainderBufLen);
debugPrint("[%d] %s() LN%d, generated records is %"PRId64"\n",
pThreadInfo->threadID, __func__, __LINE__, generated);
if (generated < 0) { if (generated < 0) {
debugPrint("[%d] %s() LN%d, generated data is %"PRId64"\n", errorPrint("[%d] %s() LN%d, generated records is %"PRId64"\n",
pThreadInfo->threadID, __func__, __LINE__, generated); pThreadInfo->threadID, __func__, __LINE__, generated);
goto free_and_statistics_interlace; goto free_of_interlace;
} else if (generated == 0) { } else if (generated == 0) {
break; break;
} }
...@@ -5177,7 +5188,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -5177,7 +5188,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
break; break;
} }
verbosePrint("[%d] %s() LN%d recOfBatch=%"PRId64" totalInsertRows=%"PRId64"\n", verbosePrint("[%d] %s() LN%d recOfBatch=%"PRIu64" totalInsertRows=%"PRIu64"\n",
pThreadInfo->threadID, __func__, __LINE__, recOfBatch, pThreadInfo->threadID, __func__, __LINE__, recOfBatch,
pThreadInfo->totalInsertRows); pThreadInfo->totalInsertRows);
verbosePrint("[%d] %s() LN%d, buffer=%s\n", verbosePrint("[%d] %s() LN%d, buffer=%s\n",
...@@ -5188,30 +5199,30 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -5188,30 +5199,30 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
int64_t affectedRows = execInsert(pThreadInfo, buffer, recOfBatch); int64_t affectedRows = execInsert(pThreadInfo, buffer, recOfBatch);
endTs = taosGetTimestampMs(); endTs = taosGetTimestampMs();
int64_t delay = endTs - startTs; uint64_t delay = endTs - startTs;
performancePrint("%s() LN%d, insert execution time is %"PRId64"ms\n", performancePrint("%s() LN%d, insert execution time is %"PRIu64"ms\n",
__func__, __LINE__, delay); __func__, __LINE__, delay);
verbosePrint("[%d] %s() LN%d affectedRows=%"PRId64"\n",
pThreadInfo->threadID,
__func__, __LINE__, affectedRows);
if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay; if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay;
if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay; if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay;
pThreadInfo->cntDelay++; pThreadInfo->cntDelay++;
pThreadInfo->totalDelay += delay; pThreadInfo->totalDelay += delay;
verbosePrint("[%d] %s() LN%d affectedRows=%"PRId64"\n", if (recOfBatch != affectedRows) {
pThreadInfo->threadID, errorPrint("[%d] %s() LN%d execInsert insert %"PRIu64", affected rows: %"PRId64"\n%s\n",
__func__, __LINE__, affectedRows);
if ((affectedRows < 0) || (recOfBatch != affectedRows)) {
errorPrint("[%d] %s() LN%d execInsert insert %"PRId64", affected rows: %"PRId64"\n%s\n",
pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->threadID, __func__, __LINE__,
recOfBatch, affectedRows, buffer); recOfBatch, affectedRows, buffer);
goto free_and_statistics_interlace; goto free_of_interlace;
} }
pThreadInfo->totalAffectedRows += affectedRows; pThreadInfo->totalAffectedRows += affectedRows;
int64_t currentPrintTime = taosGetTimestampMs(); int64_t currentPrintTime = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) { if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] has currently inserted rows: %"PRId64 ", affected rows: %"PRId64 "\n", printf("thread[%d] has currently inserted rows: %"PRIu64 ", affected rows: %"PRIu64 "\n",
pThreadInfo->threadID, pThreadInfo->threadID,
pThreadInfo->totalInsertRows, pThreadInfo->totalInsertRows,
pThreadInfo->totalAffectedRows); pThreadInfo->totalAffectedRows);
...@@ -5231,13 +5242,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) { ...@@ -5231,13 +5242,9 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
} }
} }
free_and_statistics_interlace: free_of_interlace:
tmfree(buffer); tmfree(buffer);
printStatPerThread(pThreadInfo);
printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n",
pThreadInfo->threadID,
pThreadInfo->totalInsertRows,
pThreadInfo->totalAffectedRows);
return NULL; return NULL;
} }
...@@ -5253,19 +5260,19 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { ...@@ -5253,19 +5260,19 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
debugPrint("%s() LN%d: ### progressive write\n", __func__, __LINE__); debugPrint("%s() LN%d: ### progressive write\n", __func__, __LINE__);
SSuperTable* superTblInfo = pThreadInfo->superTblInfo; SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
int maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len; uint64_t maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len;
char* buffer = calloc(maxSqlLen, 1); char* buffer = calloc(maxSqlLen, 1);
if (NULL == buffer) { if (NULL == buffer) {
errorPrint( "Failed to alloc %d Bytes, reason:%s\n", errorPrint( "Failed to alloc %"PRIu64" Bytes, reason:%s\n",
maxSqlLen, maxSqlLen,
strerror(errno)); strerror(errno));
return NULL; return NULL;
} }
int64_t lastPrintTime = taosGetTimestampMs(); uint64_t lastPrintTime = taosGetTimestampMs();
int64_t startTs = taosGetTimestampMs(); uint64_t startTs = taosGetTimestampMs();
int64_t endTs; uint64_t endTs;
int64_t timeStampStep = int64_t timeStampStep =
superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP; superTblInfo?superTblInfo->timeStampStep:DEFAULT_TIMESTAMP_STEP;
...@@ -5280,15 +5287,15 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { ...@@ -5280,15 +5287,15 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
pThreadInfo->samplePos = 0; pThreadInfo->samplePos = 0;
for (int64_t tableSeq = for (uint64_t tableSeq =
pThreadInfo->start_table_from; tableSeq <= pThreadInfo->end_table_to; pThreadInfo->start_table_from; tableSeq <= pThreadInfo->end_table_to;
tableSeq ++) { tableSeq ++) {
int64_t start_time = pThreadInfo->start_time; int64_t start_time = pThreadInfo->start_time;
int64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT; uint64_t insertRows = (superTblInfo)?superTblInfo->insertRows:g_args.num_of_DPT;
verbosePrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, insertRows); verbosePrint("%s() LN%d insertRows=%"PRId64"\n", __func__, __LINE__, insertRows);
for (int64_t i = 0; i < insertRows;) { for (uint64_t i = 0; i < insertRows;) {
/* /*
if (insert_interval) { if (insert_interval) {
st = taosGetTimestampMs(); st = taosGetTimestampMs();
...@@ -5310,7 +5317,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { ...@@ -5310,7 +5317,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
pstr += len; pstr += len;
remainderBufLen -= len; remainderBufLen -= len;
int generated = generateProgressiveDataBuffer( int64_t generated = generateProgressiveDataBuffer(
tableName, tableSeq, pThreadInfo, pstr, insertRows, tableName, tableSeq, pThreadInfo, pstr, insertRows,
i, start_time, i, start_time,
&(pThreadInfo->samplePos), &(pThreadInfo->samplePos),
...@@ -5318,7 +5325,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { ...@@ -5318,7 +5325,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
if (generated > 0) if (generated > 0)
i += generated; i += generated;
else else
goto free_and_statistics_2; goto free_of_progressive;
start_time += generated * timeStampStep; start_time += generated * timeStampStep;
pThreadInfo->totalInsertRows += generated; pThreadInfo->totalInsertRows += generated;
...@@ -5328,17 +5335,23 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { ...@@ -5328,17 +5335,23 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
int64_t affectedRows = execInsert(pThreadInfo, buffer, generated); int64_t affectedRows = execInsert(pThreadInfo, buffer, generated);
endTs = taosGetTimestampMs(); endTs = taosGetTimestampMs();
int64_t delay = endTs - startTs; uint64_t delay = endTs - startTs;
performancePrint("%s() LN%d, insert execution time is %"PRId64"ms\n", performancePrint("%s() LN%d, insert execution time is %"PRId64"ms\n",
__func__, __LINE__, delay); __func__, __LINE__, delay);
verbosePrint("[%d] %s() LN%d affectedRows=%"PRId64"\n",
pThreadInfo->threadID,
__func__, __LINE__, affectedRows);
if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay; if (delay > pThreadInfo->maxDelay) pThreadInfo->maxDelay = delay;
if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay; if (delay < pThreadInfo->minDelay) pThreadInfo->minDelay = delay;
pThreadInfo->cntDelay++; pThreadInfo->cntDelay++;
pThreadInfo->totalDelay += delay; pThreadInfo->totalDelay += delay;
if (affectedRows < 0) if (affectedRows < 0) {
goto free_and_statistics_2; errorPrint("%s() LN%d, affected rows: %"PRId64"\n",
__func__, __LINE__, affectedRows);
goto free_of_progressive;
}
pThreadInfo->totalAffectedRows += affectedRows; pThreadInfo->totalAffectedRows += affectedRows;
...@@ -5377,13 +5390,9 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) { ...@@ -5377,13 +5390,9 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
} }
} // tableSeq } // tableSeq
free_and_statistics_2: free_of_progressive:
tmfree(buffer); tmfree(buffer);
printStatPerThread(pThreadInfo);
printf("====thread[%d] completed total inserted rows: %"PRId64 ", total affected rows: %"PRId64 "====\n",
pThreadInfo->threadID,
pThreadInfo->totalInsertRows,
pThreadInfo->totalAffectedRows);
return NULL; return NULL;
} }
...@@ -5412,6 +5421,7 @@ static void* syncWrite(void *sarg) { ...@@ -5412,6 +5421,7 @@ static void* syncWrite(void *sarg) {
// progressive mode // progressive mode
return syncWriteProgressive(pThreadInfo); return syncWriteProgressive(pThreadInfo);
} }
} }
static void callBack(void *param, TAOS_RES *res, int code) { static void callBack(void *param, TAOS_RES *res, int code) {
...@@ -5737,10 +5747,10 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -5737,10 +5747,10 @@ static void startMultiThreadInsertData(int threads, char* db_name,
pthread_join(pids[i], NULL); pthread_join(pids[i], NULL);
} }
int64_t totalDelay = 0; uint64_t totalDelay = 0;
int64_t maxDelay = 0; uint64_t maxDelay = 0;
int64_t minDelay = UINT64_MAX; uint64_t minDelay = UINT64_MAX;
int64_t cntDelay = 1; uint64_t cntDelay = 1;
double avgDelay = 0; double avgDelay = 0;
for (int i = 0; i < threads; i++) { for (int i = 0; i < threads; i++) {
...@@ -5749,7 +5759,7 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -5749,7 +5759,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
tsem_destroy(&(t_info->lock_sem)); tsem_destroy(&(t_info->lock_sem));
taos_close(t_info->taos); taos_close(t_info->taos);
debugPrint("%s() LN%d, [%d] totalInsert=%"PRId64" totalAffected=%"PRId64"\n", debugPrint("%s() LN%d, [%d] totalInsert=%"PRIu64" totalAffected=%"PRIu64"\n",
__func__, __LINE__, __func__, __LINE__,
t_info->threadID, t_info->totalInsertRows, t_info->threadID, t_info->totalInsertRows,
t_info->totalAffectedRows); t_info->totalAffectedRows);
...@@ -5775,35 +5785,42 @@ static void startMultiThreadInsertData(int threads, char* db_name, ...@@ -5775,35 +5785,42 @@ static void startMultiThreadInsertData(int threads, char* db_name,
int64_t t = end - start; int64_t t = end - start;
if (superTblInfo) { if (superTblInfo) {
printf("Spent %.2f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s.%s. %2.f records/second\n\n", fprintf(stderr, "Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s.%s. %.2f records/second\n\n",
t / 1000.0, superTblInfo->totalInsertRows, t / 1000.0, superTblInfo->totalInsertRows,
superTblInfo->totalAffectedRows, superTblInfo->totalAffectedRows,
threads, db_name, superTblInfo->sTblName, threads, db_name, superTblInfo->sTblName,
(double)superTblInfo->totalInsertRows / (t / 1000.0)); (double)superTblInfo->totalInsertRows / (t / 1000.0));
if (g_fpOfInsertResult) {
fprintf(g_fpOfInsertResult, fprintf(g_fpOfInsertResult,
"Spent %.2f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s.%s. %2.f records/second\n\n", "Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s.%s. %.2f records/second\n\n",
t / 1000.0, superTblInfo->totalInsertRows, t / 1000.0, superTblInfo->totalInsertRows,
superTblInfo->totalAffectedRows, superTblInfo->totalAffectedRows,
threads, db_name, superTblInfo->sTblName, threads, db_name, superTblInfo->sTblName,
(double)superTblInfo->totalInsertRows / (t / 1000.0)); (double)superTblInfo->totalInsertRows / (t / 1000.0));
}
} else { } else {
printf("Spent %.2f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s %2.f records/second\n\n", fprintf(stderr, "Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s %.2f records/second\n\n",
t / 1000.0, g_args.totalInsertRows, t / 1000.0, g_args.totalInsertRows,
g_args.totalAffectedRows, g_args.totalAffectedRows,
threads, db_name, threads, db_name,
(double)g_args.totalInsertRows / (t / 1000.0)); (double)g_args.totalInsertRows / (t / 1000.0));
if (g_fpOfInsertResult) {
fprintf(g_fpOfInsertResult, fprintf(g_fpOfInsertResult,
"Spent %.2f seconds to insert rows: %"PRId64", affected rows: %"PRId64" with %d thread(s) into %s %2.f records/second\n\n", "Spent %.2f seconds to insert rows: %"PRIu64", affected rows: %"PRIu64" with %d thread(s) into %s %.2f records/second\n\n",
t * 1000.0, g_args.totalInsertRows, t * 1000.0, g_args.totalInsertRows,
g_args.totalAffectedRows, g_args.totalAffectedRows,
threads, db_name, threads, db_name,
(double)g_args.totalInsertRows / (t / 1000.0)); (double)g_args.totalInsertRows / (t / 1000.0));
} }
}
printf("insert delay, avg: %10.2fms, max: %"PRId64"ms, min: %"PRId64"ms\n\n", fprintf(stderr, "insert delay, avg: %10.2fms, max: %"PRIu64"ms, min: %"PRIu64"ms\n\n",
avgDelay, maxDelay, minDelay); avgDelay, maxDelay, minDelay);
fprintf(g_fpOfInsertResult, "insert delay, avg:%10.2fms, max: %"PRId64"ms, min: %"PRId64"ms\n\n", if (g_fpOfInsertResult) {
fprintf(g_fpOfInsertResult, "insert delay, avg:%10.2fms, max: %"PRIu64"ms, min: %"PRIu64"ms\n\n",
avgDelay, maxDelay, minDelay); avgDelay, maxDelay, minDelay);
}
//taos_close(taos); //taos_close(taos);
...@@ -5973,6 +5990,7 @@ static int insertTestProcess() { ...@@ -5973,6 +5990,7 @@ static int insertTestProcess() {
return -1; return -1;
} }
if (g_fpOfInsertResult)
printfInsertMetaToFile(g_fpOfInsertResult); printfInsertMetaToFile(g_fpOfInsertResult);
if (!g_args.answer_yes) { if (!g_args.answer_yes) {
...@@ -5984,6 +6002,7 @@ static int insertTestProcess() { ...@@ -5984,6 +6002,7 @@ static int insertTestProcess() {
// create database and super tables // create database and super tables
if(createDatabasesAndStables() != 0) { if(createDatabasesAndStables() != 0) {
if (g_fpOfInsertResult)
fclose(g_fpOfInsertResult); fclose(g_fpOfInsertResult);
return -1; return -1;
} }
...@@ -6000,12 +6019,14 @@ static int insertTestProcess() { ...@@ -6000,12 +6019,14 @@ static int insertTestProcess() {
end = taosGetTimestampMs(); end = taosGetTimestampMs();
if (g_totalChildTables > 0) { if (g_totalChildTables > 0) {
printf("Spent %.4f seconds to create %d tables with %d thread(s)\n\n", fprintf(stderr, "Spent %.4f seconds to create %d tables with %d thread(s)\n\n",
(end - start)/1000.0, g_totalChildTables, g_Dbs.threadCountByCreateTbl); (end - start)/1000.0, g_totalChildTables, g_Dbs.threadCountByCreateTbl);
if (g_fpOfInsertResult) {
fprintf(g_fpOfInsertResult, fprintf(g_fpOfInsertResult,
"Spent %.4f seconds to create %d tables with %d thread(s)\n\n", "Spent %.4f seconds to create %d tables with %d thread(s)\n\n",
(end - start)/1000.0, g_totalChildTables, g_Dbs.threadCountByCreateTbl); (end - start)/1000.0, g_totalChildTables, g_Dbs.threadCountByCreateTbl);
} }
}
taosMsleep(1000); taosMsleep(1000);
// create sub threads for inserting data // create sub threads for inserting data
...@@ -6077,12 +6098,12 @@ static void *specifiedTableQuery(void *sarg) { ...@@ -6077,12 +6098,12 @@ static void *specifiedTableQuery(void *sarg) {
return NULL; return NULL;
} }
int64_t st = 0; uint64_t st = 0;
int64_t et = 0; uint64_t et = 0;
int queryTimes = g_queryInfo.specifiedQueryInfo.queryTimes; uint64_t queryTimes = g_queryInfo.specifiedQueryInfo.queryTimes;
int totalQueried = 0; uint64_t totalQueried = 0;
uint64_t lastPrintTime = taosGetTimestampMs(); uint64_t lastPrintTime = taosGetTimestampMs();
uint64_t startTs = taosGetTimestampMs(); uint64_t startTs = taosGetTimestampMs();
...@@ -6135,7 +6156,7 @@ static void *specifiedTableQuery(void *sarg) { ...@@ -6135,7 +6156,7 @@ static void *specifiedTableQuery(void *sarg) {
if (currentPrintTime - lastPrintTime > 30*1000) { if (currentPrintTime - lastPrintTime > 30*1000) {
debugPrint("%s() LN%d, endTs=%"PRIu64"ms, startTs=%"PRIu64"ms\n", debugPrint("%s() LN%d, endTs=%"PRIu64"ms, startTs=%"PRIu64"ms\n",
__func__, __LINE__, endTs, startTs); __func__, __LINE__, endTs, startTs);
printf("thread[%d] has currently completed queries: %d, QPS: %10.6f\n", printf("thread[%d] has currently completed queries: %"PRIu64", QPS: %10.6f\n",
pThreadInfo->threadID, pThreadInfo->threadID,
totalQueried, totalQueried,
(double)(totalQueried/((endTs-startTs)/1000.0))); (double)(totalQueried/((endTs-startTs)/1000.0)));
...@@ -6187,14 +6208,14 @@ static void *superTableQuery(void *sarg) { ...@@ -6187,14 +6208,14 @@ static void *superTableQuery(void *sarg) {
} }
} }
int64_t st = 0; uint64_t st = 0;
int64_t et = (int64_t)g_queryInfo.superQueryInfo.queryInterval; uint64_t et = (int64_t)g_queryInfo.superQueryInfo.queryInterval;
int queryTimes = g_queryInfo.superQueryInfo.queryTimes; uint64_t queryTimes = g_queryInfo.superQueryInfo.queryTimes;
int totalQueried = 0; uint64_t totalQueried = 0;
int64_t startTs = taosGetTimestampMs(); uint64_t startTs = taosGetTimestampMs();
int64_t lastPrintTime = taosGetTimestampMs(); uint64_t lastPrintTime = taosGetTimestampMs();
while(queryTimes --) { while(queryTimes --) {
if (g_queryInfo.superQueryInfo.queryInterval if (g_queryInfo.superQueryInfo.queryInterval
&& (et - st) < (int64_t)g_queryInfo.superQueryInfo.queryInterval) { && (et - st) < (int64_t)g_queryInfo.superQueryInfo.queryInterval) {
...@@ -6221,7 +6242,7 @@ static void *superTableQuery(void *sarg) { ...@@ -6221,7 +6242,7 @@ static void *superTableQuery(void *sarg) {
int64_t currentPrintTime = taosGetTimestampMs(); int64_t currentPrintTime = taosGetTimestampMs();
int64_t endTs = taosGetTimestampMs(); int64_t endTs = taosGetTimestampMs();
if (currentPrintTime - lastPrintTime > 30*1000) { if (currentPrintTime - lastPrintTime > 30*1000) {
printf("thread[%d] has currently completed queries: %d, QPS: %10.3f\n", printf("thread[%d] has currently completed queries: %"PRIu64", QPS: %10.3f\n",
pThreadInfo->threadID, pThreadInfo->threadID,
totalQueried, totalQueried,
(double)(totalQueried/((endTs-startTs)/1000.0))); (double)(totalQueried/((endTs-startTs)/1000.0)));
...@@ -6285,7 +6306,7 @@ static int queryTestProcess() { ...@@ -6285,7 +6306,7 @@ static int queryTestProcess() {
int nConcurrent = g_queryInfo.specifiedQueryInfo.concurrent; int nConcurrent = g_queryInfo.specifiedQueryInfo.concurrent;
int nSqlCount = g_queryInfo.specifiedQueryInfo.sqlCount; int nSqlCount = g_queryInfo.specifiedQueryInfo.sqlCount;
int64_t startTs = taosGetTimestampMs(); uint64_t startTs = taosGetTimestampMs();
if ((nSqlCount > 0) && (nConcurrent > 0)) { if ((nSqlCount > 0) && (nConcurrent > 0)) {
...@@ -6345,16 +6366,16 @@ static int queryTestProcess() { ...@@ -6345,16 +6366,16 @@ static int queryTestProcess() {
ERROR_EXIT("memory allocation failed for create threads\n"); ERROR_EXIT("memory allocation failed for create threads\n");
} }
int ntables = g_queryInfo.superQueryInfo.childTblCount; uint64_t ntables = g_queryInfo.superQueryInfo.childTblCount;
int threads = g_queryInfo.superQueryInfo.threadCnt; int threads = g_queryInfo.superQueryInfo.threadCnt;
int a = ntables / threads; uint64_t a = ntables / threads;
if (a < 1) { if (a < 1) {
threads = ntables; threads = ntables;
a = 1; a = 1;
} }
int b = 0; uint64_t b = 0;
if (threads != 0) { if (threads != 0) {
b = ntables % threads; b = ntables % threads;
} }
...@@ -6396,12 +6417,12 @@ static int queryTestProcess() { ...@@ -6396,12 +6417,12 @@ static int queryTestProcess() {
tmfree((char*)infosOfSub); tmfree((char*)infosOfSub);
// taos_close(taos);// TODO: workaround to use separate taos connection; // taos_close(taos);// TODO: workaround to use separate taos connection;
int64_t endTs = taosGetTimestampMs(); uint64_t endTs = taosGetTimestampMs();
int totalQueried = g_queryInfo.specifiedQueryInfo.totalQueried + uint64_t totalQueried = g_queryInfo.specifiedQueryInfo.totalQueried +
g_queryInfo.superQueryInfo.totalQueried; g_queryInfo.superQueryInfo.totalQueried;
printf("==== completed total queries: %d, the QPS of all threads: %10.3f====\n", fprintf(stderr, "==== completed total queries: %"PRIu64", the QPS of all threads: %10.3f====\n",
totalQueried, totalQueried,
(double)(totalQueried/((endTs-startTs)/1000.0))); (double)(totalQueried/((endTs-startTs)/1000.0)));
return 0; return 0;
......
...@@ -72,6 +72,7 @@ enum _show_db_index { ...@@ -72,6 +72,7 @@ enum _show_db_index {
TSDB_SHOW_DB_WALLEVEL_INDEX, TSDB_SHOW_DB_WALLEVEL_INDEX,
TSDB_SHOW_DB_FSYNC_INDEX, TSDB_SHOW_DB_FSYNC_INDEX,
TSDB_SHOW_DB_COMP_INDEX, TSDB_SHOW_DB_COMP_INDEX,
TSDB_SHOW_DB_CACHELAST_INDEX,
TSDB_SHOW_DB_PRECISION_INDEX, TSDB_SHOW_DB_PRECISION_INDEX,
TSDB_SHOW_DB_UPDATE_INDEX, TSDB_SHOW_DB_UPDATE_INDEX,
TSDB_SHOW_DB_STATUS_INDEX, TSDB_SHOW_DB_STATUS_INDEX,
...@@ -134,6 +135,7 @@ typedef struct { ...@@ -134,6 +135,7 @@ typedef struct {
int8_t wallevel; int8_t wallevel;
int32_t fsync; int32_t fsync;
int8_t comp; int8_t comp;
int8_t cachelast;
char precision[8]; // time resolution char precision[8]; // time resolution
int8_t update; int8_t update;
char status[16]; char status[16];
...@@ -976,6 +978,7 @@ int taosDumpOut(struct arguments *arguments) { ...@@ -976,6 +978,7 @@ int taosDumpOut(struct arguments *arguments) {
dbInfos[count]->wallevel = *((int8_t *)row[TSDB_SHOW_DB_WALLEVEL_INDEX]); dbInfos[count]->wallevel = *((int8_t *)row[TSDB_SHOW_DB_WALLEVEL_INDEX]);
dbInfos[count]->fsync = *((int32_t *)row[TSDB_SHOW_DB_FSYNC_INDEX]); dbInfos[count]->fsync = *((int32_t *)row[TSDB_SHOW_DB_FSYNC_INDEX]);
dbInfos[count]->comp = (int8_t)(*((int8_t *)row[TSDB_SHOW_DB_COMP_INDEX])); dbInfos[count]->comp = (int8_t)(*((int8_t *)row[TSDB_SHOW_DB_COMP_INDEX]));
dbInfos[count]->cachelast = (int8_t)(*((int8_t *)row[TSDB_SHOW_DB_CACHELAST_INDEX]));
strncpy(dbInfos[count]->precision, (char *)row[TSDB_SHOW_DB_PRECISION_INDEX], fields[TSDB_SHOW_DB_PRECISION_INDEX].bytes); strncpy(dbInfos[count]->precision, (char *)row[TSDB_SHOW_DB_PRECISION_INDEX], fields[TSDB_SHOW_DB_PRECISION_INDEX].bytes);
//dbInfos[count]->precision = *((int8_t *)row[TSDB_SHOW_DB_PRECISION_INDEX]); //dbInfos[count]->precision = *((int8_t *)row[TSDB_SHOW_DB_PRECISION_INDEX]);
...@@ -1282,9 +1285,10 @@ void taosDumpCreateDbClause(SDbInfo *dbInfo, bool isDumpProperty, FILE *fp) { ...@@ -1282,9 +1285,10 @@ void taosDumpCreateDbClause(SDbInfo *dbInfo, bool isDumpProperty, FILE *fp) {
pstr += sprintf(pstr, "CREATE DATABASE IF NOT EXISTS %s ", dbInfo->name); pstr += sprintf(pstr, "CREATE DATABASE IF NOT EXISTS %s ", dbInfo->name);
if (isDumpProperty) { if (isDumpProperty) {
pstr += sprintf(pstr, pstr += sprintf(pstr,
"TABLES %d VGROUPS %d REPLICA %d QUORUM %d DAYS %d KEEP %s CACHE %d BLOCKS %d MINROWS %d MAXROWS %d WALLEVEL %d FYNC %d COMP %d PRECISION '%s' UPDATE %d", "REPLICA %d QUORUM %d DAYS %d KEEP %s CACHE %d BLOCKS %d MINROWS %d MAXROWS %d FSYNC %d CACHELAST %d COMP %d PRECISION '%s' UPDATE %d",
dbInfo->ntables, dbInfo->vgroups, dbInfo->replica, dbInfo->quorum, dbInfo->days, dbInfo->keeplist, dbInfo->cache, dbInfo->replica, dbInfo->quorum, dbInfo->days, dbInfo->keeplist, dbInfo->cache,
dbInfo->blocks, dbInfo->minrows, dbInfo->maxrows, dbInfo->wallevel, dbInfo->fsync, dbInfo->comp, dbInfo->precision, dbInfo->update); dbInfo->blocks, dbInfo->minrows, dbInfo->maxrows, dbInfo->fsync, dbInfo->cachelast,
dbInfo->comp, dbInfo->precision, dbInfo->update);
} }
pstr += sprintf(pstr, ";"); pstr += sprintf(pstr, ";");
......
...@@ -171,7 +171,7 @@ typedef struct HttpThread { ...@@ -171,7 +171,7 @@ typedef struct HttpThread {
EpollFd pollFd; EpollFd pollFd;
int32_t numOfContexts; int32_t numOfContexts;
int32_t threadId; int32_t threadId;
char label[HTTP_LABEL_SIZE]; char label[HTTP_LABEL_SIZE << 1];
bool (*processData)(HttpContext *pContext); bool (*processData)(HttpContext *pContext);
} HttpThread; } HttpThread;
......
...@@ -539,7 +539,7 @@ static void taosNetTestServer(char *host, int32_t startPort, int32_t pkgLen) { ...@@ -539,7 +539,7 @@ static void taosNetTestServer(char *host, int32_t startPort, int32_t pkgLen) {
} }
void taosNetTest(char *role, char *host, int32_t port, int32_t pkgLen) { void taosNetTest(char *role, char *host, int32_t port, int32_t pkgLen) {
// tscEmbedded = 1; tscEmbedded = 1;
if (host == NULL) host = tsLocalFqdn; if (host == NULL) host = tsLocalFqdn;
if (port == 0) port = tsServerPort; if (port == 0) port = tsServerPort;
if (pkgLen <= 10) pkgLen = 1000; if (pkgLen <= 10) pkgLen = 1000;
...@@ -550,6 +550,7 @@ void taosNetTest(char *role, char *host, int32_t port, int32_t pkgLen) { ...@@ -550,6 +550,7 @@ void taosNetTest(char *role, char *host, int32_t port, int32_t pkgLen) {
} else if (0 == strcmp("server", role)) { } else if (0 == strcmp("server", role)) {
taosNetTestServer(host, port, pkgLen); taosNetTestServer(host, port, pkgLen);
} else if (0 == strcmp("rpc", role)) { } else if (0 == strcmp("rpc", role)) {
tscEmbedded = 0;
taosNetTestRpc(host, port, pkgLen); taosNetTestRpc(host, port, pkgLen);
} else if (0 == strcmp("sync", role)) { } else if (0 == strcmp("sync", role)) {
taosNetCheckSync(host, port); taosNetCheckSync(host, port);
...@@ -559,5 +560,5 @@ void taosNetTest(char *role, char *host, int32_t port, int32_t pkgLen) { ...@@ -559,5 +560,5 @@ void taosNetTest(char *role, char *host, int32_t port, int32_t pkgLen) {
taosNetTestStartup(host, port); taosNetTestStartup(host, port);
} }
// tscEmbedded = 0; tscEmbedded = 0;
} }
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册