提交 69536887 编写于 作者: H Hongze Cheng

add commit snapshot

上级 a1a498a1
...@@ -62,6 +62,7 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); ...@@ -62,6 +62,7 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName); int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName);
int64_t vnodeGetSyncHandle(SVnode *pVnode); int64_t vnodeGetSyncHandle(SVnode *pVnode);
void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot);
// meta // meta
typedef struct SMeta SMeta; // todo: remove typedef struct SMeta SMeta; // todo: remove
......
...@@ -74,7 +74,7 @@ typedef struct { ...@@ -74,7 +74,7 @@ typedef struct {
// SVState // SVState
struct SVState { struct SVState {
int64_t processed; // int64_t processed;
int64_t committed; int64_t committed;
int64_t applied; int64_t applied;
}; };
......
...@@ -158,12 +158,23 @@ int vnodeSyncCommit(SVnode *pVnode) { ...@@ -158,12 +158,23 @@ int vnodeSyncCommit(SVnode *pVnode) {
} }
static int vnodeCommit(void *arg) { static int vnodeCommit(void *arg) {
SVnode *pVnode = (SVnode *)arg; SVnode *pVnode = (SVnode *)arg;
char dir[TSDB_FILENAME_LEN];
SVnodeInfo info = {0};
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
info.config = pVnode->config;
info.state.committed = pVnode->state.applied;
info.state.applied = pVnode->state.applied;
vnodeSaveInfo(dir, &info);
// metaCommit(pVnode->pMeta); // metaCommit(pVnode->pMeta);
tqCommit(pVnode->pTq); tqCommit(pVnode->pTq);
tsdbCommit(pVnode->pTsdb); tsdbCommit(pVnode->pTsdb);
vnodeCommitInfo(dir, &info);
vnodeBufPoolRecycle(pVnode); vnodeBufPoolRecycle(pVnode);
tsem_post(&(pVnode->canCommit)); tsem_post(&(pVnode->canCommit));
return 0; return 0;
...@@ -185,6 +196,7 @@ static int vnodeEncodeState(const void *pObj, SJson *pJson) { ...@@ -185,6 +196,7 @@ static int vnodeEncodeState(const void *pObj, SJson *pJson) {
const SVState *pState = (SVState *)pObj; const SVState *pState = (SVState *)pObj;
if (tjsonAddIntegerToObject(pJson, "commit version", pState->committed) < 0) return -1; if (tjsonAddIntegerToObject(pJson, "commit version", pState->committed) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "applied version", pState->applied) < 0) return -1;
return 0; return 0;
} }
...@@ -193,6 +205,7 @@ static int vnodeDecodeState(const SJson *pJson, void *pObj) { ...@@ -193,6 +205,7 @@ static int vnodeDecodeState(const SJson *pJson, void *pObj) {
SVState *pState = (SVState *)pObj; SVState *pState = (SVState *)pObj;
if (tjsonGetNumberValue(pJson, "commit version", pState->committed) < 0) return -1; if (tjsonGetNumberValue(pJson, "commit version", pState->committed) < 0) return -1;
if (tjsonGetNumberValue(pJson, "applied version", pState->applied) < 0) return -1;
return 0; return 0;
} }
......
...@@ -75,8 +75,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { ...@@ -75,8 +75,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
pVnode->path = (char *)&pVnode[1]; pVnode->path = (char *)&pVnode[1];
strcpy(pVnode->path, path); strcpy(pVnode->path, path);
pVnode->config = info.config; pVnode->config = info.config;
pVnode->state.committed = info.state.committed; pVnode->state = info.state;
pVnode->state.processed = pVnode->state.applied = pVnode->state.committed;
pVnode->pTfs = pTfs; pVnode->pTfs = pTfs;
pVnode->msgCb = msgCb; pVnode->msgCb = msgCb;
...@@ -170,4 +169,6 @@ void vnodeClose(SVnode *pVnode) { ...@@ -170,4 +169,6 @@ void vnodeClose(SVnode *pVnode) {
} }
} }
int64_t vnodeGetSyncHandle(SVnode *pVnode) { return pVnode->sync; } int64_t vnodeGetSyncHandle(SVnode *pVnode) { return pVnode->sync; }
\ No newline at end of file
void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot) { pSnapshot->lastApplyIndex = pVnode->state.committed; }
\ No newline at end of file
...@@ -23,6 +23,7 @@ static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq); ...@@ -23,6 +23,7 @@ static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq);
static int vnodeProcessSubmitReq(SVnode *pVnode, SSubmitReq *pSubmitReq, SRpcMsg *pRsp); static int vnodeProcessSubmitReq(SVnode *pVnode, SSubmitReq *pSubmitReq, SRpcMsg *pRsp);
int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version) { int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version) {
#if 0
SNodeMsg *pMsg; SNodeMsg *pMsg;
SRpcMsg *pRpc; SRpcMsg *pRpc;
...@@ -40,6 +41,7 @@ int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version) { ...@@ -40,6 +41,7 @@ int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version) {
walFsync(pVnode->pWal, false); walFsync(pVnode->pWal, false);
#endif
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册