提交 3278c114 编写于 作者: H Hongze Cheng

more code

上级 78e3a0e3
...@@ -801,6 +801,8 @@ static FORCE_INLINE TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter) { ...@@ -801,6 +801,8 @@ static FORCE_INLINE TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter) {
return pIter->pRow; return pIter->pRow;
} }
int32_t tRowInfoCmprFn(const void *p1, const void *p2);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -38,9 +38,10 @@ typedef struct { ...@@ -38,9 +38,10 @@ typedef struct {
} SSttDIter; } SSttDIter;
typedef struct { typedef struct {
int32_t flag; int32_t flag;
SRowInfo rowInfo; SRowInfo rowInfo;
char handle[]; SRBTreeNode n;
char handle[];
} STsdbDataIter; } STsdbDataIter;
typedef struct { typedef struct {
...@@ -56,6 +57,13 @@ typedef struct { ...@@ -56,6 +57,13 @@ typedef struct {
#define TSDB_FLG_DEEP_COMPACT 0x1 #define TSDB_FLG_DEEP_COMPACT 0x1
// ITER ========================= // ITER =========================
static int32_t tsdbDataIterCmprFn(const SRBTreeNode *n1, const SRBTreeNode *n2) {
const STsdbDataIter *pIter1 = (STsdbDataIter *)((char *)n1 - offsetof(STsdbDataIter, n));
const STsdbDataIter *pIter2 = (STsdbDataIter *)((char *)n2 - offsetof(STsdbDataIter, n));
return tRowInfoCmprFn(&pIter1->rowInfo, &pIter2->rowInfo);
}
static int32_t tsdbMemDIterOpen(STsdbDataIter **ppIter) { static int32_t tsdbMemDIterOpen(STsdbDataIter **ppIter) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
...@@ -87,18 +95,52 @@ static int32_t tsdbDataDIterOpen(SDataFReader *pReader, STsdbDataIter **ppIter) ...@@ -87,18 +95,52 @@ static int32_t tsdbDataDIterOpen(SDataFReader *pReader, STsdbDataIter **ppIter)
goto _exit; goto _exit;
} }
SDataDIter *pDataDIter = (SDataDIter *)pIter->handle;
pDataDIter->pReader = pReader;
pDataDIter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx));
if (pDataDIter->aBlockIdx == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
code = tsdbReadBlockIdx(pReader, pDataDIter->aBlockIdx);
TSDB_CHECK_CODE(code, lino, _exit);
if (taosArrayGetSize(pDataDIter->aBlockIdx) == 0) goto _clear_exit;
code = tBlockDataCreate(&pDataDIter->bData);
TSDB_CHECK_CODE(code, lino, _exit);
// read first data block
pDataDIter->iBlockIdx = 0;
code = tsdbReadDataBlk(pReader, taosArrayGet(pDataDIter->aBlockIdx, pDataDIter->iBlockIdx), &pDataDIter->mDataBlk);
TSDB_CHECK_CODE(code, lino, _exit);
pDataDIter->iDataBlk = 0;
// code = tsdbReadDataBlock(pReader, tMapDat);
// TSDB_CHECK_CODE(code, lino, _exit);
pDataDIter->iRow = 0;
// TODO // TODO
_exit: _exit:
if (code) { if (code) {
_clear_exit:
*ppIter = NULL; *ppIter = NULL;
if (pIter) {
tBlockDataDestroy(&pDataDIter->bData, 1);
tMapDataClear(&pDataDIter->mDataBlk);
taosArrayDestroy(pDataDIter->aBlockIdx);
taosMemoryFree(pIter);
}
} else { } else {
*ppIter = pIter; *ppIter = pIter;
} }
return code; return code;
} }
static int32_t tsdbSttDIterOpen(SDataFReader *pReader, STsdbDataIter **ppIter) { static int32_t tsdbSttDIterOpen(SDataFReader *pReader, int32_t iStt, STsdbDataIter **ppIter) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
...@@ -108,11 +150,40 @@ static int32_t tsdbSttDIterOpen(SDataFReader *pReader, STsdbDataIter **ppIter) { ...@@ -108,11 +150,40 @@ static int32_t tsdbSttDIterOpen(SDataFReader *pReader, STsdbDataIter **ppIter) {
goto _exit; goto _exit;
} }
SSttDIter *pSttDIter = (SSttDIter *)pIter->handle;
pSttDIter->pReader = pReader;
pSttDIter->iStt = iStt;
pSttDIter->aSttBlk = taosArrayInit(0, sizeof(SSttBlk));
if (pSttDIter->aSttBlk == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
code = tsdbReadSttBlk(pReader, pSttDIter->iStt, pSttDIter->aSttBlk);
TSDB_CHECK_CODE(code, lino, _exit);
if (taosArrayGetSize(pSttDIter->aSttBlk) == 0) goto _clear_exit;
code = tBlockDataCreate(&pSttDIter->bData);
TSDB_CHECK_CODE(code, lino, _exit);
pSttDIter->iSttBlk = 0;
// code = tsdbReadSttBlock(pReader, taosArrayGet(pSttDIter->aSttBlk, pSttDIter->iSttBlk), &pSttDIter->bData);
// TSDB_CHECK_CODE(code, lino, _exit);
pSttDIter->iRow = 0;
// TODO // TODO
_exit: _exit:
if (code) { if (code) {
_clear_exit:
*ppIter = NULL; *ppIter = NULL;
if (pIter) {
tBlockDataDestroy(&pSttDIter->bData, 1);
taosArrayDestroy(pSttDIter->aSttBlk);
taosMemoryFree(pIter);
}
} else { } else {
*ppIter = pIter; *ppIter = pIter;
} }
...@@ -227,6 +298,27 @@ int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) { ...@@ -227,6 +298,27 @@ int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) {
compactor.fid = compactor.pDFileSet->fid; compactor.fid = compactor.pDFileSet->fid;
code = tsdbDataFReaderOpen(&compactor.pReader, pTsdb, compactor.pDFileSet);
TSDB_CHECK_CODE(code, lino, _exit);
// open those iterators
tRBTreeCreate(&compactor.rtree, tsdbDataIterCmprFn);
STsdbDataIter *pIter;
code = tsdbDataDIterOpen(compactor.pReader, &pIter);
TSDB_CHECK_CODE(code, lino, _exit);
if (pIter) tRBTreePut(&compactor.rtree, &pIter->n);
for (int32_t iStt = 0; iStt < compactor.pReader->pSet->nSttF; iStt++) {
code = tsdbSttDIterOpen(compactor.pReader, iStt, &pIter);
TSDB_CHECK_CODE(code, lino, _exit);
if (pIter) tRBTreePut(&compactor.rtree, &pIter->n);
}
#if 0
if (flag & TSDB_FLG_DEEP_COMPACT) { if (flag & TSDB_FLG_DEEP_COMPACT) {
code = tsdbDeepCompact(&compactor); code = tsdbDeepCompact(&compactor);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
...@@ -234,6 +326,7 @@ int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) { ...@@ -234,6 +326,7 @@ int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) {
code = tsdbShallowCompact(&compactor); code = tsdbShallowCompact(&compactor);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
#endif
} }
_exit: _exit:
......
...@@ -63,7 +63,6 @@ struct STsdbSnapReader { ...@@ -63,7 +63,6 @@ struct STsdbSnapReader {
uint8_t* aBuf[5]; uint8_t* aBuf[5];
}; };
extern int32_t tRowInfoCmprFn(const void* p1, const void* p2);
extern int32_t tsdbReadDataBlockEx(SDataFReader* pReader, SDataBlk* pDataBlk, SBlockData* pBlockData); extern int32_t tsdbReadDataBlockEx(SDataFReader* pReader, SDataBlk* pDataBlk, SBlockData* pBlockData);
extern int32_t tsdbUpdateTableSchema(SMeta* pMeta, int64_t suid, int64_t uid, SSkmInfo* pSkmInfo); extern int32_t tsdbUpdateTableSchema(SMeta* pMeta, int64_t suid, int64_t uid, SSkmInfo* pSkmInfo);
......
...@@ -15,13 +15,26 @@ ...@@ -15,13 +15,26 @@
#include "vnd.h" #include "vnd.h"
extern int32_t tsdbCompact(STsdb *pTsdb, int32_t flag);
extern void vnodePrepareCommit(SVnode *pVnode); extern void vnodePrepareCommit(SVnode *pVnode);
static int32_t vnodeCompactImpl(SCommitInfo *pInfo) { static int32_t vnodeCompactImpl(SCommitInfo *pInfo) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
// TODO // TODO
SVnode *pVnode = pInfo->pVnode;
code = tsdbCompact(pVnode->pTsdb, 0);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
vError("vgId:%d %s failed since %s", TD_VID(pInfo->pVnode), __func__, tstrerror(code));
} else {
vDebug("vgId:%d %s done", TD_VID(pInfo->pVnode), __func__);
}
return code; return code;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册