提交 7516ecae 编写于 作者: H Hongze Cheng

refact more vnode

上级 95b1e954
...@@ -72,6 +72,7 @@ int vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg); ...@@ -72,6 +72,7 @@ int vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg);
// vnodeCommit ==================== // vnodeCommit ====================
int vnodeBegin(SVnode* pVnode); int vnodeBegin(SVnode* pVnode);
int vnodeShouldCommit(SVnode* pVnode); int vnodeShouldCommit(SVnode* pVnode);
int vnodeCommit(SVnode* pVnode);
int vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg); int vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg);
int vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo); int vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo);
int vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo); int vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo);
......
...@@ -27,3 +27,5 @@ int metaBegin(SMeta *pMeta) { ...@@ -27,3 +27,5 @@ int metaBegin(SMeta *pMeta) {
return 0; return 0;
} }
int metaCommit(SMeta *pMeta) { return tdbCommit(pMeta->pEnv, &pMeta->txn); }
...@@ -22,7 +22,7 @@ static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData); ...@@ -22,7 +22,7 @@ static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData);
static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo); static int vnodeDecodeInfo(uint8_t *pData, SVnodeInfo *pInfo);
static int vnodeStartCommit(SVnode *pVnode); static int vnodeStartCommit(SVnode *pVnode);
static int vnodeEndCommit(SVnode *pVnode); static int vnodeEndCommit(SVnode *pVnode);
static int vnodeCommit(void *arg); static int vnodeCommitImpl(void *arg);
static void vnodeWaitCommit(SVnode *pVnode); static void vnodeWaitCommit(SVnode *pVnode);
int vnodeBegin(SVnode *pVnode) { int vnodeBegin(SVnode *pVnode) {
...@@ -55,13 +55,7 @@ int vnodeBegin(SVnode *pVnode) { ...@@ -55,13 +55,7 @@ int vnodeBegin(SVnode *pVnode) {
return 0; return 0;
} }
int vnodeShouldCommit(SVnode *pVnode) { int vnodeShouldCommit(SVnode *pVnode) { return pVnode->inUse->size > pVnode->config.szBuf / 3; }
if (pVnode->inUse->size > pVnode->config.szBuf / 3) {
return 1;
}
return 0;
}
int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) { int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) {
char fname[TSDB_FILENAME_LEN]; char fname[TSDB_FILENAME_LEN];
...@@ -183,7 +177,7 @@ int vnodeAsyncCommit(SVnode *pVnode) { ...@@ -183,7 +177,7 @@ int vnodeAsyncCommit(SVnode *pVnode) {
// vnodeBufPoolSwitch(pVnode); // vnodeBufPoolSwitch(pVnode);
tsdbPrepareCommit(pVnode->pTsdb); tsdbPrepareCommit(pVnode->pTsdb);
vnodeScheduleTask(vnodeCommit, pVnode); vnodeScheduleTask(vnodeCommitImpl, pVnode);
return 0; return 0;
} }
...@@ -195,7 +189,53 @@ int vnodeSyncCommit(SVnode *pVnode) { ...@@ -195,7 +189,53 @@ int vnodeSyncCommit(SVnode *pVnode) {
return 0; return 0;
} }
static int vnodeCommit(void *arg) { int vnodeCommit(SVnode *pVnode) {
SVnodeInfo info;
char dir[TSDB_FILENAME_LEN];
pVnode->onCommit = pVnode->inUse;
pVnode->inUse = NULL;
// save info
info.config = pVnode->config;
info.state = pVnode->state;
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path);
if (vnodeSaveInfo(dir, &info) < 0) {
ASSERT(0);
return -1;
}
// commit each sub-system
if (metaCommit(pVnode->pMeta) < 0) {
ASSERT(0);
return -1;
}
if (tsdbCommit(pVnode->pTsdb) < 0) {
ASSERT(0);
return -1;
}
if (tqCommit(pVnode->pTq) < 0) {
ASSERT(0);
return -1;
}
// walCommit (TODO)
// commit info
if (vnodeCommitInfo(dir, &info) < 0) {
ASSERT(0);
return -1;
}
// apply the commit (TODO)
vnodeBufPoolReset(pVnode->onCommit);
pVnode->onCommit->next = pVnode->pPool;
pVnode->pPool = pVnode->onCommit;
pVnode->onCommit = NULL;
return 0;
}
static int vnodeCommitImpl(void *arg) {
SVnode *pVnode = (SVnode *)arg; SVnode *pVnode = (SVnode *)arg;
// metaCommit(pVnode->pMeta); // metaCommit(pVnode->pMeta);
......
...@@ -136,12 +136,13 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg ...@@ -136,12 +136,13 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
vDebug("vgId: %d process %s request success, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version); vDebug("vgId: %d process %s request success, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version);
// Check if it needs to commit // commit if need
if (vnodeShouldCommit(pVnode)) { if (vnodeShouldCommit(pVnode)) {
// tsem_wait(&(pVnode->canCommit)); // commit current change
if (vnodeAsyncCommit(pVnode) < 0) { vnodeCommit(pVnode);
// TODO: handle error
} // start a new one
vnodeBegin(pVnode);
} }
return 0; return 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册