未验证 提交 96d2c5b9 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #20927 from taosdata/FIX/TD-23552-main

enh: try to propose vnode commit at vnode closing
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "vmInt.h" #include "vmInt.h"
#include "vnd.h"
SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) { SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
SVnodeObj *pVnode = NULL; SVnodeObj *pVnode = NULL;
...@@ -78,6 +79,11 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { ...@@ -78,6 +79,11 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal) { void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal) {
char path[TSDB_FILENAME_LEN] = {0}; char path[TSDB_FILENAME_LEN] = {0};
bool atExit = true;
if (vnodeIsLeader(pVnode->pImpl)) {
vnodeProposeCommitOnNeed(pVnode->pImpl, atExit);
}
taosThreadRwlockWrlock(&pMgmt->lock); taosThreadRwlockWrlock(&pMgmt->lock);
taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t)); taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t));
......
...@@ -92,7 +92,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); ...@@ -92,7 +92,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
void vnodeProposeCommitOnNeed(SVnode *pVnode); void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit);
// meta // meta
typedef struct SMeta SMeta; // todo: remove typedef struct SMeta SMeta; // todo: remove
......
...@@ -96,7 +96,7 @@ int32_t vnodeGetBatchMeta(SVnode* pVnode, SRpcMsg* pMsg); ...@@ -96,7 +96,7 @@ int32_t vnodeGetBatchMeta(SVnode* pVnode, SRpcMsg* pMsg);
// vnodeCommit.c // vnodeCommit.c
int32_t vnodeBegin(SVnode* pVnode); int32_t vnodeBegin(SVnode* pVnode);
int32_t vnodeShouldCommit(SVnode* pVnode); int32_t vnodeShouldCommit(SVnode* pVnode, bool atExit);
void vnodeUpdCommitSched(SVnode* pVnode); void vnodeUpdCommitSched(SVnode* pVnode);
void vnodeRollback(SVnode* pVnode); void vnodeRollback(SVnode* pVnode);
int32_t vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg); int32_t vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg);
...@@ -115,7 +115,6 @@ void vnodeSyncClose(SVnode* pVnode); ...@@ -115,7 +115,6 @@ void vnodeSyncClose(SVnode* pVnode);
void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg, int32_t code); void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg, int32_t code);
bool vnodeIsLeader(SVnode* pVnode); bool vnodeIsLeader(SVnode* pVnode);
bool vnodeIsRoleLeader(SVnode* pVnode); bool vnodeIsRoleLeader(SVnode* pVnode);
int vnodeShouldCommit(SVnode* pVnode);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -149,7 +149,7 @@ void vnodeUpdCommitSched(SVnode *pVnode) { ...@@ -149,7 +149,7 @@ void vnodeUpdCommitSched(SVnode *pVnode) {
pVnode->commitSched.maxWaitMs = tsVndCommitMaxIntervalMs + (randNum % tsVndCommitMaxIntervalMs); pVnode->commitSched.maxWaitMs = tsVndCommitMaxIntervalMs + (randNum % tsVndCommitMaxIntervalMs);
} }
int vnodeShouldCommit(SVnode *pVnode) { int vnodeShouldCommit(SVnode *pVnode, bool atExit) {
SVCommitSched *pSched = &pVnode->commitSched; SVCommitSched *pSched = &pVnode->commitSched;
int64_t nowMs = taosGetMonoTimestampMs(); int64_t nowMs = taosGetMonoTimestampMs();
bool diskAvail = osDataSpaceAvailable(); bool diskAvail = osDataSpaceAvailable();
...@@ -158,7 +158,8 @@ int vnodeShouldCommit(SVnode *pVnode) { ...@@ -158,7 +158,8 @@ int vnodeShouldCommit(SVnode *pVnode) {
taosThreadMutexLock(&pVnode->mutex); taosThreadMutexLock(&pVnode->mutex);
if (pVnode->inUse && diskAvail) { if (pVnode->inUse && diskAvail) {
needCommit = needCommit =
((pVnode->inUse->size > pVnode->inUse->node.size) && (pSched->commitMs + SYNC_VND_COMMIT_MIN_MS < nowMs)); ((pVnode->inUse->size > pVnode->inUse->node.size) && (pSched->commitMs + SYNC_VND_COMMIT_MIN_MS < nowMs)) ||
((pVnode->inUse->size > 0) && atExit);
} }
taosThreadMutexUnlock(&pVnode->mutex); taosThreadMutexUnlock(&pVnode->mutex);
return needCommit; return needCommit;
......
...@@ -129,8 +129,8 @@ static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak ...@@ -129,8 +129,8 @@ static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak
return code; return code;
} }
void vnodeProposeCommitOnNeed(SVnode *pVnode) { void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit) {
if (!vnodeShouldCommit(pVnode)) { if (!vnodeShouldCommit(pVnode, atExit)) {
return; return;
} }
...@@ -145,18 +145,20 @@ void vnodeProposeCommitOnNeed(SVnode *pVnode) { ...@@ -145,18 +145,20 @@ void vnodeProposeCommitOnNeed(SVnode *pVnode) {
rpcMsg.pCont = pHead; rpcMsg.pCont = pHead;
rpcMsg.info.noResp = 1; rpcMsg.info.noResp = 1;
vInfo("vgId:%d, propose vnode commit", pVnode->config.vgId);
bool isWeak = false; bool isWeak = false;
if (vnodeProposeMsg(pVnode, &rpcMsg, isWeak) < 0) {
vTrace("vgId:%d, failed to propose vnode commit since %s", pVnode->config.vgId, terrstr());
goto _out;
}
vInfo("vgId:%d, proposed vnode commit", pVnode->config.vgId); if (!atExit) {
if (vnodeProposeMsg(pVnode, &rpcMsg, isWeak) < 0) {
vTrace("vgId:%d, failed to propose vnode commit since %s", pVnode->config.vgId, terrstr());
}
rpcFreeCont(rpcMsg.pCont);
rpcMsg.pCont = NULL;
} else {
tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
_out:
vnodeUpdCommitSched(pVnode); vnodeUpdCommitSched(pVnode);
rpcFreeCont(rpcMsg.pCont);
rpcMsg.pCont = NULL;
} }
#if BATCH_ENABLE #if BATCH_ENABLE
...@@ -236,7 +238,8 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) ...@@ -236,7 +238,8 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs)
continue; continue;
} }
vnodeProposeCommitOnNeed(pVnode); bool atExit = false;
vnodeProposeCommitOnNeed(pVnode, atExit);
code = vnodePreProcessWriteMsg(pVnode, pMsg); code = vnodePreProcessWriteMsg(pVnode, pMsg);
if (code != 0) { if (code != 0) {
...@@ -288,7 +291,8 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) ...@@ -288,7 +291,8 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs)
continue; continue;
} }
vnodeProposeCommitOnNeed(pVnode); bool atExit = false;
vnodeProposeCommitOnNeed(pVnode, atExit);
code = vnodePreProcessWriteMsg(pVnode, pMsg); code = vnodePreProcessWriteMsg(pVnode, pMsg);
if (code != 0) { if (code != 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册