提交 dde1b94d 编写于 作者: H Hongze Cheng

Merge branch 'feat/tsdb_refact' of https://github.com/taosdata/TDengine into feat/tsdb_refact

......@@ -363,12 +363,11 @@ static void resetDataBlockIterator(SDataBlockIter* pIter) {
}
static bool filesetIteratorNext(SFileSetIter* pIter, int32_t order, STsdbReader* pReader) {
pIter->index += 1;
if (pIter->index >= pIter->numOfFiles) {
return false;
}
pIter->index += 1;
// check file the time range of coverage
STimeWindow win = {0};
pReader->status.pCurrentFileSet = (SDFileSet *)taosArrayGet(pIter->pFileList, pIter->index);
......@@ -920,7 +919,8 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_
tsdbReadBlock(pReader->pFileReader, pBlockIdx, &mapData, NULL);
STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t));
ASSERT(pScanInfo->pBlockList == NULL || taosArrayGetSize(pScanInfo->pBlockList) == 0);
taosArrayClear(pScanInfo->pBlockList);
for (int32_t j = 0; j < mapData.nItem; ++j) {
SBlock block = {0};
......@@ -971,18 +971,25 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
SBlock* pBlock = taosArrayGet(pBlockScanInfo->pBlockList, pFBlock->tbBlockIdx);
uint8_t *pb = NULL, *pb1 = NULL;
int32_t code = tsdbReadBlockData(pReader->pFileReader, &pBlockScanInfo->blockIdx, pBlock, pBlockData, /*pReader->suppInfo.colIdList, numOfCols, */&pb, &pb1);
int32_t code = tsdbReadBlockData(pReader->pFileReader, &pBlockScanInfo->blockIdx, pBlock, pBlockData, &pb, &pb1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
SColVal cv = {0};
for(int32_t i = 0; i < taosArrayGetSize(pReader->pResBlock->pDataBlock); ++i) {
SColData* pData = (SColData*)taosArrayGet(pBlockData->aColDataP, pReader->suppInfo.slotIds[i]);
SColumnInfoData* pColData = taosArrayGet(pReader->pResBlock->pDataBlock, i);
for(int32_t j = 0; j < pBlockData->nRow; ++j) {
tColDataGetValue(pData, j, &cv);
colDataAppend(pColData, j, (const char*)&cv.value, cv.isNull);
if (pColData->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
for (int32_t j = 0; j < pBlockData->nRow; ++j) {
colDataAppend(pColData, j, (const char*)&pBlockData->aTSKEY[j], false);
}
} else {
SColVal cv = {0};
SColData* pData = (SColData*)taosArrayGetP(pBlockData->aColDataP, pReader->suppInfo.slotIds[i]);
for (int32_t j = 0; j < pBlockData->nRow; ++j) {
tColDataGetValue(pData, j, &cv);
colDataAppend(pColData, j, (const char*)&cv.value, cv.isNull);
}
}
}
......@@ -2276,7 +2283,7 @@ static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SBlock* pBlock)
}
static bool keyOverlapFileBlock(TSDBKEY key, SBlock* pBlock, SVersionRange* pVerRange) {
return (key.ts >= pBlock->minKey.ts || key.ts <= pBlock->maxKey.ts) /*&& (pBlock->maxVersion >= pVerRange->minVer) && (pBlock->minVersion <= pVerRange->maxVer)*/;
return (key.ts >= pBlock->minKey.ts && key.ts <= pBlock->maxKey.ts) /*&& (pBlock->maxVersion >= pVerRange->minVer) && (pBlock->minVersion <= pVerRange->maxVer)*/;
}
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo *pFBlock, SBlock* pBlock, STableBlockScanInfo *pScanInfo, TSDBKEY key) {
......@@ -2511,7 +2518,6 @@ static TSDBKEY getCurrentKeyInBuf(SDataBlockIter* pBlockIter, STsdbReader* pRead
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);
initMemIterator(pScanInfo, pReader);
if (pScanInfo->memHasVal) {
......@@ -2563,7 +2569,82 @@ static int32_t moveToNextFile(STsdbReader* pReader, int32_t* numOfBlocks) {
return TSDB_CODE_SUCCESS;
}
static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) {
static int32_t doBuildDataBlock(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
SReaderStatus* pStatus = &pReader->status;
SDataBlockIter* pBlockIter = &pStatus->blockIter;
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);
TSDBKEY key = getCurrentKeyInBuf(pBlockIter, pReader);
if (fileBlockShouldLoad(pReader, pFBlock, pBlock, pScanInfo, key)) {
SBlockData data = {0};
tBlockDataInit(&data);
code = doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &data);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// build composed data block
code = buildComposedDataBlock(pReader, pFBlock, pBlock, pScanInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} else if (bufferDataInFileBlockGap(pReader->order, key, pBlock)) {
// data in memory that are earlier than current file block
TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
code = buildInmemDataBlock(pReader, pScanInfo, &maxKey);
// build data block from in-memory buffer data completed.
} else { // whole block is required, return it directly
// todo
// 1. the version of all rows should be less than the endVersion
// 2. current block should not overlap with next neighbor block
// 3. current timestamp should not be overlap with each other
SDataBlockInfo* pInfo = &pReader->pResBlock->info;
pInfo->rows = pBlock->nRow;
pInfo->uid = pScanInfo->uid;
pInfo->window.skey = pBlock->minKey.ts;
pInfo->window.ekey = pBlock->maxKey.ts;
setComposedBlockFlag(pReader, false);
}
return code;
}
static int32_t buildInmemBlockSeqentially(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status;
while(1) {
if (pStatus->pTableIter == NULL) {
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
if (pStatus->pTableIter == NULL) {
return false;
}
}
STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter;
initMemIterator(pBlockScanInfo, pReader);
TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
buildInmemDataBlock(pReader, pBlockScanInfo, &maxKey);
if (pReader->pResBlock->info.rows > 0) {
return true;
}
// current table is exhausted, let's try the next table
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
if (pStatus->pTableIter == NULL) {
return false;
}
}
}
static int32_t loadDataInFiles(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status;
SFileSetIter* pFIter = &pStatus->fileIter;
......@@ -2583,36 +2664,8 @@ static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) {
return code;
}
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
SBlock* pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);
TSDBKEY key = getCurrentKeyInBuf(pBlockIter, pReader);
if (fileBlockShouldLoad(pReader, pFBlock, pBlock, pScanInfo, key)) {
SBlockData data = {0};
doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &data);
code = doBuildDataBlock(pReader);
// build composed data block
buildComposedDataBlock(pReader, pFBlock, pBlock, pScanInfo);
} else if (bufferDataInFileBlockGap(pReader->order, key, pBlock)) {
// data in memory that are earlier than current file block
TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
buildInmemDataBlock(pReader, pScanInfo, &maxKey);
// build data block from in-memory buffer data completed.
} else { // whole block is required, return it directly
// todo
// 1. the version of all rows should be less than the endVersion
// 2. current block should not overlap with next neighbor block
// 3. current timestamp should not be overlap with each other
SDataBlockInfo* pInfo = &pReader->pResBlock->info;
pInfo->rows = pBlock->nRow;
pInfo->uid = pScanInfo->uid;
pInfo->window.skey = pBlock->minKey.ts;
pInfo->window.ekey = pBlock->maxKey.ts;
setComposedBlockFlag(pReader, false);
}
*exists = true;
} else {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
......@@ -2629,46 +2682,22 @@ static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) {
return code;
}
// initialize the block iterator for a new fileset
code = initBlockIterator(pReader, pBlockIter, numOfBlocks);
if (code != TSDB_CODE_SUCCESS) {
// all data files are consumed, try data in buffer
if (numOfBlocks == 0) {
pReader->status.loadFromFile = false;
return code;
} else {
// initialize the block iterator for a new fileset
code = initBlockIterator(pReader, pBlockIter, numOfBlocks);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doBuildDataBlock(pReader);
}
pFBlock = getCurrentBlockInfo(pBlockIter);
pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);
TSDBKEY key = getCurrentKeyInBuf(pBlockIter, pReader);
if (fileBlockShouldLoad(pReader, pFBlock, pBlock, pScanInfo, key)) {
SBlockData data = {0};
doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &data);
// build composed data block
buildComposedDataBlock(pReader, pFBlock, pBlock, pScanInfo);
} else if (bufferDataInFileBlockGap(pReader->order, key, pBlock)) {
// data in memory that are earlier than current file block
TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
buildInmemDataBlock(pReader, pScanInfo, &maxKey);
// build data block from in-memory buffer data completed.
} else { // whole block is required, return it directly
// todo
// 1. the version of all rows should be less than the endVersion
// 2. current block should not overlap with next neighbor block
// 3. current timestamp should not be overlap with each other
SDataBlockInfo* pInfo = &pReader->pResBlock->info;
pInfo->rows = pBlock->nRow;
pInfo->uid = pScanInfo->uid;
pInfo->window.skey = pBlock->minKey.ts;
pInfo->window.ekey = pBlock->maxKey.ts;
setComposedBlockFlag(pReader, false);
}
*exists = true;
} else { // try next data block in current file
// 1. check if ts in buffer is overlap with current file data block
blockIteratorNext(pBlockIter);
doBuildDataBlock(pReader);
}
} else {
buildComposedDataBlock(pReader, pFBlock, pBlock, pScanInfo);
......@@ -3263,13 +3292,13 @@ void tsdbReaderClose(STsdbReader* pReader) {
}
bool tsdbNextDataBlock(STsdbReader* pReader) {
bool ret = false;
if (isEmptyQueryTimeWindow(pReader)) {
return false;
}
// cleanup the data that belongs to the previous data block
blockDataCleanup(pReader->pResBlock);
SSDataBlock* pBlock = pReader->pResBlock;
blockDataCleanup(pBlock);
int64_t stime = taosGetTimestampUs();
int64_t elapsedTime = stime;
......@@ -3277,41 +3306,20 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
if (pReader->type == BLOCK_LOAD_OFFSET_ORDER) {
if (pStatus->loadFromFile) {
bool exists = true;
int32_t code = loadDataInFiles(pReader, &exists);
int32_t code = loadDataInFiles(pReader);
if (code != TSDB_CODE_SUCCESS) {
return false;
}
if (exists) {
if (pBlock->info.rows > 0) {
return true;
} else { // let's try the in-memory buffer
} else {
buildInmemBlockSeqentially(pReader);
return pBlock->info.rows > 0;
}
} else { // no data in files, let's try the buffer
while(1) {
if (pStatus->pTableIter == NULL) {
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
if (pStatus->pTableIter == NULL) {
return false;
}
}
STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter;
initMemIterator(pBlockScanInfo, pReader);
TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
buildInmemDataBlock(pReader, pBlockScanInfo, &maxKey);
if (pReader->pResBlock->info.rows > 0) {
return true;
}
// current table is exhausted, let's try the next table
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
if (pStatus->pTableIter == NULL) {
return false;
}
}
buildInmemBlockSeqentially(pReader);
return pBlock->info.rows > 0;
}
} else if (pReader->type == BLOCK_LOAD_TABLESEQ_ORDER) {
......@@ -3351,7 +3359,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
// pReader->cost.checkForNextTime += elapsedTime;
// return ret;
// }
return ret;
return false;
}
void tsdbRetrieveDataBlockInfo(STsdbReader* pReader, SDataBlockInfo* pDataBlockInfo) {
......@@ -3450,40 +3458,6 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
return pReader->pResBlock->pDataBlock;
}
// /**
// * In the following two cases, the data has been loaded to SColumnInfoData.
// * 1. data is from cache, 2. data block is not completed qualified to query time range
// */
// if (pReader->cur.fid == INT32_MIN) {
// return pReader->pColumns;
// } else {
// SFileBlockInfo* pBlockInfo = &pReader->pDataBlockInfo[pReader->cur.slot];
// STableBlockScanInfo* pCheckInfo = pBlockInfo->pTableCheckInfo;
// if (pReader->cur.mixBlock) {
// return pReader->pColumns;
// } else {
// SDataBlockInfo binfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlockInfo->compBlock);
// assert(pReader->realNumOfRows <= binfo.rows);
// // data block has been loaded, todo extract method
// SDataBlockLoadInfo* pBlockLoadInfo = &pReader->dataBlockLoadInfo;
// if (pBlockLoadInfo->slot == pReader->cur.slot && pBlockLoadInfo->fileGroup->fid == pReader->cur.fid &&
// pBlockLoadInfo->uid == pCheckInfo->tableId) {
// return pReader->pColumns;
// } else { // only load the file block
// SBlock* pBlock = pBlockInfo->compBlock;
// if (doLoadFileBlockData(pReader, pBlock, pCheckInfo, pReader->cur.slot) != TSDB_CODE_SUCCESS) {
// return NULL;
// }
// int32_t numOfRows = doCopyRowsFromFileBlock(pReader, pReader->outputCapacity, 0, 0, pBlock->numOfRows - 1);
// return pReader->pColumns;
// }
// }
// }
return NULL;
}
void tsdbResetReadHandle(STsdbReader* pReader, SQueryTableDataCond* pCond, int32_t tWinIdx) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册