提交 7762e0ea 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 91f9b58f
......@@ -180,6 +180,12 @@ typedef struct STsdbReaderAttr {
SVersionRange verRange;
} STsdbReaderAttr;
typedef struct SResultBlockInfo {
SSDataBlock* pResBlock;
bool freeBlock;
int64_t capacity;
} SResultBlockInfo;
struct STsdbReader {
STsdb* pTsdb;
SVersionRange verRange;
......@@ -187,12 +193,10 @@ struct STsdbReader {
bool suspended;
uint64_t suid;
int16_t order;
bool freeBlock;
EReadMode readMode;
uint64_t rowsNum;
STimeWindow window; // the primary query time window that applies to all queries
SSDataBlock* pResBlock;
int32_t capacity;
SResultBlockInfo resBlockInfo;
SReaderStatus status;
char* idStr; // query info handle, for debug purpose
int32_t type; // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
......@@ -205,7 +209,7 @@ struct STsdbReader {
SDelFReader* pDelFReader; // the del file reader
SArray* pDelIdx; // del file block index;
SBlockInfoBuf blockInfoBuf;
int32_t step;
EContentData step;
STsdbReader* innerReader[2];
};
......@@ -727,6 +731,21 @@ void tsdbReleaseDataBlock(STsdbReader* pReader) {
}
}
static int32_t initResBlockInfo(SResultBlockInfo* pResBlockInfo, int64_t capacity, SSDataBlock* pResBlock, SQueryTableDataCond* pCond) {
pResBlockInfo->capacity = capacity;
pResBlockInfo->pResBlock = pResBlock;
terrno = 0;
if (pResBlockInfo->pResBlock == NULL) {
pResBlockInfo->freeBlock = true;
pResBlockInfo->pResBlock = createResBlock(pCond, pResBlockInfo->capacity);
} else {
pResBlockInfo->freeBlock = false;
}
return terrno;
}
static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, int32_t capacity,
SSDataBlock* pResBlock, const char* idstr) {
int32_t code = 0;
......@@ -746,21 +765,16 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows.skey, pVnode->config.tsdbCfg.retentions, idstr, &level);
pReader->suid = pCond->suid;
pReader->order = pCond->order;
pReader->capacity = capacity;
pReader->pResBlock = pResBlock;
pReader->idStr = (idstr != NULL) ? taosStrdup(idstr) : NULL;
pReader->verRange = getQueryVerRange(pVnode, pCond, level);
pReader->type = pCond->type;
pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket
if (pReader->pResBlock == NULL) {
pReader->freeBlock = true;
pReader->pResBlock = createResBlock(pCond, pReader->capacity);
if (pReader->pResBlock == NULL) {
code = terrno;
goto _end;
}
code = initResBlockInfo(&pReader->resBlockInfo, capacity, pResBlock, pCond);
if (code != TSDB_CODE_SUCCESS) {
goto _end;
}
if (pCond->numOfCols <= 0) {
......@@ -792,7 +806,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
goto _end;
}
pReader->status.pPrimaryTsCol = taosArrayGet(pReader->pResBlock->pDataBlock, pSup->slotId[0]);
pReader->status.pPrimaryTsCol = taosArrayGet(pReader->resBlockInfo.pResBlock->pDataBlock, pSup->slotId[0]);
int32_t type = pReader->status.pPrimaryTsCol->info.type;
if (type != TSDB_DATA_TYPE_TIMESTAMP) {
tsdbError("the first column isn't primary timestamp in result block, actual: %s, %s", tDataTypes[type].name,
......@@ -1221,7 +1235,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
SBlockData* pBlockData = &pStatus->fileBlockData;
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
SDataBlk* pBlock = getCurrentBlock(pBlockIter);
SSDataBlock* pResBlock = pReader->pResBlock;
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
int32_t numOfOutputCols = pSupInfo->numOfCols;
int32_t code = TSDB_CODE_SUCCESS;
......@@ -1269,8 +1283,8 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
endIndex += step;
int32_t dumpedRows = asc ? (endIndex - pDumpInfo->rowIndex) : (pDumpInfo->rowIndex - endIndex);
if (dumpedRows > pReader->capacity) { // output buffer check
dumpedRows = pReader->capacity;
if (dumpedRows > pReader->resBlockInfo.capacity) { // output buffer check
dumpedRows = pReader->resBlockInfo.capacity;
}
int32_t i = 0;
......@@ -1785,7 +1799,7 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo*
pInfo->overlapWithLastBlock = !(pBlock->maxKey.ts < tsLast || pBlock->minKey.ts > tsLast);
}
pInfo->moreThanCapcity = pBlock->nRow > pReader->capacity;
pInfo->moreThanCapcity = pBlock->nRow > pReader->resBlockInfo.capacity;
pInfo->partiallyRequired = dataBlockPartiallyRequired(&pReader->window, &pReader->verRange, pBlock);
pInfo->overlapWithKeyInBuf = keyOverlapFileBlock(keyInBuf, pBlock, &pReader->verRange);
}
......@@ -1832,10 +1846,10 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo*
return TSDB_CODE_SUCCESS;
}
SSDataBlock* pBlock = pReader->pResBlock;
SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock;
int64_t st = taosGetTimestampUs();
int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->capacity, pReader);
int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->resBlockInfo.capacity, pReader);
blockDataUpdateTsWindow(pBlock, pReader->suppInfo.slotId[0]);
pBlock->info.id.uid = pBlockScanInfo->uid;
......@@ -1866,7 +1880,7 @@ static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pB
int64_t nextKey = pBlockData->aTSKEY[pDumpInfo->rowIndex + step];
if (nextKey != key) { // merge is not needed
code = doAppendRowFromFileBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
if (code) {
return code;
}
......@@ -1913,7 +1927,7 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLas
if (hasVal) {
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
if (next1 != ts) {
code = doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow);
code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, fRow->pBlockData, fRow->iRow);
if (code) {
return code;
}
......@@ -1922,7 +1936,7 @@ static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLas
return code;
}
} else {
code = doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow);
code = doAppendRowFromFileBlock(pReader->resBlockInfo.pResBlock, pReader, fRow->pBlockData, fRow->iRow);
if (code) {
return code;
}
......@@ -2120,7 +2134,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
return code;
}
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow);
tsdbRowMergerClear(pMerger);
......@@ -2170,7 +2184,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
return code;
}
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow);
tsdbRowMergerClear(pMerger);
......@@ -2197,7 +2211,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
return code;
}
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow);
tsdbRowMergerClear(pMerger);
......@@ -2257,7 +2271,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
return code;
}
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow);
tsdbRowMergerClear(pMerger);
......@@ -2475,7 +2489,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
return code;
}
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow);
tsdbRowMergerClear(pMerger);
......@@ -2658,7 +2672,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
return code;
}
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
code = doAppendRowFromTSRow(pReader->resBlockInfo.pResBlock, pReader, pTSRow, pBlockScanInfo);
taosMemoryFree(pTSRow);
tsdbRowMergerClear(pMerger);
......@@ -2740,7 +2754,7 @@ static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlock
}
static void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInfo* pBlockScanInfo) {
SSDataBlock* pResBlock = pReader->pResBlock;
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
pResBlock->info.id.uid = (pBlockScanInfo != NULL) ? pBlockScanInfo->uid : 0;
pResBlock->info.dataLoad = 1;
......@@ -2755,7 +2769,7 @@ static void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlock
static int32_t buildComposedDataBlock(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
SSDataBlock* pResBlock = pReader->pResBlock;
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
......@@ -2777,7 +2791,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
// it is a clean block, load it directly
if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader) &&
pBlock->nRow <= pReader->capacity) {
pBlock->nRow <= pReader->resBlockInfo.capacity) {
if (asc || ((!asc) && (!hasDataInLastBlock(pLastBlockReader)))) {
code = copyBlockDataToSDataBlock(pReader);
if (code) {
......@@ -3064,7 +3078,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
return TSDB_CODE_SUCCESS;
}
SSDataBlock* pResBlock = pReader->pResBlock;
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
while (1) {
// load the last data block of current table
......@@ -3098,7 +3112,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
return code;
}
if (pResBlock->info.rows >= pReader->capacity) {
if (pResBlock->info.rows >= pReader->resBlockInfo.capacity) {
break;
}
}
......@@ -3164,7 +3178,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
SBlockData* pBData = &pReader->status.fileBlockData;
tBlockDataReset(pBData);
SSDataBlock* pResBlock = pReader->pResBlock;
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
tsdbDebug("load data in last block firstly, due to desc scan data, %s", pReader->idStr);
int64_t st = taosGetTimestampUs();
......@@ -3182,7 +3196,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
return code;
}
if (pResBlock->info.rows >= pReader->capacity) {
if (pResBlock->info.rows >= pReader->resBlockInfo.capacity) {
break;
}
}
......@@ -3197,7 +3211,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
pResBlock->info.rows, el, pReader->idStr);
}
} else { // whole block is required, return it directly
SDataBlockInfo* pInfo = &pReader->pResBlock->info;
SDataBlockInfo* pInfo = &pReader->resBlockInfo.pResBlock->info;
pInfo->rows = pBlock->nRow;
pInfo->id.uid = pScanInfo->uid;
pInfo->dataLoad = 0;
......@@ -3373,7 +3387,7 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
return code;
}
if (pReader->pResBlock->info.rows > 0) {
if (pReader->resBlockInfo.pResBlock->info.rows > 0) {
return TSDB_CODE_SUCCESS;
}
......@@ -3456,7 +3470,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
return code;
}
if (pReader->pResBlock->info.rows > 0) {
if (pReader->resBlockInfo.pResBlock->info.rows > 0) {
return TSDB_CODE_SUCCESS;
}
......@@ -3481,7 +3495,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
return code;
}
if (pReader->pResBlock->info.rows > 0) {
if (pReader->resBlockInfo.pResBlock->info.rows > 0) {
return TSDB_CODE_SUCCESS;
}
}
......@@ -3534,7 +3548,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
return code;
}
if (pReader->pResBlock->info.rows > 0) {
if (pReader->resBlockInfo.pResBlock->info.rows > 0) {
return TSDB_CODE_SUCCESS;
}
}
......@@ -4173,7 +4187,7 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S
int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
STsdbReader* pReader) {
SSDataBlock* pBlock = pReader->pResBlock;
SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock;
int32_t code = TSDB_CODE_SUCCESS;
do {
......@@ -4281,7 +4295,7 @@ void* tsdbGetIvtIdx(SMeta* pMeta) {
return metaGetIvtIdx(pMeta);
}
uint64_t getReaderMaxVersion(STsdbReader* pReader) { return pReader->verRange.maxVer; }
uint64_t tsdbGetReaderMaxVersion(STsdbReader* pReader) { return pReader->verRange.maxVer; }
static int32_t doOpenReaderImpl(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status;
......@@ -4484,8 +4498,8 @@ void tsdbReaderClose(STsdbReader* pReader) {
}
}
if (pReader->freeBlock) {
pReader->pResBlock = blockDataDestroy(pReader->pResBlock);
if (pReader->resBlockInfo.freeBlock) {
pReader->resBlockInfo.pResBlock = blockDataDestroy(pReader->resBlockInfo.pResBlock);
}
taosMemoryFree(pSupInfo->colId);
......@@ -4622,7 +4636,7 @@ int32_t tsdbReaderSuspend(STsdbReader* pReader) {
pBlockScanInfo = pStatus->pTableIter == NULL ? NULL : *pStatus->pTableIter;
if (pBlockScanInfo) {
// save lastKey to restore memory iterator
STimeWindow w = pReader->pResBlock->info.window;
STimeWindow w = pReader->resBlockInfo.pResBlock->info.window;
pBlockScanInfo->lastKey = ASCENDING_TRAVERSE(pReader->order) ? w.ekey : w.skey;
// reset current current table's data block scan info,
......@@ -4707,10 +4721,10 @@ int32_t tsdbReaderResume(STsdbReader* pReader) {
STsdbReader* pNextReader = pReader->innerReader[1];
// we need only one row
pPrevReader->capacity = 1;
pPrevReader->resBlockInfo.capacity = 1;
setSharedPtr(pPrevReader, pReader);
pNextReader->capacity = 1;
pNextReader->resBlockInfo.capacity = 1;
setSharedPtr(pNextReader, pReader);
code = doOpenReaderImpl(pPrevReader);
......@@ -4733,7 +4747,7 @@ _err:
static bool tsdbReadRowsCountOnly(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
SSDataBlock* pBlock = pReader->pResBlock;
SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock;
if (pReader->status.loadFromFile == false) {
return false;
......@@ -4762,7 +4776,7 @@ static int32_t doTsdbNextDataBlock(STsdbReader* pReader, bool* hasNext) {
int32_t code = TSDB_CODE_SUCCESS;
// cleanup the data that belongs to the previous data block
SSDataBlock* pBlock = pReader->pResBlock;
SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock;
blockDataCleanup(pBlock);
*hasNext = false;
......@@ -4947,7 +4961,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock,
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
if (pReader->pResBlock->info.id.uid != pFBlock->uid) {
if (pReader->resBlockInfo.pResBlock->info.id.uid != pFBlock->uid) {
return TSDB_CODE_SUCCESS;
}
......@@ -4973,8 +4987,8 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock,
pTsAgg->numOfNull = 0;
pTsAgg->colId = PRIMARYKEY_TIMESTAMP_COL_ID;
pTsAgg->min = pReader->pResBlock->info.window.skey;
pTsAgg->max = pReader->pResBlock->info.window.ekey;
pTsAgg->min = pReader->resBlockInfo.pResBlock->info.window.skey;
pTsAgg->max = pReader->resBlockInfo.pResBlock->info.window.ekey;
// update the number of NULL data rows
size_t numOfCols = pSup->numOfCols;
......@@ -4985,7 +4999,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock,
taosArrayEnsureCap(pSup->pColAgg, colsNum);
}
SSDataBlock* pResBlock = pReader->pResBlock;
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
if (pResBlock->pBlockAgg == NULL) {
size_t num = taosArrayGetSize(pResBlock->pDataBlock);
pResBlock->pBlockAgg = taosMemoryCalloc(num, POINTER_BYTES);
......@@ -5056,7 +5070,7 @@ static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) {
return NULL;
}
return pReader->pResBlock;
return pReader->resBlockInfo.pResBlock;
}
SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
......@@ -5071,7 +5085,7 @@ SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
SReaderStatus* pStatus = &pTReader->status;
if (pStatus->composedDataBlock) {
return pTReader->pResBlock;
return pTReader->resBlockInfo.pResBlock;
}
SSDataBlock* ret = doRetrieveDataBlock(pTReader);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册