提交 043cc110 编写于 作者: H Hongze Cheng

more code

上级 3fc30c99
...@@ -15,12 +15,11 @@ ...@@ -15,12 +15,11 @@
#include "vnd.h" #include "vnd.h"
#define VND_INFO_FNAME "vnode.json" #define VND_INFO_FNAME "vnode.json"
#define VND_INFO_FNAME_TMP "vnode_tmp.json" #define VND_INFO_FNAME_TMP "vnode_tmp.json"
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData); static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData);
static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo); static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo);
static int vnodeCommitImpl(void *arg);
static void vnodeWaitCommit(SVnode *pVnode); static void vnodeWaitCommit(SVnode *pVnode);
int vnodeBegin(SVnode *pVnode) { int vnodeBegin(SVnode *pVnode) {
...@@ -107,7 +106,8 @@ int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) { ...@@ -107,7 +106,8 @@ int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) {
// free info binary // free info binary
taosMemoryFree(data); taosMemoryFree(data);
vInfo("vgId:%d, vnode info is saved, fname:%s replica:%d", pInfo->config.vgId, fname, pInfo->config.syncCfg.replicaNum); vInfo("vgId:%d, vnode info is saved, fname:%s replica:%d", pInfo->config.vgId, fname,
pInfo->config.syncCfg.replicaNum);
return 0; return 0;
...@@ -185,21 +185,61 @@ _err: ...@@ -185,21 +185,61 @@ _err:
return -1; return -1;
} }
typedef struct {
SVnodeInfo info;
SVnode *pVnode;
} SCommitInfo;
static void vnodePrepareCommit(SVnode *pVnode) {
tsem_wait(&pVnode->canCommit);
vnodeBufPoolUnRef(pVnode->inUse);
pVnode->inUse = NULL;
}
static int32_t vnodeCommitTask(void *arg) {
int32_t code = 0;
SVnode *pVnode = (SVnode *)pVnode;
code = vnodeCommit(pVnode);
if (code) goto _exit;
tsem_post(&pVnode->canCommit);
_exit:
return code;
}
int vnodeAsyncCommit(SVnode *pVnode) { int vnodeAsyncCommit(SVnode *pVnode) {
vnodeWaitCommit(pVnode); int32_t code = 0;
// vnodeBufPoolSwitch(pVnode); // prepare to commit
// tsdbPrepareCommit(pVnode->pTsdb); vnodePrepareCommit(pVnode);
vnodeScheduleTask(vnodeCommitImpl, pVnode); // schedule the task
SVnodeInfo *pInfo = (SVnodeInfo *)taosMemoryCalloc(1, sizeof(*pInfo));
if (NULL == pInfo) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
pInfo->config = pVnode->config;
pInfo->state.committed = pVnode->state.applied;
pInfo->state.commitTerm = pVnode->state.applyTerm;
pInfo->state.commitID = pVnode->state.commitID;
vnodeScheduleTask(vnodeCommitTask, pVnode);
return 0; _exit:
if (code) {
vError("vgId:%d %s failed since %s, commit id:%" PRId64, TD_VID(pVnode), __func__, tstrerror(code),
pVnode->state.commitID);
} else {
vDebug("vgId:%d %s done", TD_VID(pVnode), __func__);
}
return code;
} }
int vnodeSyncCommit(SVnode *pVnode) { int vnodeSyncCommit(SVnode *pVnode) {
vnodeAsyncCommit(pVnode); vnodeAsyncCommit(pVnode);
vnodeWaitCommit(pVnode); tsem_wait(&pVnode->canCommit);
tsem_post(&(pVnode->canCommit)); tsem_post(&pVnode->canCommit);
return 0; return 0;
} }
...@@ -318,20 +358,6 @@ void vnodeRollback(SVnode *pVnode) { ...@@ -318,20 +358,6 @@ void vnodeRollback(SVnode *pVnode) {
(void)taosRemoveFile(tFName); (void)taosRemoveFile(tFName);
} }
static int vnodeCommitImpl(void *arg) {
SVnode *pVnode = (SVnode *)arg;
// metaCommit(pVnode->pMeta);
tqCommit(pVnode->pTq);
// tsdbCommit(pVnode->pTsdb, );
// vnodeBufPoolRecycle(pVnode);
tsem_post(&(pVnode->canCommit));
return 0;
}
static FORCE_INLINE void vnodeWaitCommit(SVnode *pVnode) { tsem_wait(&pVnode->canCommit); }
static int vnodeEncodeState(const void *pObj, SJson *pJson) { static int vnodeEncodeState(const void *pObj, SJson *pJson) {
const SVState *pState = (SVState *)pObj; const SVState *pState = (SVState *)pObj;
......
...@@ -242,14 +242,14 @@ _err: ...@@ -242,14 +242,14 @@ _err:
return NULL; return NULL;
} }
void vnodePreClose(SVnode *pVnode) { void vnodePreClose(SVnode *pVnode) {
vnodeQueryPreClose(pVnode); vnodeQueryPreClose(pVnode);
vnodeSyncPreClose(pVnode); vnodeSyncPreClose(pVnode);
} }
void vnodeClose(SVnode *pVnode) { void vnodeClose(SVnode *pVnode) {
if (pVnode) { if (pVnode) {
vnodeCommit(pVnode); vnodeSyncCommit(pVnode);
vnodeSyncClose(pVnode); vnodeSyncClose(pVnode);
vnodeQueryClose(pVnode); vnodeQueryClose(pVnode);
walClose(pVnode->pWal); walClose(pVnode->pWal);
......
...@@ -259,7 +259,7 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr ...@@ -259,7 +259,7 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr
pWriter->ever = ever; pWriter->ever = ever;
// commit it // commit it
code = vnodeCommit(pVnode); code = vnodeSyncCommit(pVnode);
if (code) { if (code) {
taosMemoryFree(pWriter); taosMemoryFree(pWriter);
goto _err; goto _err;
......
...@@ -311,11 +311,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp ...@@ -311,11 +311,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
if (vnodeShouldCommit(pVnode)) { if (vnodeShouldCommit(pVnode)) {
_do_commit: _do_commit:
vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), version); vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), version);
// commit current change vnodeAsyncCommit(pVnode);
if (vnodeCommit(pVnode) < 0) {
vError("vgId:%d, failed to commit vnode since %s.", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// start a new one // start a new one
if (vnodeBegin(pVnode) < 0) { if (vnodeBegin(pVnode) < 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册