提交 5e2a4986 编写于 作者: H Hongze Cheng

refact: prepare last refact

上级 b62dae95
......@@ -43,6 +43,7 @@ typedef struct STbDataIter STbDataIter;
typedef struct SMapData SMapData;
typedef struct SBlockIdx SBlockIdx;
typedef struct SBlock SBlock;
typedef struct SBlockL SBlockL;
typedef struct SColData SColData;
typedef struct SBlockDataHdr SBlockDataHdr;
typedef struct SBlockData SBlockData;
......@@ -414,6 +415,29 @@ struct SBlock {
SSubBlock aSubBlock[TSDB_MAX_SUBBLOCKS];
};
struct SBlockL {
struct {
int64_t uid;
int64_t version;
TSKEY ts;
} minKey;
struct {
int64_t uid;
int64_t version;
TSKEY ts;
} maxKey;
int64_t minVer;
int64_t maxVer;
int32_t nRow;
int8_t cmprAlg;
int64_t offset;
int32_t szBlock;
int32_t szBlockCol;
int32_t szUid;
int32_t szVer;
int32_t szTSKEY;
};
struct SColData {
int16_t cid;
int8_t type;
......
......@@ -36,14 +36,18 @@ typedef struct {
TSKEY minKey;
TSKEY maxKey;
// commit file data
struct {
SDataFReader *pReader;
SArray *aBlockIdx; // SArray<SBlockIdx>
SMapData oBlockMap; // SMapData<SBlock>, read from reader
SBlockData oBlockData;
SMapData mBlock; // SMapData<SBlock>, read from reader
SBlockData bData;
} dReader;
struct {
SDataFWriter *pWriter;
SArray *aBlockIdxN; // SArray<SBlockIdx>
SMapData nBlockMap; // SMapData<SBlock>
SBlockData nBlockData;
SArray *aBlockIdx; // SArray<SBlockIdx>
SMapData mBlock; // SMapData<SBlock>
SBlockData bData;
} dWriter;
SSkmInfo skmTable;
SSkmInfo skmRow;
/* commit del */
......@@ -276,16 +280,16 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
pCommitter->nextKey = TSKEY_MAX;
// old
taosArrayClear(pCommitter->aBlockIdx);
tMapDataReset(&pCommitter->oBlockMap);
tBlockDataReset(&pCommitter->oBlockData);
taosArrayClear(pCommitter->dReader.aBlockIdx);
tMapDataReset(&pCommitter->dReader.mBlock);
tBlockDataReset(&pCommitter->dReader.bData);
pRSet = (SDFileSet *)taosArraySearch(pCommitter->fs.aDFileSet, &(SDFileSet){.fid = pCommitter->commitFid},
tDFileSetCmprFn, TD_EQ);
if (pRSet) {
code = tsdbDataFReaderOpen(&pCommitter->pReader, pTsdb, pRSet);
code = tsdbDataFReaderOpen(&pCommitter->dReader.pReader, pTsdb, pRSet);
if (code) goto _err;
code = tsdbReadBlockIdx(pCommitter->pReader, pCommitter->aBlockIdx, NULL);
code = tsdbReadBlockIdx(pCommitter->dReader.pReader, pCommitter->dReader.aBlockIdx, NULL);
if (code) goto _err;
}
......@@ -296,9 +300,9 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
SSmaFile fSma;
SDFileSet wSet = {.pHeadF = &fHead, .pDataF = &fData, .pLastF = &fLast, .pSmaF = &fSma};
taosArrayClear(pCommitter->aBlockIdxN);
tMapDataReset(&pCommitter->nBlockMap);
tBlockDataReset(&pCommitter->nBlockData);
taosArrayClear(pCommitter->dWriter.aBlockIdx);
tMapDataReset(&pCommitter->dWriter.mBlock);
tBlockDataReset(&pCommitter->dWriter.bData);
if (pRSet) {
wSet.diskId = pRSet->diskId;
wSet.fid = pCommitter->commitFid;
......@@ -320,7 +324,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
fLast = (SLastFile){.commitID = pCommitter->commitID, .size = 0};
fSma = (SSmaFile){.commitID = pCommitter->commitID, .size = 0};
}
code = tsdbDataFWriterOpen(&pCommitter->pWriter, pTsdb, &wSet);
code = tsdbDataFWriterOpen(&pCommitter->dWriter.pWriter, pTsdb, &wSet);
if (code) goto _err;
_exit:
......@@ -391,10 +395,11 @@ static int32_t tsdbCommitBlockData(SCommitter *pCommitter, SBlockData *pBlockDat
}
}
code = tsdbWriteBlockData(pCommitter->pWriter, pBlockData, NULL, NULL, pBlockIdx, pBlock, pCommitter->cmprAlg);
code =
tsdbWriteBlockData(pCommitter->dWriter.pWriter, pBlockData, NULL, NULL, pBlockIdx, pBlock, pCommitter->cmprAlg);
if (code) goto _err;
code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock);
code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pBlock, tPutBlock);
if (code) goto _err;
return code;
......@@ -407,8 +412,8 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB
int8_t toDataOnly) {
int32_t code = 0;
SBlockIdx *pBlockIdx = &(SBlockIdx){.suid = pIter->pTbData->suid, .uid = pIter->pTbData->uid};
SBlockData *pBlockDataMerge = &pCommitter->oBlockData;
SBlockData *pBlockData = &pCommitter->nBlockData;
SBlockData *pBlockDataMerge = &pCommitter->dReader.bData;
SBlockData *pBlockData = &pCommitter->dWriter.bData;
SBlock block;
SBlock *pBlock = &block;
TSDBROW *pRow1;
......@@ -416,7 +421,7 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB
TSDBROW *pRow2 = &row2;
// read SBlockData
code = tsdbReadBlockData(pCommitter->pReader, pBlockIdx, pBlockMerge, pBlockDataMerge, NULL, NULL);
code = tsdbReadBlockData(pCommitter->dReader.pReader, pBlockIdx, pBlockMerge, pBlockDataMerge, NULL, NULL);
if (code) goto _err;
code = tBlockDataSetSchema(pBlockData, pCommitter->skmTable.pTSchema);
......@@ -513,7 +518,7 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter
TSDBROW *pRow;
SBlock block;
SBlock *pBlock = &block;
SBlockData *pBlockData = &pCommitter->nBlockData;
SBlockData *pBlockData = &pCommitter->dWriter.bData;
int64_t suid = pIter->pTbData->suid;
int64_t uid = pIter->pTbData->uid;
......@@ -575,14 +580,14 @@ static int32_t tsdbCommitTableDiskData(SCommitter *pCommitter, SBlock *pBlock, S
SBlock block;
if (pBlock->last) {
code = tsdbReadBlockData(pCommitter->pReader, pBlockIdx, pBlock, &pCommitter->oBlockData, NULL, NULL);
code = tsdbReadBlockData(pCommitter->dReader.pReader, pBlockIdx, pBlock, &pCommitter->dReader.bData, NULL, NULL);
if (code) goto _err;
tBlockReset(&block);
code = tsdbCommitBlockData(pCommitter, &pCommitter->oBlockData, &block, pBlockIdx, 0);
code = tsdbCommitBlockData(pCommitter, &pCommitter->dReader.bData, &block, pBlockIdx, 0);
if (code) goto _err;
} else {
code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock);
code = tMapDataPutItem(&pCommitter->dWriter.mBlock, pBlock, tPutBlock);
if (code) goto _err;
}
......@@ -598,10 +603,10 @@ static int32_t tsdbCommitTableDataEnd(SCommitter *pCommitter, int64_t suid, int6
SBlockIdx blockIdx = {.suid = suid, .uid = uid};
SBlockIdx *pBlockIdx = &blockIdx;
code = tsdbWriteBlock(pCommitter->pWriter, &pCommitter->nBlockMap, NULL, pBlockIdx);
code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, NULL, pBlockIdx);
if (code) goto _err;
if (taosArrayPush(pCommitter->aBlockIdxN, pBlockIdx) == NULL) {
if (taosArrayPush(pCommitter->dWriter.aBlockIdx, pBlockIdx) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
......@@ -643,7 +648,7 @@ static int32_t tsdbGetOvlpNRow(STbDataIter *pIter, SBlock *pBlock) {
static int32_t tsdbMergeAsSubBlock(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlock) {
int32_t code = 0;
SBlockData *pBlockData = &pCommitter->nBlockData;
SBlockData *pBlockData = &pCommitter->dWriter.bData;
SBlockIdx *pBlockIdx = &(SBlockIdx){.suid = pIter->pTbData->suid, .uid = pIter->pTbData->uid};
SBlock block;
TSDBROW *pRow;
......@@ -711,10 +716,10 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
}
if (pBlockIdx) {
code = tsdbReadBlock(pCommitter->pReader, pBlockIdx, &pCommitter->oBlockMap, NULL);
code = tsdbReadBlock(pCommitter->dReader.pReader, pBlockIdx, &pCommitter->dReader.mBlock, NULL);
if (code) goto _err;
nBlock = pCommitter->oBlockMap.nItem;
nBlock = pCommitter->dReader.mBlock.nItem;
ASSERT(nBlock > 0);
suid = pBlockIdx->suid;
......@@ -726,13 +731,13 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
if (pRow == NULL && nBlock == 0) goto _exit;
// start ===========
tMapDataReset(&pCommitter->nBlockMap);
tMapDataReset(&pCommitter->dWriter.mBlock);
SBlock block;
SBlock *pBlock = &block;
iBlock = 0;
if (iBlock < nBlock) {
tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock);
} else {
pBlock = NULL;
}
......@@ -756,7 +761,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
iBlock++;
if (iBlock < nBlock) {
tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock);
} else {
pBlock = NULL;
}
......@@ -771,7 +776,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
iBlock++;
if (iBlock < nBlock) {
tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock);
} else {
pBlock = NULL;
}
......@@ -798,7 +803,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
SBlock nextBlock = {0};
tBlockReset(&nextBlock);
tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock + 1, &nextBlock, tGetBlock);
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock + 1, &nextBlock, tGetBlock);
toKey = nextBlock.minKey;
}
......@@ -810,7 +815,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
iBlock++;
if (iBlock < nBlock) {
tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock);
} else {
pBlock = NULL;
}
......@@ -822,7 +827,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
iBlock++;
if (iBlock < nBlock) {
tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock);
tMapDataGetItemByIdx(&pCommitter->dReader.mBlock, iBlock, pBlock, tGetBlock);
} else {
pBlock = NULL;
}
......@@ -857,23 +862,23 @@ static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
int32_t code = 0;
// write blockIdx
code = tsdbWriteBlockIdx(pCommitter->pWriter, pCommitter->aBlockIdxN, NULL);
code = tsdbWriteBlockIdx(pCommitter->dWriter.pWriter, pCommitter->dWriter.aBlockIdx, NULL);
if (code) goto _err;
// update file header
code = tsdbUpdateDFileSetHeader(pCommitter->pWriter);
code = tsdbUpdateDFileSetHeader(pCommitter->dWriter.pWriter);
if (code) goto _err;
// upsert SDFileSet
code = tsdbFSUpsertFSet(&pCommitter->fs, &pCommitter->pWriter->wSet);
code = tsdbFSUpsertFSet(&pCommitter->fs, &pCommitter->dWriter.pWriter->wSet);
if (code) goto _err;
// close and sync
code = tsdbDataFWriterClose(&pCommitter->pWriter, 1);
code = tsdbDataFWriterClose(&pCommitter->dWriter.pWriter, 1);
if (code) goto _err;
if (pCommitter->pReader) {
code = tsdbDataFReaderClose(&pCommitter->pReader);
if (pCommitter->dReader.pReader) {
code = tsdbDataFReaderClose(&pCommitter->dReader.pReader);
if (code) goto _err;
}
......@@ -898,14 +903,14 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
int32_t iTbData = 0;
int32_t nTbData = taosArrayGetSize(pMemTable->aTbData);
int32_t iBlockIdx = 0;
int32_t nBlockIdx = taosArrayGetSize(pCommitter->aBlockIdx);
int32_t nBlockIdx = taosArrayGetSize(pCommitter->dReader.aBlockIdx);
STbData *pTbData;
SBlockIdx *pBlockIdx;
ASSERT(nTbData > 0);
pTbData = (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData);
pBlockIdx = (iBlockIdx < nBlockIdx) ? (SBlockIdx *)taosArrayGet(pCommitter->aBlockIdx, iBlockIdx) : NULL;
pBlockIdx = (iBlockIdx < nBlockIdx) ? (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, iBlockIdx) : NULL;
while (pTbData || pBlockIdx) {
if (pTbData && pBlockIdx) {
int32_t c = tTABLEIDCmprFn(pTbData, pBlockIdx);
......@@ -936,7 +941,7 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
if (code) goto _err;
iBlockIdx++;
pBlockIdx = (iBlockIdx < nBlockIdx) ? (SBlockIdx *)taosArrayGet(pCommitter->aBlockIdx, iBlockIdx) : NULL;
pBlockIdx = (iBlockIdx < nBlockIdx) ? (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, iBlockIdx) : NULL;
continue;
_commit_table_mem_and_disk:
......@@ -944,7 +949,7 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
if (code) goto _err;
iBlockIdx++;
pBlockIdx = (iBlockIdx < nBlockIdx) ? (SBlockIdx *)taosArrayGet(pCommitter->aBlockIdx, iBlockIdx) : NULL;
pBlockIdx = (iBlockIdx < nBlockIdx) ? (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, iBlockIdx) : NULL;
iTbData++;
pTbData = (iTbData < nTbData) ? (STbData *)taosArrayGetP(pMemTable->aTbData, iTbData) : NULL;
continue;
......@@ -958,8 +963,8 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
_err:
tsdbError("vgId:%d commit file data failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
tsdbDataFReaderClose(&pCommitter->pReader);
tsdbDataFWriterClose(&pCommitter->pWriter, 0);
tsdbDataFReaderClose(&pCommitter->dReader.pReader);
tsdbDataFWriterClose(&pCommitter->dWriter.pWriter, 0);
return code;
}
......@@ -996,22 +1001,22 @@ _err:
static int32_t tsdbCommitDataStart(SCommitter *pCommitter) {
int32_t code = 0;
pCommitter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
if (pCommitter->aBlockIdx == NULL) {
pCommitter->dReader.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
if (pCommitter->dReader.aBlockIdx == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
pCommitter->aBlockIdxN = taosArrayInit(0, sizeof(SBlockIdx));
if (pCommitter->aBlockIdxN == NULL) {
pCommitter->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
if (pCommitter->dWriter.aBlockIdx == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
code = tBlockDataInit(&pCommitter->oBlockData);
code = tBlockDataInit(&pCommitter->dReader.bData);
if (code) goto _exit;
code = tBlockDataInit(&pCommitter->nBlockData);
code = tBlockDataInit(&pCommitter->dWriter.bData);
if (code) goto _exit;
_exit:
......@@ -1019,12 +1024,12 @@ _exit:
}
static void tsdbCommitDataEnd(SCommitter *pCommitter) {
taosArrayDestroy(pCommitter->aBlockIdx);
tMapDataClear(&pCommitter->oBlockMap);
tBlockDataClear(&pCommitter->oBlockData, 1);
taosArrayDestroy(pCommitter->aBlockIdxN);
tMapDataClear(&pCommitter->nBlockMap);
tBlockDataClear(&pCommitter->nBlockData, 1);
taosArrayDestroy(pCommitter->dReader.aBlockIdx);
tMapDataClear(&pCommitter->dReader.mBlock);
tBlockDataClear(&pCommitter->dReader.bData, 1);
taosArrayDestroy(pCommitter->dWriter.aBlockIdx);
tMapDataClear(&pCommitter->dWriter.mBlock);
tBlockDataClear(&pCommitter->dWriter.bData, 1);
tTSchemaDestroy(pCommitter->skmTable.pTSchema);
tTSchemaDestroy(pCommitter->skmRow.pTSchema);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册