diff --git a/source/libs/index/inc/index_cache.h b/source/libs/index/inc/index_cache.h index 58082294a96d3ba726b6556cf6bffd27af2170ac..544849ff2fab8a1018617db04606e4d4df148502 100644 --- a/source/libs/index/inc/index_cache.h +++ b/source/libs/index/inc/index_cache.h @@ -45,6 +45,7 @@ typedef struct IndexCache { uint64_t suid; pthread_mutex_t mtx; + pthread_cond_t finished; } IndexCache; #define CACHE_VERSION(cache) atomic_load_32(&cache->version) diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 9b7f1c83834374c240ac2fbf505cfdd15859a46d..0bf30d6efabb84b3d060ff6639387b6d8db45c52 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -399,6 +399,8 @@ int indexFlushCacheTFile(SIndex* sIdx, void* cache) { if (sIdx == NULL) { return -1; } indexInfo("suid %" PRIu64 " merge cache into tindex", sIdx->suid); + int64_t st = taosGetTimestampUs(); + IndexCache* pCache = (IndexCache*)cache; TFileReader* pReader = tfileGetReaderByCol(sIdx->tindex, pCache->suid, pCache->colName); if (pReader == NULL) { indexWarn("empty tfile reader found"); } @@ -458,6 +460,7 @@ int indexFlushCacheTFile(SIndex* sIdx, void* cache) { } int ret = indexGenTFile(sIdx, pCache, result); indexDestroyTempResult(result); + indexCacheDestroyImm(pCache); indexCacheIteratorDestroy(cacheIter); @@ -465,7 +468,14 @@ int indexFlushCacheTFile(SIndex* sIdx, void* cache) { tfileReaderUnRef(pReader); indexCacheUnRef(pCache); - return 0; + + int64_t cost = taosGetTimestampUs() - st; + if (ret != 0) { + indexError("failed to merge, time cost: %" PRId64 "ms", cost / 1000); + } else { + indexInfo("success to merge , time cost: %" PRId64 "ms", cost / 1000); + } + return ret; } void iterateValueDestroy(IterateValue* value, bool destroy) { if (destroy) { @@ -506,7 +516,10 @@ static int indexGenTFile(SIndex* sIdx, IndexCache* cache, SArray* batch) { pthread_mutex_unlock(&sIdx->mtx); return ret; END: - tfileWriterClose(tw); + if (tw != NULL) { + writerCtxDestroy(tw->ctx, true); + free(tw); + } return -1; } diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index 7a4beb21192857a8fccc5e9f29b25255041632a1..d43c9f9cce1a421b80995024458417ca6a30c769 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -53,7 +53,10 @@ IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, in cache->version = 0; cache->suid = suid; cache->occupiedMem = 0; + pthread_mutex_init(&cache->mtx, NULL); + pthread_cond_init(&cache->finished, NULL); + indexCacheRef(cache); return cache; } @@ -124,6 +127,7 @@ void indexCacheDestroyImm(IndexCache* cache) { pthread_mutex_lock(&cache->mtx); tbl = cache->imm; cache->imm = NULL; // or throw int bg thread + pthread_cond_broadcast(&cache->finished); pthread_mutex_unlock(&cache->mtx); indexMemUnRef(tbl); @@ -136,6 +140,9 @@ void indexCacheDestroy(void* cache) { indexMemUnRef(pCache->imm); free(pCache->colName); + pthread_mutex_destroy(&pCache->mtx); + pthread_cond_destroy(&pCache->finished); + free(pCache); } @@ -180,9 +187,10 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) { break; } else if (cache->imm != NULL) { // TODO: wake up by condition variable - pthread_mutex_unlock(&cache->mtx); - taosMsleep(50); - pthread_mutex_lock(&cache->mtx); + // pthread_mutex_unlock(&cache->mtx); + pthread_cond_wait(&cache->finished, &cache->mtx); + // taosMsleep(50); + // pthread_mutex_lock(&cache->mtx); } else { indexCacheRef(cache); cache->imm = cache->mem; diff --git a/source/libs/index/test/indexTests.cc b/source/libs/index/test/indexTests.cc index 9c92af26a23a6a994ae3d7b20d36857d52f0b656..5438f88b7688e8c5f6c0c6251400ad73b4296f5f 100644 --- a/source/libs/index/test/indexTests.cc +++ b/source/libs/index/test/indexTests.cc @@ -28,7 +28,7 @@ #include "tutil.h" using namespace std; -#define NUM_OF_THREAD 5 +#define NUM_OF_THREAD 10 class DebugInfo { public: @@ -882,8 +882,8 @@ static void single_write_and_search(IndexObj* idx) { static void multi_write_and_search(IndexObj* idx) { int target = idx->SearchOne("tag1", "Hello"); target = idx->SearchOne("tag2", "Test"); - idx->WriteMultiMillonData("tag1", "Hello", 100 * 10000); - idx->WriteMultiMillonData("tag2", "Test", 100 * 10000); + idx->WriteMultiMillonData("tag1", "hello world test", 100 * 10000); + idx->WriteMultiMillonData("tag2", "world test nothing", 100 * 10000); } TEST_F(IndexEnv2, testIndex_serarch_cache_and_tfile) { std::string path = "/tmp/cache_and_tfile";