提交 a14aa3aa 编写于 作者: H Hongze Cheng

Merge branch 'refact/tsdb_last' of https://github.com/taosdata/TDengine into refact/tsdb_last

......@@ -88,6 +88,7 @@ typedef struct SLastBlockReader {
SBlockData lastBlockData;
STimeWindow window;
SVersionRange verRange;
int32_t order;
uint64_t uid;
int16_t* rowIndex; // row index ptr, usually from the STableBlockScanInfo->indexInBlockL
} SLastBlockReader;
......@@ -313,11 +314,11 @@ static void limitOutputBufferSize(const SQueryTableDataCond* pCond, int32_t* cap
}
// init file iterator
static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, int32_t order, const char* idstr) {
static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdbReader* pReader/*int32_t order, const char* idstr*/) {
size_t numOfFileset = taosArrayGetSize(aDFileSet);
pIter->index = ASCENDING_TRAVERSE(order) ? -1 : numOfFileset;
pIter->order = order;
pIter->index = ASCENDING_TRAVERSE(pReader->order) ? -1 : numOfFileset;
pIter->order = pReader->order;
pIter->pFileList = aDFileSet;
pIter->numOfFiles = numOfFileset;
......@@ -325,14 +326,18 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, int32
pIter->pLastBlockReader = taosMemoryCalloc(1, sizeof(struct SLastBlockReader));
if (pIter->pLastBlockReader == NULL) {
int32_t code = TSDB_CODE_OUT_OF_MEMORY;
tsdbError("failed to prepare the last block iterator, code:%d %s", tstrerror(code), idstr);
tsdbError("failed to prepare the last block iterator, code:%d %s", tstrerror(code), pReader->idStr);
return code;
}
pIter->pLastBlockReader->pBlockL = taosArrayInit(4, sizeof(SBlockL));
SLastBlockReader* pLReader = pIter->pLastBlockReader;
pLReader->pBlockL = taosArrayInit(4, sizeof(SBlockL));
pLReader->order = pReader->order;
pLReader->window = pReader->window;
pLReader->verRange = pReader->verRange;
}
tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, idstr);
tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr);
return TSDB_CODE_SUCCESS;
}
......@@ -1284,7 +1289,7 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBloc
// todo here we need to each key in the last files to identify if it is really overlapped with last block
bool overlapWithlastBlock = false;
if (hasDataInLastBlock(pLastBlockReader)) {
if (/*hasDataInLastBlock(pLastBlockReader)*/taosArrayGetSize(pLastBlockReader->pBlockL) > 0) {
SBlockL *pBlockL = taosArrayGet(pLastBlockReader->pBlockL, pLastBlockReader->currentBlockIndex);
overlapWithlastBlock = !(pBlock->maxKey.ts < pBlockL->minKey || pBlock->minKey.ts > pBlockL->maxKey);
}
......@@ -1364,7 +1369,6 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader*
return pReader->pMemSchema;
}
// todo handle desc
static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) {
SRowMerger merge = {0};
......@@ -1372,51 +1376,96 @@ static int32_t doMergeBufAndFileRows_Rv(STsdbReader* pReader, STableBlockScanInf
SBlockData* pBlockData = &pReader->status.fileBlockData;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
int64_t tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
int64_t tsLast = INT64_MIN;
if (pLastBlockReader->lastBlockData.nRow > 0) {
tsLast = getCurrentKeyInLastBlock(pLastBlockReader);
}
TSDBKEY k = TSDBROW_KEY(pRow);
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData;
int64_t minKey = INT64_MAX;
if (minKey > tsLast) {
minKey = tsLast;
}
int64_t minKey = 0;
if (pReader->order == TSDB_ORDER_ASC) {
minKey = INT64_MAX; // chosen the minimum value
if (minKey > tsLast && pLastBlockReader->lastBlockData.nRow > 0) {
minKey = tsLast;
}
if (minKey > k.ts) {
minKey = k.ts;
}
if (minKey > k.ts) {
minKey = k.ts;
}
if (minKey > key && pBlockData->nRow > 0) {
minKey = key;
if (minKey > key && pBlockData->nRow > 0) {
minKey = key;
}
} else {
minKey = INT64_MIN;
if (minKey < tsLast && pLastBlockReader->lastBlockData.nRow > 0) {
minKey = tsLast;
}
if (minKey < k.ts) {
minKey = k.ts;
}
if (minKey < key && pBlockData->nRow > 0) {
minKey = key;
}
}
// file block ---> last block -----> imem -----> mem
bool init = false;
if (minKey == key) {
init = true;
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
}
if (minKey == tsLast) {
if (!init) {
// file block ---> last block -----> imem -----> mem
if (pReader->order == TSDB_ORDER_ASC) {
if (minKey == key) {
init = true;
TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex);
tRowMergerInit(&merge, &fRow1, pReader->pSchema);
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
}
doMergeRowsInLastBlock(pLastBlockReader, tsLast, &merge);
}
if (minKey == tsLast) {
if (!init) {
init = true;
TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex);
tRowMergerInit(&merge, &fRow1, pReader->pSchema);
}
doMergeRowsInLastBlock(pLastBlockReader, tsLast, &merge);
}
if (minKey == k.ts) {
if (!init) {
if (minKey == k.ts) {
if (!init) {
init = true;
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
tRowMergerInit(&merge, pRow, pSchema);
}
doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
}
} else {
if (minKey == k.ts) {
init = true;
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
tRowMergerInit(&merge, pRow, pSchema);
doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
}
if (minKey == tsLast) {
if (!init) {
init = true;
TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, *pLastBlockReader->rowIndex);
tRowMergerInit(&merge, &fRow1, pReader->pSchema);
}
doMergeRowsInLastBlock(pLastBlockReader, tsLast, &merge);
}
doMergeRowsInBuf(pIter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
if (minKey == key) {
if (!init) {
init = true;
tRowMergerInit(&merge, &fRow, pReader->pSchema);
}
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
}
}
tRowMergerGetRow(&merge, &pTSRow);
......@@ -1500,6 +1549,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
return TSDB_CODE_SUCCESS;
}
// todo handle the desc order check
static int32_t doMergeThreeLevelRowsRv(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, SLastBlockReader* pLastBlockReader) {
SRowMerger merge = {0};
STSRow* pTSRow = NULL;
......@@ -1753,23 +1803,34 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
static void initLastBlockReader(SLastBlockReader* pLastBlockReader, uint64_t uid, STimeWindow* pWin,
SVersionRange* pVerRange, int16_t* startPos) {
static void initLastBlockReader(SLastBlockReader* pLastBlockReader, uint64_t uid, int16_t* startPos) {
pLastBlockReader->uid = uid;
pLastBlockReader->window = *pWin;
pLastBlockReader->verRange = *pVerRange;
pLastBlockReader->rowIndex = startPos;
if (*startPos == -1) {
if (ASCENDING_TRAVERSE(pLastBlockReader->order)) {
// do nothing
} else {
*startPos = pLastBlockReader->lastBlockData.nRow;
}
}
}
#define ALL_ROWS_CHECKED_INDEX INT16_MIN
static void setAllRowsChecked(SLastBlockReader *pLastBlockReader) {
*pLastBlockReader->rowIndex = ALL_ROWS_CHECKED_INDEX;
}
static bool nextRowInLastBlock(SLastBlockReader *pLastBlockReader) {
if (*(pLastBlockReader->rowIndex) >= pLastBlockReader->lastBlockData.nRow) {
int32_t step = (pLastBlockReader->order == TSDB_ORDER_ASC) ? 1 : -1;
if (*pLastBlockReader->rowIndex == ALL_ROWS_CHECKED_INDEX) {
return false;
}
*(pLastBlockReader->rowIndex) += 1;
*(pLastBlockReader->rowIndex) += step;
SBlockData* pBlockData = &pLastBlockReader->lastBlockData;
for(int32_t i = *(pLastBlockReader->rowIndex); i < pBlockData->nRow; ++i) {
for(int32_t i = *(pLastBlockReader->rowIndex); i < pBlockData->nRow && i >= 0; i += step) {
if (pBlockData->aUid[i] != pLastBlockReader->uid) {
continue;
}
......@@ -1784,12 +1845,12 @@ static bool nextRowInLastBlock(SLastBlockReader *pLastBlockReader) {
// no data any more
if (pBlockData->aTSKEY[i] > pLastBlockReader->window.ekey) {
*(pLastBlockReader->rowIndex) = pBlockData->nRow;
setAllRowsChecked(pLastBlockReader);
return false;
}
if (pBlockData->aVersion[i] > pLastBlockReader->verRange.maxVer) {
*(pLastBlockReader->rowIndex) = pBlockData->nRow;
setAllRowsChecked(pLastBlockReader);
return false;
}
......@@ -1798,7 +1859,7 @@ static bool nextRowInLastBlock(SLastBlockReader *pLastBlockReader) {
}
// set all data is consumed in last block
*(pLastBlockReader->rowIndex) = pBlockData->nRow;
setAllRowsChecked(pLastBlockReader);
return false;
}
......@@ -1817,15 +1878,14 @@ static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
return pBlockData->aTSKEY[*pLastBlockReader->rowIndex];
}
// todo handle desc order
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) {
if (*pLastBlockReader->rowIndex >= pLastBlockReader->lastBlockData.nRow) {
if (*pLastBlockReader->rowIndex == ALL_ROWS_CHECKED_INDEX) {
return false;
}
return true;
}
// todo refactor
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, SLastBlockReader *pLastBlockReader) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
......@@ -1849,6 +1909,25 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
if (pBlockData->nRow > 0) {
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
// no last block
if (pLastBlockReader->lastBlockData.nRow == 0) {
if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
return TSDB_CODE_SUCCESS;
} else {
STSRow* pTSRow = NULL;
SRowMerger merge = {0};
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
tRowMergerGetRow(&merge, &pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
taosMemoryFree(pTSRow);
tRowMergerClear(&merge);
return TSDB_CODE_SUCCESS;
}
}
// row in last file block
int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader);
if (ts < key) { // save rows in last block
......@@ -1901,7 +1980,6 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
}
}
} else { // only last block exists
// only last block exits
SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData;
int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);
......@@ -1936,7 +2014,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
}
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
initLastBlockReader(pLastBlockReader, pBlockScanInfo->uid, &pReader->window, &pReader->verRange, &pBlockScanInfo->indexInBlockL);
initLastBlockReader(pLastBlockReader, pBlockScanInfo->uid, &pBlockScanInfo->indexInBlockL);
// bool has = nextRowInLastBlock(pLastBlockReader); // todo handle if no data in last block
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
......@@ -2274,12 +2352,12 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
// todo opt perf by avoiding load last block repeatly
STableBlockScanInfo* pScanInfo = pStatus->pTableIter;
int32_t code = doLoadRelatedLastBlock(pLastBlockReader, pScanInfo->uid, pReader);
if (code != TSDB_CODE_SUCCESS) { // todo handle error
if (code != TSDB_CODE_SUCCESS) {
return code;
}
initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pReader->window, &pReader->verRange, &pScanInfo->indexInBlockL);
if (pScanInfo->indexInBlockL == -1) {
initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pScanInfo->indexInBlockL);
if (pScanInfo->indexInBlockL == -1 || pScanInfo->indexInBlockL == pLastBlockReader->lastBlockData.nRow) {
bool hasData = nextRowInLastBlock(pLastBlockReader);
if (!hasData) { // current table does not have rows in last block, try next table
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
......@@ -2327,10 +2405,10 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
// load the last data block of current table
code = doLoadRelatedLastBlock(pLastBlockReader, pScanInfo->uid, pReader);
if (code != TSDB_CODE_SUCCESS) {
// todo handle error
return code;
}
initLastBlockReader(pLastBlockReader, pBlockInfo->uid, &pReader->window, &pReader->verRange, &pScanInfo->indexInBlockL);
initLastBlockReader(pLastBlockReader, pBlockInfo->uid, &pScanInfo->indexInBlockL);
}
if (pBlockInfo == NULL) { // build data block from last data file
......@@ -2340,7 +2418,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
tBlockDataReset(&pStatus->fileBlockData);
code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pScanInfo->uid, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
// todo
return code;
}
code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData);
......@@ -2456,6 +2534,10 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
return code;
}
if (pReader->pResBlock->info.rows > 0) {
return TSDB_CODE_SUCCESS;
}
// all data blocks are checked in this last block file, now let's try the next file
if (pReader->status.pTableIter == NULL) {
code = initForFirstBlockInFile(pReader, pBlockIter);
......@@ -2881,7 +2963,7 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
return TSDB_CODE_SUCCESS;
}
// todo support desc order
// todo check if the rows are dropped or not
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, int64_t ts, SRowMerger* pMerger) {
while(nextRowInLastBlock(pLastBlockReader)) {
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
......@@ -3240,7 +3322,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) {
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader->order, pReader->idStr);
initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
resetDataBlockIterator(&pReader->status.blockIter, pReader->order, pReader->status.pTableMap);
// no data in files, let's try buffer in memory
......@@ -3261,8 +3343,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
goto _err;
}
initFilesetIterator(&pPrevReader->status.fileIter, pPrevReader->pReadSnap->fs.aDFileSet, pPrevReader->order,
pPrevReader->idStr);
initFilesetIterator(&pPrevReader->status.fileIter, pPrevReader->pReadSnap->fs.aDFileSet, pPrevReader);
resetDataBlockIterator(&pPrevReader->status.blockIter, pPrevReader->order, pReader->status.pTableMap);
// no data in files, let's try buffer in memory
......@@ -3507,13 +3588,13 @@ static SArray* doRetrieveDataBlock(STsdbReader* pReader) {
tBlockDataReset(&pStatus->fileBlockData);
int32_t code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pBlockScanInfo->uid, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
//todo
terrno = code;
return NULL;
}
code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData);
if (code != TSDB_CODE_SUCCESS) {
tBlockDataDestroy(&pStatus->fileBlockData, 1);
terrno = code;
return NULL;
}
......@@ -3555,7 +3636,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
tsdbDataFReaderClose(&pReader->pFileReader);
initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader->order, pReader->idStr);
initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
resetDataBlockIterator(&pReader->status.blockIter, pReader->order, pReader->status.pTableMap);
resetDataBlockScanInfo(pReader->status.pTableMap);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册