提交 619eae3c 编写于 作者: H Haojun Liao

fix(query): merge updated data from cache.

上级 a66bc987
...@@ -1271,7 +1271,6 @@ _error: ...@@ -1271,7 +1271,6 @@ _error:
static int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* pBlockInfo); static int32_t getEndPosInDataBlock(STsdbReadHandle* pTsdbReadHandle, SDataBlockInfo* pBlockInfo);
static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows, static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows,
int32_t start, int32_t end); int32_t start, int32_t end);
static void moveDataToFront(STsdbReadHandle* pTsdbReadHandle, int32_t numOfRows, int32_t numOfCols);
static void doCheckGeneratedBlockRange(STsdbReadHandle* pTsdbReadHandle); static void doCheckGeneratedBlockRange(STsdbReadHandle* pTsdbReadHandle);
static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo,
SDataBlockInfo* pBlockInfo, int32_t endPos); SDataBlockInfo* pBlockInfo, int32_t endPos);
...@@ -1301,7 +1300,7 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock* ...@@ -1301,7 +1300,7 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock*
if ((ascScan && (key != TSKEY_INITIAL_VAL && key < binfo.window.skey)) || if ((ascScan && (key != TSKEY_INITIAL_VAL && key < binfo.window.skey)) ||
(!ascScan && (key != TSKEY_INITIAL_VAL && key > binfo.window.ekey))) { (!ascScan && (key != TSKEY_INITIAL_VAL && key > binfo.window.ekey))) {
// do not load file block into buffer // do not load file block into buffer
int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1; int32_t step = ascScan ? 1 : -1;
TSKEY maxKey = TSKEY maxKey =
ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? (binfo.window.skey - step) : (binfo.window.ekey - step); ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? (binfo.window.skey - step) : (binfo.window.ekey - step);
...@@ -1790,22 +1789,6 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa ...@@ -1790,22 +1789,6 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa
#endif #endif
} }
static void moveDataToFront(STsdbReadHandle* pTsdbReadHandle, int32_t numOfRows, int32_t numOfCols) {
if (numOfRows == 0 || ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
return;
}
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
if (numOfRows < pTsdbReadHandle->outputCapacity) {
int32_t emptySize = pTsdbReadHandle->outputCapacity - numOfRows;
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pTsdbReadHandle->pColumns, i);
memmove((char*)pColInfo->pData, (char*)pColInfo->pData + emptySize * pColInfo->info.bytes,
numOfRows * pColInfo->info.bytes);
}
}
}
static void getQualifiedRowsPos(STsdbReadHandle* pTsdbReadHandle, int32_t startPos, int32_t endPos, static void getQualifiedRowsPos(STsdbReadHandle* pTsdbReadHandle, int32_t startPos, int32_t endPos,
int32_t numOfExisted, int32_t* start, int32_t* end) { int32_t numOfExisted, int32_t* start, int32_t* end) {
*start = -1; *start = -1;
...@@ -1891,9 +1874,6 @@ static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STa ...@@ -1891,9 +1874,6 @@ static void copyAllRemainRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, STa
cur->lastKey = tsArray[endPos] + step; cur->lastKey = tsArray[endPos] + step;
cur->blockCompleted = true; cur->blockCompleted = true;
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
moveDataToFront(pTsdbReadHandle, numOfRows, numOfCols);
// The value of pos may be -1 or pBlockInfo->rows, and it is invalid in both cases. // The value of pos may be -1 or pBlockInfo->rows, and it is invalid in both cases.
pos = endPos + step; pos = endPos + step;
updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos); updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos);
...@@ -1944,18 +1924,18 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf ...@@ -1944,18 +1924,18 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] == pBlock->keyFirst && assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] == pBlock->keyFirst &&
tsArray[pBlock->numOfRows - 1] == pBlock->keyLast); tsArray[pBlock->numOfRows - 1] == pBlock->keyLast);
bool ascScan = ASCENDING_TRAVERSE(pTsdbReadHandle->order);
int32_t step = ascScan ? 1 : -1;
// for search the endPos, so the order needs to reverse // for search the endPos, so the order needs to reverse
int32_t order = (pTsdbReadHandle->order == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC; int32_t order = ascScan ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? 1 : -1;
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle)); int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pTsdbReadHandle));
STable* pTable = NULL;
int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &blockInfo); int32_t endPos = getEndPosInDataBlock(pTsdbReadHandle, &blockInfo);
STimeWindow* pWin = &blockInfo.window;
tsdbDebug("%p uid:%" PRIu64 " start merge data block, file block range:%" PRIu64 "-%" PRIu64 tsdbDebug("%p uid:%" PRIu64 " start merge data block, file block range:%" PRIu64 "-%" PRIu64
" rows:%d, start:%d, end:%d, %s", " rows:%d, start:%d, end:%d, %s", pTsdbReadHandle, pCheckInfo->tableId, pWin->skey, pWin->ekey, blockInfo.rows,
pTsdbReadHandle, pCheckInfo->tableId, blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows,
cur->pos, endPos, pTsdbReadHandle->idStr); cur->pos, endPos, pTsdbReadHandle->idStr);
// compared with the data from in-memory buffer, to generate the correct timestamp array list // compared with the data from in-memory buffer, to generate the correct timestamp array list
...@@ -1986,20 +1966,16 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf ...@@ -1986,20 +1966,16 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
} }
TSKEY key = TD_ROW_KEY(row1); TSKEY key = TD_ROW_KEY(row1);
if ((key > pTsdbReadHandle->window.ekey && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) || if ((key > pTsdbReadHandle->window.ekey && ascScan) || (key < pTsdbReadHandle->window.ekey && !ascScan)) {
(key < pTsdbReadHandle->window.ekey && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
break; break;
} }
if (((pos > endPos || tsArray[pos] > pTsdbReadHandle->window.ekey) && if (((pos > endPos || tsArray[pos] > pTsdbReadHandle->window.ekey) && ascScan) ||
ASCENDING_TRAVERSE(pTsdbReadHandle->order)) || ((pos < endPos || tsArray[pos] < pTsdbReadHandle->window.ekey) && !ascScan)) {
((pos < endPos || tsArray[pos] < pTsdbReadHandle->window.ekey) &&
!ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
break; break;
} }
if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) || if ((key < tsArray[pos] && ascScan) || (key > tsArray[pos] && !ascScan)) {
(key > tsArray[pos] && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
if (rv1 != TD_ROW_SVER(row1)) { if (rv1 != TD_ROW_SVER(row1)) {
// pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1)); // pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
rv1 = TD_ROW_SVER(row1); rv1 = TD_ROW_SVER(row1);
...@@ -2054,11 +2030,8 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf ...@@ -2054,11 +2030,8 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
} }
#endif #endif
if (TD_SUPPORT_UPDATE(pCfg->update)) { if (TD_SUPPORT_UPDATE(pCfg->update)) {
if (lastKeyAppend != key) {
lastKeyAppend = key;
++curRow;
}
numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, pos, pos); numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, pos, pos);
lastKeyAppend = key;
if (rv1 != TD_ROW_SVER(row1)) { if (rv1 != TD_ROW_SVER(row1)) {
// pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1)); // pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1));
...@@ -2068,9 +2041,10 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf ...@@ -2068,9 +2041,10 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
// pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2)); // pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2));
rv2 = TD_ROW_SVER(row2); rv2 = TD_ROW_SVER(row2);
} }
numOfRows +=
mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols, // still assign data into current row
pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend); mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, &curRow, row1, row2, numOfCols,
pCheckInfo->tableId, pSchema1, pSchema2, pCfg->update, &lastKeyAppend);
if (cur->win.skey == TSKEY_INITIAL_VAL) { if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key; cur->win.skey = key;
...@@ -2081,12 +2055,13 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf ...@@ -2081,12 +2055,13 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
cur->mixBlock = true; cur->mixBlock = true;
moveToNextRowInMem(pCheckInfo); moveToNextRowInMem(pCheckInfo);
++curRow;
pos += step; pos += step;
} else { } else {
moveToNextRowInMem(pCheckInfo); moveToNextRowInMem(pCheckInfo);
} }
} else if ((key > tsArray[pos] && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) || } else if ((key > tsArray[pos] && ascScan) || (key < tsArray[pos] && !ascScan)) {
(key < tsArray[pos] && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
if (cur->win.skey == TSKEY_INITIAL_VAL) { if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = tsArray[pos]; cur->win.skey = tsArray[pos];
} }
...@@ -2112,17 +2087,17 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf ...@@ -2112,17 +2087,17 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
int32_t qstart = 0, qend = 0; int32_t qstart = 0, qend = 0;
getQualifiedRowsPos(pTsdbReadHandle, pos, end, numOfRows, &qstart, &qend); getQualifiedRowsPos(pTsdbReadHandle, pos, end, numOfRows, &qstart, &qend);
if ((lastKeyAppend != TSKEY_INITIAL_VAL) && if ((lastKeyAppend != TSKEY_INITIAL_VAL) && (lastKeyAppend != (ascScan ? tsArray[qstart] : tsArray[qend]))) {
(lastKeyAppend != (ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? tsArray[qstart] : tsArray[qend]))) {
++curRow; ++curRow;
} }
numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, qstart, qend); numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, curRow, qstart, qend);
pos += (qend - qstart + 1) * step; pos += (qend - qstart + 1) * step;
if (numOfRows > 0) { if (numOfRows > 0) {
curRow = numOfRows - 1; curRow = numOfRows - 1;
} }
cur->win.ekey = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? tsArray[qend] : tsArray[qstart]; cur->win.ekey = ascScan ? tsArray[qend] : tsArray[qstart];
cur->lastKey = cur->win.ekey + step; cur->lastKey = cur->win.ekey + step;
lastKeyAppend = cur->win.ekey; lastKeyAppend = cur->win.ekey;
} }
...@@ -2134,10 +2109,8 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf ...@@ -2134,10 +2109,8 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
* copy them all to result buffer, since it may be overlapped with file data block. * copy them all to result buffer, since it may be overlapped with file data block.
*/ */
if (node == NULL || if (node == NULL ||
((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) > pTsdbReadHandle->window.ekey) && ((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) > pTsdbReadHandle->window.ekey) && ascScan) ||
ASCENDING_TRAVERSE(pTsdbReadHandle->order)) || ((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) < pTsdbReadHandle->window.ekey) && !ascScan)) {
((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) < pTsdbReadHandle->window.ekey) &&
!ASCENDING_TRAVERSE(pTsdbReadHandle->order))) {
// no data in cache or data in cache is greater than the ekey of time window, load data from file block // no data in cache or data in cache is greater than the ekey of time window, load data from file block
if (cur->win.skey == TSKEY_INITIAL_VAL) { if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = tsArray[pos]; cur->win.skey = tsArray[pos];
...@@ -2149,22 +2122,20 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf ...@@ -2149,22 +2122,20 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf
numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, start, end); numOfRows = doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, start, end);
pos += (end - start + 1) * step; pos += (end - start + 1) * step;
cur->win.ekey = ASCENDING_TRAVERSE(pTsdbReadHandle->order) ? tsArray[end] : tsArray[start]; cur->win.ekey = ascScan ? tsArray[end] : tsArray[start];
cur->lastKey = cur->win.ekey + step; cur->lastKey = cur->win.ekey + step;
cur->mixBlock = true; cur->mixBlock = true;
} }
} }
} }
cur->blockCompleted = cur->blockCompleted = (((pos > endPos || cur->lastKey > pTsdbReadHandle->window.ekey) && ascScan) ||
(((pos > endPos || cur->lastKey > pTsdbReadHandle->window.ekey) && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) || ((pos < endPos || cur->lastKey < pTsdbReadHandle->window.ekey) && !ascScan));
((pos < endPos || cur->lastKey < pTsdbReadHandle->window.ekey) && !ASCENDING_TRAVERSE(pTsdbReadHandle->order)));
if (!ASCENDING_TRAVERSE(pTsdbReadHandle->order)) { if (!ascScan) {
TSWAP(cur->win.skey, cur->win.ekey); TSWAP(cur->win.skey, cur->win.ekey);
} }
moveDataToFront(pTsdbReadHandle, numOfRows, numOfCols);
updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos); updateInfoAfterMerge(pTsdbReadHandle, pCheckInfo, numOfRows, pos);
doCheckGeneratedBlockRange(pTsdbReadHandle); doCheckGeneratedBlockRange(pTsdbReadHandle);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册