diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index d7515a14956d7029d909241b2985023d33e58622..0cdb24a4da2bcb63b94f1d668eb79e8494704f03 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -324,6 +324,8 @@ void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int int tsdbInitCommitQueue(int nthreads); void tsdbDestroyCommitQueue(); int tsdbSyncCommit(TSDB_REPO_T *repo); +void tsdbIncCommitRef(int vgId); +void tsdbDecCommitRef(int vgId); #ifdef __cplusplus } diff --git a/src/os/src/detail/osFile.c b/src/os/src/detail/osFile.c index 6eb4515f3098f89991640d2fdee2b0aca47c1703..23fc88b8e1f5d4c0e2fe6b2140278b1e32d35abb 100644 --- a/src/os/src/detail/osFile.c +++ b/src/os/src/detail/osFile.c @@ -132,7 +132,7 @@ int64_t taosSendFile(int32_t dfd, int32_t sfd, int64_t *offset, int64_t size) { // if (leftbytes > 1000000000) leftbytes = 1000000000; sentbytes = sendfile(dfd, sfd, offset, leftbytes); if (sentbytes == -1) { - if (errno == EINTR) { + if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) { continue; } else { return -1; diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index 5c92801cc389df6745377d81b4110ccf8fbc8441..f1ce1d43aa01f5e903f220b472be0090ceaecae5 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -1222,7 +1222,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle // always update version nodeVersion = pWalHead->version; - sDebug("vgId:%d, forward to peer, replica:%d role:%s qtype:%s hver:%" PRIu64, pNode->vgId, pNode->replica, + sTrace("vgId:%d, forward to peer, replica:%d role:%s qtype:%s hver:%" PRIu64, pNode->vgId, pNode->replica, syncRole[nodeRole], qtypeStr[qtype], pWalHead->version); if (pNode->replica == 1 || nodeRole != TAOS_SYNC_ROLE_MASTER) return 0; diff --git a/src/tsdb/src/tsdbCommitQueue.c b/src/tsdb/src/tsdbCommitQueue.c index 3c158a2201c1641d57b8c0902b2b3a9c1c828c9e..c86b8f32b7ff6bfb30e7b734c7b38504200e44e6 100644 --- a/src/tsdb/src/tsdbCommitQueue.c +++ b/src/tsdb/src/tsdbCommitQueue.c @@ -15,6 +15,7 @@ #include "os.h" #include "tlist.h" +#include "tref.h" #include "tsdbMain.h" typedef struct { @@ -22,6 +23,7 @@ typedef struct { pthread_mutex_t lock; pthread_cond_t queueNotEmpty; int nthreads; + int refCount; SList * queue; pthread_t * threads; } SCommitQueue; @@ -103,7 +105,7 @@ int tsdbScheduleCommit(STsdbRepo *pRepo) { pthread_mutex_lock(&(pQueue->lock)); - ASSERT(!pQueue->stop); + // ASSERT(pQueue->stop); tdListAppendNode(pQueue->queue, pNode); pthread_cond_signal(&(pQueue->queueNotEmpty)); @@ -123,7 +125,7 @@ static void *tsdbLoopCommit(void *arg) { while (true) { pNode = tdListPopHead(pQueue->queue); if (pNode == NULL) { - if (pQueue->stop) { + if (pQueue->stop && pQueue->refCount <= 0) { pthread_mutex_unlock(&(pQueue->lock)); goto _exit; } else { @@ -145,3 +147,14 @@ static void *tsdbLoopCommit(void *arg) { _exit: return NULL; } + +void tsdbIncCommitRef(int vgId) { + int refCount = atomic_add_fetch_32(&tsCommitQueue.refCount, 1); + tsdbDebug("vgId:%d, inc commit queue ref to %d", vgId, refCount); +} + +void tsdbDecCommitRef(int vgId) { + int refCount = atomic_sub_fetch_32(&tsCommitQueue.refCount, 1); + pthread_cond_broadcast(&(tsCommitQueue.queueNotEmpty)); + tsdbDebug("vgId:%d, dec commit queue ref to %d", vgId, refCount); +} \ No newline at end of file diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 199619e8514faac9e663914ddf59a3ea0462afde..7813c5217b90c9de99fe14c8aef012c084d6af3c 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -355,6 +355,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { pVnode->status = TAOS_VN_STATUS_READY; vDebug("vgId:%d, vnode is opened in %s, pVnode:%p", pVnode->vgId, rootDir, pVnode); + tsdbIncCommitRef(pVnode->vgId); taosHashPut(tsVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t), (char *)(&pVnode), sizeof(SVnodeObj *)); return TSDB_CODE_SUCCESS; @@ -446,6 +447,7 @@ void vnodeRelease(void *pVnodeRaw) { tsem_destroy(&pVnode->sem); free(pVnode); + tsdbDecCommitRef(vgId); int32_t count = taosHashGetSize(tsVnodesHash); vDebug("vgId:%d, vnode is destroyed, vnodes:%d", vgId, count);