diff --git a/src/inc/tglobalcfg.h b/src/inc/tglobalcfg.h index c4407f7d55487689c84e5484fbb72749a62834e2..d4c998af31ee9243430d7ae2ba175851c4e3e79d 100644 --- a/src/inc/tglobalcfg.h +++ b/src/inc/tglobalcfg.h @@ -80,6 +80,7 @@ extern short tsNumOfTotalVnodes; extern short tsCheckHeaderFile; extern uint32_t tsServerIp; extern uint32_t tsPublicIpInt; +extern short tsAffectedRowsMod; extern int tsSessionsPerVnode; extern int tsAverageCacheBlocks; diff --git a/src/system/detail/src/vnodeImport.c b/src/system/detail/src/vnodeImport.c index f7f01a3c69946238130746106f387d536eb0bd6a..1e6e5e6dc9b36bb9d3c759668e3ce1c7ce7173ac 100644 --- a/src/system/detail/src/vnodeImport.c +++ b/src/system/detail/src/vnodeImport.c @@ -911,6 +911,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int blockIter.nextKey = maxFileKey + 1; } else { // Case 3. need to search the block for slot and pos if (key == minKey || key == maxKey) { + if (tsAffectedRowsMod) pointsImported++; payloadIter++; continue; } @@ -939,6 +940,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int } while (left < right); if (key == blockMinKey || key == blockMaxKey) { // duplicate key + if (tsAffectedRowsMod) pointsImported++; payloadIter++; continue; } @@ -955,6 +957,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int if (key == importHandle.pBlocks[blockIter.slot].keyFirst || key == importHandle.pBlocks[blockIter.slot].keyLast) { + if (tsAffectedRowsMod) pointsImported++; payloadIter++; continue; } @@ -976,6 +979,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int importHandle.data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, pBlock->numOfPoints, key, TSQL_SO_ASC); assert(pos != 0); if (KEY_AT_INDEX(importHandle.data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, sizeof(TSKEY), pos) == key) { + if (tsAffectedRowsMod) pointsImported++; payloadIter++; continue; } @@ -1106,6 +1110,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int if (KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) == KEY_AT_INDEX(importHandle.data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, sizeof(TSKEY), blockIter.pos)) { // duplicate key + if (tsAffectedRowsMod) pointsImported++; payloadIter++; continue; } else if (KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) < @@ -1320,7 +1325,10 @@ int vnodeImportDataToCache(SImportInfo *pImport, const char *payload, const int pImport->lastKey = lastKey; for (payloadIter = 0; payloadIter < rows; payloadIter++) { TSKEY key = KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter); - if (key == pObj->lastKey) continue; + if (key == pObj->lastKey) { + if (tsAffectedRowsMod) rowsImported++; + continue; + } if (key > pObj->lastKey) { // Just as insert pImport->slot = pInfo->currentSlot; pImport->pos = pInfo->cacheBlocks[pImport->slot]->numOfPoints; @@ -1333,11 +1341,12 @@ int vnodeImportDataToCache(SImportInfo *pImport, const char *payload, const int } if (pImport->firstKey != pImport->key) break; + if (tsAffectedRowsMod) rowsImported++; } } if (payloadIter == rows) { - pImport->importedRows = 0; + pImport->importedRows += rowsImported; code = 0; goto _exit; } @@ -1470,6 +1479,7 @@ int vnodeImportDataToCache(SImportInfo *pImport, const char *payload, const int payloadIter++; } else { + if (tsAffectedRowsMod) rowsImported++; payloadIter++; continue; } diff --git a/src/util/src/tglobalcfg.c b/src/util/src/tglobalcfg.c index 7e18e2e74b1922a6744349d8d9443c2858344a7f..d9d5b221a571081c3b464b663c7f6167424e550d 100644 --- a/src/util/src/tglobalcfg.c +++ b/src/util/src/tglobalcfg.c @@ -87,6 +87,12 @@ short tsCheckHeaderFile = 0; int tsSessionsPerVnode = 1000; int tsCacheBlockSize = 16384; // 256 columns int tsAverageCacheBlocks = TSDB_DEFAULT_AVG_BLOCKS; +/** + * Change the meaning of affected rows: + * 0: affected rows not include those duplicate records + * 1: affected rows include those duplicate records + */ +short tsAffectedRowsMod = 0; int tsRowsInFileBlock = 4096; float tsFileBlockMinPercent = 0.05; @@ -539,6 +545,9 @@ static void doInitGlobalConfig() { tsInitConfigOption(cfg++, "alternativeRole", &tsAlternativeRole, TSDB_CFG_VTYPE_INT, TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLUSTER, 0, 2, 0, TSDB_CFG_UTYPE_NONE); + tsInitConfigOption(cfg++, "affectedRowsMod", &tsAffectedRowsMod, TSDB_CFG_VTYPE_SHORT, + TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_LOG | TSDB_CFG_CTYPE_B_CLIENT, + 0, 1, 0, TSDB_CFG_UTYPE_NONE); // 0-any, 1-mgmt, 2-dnode // timer