提交 1baacb3c 编写于 作者: H Hongze Cheng

Merge branch 'feat/tsdb_refact' of https://github.com/taosdata/TDengine into feat/tsdb_refact

......@@ -118,6 +118,11 @@ typedef struct SFileBlockDumpInfo {
int64_t lastKey;
} SFileBlockDumpInfo;
typedef struct SVersionRange {
uint64_t minVer;
uint64_t maxVer;
} SVersionRange;
typedef struct SComposedDataBlock {
bool composed;
int32_t rows;
......@@ -154,8 +159,7 @@ struct STsdbReader {
STSchema* pSchema;
SDataFReader* pFileReader;
int64_t startVersion;
int64_t endVersion;
SVersionRange verRange;
#if 0
SFileBlockInfo* pDataBlockInfo;
SDataCols* pDataCols; // in order to hold current file data block
......@@ -413,8 +417,8 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
pReader->order = pCond->order;
pReader->capacity = 4096;
pReader->idStr = strdup(idstr);
pReader->startVersion= pCond->startVersion;
pReader->endVersion = 100000;//pCond->endVersion; // todo for test purpose
pReader->verRange.minVer= pCond->startVersion;
pReader->verRange.maxVer = 100000;//pCond->endVersion; // todo for test purpose
pReader->type = pCond->type;
pReader->window = *pCond->twindows;
......@@ -954,6 +958,12 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, uint32_
return TSDB_CODE_SUCCESS;
}
static void setBlockDumpCompleted(SFileBlockDumpInfo* pDumpInfo, SBlockData* pBlockData) {
pDumpInfo->rowIndex = pBlockData->nRow;
pDumpInfo->totalRows = pBlockData->nRow;
pDumpInfo->lastKey = pBlockData->aTSKEY[pBlockData->nRow - 1] + 1; // todo step value
}
static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData) {
int64_t st = taosGetTimestampUs();
......@@ -966,6 +976,19 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
goto _error;
}
SColVal cv = {0};
for(int32_t i = 0; i < taosArrayGetSize(pReader->pResBlock->pDataBlock); ++i) {
SColData* pData = (SColData*)taosArrayGet(pBlockData->aColDataP, pReader->suppInfo.slotIds[i]);
SColumnInfoData* pColData = taosArrayGet(pReader->pResBlock->pDataBlock, i);
for(int32_t j = 0; j < pBlockData->nRow; ++j) {
tColDataGetValue(pData, j, &cv);
colDataAppend(pColData, j, (const char*)&cv.value, cv.isNull);
}
}
pReader->pResBlock->info.rows = pBlockData->nRow;
setBlockDumpCompleted(&pReader->status.fBlockDumpInfo, pBlockData);
/*
int32_t ret = tsdbLoadBlockDataCols(&(pReader->rhelper), pBlock, pCheckInfo->pCompInfo, colIds,
(int)(QH_GET_NUM_OF_COLS(pReader)), true);
......@@ -2081,7 +2104,7 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
return TSDB_CODE_SUCCESS;
}
static bool blockIteratorNext(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
static bool blockIteratorNext(SDataBlockIter* pBlockIter) {
if (pBlockIter->index >= pBlockIter->numOfBlocks - 1) {
return false;
}
......@@ -2252,6 +2275,15 @@ static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SBlock* pBlock)
(!ascScan && (key.ts != TSKEY_INITIAL_VAL && key.ts >= pBlock->maxKey.ts));
}
static bool keyOverlapFileBlock(TSDBKEY key, SBlock* pBlock, SVersionRange* pVerRange) {
return (key.ts >= pBlock->minKey.ts || key.ts <= pBlock->maxKey.ts) /*&& (pBlock->maxVersion >= pVerRange->minVer) && (pBlock->minVersion <= pVerRange->maxVer)*/;
}
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo *pFBlock, SBlock* pBlock, STableBlockScanInfo *pScanInfo, TSDBKEY key) {
return (dataBlockPartialRequired(&pReader->window, pBlock) || overlapWithNeighborBlock(pFBlock, pBlock, pScanInfo) ||
keyOverlapFileBlock(key, pBlock, &pReader->verRange));
}
static int32_t buildInmemDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBKEY *key) {
if (pBlockScanInfo->iter != NULL) {
pBlockScanInfo->memHasVal = tsdbTbDataIterNext(pBlockScanInfo->iter);
......@@ -2446,7 +2478,7 @@ static int32_t initMemIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader*
}
int32_t code = TSDB_CODE_SUCCESS;
TSDBKEY startKey = {.ts = pReader->window.skey, .version = pReader->startVersion};
TSDBKEY startKey = {.ts = pReader->window.skey, .version = pReader->verRange.minVer};
STbData* d = NULL;
if (pReader->pTsdb->mem != NULL) {
......@@ -2498,6 +2530,39 @@ static TSDBKEY getCurrentKeyInBuf(SDataBlockIter* pBlockIter, STsdbReader* pRead
return key;
}
static int32_t moveToNextFile(STsdbReader* pReader, int32_t* numOfBlocks) {
SReaderStatus* pStatus = &pReader->status;
while (1) {
bool hasNext = filesetIteratorNext(&pStatus->fileIter, pReader->order, pReader);
if (!hasNext) { // no data files on disk
break;
}
SArray* pIndexList = taosArrayInit(4, sizeof(SBlockIdx));
int32_t code = doLoadBlockIndex(pReader, pReader->pFileReader, pIndexList);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (taosArrayGetSize(pIndexList) > 0) {
uint32_t numOfValidTable = 0;
code = doLoadFileBlock(pReader, pIndexList, &numOfValidTable, numOfBlocks);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (numOfValidTable > 0) {
break;
}
}
// no blocks in current file, try next files
}
return TSDB_CODE_SUCCESS;
}
static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) {
SReaderStatus* pStatus = &pReader->status;
SFileSetIter* pFIter = &pStatus->fileIter;
......@@ -2507,35 +2572,13 @@ static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) {
if (pFIter->index < pFIter->numOfFiles) {
if (pReader->status.blockIter.index == -1) {
int32_t numOfBlocks = 0;
while (1) {
bool hasNext = filesetIteratorNext(&pStatus->fileIter, pReader->order, pReader);
if (!hasNext) { // no data files on disk
break;
}
SArray* pIndexList = taosArrayInit(4, sizeof(SBlockIdx));
int32_t code = doLoadBlockIndex(pReader, pReader->pFileReader, pIndexList);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (taosArrayGetSize(pIndexList) > 0) {
uint32_t numOfValidTable = 0;
code = doLoadFileBlock(pReader, pIndexList, &numOfValidTable, &numOfBlocks);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (numOfValidTable > 0) {
break;
}
}
// no blocks in current file, try next files
int32_t code = moveToNextFile(pReader, &numOfBlocks);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
int32_t code = initBlockIterator(pReader, pBlockIter, numOfBlocks);
// initialize the block iterator for a new fileset
code = initBlockIterator(pReader, pBlockIter, numOfBlocks);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -2546,14 +2589,15 @@ static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) {
TSDBKEY key = getCurrentKeyInBuf(pBlockIter, pReader);
if (dataBlockPartialRequired(&pReader->window, pBlock) || overlapWithNeighborBlock(pFBlock, pBlock, pScanInfo) /*|| points overlaps with data block*/) {
if (fileBlockShouldLoad(pReader, pFBlock, pBlock, pScanInfo, key)) {
SBlockData data = {0};
doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &data);
// build composed data block
buildComposedDataBlock(pReader, pFBlock, pBlock, pScanInfo);
} else if (bufferDataInFileBlockGap(pReader->order, key, pBlock)) {
// data in memory that are earlier than current file block
TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->endVersion};
TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
buildInmemDataBlock(pReader, pScanInfo, &maxKey);
// build data block from in-memory buffer data completed.
} else { // whole block is required, return it directly
......@@ -2567,8 +2611,8 @@ static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) {
pInfo->window.skey = pBlock->minKey.ts;
pInfo->window.ekey = pBlock->maxKey.ts;
setComposedBlockFlag(pReader, false);
*exists = true;
}
*exists = true;
} else {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
......@@ -2577,14 +2621,53 @@ static int32_t loadDataInFiles(STsdbReader* pReader, bool* exists) {
// current block are exhausted, try the next file block
if (pDumpInfo->rowIndex >= pBlock->nRow) {
bool hasNext = blockIteratorNext(pReader, &pReader->status.blockIter);
if (!hasNext) {
// current file is exhausted, let's try the next file
bool hasNext = blockIteratorNext(&pReader->status.blockIter);
if (!hasNext) { // current file is exhausted, let's try the next file
int32_t numOfBlocks = 0;
int32_t code = moveToNextFile(pReader, &numOfBlocks);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// initialize the block iterator for a new fileset
code = initBlockIterator(pReader, pBlockIter, numOfBlocks);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pFBlock = getCurrentBlockInfo(pBlockIter);
pScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
pBlock = taosArrayGet(pScanInfo->pBlockList, pFBlock->tbBlockIdx);
TSDBKEY key = getCurrentKeyInBuf(pBlockIter, pReader);
if (fileBlockShouldLoad(pReader, pFBlock, pBlock, pScanInfo, key)) {
SBlockData data = {0};
doLoadFileBlockData(pReader, pBlockIter, pScanInfo, &data);
// build composed data block
buildComposedDataBlock(pReader, pFBlock, pBlock, pScanInfo);
} else if (bufferDataInFileBlockGap(pReader->order, key, pBlock)) {
// data in memory that are earlier than current file block
TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
buildInmemDataBlock(pReader, pScanInfo, &maxKey);
// build data block from in-memory buffer data completed.
} else { // whole block is required, return it directly
// todo
// 1. the version of all rows should be less than the endVersion
// 2. current block should not overlap with next neighbor block
// 3. current timestamp should not be overlap with each other
SDataBlockInfo* pInfo = &pReader->pResBlock->info;
pInfo->rows = pBlock->nRow;
pInfo->uid = pScanInfo->uid;
pInfo->window.skey = pBlock->minKey.ts;
pInfo->window.ekey = pBlock->maxKey.ts;
setComposedBlockFlag(pReader, false);
}
*exists = true;
} else { // try next data block in current file
// 1. check if ts in buffer is overlap with current file data block
TSDBKEY key1 = getCurrentKeyInBuf(pBlockIter, pReader);
blockIteratorNext(pBlockIter);
}
} else {
......@@ -2646,7 +2729,7 @@ TSDBROW* getValidRow(STbDataIter* pIter, bool* hasVal, STsdbReader* pReader) {
return NULL;
}
if (key.version <= pReader->endVersion) {
if (key.version <= pReader->verRange.maxVer) {
return pRow;
}
......@@ -2664,7 +2747,7 @@ TSDBROW* getValidRow(STbDataIter* pIter, bool* hasVal, STsdbReader* pReader) {
return NULL;
}
if (key.version <= pReader->endVersion) {
if (key.version <= pReader->verRange.maxVer) {
return pRow;
}
}
......@@ -2699,7 +2782,7 @@ int32_t doLoadRowsOfIdenticalTsInFileBlock(SFileDataBlockInfo* pFBlock, SBlock*
int32_t rowIndex = pDumpInfo->rowIndex + 1;
while (pBlockData->aTSKEY[rowIndex] == key) {
if (pBlockData->aVersion[rowIndex] > pReader->endVersion) {
if (pBlockData->aVersion[rowIndex] > pReader->verRange.maxVer) {
continue;
}
......@@ -3217,7 +3300,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter;
initMemIterator(pBlockScanInfo, pReader);
TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->endVersion};
TSDBKEY maxKey = {.ts = pReader->window.ekey, .version = pReader->verRange.maxVer};
buildInmemDataBlock(pReader, pBlockScanInfo, &maxKey);
if (pReader->pResBlock->info.rows > 0) {
return true;
......@@ -3354,16 +3437,16 @@ int32_t tsdbRetrieveDataBlockStatisInfo(STsdbReader* pReader, SColumnDataAgg***
}
SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
if (pReader->status.composedDataBlock) {
SReaderStatus* pStatus = &pReader->status;
if (pStatus->composedDataBlock) {
return pReader->pResBlock->pDataBlock;
} else {
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
STableBlockScanInfo* pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pStatus->blockIter);
STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pFBlock->uid, sizeof(pFBlock->uid));
int32_t code = tBlockDataInit(&pReader->status.fileBlockData);
doLoadFileBlockData(pReader, &pReader->status.blockIter, pBlockScanInfo, &pReader->status.fileBlockData);
// TSDBROW row = tsdbRowFromBlockData(&pReader->status.fileBlockData, 0);
// doAppendOneRow(pReader->pResBlock, pReader, row.);
int32_t code = tBlockDataInit(&pStatus->fileBlockData);
doLoadFileBlockData(pReader, &pStatus->blockIter, pBlockScanInfo, &pStatus->fileBlockData);
return pReader->pResBlock->pDataBlock;
}
......
......@@ -187,6 +187,10 @@ int32_t qGetQueriedTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tab
ASSERT(tinfo != NULL && dbName != NULL && tableName != NULL);
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
if (pTaskInfo->schemaVer.sw == NULL) {
return TSDB_CODE_SUCCESS;
}
*sversion = pTaskInfo->schemaVer.sw->version;
*tversion = pTaskInfo->schemaVer.tversion;
if (pTaskInfo->schemaVer.dbname) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册