提交 553f1bd5 编写于 作者: H Hongze Cheng

Merge branch 'feat/tsdb_refact' of https://github.com/taosdata/TDengine into feat/tsdb_refact

......@@ -129,6 +129,8 @@ typedef struct SReaderStatus {
SHashObj* pTableMap; // SHash<STableBlockScanInfo>
STableBlockScanInfo* pTableIter; // table iterator used in building in-memory buffer data blocks.
SFileBlockDumpInfo fBlockDumpInfo;
SDFileSet* pCurrentFileSet; // current opened file set
SBlockData fileBlockData;
SFileSetIter fileIter;
SDataBlockIter blockIter;
......@@ -352,6 +354,8 @@ static int32_t initFileIterator(SFileSetIter* pIter, const STsdbFSState* pFState
static void resetDataBlockIterator(SDataBlockIter* pIter) {
pIter->numOfBlocks = -1;
pIter->index = -1;
pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo));
}
static bool filesetIteratorNext(SFileSetIter* pIter, int32_t order, STsdbReader* pReader) {
......@@ -363,13 +367,14 @@ static bool filesetIteratorNext(SFileSetIter* pIter, int32_t order, STsdbReader*
// check file the time range of coverage
STimeWindow win = {0};
SDFileSet *pDFile = (SDFileSet *)taosArrayGet(pIter->pFileList, pIter->index);
pReader->status.pCurrentFileSet = (SDFileSet *)taosArrayGet(pIter->pFileList, pIter->index);
int32_t code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pDFile);
int32_t code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileSet);
if (code != TSDB_CODE_SUCCESS) {
goto _err;
}
// todo file range check
// tsdbGetFidKeyRange(pCfg->days, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey);
// current file are not overlapped with query time window, ignore remain files
......@@ -380,6 +385,8 @@ static bool filesetIteratorNext(SFileSetIter* pIter, int32_t order, STsdbReader*
return false;
}
return true;
_err:
return false;
}
......@@ -884,10 +891,11 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader,
pScanInfo->pBlockList = taosArrayInit(16, sizeof(SBlock));
}
pScanInfo->blockIdx = blockIndex;
taosArrayPush(pIndexList, &blockIndex);
}
tMapDataClear(&blockIdxMap);
// tMapDataClear(&blockIdxMap);
return TSDB_CODE_SUCCESS;
_err:
......@@ -895,7 +903,7 @@ _err:
return code;
}
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_t* numOfValidTables) {
static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_t* numOfValidTables, int32_t* numOfBlocks) {
size_t numOfTables = taosArrayGetSize(pIndexList);
*numOfValidTables = 0;
......@@ -917,6 +925,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_
return code;
}
// 1. time range check
if ((ASCENDING_TRAVERSE(pReader->order) &&
(block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey)) ||
(!ASCENDING_TRAVERSE(pReader->order) &&
......@@ -924,10 +933,17 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_
continue;
}
// 2. version range check
// if (block.minVersion > pReader->startVersion || block.maxVersion < pReader->endVersion) {
// continue;
// }
void* p = taosArrayPush(pScanInfo->pBlockList, &block);
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*numOfBlocks) += 1;
}
if (pScanInfo->pBlockList != NULL && taosArrayGetSize(pScanInfo->pBlockList) > 0) {
......@@ -940,7 +956,6 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
int64_t st = taosGetTimestampUs();
int32_t numOfCols = taosArrayGetSize(pReader->pResBlock->pDataBlock);
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
SBlock* pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx);
......@@ -1983,7 +1998,7 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
int32_t cnt = 0;
void* ptr = NULL;
while(1) {
ptr = taosHashIterate(pReader->status.pTableMap, &ptr);
ptr = taosHashIterate(pReader->status.pTableMap, ptr);
if (ptr == NULL) {
break;
}
......@@ -2025,6 +2040,8 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
}
tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted %s",
pReader, cnt, pReader->idStr);
pBlockIter->index = 0;
return TSDB_CODE_SUCCESS;
}
......@@ -2428,18 +2445,29 @@ static int32_t initMemIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
return TSDB_CODE_SUCCESS;
}
int32_t code = TSDB_CODE_SUCCESS;
TSDBKEY startKey = {.ts = pReader->window.skey, .version = pReader->startVersion};
STbData* d = NULL;
if (pReader->pTsdb->mem != NULL) {
tsdbGetTbDataFromMemTable(pReader->pTsdb->mem, pReader->suid, pBlockScanInfo->uid, &d);
tsdbTbDataIterCreate(d, &startKey, 0, &pBlockScanInfo->iter);
if (d != NULL) {
code = tsdbTbDataIterCreate(d, &startKey, 0, &pBlockScanInfo->iter);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
}
STbData* di = NULL;
if (pReader->pTsdb->imem != NULL) {
tsdbGetTbDataFromMemTable(pReader->pTsdb->imem, pReader->suid, pBlockScanInfo->uid, &di);
tsdbTbDataIterCreate(di, &startKey, 0, &pBlockScanInfo->iiter);
if (di != NULL) {
code = tsdbTbDataIterCreate(di, &startKey, 0, &pBlockScanInfo->iiter);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
}
pBlockScanInfo->iterInit = true;
......@@ -2494,7 +2522,7 @@ static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) {
if (taosArrayGetSize(pIndexList) > 0) {
uint32_t numOfValidTable = 0;
code = doLoadFileBlock(pReader, pIndexList, &numOfValidTable);
code = doLoadFileBlock(pReader, pIndexList, &numOfValidTable, &numOfBlocks);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2539,6 +2567,7 @@ static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) {
pInfo->window.skey = pBlock->minKey.ts;
pInfo->window.ekey = pBlock->maxKey.ts;
setComposedBlockFlag(pReader, false);
*exists = true;
}
} else {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
......@@ -3167,7 +3196,15 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
if (pStatus->loadFromFile) {
bool exists = true;
int32_t code = loadDataInFiles(pReader, &exists);
if (code != TSDB_CODE_SUCCESS) {
return false;
}
if (exists) {
return true;
} else { // let's try the in-memory buffer
}
} else { // no data in files, let's try the buffer
while(1) {
if (pStatus->pTableIter == NULL) {
......@@ -3323,10 +3360,12 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
STableBlockScanInfo* pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
SBlockData data = {0};
doLoadFileBlockData(pReader, &pReader->status.blockIter, pBlockScanInfo, &data);
int32_t code = tBlockDataInit(&pReader->status.fileBlockData);
doLoadFileBlockData(pReader, &pReader->status.blockIter, pBlockScanInfo, &pReader->status.fileBlockData);
// TSDBROW row = tsdbRowFromBlockData(&pReader->status.fileBlockData, 0);
// doAppendOneRow(pReader->pResBlock, pReader, row.);
// todo convert blockData to ssdatablock
return pReader->pResBlock->pDataBlock;
}
// /**
// * In the following two cases, the data has been loaded to SColumnInfoData.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册