提交 ebd60a96 编写于 作者: H Hongze Cheng

refact: tsdb last file optimize 1

上级 a73ea6f9
......@@ -113,6 +113,9 @@ int32_t tPutBlock(uint8_t *p, void *ph);
int32_t tGetBlock(uint8_t *p, void *ph);
int32_t tBlockCmprFn(const void *p1, const void *p2);
bool tBlockHasSma(SBlock *pBlock);
// SBlockL
int32_t tPutBlockL(uint8_t *p, void *ph);
int32_t tGetBlockL(uint8_t *p, void *ph);
// SBlockIdx
int32_t tPutBlockIdx(uint8_t *p, void *ph);
int32_t tGetBlockIdx(uint8_t *p, void *ph);
......@@ -225,6 +228,7 @@ int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync);
int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter);
int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx, uint8_t **ppBuf);
int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf, SBlockIdx *pBlockIdx);
int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL, uint8_t **ppBuf);
int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2,
SBlockIdx *pBlockIdx, SBlock *pBlock, int8_t cmprAlg);
......@@ -233,6 +237,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo);
int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet);
int32_t tsdbDataFReaderClose(SDataFReader **ppReader);
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx, uint8_t **ppBuf);
int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL, uint8_t **ppBuf);
int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData, uint8_t **ppBuf);
int32_t tsdbReadColData(SDataFReader *pReader, SBlockIdx *pBlockIdx, SBlock *pBlock, int16_t *aColId, int32_t nCol,
SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2);
......@@ -416,6 +421,7 @@ struct SBlock {
};
struct SBlockL {
int64_t suid;
struct {
int64_t uid;
int64_t version;
......@@ -452,6 +458,7 @@ struct SColData {
struct SBlockData {
int32_t nRow;
int64_t *aUid;
int64_t *aVersion;
TSKEY *aTSKEY;
SArray *aIdx; // SArray<int32_t>
......@@ -513,6 +520,7 @@ struct SHeadFile {
int64_t commitID;
int64_t size;
int64_t offset;
int64_t loffset;
};
struct SDataFile {
......
......@@ -39,14 +39,18 @@ typedef struct {
struct {
SDataFReader *pReader;
SArray *aBlockIdx; // SArray<SBlockIdx>
SArray *aBlockL; // SArray<SBlockL>
SMapData mBlock; // SMapData<SBlock>, read from reader
SBlockData bData;
SBlockData bDatal;
} dReader;
struct {
SDataFWriter *pWriter;
SArray *aBlockIdx; // SArray<SBlockIdx>
SArray *aBlockL; // SArray<SBlockL>
SMapData mBlock; // SMapData<SBlock>
SBlockData bData;
SBlockData bDatal;
} dWriter;
SSkmInfo skmTable;
SSkmInfo skmRow;
......@@ -279,10 +283,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
// memory
pCommitter->nextKey = TSKEY_MAX;
// old
taosArrayClear(pCommitter->dReader.aBlockIdx);
tMapDataReset(&pCommitter->dReader.mBlock);
tBlockDataReset(&pCommitter->dReader.bData);
// Reader
pRSet = (SDFileSet *)taosArraySearch(pCommitter->fs.aDFileSet, &(SDFileSet){.fid = pCommitter->commitFid},
tDFileSetCmprFn, TD_EQ);
if (pRSet) {
......@@ -291,22 +292,29 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
code = tsdbReadBlockIdx(pCommitter->dReader.pReader, pCommitter->dReader.aBlockIdx, NULL);
if (code) goto _err;
code = tsdbReadBlockL(pCommitter->dReader.pReader, pCommitter->dReader.aBlockL, NULL);
if (code) goto _err;
} else {
pCommitter->dReader.pReader = NULL;
taosArrayClear(pCommitter->dReader.aBlockIdx);
taosArrayClear(pCommitter->dReader.aBlockL);
}
tMapDataReset(&pCommitter->dReader.mBlock);
tBlockDataReset(&pCommitter->dReader.bData);
tBlockDataReset(&pCommitter->dReader.bDatal);
// new
// Writer
SHeadFile fHead;
SDataFile fData;
SLastFile fLast;
SSmaFile fSma;
SDFileSet wSet = {.pHeadF = &fHead, .pDataF = &fData, .pLastF = &fLast, .pSmaF = &fSma};
taosArrayClear(pCommitter->dWriter.aBlockIdx);
tMapDataReset(&pCommitter->dWriter.mBlock);
tBlockDataReset(&pCommitter->dWriter.bData);
if (pRSet) {
wSet.diskId = pRSet->diskId;
wSet.fid = pCommitter->commitFid;
fHead = (SHeadFile){.commitID = pCommitter->commitID, .offset = 0, .size = 0};
fHead = (SHeadFile){.commitID = pCommitter->commitID, .offset = 0, .size = 0, .loffset = 0};
fData = *pRSet->pDataF;
fLast = (SLastFile){.commitID = pCommitter->commitID, .size = 0};
fSma = *pRSet->pSmaF;
......@@ -319,7 +327,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
wSet.diskId = did;
wSet.fid = pCommitter->commitFid;
fHead = (SHeadFile){.commitID = pCommitter->commitID, .offset = 0, .size = 0};
fHead = (SHeadFile){.commitID = pCommitter->commitID, .offset = 0, .size = 0, .loffset = 0};
fData = (SDataFile){.commitID = pCommitter->commitID, .size = 0};
fLast = (SLastFile){.commitID = pCommitter->commitID, .size = 0};
fSma = (SSmaFile){.commitID = pCommitter->commitID, .size = 0};
......@@ -327,6 +335,12 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
code = tsdbDataFWriterOpen(&pCommitter->dWriter.pWriter, pTsdb, &wSet);
if (code) goto _err;
taosArrayClear(pCommitter->dWriter.aBlockIdx);
taosArrayClear(pCommitter->dWriter.aBlockL);
tMapDataReset(&pCommitter->dWriter.mBlock);
tBlockDataReset(&pCommitter->dWriter.bData);
tBlockDataReset(&pCommitter->dWriter.bDatal);
_exit:
return code;
......@@ -861,7 +875,11 @@ _err:
static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) {
int32_t code = 0;
// write blockIdx
// write aBlockL
code = tsdbWriteBlockL(pCommitter->dWriter.pWriter, pCommitter->dWriter.aBlockL, NULL);
if (code) goto _err;
// write aBlockIdx
code = tsdbWriteBlockIdx(pCommitter->dWriter.pWriter, pCommitter->dWriter.aBlockIdx, NULL);
if (code) goto _err;
......@@ -1001,14 +1019,15 @@ _err:
static int32_t tsdbCommitDataStart(SCommitter *pCommitter) {
int32_t code = 0;
// Reader
pCommitter->dReader.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
if (pCommitter->dReader.aBlockIdx == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
pCommitter->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
if (pCommitter->dWriter.aBlockIdx == NULL) {
pCommitter->dReader.aBlockL = taosArrayInit(0, sizeof(SBlockL));
if (pCommitter->dReader.aBlockL == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
......@@ -1016,20 +1035,46 @@ static int32_t tsdbCommitDataStart(SCommitter *pCommitter) {
code = tBlockDataInit(&pCommitter->dReader.bData);
if (code) goto _exit;
code = tBlockDataInit(&pCommitter->dReader.bDatal);
if (code) goto _exit;
// Writer
pCommitter->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
if (pCommitter->dWriter.aBlockIdx == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
pCommitter->dWriter.aBlockL = taosArrayInit(0, sizeof(SBlockL));
if (pCommitter->dWriter.aBlockL == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
code = tBlockDataInit(&pCommitter->dWriter.bData);
if (code) goto _exit;
code = tBlockDataInit(&pCommitter->dWriter.bDatal);
if (code) goto _exit;
_exit:
return code;
}
static void tsdbCommitDataEnd(SCommitter *pCommitter) {
// Reader
taosArrayDestroy(pCommitter->dReader.aBlockIdx);
taosArrayDestroy(pCommitter->dReader.aBlockL);
tMapDataClear(&pCommitter->dReader.mBlock);
tBlockDataClear(&pCommitter->dReader.bData, 1);
tBlockDataClear(&pCommitter->dReader.bDatal, 1);
// Writer
taosArrayDestroy(pCommitter->dWriter.aBlockIdx);
taosArrayDestroy(pCommitter->dWriter.aBlockL);
tMapDataClear(&pCommitter->dWriter.mBlock);
tBlockDataClear(&pCommitter->dWriter.bData, 1);
tBlockDataClear(&pCommitter->dWriter.bDatal, 1);
tTSchemaDestroy(pCommitter->skmTable.pTSchema);
tTSchemaDestroy(pCommitter->skmRow.pTSchema);
}
......
......@@ -21,6 +21,7 @@ int32_t tPutHeadFile(uint8_t *p, SHeadFile *pHeadFile) {
n += tPutI64v(p ? p + n : p, pHeadFile->commitID);
n += tPutI64v(p ? p + n : p, pHeadFile->size);
n += tPutI64v(p ? p + n : p, pHeadFile->offset);
n += tPutI64v(p ? p + n : p, pHeadFile->loffset);
return n;
}
......@@ -31,6 +32,7 @@ static int32_t tGetHeadFile(uint8_t *p, SHeadFile *pHeadFile) {
n += tGetI64v(p + n, &pHeadFile->commitID);
n += tGetI64v(p + n, &pHeadFile->size);
n += tGetI64v(p + n, &pHeadFile->offset);
n += tGetI64v(p + n, &pHeadFile->loffset);
return n;
}
......
......@@ -597,6 +597,68 @@ _err:
return code;
}
int32_t tsdbReadBlockL(SDataFReader *pReader, SArray *aBlockL, uint8_t **ppBuf) {
int32_t code = 0;
int64_t offset = pReader->pSet->pHeadF->loffset;
int64_t size = pReader->pSet->pHeadF->offset - offset;
int64_t n;
uint32_t delimiter;
uint8_t *pBuf = NULL;
SBlockL blockl;
if (!ppBuf) ppBuf = &pBuf;
// alloc
code = tRealloc(ppBuf, size);
if (code) goto _err;
// seek
if (taosLSeekFile(pReader->pHeadFD, offset, SEEK_SET) < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
// read
n = taosReadFile(pReader->pHeadFD, *ppBuf, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
} else if (n < size) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
// check
if (!taosCheckChecksumWhole(*ppBuf, size)) {
code = TSDB_CODE_FILE_CORRUPTED;
goto _err;
}
// decode
n = 0;
n = tGetU32(*ppBuf + n, &delimiter);
ASSERT(delimiter == TSDB_FILE_DLMT);
taosArrayClear(aBlockL);
while (n < size - sizeof(TSCKSUM)) {
n += tGetBlockL(*ppBuf + n, &blockl);
if (taosArrayPush(aBlockL, &blockl) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
}
ASSERT(n + sizeof(TSCKSUM) == size);
tFree(pBuf);
return code;
_err:
tsdbError("vgId:%d read blockl failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
return code;
}
int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBlock, uint8_t **ppBuf) {
int32_t code = 0;
int64_t offset = pBlockIdx->offset;
......@@ -1593,6 +1655,53 @@ _err:
return code;
}
int32_t tsdbWriteBlockL(SDataFWriter *pWriter, SArray *aBlockL, uint8_t **ppBuf) {
int32_t code = 0;
SHeadFile *pHeadFile = &pWriter->fHead;
uint8_t *pBuf = NULL;
int64_t size;
int64_t n;
// size
size = sizeof(uint32_t);
for (int32_t iBlockL = 0; iBlockL < taosArrayGetSize(aBlockL); iBlockL++) {
size += tPutBlockL(NULL, taosArrayGet(aBlockL, iBlockL));
}
size += sizeof(TSCKSUM);
// alloc
if (!ppBuf) ppBuf = &pBuf;
code = tRealloc(ppBuf, size);
if (code) goto _err;
// encode
n = 0;
n += tPutU32(*ppBuf + n, TSDB_FILE_DLMT);
for (int32_t iBlockL = 0; iBlockL < taosArrayGetSize(aBlockL); iBlockL++) {
n += tPutBlockL(*ppBuf + n, taosArrayGet(aBlockL, iBlockL));
}
taosCalcChecksumAppend(0, *ppBuf, size);
ASSERT(n + sizeof(TSCKSUM) == size);
// write
n = taosWriteFile(pWriter->pHeadFD, *ppBuf, size);
if (n < 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
// update
pHeadFile->loffset = pHeadFile->size;
pHeadFile->size += size;
return code;
_err:
tsdbError("vgId:%d tsdb write blockl failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code));
return code;
}
static void tsdbUpdateBlockInfo(SBlockData *pBlockData, SBlock *pBlock) {
for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
TSDBKEY key = {.ts = pBlockData->aTSKEY[iRow], .version = pBlockData->aVersion[iRow]};
......
......@@ -911,14 +911,14 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3
if (pSet) {
wSet.diskId = pSet->diskId;
wSet.fid = fid;
fHead = (SHeadFile){.commitID = pWriter->commitID, .offset = 0, .size = 0};
fHead = (SHeadFile){.commitID = pWriter->commitID, .offset = 0, .size = 0, .loffset = 0};
fData = *pSet->pDataF;
fLast = (SLastFile){.commitID = pWriter->commitID, .size = 0};
fSma = *pSet->pSmaF;
} else {
wSet.diskId = (SDiskID){.level = 0, .id = 0};
wSet.fid = fid;
fHead = (SHeadFile){.commitID = pWriter->commitID, .offset = 0, .size = 0};
fHead = (SHeadFile){.commitID = pWriter->commitID, .offset = 0, .size = 0, .loffset = 0};
fData = (SDataFile){.commitID = pWriter->commitID, .size = 0};
fLast = (SLastFile){.commitID = pWriter->commitID, .size = 0};
fSma = (SSmaFile){.commitID = pWriter->commitID, .size = 0};
......
......@@ -295,6 +295,56 @@ bool tBlockHasSma(SBlock *pBlock) {
return pBlock->aSubBlock[0].nSma > 0;
}
// SBlockL ======================================================
int32_t tPutBlockL(uint8_t *p, void *ph) {
int32_t n = 0;
SBlockL *pBlockL = (SBlockL *)ph;
n += tPutI64(p ? p + n : p, pBlockL->suid);
n += tPutI64(p ? p + n : p, pBlockL->minKey.uid);
n += tPutI64v(p ? p + n : p, pBlockL->minKey.version);
n += tPutI64(p ? p + n : p, pBlockL->minKey.ts);
n += tPutI64(p ? p + n : p, pBlockL->maxKey.uid);
n += tPutI64v(p ? p + n : p, pBlockL->maxKey.version);
n += tPutI64(p ? p + n : p, pBlockL->maxKey.ts);
n += tPutI64v(p ? p + n : p, pBlockL->minVer);
n += tPutI64v(p ? p + n : p, pBlockL->maxVer);
n += tPutI32v(p ? p + n : p, pBlockL->nRow);
n += tPutI8(p ? p + n : p, pBlockL->cmprAlg);
n += tPutI64v(p ? p + n : p, pBlockL->offset);
n += tPutI32v(p ? p + n : p, pBlockL->szBlock);
n += tPutI32v(p ? p + n : p, pBlockL->szBlockCol);
n += tPutI32v(p ? p + n : p, pBlockL->szUid);
n += tPutI32v(p ? p + n : p, pBlockL->szVer);
n += tPutI32v(p ? p + n : p, pBlockL->szTSKEY);
return n;
}
int32_t tGetBlockL(uint8_t *p, void *ph) {
int32_t n = 0;
SBlockL *pBlockL = (SBlockL *)ph;
n += tGetI64(p + n, &pBlockL->suid);
n += tGetI64(p + n, &pBlockL->minKey.uid);
n += tGetI64v(p + n, &pBlockL->minKey.version);
n += tGetI64(p + n, &pBlockL->minKey.ts);
n += tGetI64(p + n, &pBlockL->maxKey.uid);
n += tGetI64v(p + n, &pBlockL->maxKey.version);
n += tGetI64(p + n, &pBlockL->maxKey.ts);
n += tGetI64v(p + n, &pBlockL->minVer);
n += tGetI64v(p + n, &pBlockL->maxVer);
n += tGetI32v(p + n, &pBlockL->nRow);
n += tGetI8(p + n, &pBlockL->cmprAlg);
n += tGetI64v(p + n, &pBlockL->offset);
n += tGetI32v(p + n, &pBlockL->szBlock);
n += tGetI32v(p + n, &pBlockL->szBlockCol);
n += tGetI32v(p + n, &pBlockL->szUid);
n += tGetI32v(p + n, &pBlockL->szVer);
n += tGetI32v(p + n, &pBlockL->szTSKEY);
return n;
}
// SBlockCol ======================================================
int32_t tPutBlockCol(uint8_t *p, void *ph) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册