提交 9694933e 编写于 作者: H Hongze Cheng

fix: compact issue

上级 f523a96c
......@@ -100,7 +100,7 @@ int32_t vnodeShouldCommit(SVnode* pVnode);
void vnodeUpdCommitSched(SVnode* pVnode);
void vnodeRollback(SVnode* pVnode);
int32_t vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg);
int32_t vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo);
int32_t vnodeCommitInfo(const char* dir);
int32_t vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo);
int32_t vnodeSyncCommit(SVnode* pVnode);
int32_t vnodeAsyncCommit(SVnode* pVnode);
......
......@@ -76,6 +76,7 @@ typedef struct SRSmaSnapReader SRSmaSnapReader;
typedef struct SRSmaSnapWriter SRSmaSnapWriter;
typedef struct SSnapDataHdr SSnapDataHdr;
typedef struct SCommitInfo SCommitInfo;
typedef struct SCompactInfo SCompactInfo;
typedef struct SQueryNode SQueryNode;
#define VNODE_META_DIR "meta"
......@@ -173,6 +174,7 @@ int tsdbClose(STsdb** pTsdb);
int32_t tsdbBegin(STsdb* pTsdb);
int32_t tsdbPrepareCommit(STsdb* pTsdb);
int32_t tsdbCommit(STsdb* pTsdb, SCommitInfo* pInfo);
int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo);
int32_t tsdbFinishCommit(STsdb* pTsdb);
int32_t tsdbRollbackCommit(STsdb* pTsdb);
int32_t tsdbDoRetention(STsdb* pTsdb, int64_t now);
......@@ -450,10 +452,14 @@ struct SCommitInfo {
SVnodeInfo info;
SVnode* pVnode;
TXN* txn;
};
// APIs
int32_t (*commitFn)(STsdb* pTsdb, SCommitInfo* pInfo);
struct SCompactInfo {
SVnode* pVnode;
int32_t flag;
int64_t commitID;
};
#ifdef __cplusplus
}
#endif
......
......@@ -88,9 +88,8 @@ static int32_t tsdbAbortCompact(STsdbCompactor *pCompactor) {
int32_t lino = 0;
STsdb *pTsdb = pCompactor->pTsdb;
// TODO
ASSERT(0);
code = tsdbFSRollback(pTsdb);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
......@@ -571,12 +570,12 @@ static void tsdbEndCompact(STsdbCompactor *pCompactor) {
tsdbInfo("vgId:%d %s done, commit ID:%" PRId64, TD_VID(pCompactor->pTsdb->pVnode), __func__, pCompactor->commitID);
}
static int32_t tsdbBeginCompact(STsdb *pTsdb, STsdbCompactor *pCompactor) {
static int32_t tsdbBeginCompact(STsdb *pTsdb, SCompactInfo *pInfo, STsdbCompactor *pCompactor) {
int32_t code = 0;
int32_t lino = 0;
pCompactor->pTsdb = pTsdb;
pCompactor->commitID = 0; // TODO
pCompactor->commitID = pInfo->commitID;
pCompactor->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
pCompactor->maxRows = pTsdb->pVnode->config.tsdbCfg.maxRows;
pCompactor->minRows = pTsdb->pVnode->config.tsdbCfg.minRows;
......@@ -637,12 +636,12 @@ _exit:
return code;
}
int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) {
int32_t tsdbCompact(STsdb *pTsdb, SCompactInfo *pInfo) {
int32_t code = 0;
STsdbCompactor *pCompactor = &(STsdbCompactor){0};
if ((code = tsdbBeginCompact(pTsdb, pCompactor))) return code;
if ((code = tsdbBeginCompact(pTsdb, pInfo, pCompactor))) return code;
for (;;) {
SDFileSet *pSet = (SDFileSet *)taosArraySearch(pCompactor->fs.aDFileSet, &(SDFileSet){.fid = pCompactor->fid},
......
......@@ -113,8 +113,6 @@ int vnodeBegin(SVnode *pVnode) {
int32_t code = 0;
int32_t lino = 0;
pVnode->state.commitID++;
// alloc buffer pool
code = vnodeGetBufPoolToUse(pVnode);
TSDB_CHECK_CODE(code, lino, _exit);
......@@ -221,7 +219,7 @@ _err:
return -1;
}
int vnodeCommitInfo(const char *dir, const SVnodeInfo *pInfo) {
int vnodeCommitInfo(const char *dir) {
char fname[TSDB_FILENAME_LEN];
char tfname[TSDB_FILENAME_LEN];
......@@ -233,8 +231,7 @@ int vnodeCommitInfo(const char *dir, const SVnodeInfo *pInfo) {
return -1;
}
vInfo("vgId:%d, vnode info is committed", pInfo->config.vgId);
vInfo("vnode info is committed, dir:%s", dir);
return 0;
}
......@@ -289,7 +286,7 @@ _err:
return -1;
}
int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
int32_t code = 0;
int32_t lino = 0;
char dir[TSDB_FILENAME_LEN] = {0};
......@@ -301,7 +298,7 @@ int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
pInfo->info.config = pVnode->config;
pInfo->info.state.committed = pVnode->state.applied;
pInfo->info.state.commitTerm = pVnode->state.applyTerm;
pInfo->info.state.commitID = pVnode->state.commitID;
pInfo->info.state.commitID = ++pVnode->state.commitID;
pInfo->pVnode = pVnode;
pInfo->txn = metaGetTxn(pVnode->pMeta);
......@@ -336,7 +333,7 @@ _exit:
vError("vgId:%d, %s failed at line %d since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, lino,
tstrerror(code), pVnode->state.commitID);
} else {
vDebug("vgId:%d, %s done", TD_VID(pVnode), __func__);
vDebug("vgId:%d, %s done, commit id:%d", TD_VID(pVnode), __func__, pInfo->info.state.commitID);
}
return code;
......@@ -468,7 +465,7 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) {
}
// commit info
if (vnodeCommitInfo(dir, &pInfo->info) < 0) {
if (vnodeCommitInfo(dir) < 0) {
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
......
......@@ -15,60 +15,86 @@
#include "vnd.h"
extern int32_t tsdbCompact(STsdb *pTsdb, int32_t flag);
int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo);
static int32_t vnodeCompactImpl(SCommitInfo *pInfo) {
static int32_t vnodeCompactTask(void *param) {
int32_t code = 0;
int32_t lino = 0;
// TODO
SCompactInfo *pInfo = (SCompactInfo *)param;
SVnode *pVnode = pInfo->pVnode;
code = tsdbCompact(pVnode->pTsdb, 0);
// do compact
code = tsdbCompact(pInfo->pVnode->pTsdb, pInfo);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
vError("vgId:%d %s failed since %s", TD_VID(pInfo->pVnode), __func__, tstrerror(code));
// end compact
char dir[TSDB_FILENAME_LEN] = {0};
if (pVnode->pTfs) {
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
} else {
vDebug("vgId:%d %s done", TD_VID(pInfo->pVnode), __func__);
snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
}
vnodeCommitInfo(dir);
_exit:
taosMemoryFree(pInfo);
tsem_post(&pInfo->pVnode->canCommit);
return code;
}
static int32_t vnodeCompactTask(void *param) {
static int32_t vnodePrepareCompact(SVnode *pVnode, SCompactInfo *pInfo) {
int32_t code = 0;
int32_t lino = 0;
SCommitInfo *pInfo = (SCommitInfo *)param;
tsem_wait(&pVnode->canCommit);
// compact
vnodeCompactImpl(pInfo);
pInfo->pVnode = pVnode;
pInfo->flag = 0;
pInfo->commitID = ++pVnode->state.commitID;
// end compact
tsem_post(&pInfo->pVnode->canCommit);
char dir[TSDB_FILENAME_LEN] = {0};
SVnodeInfo info = {0};
if (pVnode->pTfs) {
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
} else {
snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
}
vnodeLoadInfo(dir, &info);
info.state.commitID = pInfo->commitID;
vnodeSaveInfo(dir, &info);
_exit:
taosMemoryFree(pInfo);
if (code) {
vError("vgId:%d %s failed at line %d since %s, commit ID:%d", TD_VID(pVnode), __func__, lino, tstrerror(code),
pVnode->state.commitID);
} else {
vDebug("vgId:%d %s done, commit ID:%d", TD_VID(pVnode), __func__, pVnode->state.commitID);
}
return code;
}
int32_t vnodeAsyncCompact(SVnode *pVnode) {
int32_t code = 0;
int32_t lino = 0;
// schedule compact task
SCommitInfo *pInfo = taosMemoryCalloc(1, sizeof(*pInfo));
if (NULL == pInfo) {
SCompactInfo *pInfo = taosMemoryCalloc(1, sizeof(*pInfo));
if (pInfo == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
TSDB_CHECK_CODE(code, lino, _exit);
}
vnodePrepareCommit(pVnode, pInfo);
vnodeAsyncCommit(pVnode);
code = vnodePrepareCompact(pVnode, pInfo);
TSDB_CHECK_CODE(code, lino, _exit);
vnodeScheduleTask(vnodeCompactTask, pInfo);
_exit:
if (code) {
vError("vgId:%d %s failed since %s", TD_VID(pInfo->pVnode), __func__, tstrerror(code));
vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
if (pInfo) taosMemoryFree(pInfo);
} else {
vInfo("vgId:%d %s done", TD_VID(pVnode), __func__);
}
return code;
}
......
......@@ -49,7 +49,7 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
info.state.commitID = 0;
vInfo("vgId:%d, save config while create", pCfg->vgId);
if (vnodeSaveInfo(dir, &info) < 0 || vnodeCommitInfo(dir, &info) < 0) {
if (vnodeSaveInfo(dir, &info) < 0 || vnodeCommitInfo(dir) < 0) {
vError("vgId:%d, failed to save vnode config since %s", pCfg ? pCfg->vgId : 0, tstrerror(terrno));
return -1;
}
......@@ -97,7 +97,7 @@ int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *p
return -1;
}
ret = vnodeCommitInfo(dir, &info);
ret = vnodeCommitInfo(dir);
if (ret < 0) {
vError("vgId:%d, failed to commit vnode config since %s", pReq->vgId, tstrerror(terrno));
return -1;
......@@ -198,7 +198,7 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod
return -1;
}
ret = vnodeCommitInfo(dir, &info);
ret = vnodeCommitInfo(dir);
if (ret < 0) {
vError("vgId:%d, failed to commit vnode config since %s", pReq->dstVgId, tstrerror(terrno));
return -1;
......@@ -257,7 +257,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
if (updated) {
vInfo("vgId:%d, save vnode info since dnode info changed", info.config.vgId);
(void)vnodeSaveInfo(dir, &info);
(void)vnodeCommitInfo(dir, &info);
(void)vnodeCommitInfo(dir);
}
// create handle
......
......@@ -349,7 +349,7 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
snprintf(dir, TSDB_FILENAME_LEN, "%s", pWriter->pVnode->path);
}
vnodeCommitInfo(dir, &pWriter->info);
vnodeCommitInfo(dir);
} else {
vnodeRollback(pWriter->pVnode);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册