提交 53226eae 编写于 作者: H Hongze Cheng

more code

上级 07e89142
...@@ -15,6 +15,10 @@ ...@@ -15,6 +15,10 @@
#include "tsdb.h" #include "tsdb.h"
#define TSDB_ITER_TYPE_MEM 0x0
#define TSDB_ITER_TYPE_DAT 0x1
#define TSDB_ITER_TYPE_STT 0x2
typedef struct { typedef struct {
} SMemDIter; } SMemDIter;
...@@ -37,7 +41,9 @@ typedef struct { ...@@ -37,7 +41,9 @@ typedef struct {
int32_t iRow; int32_t iRow;
} SSttDIter; } SSttDIter;
typedef struct { typedef struct STsdbDataIter {
struct STsdbDataIter *next;
int32_t flag; int32_t flag;
SRowInfo rowInfo; SRowInfo rowInfo;
SRBTreeNode n; SRBTreeNode n;
...@@ -49,8 +55,9 @@ typedef struct { ...@@ -49,8 +55,9 @@ typedef struct {
STsdbFS fs; STsdbFS fs;
int64_t cid; int64_t cid;
int32_t fid; int32_t fid;
SDataFReader *pReader;
SDFileSet *pDFileSet; SDFileSet *pDFileSet;
SDataFReader *pReader;
STsdbDataIter *iterList; // list of iterators
SRBTree rtree; SRBTree rtree;
SBlockData bData; SBlockData bData;
} STsdbCompactor; } STsdbCompactor;
...@@ -58,6 +65,8 @@ typedef struct { ...@@ -58,6 +65,8 @@ typedef struct {
#define TSDB_FLG_DEEP_COMPACT 0x1 #define TSDB_FLG_DEEP_COMPACT 0x1
// ITER ========================= // ITER =========================
static int32_t tsdbDataIterNext(STsdbDataIter *pIter);
static int32_t tsdbDataIterCmprFn(const SRBTreeNode *n1, const SRBTreeNode *n2) { static int32_t tsdbDataIterCmprFn(const SRBTreeNode *n1, const SRBTreeNode *n2) {
const STsdbDataIter *pIter1 = (STsdbDataIter *)((char *)n1 - offsetof(STsdbDataIter, n)); const STsdbDataIter *pIter1 = (STsdbDataIter *)((char *)n1 - offsetof(STsdbDataIter, n));
const STsdbDataIter *pIter2 = (STsdbDataIter *)((char *)n2 - offsetof(STsdbDataIter, n)); const STsdbDataIter *pIter2 = (STsdbDataIter *)((char *)n2 - offsetof(STsdbDataIter, n));
...@@ -95,6 +104,7 @@ static int32_t tsdbDataDIterOpen(SDataFReader *pReader, STsdbDataIter **ppIter) ...@@ -95,6 +104,7 @@ static int32_t tsdbDataDIterOpen(SDataFReader *pReader, STsdbDataIter **ppIter)
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
} }
pIter->flag = TSDB_ITER_TYPE_DAT;
SDataDIter *pDataDIter = (SDataDIter *)pIter->handle; SDataDIter *pDataDIter = (SDataDIter *)pIter->handle;
pDataDIter->pReader = pReader; pDataDIter->pReader = pReader;
...@@ -109,21 +119,16 @@ static int32_t tsdbDataDIterOpen(SDataFReader *pReader, STsdbDataIter **ppIter) ...@@ -109,21 +119,16 @@ static int32_t tsdbDataDIterOpen(SDataFReader *pReader, STsdbDataIter **ppIter)
if (taosArrayGetSize(pDataDIter->aBlockIdx) == 0) goto _clear_exit; if (taosArrayGetSize(pDataDIter->aBlockIdx) == 0) goto _clear_exit;
// TODO
code = tBlockDataCreate(&pDataDIter->bData); code = tBlockDataCreate(&pDataDIter->bData);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
// read first data block pDataDIter->iBlockIdx = -1;
pDataDIter->iBlockIdx = 0;
code = tsdbReadDataBlk(pReader, taosArrayGet(pDataDIter->aBlockIdx, pDataDIter->iBlockIdx), &pDataDIter->mDataBlk);
TSDB_CHECK_CODE(code, lino, _exit);
pDataDIter->iDataBlk = 0; pDataDIter->iDataBlk = 0;
// code = tsdbReadDataBlock(pReader, tMapDat);
// TSDB_CHECK_CODE(code, lino, _exit);
pDataDIter->iRow = 0; pDataDIter->iRow = 0;
// TODO code = tsdbDataIterNext(pIter);
TSDB_CHECK_CODE(code, lino, _exit);
_exit: _exit:
if (code) { if (code) {
...@@ -150,6 +155,7 @@ static int32_t tsdbSttDIterOpen(SDataFReader *pReader, int32_t iStt, STsdbDataIt ...@@ -150,6 +155,7 @@ static int32_t tsdbSttDIterOpen(SDataFReader *pReader, int32_t iStt, STsdbDataIt
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit; goto _exit;
} }
pIter->flag = TSDB_ITER_TYPE_STT;
SSttDIter *pSttDIter = (SSttDIter *)pIter->handle; SSttDIter *pSttDIter = (SSttDIter *)pIter->handle;
pSttDIter->pReader = pReader; pSttDIter->pReader = pReader;
...@@ -168,13 +174,11 @@ static int32_t tsdbSttDIterOpen(SDataFReader *pReader, int32_t iStt, STsdbDataIt ...@@ -168,13 +174,11 @@ static int32_t tsdbSttDIterOpen(SDataFReader *pReader, int32_t iStt, STsdbDataIt
code = tBlockDataCreate(&pSttDIter->bData); code = tBlockDataCreate(&pSttDIter->bData);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
pSttDIter->iSttBlk = 0; pSttDIter->iSttBlk = -1;
// code = tsdbReadSttBlock(pReader, taosArrayGet(pSttDIter->aSttBlk, pSttDIter->iSttBlk), &pSttDIter->bData); pSttDIter->iRow = -1;
// TSDB_CHECK_CODE(code, lino, _exit);
pSttDIter->iRow = 0;
// TODO code = tsdbDataIterNext(pIter);
TSDB_CHECK_CODE(code, lino, _exit);
_exit: _exit:
if (code) { if (code) {
...@@ -193,12 +197,42 @@ _exit: ...@@ -193,12 +197,42 @@ _exit:
static void tsdbDataIterClose(STsdbDataIter *pIter) { static void tsdbDataIterClose(STsdbDataIter *pIter) {
// TODO // TODO
ASSERT(0);
} }
static int32_t tsdbDataIterNext(STsdbDataIter *pIter) { static int32_t tsdbDataIterNext(STsdbDataIter *pIter) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
if (pIter->flag & TSDB_ITER_TYPE_MEM) {
// TODO // TODO
ASSERT(0);
} else if (pIter->flag & TSDB_ITER_TYPE_DAT) {
// TODO
ASSERT(0);
} else if (pIter->flag & TSDB_ITER_TYPE_STT) {
SSttDIter *pSttDIter = (SSttDIter *)pIter->handle;
pSttDIter->iRow++;
if (pSttDIter->iRow < pSttDIter->bData.nRow) {
ASSERT(0);
} else {
pSttDIter->iSttBlk++;
if (pSttDIter->iSttBlk < taosArrayGetSize(pSttDIter->aSttBlk)) {
code = tsdbReadSttBlock(pSttDIter->pReader, pSttDIter->iStt,
taosArrayGet(pSttDIter->aSttBlk, pSttDIter->iSttBlk), &pSttDIter->bData);
TSDB_CHECK_CODE(code, lino, _exit);
pSttDIter->iRow = 0;
} else {
// code = TSDB_CODE_TDB_NO_DATA;
// goto _exit;
}
}
} else {
ASSERT(0);
}
_exit: _exit:
return code; return code;
} }
...@@ -289,52 +323,102 @@ _exit: ...@@ -289,52 +323,102 @@ _exit:
return code; return code;
} }
int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) { static int32_t tsdbOpenCompactor(STsdbCompactor *pCompactor) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
// Check if can do compact (TODO) STsdb *pTsdb = pCompactor->pTsdb;
// Do compact // next compact file
STsdbCompactor compactor = {0}; pCompactor->pDFileSet = (SDFileSet *)taosArraySearch(pCompactor->fs.aDFileSet, &(SDFileSet){.fid = pCompactor->fid},
tDFileSetCmprFn, TD_GT);
if (pCompactor->pDFileSet == NULL) goto _exit;
code = tsdbBeginCompact(pTsdb, &compactor); pCompactor->fid = pCompactor->pDFileSet->fid;
code = tsdbDataFReaderOpen(&pCompactor->pReader, pTsdb, pCompactor->pDFileSet);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
while (true) { // open iters
compactor.pDFileSet = (SDFileSet *)taosArraySearch(compactor.fs.aDFileSet, &compactor.fid, tDFileSetCmprFn, TD_GT); STsdbDataIter *pIter;
if (compactor.pDFileSet == NULL) break;
compactor.fid = compactor.pDFileSet->fid; pCompactor->iterList = NULL;
tRBTreeCreate(&pCompactor->rtree, tsdbDataIterCmprFn);
code = tsdbDataFReaderOpen(&compactor.pReader, pTsdb, compactor.pDFileSet); code = tsdbDataDIterOpen(pCompactor->pReader, &pIter);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
// open those iterators if (pIter) {
tRBTreeCreate(&compactor.rtree, tsdbDataIterCmprFn); pIter->next = pCompactor->iterList;
pCompactor->iterList = pIter;
STsdbDataIter *pIter; tRBTreePut(&pCompactor->rtree, &pIter->n);
}
code = tsdbDataDIterOpen(compactor.pReader, &pIter); for (int32_t iStt = 0; iStt < pCompactor->pReader->pSet->nSttF; iStt++) {
code = tsdbSttDIterOpen(pCompactor->pReader, iStt, &pIter);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
if (pIter) tRBTreePut(&compactor.rtree, &pIter->n); if (pIter) {
pIter->next = pCompactor->iterList;
pCompactor->iterList = pIter;
tRBTreePut(&pCompactor->rtree, &pIter->n);
}
}
for (int32_t iStt = 0; iStt < compactor.pReader->pSet->nSttF; iStt++) { _exit:
code = tsdbSttDIterOpen(compactor.pReader, iStt, &pIter); if (code) {
TSDB_CHECK_CODE(code, lino, _exit); tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
} else {
tsdbDebug("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
}
return code;
}
if (pIter) tRBTreePut(&compactor.rtree, &pIter->n); static void tsdbCloseCompactor(STsdbCompactor *pCompactor) {
STsdb *pTsdb = pCompactor->pTsdb;
for (STsdbDataIter *pIter = pCompactor->iterList; pIter;) {
STsdbDataIter *pIterNext = pIter->next;
tsdbDataIterClose(pIter);
pIter = pIterNext;
} }
// TODO
ASSERT(0);
_exit:
tsdbDebug("vgId:%d %s done", TD_VID(pTsdb->pVnode), __func__);
}
int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) {
int32_t code = 0;
int32_t lino = 0;
// Check if can do compact (TODO)
// Do compact
STsdbCompactor compactor = {0};
code = tsdbBeginCompact(pTsdb, &compactor);
TSDB_CHECK_CODE(code, lino, _exit);
while (true) {
code = tsdbOpenCompactor(&compactor);
TSDB_CHECK_CODE(code, lino, _exit);
if (compactor.pDFileSet == NULL) break;
// loop to merge row by row // loop to merge row by row
TSDBROW *pRow = NULL; TSDBROW *pRow = NULL;
int64_t nRow = 0;
for (;;) { for (;;) {
code = tsdbCompactNextRow(&compactor, &pRow); code = tsdbCompactNextRow(&compactor, &pRow);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
if (pRow == NULL) break; if (pRow == NULL) break;
nRow++;
// code = tBlockDataAppendRow(&compactor.bData, pRow, pRow, NULL, 0); // code = tBlockDataAppendRow(&compactor.bData, pRow, pRow, NULL, 0);
// TSDB_CHECK_CODE(code, lino, _exit); // TSDB_CHECK_CODE(code, lino, _exit);
...@@ -343,6 +427,8 @@ int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) { ...@@ -343,6 +427,8 @@ int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) {
// TSDB_CHECK_CODE(code, lino, _exit); // TSDB_CHECK_CODE(code, lino, _exit);
// } // }
} }
tsdbCloseCompactor(&compactor);
} }
_exit: _exit:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册