提交 47d493e8 编写于 作者: H Hongze Cheng

more code

上级 38b4c5c1
...@@ -638,6 +638,7 @@ struct SDataFReader { ...@@ -638,6 +638,7 @@ struct SDataFReader {
uint8_t *aBuf[3]; uint8_t *aBuf[3];
}; };
// NOTE: do NOT change the order of the fields
typedef struct { typedef struct {
int64_t suid; int64_t suid;
int64_t uid; int64_t uid;
......
...@@ -688,13 +688,6 @@ static int32_t tsdbOpenCompactor(STsdbCompactor *pCompactor) { ...@@ -688,13 +688,6 @@ static int32_t tsdbOpenCompactor(STsdbCompactor *pCompactor) {
STsdb *pTsdb = pCompactor->pTsdb; STsdb *pTsdb = pCompactor->pTsdb;
// next compact file
pCompactor->pDFileSet = (SDFileSet *)taosArraySearch(pCompactor->fs.aDFileSet, &(SDFileSet){.fid = pCompactor->fid},
tDFileSetCmprFn, TD_GT);
if (pCompactor->pDFileSet == NULL) goto _exit;
pCompactor->fid = pCompactor->pDFileSet->fid;
// reader // reader
code = tsdbDataFReaderOpen(&pCompactor->pReader, pTsdb, pCompactor->pDFileSet); code = tsdbDataFReaderOpen(&pCompactor->pReader, pTsdb, pCompactor->pDFileSet);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
...@@ -841,111 +834,119 @@ _exit: ...@@ -841,111 +834,119 @@ _exit:
return code; return code;
} }
int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) { static int32_t tsdbCompactFileSet(STsdbCompactor *pCompactor) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
// Check if can do compact (TODO) // open compactor
code = tsdbOpenCompactor(pCompactor);
TSDB_CHECK_CODE(code, lino, _exit);
// Do compact // do compact
STsdbCompactor *pCompactor = &(STsdbCompactor){0}; SRowInfo *pRowInfo;
STSchema *pTSchema;
int64_t nRow = 0;
code = tsdbBeginCompact(pTsdb, pCompactor); code = tsdbCompactGetRow(pCompactor, &pRowInfo, &pTSchema);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
while (true) { if (pRowInfo && (code = tBlockDataInit(&pCompactor->bData, (TABLEID *)pRowInfo, pTSchema, NULL, 0))) {
code = tsdbOpenCompactor(pCompactor);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
}
if (pCompactor->pDFileSet == NULL) break; while (pRowInfo) {
nRow++;
// loop to merge row by row if ((pCompactor->tableId.suid != pRowInfo->suid) || // different super table
SRowInfo *pRowInfo; (pCompactor->tableId.uid != pRowInfo->uid &&
STSchema *pTSchema; (pRowInfo->suid == 0 ||
int64_t nRow = 0; pCompactor->bData.uid && pCompactor->bData.nRow >= pCompactor->minRows)) // different table
) {
code = tsdbCompactWriteBlockData(pCompactor);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbCompactGetRow(pCompactor, &pRowInfo, &pTSchema); code = tsdbCompactWriteDataBlk(pCompactor);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
if (pRowInfo && (code = tBlockDataInit(&pCompactor->bData, &(TABLEID){.suid = pRowInfo->suid, .uid = pRowInfo->uid}, code = tBlockDataInit(&pCompactor->bData, (TABLEID *)pRowInfo, pTSchema, NULL, 0);
pTSchema, NULL, 0))) {
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
while (pRowInfo) { // append row to block data
nRow++; code = tBlockDataAppendRowEx(&pCompactor->bData, &pRowInfo->row, pTSchema, pRowInfo->uid);
TSDB_CHECK_CODE(code, lino, _exit);
if (pCompactor->bData.suid == 0 && pCompactor->bData.uid == 0) { // init the block data if not initialized yet pCompactor->tableId.suid = pRowInfo->suid;
code = tBlockDataInit(&pCompactor->bData, &(TABLEID){.suid = pRowInfo->suid, .uid = pRowInfo->uid}, pTSchema, pCompactor->tableId.uid = pRowInfo->uid;
NULL, 0);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
if ((pCompactor->tableId.suid != pRowInfo->suid) || // different super table
(pCompactor->tableId.uid != pRowInfo->uid &&
(pRowInfo->suid == 0 ||
pCompactor->bData.uid && pCompactor->bData.nRow >= pCompactor->minRows)) // different table
) {
code = tsdbCompactWriteBlockData(pCompactor);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbCompactWriteDataBlk(pCompactor);
TSDB_CHECK_CODE(code, lino, _exit);
code = tBlockDataInit(&pCompactor->bData, &(TABLEID){.suid = pRowInfo->suid, .uid = pRowInfo->uid}, pTSchema,
NULL, 0);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
// append row to block data // check if block data is full
code = tBlockDataAppendRowEx(&pCompactor->bData, &pRowInfo->row, pTSchema, pRowInfo->uid); if (pCompactor->bData.nRow >= pCompactor->maxRows) {
code = tsdbCompactWriteBlockData(pCompactor);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
}
pCompactor->tableId.suid = pRowInfo->suid; // iterate to next row
pCompactor->tableId.uid = pRowInfo->uid; code = tsdbCompactNextRow(pCompactor);
TSDB_CHECK_CODE(code, lino, _exit);
// check if block data is full code = tsdbCompactGetRow(pCompactor, &pRowInfo, &pTSchema);
if (pCompactor->bData.nRow >= pCompactor->maxRows) { TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbCompactWriteBlockData(pCompactor); }
TSDB_CHECK_CODE(code, lino, _exit);
}
// iterate to next row // handle remain data
code = tsdbCompactNextRow(pCompactor); code = tsdbCompactWriteBlockData(pCompactor);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbCompactGetRow(pCompactor, &pRowInfo, &pTSchema); code = tsdbCompactWriteDataBlk(pCompactor);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbCompactWriteBlockData(pCompactor); code = tsdbWriteBlockIdx(pCompactor->pWriter, pCompactor->aBlockIdx);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbCompactWriteDataBlk(pCompactor); code = tsdbWriteSttBlk(pCompactor->pWriter, pCompactor->aSttBlk);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbWriteBlockIdx(pCompactor->pWriter, pCompactor->aBlockIdx); code = tsdbUpdateDFileSetHeader(pCompactor->pWriter);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbWriteSttBlk(pCompactor->pWriter, pCompactor->aSttBlk); code = tsdbFSUpsertFSet(&pCompactor->fs, &pCompactor->pWriter->wSet);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbUpdateDFileSetHeader(pCompactor->pWriter); code = tsdbDataFWriterClose(&pCompactor->pWriter, 1);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbFSUpsertFSet(&pCompactor->fs, &pCompactor->pWriter->wSet); _exit:
TSDB_CHECK_CODE(code, lino, _exit); // close compactor
tsdbCloseCompactor(pCompactor);
return code;
}
code = tsdbDataFWriterClose(&pCompactor->pWriter, 1); int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) {
TSDB_CHECK_CODE(code, lino, _exit); int32_t code = 0;
int32_t lino = 0;
STsdbCompactor *pCompactor = &(STsdbCompactor){0};
tsdbCloseCompactor(pCompactor); // begin compact
code = tsdbBeginCompact(pTsdb, pCompactor);
TSDB_CHECK_CODE(code, lino, _exit);
// do compact each file set
while (true) {
pCompactor->pDFileSet = (SDFileSet *)taosArraySearch(pCompactor->fs.aDFileSet, &(SDFileSet){.fid = pCompactor->fid},
tDFileSetCmprFn, TD_GT);
if (pCompactor->pDFileSet == NULL) break;
pCompactor->fid = pCompactor->pDFileSet->fid;
code = tsdbCompactFileSet(pCompactor);
TSDB_CHECK_CODE(code, lino, _exit);
} }
code = tsdbFSUpsertDelFile(&pCompactor->fs, NULL); code = tsdbFSUpsertDelFile(&pCompactor->fs, NULL);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
_exit: _exit:
// commit/abort compact
if (code) { if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
tsdbAbortCompact(pCompactor); tsdbAbortCompact(pCompactor);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册