提交 c7267c7c 编写于 作者: H Hongze Cheng

more code

上级 be85aac8
......@@ -273,20 +273,6 @@ _exit:
// COMPACT =========================
static void tsdbEndCompact(STsdbCompactor *pCompactor) {
taosArrayDestroy(pCompactor->aSkyLine);
taosArrayDestroy(pCompactor->aDelData);
taosArrayDestroy(pCompactor->aDelIdx);
tsdbDelFReaderClose(&pCompactor->pDelFReader);
tsdbFSDestroy(&pCompactor->fs);
tBlockDataDestroy(&pCompactor->bData);
tDestroyTSchema(pCompactor->tbSkm.pTSchema);
pCompactor->tbSkm.pTSchema = NULL;
taosArrayDestroy(pCompactor->aBlockIdx);
tMapDataClear(&pCompactor->mDataBlk);
taosArrayDestroy(pCompactor->aSttBlk);
}
static int32_t tsdbCommitCompact(STsdbCompactor *pCompactor) {
int32_t code = 0;
int32_t lino = 0;
......@@ -423,43 +409,6 @@ static int32_t tDelIdxCmprFn(const SDelIdx *pDelIdx1, const SDelIdx *pDelIdx2) {
return 0;
}
static bool tsdbCompactRowIsDeleted(STsdbCompactor *pCompactor, TSDBROW *pRow) {
TSDBKEY tKey = TSDBROW_KEY(pRow);
while (tKey.ts > pCompactor->sKey.ts) {
pCompactor->sKey.version = pCompactor->aTSDBKEY[pCompactor->iKey].version;
pCompactor->iKey++;
if (pCompactor->iKey < taosArrayGetSize(pCompactor->aSkyLine)) {
pCompactor->sKey.ts = pCompactor->aTSDBKEY[pCompactor->iKey].ts;
} else {
pCompactor->sKey.ts = TSKEY_MAX;
}
}
if (tKey.ts < pCompactor->sKey.ts) {
if (tKey.version > pCompactor->sKey.version) {
return false;
} else {
return true;
}
} else if (tKey.ts == pCompactor->sKey.ts) {
int64_t version;
if (pCompactor->iKey < taosArrayGetSize(pCompactor->aSkyLine)) {
version = TMAX(pCompactor->sKey.version, pCompactor->aTSDBKEY[pCompactor->iKey].version);
} else {
version = pCompactor->sKey.version;
}
if (tKey.version > version) {
return false;
} else {
return true;
}
}
return false;
}
static int32_t tsdbCompactNextRow(STsdbCompactor *pCompactor) {
int32_t code = 0;
int32_t lino = 0;
......@@ -774,9 +723,9 @@ typedef struct {
SArray *aDelIdx; // SArray<SDelIdx>
SArray *aDelData; // SArray<SDelData>
SArray *aSkyLine; // SArray<TSDBKEY>
TSDBKEY *aTSDBKEY;
int32_t iKey;
TSDBKEY sKey;
int32_t iDelIdx;
int32_t iSkyLine;
int8_t onGoing;
// Reader
SDataFReader *pReader;
......@@ -799,9 +748,38 @@ static int32_t tsdbCompactWriteTableDataStart(STsdbCompactor *pCompactor, TABLEI
pCompactor->tbid = *pId;
// TODO
// tombstone
for (;;) {
if (pCompactor->iDelIdx >= taosArrayGetSize(pCompactor->aDelIdx)) {
if (pCompactor->aSkyLine) taosArrayClear(pCompactor->aSkyLine);
pCompactor->iSkyLine = 0;
break;
}
SDelIdx *pDelIdx = (SDelIdx *)taosArrayGet(pCompactor->aDelIdx, pCompactor->iDelIdx);
int32_t c = tTABLEIDCmprFn(pDelIdx, &pCompactor->tbid);
if (c < 0) {
pCompactor->iDelIdx++;
} else if (c == 0) {
pCompactor->iDelIdx++;
code = tsdbReadDelData(pCompactor->pDelFReader, pDelIdx, pCompactor->aDelData);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbBuildDeleteSkyline(pCompactor->aDelData, 0, taosArrayGetSize(pCompactor->aDelData) - 1,
pCompactor->aSkyLine);
TSDB_CHECK_CODE(code, lino, _exit);
// update table schema if necessary (TODO)
pCompactor->iSkyLine = 0;
break;
} else {
if (pCompactor->aSkyLine) taosArrayClear(pCompactor->aSkyLine);
pCompactor->iSkyLine = 0;
break;
}
}
// reader and write (TODO)
code = tsdbUpdateTableSchema(pCompactor->pTsdb->pVnode->pMeta, pId->suid, pId->uid, &pCompactor->tbSkm);
TSDB_CHECK_CODE(code, lino, _exit);
......@@ -880,6 +858,43 @@ _exit:
return code;
}
static bool tsdbCompactRowIsDeleted(STsdbCompactor *pCompactor, TSDBROW *pRow) {
// TSDBKEY tKey = TSDBROW_KEY(pRow);
// while (tKey.ts > pCompactor->sKey.ts) {
// pCompactor->sKey.version = pCompactor->aTSDBKEY[pCompactor->iKey].version;
// pCompactor->iKey++;
// if (pCompactor->iKey < taosArrayGetSize(pCompactor->aSkyLine)) {
// pCompactor->sKey.ts = pCompactor->aTSDBKEY[pCompactor->iKey].ts;
// } else {
// pCompactor->sKey.ts = TSKEY_MAX;
// }
// }
// if (tKey.ts < pCompactor->sKey.ts) {
// if (tKey.version > pCompactor->sKey.version) {
// return false;
// } else {
// return true;
// }
// } else if (tKey.ts == pCompactor->sKey.ts) {
// int64_t version;
// if (pCompactor->iKey < taosArrayGetSize(pCompactor->aSkyLine)) {
// version = TMAX(pCompactor->sKey.version, pCompactor->aTSDBKEY[pCompactor->iKey].version);
// } else {
// version = pCompactor->sKey.version;
// }
// if (tKey.version > version) {
// return false;
// } else {
// return true;
// }
// }
return false;
}
static int32_t tsdbCompactWriteTableData(STsdbCompactor *pCompactor, SRowInfo *pRowInfo) {
int32_t code = 0;
int32_t lino = 0;
......@@ -892,6 +907,7 @@ static int32_t tsdbCompactWriteTableData(STsdbCompactor *pCompactor, SRowInfo *p
pRowInfo = &rInfo;
}
// start a new table data write if need
if (pRowInfo->uid != pCompactor->tbid.uid) {
if (pCompactor->tbid.uid) {
code = tsdbCompactWriteTableDataEnd(pCompactor);
......@@ -902,7 +918,10 @@ static int32_t tsdbCompactWriteTableData(STsdbCompactor *pCompactor, SRowInfo *p
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tBlockDataAppendRow(&pCompactor->bData, &pRowInfo->row, NULL, pRowInfo->uid);
// check if row is deleted
if (pCompactor->onGoing && tsdbCompactRowIsDeleted(pCompactor, &pRowInfo->row)) goto _exit;
code = tBlockDataUpsertRow(&pCompactor->bData, &pRowInfo->row, NULL, pRowInfo->uid);
TSDB_CHECK_CODE(code, lino, _exit);
if (pCompactor->bData.nRow >= pCompactor->maxRows) {
......@@ -977,6 +996,10 @@ static int32_t tsdbCompactFileSetStart(STsdbCompactor *pCompactor, SDFileSet *pS
pCompactor->fid = pSet->fid;
pCompactor->tbid = (TABLEID){0};
/* tombstone */
pCompactor->iDelIdx = 0;
pCompactor->iSkyLine = 0;
/* reader */
code = tsdbDataFReaderOpen(&pCompactor->pReader, pCompactor->pTsdb, pSet);
TSDB_CHECK_CODE(code, lino, _exit);
......@@ -1114,6 +1137,31 @@ _exit:
return code;
}
static void tsdbEndCompact(STsdbCompactor *pCompactor) {
int32_t code = 0;
int32_t lino = 0;
// writer
tBlockDataDestroy(&pCompactor->sData);
tBlockDataDestroy(&pCompactor->bData);
taosArrayDestroy(pCompactor->aSttBlk);
tMapDataClear(&pCompactor->mDataBlk);
taosArrayDestroy(pCompactor->aBlockIdx);
// reader
// tombstone
taosArrayDestroy(pCompactor->aSkyLine);
taosArrayDestroy(pCompactor->aDelData);
taosArrayDestroy(pCompactor->aDelIdx);
// others
tDestroyTSchema(pCompactor->tbSkm.pTSchema);
tsdbFSDestroy(&pCompactor->fs);
tsdbInfo("vgId:%d %s done, commit ID:%" PRId64, TD_VID(pCompactor->pTsdb->pVnode), __func__, pCompactor->commitID);
}
static int32_t tsdbBeginCompact(STsdb *pTsdb, STsdbCompactor *pCompactor) {
int32_t code = 0;
int32_t lino = 0;
......@@ -1128,26 +1176,22 @@ static int32_t tsdbBeginCompact(STsdb *pTsdb, STsdbCompactor *pCompactor) {
code = tsdbFSCopy(pTsdb, &pCompactor->fs);
TSDB_CHECK_CODE(code, lino, _exit);
/* tombstone (TODO ) */
#if 0
/* tombstone */
if (pCompactor->fs.pDelFile) {
code = tsdbDelFReaderOpen(&pCompactor->pDelFReader, pCompactor->fs.pDelFile, pTsdb);
TSDB_CHECK_CODE(code, lino, _exit);
pCompactor->aDelIdx = taosArrayInit(0, sizeof(SDelIdx));
if (pCompactor->aDelIdx == NULL) {
if ((pCompactor->aDelIdx = taosArrayInit(0, sizeof(SDelIdx))) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
pCompactor->aDelData = taosArrayInit(0, sizeof(SDelData));
if (pCompactor->aDelData == NULL) {
if ((pCompactor->aDelData = taosArrayInit(0, sizeof(SDelData))) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
pCompactor->aSkyLine = taosArrayInit(0, sizeof(TSDBKEY));
if (pCompactor->aSkyLine == NULL) {
if ((pCompactor->aSkyLine = taosArrayInit(0, sizeof(TSDBKEY))) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
......@@ -1155,7 +1199,6 @@ static int32_t tsdbBeginCompact(STsdb *pTsdb, STsdbCompactor *pCompactor) {
code = tsdbReadDelIdx(pCompactor->pDelFReader, pCompactor->aDelIdx);
TSDB_CHECK_CODE(code, lino, _exit);
}
#endif
/* reader */
......@@ -1213,6 +1256,6 @@ _exit:
// } else {
// tsdbCommitCompact(pCompactor);
// }
// tsdbEndCompact(pCompactor);
tsdbEndCompact(pCompactor);
return code;
}
......@@ -15,6 +15,8 @@
#include "tsdb.h"
// STsdbDataIter2
/* open */
int32_t tsdbOpenDataFileDataIter(SDataFReader* pReader, STsdbDataIter2** ppIter) {
int32_t code = 0;
......@@ -392,3 +394,23 @@ int32_t tsdbDataIterNext2(STsdbDataIter2* pIter, STsdbFilterInfo* pFilterInfo) {
}
/* get */
// STsdbFSetIter
typedef struct STsdbFSetDataIter {
STsdb* pTsdb;
int32_t flags;
/* tombstone */
SDelFReader* pDelFReader;
SArray* aDelIdx; // SArray<SDelIdx>
SArray* aDelData; // SArray<SDelData>
SArray* aSkeyLine; // SArray<TABLEID>
int32_t iDelIdx;
int32_t iSkyLine;
/* time-series data */
SDataFReader* pReader;
STsdbDataIter2* iterList;
STsdbDataIter2* pIter;
SRBTree rbt;
} STsdbFSetDataIter;
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册