diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index d7515a14956d7029d909241b2985023d33e58622..499aedfeedc1c0f8be30e0ca741ea97a39f7ef6a 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -324,6 +324,8 @@ void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int int tsdbInitCommitQueue(int nthreads); void tsdbDestroyCommitQueue(); int tsdbSyncCommit(TSDB_REPO_T *repo); +int tsdbIncCommitRef(int vgId); +void tsdbDecCommitRef(int vgId); #ifdef __cplusplus } diff --git a/src/tsdb/src/tsdbCommitQueue.c b/src/tsdb/src/tsdbCommitQueue.c index 3c158a2201c1641d57b8c0902b2b3a9c1c828c9e..ad4e433274502e66ebabd8d391542d9fdbb2d6f1 100644 --- a/src/tsdb/src/tsdbCommitQueue.c +++ b/src/tsdb/src/tsdbCommitQueue.c @@ -15,6 +15,7 @@ #include "os.h" #include "tlist.h" +#include "tref.h" #include "tsdbMain.h" typedef struct { @@ -22,6 +23,7 @@ typedef struct { pthread_mutex_t lock; pthread_cond_t queueNotEmpty; int nthreads; + int refCount; SList * queue; pthread_t * threads; } SCommitQueue; @@ -123,7 +125,7 @@ static void *tsdbLoopCommit(void *arg) { while (true) { pNode = tdListPopHead(pQueue->queue); if (pNode == NULL) { - if (pQueue->stop) { + if (pQueue->stop && pQueue->refCount == 0) { pthread_mutex_unlock(&(pQueue->lock)); goto _exit; } else { @@ -145,3 +147,13 @@ static void *tsdbLoopCommit(void *arg) { _exit: return NULL; } + +int tsdbIncCommitRef(int vgId) { + int refCount = atomic_add_fetch_32(&tsCommitQueue.refCount, 1); + tsdbDebug("vgId:%d, inc commit queue ref to %d", refCount); +} + +void tsdbDecCommitRef(int vgId) { + int refCount = atomic_sub_fetch_32(&tsCommitQueue.refCount, 1); + tsdbDebug("vgId:%d, dec commit queue ref to %d", refCount); +} \ No newline at end of file diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 199619e8514faac9e663914ddf59a3ea0462afde..7813c5217b90c9de99fe14c8aef012c084d6af3c 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -355,6 +355,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { pVnode->status = TAOS_VN_STATUS_READY; vDebug("vgId:%d, vnode is opened in %s, pVnode:%p", pVnode->vgId, rootDir, pVnode); + tsdbIncCommitRef(pVnode->vgId); taosHashPut(tsVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t), (char *)(&pVnode), sizeof(SVnodeObj *)); return TSDB_CODE_SUCCESS; @@ -446,6 +447,7 @@ void vnodeRelease(void *pVnodeRaw) { tsem_destroy(&pVnode->sem); free(pVnode); + tsdbDecCommitRef(vgId); int32_t count = taosHashGetSize(tsVnodesHash); vDebug("vgId:%d, vnode is destroyed, vnodes:%d", vgId, count);