未验证 提交 4c1e835d 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #12528 from taosdata/feature/3.0_liaohj

fix(query): merge updated data from cache.
......@@ -1271,7 +1271,6 @@ _error:
static int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* pBlockInfo);
static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows,
int32_t start, int32_t end);
static void moveDataToFront(STsdbReadHandle* pTsdbReadHandle, int32_t numOfRows, int32_t numOfCols);
static void doCheckGeneratedBlockRange(STsdbReadHandle* pTsdbReadHandle);
static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo,
SDataBlockInfo* pBlockInfo, int32_t endPos);
......@@ -1301,7 +1300,7 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock*
if ((ascScan && (key != TSKEY_INITIAL_VAL && key < binfo.window.skey)) ||
(!ascScan && (key != TSKEY_INITIAL_VAL && key > binfo.window.ekey))) {
// do not load file block into buffer
int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
int32_t step = ascScan ? 1 : -1;
TSKEY maxKey =
ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? (binfo.window.skey - step) : (binfo.window.ekey - step);
......@@ -1790,22 +1789,6 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa
#endif
}
static void moveDataToFront(STsdbReadHandle* pTsdbReadHandle, int32_t numOfRows, int32_t numOfCols) {
if (numOfRows == 0 || ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
return;
}
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
if (numOfRows < pTsdbReadHandle->outputCapacity) {
int32_t emptySize = pTsdbReadHandle->outputCapacity - numOfRows;
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
memmove((char*)pColInfo->pData, (char*)pColInfo->pData + emptySize * pColInfo->info.bytes,
numOfRows * pColInfo->info.bytes);
}
}
}
static void getQualifiedRowsPos(STsdbReadHandle* pTsdbReadHandle, int32_t startPos, int32_t endPos,
int32_t numOfExisted, int32_t* start, int32_t* end) {
*start = -1;
......@@ -1891,9 +1874,6 @@ static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STa
cur->lastKey = tsArray[endPos] + step;
cur->blockCompleted = true;
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
moveDataToFront(pTsdbReadHandle, numOfRows, numOfCols);
// The value of pos may be -1 or pBlockInfo->rows, and it is invalid in both cases.
pos = endPos + step;
updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos);
......@@ -1944,18 +1924,18 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] == pBlock->keyFirst &&
tsArray[pBlock->numOfRows - 1] == pBlock->keyLast);
bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
int32_t step = ascScan ? 1 : -1;
// for search the endPos, so the order needs to reverse
int32_t order = (pTsdbReadHandle->order == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
int32_t order = ascScan ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle));
STable* pTable = NULL;
int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &blockInfo);
STimeWindow* pWin = &blockInfo.window;
tsdbDebug("%p uid:%" PRIu64 " start merge data block, file block range:%" PRIu64 "-%" PRIu64
" rows:%d, start:%d, end:%d, %s",
pTsdbReadHandle, pCheckInfo->tableId, blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows,
" rows:%d, start:%d, end:%d, %s", pTsdbReadHandle, pCheckInfo->tableId, pWin->skey, pWin->ekey, blockInfo.rows,
cur->pos, endPos, pTsdbReadHandle->idStr);
// compared with the data from in-memory buffer, to generate the correct timestamp array list
......@@ -1986,20 +1966,16 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
}
TSKEY key = TD_ROW_KEY(row1);
if ((key > pTsdbReadHandle->window.ekey && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
(key < pTsdbReadHandle->window.ekey && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
if ((key > pTsdbReadHandle->window.ekey && ascScan) || (key < pTsdbReadHandle->window.ekey && !ascScan)) {
break;
}
if (((pos > endPos || tsArray[pos] > pTsdbReadHandle->window.ekey) &&
ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
((pos < endPos || tsArray[pos] < pTsdbReadHandle->window.ekey) &&
!ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
if (((pos > endPos || tsArray[pos] > pTsdbReadHandle->window.ekey) && ascScan) ||
((pos < endPos || tsArray[pos] < pTsdbReadHandle->window.ekey) && !ascScan)) {
break;
}
if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
(key > tsArray[pos] && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
if ((key < tsArray[pos] && ascScan) || (key > tsArray[pos] && !ascScan)) {
if (rv1 != TD_ROW_SVER(row1)) {
// pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
rv1 = TD_ROW_SVER(row1);
......@@ -2054,11 +2030,8 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
}
#endif
if (TD_SUPPORT_UPDATE(pCfg->update)) {
if (lastKeyAppend != key) {
lastKeyAppend = key;
++curRow;
}
numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, pos, pos);
lastKeyAppend = key;
if (rv1 != TD_ROW_SVER(row1)) {
// pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
......@@ -2068,9 +2041,10 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
// pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
rv2 = TD_ROW_SVER(row2);
}
numOfRows +=
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend);
// still assign data into current row
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend);
if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key;
......@@ -2081,12 +2055,13 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
cur->mixBlock = true;
moveToNextRowInMem(pCheckInfo);
++curRow;
pos += step;
} else {
moveToNextRowInMem(pCheckInfo);
}
} else if ((key > tsArray[pos] && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
(key < tsArray[pos] && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
} else if ((key > tsArray[pos] && ascScan) || (key < tsArray[pos] && !ascScan)) {
if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = tsArray[pos];
}
......@@ -2112,17 +2087,17 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
int32_t qstart = 0, qend = 0;
getQualifiedRowsPos(pTsdbReadHandle, pos, end, numOfRows, &qstart, &qend);
if ((lastKeyAppend != TSKEY_INITIAL_VAL) &&
(lastKeyAppend != (ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? tsArray[qstart] : tsArray[qend]))) {
if ((lastKeyAppend != TSKEY_INITIAL_VAL) && (lastKeyAppend != (ascScan ? tsArray[qstart] : tsArray[qend]))) {
++curRow;
}
numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, qstart, qend);
pos += (qend - qstart + 1) * step;
if (numOfRows > 0) {
curRow = numOfRows - 1;
}
cur->win.ekey = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? tsArray[qend] : tsArray[qstart];
cur->win.ekey = ascScan ? tsArray[qend] : tsArray[qstart];
cur->lastKey = cur->win.ekey + step;
lastKeyAppend = cur->win.ekey;
}
......@@ -2134,10 +2109,8 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
* copy them all to result buffer, since it may be overlapped with file data block.
*/
if (node == NULL ||
((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) > pTsdbReadHandle->window.ekey) &&
ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) < pTsdbReadHandle->window.ekey) &&
!ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) > pTsdbReadHandle->window.ekey) && ascScan) ||
((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) < pTsdbReadHandle->window.ekey) && !ascScan)) {
// no data in cache or data in cache is greater than the ekey of time window, load data from file block
if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = tsArray[pos];
......@@ -2149,22 +2122,20 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, start, end);
pos += (end - start + 1) * step;
cur->win.ekey = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? tsArray[end] : tsArray[start];
cur->win.ekey = ascScan ? tsArray[end] : tsArray[start];
cur->lastKey = cur->win.ekey + step;
cur->mixBlock = true;
}
}
}
cur->blockCompleted =
(((pos > endPos || cur->lastKey > pTsdbReadHandle->window.ekey) && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) ||
((pos < endPos || cur->lastKey < pTsdbReadHandle->window.ekey) && !ASCENDING_TRAVERSE(pTsdbReadHandle->order)));
cur->blockCompleted = (((pos > endPos || cur->lastKey > pTsdbReadHandle->window.ekey) && ascScan) ||
((pos < endPos || cur->lastKey < pTsdbReadHandle->window.ekey) && !ascScan));
if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
if (!ascScan) {
TSWAP(cur->win.skey, cur->win.ekey);
}
moveDataToFront(pTsdbReadHandle, numOfRows, numOfCols);
updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos);
doCheckGeneratedBlockRange(pTsdbReadHandle);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册