diff --git a/source/libs/sync/inc/syncPipeline.h b/source/libs/sync/inc/syncPipeline.h index a59b14259a5a24db2235b2c91365a08b20c5caac..4208d40a69584e4fcd7df0e9e4a5f853ed6f62b6 100644 --- a/source/libs/sync/inc/syncPipeline.h +++ b/source/libs/sync/inc/syncPipeline.h @@ -55,6 +55,7 @@ typedef struct SSyncLogBuffer { int64_t endIndex; int64_t size; TdThreadMutex mutex; + TdThreadMutexAttr attr; } SSyncLogBuffer; // SSyncLogRepMgr diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 3377efe12c9eb20c60269a983fc784b6d2f63de4..07b1101256a047d7de1442086d605f7b8da2a26c 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -84,6 +84,7 @@ void syncOneReplicaAdvance(SSyncNode* pSyncNode) { } void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { + ASSERT(false && "deprecated"); if (pSyncNode == NULL) { sError("pSyncNode is NULL"); return; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 2b72c8d287b257b8bb78761252d122578d94c35b..d809c28090f8f8f849882862b75edc61c5bae265 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1602,7 +1602,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde // Raft 3.6.2 Committing entries from previous terms syncNodeAppendNoop(pSyncNode); - syncMaybeAdvanceCommitIndex(pSyncNode); + // syncMaybeAdvanceCommitIndex(pSyncNode); } else { syncNodeBecomeFollower(pSyncNode, tmpbuf); diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 1fc6798471f18f542bc60631f2d7f1f3259a587c..e655ed13c8b6e3631b94e2abd8332789ae978e11 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -916,11 +916,24 @@ SSyncLogBuffer* syncLogBufferCreate() { ASSERT(pBuf->size == TSDB_SYNC_LOG_BUFFER_SIZE); - if (taosThreadMutexInit(&pBuf->mutex, NULL) < 0) { + if (taosThreadMutexAttrInit(&pBuf->attr) < 0) { + sError("failed to init log buffer mutexattr due to %s", strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + if (taosThreadMutexAttrSetType(&pBuf->attr, PTHREAD_MUTEX_RECURSIVE) < 0) { + sError("failed to set log buffer mutexattr type due to %s", strerror(errno)); + terrno = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + if (taosThreadMutexInit(&pBuf->mutex, &pBuf->attr) < 0) { sError("failed to init log buffer mutex due to %s", strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); goto _err; } + return pBuf; _err: @@ -947,6 +960,7 @@ void syncLogBufferDestroy(SSyncLogBuffer* pBuf) { } syncLogBufferClear(pBuf); (void)taosThreadMutexDestroy(&pBuf->mutex); + (void)taosThreadMutexAttrDestroy(&pBuf->attr); (void)taosMemoryFree(pBuf); return; } diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index ba9fe5b56abaa8be2562c3e408ce2ce9df61ebcb..0f56921ec77570e85c081837e596071aeecbe483 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -49,7 +49,7 @@ int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg); int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapshot) { - ASSERT(false && "deplicated"); + ASSERT(false && "deprecated"); // next index SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId);