From decb17fcb16c38f548bd2a1bed1258ba1400038d Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Tue, 29 Nov 2022 20:21:33 +0800 Subject: [PATCH] fix: use recursive mutex for relocking ring log buffer in syncNodeDoConfigChange --- source/libs/sync/inc/syncPipeline.h | 1 + source/libs/sync/src/syncCommit.c | 1 + source/libs/sync/src/syncMain.c | 2 +- source/libs/sync/src/syncPipeline.c | 16 +++++++++++++++- source/libs/sync/src/syncReplication.c | 2 +- 5 files changed, 19 insertions(+), 3 deletions(-) diff --git a/source/libs/sync/inc/syncPipeline.h b/source/libs/sync/inc/syncPipeline.h index a59b14259a..4208d40a69 100644 --- a/source/libs/sync/inc/syncPipeline.h +++ b/source/libs/sync/inc/syncPipeline.h @@ -55,6 +55,7 @@ typedef struct SSyncLogBuffer { int64_t endIndex; int64_t size; TdThreadMutex mutex; + TdThreadMutexAttr attr; } SSyncLogBuffer; // SSyncLogRepMgr diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 3377efe12c..07b1101256 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -84,6 +84,7 @@ void syncOneReplicaAdvance(SSyncNode* pSyncNode) { } void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { + ASSERT(false && "deprecated"); if (pSyncNode == NULL) { sError("pSyncNode is NULL"); return; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 2b72c8d287..d809c28090 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1602,7 +1602,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde // Raft 3.6.2 Committing entries from previous terms syncNodeAppendNoop(pSyncNode); - syncMaybeAdvanceCommitIndex(pSyncNode); + // syncMaybeAdvanceCommitIndex(pSyncNode); } else { syncNodeBecomeFollower(pSyncNode, tmpbuf); diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 1fc6798471..e655ed13c8 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -916,11 +916,24 @@ SSyncLogBuffer* syncLogBufferCreate() { ASSERT(pBuf->size == TSDB_SYNC_LOG_BUFFER_SIZE); - if (taosThreadMutexInit(&pBuf->mutex, NULL) < 0) { + if (taosThreadMutexAttrInit(&pBuf->attr) < 0) { + sError("failed to init log buffer mutexattr due to %s", strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + if (taosThreadMutexAttrSetType(&pBuf->attr, PTHREAD_MUTEX_RECURSIVE) < 0) { + sError("failed to set log buffer mutexattr type due to %s", strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + if (taosThreadMutexInit(&pBuf->mutex, &pBuf->attr) < 0) { sError("failed to init log buffer mutex due to %s", strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); goto _err; } + return pBuf; _err: @@ -947,6 +960,7 @@ void syncLogBufferDestroy(SSyncLogBuffer* pBuf) { } syncLogBufferClear(pBuf); (void)taosThreadMutexDestroy(&pBuf->mutex); + (void)taosThreadMutexAttrDestroy(&pBuf->attr); (void)taosMemoryFree(pBuf); return; } diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index ba9fe5b56a..0f56921ec7 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -49,7 +49,7 @@ int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg); int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapshot) { - ASSERT(false && "deplicated"); + ASSERT(false && "deprecated"); // next index SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId); -- GitLab