提交 aec4297e 编写于 作者: H Hongze Cheng

refact code

上级 ff4fda84
......@@ -42,7 +42,7 @@ typedef struct SMemTable SMemTable;
typedef struct STbDataIter STbDataIter;
typedef struct SMapData SMapData;
typedef struct SBlockIdx SBlockIdx;
typedef struct SBlock SBlock;
typedef struct SDataBlk SDataBlk;
typedef struct SSstBlk SSstBlk;
typedef struct SColData SColData;
typedef struct SDiskDataHdr SDiskDataHdr;
......@@ -114,12 +114,12 @@ int32_t tTABLEIDCmprFn(const void *p1, const void *p2);
int32_t tPutBlockCol(uint8_t *p, void *ph);
int32_t tGetBlockCol(uint8_t *p, void *ph);
int32_t tBlockColCmprFn(const void *p1, const void *p2);
// SBlock
void tBlockReset(SBlock *pBlock);
int32_t tPutBlock(uint8_t *p, void *ph);
int32_t tGetBlock(uint8_t *p, void *ph);
// SDataBlk
void tBlockReset(SDataBlk *pBlock);
int32_t tPutDataBlk(uint8_t *p, void *ph);
int32_t tGetDataBlk(uint8_t *p, void *ph);
int32_t tBlockCmprFn(const void *p1, const void *p2);
bool tBlockHasSma(SBlock *pBlock);
bool tBlockHasSma(SDataBlk *pBlock);
// SSstBlk
int32_t tPutSstBlk(uint8_t *p, void *ph);
int32_t tGetSstBlk(uint8_t *p, void *ph);
......@@ -265,8 +265,8 @@ int32_t tsdbDataFReaderClose(SDataFReader **ppReader);
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx);
int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData);
int32_t tsdbReadSstBlk(SDataFReader *pReader, int32_t iSst, SArray *aSstBlk);
int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnDataAgg);
int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBlockData);
int32_t tsdbReadBlockSma(SDataFReader *pReader, SDataBlk *pBlock, SArray *aColumnDataAgg);
int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pBlock, SBlockData *pBlockData);
int32_t tsdbReadSstBlock(SDataFReader *pReader, int32_t iSst, SSstBlk *pSstBlk, SBlockData *pBlockData);
// SDelFWriter
int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb);
......@@ -427,7 +427,7 @@ struct SSmaInfo {
int32_t size;
};
struct SBlock {
struct SDataBlk {
TSDBKEY minKey;
TSDBKEY maxKey;
int64_t minVer;
......
......@@ -527,7 +527,7 @@ typedef struct SFSNextRowIter {
SMapData blockMap;
int32_t nBlock;
int32_t iBlock;
SBlock block;
SDataBlk block;
SBlockData blockData;
SBlockData *pBlockData;
int32_t nRow;
......@@ -602,13 +602,13 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) {
}
case SFSNEXTROW_BLOCKDATA:
if (state->iBlock >= 0) {
SBlock block = {0};
SDataBlk block = {0};
tBlockReset(&block);
// tBlockDataReset(&state->blockData);
tBlockDataReset(state->pBlockData);
tMapDataGetItemByIdx(&state->blockMap, state->iBlock, &block, tGetBlock);
tMapDataGetItemByIdx(&state->blockMap, state->iBlock, &block, tGetDataBlk);
/* code = tsdbReadBlockData(state->pDataFReader, &state->blockIdx, &block, &state->blockData, NULL, NULL); */
tBlockDataReset(state->pBlockData);
code = tBlockDataInit(state->pBlockData, state->suid, state->uid, state->pTSchema);
......
......@@ -64,7 +64,7 @@ typedef struct {
SArray *aBlockIdx; // SArray<SBlockIdx>
int32_t iBlockIdx;
SBlockIdx *pBlockIdx;
SMapData mBlock; // SMapData<SBlock>
SMapData mBlock; // SMapData<SDataBlk>
SBlockData bData;
} dReader;
struct {
......@@ -78,7 +78,7 @@ typedef struct {
SDataFWriter *pWriter;
SArray *aBlockIdx; // SArray<SBlockIdx>
SArray *aSstBlk; // SArray<SSstBlk>
SMapData mBlock; // SMapData<SBlock>
SMapData mBlock; // SMapData<SDataBlk>
SBlockData bData;
SBlockData bDatal;
} dWriter;
......@@ -562,7 +562,7 @@ _err:
static int32_t tsdbCommitDataBlock(SCommitter *pCommitter) {
int32_t code = 0;
SBlockData *pBlockData = &pCommitter->dWriter.bData;
SBlock block;
SDataBlk block;
ASSERT(pBlockData->nRow > 0);
......@@ -597,8 +597,8 @@ static int32_t tsdbCommitDataBlock(SCommitter *pCommitter) {
((block.nSubBlock == 1) && !block.hasDup) ? &block.smaInfo : NULL, pCommitter->cmprAlg, 0);
if (code) goto _err;
// put SBlock
code = tMapDataPutItem(&pCommitter->dWriter.mBlock, &block, tPutBlock);
// put SDataBlk
code = tMapDataPutItem(&pCommitter->dWriter.mBlock, &block, tPutDataBlk);
if (code) goto _err;
// clear
......@@ -1098,7 +1098,7 @@ _exit:
return code;
}
static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SBlock *pBlock) {
static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) {
int32_t code = 0;
SBlockData *pBlockData = &pCommitter->dWriter.bData;
SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter);
......@@ -1122,7 +1122,7 @@ static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SBlock *pBlock) {
pRowInfo = NULL;
} else {
TSDBKEY tKey = TSDBROW_KEY(&pRowInfo->row);
if (tsdbKeyCmprFn(&tKey, &pBlock->minKey) >= 0) pRowInfo = NULL;
if (tsdbKeyCmprFn(&tKey, &pDataBlk->minKey) >= 0) pRowInfo = NULL;
}
}
......@@ -1144,14 +1144,14 @@ _err:
return code;
}
static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SBlock *pBlock) {
static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) {
int32_t code = 0;
SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter);
TABLEID id = {.suid = pRowInfo->suid, .uid = pRowInfo->uid};
SBlockData *pBDataR = &pCommitter->dReader.bData;
SBlockData *pBDataW = &pCommitter->dWriter.bData;
code = tsdbReadDataBlock(pCommitter->dReader.pReader, pBlock, pBDataR);
code = tsdbReadDataBlock(pCommitter->dReader.pReader, pDataBlk, pBDataR);
if (code) goto _err;
tBlockDataClear(pBDataW);
......@@ -1188,7 +1188,7 @@ static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SBlock *pBlock) {
pRowInfo = NULL;
} else {
TSDBKEY tKey = TSDBROW_KEY(&pRowInfo->row);
if (tsdbKeyCmprFn(&tKey, &pBlock->maxKey) > 0) pRowInfo = NULL;
if (tsdbKeyCmprFn(&tKey, &pDataBlk->maxKey) > 0) pRowInfo = NULL;
}
}
} else {
......@@ -1237,57 +1237,57 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, TABLEID id) {
ASSERT(pBlockIdx == NULL || tTABLEIDCmprFn(pBlockIdx, &id) >= 0);
if (pBlockIdx && pBlockIdx->suid == id.suid && pBlockIdx->uid == id.uid) {
int32_t iBlock = 0;
SBlock block;
SBlock *pBlock = &block;
SDataBlk block;
SDataBlk *pDataBlk = &block;
SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter);
ASSERT(pRowInfo->suid == id.suid && pRowInfo->uid == id.uid);
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock);
while (pBlock && pRowInfo) {
SBlock tBlock = {.minKey = TSDBROW_KEY(&pRowInfo->row), .maxKey = TSDBROW_KEY(&pRowInfo->row)};
int32_t c = tBlockCmprFn(pBlock, &tBlock);
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk);
while (pDataBlk && pRowInfo) {
SDataBlk tBlock = {.minKey = TSDBROW_KEY(&pRowInfo->row), .maxKey = TSDBROW_KEY(&pRowInfo->row)};
int32_t c = tBlockCmprFn(pDataBlk, &tBlock);
if (c < 0) {
code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pBlock, tPutBlock);
code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pDataBlk, tPutDataBlk);
if (code) goto _err;
iBlock++;
if (iBlock < pCommitter->dReader.mBlock.nItem) {
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock);
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk);
} else {
pBlock = NULL;
pDataBlk = NULL;
}
} else if (c > 0) {
code = tsdbCommitAheadBlock(pCommitter, pBlock);
code = tsdbCommitAheadBlock(pCommitter, pDataBlk);
if (code) goto _err;
pRowInfo = tsdbGetCommitRow(pCommitter);
if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) pRowInfo = NULL;
} else {
code = tsdbCommitMergeBlock(pCommitter, pBlock);
code = tsdbCommitMergeBlock(pCommitter, pDataBlk);
if (code) goto _err;
iBlock++;
if (iBlock < pCommitter->dReader.mBlock.nItem) {
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock);
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk);
} else {
pBlock = NULL;
pDataBlk = NULL;
}
pRowInfo = tsdbGetCommitRow(pCommitter);
if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) pRowInfo = NULL;
}
}
while (pBlock) {
code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pBlock, tPutBlock);
while (pDataBlk) {
code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pDataBlk, tPutDataBlk);
if (code) goto _err;
iBlock++;
if (iBlock < pCommitter->dReader.mBlock.nItem) {
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock);
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pDataBlk, tGetDataBlk);
} else {
pBlock = NULL;
pDataBlk = NULL;
}
}
......
......@@ -109,7 +109,7 @@ typedef struct SDataBlockIter {
int32_t index;
SArray* blockList; // SArray<SFileDataBlockInfo>
int32_t order;
SBlock block; // current SBlock data
SDataBlk block; // current SDataBlk data
SHashObj* pTableMap;
} SDataBlockIter;
......@@ -590,8 +590,8 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
sizeInDisk += pScanInfo->mapData.nData;
for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
SBlock block = {0};
tMapDataGetItemByIdx(&pScanInfo->mapData, j, &block, tGetBlock);
SDataBlk block = {0};
tMapDataGetItemByIdx(&pScanInfo->mapData, j, &block, tGetDataBlk);
// 1. time range check
if (block.minKey.ts > pReader->window.ekey || block.maxKey.ts < pReader->window.skey) {
......@@ -665,7 +665,7 @@ static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
return pBlockInfo;
}
static SBlock* getCurrentBlock(SDataBlockIter* pBlockIter) { return &pBlockIter->block; }
static SDataBlk* getCurrentBlock(SDataBlockIter* pBlockIter) { return &pBlockIter->block; }
static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo) {
SReaderStatus* pStatus = &pReader->status;
......@@ -673,7 +673,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, STableBlockScanIn
SBlockData* pBlockData = &pStatus->fileBlockData;
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(pBlockIter);
SBlock* pBlock = getCurrentBlock(pBlockIter);
SDataBlk* pBlock = getCurrentBlock(pBlockIter);
SSDataBlock* pResBlock = pReader->pResBlock;
int32_t numOfOutputCols = blockDataGetNumOfCols(pResBlock);
......@@ -758,8 +758,8 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
ASSERT(pBlockInfo != NULL);
SBlock* pBlock = getCurrentBlock(pBlockIter);
int32_t code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData);
SDataBlk* pBlock = getCurrentBlock(pBlockIter);
int32_t code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData);
if (code != TSDB_CODE_SUCCESS) {
tsdbError("%p error occurs in loading file block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
", rows:%d, code:%s %s",
......@@ -836,7 +836,7 @@ static int32_t doSetCurrentBlock(SDataBlockIter* pBlockIter) {
if (pBlockInfo != NULL) {
STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
int32_t* mapDataIndex = taosArrayGet(pScanInfo->pBlockList, pBlockInfo->tbBlockIdx);
tMapDataGetItemByIdx(&pScanInfo->mapData, *mapDataIndex, &pBlockIter->block, tGetBlock);
tMapDataGetItemByIdx(&pScanInfo->mapData, *mapDataIndex, &pBlockIter->block, tGetDataBlk);
}
#if 0
......@@ -887,12 +887,12 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
}
sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf;
SBlock block = {0};
SDataBlk block = {0};
for (int32_t k = 0; k < num; ++k) {
SBlockOrderWrapper wrapper = {0};
int32_t* mapDataIndex = taosArrayGet(pTableScanInfo->pBlockList, k);
tMapDataGetItemByIdx(&pTableScanInfo->mapData, *mapDataIndex, &block, tGetBlock);
tMapDataGetItemByIdx(&pTableScanInfo->mapData, *mapDataIndex, &block, tGetDataBlk);
wrapper.uid = pTableScanInfo->uid;
wrapper.offset = block.aSubBlock[0].offset;
......@@ -981,15 +981,15 @@ static bool blockIteratorNext(SDataBlockIter* pBlockIter) {
/**
* This is an two rectangles overlap cases.
*/
static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SBlock* pBlock) {
static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* pVerRange, SDataBlk* pBlock) {
return (pWindow->ekey < pBlock->maxKey.ts && pWindow->ekey >= pBlock->minKey.ts) ||
(pWindow->skey > pBlock->minKey.ts && pWindow->skey <= pBlock->maxKey.ts) ||
(pVerRange->minVer > pBlock->minVer && pVerRange->minVer <= pBlock->maxVer) ||
(pVerRange->maxVer < pBlock->maxVer && pVerRange->maxVer >= pBlock->minVer);
}
static SBlock* getNeighborBlockOfSameTable(SFileDataBlockInfo* pFBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
int32_t* nextIndex, int32_t order) {
static SDataBlk* getNeighborBlockOfSameTable(SFileDataBlockInfo* pFBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
int32_t* nextIndex, int32_t order) {
bool asc = ASCENDING_TRAVERSE(order);
if (asc && pFBlockInfo->tbBlockIdx >= taosArrayGetSize(pTableBlockScanInfo->pBlockList) - 1) {
return NULL;
......@@ -1002,10 +1002,10 @@ static SBlock* getNeighborBlockOfSameTable(SFileDataBlockInfo* pFBlockInfo, STab
int32_t step = asc ? 1 : -1;
*nextIndex = pFBlockInfo->tbBlockIdx + step;
SBlock* pBlock = taosMemoryCalloc(1, sizeof(SBlock));
int32_t* indexInMapdata = taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex);
SDataBlk* pBlock = taosMemoryCalloc(1, sizeof(SDataBlk));
int32_t* indexInMapdata = taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex);
tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, *indexInMapdata, pBlock, tGetBlock);
tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, *indexInMapdata, pBlock, tGetDataBlk);
return pBlock;
}
......@@ -1048,7 +1048,7 @@ static int32_t setFileBlockActiveInBlockIter(SDataBlockIter* pBlockIter, int32_t
return TSDB_CODE_SUCCESS;
}
static bool overlapWithNeighborBlock(SBlock* pBlock, SBlock* pNeighbor, int32_t order) {
static bool overlapWithNeighborBlock(SDataBlk* pBlock, SDataBlk* pNeighbor, int32_t order) {
// it is the last block in current file, no chance to overlap with neighbor blocks.
if (ASCENDING_TRAVERSE(order)) {
return pBlock->maxKey.ts == pNeighbor->minKey.ts;
......@@ -1057,19 +1057,19 @@ static bool overlapWithNeighborBlock(SBlock* pBlock, SBlock* pNeighbor, int32_t
}
}
static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SBlock* pBlock) {
static bool bufferDataInFileBlockGap(int32_t order, TSDBKEY key, SDataBlk* pBlock) {
bool ascScan = ASCENDING_TRAVERSE(order);
return (ascScan && (key.ts != TSKEY_INITIAL_VAL && key.ts <= pBlock->minKey.ts)) ||
(!ascScan && (key.ts != TSKEY_INITIAL_VAL && key.ts >= pBlock->maxKey.ts));
}
static bool keyOverlapFileBlock(TSDBKEY key, SBlock* pBlock, SVersionRange* pVerRange) {
static bool keyOverlapFileBlock(TSDBKEY key, SDataBlk* pBlock, SVersionRange* pVerRange) {
return (key.ts >= pBlock->minKey.ts && key.ts <= pBlock->maxKey.ts) && (pBlock->maxVer >= pVerRange->minVer) &&
(pBlock->minVer <= pVerRange->maxVer);
}
static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SBlock* pBlock) {
static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock) {
size_t num = taosArrayGetSize(pBlockScanInfo->delSkyline);
for (int32_t i = pBlockScanInfo->fileDelIndex; i < num; i += 1) {
......@@ -1103,7 +1103,7 @@ static bool doCheckforDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, cons
return false;
}
static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBlock* pBlock, int32_t order) {
static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SDataBlk* pBlock, int32_t order) {
if (pBlockScanInfo->delSkyline == NULL) {
return false;
}
......@@ -1138,10 +1138,10 @@ static bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBl
// 3. current timestamp should not be overlap with each other
// 4. output buffer should be large enough to hold all rows in current block
// 5. delete info should not overlap with current block data
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBlock, SBlock* pBlock,
static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBlock, SDataBlk* pBlock,
STableBlockScanInfo* pScanInfo, TSDBKEY key, SLastBlockReader* pLastBlockReader) {
int32_t neighborIndex = 0;
SBlock* pNeighbor = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &neighborIndex, pReader->order);
int32_t neighborIndex = 0;
SDataBlk* pNeighbor = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &neighborIndex, pReader->order);
// overlap with neighbor
bool overlapWithNeighbor = false;
......@@ -1948,7 +1948,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
pDumpInfo->rowIndex += step;
SBlock* pBlock = getCurrentBlock(&pReader->status.blockIter);
SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
if (pDumpInfo->rowIndex >= pBlock->nRow || pDumpInfo->rowIndex < 0) {
setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
break;
......@@ -1967,7 +1967,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
// currently loaded file data block is consumed
if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) {
SBlock* pBlock = getCurrentBlock(&pReader->status.blockIter);
SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->order);
break;
}
......@@ -2320,8 +2320,8 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
}
static int32_t doBuildDataBlock(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
SBlock* pBlock = NULL;
int32_t code = TSDB_CODE_SUCCESS;
SDataBlk* pBlock = NULL;
SReaderStatus* pStatus = &pReader->status;
SDataBlockIter* pBlockIter = &pStatus->blockIter;
......@@ -2418,7 +2418,7 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
// set the correct start position in case of the first/last file block, according to the query time window
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
SBlock* pBlock = getCurrentBlock(pBlockIter);
SDataBlk* pBlock = getCurrentBlock(pBlockIter);
SReaderStatus* pStatus = &pReader->status;
......@@ -2789,7 +2789,7 @@ typedef enum {
CHECK_FILEBLOCK_QUIT = 0x2,
} CHECK_FILEBLOCK_STATE;
static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, SBlock* pBlock,
static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, SDataBlk* pBlock,
SFileDataBlockInfo* pFBlock, SRowMerger* pMerger, int64_t key,
CHECK_FILEBLOCK_STATE* state) {
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
......@@ -2798,8 +2798,8 @@ static int32_t checkForNeighborFileBlock(STsdbReader* pReader, STableBlockScanIn
*state = CHECK_FILEBLOCK_QUIT;
int32_t step = ASCENDING_TRAVERSE(pReader->order) ? 1 : -1;
int32_t nextIndex = -1;
SBlock* pNeighborBlock = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &nextIndex, pReader->order);
int32_t nextIndex = -1;
SDataBlk* pNeighborBlock = getNeighborBlockOfSameTable(pFBlock, pScanInfo, &nextIndex, pReader->order);
if (pNeighborBlock == NULL) { // do nothing
return 0;
}
......@@ -2863,7 +2863,7 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
CHECK_FILEBLOCK_STATE st;
SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
SBlock* pCurrentBlock = getCurrentBlock(&pReader->status.blockIter);
SDataBlk* pCurrentBlock = getCurrentBlock(&pReader->status.blockIter);
checkForNeighborFileBlock(pReader, pScanInfo, pCurrentBlock, pFileBlockInfo, pMerger, key, &st);
if (st == CHECK_FILEBLOCK_QUIT) {
break;
......@@ -3447,8 +3447,8 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS
SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter);
SBlock* pBlock = getCurrentBlock(&pReader->status.blockIter);
int64_t stime = taosGetTimestampUs();
SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
int64_t stime = taosGetTimestampUs();
SBlockLoadSuppInfo* pSup = &pReader->suppInfo;
......@@ -3629,7 +3629,7 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
while (true) {
if (hasNext) {
SBlock* pBlock = getCurrentBlock(pBlockIter);
SDataBlk* pBlock = getCurrentBlock(pBlockIter);
int32_t numOfRows = pBlock->nRow;
pTableBlockInfo->totalRows += numOfRows;
......
......@@ -677,9 +677,9 @@ _err:
return code;
}
int32_t tsdbReadBlockSma(SDataFReader *pReader, SBlock *pBlock, SArray *aColumnDataAgg) {
int32_t tsdbReadBlockSma(SDataFReader *pReader, SDataBlk *pDataBlk, SArray *aColumnDataAgg) {
int32_t code = 0;
SSmaInfo *pSmaInfo = &pBlock->smaInfo;
SSmaInfo *pSmaInfo = &pDataBlk->smaInfo;
ASSERT(pSmaInfo->size > 0);
......@@ -843,13 +843,13 @@ _err:
return code;
}
int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBlockData) {
int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData *pBlockData) {
int32_t code = 0;
code = tsdbReadBlockDataImpl(pReader, &pBlock->aSubBlock[0], 0, pBlockData);
code = tsdbReadBlockDataImpl(pReader, &pDataBlk->aSubBlock[0], 0, pBlockData);
if (code) goto _err;
if (pBlock->nSubBlock > 1) {
if (pDataBlk->nSubBlock > 1) {
SBlockData bData1;
SBlockData bData2;
......@@ -863,8 +863,8 @@ int32_t tsdbReadDataBlock(SDataFReader *pReader, SBlock *pBlock, SBlockData *pBl
tBlockDataInitEx(&bData1, pBlockData);
tBlockDataInitEx(&bData2, pBlockData);
for (int32_t iSubBlock = 1; iSubBlock < pBlock->nSubBlock; iSubBlock++) {
code = tsdbReadBlockDataImpl(pReader, &pBlock->aSubBlock[iSubBlock], 0, &bData1);
for (int32_t iSubBlock = 1; iSubBlock < pDataBlk->nSubBlock; iSubBlock++) {
code = tsdbReadBlockDataImpl(pReader, &pDataBlk->aSubBlock[iSubBlock], 0, &bData1);
if (code) {
tBlockDataDestroy(&bData1, 1);
tBlockDataDestroy(&bData2, 1);
......@@ -1355,28 +1355,28 @@ _err:
return code;
}
static void tsdbUpdateBlockInfo(SBlockData *pBlockData, SBlock *pBlock) {
static void tsdbUpdateBlockInfo(SBlockData *pBlockData, SDataBlk *pDataBlk) {
for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
TSDBKEY key = {.ts = pBlockData->aTSKEY[iRow], .version = pBlockData->aVersion[iRow]};
if (iRow == 0) {
if (tsdbKeyCmprFn(&pBlock->minKey, &key) > 0) {
pBlock->minKey = key;
if (tsdbKeyCmprFn(&pDataBlk->minKey, &key) > 0) {
pDataBlk->minKey = key;
}
} else {
if (pBlockData->aTSKEY[iRow] == pBlockData->aTSKEY[iRow - 1]) {
pBlock->hasDup = 1;
pDataBlk->hasDup = 1;
}
}
if (iRow == pBlockData->nRow - 1 && tsdbKeyCmprFn(&pBlock->maxKey, &key) < 0) {
pBlock->maxKey = key;
if (iRow == pBlockData->nRow - 1 && tsdbKeyCmprFn(&pDataBlk->maxKey, &key) < 0) {
pDataBlk->maxKey = key;
}
pBlock->minVer = TMIN(pBlock->minVer, key.version);
pBlock->maxVer = TMAX(pBlock->maxVer, key.version);
pDataBlk->minVer = TMIN(pDataBlk->minVer, key.version);
pDataBlk->maxVer = TMAX(pDataBlk->maxVer, key.version);
}
pBlock->nRow += pBlockData->nRow;
pDataBlk->nRow += pBlockData->nRow;
}
static int32_t tsdbWriteBlockSma(SDataFWriter *pWriter, SBlockData *pBlockData, SSmaInfo *pSmaInfo) {
......
......@@ -33,7 +33,7 @@ struct STsdbSnapReader {
int32_t iBlockIdx;
int32_t iBlockL;
SMapData mBlock; // SMapData<SBlock>
SMapData mBlock; // SMapData<SDataBlk>
int32_t iBlock;
SBlockData oBlockData;
SBlockData nBlockData;
......@@ -115,8 +115,8 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
// }
} else if (pReader->pBlockIdx) {
while (pReader->iBlock < pReader->mBlock.nItem) {
SBlock block;
tMapDataGetItemByIdx(&pReader->mBlock, pReader->iBlock, &block, tGetBlock);
SDataBlk block;
tMapDataGetItemByIdx(&pReader->mBlock, pReader->iBlock, &block, tGetDataBlk);
if (block.minVer <= pReader->ever && block.maxVer >= pReader->sver) {
// load data (todo)
......@@ -426,7 +426,7 @@ struct STsdbSnapWriter {
SArray* aBlockIdx; // SArray<SBlockIdx>
int32_t iBlockIdx;
SBlockIdx* pBlockIdx;
SMapData mBlock; // SMapData<SBlock>
SMapData mBlock; // SMapData<SDataBlk>
int32_t iBlock;
SBlockData* pBlockData;
int32_t iRow;
......@@ -437,11 +437,11 @@ struct STsdbSnapWriter {
SDataFWriter* pDataFWriter;
SBlockIdx* pBlockIdxW; // NULL when no committing table
SBlock blockW;
SDataBlk blockW;
SBlockData bDataW;
SBlockIdx blockIdxW;
SMapData mBlockW; // SMapData<SBlock>
SMapData mBlockW; // SMapData<SDataBlk>
SArray* aBlockIdxW; // SArray<SBlockIdx>
SArray* aBlockLW; // SArray<SSstBlk>
......@@ -475,7 +475,7 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
// &pWriter->blockW, pWriter->cmprAlg);
if (code) goto _err;
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock);
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutDataBlk);
if (code) goto _err;
tBlockReset(&pWriter->blockW);
......@@ -499,15 +499,15 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
// &pWriter->blockW, pWriter->cmprAlg);
// if (code) goto _err;
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock);
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutDataBlk);
if (code) goto _err;
}
while (true) {
if (pWriter->iBlock >= pWriter->mBlock.nItem) break;
SBlock block;
tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetBlock);
SDataBlk block;
tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetDataBlk);
// if (block.last) {
// code = tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, &pWriter->bDataR, NULL, NULL);
......@@ -520,13 +520,13 @@ static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) {
// if (code) goto _err;
// }
code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock);
code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutDataBlk);
if (code) goto _err;
pWriter->iBlock++;
}
// SBlock
// SDataBlk
// code = tsdbWriteBlock(pWriter->pDataFWriter, &pWriter->mBlockW, NULL, pWriter->pBlockIdxW);
// if (code) goto _err;
......@@ -553,10 +553,10 @@ static int32_t tsdbSnapMoveWriteTableData(STsdbSnapWriter* pWriter, SBlockIdx* p
if (code) goto _err;
// SBlockData
SBlock block;
SDataBlk block;
tMapDataReset(&pWriter->mBlockW);
for (int32_t iBlock = 0; iBlock < pWriter->mBlock.nItem; iBlock++) {
tMapDataGetItemByIdx(&pWriter->mBlock, iBlock, &block, tGetBlock);
tMapDataGetItemByIdx(&pWriter->mBlock, iBlock, &block, tGetDataBlk);
// if (block.last) {
// code = tsdbReadBlockData(pWriter->pDataFReader, pBlockIdx, &block, &pWriter->bDataR, NULL, NULL);
......@@ -570,11 +570,11 @@ static int32_t tsdbSnapMoveWriteTableData(STsdbSnapWriter* pWriter, SBlockIdx* p
// if (code) goto _err;
// }
code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock);
code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutDataBlk);
if (code) goto _err;
}
// SBlock
// SDataBlk
SBlockIdx blockIdx = {.suid = pBlockIdx->suid, .uid = pBlockIdx->uid};
code = tsdbWriteBlock(pWriter->pDataFWriter, &pWriter->mBlockW, &blockIdx);
if (code) goto _err;
......@@ -642,10 +642,10 @@ static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) {
while (true) {
if (pWriter->iBlock >= pWriter->mBlock.nItem) break;
SBlock block;
int32_t c;
SDataBlk block;
int32_t c;
tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetBlock);
tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetDataBlk);
// if (block.last) {
// pWriter->pBlockData = &pWriter->bDataR;
......@@ -668,14 +668,14 @@ static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) {
// &pWriter->blockW, pWriter->cmprAlg);
// if (code) goto _err;
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock);
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutDataBlk);
if (code) goto _err;
tBlockReset(&pWriter->blockW);
tBlockDataClear(&pWriter->bDataW);
}
code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutBlock);
code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutDataBlk);
if (code) goto _err;
pWriter->iBlock++;
......@@ -719,7 +719,7 @@ static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) {
// &pWriter->blockW, pWriter->cmprAlg);
// if (code) goto _err;
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutBlock);
code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutDataBlk);
if (code) goto _err;
tBlockReset(&pWriter->blockW);
......
......@@ -231,69 +231,69 @@ int32_t tCmprBlockL(void const *lhs, void const *rhs) {
return 0;
}
// SBlock ======================================================
void tBlockReset(SBlock *pBlock) {
*pBlock = (SBlock){.minKey = TSDBKEY_MAX, .maxKey = TSDBKEY_MIN, .minVer = VERSION_MAX, .maxVer = VERSION_MIN};
// SDataBlk ======================================================
void tBlockReset(SDataBlk *pDataBlk) {
*pDataBlk = (SDataBlk){.minKey = TSDBKEY_MAX, .maxKey = TSDBKEY_MIN, .minVer = VERSION_MAX, .maxVer = VERSION_MIN};
}
int32_t tPutBlock(uint8_t *p, void *ph) {
int32_t n = 0;
SBlock *pBlock = (SBlock *)ph;
n += tPutI64v(p ? p + n : p, pBlock->minKey.version);
n += tPutI64v(p ? p + n : p, pBlock->minKey.ts);
n += tPutI64v(p ? p + n : p, pBlock->maxKey.version);
n += tPutI64v(p ? p + n : p, pBlock->maxKey.ts);
n += tPutI64v(p ? p + n : p, pBlock->minVer);
n += tPutI64v(p ? p + n : p, pBlock->maxVer);
n += tPutI32v(p ? p + n : p, pBlock->nRow);
n += tPutI8(p ? p + n : p, pBlock->hasDup);
n += tPutI8(p ? p + n : p, pBlock->nSubBlock);
for (int8_t iSubBlock = 0; iSubBlock < pBlock->nSubBlock; iSubBlock++) {
n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].offset);
n += tPutI32v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].szBlock);
n += tPutI32v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].szKey);
}
if (pBlock->nSubBlock == 1 && !pBlock->hasDup) {
n += tPutI64v(p ? p + n : p, pBlock->smaInfo.offset);
n += tPutI32v(p ? p + n : p, pBlock->smaInfo.size);
int32_t tPutDataBlk(uint8_t *p, void *ph) {
int32_t n = 0;
SDataBlk *pDataBlk = (SDataBlk *)ph;
n += tPutI64v(p ? p + n : p, pDataBlk->minKey.version);
n += tPutI64v(p ? p + n : p, pDataBlk->minKey.ts);
n += tPutI64v(p ? p + n : p, pDataBlk->maxKey.version);
n += tPutI64v(p ? p + n : p, pDataBlk->maxKey.ts);
n += tPutI64v(p ? p + n : p, pDataBlk->minVer);
n += tPutI64v(p ? p + n : p, pDataBlk->maxVer);
n += tPutI32v(p ? p + n : p, pDataBlk->nRow);
n += tPutI8(p ? p + n : p, pDataBlk->hasDup);
n += tPutI8(p ? p + n : p, pDataBlk->nSubBlock);
for (int8_t iSubBlock = 0; iSubBlock < pDataBlk->nSubBlock; iSubBlock++) {
n += tPutI64v(p ? p + n : p, pDataBlk->aSubBlock[iSubBlock].offset);
n += tPutI32v(p ? p + n : p, pDataBlk->aSubBlock[iSubBlock].szBlock);
n += tPutI32v(p ? p + n : p, pDataBlk->aSubBlock[iSubBlock].szKey);
}
if (pDataBlk->nSubBlock == 1 && !pDataBlk->hasDup) {
n += tPutI64v(p ? p + n : p, pDataBlk->smaInfo.offset);
n += tPutI32v(p ? p + n : p, pDataBlk->smaInfo.size);
}
return n;
}
int32_t tGetBlock(uint8_t *p, void *ph) {
int32_t n = 0;
SBlock *pBlock = (SBlock *)ph;
n += tGetI64v(p + n, &pBlock->minKey.version);
n += tGetI64v(p + n, &pBlock->minKey.ts);
n += tGetI64v(p + n, &pBlock->maxKey.version);
n += tGetI64v(p + n, &pBlock->maxKey.ts);
n += tGetI64v(p + n, &pBlock->minVer);
n += tGetI64v(p + n, &pBlock->maxVer);
n += tGetI32v(p + n, &pBlock->nRow);
n += tGetI8(p + n, &pBlock->hasDup);
n += tGetI8(p + n, &pBlock->nSubBlock);
for (int8_t iSubBlock = 0; iSubBlock < pBlock->nSubBlock; iSubBlock++) {
n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].offset);
n += tGetI32v(p + n, &pBlock->aSubBlock[iSubBlock].szBlock);
n += tGetI32v(p + n, &pBlock->aSubBlock[iSubBlock].szKey);
}
if (pBlock->nSubBlock == 1 && !pBlock->hasDup) {
n += tGetI64v(p + n, &pBlock->smaInfo.offset);
n += tGetI32v(p + n, &pBlock->smaInfo.size);
int32_t tGetDataBlk(uint8_t *p, void *ph) {
int32_t n = 0;
SDataBlk *pDataBlk = (SDataBlk *)ph;
n += tGetI64v(p + n, &pDataBlk->minKey.version);
n += tGetI64v(p + n, &pDataBlk->minKey.ts);
n += tGetI64v(p + n, &pDataBlk->maxKey.version);
n += tGetI64v(p + n, &pDataBlk->maxKey.ts);
n += tGetI64v(p + n, &pDataBlk->minVer);
n += tGetI64v(p + n, &pDataBlk->maxVer);
n += tGetI32v(p + n, &pDataBlk->nRow);
n += tGetI8(p + n, &pDataBlk->hasDup);
n += tGetI8(p + n, &pDataBlk->nSubBlock);
for (int8_t iSubBlock = 0; iSubBlock < pDataBlk->nSubBlock; iSubBlock++) {
n += tGetI64v(p + n, &pDataBlk->aSubBlock[iSubBlock].offset);
n += tGetI32v(p + n, &pDataBlk->aSubBlock[iSubBlock].szBlock);
n += tGetI32v(p + n, &pDataBlk->aSubBlock[iSubBlock].szKey);
}
if (pDataBlk->nSubBlock == 1 && !pDataBlk->hasDup) {
n += tGetI64v(p + n, &pDataBlk->smaInfo.offset);
n += tGetI32v(p + n, &pDataBlk->smaInfo.size);
} else {
pBlock->smaInfo.offset = 0;
pBlock->smaInfo.size = 0;
pDataBlk->smaInfo.offset = 0;
pDataBlk->smaInfo.size = 0;
}
return n;
}
int32_t tBlockCmprFn(const void *p1, const void *p2) {
SBlock *pBlock1 = (SBlock *)p1;
SBlock *pBlock2 = (SBlock *)p2;
SDataBlk *pBlock1 = (SDataBlk *)p1;
SDataBlk *pBlock2 = (SDataBlk *)p2;
if (tsdbKeyCmprFn(&pBlock1->maxKey, &pBlock2->minKey) < 0) {
return -1;
......@@ -304,11 +304,11 @@ int32_t tBlockCmprFn(const void *p1, const void *p2) {
return 0;
}
bool tBlockHasSma(SBlock *pBlock) {
if (pBlock->nSubBlock > 1) return false;
if (pBlock->hasDup) return false;
bool tBlockHasSma(SDataBlk *pDataBlk) {
if (pDataBlk->nSubBlock > 1) return false;
if (pDataBlk->hasDup) return false;
return pBlock->smaInfo.size > 0;
return pDataBlk->smaInfo.size > 0;
}
// SSstBlk ======================================================
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册