提交 26802eb7 编写于 作者: H Hongze Cheng

add feature #1180

上级 f8ff7091
...@@ -80,6 +80,7 @@ extern short tsNumOfTotalVnodes; ...@@ -80,6 +80,7 @@ extern short tsNumOfTotalVnodes;
extern short tsCheckHeaderFile; extern short tsCheckHeaderFile;
extern uint32_t tsServerIp; extern uint32_t tsServerIp;
extern uint32_t tsPublicIpInt; extern uint32_t tsPublicIpInt;
extern short tsAffectedRowsMod;
extern int tsSessionsPerVnode; extern int tsSessionsPerVnode;
extern int tsAverageCacheBlocks; extern int tsAverageCacheBlocks;
......
...@@ -911,6 +911,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int ...@@ -911,6 +911,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int
blockIter.nextKey = maxFileKey + 1; blockIter.nextKey = maxFileKey + 1;
} else { // Case 3. need to search the block for slot and pos } else { // Case 3. need to search the block for slot and pos
if (key == minKey || key == maxKey) { if (key == minKey || key == maxKey) {
if (tsAffectedRowsMod) pointsImported++;
payloadIter++; payloadIter++;
continue; continue;
} }
...@@ -939,6 +940,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int ...@@ -939,6 +940,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int
} while (left < right); } while (left < right);
if (key == blockMinKey || key == blockMaxKey) { // duplicate key if (key == blockMinKey || key == blockMaxKey) { // duplicate key
if (tsAffectedRowsMod) pointsImported++;
payloadIter++; payloadIter++;
continue; continue;
} }
...@@ -955,6 +957,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int ...@@ -955,6 +957,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int
if (key == importHandle.pBlocks[blockIter.slot].keyFirst || if (key == importHandle.pBlocks[blockIter.slot].keyFirst ||
key == importHandle.pBlocks[blockIter.slot].keyLast) { key == importHandle.pBlocks[blockIter.slot].keyLast) {
if (tsAffectedRowsMod) pointsImported++;
payloadIter++; payloadIter++;
continue; continue;
} }
...@@ -976,6 +979,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int ...@@ -976,6 +979,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int
importHandle.data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, pBlock->numOfPoints, key, TSQL_SO_ASC); importHandle.data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, pBlock->numOfPoints, key, TSQL_SO_ASC);
assert(pos != 0); assert(pos != 0);
if (KEY_AT_INDEX(importHandle.data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, sizeof(TSKEY), pos) == key) { if (KEY_AT_INDEX(importHandle.data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, sizeof(TSKEY), pos) == key) {
if (tsAffectedRowsMod) pointsImported++;
payloadIter++; payloadIter++;
continue; continue;
} }
...@@ -1106,6 +1110,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int ...@@ -1106,6 +1110,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int
if (KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) == if (KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) ==
KEY_AT_INDEX(importHandle.data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, sizeof(TSKEY), KEY_AT_INDEX(importHandle.data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, sizeof(TSKEY),
blockIter.pos)) { // duplicate key blockIter.pos)) { // duplicate key
if (tsAffectedRowsMod) pointsImported++;
payloadIter++; payloadIter++;
continue; continue;
} else if (KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) < } else if (KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter) <
...@@ -1320,7 +1325,10 @@ int vnodeImportDataToCache(SImportInfo *pImport, const char *payload, const int ...@@ -1320,7 +1325,10 @@ int vnodeImportDataToCache(SImportInfo *pImport, const char *payload, const int
pImport->lastKey = lastKey; pImport->lastKey = lastKey;
for (payloadIter = 0; payloadIter < rows; payloadIter++) { for (payloadIter = 0; payloadIter < rows; payloadIter++) {
TSKEY key = KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter); TSKEY key = KEY_AT_INDEX(payload, pObj->bytesPerPoint, payloadIter);
if (key == pObj->lastKey) continue; if (key == pObj->lastKey) {
if (tsAffectedRowsMod) rowsImported++;
continue;
}
if (key > pObj->lastKey) { // Just as insert if (key > pObj->lastKey) { // Just as insert
pImport->slot = pInfo->currentSlot; pImport->slot = pInfo->currentSlot;
pImport->pos = pInfo->cacheBlocks[pImport->slot]->numOfPoints; pImport->pos = pInfo->cacheBlocks[pImport->slot]->numOfPoints;
...@@ -1333,11 +1341,12 @@ int vnodeImportDataToCache(SImportInfo *pImport, const char *payload, const int ...@@ -1333,11 +1341,12 @@ int vnodeImportDataToCache(SImportInfo *pImport, const char *payload, const int
} }
if (pImport->firstKey != pImport->key) break; if (pImport->firstKey != pImport->key) break;
if (tsAffectedRowsMod) rowsImported++;
} }
} }
if (payloadIter == rows) { if (payloadIter == rows) {
pImport->importedRows = 0; pImport->importedRows += rowsImported;
code = 0; code = 0;
goto _exit; goto _exit;
} }
...@@ -1470,6 +1479,7 @@ int vnodeImportDataToCache(SImportInfo *pImport, const char *payload, const int ...@@ -1470,6 +1479,7 @@ int vnodeImportDataToCache(SImportInfo *pImport, const char *payload, const int
payloadIter++; payloadIter++;
} else { } else {
if (tsAffectedRowsMod) rowsImported++;
payloadIter++; payloadIter++;
continue; continue;
} }
......
...@@ -87,6 +87,12 @@ short tsCheckHeaderFile = 0; ...@@ -87,6 +87,12 @@ short tsCheckHeaderFile = 0;
int tsSessionsPerVnode = 1000; int tsSessionsPerVnode = 1000;
int tsCacheBlockSize = 16384; // 256 columns int tsCacheBlockSize = 16384; // 256 columns
int tsAverageCacheBlocks = TSDB_DEFAULT_AVG_BLOCKS; int tsAverageCacheBlocks = TSDB_DEFAULT_AVG_BLOCKS;
/**
* Change the meaning of affected rows:
* 0: affected rows not include those duplicate records
* 1: affected rows include those duplicate records
*/
short tsAffectedRowsMod = 0;
int tsRowsInFileBlock = 4096; int tsRowsInFileBlock = 4096;
float tsFileBlockMinPercent = 0.05; float tsFileBlockMinPercent = 0.05;
...@@ -539,6 +545,9 @@ static void doInitGlobalConfig() { ...@@ -539,6 +545,9 @@ static void doInitGlobalConfig() {
tsInitConfigOption(cfg++, "alternativeRole", &tsAlternativeRole, TSDB_CFG_VTYPE_INT, tsInitConfigOption(cfg++, "alternativeRole", &tsAlternativeRole, TSDB_CFG_VTYPE_INT,
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLUSTER, TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_CLUSTER,
0, 2, 0, TSDB_CFG_UTYPE_NONE); 0, 2, 0, TSDB_CFG_UTYPE_NONE);
tsInitConfigOption(cfg++, "affectedRowsMod", &tsAffectedRowsMod, TSDB_CFG_VTYPE_SHORT,
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_LOG | TSDB_CFG_CTYPE_B_CLIENT,
0, 1, 0, TSDB_CFG_UTYPE_NONE);
// 0-any, 1-mgmt, 2-dnode // 0-any, 1-mgmt, 2-dnode
// timer // timer
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册