提交 9dc5b4cb 编写于 作者: H Haojun Liao

fix(query): support read from last file.

上级 39ea124b
...@@ -44,6 +44,7 @@ typedef struct STableBlockScanInfo { ...@@ -44,6 +44,7 @@ typedef struct STableBlockScanInfo {
SArray* delSkyline; // delete info for this table SArray* delSkyline; // delete info for this table
int32_t fileDelIndex; int32_t fileDelIndex;
bool iterInit; // whether to initialize the in-memory skip list iterator or not bool iterInit; // whether to initialize the in-memory skip list iterator or not
int16_t indexInBlockL;// row position in last block
} STableBlockScanInfo; } STableBlockScanInfo;
typedef struct SBlockOrderWrapper { typedef struct SBlockOrderWrapper {
...@@ -76,12 +77,27 @@ typedef struct SBlockLoadSuppInfo { ...@@ -76,12 +77,27 @@ typedef struct SBlockLoadSuppInfo {
char** buildBuf; // build string tmp buffer, todo remove it later after all string format being updated. char** buildBuf; // build string tmp buffer, todo remove it later after all string format being updated.
} SBlockLoadSuppInfo; } SBlockLoadSuppInfo;
typedef struct SVersionRange {
uint64_t minVer;
uint64_t maxVer;
} SVersionRange;
typedef struct SLastBlockReader {
SArray* pBlockL;
int32_t currentBlockIndex;
SBlockData lastBlockData;
STimeWindow window;
SVersionRange verRange;
uint64_t uid;
int32_t rowIndex;
} SLastBlockReader;
typedef struct SFilesetIter { typedef struct SFilesetIter {
int32_t numOfFiles; // number of total files int32_t numOfFiles; // number of total files
int32_t index; // current accessed index in the list int32_t index; // current accessed index in the list
SArray* pFileList; // data file list SArray* pFileList; // data file list
int32_t order; int32_t order;
SArray* pLastBlockList;// last block array list SLastBlockReader* pLastBlockReader; // last file block reader
} SFilesetIter; } SFilesetIter;
typedef struct SFileDataBlockInfo { typedef struct SFileDataBlockInfo {
...@@ -91,14 +107,12 @@ typedef struct SFileDataBlockInfo { ...@@ -91,14 +107,12 @@ typedef struct SFileDataBlockInfo {
} SFileDataBlockInfo; } SFileDataBlockInfo;
typedef struct SDataBlockIter { typedef struct SDataBlockIter {
SBlockNumber numOfBlocks; int32_t numOfBlocks;
int32_t index; int32_t index;
SArray* blockList; // SArray<SFileDataBlockInfo> SArray* blockList; // SArray<SFileDataBlockInfo>
SArray* pLastBlockList; // last block list
int32_t order; int32_t order;
SBlock block; // current SBlock data SBlock block; // current SBlock data
SHashObj* pTableMap; SHashObj* pTableMap;
SArray* pBlockL; // ptr to SBlockL in fileIterator
} SDataBlockIter; } SDataBlockIter;
typedef struct SFileBlockDumpInfo { typedef struct SFileBlockDumpInfo {
...@@ -108,11 +122,6 @@ typedef struct SFileBlockDumpInfo { ...@@ -108,11 +122,6 @@ typedef struct SFileBlockDumpInfo {
bool allDumped; bool allDumped;
} SFileBlockDumpInfo; } SFileBlockDumpInfo;
typedef struct SVersionRange {
uint64_t minVer;
uint64_t maxVer;
} SVersionRange;
typedef struct SReaderStatus { typedef struct SReaderStatus {
bool loadFromFile; // check file stage bool loadFromFile; // check file stage
SHashObj* pTableMap; // SHash<STableBlockScanInfo> SHashObj* pTableMap; // SHash<STableBlockScanInfo>
...@@ -142,7 +151,6 @@ struct STsdbReader { ...@@ -142,7 +151,6 @@ struct STsdbReader {
STSchema* pMemSchema; // the previous schema for in-memory data, to avoid load schema too many times STSchema* pMemSchema; // the previous schema for in-memory data, to avoid load schema too many times
SDataFReader* pFileReader; SDataFReader* pFileReader;
SVersionRange verRange; SVersionRange verRange;
SArray* pLastBlock; // last block info
int32_t step; int32_t step;
STsdbReader* innerReader[2]; STsdbReader* innerReader[2];
...@@ -154,10 +162,11 @@ static int buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, i ...@@ -154,10 +162,11 @@ static int buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, i
static TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader); static TSDBROW* getValidRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader, static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader,
SRowMerger* pMerger); SRowMerger* pMerger);
static int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, int64_t ts, SRowMerger* pMerger);
static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger, static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
STsdbReader* pReader); STsdbReader* pReader);
static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, uint64_t uid); static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow, uint64_t uid);
static int32_t doAppendRowFromBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData, static int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
int32_t rowIndex); int32_t rowIndex);
static void setComposedBlockFlag(STsdbReader* pReader, bool composed); static void setComposedBlockFlag(STsdbReader* pReader, bool composed);
static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order); static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order);
...@@ -171,6 +180,8 @@ static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdb ...@@ -171,6 +180,8 @@ static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdb
static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr, static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr,
int8_t* pLevel); int8_t* pLevel);
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level); static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader);
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader);
static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) { static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
...@@ -191,7 +202,6 @@ static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) { ...@@ -191,7 +202,6 @@ static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
if (IS_VAR_DATA_TYPE(pCol->info.type)) { if (IS_VAR_DATA_TYPE(pCol->info.type)) {
pSupInfo->buildBuf[i] = taosMemoryMalloc(pCol->info.bytes); pSupInfo->buildBuf[i] = taosMemoryMalloc(pCol->info.bytes);
// tsdbInfo("-------------------%d\n", pCol->info.bytes);
} }
} }
...@@ -208,7 +218,7 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK ...@@ -208,7 +218,7 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK
} }
for (int32_t j = 0; j < numOfTables; ++j) { for (int32_t j = 0; j < numOfTables; ++j) {
STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid}; STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid, .indexInBlockL = -1};
if (ASCENDING_TRAVERSE(pTsdbReader->order)) { if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
if (info.lastKey == INT64_MIN || info.lastKey < pTsdbReader->window.skey) { if (info.lastKey == INT64_MIN || info.lastKey < pTsdbReader->window.skey) {
info.lastKey = pTsdbReader->window.skey; info.lastKey = pTsdbReader->window.skey;
...@@ -310,13 +320,17 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, int32 ...@@ -310,13 +320,17 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, int32
pIter->pFileList = aDFileSet; pIter->pFileList = aDFileSet;
pIter->numOfFiles = numOfFileset; pIter->numOfFiles = numOfFileset;
pIter->pLastBlockList = taosArrayInit(4, sizeof(SBlockL)); if (pIter->pLastBlockReader == NULL) {
if (pIter->pLastBlockList == NULL) { pIter->pLastBlockReader = taosMemoryCalloc(1, sizeof(struct SLastBlockReader));
if (pIter->pLastBlockReader == NULL) {
int32_t code = TSDB_CODE_OUT_OF_MEMORY; int32_t code = TSDB_CODE_OUT_OF_MEMORY;
tsdbError("failed to prepare the last block iterator, code:%d %s", tstrerror(code), idstr); tsdbError("failed to prepare the last block iterator, code:%d %s", tstrerror(code), idstr);
return code; return code;
} }
pIter->pLastBlockReader->pBlockL = taosArrayInit(4, sizeof(SBlockL));
}
tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, idstr); tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, idstr);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -377,7 +391,7 @@ _err: ...@@ -377,7 +391,7 @@ _err:
static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, SHashObj* pTableMap) { static void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, SHashObj* pTableMap) {
pIter->order = order; pIter->order = order;
pIter->index = -1; pIter->index = -1;
pIter->numOfBlocks = (SBlockNumber){-1, -1}; pIter->numOfBlocks = -1;
if (pIter->blockList == NULL) { if (pIter->blockList == NULL) {
pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo)); pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
} else { } else {
...@@ -685,7 +699,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SArray* ...@@ -685,7 +699,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SArray*
} }
pBlockNum->numOfLastBlocks += 1; pBlockNum->numOfLastBlocks += 1;
taosArrayPush(pQualifiedLastBlock, &i); taosArrayPush(pQualifiedLastBlock, pLastBlock);
} }
} }
...@@ -726,12 +740,13 @@ static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_ ...@@ -726,12 +740,13 @@ static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_
} }
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) { static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
if (pBlockIter->index >= pBlockIter->numOfBlocks.numOfBlocks) { if (taosArrayGetSize(pBlockIter->blockList) == 0) {
ASSERT(pBlockIter->numOfBlocks == taosArrayGetSize(pBlockIter->blockList));
return NULL; return NULL;
} else {
SFileDataBlockInfo* pFBlockInfo = taosArrayGet(pBlockIter->blockList, pBlockIter->index);
return pFBlockInfo;
} }
SFileDataBlockInfo* pBlockInfo = taosArrayGet(pBlockIter->blockList, pBlockIter->index);
return pBlockInfo;
} }
static SBlock* getCurrentBlock(SDataBlockIter* pBlockIter) { return &pBlockIter->block; } static SBlock* getCurrentBlock(SDataBlockIter* pBlockIter) { return &pBlockIter->block; }
...@@ -824,13 +839,17 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI ...@@ -824,13 +839,17 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
double elapsedTime = 0; double elapsedTime = 0;
int32_t code = 0; int32_t code = 0;
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter); SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
if (pFBlock != NULL) { if (pBlockInfo != NULL) {
SBlock* pBlock = getCurrentBlock(pBlockIter); SBlock* pBlock = getCurrentBlock(pBlockIter);
code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData); code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
", rows:%d, code:%s %s",
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
tstrerror(code), pReader->idStr);
goto _error; goto _error;
} }
...@@ -838,14 +857,41 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI ...@@ -838,14 +857,41 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
tsdbDebug("%p load file block into buffer, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 tsdbDebug("%p load file block into buffer, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s", ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow, pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr); pBlock->minVer, pBlock->maxVer, elapsedTime, pReader->idStr);
} else { } else {
int32_t pos = pBlockIter->index - pBlockIter->numOfBlocks.numOfBlocks; #if 0
int32_t* index = taosArrayGet(pBlockIter->pLastBlockList, pos); SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
SBlockL* pBlockL = taosArrayGet(pReader->status.fileIter.pLastBlockList, *index); uint64_t uid = pBlockInfo->uid;
SArray* pBlocks = pLastBlockReader->pBlockL;
pLastBlockReader->currentBlockIndex = -1;
// find the correct SBlockL
for(int32_t i = 0; i < taosArrayGetSize(pBlocks); ++i) {
SBlockL* pBlock = taosArrayGet(pBlocks, i);
if (pBlock->minUid >= uid && pBlock->maxUid <= uid) {
pLastBlockReader->currentBlockIndex = i;
break;
}
}
// SBlockL* pBlockL = taosArrayGet(pLastBlockReader->pBlockL, *index);
code = tsdbReadLastBlock(pReader->pFileReader, pBlockL, pBlockData); code = tsdbReadLastBlock(pReader->pFileReader, pBlockL, pBlockData);
if (code != TSDB_CODE_SUCCESS) {
tsdbDebug("%p error occurs in loading last block into buffer, last block index:%d, total:%d brange:%" PRId64 "-%" PRId64
", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", code:%s %s",
pReader, *index, pBlockIter->numOfBlocks.numOfLastBlocks, 0, 0, pBlockL->nRow,
pBlockL->minVer, pBlockL->maxVer, tstrerror(code), pReader->idStr);
goto _error;
}
tsdbDebug("%p load last file block into buffer, last block index:%d, total:%d brange:%" PRId64 "-%" PRId64
", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
pReader, *index, pBlockIter->numOfBlocks.numOfLastBlocks, 0, 0, pBlockL->nRow,
pBlockL->minVer, pBlockL->maxVer, elapsedTime, pReader->idStr);
#endif
} }
pReader->cost.blockLoadTime += elapsedTime; pReader->cost.blockLoadTime += elapsedTime;
...@@ -854,10 +900,6 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI ...@@ -854,10 +900,6 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_error: _error:
// tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
// ", rows:%d, %s",
// pReader, pBlockIter->index, pFBlock->tbBlockIdx, pBlock->minKey.ts, pBlock->maxKey.ts, pBlock->nRow,
// pReader->idStr);
return code; return code;
} }
...@@ -926,17 +968,12 @@ static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter) { ...@@ -926,17 +968,12 @@ static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockNumber* pBlockNum, SArray* pQLastBlock) { static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks) {
bool asc = ASCENDING_TRAVERSE(pReader->order); bool asc = ASCENDING_TRAVERSE(pReader->order);
pBlockIter->numOfBlocks = *pBlockNum; pBlockIter->numOfBlocks = numOfBlocks;
taosArrayClear(pBlockIter->blockList); taosArrayClear(pBlockIter->blockList);
if (pBlockNum->numOfLastBlocks > 0) {
taosArrayDestroy(pBlockIter->pLastBlockList);
pBlockIter->pLastBlockList = pQLastBlock;
}
// access data blocks according to the offset of each block in asc/desc order. // access data blocks according to the offset of each block in asc/desc order.
int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap); int32_t numOfTables = (int32_t)taosHashGetSize(pReader->status.pTableMap);
...@@ -988,21 +1025,20 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte ...@@ -988,21 +1025,20 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
sup.numOfTables += 1; sup.numOfTables += 1;
} }
ASSERT(pBlockNum->numOfBlocks == cnt); ASSERT(numOfBlocks == cnt);
int32_t total = pBlockNum->numOfLastBlocks + pBlockNum->numOfBlocks;
// since there is only one table qualified, blocks are not sorted // since there is only one table qualified, blocks are not sorted
if (sup.numOfTables == 1 || pBlockNum->numOfBlocks == 0) { if (sup.numOfTables == 1) {
for (int32_t i = 0; i < pBlockNum->numOfBlocks; ++i) { for (int32_t i = 0; i < numOfBlocks; ++i) {
SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i}; SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i};
taosArrayPush(pBlockIter->blockList, &blockInfo); taosArrayPush(pBlockIter->blockList, &blockInfo);
} }
int64_t et = taosGetTimestampUs(); int64_t et = taosGetTimestampUs();
tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s", tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s",
pReader, total, (et - st) / 1000.0, pReader->idStr); pReader, numOfBlocks, (et - st) / 1000.0, pReader->idStr);
pBlockIter->index = asc ? 0 : (total - 1); pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
cleanupBlockOrderSupporter(&sup); cleanupBlockOrderSupporter(&sup);
doSetCurrentBlock(pBlockIter); doSetCurrentBlock(pBlockIter);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1011,7 +1047,7 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte ...@@ -1011,7 +1047,7 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables, tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables,
pReader->idStr); pReader->idStr);
ASSERT(cnt <= pBlockNum->numOfBlocks && sup.numOfTables <= numOfTables); ASSERT(cnt <= numOfBlocks && sup.numOfTables <= numOfTables);
SMultiwayMergeTreeInfo* pTree = NULL; SMultiwayMergeTreeInfo* pTree = NULL;
uint8_t ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar); uint8_t ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar);
...@@ -1038,12 +1074,12 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte ...@@ -1038,12 +1074,12 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
} }
int64_t et = taosGetTimestampUs(); int64_t et = taosGetTimestampUs();
tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, total, (et - st) / 1000.0, tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, numOfBlocks, (et - st) / 1000.0,
pReader->idStr); pReader->idStr);
cleanupBlockOrderSupporter(&sup); cleanupBlockOrderSupporter(&sup);
taosMemoryFree(pTree); taosMemoryFree(pTree);
pBlockIter->index = asc ? 0 : (total - 1); pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
doSetCurrentBlock(pBlockIter); doSetCurrentBlock(pBlockIter);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1053,7 +1089,7 @@ static bool blockIteratorNext(SDataBlockIter* pBlockIter) { ...@@ -1053,7 +1089,7 @@ static bool blockIteratorNext(SDataBlockIter* pBlockIter) {
bool asc = ASCENDING_TRAVERSE(pBlockIter->order); bool asc = ASCENDING_TRAVERSE(pBlockIter->order);
int32_t step = asc ? 1 : -1; int32_t step = asc ? 1 : -1;
if ((pBlockIter->index >= pBlockIter->numOfBlocks.numOfBlocks - 1 && asc) || (pBlockIter->index <= 0 && (!asc))) { if ((pBlockIter->index >= pBlockIter->numOfBlocks - 1 && asc) || (pBlockIter->index <= 0 && (!asc))) {
return false; return false;
} }
...@@ -1100,7 +1136,7 @@ static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlock ...@@ -1100,7 +1136,7 @@ static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlock
int32_t step = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1; int32_t step = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1;
int32_t index = pBlockIter->index; int32_t index = pBlockIter->index;
while (index < pBlockIter->numOfBlocks.numOfBlocks && index >= 0) { while (index < pBlockIter->numOfBlocks && index >= 0) {
SFileDataBlockInfo* pFBlock = taosArrayGet(pBlockIter->blockList, index); SFileDataBlockInfo* pFBlock = taosArrayGet(pBlockIter->blockList, index);
if (pFBlock->uid == pFBlockInfo->uid && pFBlock->tbBlockIdx == pFBlockInfo->tbBlockIdx) { if (pFBlock->uid == pFBlockInfo->uid && pFBlock->tbBlockIdx == pFBlockInfo->tbBlockIdx) {
return index; return index;
...@@ -1114,7 +1150,7 @@ static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlock ...@@ -1114,7 +1150,7 @@ static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlock
} }
static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t index, int32_t step) { static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t index, int32_t step) {
if (index < 0 || index >= pBlockIter->numOfBlocks.numOfBlocks) { if (index < 0 || index >= pBlockIter->numOfBlocks) {
return -1; return -1;
} }
...@@ -1224,7 +1260,7 @@ static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBl ...@@ -1224,7 +1260,7 @@ static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBl
// 4. output buffer should be large enough to hold all rows in current block // 4. output buffer should be large enough to hold all rows in current block
// 5. delete info should not overlap with current block data // 5. delete info should not overlap with current block data
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBlock, SBlock* pBlock, static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBlock, SBlock* pBlock,
STableBlockScanInfo* pScanInfo, TSDBKEY key) { STableBlockScanInfo* pScanInfo, TSDBKEY key, SLastBlockReader* pLastBlockReader) {
int32_t neighborIndex = 0; int32_t neighborIndex = 0;
SBlock* pNeighbor = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &neighborIndex, pReader->order); SBlock* pNeighbor = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &neighborIndex, pReader->order);
...@@ -1239,8 +1275,17 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBloc ...@@ -1239,8 +1275,17 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBloc
bool hasDup = (pBlock->nSubBlock == 1) ? pBlock->hasDup : true; bool hasDup = (pBlock->nSubBlock == 1) ? pBlock->hasDup : true;
bool overlapWithDel = overlapWithDelSkyline(pScanInfo, pBlock, pReader->order); bool overlapWithDel = overlapWithDelSkyline(pScanInfo, pBlock, pReader->order);
// todo here we need to each key in the last files to identify if it is really overlapped with last block
bool overlapWithlastBlock = false;
if (hasDataInLastBlock(pLastBlockReader)) {
SBlockL *pBlockL = taosArrayGet(pLastBlockReader->pBlockL, pLastBlockReader->currentBlockIndex);
// int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader);
overlapWithlastBlock = !(pBlock->maxKey.ts < pBlockL->minKey || pBlock->minKey.ts > pBlockL->maxKey);
}
return (overlapWithNeighbor || hasDup || dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock) || return (overlapWithNeighbor || hasDup || dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock) ||
keyOverlapFileBlock(key, pBlock, &pReader->verRange) || (pBlock->nRow > pReader->capacity) || overlapWithDel); keyOverlapFileBlock(key, pBlock, &pReader->verRange) || (pBlock->nRow > pReader->capacity) ||
overlapWithDel || overlapWithlastBlock);
} }
static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) { static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, int64_t endKey) {
...@@ -1279,7 +1324,7 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB ...@@ -1279,7 +1324,7 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB
int64_t nextKey = pBlockData->aTSKEY[pDumpInfo->rowIndex + step]; int64_t nextKey = pBlockData->aTSKEY[pDumpInfo->rowIndex + step];
if (nextKey != key) { // merge is not needed if (nextKey != key) { // merge is not needed
doAppendRowFromBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex); doAppendRowFromFileBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
pDumpInfo->rowIndex += step; pDumpInfo->rowIndex += step;
return true; return true;
} }
...@@ -1386,12 +1431,11 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1386,12 +1431,11 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) { static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
SRowMerger merge = {0}; SRowMerger merge = {0};
STSRow* pTSRow = NULL; STSRow* pTSRow = NULL;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SBlockData* pBlockData = &pReader->status.fileBlockData;
SArray* pDelList = pBlockScanInfo->delSkyline; SArray* pDelList = pBlockScanInfo->delSkyline;
TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pDelList, pReader); TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pDelList, pReader);
...@@ -1532,6 +1576,14 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo* ...@@ -1532,6 +1576,14 @@ static int32_t doMergeThreeLevelRows(STsdbReader* pReader, STableBlockScanInfo*
static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo, static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo,
STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) { STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
// it is an multi-table data block
if (pBlockData->aUid != NULL) {
uint64_t uid = pBlockData->aUid[pDumpInfo->rowIndex];
if (uid != pBlockScanInfo->uid) { // move to next row
return false;
}
}
// check for version and time range // check for version and time range
int64_t ver = pBlockData->aVersion[pDumpInfo->rowIndex]; int64_t ver = pBlockData->aVersion[pDumpInfo->rowIndex];
if (ver > pReader->verRange.maxVer || ver < pReader->verRange.minVer) { if (ver > pReader->verRange.maxVer || ver < pReader->verRange.minVer) {
...@@ -1553,16 +1605,87 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum ...@@ -1553,16 +1605,87 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); } static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) { static void initLastBlockReader(SLastBlockReader* pLastBlockReader, uint64_t uid, STimeWindow* pWin, SVersionRange* pVerRange,
int16_t startPos) {
pLastBlockReader->uid = uid;
pLastBlockReader->window = *pWin;
pLastBlockReader->verRange = *pVerRange;
pLastBlockReader->rowIndex = startPos;
}
static bool nextRowInLastBlock(SLastBlockReader *pLastBlockReader) {
if (pLastBlockReader->rowIndex >= pLastBlockReader->lastBlockData.nRow) {
return false;
}
pLastBlockReader->rowIndex += 1;
SBlockData* pBlockData = &pLastBlockReader->lastBlockData;
for(int32_t i = pLastBlockReader->rowIndex; i < pBlockData->nRow; ++i) {
if (pBlockData->aUid[i] != pLastBlockReader->uid) {
continue;
}
if (pBlockData->aTSKEY[i] < pLastBlockReader->window.skey) {
continue;
}
if (pBlockData->aVersion[i] < pLastBlockReader->verRange.minVer) {
continue;
}
// no data any more
if (pBlockData->aTSKEY[i] > pLastBlockReader->window.ekey) {
return false;
}
if (pBlockData->aVersion[i] > pLastBlockReader->verRange.maxVer) {
return false;
}
pLastBlockReader->rowIndex = i;
return true;
}
pLastBlockReader->rowIndex = pBlockData->nRow;
return false;
}
static int32_t saveCurrentState(SLastBlockReader* pLastBlockReader) {
return pLastBlockReader->rowIndex;
}
static void restoreState(SLastBlockReader* pLastBlockReader, int32_t state) {
pLastBlockReader->rowIndex = state;
}
static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) {
SBlockData* pBlockData = &pLastBlockReader->lastBlockData;
return pBlockData->aTSKEY[pLastBlockReader->rowIndex];
}
// todo handle desc order
static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) {
if (pLastBlockReader->rowIndex >= pLastBlockReader->lastBlockData.nRow) {
return false;
}
return true;
}
static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData, SLastBlockReader *pLastBlockReader) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SBlockData* pBlockData = &pReader->status.fileBlockData;
int64_t key = pBlockData->aTSKEY[pDumpInfo->rowIndex]; int64_t key = INT64_MIN;
if (pBlockData->nRow > 0) {
key = pBlockData->aTSKEY[pDumpInfo->rowIndex];
}
TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader); TSDBROW* pRow = getValidRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader); TSDBROW* piRow = getValidRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
if (pBlockScanInfo->iter.hasVal && pBlockScanInfo->iiter.hasVal) { if (pBlockScanInfo->iter.hasVal && pBlockScanInfo->iiter.hasVal) {
return doMergeThreeLevelRows(pReader, pBlockScanInfo); return doMergeThreeLevelRows(pReader, pBlockScanInfo, pBlockData);
} else { } else {
// imem + file // imem + file
if (pBlockScanInfo->iiter.hasVal) { if (pBlockScanInfo->iiter.hasVal) {
...@@ -1574,12 +1697,48 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI ...@@ -1574,12 +1697,48 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, &pBlockScanInfo->iter, key); return doMergeBufAndFileRows(pReader, pBlockScanInfo, pRow, &pBlockScanInfo->iter, key);
} }
if (pBlockData->nRow > 0) {
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
// row in last file block
int64_t ts = getCurrentKeyInLastBlock(pLastBlockReader);
if (ts < key) { // save rows in last block
SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData;
int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);
STSRow* pTSRow = NULL;
SRowMerger merge = {0};
TSDBROW fRow1 = tsdbRowFromBlockData(pLastBlockData, pLastBlockReader->rowIndex);
tRowMergerInit(&merge, &fRow1, pReader->pSchema);
doMergeRowsInLastBlock(pLastBlockReader, tsLastBlock, &merge);
tRowMergerGetRow(&merge, &pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
taosMemoryFree(pTSRow);
tRowMergerClear(&merge);
return TSDB_CODE_SUCCESS;
} else if (ts == key) {
STSRow* pTSRow = NULL;
SRowMerger merge = {0};
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
doMergeRowsInLastBlock(pLastBlockReader, ts, &merge);
tRowMergerGetRow(&merge, &pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
taosMemoryFree(pTSRow);
tRowMergerClear(&merge);
return TSDB_CODE_SUCCESS;
} else { // ts > key, asc; todo handle desc
// imem & mem are all empty, only file exist // imem & mem are all empty, only file exist
if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) { if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} else { } else {
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
STSRow* pTSRow = NULL; STSRow* pTSRow = NULL;
SRowMerger merge = {0}; SRowMerger merge = {0};
...@@ -1588,6 +1747,26 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI ...@@ -1588,6 +1747,26 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
tRowMergerGetRow(&merge, &pTSRow); tRowMergerGetRow(&merge, &pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
taosMemoryFree(pTSRow);
tRowMergerClear(&merge);
return TSDB_CODE_SUCCESS;
}
}
} else { // only last block exists
SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData;
int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);
STSRow* pTSRow = NULL;
SRowMerger merge = {0};
TSDBROW fRow = tsdbRowFromBlockData(pLastBlockData, pLastBlockReader->rowIndex);
tRowMergerInit(&merge, &fRow, pReader->pSchema);
doMergeRowsInLastBlock(pLastBlockReader, tsLastBlock, &merge);
tRowMergerGetRow(&merge, &pTSRow);
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid);
taosMemoryFree(pTSRow); taosMemoryFree(pTSRow);
tRowMergerClear(&merge); tRowMergerClear(&merge);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1599,25 +1778,34 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { ...@@ -1599,25 +1778,34 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
SSDataBlock* pResBlock = pReader->pResBlock; SSDataBlock* pResBlock = pReader->pResBlock;
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
STableBlockScanInfo* pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
STableBlockScanInfo* pBlockScanInfo = NULL;
if (pBlockInfo != NULL) {
pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
} else {
pBlockScanInfo = pReader->status.pTableIter;
}
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
int16_t startIndex = pBlockInfo != NULL? pBlockScanInfo->indexInBlockL:-1;
initLastBlockReader(pLastBlockReader, pBlockScanInfo->uid, &pReader->window, &pReader->verRange, startIndex);
bool has = nextRowInLastBlock(pLastBlockReader); // todo handle if no data in last block
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SBlockData* pBlockData = &pReader->status.fileBlockData; SBlockData* pBlockData = &pReader->status.fileBlockData;
int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1; int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
int32_t numOfSub = 1;
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
while (1) { while (1) {
// todo check the validate of row in file block // todo check the validate of row in file block
{ {
if (!isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) { if (pBlockData->nRow > 0 && !isValidFileBlockRow(pBlockData, pDumpInfo, pBlockScanInfo, pReader)) {
pDumpInfo->rowIndex += step; pDumpInfo->rowIndex += step;
SBlock* pBlock = getCurrentBlock(&pReader->status.blockIter); SBlock* pBlock = getCurrentBlock(&pReader->status.blockIter);
numOfSub = pBlock->nSubBlock;
if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) { if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
setBlockAllDumped(pDumpInfo, pBlock, pReader->order); setBlockAllDumped(pDumpInfo, pBlock, pReader->order);
break; break;
...@@ -1625,13 +1813,17 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { ...@@ -1625,13 +1813,17 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
continue; continue;
} }
if (!hasDataInLastBlock(pLastBlockReader)) {
break;
}
} }
buildComposedDataBlockImpl(pReader, pBlockScanInfo); buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
SBlock* pBlock = getCurrentBlock(&pReader->status.blockIter); SBlock* pBlock = getCurrentBlock(&pReader->status.blockIter);
// currently loaded file data block is consumed // currently loaded file data block is consumed
if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) { if (pBlockData->nRow > 0 && (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0)) {
setBlockAllDumped(pDumpInfo, pBlock, pReader->order); setBlockAllDumped(pDumpInfo, pBlock, pReader->order);
break; break;
} }
...@@ -1647,9 +1839,9 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { ...@@ -1647,9 +1839,9 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
setComposedBlockFlag(pReader, true); setComposedBlockFlag(pReader, true);
int64_t et = taosGetTimestampUs(); int64_t et = taosGetTimestampUs();
tsdbDebug("%p uid:%" PRIu64 ", composed data block created, subBlock:%d, brange:%" PRIu64 "-%" PRIu64 tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
" rows:%d, elapsed time:%.2f ms %s", " rows:%d, elapsed time:%.2f ms %s",
pReader, pBlockScanInfo->uid, numOfSub, pResBlock->info.window.skey, pResBlock->info.window.ekey, pReader, pBlockScanInfo->uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
pResBlock->info.rows, (et - st) / 1000.0, pReader->idStr); pResBlock->info.rows, (et - st) / 1000.0, pReader->idStr);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1825,7 +2017,7 @@ static TSDBKEY getCurrentKeyInBuf(SDataBlockIter* pBlockIter, STsdbReader* pRead ...@@ -1825,7 +2017,7 @@ static TSDBKEY getCurrentKeyInBuf(SDataBlockIter* pBlockIter, STsdbReader* pRead
return key; return key;
} }
static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SArray* pQLastBlock) { static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
size_t numOfTables = taosHashGetSize(pReader->status.pTableMap); size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
...@@ -1844,21 +2036,29 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr ...@@ -1844,21 +2036,29 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr
return code; return code;
} }
code = tsdbReadBlockL(pReader->pFileReader, pStatus->fileIter.pLastBlockList); SArray* pLastBlocks = pStatus->fileIter.pLastBlockReader->pBlockL;
code = tsdbReadBlockL(pReader->pFileReader, pLastBlocks);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pIndexList); taosArrayDestroy(pIndexList);
return code; return code;
} }
if (taosArrayGetSize(pIndexList) > 0 || taosArrayGetSize(pStatus->fileIter.pLastBlockList) > 0) { if (taosArrayGetSize(pIndexList) > 0 || taosArrayGetSize(pLastBlocks) > 0) {
code = doLoadFileBlock(pReader, pIndexList, pStatus->fileIter.pLastBlockList, pBlockNum, pQLastBlock); SArray* pQLastBlock = taosArrayInit(4, sizeof(SBlockL));
code = doLoadFileBlock(pReader, pIndexList, pLastBlocks, pBlockNum, pQLastBlock);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pIndexList); taosArrayDestroy(pIndexList);
taosArrayDestroy(pQLastBlock);
return code; return code;
} }
if (pBlockNum->numOfBlocks + pBlockNum->numOfLastBlocks > 0) { if (pBlockNum->numOfBlocks + pBlockNum->numOfLastBlocks > 0) {
ASSERT(taosArrayGetSize(pQLastBlock) == pBlockNum->numOfLastBlocks); ASSERT(taosArrayGetSize(pQLastBlock) == pBlockNum->numOfLastBlocks);
taosArrayClear(pLastBlocks);
taosArrayAddAll(pLastBlocks, pQLastBlock);
taosArrayDestroy(pQLastBlock);
break; break;
} }
} }
...@@ -1870,31 +2070,113 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr ...@@ -1870,31 +2070,113 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t doLoadRelatedLastBlock(SLastBlockReader* pLastBlockReader, uint64_t uid, STsdbReader* pReader) {
SArray* pBlocks = pLastBlockReader->pBlockL;
SBlockL* pBlock = NULL;
pLastBlockReader->currentBlockIndex = -1;
// find the correct SBlockL
for (int32_t i = 0; i < taosArrayGetSize(pBlocks); ++i) {
SBlockL* p = taosArrayGet(pBlocks, i);
if (p->minUid <= uid && p->maxUid >= uid) {
pLastBlockReader->currentBlockIndex = i;
pBlock = p;
break;
}
}
if (pLastBlockReader->currentBlockIndex == -1) {
return TSDB_CODE_SUCCESS;
}
int32_t code = tBlockDataCreate(&pLastBlockReader->lastBlockData);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = tBlockDataInit(&pLastBlockReader->lastBlockData, pReader->suid, pReader->suid ? 0 : uid, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
//todo add log
return code;
}
code = tsdbReadLastBlock(pReader->pFileReader, pBlock, &pLastBlockReader->lastBlockData);
if (code != TSDB_CODE_SUCCESS) {
// tsdbDebug("%p error occurs in loading last block into buffer, last block index:%d, total:%d brange:%" PRId64 "-%" PRId64
// ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", code:%s %s",
// pReader, *index, pBlockIter->numOfBlocks.numOfLastBlocks, 0, 0, pBlock->nRow,
// pBlock->minVer, pBlock->maxVer, tstrerror(code), pReader->idStr);
}
return TSDB_CODE_SUCCESS;
}
static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status;
SLastBlockReader* pLastBlockReader = pStatus->fileIter.pLastBlockReader;
while(1) {
if (pStatus->pTableIter == NULL) {
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
if (pStatus->pTableIter == NULL) {
return TSDB_CODE_SUCCESS;
}
} else { // let's try next table
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
if (pStatus->pTableIter == NULL) {
return TSDB_CODE_SUCCESS;
}
}
// find the last block that contain the specified block uid
return doLoadRelatedLastBlock(pLastBlockReader, pStatus->pTableIter->uid, pReader);
//todo check for all empty table
}
}
static int32_t doBuildDataBlock(STsdbReader* pReader) { static int32_t doBuildDataBlock(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
SDataBlockIter* pBlockIter = &pStatus->blockIter; SDataBlockIter* pBlockIter = &pStatus->blockIter;
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
STableBlockScanInfo* pScanInfo = NULL;
SBlock* pBlock = NULL; SBlock* pBlock = NULL;
TSDBKEY key = {0}; TSDBKEY key = {0};
STableBlockScanInfo* pScanInfo = NULL;
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
if (pFBlock != NULL) { if (pBlockInfo != NULL) {
pScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); pScanInfo = taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
pBlock = getCurrentBlock(pBlockIter); pBlock = getCurrentBlock(pBlockIter);
key = getCurrentKeyInBuf(pBlockIter, pReader); key = getCurrentKeyInBuf(pBlockIter, pReader);
code = doLoadRelatedLastBlock(pLastBlockReader, pScanInfo->uid, pReader);
if (code != TSDB_CODE_SUCCESS) {
// todo handle error
}
initLastBlockReader(pLastBlockReader, pBlockInfo->uid, &pReader->window, &pReader->verRange, pScanInfo->indexInBlockL);
bool hasData = nextRowInLastBlock(pLastBlockReader);
} else {
ASSERT(pBlockIter->numOfBlocks == 0);
} }
if (pFBlock == NULL || fileBlockShouldLoad(pReader, pFBlock, pBlock, pScanInfo, key)) { if (pBlockInfo == NULL || fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, key, pLastBlockReader)) {
if (pBlockInfo != NULL) {
tBlockDataReset(&pStatus->fileBlockData); tBlockDataReset(&pStatus->fileBlockData);
tBlockDataClear(&pStatus->fileBlockData); code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pScanInfo->uid, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
//todo
}
code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData); code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
}
// build composed data block // build composed data block
code = buildComposedDataBlock(pReader); code = buildComposedDataBlock(pReader);
...@@ -1962,22 +2244,24 @@ static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) ...@@ -1962,22 +2244,24 @@ static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter)
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) { static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
SBlockNumber num = {0}; SBlockNumber num = {0};
SArray* pQLastBlock = taosArrayInit(4, sizeof(int32_t));
int32_t code = moveToNextFile(pReader, &num, pQLastBlock); int32_t code = moveToNextFile(pReader, &num);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
// all data files are consumed, try data in buffer // all data files are consumed, try data in buffer
if (num.numOfBlocks + num.numOfLastBlocks == 0) { if (num.numOfBlocks + num.numOfLastBlocks == 0) {
taosArrayDestroy(pQLastBlock);
pReader->status.loadFromFile = false; pReader->status.loadFromFile = false;
return code; return code;
} }
// initialize the block iterator for a new fileset // initialize the block iterator for a new fileset
code = initBlockIterator(pReader, pBlockIter, &num, pQLastBlock); if (num.numOfBlocks > 0) {
code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks);
} else {
pBlockIter->numOfBlocks = 0;
}
// set the correct start position according to the query time window // set the correct start position according to the query time window
initBlockDumpInfo(pReader, pBlockIter); initBlockDumpInfo(pReader, pBlockIter);
...@@ -1995,6 +2279,38 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { ...@@ -1995,6 +2279,38 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
SDataBlockIter* pBlockIter = &pReader->status.blockIter; SDataBlockIter* pBlockIter = &pReader->status.blockIter;
if (pBlockIter->numOfBlocks == 0) {
_begin:
code = doLoadLastBlockSequentially(pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// all data blocks are check in last file, now let's try the next file
if (pReader->status.pTableIter == NULL) {
code = initForFirstBlockInFile(pReader, pBlockIter);
// error happens or all the data files are completely checked
if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
return code;
}
// this file does not have blocks, let's start check the last block file
if (pBlockIter->numOfBlocks == 0) {
goto _begin;
}
}
code = doBuildDataBlock(pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (pReader->pResBlock->info.rows > 0) {
return TSDB_CODE_SUCCESS;
}
}
while (1) { while (1) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
...@@ -2007,17 +2323,27 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { ...@@ -2007,17 +2323,27 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
bool hasNext = blockIteratorNext(&pReader->status.blockIter); bool hasNext = blockIteratorNext(&pReader->status.blockIter);
if (hasNext) { // check for the next block in the block accessed order list if (hasNext) { // check for the next block in the block accessed order list
initBlockDumpInfo(pReader, pBlockIter); initBlockDumpInfo(pReader, pBlockIter);
} else { // data blocks in current file are exhausted, let's try the next file now } else if (taosArrayGetSize(pReader->status.fileIter.pLastBlockReader->pBlockL) > 0) { // data blocks in current file are exhausted, let's try the next file now
// todo dump all data in last block if exists.
pBlockIter->numOfBlocks = 0;
taosArrayClear(pBlockIter->blockList);
tBlockDataReset(&pReader->status.fileBlockData);
goto _begin;
} else {
code = initForFirstBlockInFile(pReader, pBlockIter); code = initForFirstBlockInFile(pReader, pBlockIter);
// error happens or all the data files are completely checked // error happens or all the data files are completely checked
if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) { if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
return code; return code;
} }
// this file does not have blocks, let's start check the last block file
if (pBlockIter->numOfBlocks == 0) {
goto _begin;
}
} }
} }
// current block is not loaded yet, or data in buffer may overlap with the file block.
code = doBuildDataBlock(pReader); code = doBuildDataBlock(pReader);
} }
...@@ -2336,7 +2662,6 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn ...@@ -2336,7 +2662,6 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn
// 3. load the neighbor block, and set it to be the currently accessed file data block // 3. load the neighbor block, and set it to be the currently accessed file data block
tBlockDataReset(&pStatus->fileBlockData); tBlockDataReset(&pStatus->fileBlockData);
tBlockDataClear(&pStatus->fileBlockData);
int32_t code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData); int32_t code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
...@@ -2386,6 +2711,21 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc ...@@ -2386,6 +2711,21 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// todo support desc order
int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, int64_t ts, SRowMerger* pMerger) {
while(nextRowInLastBlock(pLastBlockReader)) {
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
if (next1 == ts) {
TSDBROW fRow1 = tsdbRowFromBlockData(&pLastBlockReader->lastBlockData, pLastBlockReader->rowIndex);
tRowMerge(pMerger, &fRow1);
} else {
break;
}
}
return TSDB_CODE_SUCCESS;
}
void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow, void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
STsdbReader* pReader, bool* freeTSRow) { STsdbReader* pReader, bool* freeTSRow) {
TSDBROW* pNextRow = NULL; TSDBROW* pNextRow = NULL;
...@@ -2558,7 +2898,7 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* ...@@ -2558,7 +2898,7 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow*
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t doAppendRowFromBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData, int32_t rowIndex) { int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData, int32_t rowIndex) {
int32_t i = 0, j = 0; int32_t i = 0, j = 0;
int32_t outputRowIndex = pResBlock->info.rows; int32_t outputRowIndex = pResBlock->info.rows;
...@@ -3021,8 +3361,12 @@ static SArray* doRetrieveDataBlock(STsdbReader* pReader) { ...@@ -3021,8 +3361,12 @@ static SArray* doRetrieveDataBlock(STsdbReader* pReader) {
STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid)); STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
tBlockDataReset(&pStatus->fileBlockData); tBlockDataReset(&pStatus->fileBlockData);
tBlockDataClear(&pStatus->fileBlockData); int32_t code = tBlockDataInit(&pStatus->fileBlockData, pReader->suid, pBlockScanInfo->uid, pReader->pSchema);
int32_t code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData); if (code != TSDB_CODE_SUCCESS) {
//todo
}
code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tBlockDataDestroy(&pStatus->fileBlockData, 1); tBlockDataDestroy(&pStatus->fileBlockData, 1);
...@@ -3118,12 +3462,12 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa ...@@ -3118,12 +3462,12 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
SDataBlockIter* pBlockIter = &pStatus->blockIter; SDataBlockIter* pBlockIter = &pStatus->blockIter;
pTableBlockInfo->numOfFiles += pStatus->fileIter.numOfFiles; pTableBlockInfo->numOfFiles += pStatus->fileIter.numOfFiles;
if (pBlockIter->numOfBlocks.numOfBlocks > 0) { if (pBlockIter->numOfBlocks > 0) {
pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks.numOfBlocks; pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
} }
pTableBlockInfo->numOfTables = numOfTables; pTableBlockInfo->numOfTables = numOfTables;
bool hasNext = (pBlockIter->numOfBlocks.numOfBlocks > 0); bool hasNext = (pBlockIter->numOfBlocks > 0);
while (true) { while (true) {
if (hasNext) { if (hasNext) {
...@@ -3154,8 +3498,8 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa ...@@ -3154,8 +3498,8 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
break; break;
} }
pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks.numOfBlocks; pTableBlockInfo->numOfBlocks += pBlockIter->numOfBlocks;
hasNext = (pBlockIter->numOfBlocks.numOfBlocks > 0); hasNext = (pBlockIter->numOfBlocks > 0);
} }
// tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pReader, numOfBlocks, numOfTables, // tsdbDebug("%p %d blocks found in file for %d table(s), fid:%d, %s", pReader, numOfBlocks, numOfTables,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册