diff --git a/src/inc/tglobalcfg.h b/src/inc/tglobalcfg.h index 65c7fcec4dc34e3866494b9be6047e9517077edf..7603591e0699b2413b2b12a5a93de7a7360e63c6 100644 --- a/src/inc/tglobalcfg.h +++ b/src/inc/tglobalcfg.h @@ -80,6 +80,7 @@ extern short tsNumOfVnodesPerCore; extern short tsNumOfTotalVnodes; extern short tsCheckHeaderFile; extern uint32_t tsPublicIpInt; +extern short tsAffectedRowsMod; extern int tsSessionsPerVnode; extern int tsAverageCacheBlocks; diff --git a/src/modules/http/src/restHandle.c b/src/modules/http/src/restHandle.c index a893d372a1f9b4d34b18ec20094f2a9ca155ad78..a3077008661590e9813b6f3ec91b6c15d9e42902 100644 --- a/src/modules/http/src/restHandle.c +++ b/src/modules/http/src/restHandle.c @@ -67,7 +67,10 @@ bool restProcessSqlRequest(HttpContext* pContext, int timestampFmt) { return false; } - // for async test + + /* + * for async test + * / /* if (httpCheckUsedbSql(sql)) { httpSendErrorResp(pContext, HTTP_NO_EXEC_USEDB); diff --git a/src/system/detail/src/vnodeImport.c b/src/system/detail/src/vnodeImport.c index 7a4796873e97bf7c8e5205e7487494897e2146ef..7ebab90f0baed0b79936fced88c3a129cff6f170 100644 --- a/src/system/detail/src/vnodeImport.c +++ b/src/system/detail/src/vnodeImport.c @@ -911,6 +911,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int blockIter.nextKey = maxFileKey + 1; } else { // Case 3. need to search the block for slot and pos if (key == minKey || key == maxKey) { + if (tsAffectedRowsMod) pointsImported++; payloadIter++; continue; } @@ -939,6 +940,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int } while (left < right); if (key == blockMinKey || key == blockMaxKey) { // duplicate key + if (tsAffectedRowsMod) pointsImported++; payloadIter++; continue; } @@ -955,6 +957,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int if (key == importHandle.pBlocks[blockIter.slot].keyFirst || key == importHandle.pBlocks[blockIter.slot].keyLast) { + if (tsAffectedRowsMod) pointsImported++; payloadIter++; continue; } @@ -976,6 +979,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int importHandle.data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, pBlock->numOfPoints, key, TSQL_SO_ASC); assert(pos != 0); if (KEY_AT_INDEX(importHandle.data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, sizeof(TSKEY), pos) == key) { + if (tsAffectedRowsMod) pointsImported++; payloadIter++; continue; } @@ -1106,6 +1110,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int if (KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) == KEY_AT_INDEX(importHandle.data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, sizeof(TSKEY), blockIter.pos)) { // duplicate key + if (tsAffectedRowsMod) pointsImported++; payloadIter++; continue; } else if (KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) < @@ -1320,7 +1325,10 @@ int vnodeImportDataToCache(SImportInfo *pImport, const char *payload, const int pImport->lastKey = lastKey; for (payloadIter = 0; payloadIter < rows; payloadIter++) { TSKEY key = KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter); - if (key == pObj->lastKey) continue; + if (key == pObj->lastKey) { + if (tsAffectedRowsMod) rowsImported++; + continue; + } if (key > pObj->lastKey) { // Just as insert pImport->slot = pInfo->currentSlot; pImport->pos = pInfo->cacheBlocks[pImport->slot]->numOfPoints; @@ -1333,11 +1341,12 @@ int vnodeImportDataToCache(SImportInfo *pImport, const char *payload, const int } if (pImport->firstKey != pImport->key) break; + if (tsAffectedRowsMod) rowsImported++; } } if (payloadIter == rows) { - pImport->importedRows = 0; + pImport->importedRows += rowsImported; code = 0; goto _exit; } @@ -1470,6 +1479,7 @@ int vnodeImportDataToCache(SImportInfo *pImport, const char *payload, const int payloadIter++; } else { + if (tsAffectedRowsMod) rowsImported++; payloadIter++; continue; } diff --git a/src/util/src/tglobalcfg.c b/src/util/src/tglobalcfg.c index 6a74f9de5ee12e5c92d52d9f210433d407d3c25f..66b037753100610ae82796e65fea6a6d2f9e5435 100644 --- a/src/util/src/tglobalcfg.c +++ b/src/util/src/tglobalcfg.c @@ -83,6 +83,12 @@ short tsCheckHeaderFile = 0; int tsSessionsPerVnode = 1000; int tsCacheBlockSize = 16384; // 256 columns int tsAverageCacheBlocks = TSDB_DEFAULT_AVG_BLOCKS; +/** + * Change the meaning of affected rows: + * 0: affected rows not include those duplicate records + * 1: affected rows include those duplicate records + */ +short tsAffectedRowsMod = 0; int tsRowsInFileBlock = 4096; float tsFileBlockMinPercent = 0.05; @@ -535,6 +541,9 @@ static void doInitGlobalConfig() { tsInitConfigOption(cfg++, "alternativeRole", &tsAlternativeRole, TSDB_CFG_VTYPE_INT, TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLUSTER, 0, 2, 0, TSDB_CFG_UTYPE_NONE); + tsInitConfigOption(cfg++, "affectedRowsMod", &tsAffectedRowsMod, TSDB_CFG_VTYPE_SHORT, + TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_LOG | TSDB_CFG_CTYPE_B_CLIENT, + 0, 1, 0, TSDB_CFG_UTYPE_NONE); // 0-any, 1-mgmt, 2-dnode // timer