未验证 提交 7dc3c6b4 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #20056 from taosdata/FIX/TD-22595-main

fix: synchronize access to pVnode->inUse
...@@ -61,7 +61,7 @@ int32_t tsHeartbeatInterval = 1000; ...@@ -61,7 +61,7 @@ int32_t tsHeartbeatInterval = 1000;
int32_t tsHeartbeatTimeout = 20 * 1000; int32_t tsHeartbeatTimeout = 20 * 1000;
// vnode // vnode
int64_t tsVndCommitMaxIntervalMs = 60 * 1000; int64_t tsVndCommitMaxIntervalMs = 600 * 1000;
// monitor // monitor
bool tsEnableMonitor = true; bool tsEnableMonitor = true;
......
...@@ -94,6 +94,7 @@ int vnodeOpenBufPool(SVnode *pVnode) { ...@@ -94,6 +94,7 @@ int vnodeOpenBufPool(SVnode *pVnode) {
int vnodeCloseBufPool(SVnode *pVnode) { int vnodeCloseBufPool(SVnode *pVnode) {
SVBufPool *pPool; SVBufPool *pPool;
taosThreadMutexLock(&pVnode->mutex);
for (pPool = pVnode->pPool; pPool; pPool = pVnode->pPool) { for (pPool = pVnode->pPool; pPool; pPool = pVnode->pPool) {
pVnode->pPool = pPool->next; pVnode->pPool = pPool->next;
vnodeBufPoolDestroy(pPool); vnodeBufPoolDestroy(pPool);
...@@ -103,8 +104,9 @@ int vnodeCloseBufPool(SVnode *pVnode) { ...@@ -103,8 +104,9 @@ int vnodeCloseBufPool(SVnode *pVnode) {
vnodeBufPoolDestroy(pVnode->inUse); vnodeBufPoolDestroy(pVnode->inUse);
pVnode->inUse = NULL; pVnode->inUse = NULL;
} }
vDebug("vgId:%d, vnode buffer pool is closed", TD_VID(pVnode)); taosThreadMutexUnlock(&pVnode->mutex);
vDebug("vgId:%d, vnode buffer pool is closed", TD_VID(pVnode));
return 0; return 0;
} }
...@@ -244,6 +246,9 @@ void vnodeBufPoolUnRef(SVBufPool *pPool) { ...@@ -244,6 +246,9 @@ void vnodeBufPoolUnRef(SVBufPool *pPool) {
pVnode->pPool = pPool; pVnode->pPool = pPool;
taosThreadCondSignal(&pVnode->poolNotEmpty); taosThreadCondSignal(&pVnode->poolNotEmpty);
if (pVnode->inUse == pPool) {
pVnode->inUse = NULL;
}
taosThreadMutexUnlock(&pVnode->mutex); taosThreadMutexUnlock(&pVnode->mutex);
} }
} }
...@@ -87,22 +87,21 @@ void vnodeUpdCommitSched(SVnode *pVnode) { ...@@ -87,22 +87,21 @@ void vnodeUpdCommitSched(SVnode *pVnode) {
} }
int vnodeShouldCommit(SVnode *pVnode) { int vnodeShouldCommit(SVnode *pVnode) {
if (!pVnode->inUse || !osDataSpaceAvailable()) {
return false;
}
SVCommitSched *pSched = &pVnode->commitSched; SVCommitSched *pSched = &pVnode->commitSched;
int64_t nowMs = taosGetMonoTimestampMs(); int64_t nowMs = taosGetMonoTimestampMs();
bool diskAvail = osDataSpaceAvailable();
bool needCommit = false;
return (((pVnode->inUse->size > pVnode->inUse->node.size) && (pSched->commitMs + SYNC_VND_COMMIT_MIN_MS < nowMs)) || taosThreadMutexLock(&pVnode->mutex);
(pVnode->inUse->size > 0 && pSched->commitMs + pSched->maxWaitMs < nowMs)); if (!pVnode->inUse || !diskAvail) {
} goto _out;
int vnodeShouldCommitOld(SVnode *pVnode) {
if (pVnode->inUse) {
return osDataSpaceAvailable() && (pVnode->inUse->size > pVnode->inUse->node.size);
} }
return false; needCommit =
(((pVnode->inUse->size > pVnode->inUse->node.size) && (pSched->commitMs + SYNC_VND_COMMIT_MIN_MS < nowMs)) ||
(pVnode->inUse->size > 0 && pSched->commitMs + pSched->maxWaitMs < nowMs));
_out:
taosThreadMutexUnlock(&pVnode->mutex);
return needCommit;
} }
int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) { int vnodeSaveInfo(const char *dir, const SVnodeInfo *pInfo) {
...@@ -259,7 +258,6 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) { ...@@ -259,7 +258,6 @@ static int32_t vnodePrepareCommit(SVnode *pVnode, SCommitInfo *pInfo) {
metaPrepareAsyncCommit(pVnode->pMeta); metaPrepareAsyncCommit(pVnode->pMeta);
vnodeBufPoolUnRef(pVnode->inUse); vnodeBufPoolUnRef(pVnode->inUse);
pVnode->inUse = NULL;
_exit: _exit:
if (code) { if (code) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册