提交 a21e0fe7 编写于 作者: B Benguang Zhao

enh: propose vnode commit synchronously

上级 55c6f115
...@@ -77,7 +77,7 @@ static inline bool tmsgIsValid(tmsg_t type) { ...@@ -77,7 +77,7 @@ static inline bool tmsgIsValid(tmsg_t type) {
} }
static inline bool vnodeIsMsgBlock(tmsg_t type) { static inline bool vnodeIsMsgBlock(tmsg_t type) {
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) || return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) ||
(type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM); (type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM) || (type == TDMT_VND_COMMIT);
} }
static inline bool syncUtilUserCommit(tmsg_t msgType) { static inline bool syncUtilUserCommit(tmsg_t msgType) {
......
...@@ -97,7 +97,6 @@ int32_t vnodeGetBatchMeta(SVnode* pVnode, SRpcMsg* pMsg); ...@@ -97,7 +97,6 @@ int32_t vnodeGetBatchMeta(SVnode* pVnode, SRpcMsg* pMsg);
// vnodeCommit.c // vnodeCommit.c
int32_t vnodeBegin(SVnode* pVnode); int32_t vnodeBegin(SVnode* pVnode);
int32_t vnodeShouldCommit(SVnode* pVnode, bool atExit); int32_t vnodeShouldCommit(SVnode* pVnode, bool atExit);
void vnodeUpdCommitSched(SVnode* pVnode);
void vnodeRollback(SVnode* pVnode); void vnodeRollback(SVnode* pVnode);
int32_t vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg); int32_t vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg);
int32_t vnodeCommitInfo(const char* dir); int32_t vnodeCommitInfo(const char* dir);
......
...@@ -378,7 +378,6 @@ struct SVnode { ...@@ -378,7 +378,6 @@ struct SVnode {
STQ* pTq; STQ* pTq;
SSink* pSink; SSink* pSink;
tsem_t canCommit; tsem_t canCommit;
SVCommitSched commitSched;
int64_t sync; int64_t sync;
TdThreadMutex lock; TdThreadMutex lock;
bool blocked; bool blocked;
...@@ -387,9 +386,6 @@ struct SVnode { ...@@ -387,9 +386,6 @@ struct SVnode {
int32_t blockSec; int32_t blockSec;
int64_t blockSeq; int64_t blockSeq;
SQHandle* pQuery; SQHandle* pQuery;
#if 0
SRpcHandleInfo blockInfo;
#endif
}; };
#define TD_VID(PVNODE) ((PVNODE)->config.vgId) #define TD_VID(PVNODE) ((PVNODE)->config.vgId)
......
...@@ -143,23 +143,13 @@ _exit: ...@@ -143,23 +143,13 @@ _exit:
return code; return code;
} }
void vnodeUpdCommitSched(SVnode *pVnode) {
int64_t randNum = taosRand();
pVnode->commitSched.commitMs = taosGetMonoTimestampMs();
pVnode->commitSched.maxWaitMs = tsVndCommitMaxIntervalMs + (randNum % tsVndCommitMaxIntervalMs);
}
int vnodeShouldCommit(SVnode *pVnode, bool atExit) { int vnodeShouldCommit(SVnode *pVnode, bool atExit) {
SVCommitSched *pSched = &pVnode->commitSched;
int64_t nowMs = taosGetMonoTimestampMs();
bool diskAvail = osDataSpaceAvailable(); bool diskAvail = osDataSpaceAvailable();
bool needCommit = false; bool needCommit = false;
taosThreadMutexLock(&pVnode->mutex); taosThreadMutexLock(&pVnode->mutex);
if (pVnode->inUse && diskAvail) { if (pVnode->inUse && diskAvail) {
needCommit = needCommit = (pVnode->inUse->size > pVnode->inUse->node.size) || (pVnode->inUse->size > 0 && atExit);
((pVnode->inUse->size > pVnode->inUse->node.size) && (pSched->commitMs + SYNC_VND_COMMIT_MIN_MS < nowMs)) ||
((pVnode->inUse->size > 0) && atExit);
} }
taosThreadMutexUnlock(&pVnode->mutex); taosThreadMutexUnlock(&pVnode->mutex);
return needCommit; return needCommit;
...@@ -431,8 +421,6 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) { ...@@ -431,8 +421,6 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) {
vInfo("vgId:%d, start to commit, commitId:%" PRId64 " version:%" PRId64 " term: %" PRId64, TD_VID(pVnode), vInfo("vgId:%d, start to commit, commitId:%" PRId64 " version:%" PRId64 " term: %" PRId64, TD_VID(pVnode),
pInfo->info.state.commitID, pInfo->info.state.committed, pInfo->info.state.commitTerm); pInfo->info.state.commitID, pInfo->info.state.committed, pInfo->info.state.commitTerm);
vnodeUpdCommitSched(pVnode);
// persist wal before starting // persist wal before starting
if (walPersist(pVnode->pWal) < 0) { if (walPersist(pVnode->pWal) < 0) {
vError("vgId:%d, failed to persist wal since %s", TD_VID(pVnode), terrstr()); vError("vgId:%d, failed to persist wal since %s", TD_VID(pVnode), terrstr());
......
...@@ -286,8 +286,6 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { ...@@ -286,8 +286,6 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
taosThreadMutexInit(&pVnode->mutex, NULL); taosThreadMutexInit(&pVnode->mutex, NULL);
taosThreadCondInit(&pVnode->poolNotEmpty, NULL); taosThreadCondInit(&pVnode->poolNotEmpty, NULL);
vnodeUpdCommitSched(pVnode);
int8_t rollback = vnodeShouldRollback(pVnode); int8_t rollback = vnodeShouldRollback(pVnode);
// open buffer pool // open buffer pool
......
...@@ -112,9 +112,6 @@ static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak ...@@ -112,9 +112,6 @@ static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak
pVnode->blocked = true; pVnode->blocked = true;
pVnode->blockSec = taosGetTimestampSec(); pVnode->blockSec = taosGetTimestampSec();
pVnode->blockSeq = seq; pVnode->blockSeq = seq;
#if 0
pVnode->blockInfo = pMsg->info;
#endif
} }
taosThreadMutexUnlock(&pVnode->lock); taosThreadMutexUnlock(&pVnode->lock);
...@@ -157,8 +154,6 @@ void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit) { ...@@ -157,8 +154,6 @@ void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit) {
} else { } else {
tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &rpcMsg); tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &rpcMsg);
} }
vnodeUpdCommitSched(pVnode);
} }
#if BATCH_ENABLE #if BATCH_ENABLE
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册