From 55c7d1c1e9430c73d8ac303730feddf0720384b9 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 31 May 2022 22:17:18 +0800 Subject: [PATCH] fix: close cause flush result error --- source/libs/index/inc/indexCache.h | 1 + source/libs/index/src/index.c | 9 +++++++-- source/libs/index/src/indexCache.c | 7 +++++-- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/source/libs/index/inc/indexCache.h b/source/libs/index/inc/indexCache.h index 1046a04db3..6e68163d74 100644 --- a/source/libs/index/inc/indexCache.h +++ b/source/libs/index/inc/indexCache.h @@ -36,6 +36,7 @@ typedef struct MemTable { typedef struct IndexCache { T_REF_DECLARE() MemTable *mem, *imm; + int32_t merging; SIndex* index; char* colName; int64_t version; diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 4814cc14f7..3d905303d1 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -462,7 +462,10 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { int64_t st = taosGetTimestampUs(); - IndexCache* pCache = (IndexCache*)cache; + IndexCache* pCache = (IndexCache*)cache; + + while (sIdx->quit && atomic_load_32(&pCache->merging) == 1) { + } TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->suid, pCache->colName); if (pReader == NULL) { indexWarn("empty tfile reader found"); @@ -475,9 +478,9 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { tfileReaderUnRef(pReader); if (sIdx->quit) { indexPost(sIdx); - // indexCacheBroadcast(pCache); } indexReleaseRef(sIdx->refId); + atomic_store_32(&pCache->merging, 0); return 0; } @@ -539,6 +542,7 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { if (sIdx->quit) { indexPost(sIdx); } + atomic_store_32(&pCache->merging, 0); indexReleaseRef(sIdx->refId); return ret; @@ -605,6 +609,7 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) { taosThreadMutexLock(&tf->mtx); tfileCachePut(tf->cache, &key, reader); taosThreadMutexUnlock(&tf->mtx); + return ret; END: if (tw != NULL) { diff --git a/source/libs/index/src/indexCache.c b/source/libs/index/src/indexCache.c index 3b33006452..586a3ae573 100644 --- a/source/libs/index/src/indexCache.c +++ b/source/libs/index/src/indexCache.c @@ -494,16 +494,19 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) { // TODO: wake up by condition variable indexCacheWait(cache); } else { - bool notifyQuit = cache->occupiedMem >= MEM_SIGNAL_QUIT ? true : false; + bool quit = cache->occupiedMem >= MEM_SIGNAL_QUIT ? true : false; indexCacheRef(cache); cache->imm = cache->mem; cache->mem = indexInternalCacheCreate(cache->type); cache->mem->pCache = cache; cache->occupiedMem = 0; + if (quit == false) { + atomic_store_32(&cache->merging, 1); + } // sched to merge // unref cache in bgwork - indexCacheSchedToMerge(cache, notifyQuit); + indexCacheSchedToMerge(cache, quit); } } } -- GitLab