提交 129c692f 编写于 作者: H Hongze Cheng

more async commit

上级 6cb65f33
......@@ -77,7 +77,7 @@ void vnodeBufPoolReset(SVBufPool* pPool);
// vnodeQuery.c
int32_t vnodeQueryOpen(SVnode* pVnode);
void vnodeQueryPreClose(SVnode *pVnode);
void vnodeQueryPreClose(SVnode* pVnode);
void vnodeQueryClose(SVnode* pVnode);
int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg, bool direct);
int vnodeGetTableCfg(SVnode* pVnode, SRpcMsg* pMsg, bool direct);
......@@ -86,7 +86,6 @@ int32_t vnodeGetBatchMeta(SVnode* pVnode, SRpcMsg* pMsg);
// vnodeCommit.c
int32_t vnodeBegin(SVnode* pVnode);
int32_t vnodeShouldCommit(SVnode* pVnode);
int32_t vnodeCommit(SVnode* pVnode);
void vnodeRollback(SVnode* pVnode);
int32_t vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg);
int32_t vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo);
......
......@@ -154,6 +154,7 @@ int32_t metaGetStbStats(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo);
int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg, int8_t rollback);
int tsdbClose(STsdb** pTsdb);
int32_t tsdbBegin(STsdb* pTsdb);
int32_t tsdbPrepareCommit(STsdb* pTsdb);
int32_t tsdbCommit(STsdb* pTsdb);
int32_t tsdbFinishCommit(STsdb* pTsdb);
int32_t tsdbRollbackCommit(STsdb* pTsdb);
......
......@@ -150,20 +150,26 @@ _exit:
return code;
}
int32_t tsdbPrepareCommit(STsdb *pTsdb) {
taosThreadRwlockWrlock(&pTsdb->rwLock);
ASSERT(pTsdb->imem == NULL);
pTsdb->imem = pTsdb->mem;
pTsdb->mem = NULL;
taosThreadRwlockUnlock(&pTsdb->rwLock);
return 0;
}
int32_t tsdbCommit(STsdb *pTsdb) {
if (!pTsdb) return 0;
int32_t code = 0;
int32_t lino = 0;
SCommitter commith;
SMemTable *pMemTable = pTsdb->mem;
SMemTable *pMemTable = pTsdb->imem;
// check
if (pMemTable->nRow == 0 && pMemTable->nDel == 0) {
taosThreadRwlockWrlock(&pTsdb->rwLock);
pTsdb->mem = NULL;
taosThreadRwlockUnlock(&pTsdb->rwLock);
tsdbUnrefMemTable(pMemTable);
goto _exit;
}
......@@ -811,12 +817,7 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
int32_t lino = 0;
memset(pCommitter, 0, sizeof(*pCommitter));
ASSERT(pTsdb->mem && pTsdb->imem == NULL && "last tsdb commit incomplete");
taosThreadRwlockWrlock(&pTsdb->rwLock);
pTsdb->imem = pTsdb->mem;
pTsdb->mem = NULL;
taosThreadRwlockUnlock(&pTsdb->rwLock);
ASSERT(pTsdb->imem && "last tsdb commit incomplete");
pCommitter->pTsdb = pTsdb;
pCommitter->commitID = pTsdb->pVnode->state.commitID;
......
......@@ -15,12 +15,17 @@
#include "vnd.h"
typedef struct {
SVnodeInfo info;
SVnode *pVnode;
} SCommitInfo;
#define VND_INFO_FNAME "vnode.json"
#define VND_INFO_FNAME_TMP "vnode_tmp.json"
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData);
static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo);
static void vnodeWaitCommit(SVnode *pVnode);
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData);
static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo);
static int vnodeCommitImpl(SCommitInfo *pInfo);
int vnodeBegin(SVnode *pVnode) {
// alloc buffer pool
......@@ -185,27 +190,28 @@ _err:
return -1;
}
typedef struct {
SVnodeInfo info;
SVnode *pVnode;
} SCommitInfo;
static void vnodePrepareCommit(SVnode *pVnode) {
tsem_wait(&pVnode->canCommit);
vnodeBufPoolUnRef(pVnode->inUse);
pVnode->inUse = NULL;
tsdbPrepareCommit(pVnode->pTsdb);
}
static int32_t vnodeCommitTask(void *arg) {
int32_t code = 0;
SVnode *pVnode = (SVnode *)pVnode;
SCommitInfo *pInfo = (SCommitInfo *)arg;
code = vnodeCommit(pVnode);
// commit
code = vnodeCommitImpl(pInfo);
if (code) goto _exit;
tsem_post(&pVnode->canCommit);
// end commit
tsem_post(&pInfo->pVnode->canCommit);
_exit:
taosMemoryFree(pInfo);
return code;
}
int vnodeAsyncCommit(SVnode *pVnode) {
......@@ -215,16 +221,18 @@ int vnodeAsyncCommit(SVnode *pVnode) {
vnodePrepareCommit(pVnode);
// schedule the task
SVnodeInfo *pInfo = (SVnodeInfo *)taosMemoryCalloc(1, sizeof(*pInfo));
pVnode->state.commitTerm = pVnode->state.applyTerm;
SCommitInfo *pInfo = (SCommitInfo *)taosMemoryCalloc(1, sizeof(*pInfo));
if (NULL == pInfo) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
pInfo->config = pVnode->config;
pInfo->state.committed = pVnode->state.applied;
pInfo->state.commitTerm = pVnode->state.applyTerm;
pInfo->state.commitID = pVnode->state.commitID;
vnodeScheduleTask(vnodeCommitTask, pVnode);
pInfo->info.config = pVnode->config;
pInfo->info.state.committed = pVnode->state.applied;
pInfo->info.state.commitTerm = pVnode->state.applyTerm;
pInfo->info.state.commitID = pVnode->state.commitID;
vnodeScheduleTask(vnodeCommitTask, pInfo);
_exit:
if (code) {
......@@ -243,11 +251,12 @@ int vnodeSyncCommit(SVnode *pVnode) {
return 0;
}
int vnodeCommit(SVnode *pVnode) {
int32_t code = 0;
int32_t lino = 0;
SVnodeInfo info = {0};
char dir[TSDB_FILENAME_LEN];
static int vnodeCommitImpl(SCommitInfo *pInfo) {
int32_t code = 0;
int32_t lino = 0;
char dir[TSDB_FILENAME_LEN] = {0};
SVnode *pVnode = pInfo->pVnode;
vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64 " term: %" PRId64, TD_VID(pVnode),
pVnode->state.commitID, pVnode->state.applied, pVnode->state.applyTerm);
......@@ -258,19 +267,13 @@ int vnodeCommit(SVnode *pVnode) {
return -1;
}
pVnode->state.commitTerm = pVnode->state.applyTerm;
// save info
info.config = pVnode->config;
info.state.committed = pVnode->state.applied;
info.state.commitTerm = pVnode->state.applyTerm;
info.state.commitID = pVnode->state.commitID;
if (pVnode->pTfs) {
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
} else {
snprintf(dir, TSDB_FILENAME_LEN, "%s", pVnode->path);
}
if (vnodeSaveInfo(dir, &info) < 0) {
if (vnodeSaveInfo(dir, &pInfo->info) < 0) {
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
......@@ -281,9 +284,6 @@ int vnodeCommit(SVnode *pVnode) {
code = smaPreCommit(pVnode->pSma);
TSDB_CHECK_CODE(code, lino, _exit);
vnodeBufPoolUnRef(pVnode->inUse);
pVnode->inUse = NULL;
// commit each sub-system
if (metaCommit(pVnode->pMeta) < 0) {
code = TSDB_CODE_FAILED;
......@@ -304,7 +304,7 @@ int vnodeCommit(SVnode *pVnode) {
}
// commit info
if (vnodeCommitInfo(dir, &info) < 0) {
if (vnodeCommitInfo(dir, &pInfo->info) < 0) {
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
......@@ -322,14 +322,13 @@ int vnodeCommit(SVnode *pVnode) {
TSDB_CHECK_CODE(code, lino, _exit);
}
pVnode->state.committed = info.state.committed;
pVnode->state.committed = pInfo->info.state.committed;
if (smaPostCommit(pVnode->pSma) < 0) {
vError("vgId:%d, failed to post-commit sma since %s", TD_VID(pVnode), tstrerror(terrno));
return -1;
}
// apply the commit (TODO)
// walEndSnapshot(pVnode->pWal);
syncEndSnapshot(pVnode->sync);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册