未验证 提交 b72865e5 编写于 作者: C Cary Xu 提交者: GitHub

Merge pull request #12254 from taosdata/feature/TD-14481-3.0

fix: merge data of mem and file
......@@ -879,14 +879,14 @@ static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order,
}
}
static STSRow* getSRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int32_t update, STSRow** extraRow, TDRowVerT maxVer) {
static STSRow* getSRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int32_t update, STSRow** extraRow,
TDRowVerT maxVer) {
STSRow *rmem = NULL, *rimem = NULL;
if (pCheckInfo->iter) {
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
if (node != NULL) {
rmem = (STSRow*)SL_GET_NODE_DATA(node);
#if 0 // TODO: skiplist refactor
#if 0 // TODO: skiplist refactor
if (TD_ROW_VER(rmem) > maxVer) {
rmem = NULL;
}
......@@ -898,7 +898,7 @@ static STSRow* getSRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter);
if (node != NULL) {
rimem = (STSRow*)SL_GET_NODE_DATA(node);
#if 0 // TODO: skiplist refactor
#if 0 // TODO: skiplist refactor
if (TD_ROW_VER(rimem) > maxVer) {
rimem = NULL;
}
......@@ -1677,7 +1677,7 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa
colIdOfRow2 = tdKvRowColIdAt(row2, k);
}
if (colIdOfRow1 < colIdOfRow2) { // the most probability
if (colIdOfRow1 < colIdOfRow2) { // the most probability
if (colIdOfRow1 < pColInfo->info.colId) {
++j;
continue;
......@@ -1720,7 +1720,7 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa
++(*curRow);
}
++nResult;
} else if (update){
} else if (update) {
mergeOption = 2;
} else {
mergeOption = 0;
......@@ -1741,7 +1741,7 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa
++(*curRow);
}
++nResult;
} else if(update) {
} else if (update) {
mergeOption = 2;
} else {
mergeOption = 0;
......@@ -2018,9 +2018,9 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
rv2 = TD_ROW_SVER(row2);
}
numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend);
// numOfRows += 1;
numOfRows +=
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend);
if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key;
}
......@@ -2028,7 +2028,6 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
cur->win.ekey = key;
cur->lastKey = key + step;
cur->mixBlock = true;
moveToNextRowInMem(pCheckInfo);
} else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it
#if 0
......@@ -2064,7 +2063,11 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
}
#endif
if (TD_SUPPORT_UPDATE(pCfg->update)) {
doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, pos, pos);
if (lastKeyAppend != key) {
lastKeyAppend = key;
++curRow;
}
numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, pos, pos);
if (rv1 != TD_ROW_SVER(row1)) {
// pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
......@@ -2074,10 +2077,10 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
// pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
rv2 = TD_ROW_SVER(row2);
}
numOfRows +=
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend);
numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend);
// ++numOfRows;
if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key;
}
......@@ -2117,15 +2120,20 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
int32_t qstart = 0, qend = 0;
getQualifiedRowsPos(pTsdbReadHandle, pos, end, numOfRows, &qstart, &qend);
lastKeyAppend = tsArray[qend];
numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, qstart, qend);
if ((lastKeyAppend != TSKEY_INITIAL_VAL) &&
(lastKeyAppend != (ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? tsArray[qstart] : tsArray[qend]))) {
++curRow;
}
numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, qstart, qend);
pos += (qend - qstart + 1) * step;
if(numOfRows > 0) {
if (numOfRows > 0) {
curRow = numOfRows - 1;
}
cur->win.ekey = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? tsArray[qend] : tsArray[qstart];
cur->lastKey = cur->win.ekey + step;
lastKeyAppend = cur->win.ekey;
}
} while (numOfRows < pTsdbReadHandle->outputCapacity);
......@@ -2429,7 +2437,7 @@ static int32_t getFirstFileDataBlock(STsdbReadHandle* pTsdbReadHandle, bool* exi
int32_t numOfTables = (int32_t)taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo);
STsdbKeepCfg* pCfg = REPO_KEEP_CFG(pTsdbReadHandle->pTsdb);
STimeWindow win = TSWINDOW_INITIALIZER;
STimeWindow win = TSWINDOW_INITIALIZER;
while (true) {
tsdbRLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
......@@ -2735,7 +2743,6 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
STSchema* pSchema = NULL;
TSKEY lastRowKey = TSKEY_INITIAL_VAL;
do {
STSRow* row = getSRowInTableMem(pCheckInfo, pTsdbReadHandle->order, pCfg->update, NULL, TD_VER_MAX);
if (row == NULL) {
......@@ -2760,8 +2767,8 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
pSchema = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), pCheckInfo->tableId, 0);
rv = TD_ROW_SVER(row);
}
numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, maxRowsToRead, &curRows, row, NULL, numOfCols, pCheckInfo->tableId, pSchema,
NULL, pCfg->update, &lastRowKey);
numOfRows += mergeTwoRowFromMem(pTsdbReadHandle, maxRowsToRead, &curRows, row, NULL, numOfCols, pCheckInfo->tableId,
pSchema, NULL, pCfg->update, &lastRowKey);
if (numOfRows >= maxRowsToRead) {
moveToNextRowInMem(pCheckInfo);
......@@ -2770,7 +2777,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
} while (moveToNextRowInMem(pCheckInfo));
taosMemoryFreeClear(pSchema); // free the STSChema
taosMemoryFreeClear(pSchema); // free the STSChema
assert(numOfRows <= maxRowsToRead);
......@@ -2898,8 +2905,8 @@ static bool loadCachedLastRow(STsdbReadHandle* pTsdbReadHandle) {
// if (ret != TSDB_CODE_SUCCESS) {
// return false;
// }
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, pRow, NULL, numOfCols, pCheckInfo->tableId,
NULL, NULL, true, &lastRowKey);
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, pRow, NULL, numOfCols,
pCheckInfo->tableId, NULL, NULL, true, &lastRowKey);
taosMemoryFreeClear(pRow);
// update the last key value
......@@ -3468,7 +3475,7 @@ void tsdbRetrieveDataBlockInfo(tsdbReaderT* pTsdbReadHandle, SDataBlockInfo* pDa
pDataBlockInfo->rows = cur->rows;
pDataBlockInfo->window = cur->win;
// ASSERT(pDataBlockInfo->numOfCols >= (int32_t)(QH_GET_NUM_OF_COLS(pHandle));
// ASSERT(pDataBlockInfo->numOfCols >= (int32_t)(QH_GET_NUM_OF_COLS(pHandle));
}
/*
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册