提交 5a0e7dfa 编写于 作者: dengyihao's avatar dengyihao

enh: refator index/transport code

上级 8f75b96e
...@@ -146,6 +146,6 @@ option( ...@@ -146,6 +146,6 @@ option(
option( option(
BUILD_WITH_INVERTEDINDEX BUILD_WITH_INVERTEDINDEX
"If use invertedIndex" "If use invertedIndex"
ON OFF
) )
...@@ -65,6 +65,8 @@ IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, in ...@@ -65,6 +65,8 @@ IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, in
void indexCacheForceToMerge(void* cache); void indexCacheForceToMerge(void* cache);
void indexCacheDestroy(void* cache); void indexCacheDestroy(void* cache);
void indexCacheBroadcast(void* cache);
void indexCacheWait(void* cache);
Iterate* indexCacheIteratorCreate(IndexCache* cache); Iterate* indexCacheIteratorCreate(IndexCache* cache);
void indexCacheIteratorDestroy(Iterate* iiter); void indexCacheIteratorDestroy(Iterate* iiter);
......
...@@ -58,6 +58,7 @@ struct SIndex { ...@@ -58,6 +58,7 @@ struct SIndex {
SIndexStat stat; SIndexStat stat;
TdThreadMutex mtx; TdThreadMutex mtx;
bool quit;
}; };
struct SIndexOpts { struct SIndexOpts {
......
...@@ -124,29 +124,28 @@ END: ...@@ -124,29 +124,28 @@ END:
void indexDestroy(void* handle) { void indexDestroy(void* handle) {
SIndex* sIdx = handle; SIndex* sIdx = handle;
// indexAcquireRef(sIdx->refId); taosThreadMutexDestroy(&sIdx->mtx);
indexTFileDestroy(sIdx->tindex);
taosMemoryFree(sIdx->path);
taosMemoryFree(sIdx);
return;
}
void indexClose(SIndex* sIdx) {
indexReleaseRef(sIdx->refId);
bool ref = 0;
if (sIdx->colObj != NULL) { if (sIdx->colObj != NULL) {
void* iter = taosHashIterate(sIdx->colObj, NULL); void* iter = taosHashIterate(sIdx->colObj, NULL);
while (iter) { while (iter) {
IndexCache** pCache = iter; IndexCache** pCache = iter;
indexCacheForceToMerge((void*)(*pCache)); indexCacheForceToMerge((void*)(*pCache));
indexCacheWait((void*)(*pCache));
iter = taosHashIterate(sIdx->colObj, iter); iter = taosHashIterate(sIdx->colObj, iter);
indexCacheUnRef(*pCache); indexCacheUnRef(*pCache);
} }
taosHashCleanup(sIdx->colObj); taosHashCleanup(sIdx->colObj);
sIdx->colObj = NULL; sIdx->colObj = NULL;
return; }
} // indexReleaseRef(sIdx->refId); // taosMsleep(1000 * 5);
taosThreadMutexDestroy(&sIdx->mtx);
indexTFileDestroy(sIdx->tindex);
taosMemoryFree(sIdx->path);
taosMemoryFree(sIdx);
return;
}
void indexClose(SIndex* sIdx) {
indexReleaseRef(sIdx->refId);
indexRemoveRef(sIdx->refId); indexRemoveRef(sIdx->refId);
} }
int64_t indexAddRef(void* p) { int64_t indexAddRef(void* p) {
...@@ -183,6 +182,7 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { ...@@ -183,6 +182,7 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
taosHashPut(index->colObj, buf, sz, &pCache, sizeof(void*)); taosHashPut(index->colObj, buf, sz, &pCache, sizeof(void*));
} }
} }
taosThreadMutexUnlock(&index->mtx);
for (int i = 0; i < taosArrayGetSize(fVals); i++) { for (int i = 0; i < taosArrayGetSize(fVals); i++) {
SIndexTerm* p = taosArrayGetP(fVals, i); SIndexTerm* p = taosArrayGetP(fVals, i);
...@@ -198,7 +198,6 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) { ...@@ -198,7 +198,6 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
return ret; return ret;
} }
} }
taosThreadMutexUnlock(&index->mtx);
return 0; return 0;
} }
int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result) { int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result) {
...@@ -461,6 +460,9 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { ...@@ -461,6 +460,9 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
indexError("%p immtable is empty, ignore merge opera", pCache); indexError("%p immtable is empty, ignore merge opera", pCache);
indexCacheDestroyImm(pCache); indexCacheDestroyImm(pCache);
tfileReaderUnRef(pReader); tfileReaderUnRef(pReader);
if (sIdx->quit) {
indexCacheBroadcast(pCache);
}
indexReleaseRef(sIdx->refId); indexReleaseRef(sIdx->refId);
return 0; return 0;
} }
...@@ -507,6 +509,9 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { ...@@ -507,6 +509,9 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
indexDestroyFinalResult(result); indexDestroyFinalResult(result);
indexCacheDestroyImm(pCache); indexCacheDestroyImm(pCache);
if (sIdx->quit) {
indexCacheBroadcast(pCache);
}
indexCacheIteratorDestroy(cacheIter); indexCacheIteratorDestroy(cacheIter);
tfileIteratorDestroy(tfileIter); tfileIteratorDestroy(tfileIter);
...@@ -521,6 +526,7 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { ...@@ -521,6 +526,7 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
indexInfo("success to merge , time cost: %" PRId64 "ms", cost / 1000); indexInfo("success to merge , time cost: %" PRId64 "ms", cost / 1000);
} }
indexReleaseRef(sIdx->refId); indexReleaseRef(sIdx->refId);
return ret; return ret;
} }
void iterateValueDestroy(IterateValue* value, bool destroy) { void iterateValueDestroy(IterateValue* value, bool destroy) {
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#define MEM_TERM_LIMIT 10 * 10000 #define MEM_TERM_LIMIT 10 * 10000
#define MEM_THRESHOLD 64 * 1024 #define MEM_THRESHOLD 64 * 1024
#define MEM_SIGNAL_QUIT MEM_THRESHOLD * 20
#define MEM_ESTIMATE_RADIO 1.5 #define MEM_ESTIMATE_RADIO 1.5
static void indexMemRef(MemTable* tbl); static void indexMemRef(MemTable* tbl);
...@@ -396,17 +397,24 @@ void indexCacheDestroySkiplist(SSkipList* slt) { ...@@ -396,17 +397,24 @@ void indexCacheDestroySkiplist(SSkipList* slt) {
tSkipListDestroyIter(iter); tSkipListDestroyIter(iter);
tSkipListDestroy(slt); tSkipListDestroy(slt);
} }
void indexCacheBroadcast(void* cache) {
IndexCache* pCache = cache;
taosThreadCondBroadcast(&pCache->finished);
}
void indexCacheWait(void* cache) {
IndexCache* pCache = cache;
taosThreadCondWait(&pCache->finished, &pCache->mtx);
}
void indexCacheDestroyImm(IndexCache* cache) { void indexCacheDestroyImm(IndexCache* cache) {
if (cache == NULL) { if (cache == NULL) {
return; return;
} }
MemTable* tbl = NULL; MemTable* tbl = NULL;
taosThreadMutexLock(&cache->mtx); taosThreadMutexLock(&cache->mtx);
tbl = cache->imm; tbl = cache->imm;
cache->imm = NULL; // or throw int bg thread cache->imm = NULL; // or throw int bg thread
taosThreadCondBroadcast(&cache->finished); indexCacheBroadcast(cache);
taosThreadMutexUnlock(&cache->mtx); taosThreadMutexUnlock(&cache->mtx);
...@@ -460,11 +468,13 @@ void indexCacheIteratorDestroy(Iterate* iter) { ...@@ -460,11 +468,13 @@ void indexCacheIteratorDestroy(Iterate* iter) {
taosMemoryFree(iter); taosMemoryFree(iter);
} }
int indexCacheSchedToMerge(IndexCache* pCache) { int indexCacheSchedToMerge(IndexCache* pCache, bool notify) {
SSchedMsg schedMsg = {0}; SSchedMsg schedMsg = {0};
schedMsg.fp = doMergeWork; schedMsg.fp = doMergeWork;
schedMsg.ahandle = pCache; schedMsg.ahandle = pCache;
schedMsg.thandle = NULL; if (notify) {
schedMsg.thandle = taosMemoryMalloc(1);
}
schedMsg.msg = NULL; schedMsg.msg = NULL;
indexAcquireRef(pCache->index->refId); indexAcquireRef(pCache->index->refId);
taosScheduleTask(indexQhandle, &schedMsg); taosScheduleTask(indexQhandle, &schedMsg);
...@@ -477,8 +487,10 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) { ...@@ -477,8 +487,10 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) {
break; break;
} else if (cache->imm != NULL) { } else if (cache->imm != NULL) {
// TODO: wake up by condition variable // TODO: wake up by condition variable
taosThreadCondWait(&cache->finished, &cache->mtx); indexCacheWait(cache);
} else { } else {
bool notifyQuit = cache->occupiedMem >= MEM_SIGNAL_QUIT ? true : false;
indexCacheRef(cache); indexCacheRef(cache);
cache->imm = cache->mem; cache->imm = cache->mem;
cache->mem = indexInternalCacheCreate(cache->type); cache->mem = indexInternalCacheCreate(cache->type);
...@@ -486,7 +498,7 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) { ...@@ -486,7 +498,7 @@ static void indexCacheMakeRoomForWrite(IndexCache* cache) {
cache->occupiedMem = 0; cache->occupiedMem = 0;
// sched to merge // sched to merge
// unref cache in bgwork // unref cache in bgwork
indexCacheSchedToMerge(cache); indexCacheSchedToMerge(cache, notifyQuit);
} }
} }
} }
...@@ -538,7 +550,7 @@ void indexCacheForceToMerge(void* cache) { ...@@ -538,7 +550,7 @@ void indexCacheForceToMerge(void* cache) {
taosThreadMutexLock(&pCache->mtx); taosThreadMutexLock(&pCache->mtx);
indexInfo("%p is forced to merge into tfile", pCache); indexInfo("%p is forced to merge into tfile", pCache);
pCache->occupiedMem += MEM_THRESHOLD * 5; pCache->occupiedMem += MEM_SIGNAL_QUIT;
indexCacheMakeRoomForWrite(pCache); indexCacheMakeRoomForWrite(pCache);
taosThreadMutexUnlock(&pCache->mtx); taosThreadMutexUnlock(&pCache->mtx);
...@@ -703,6 +715,9 @@ static MemTable* indexInternalCacheCreate(int8_t type) { ...@@ -703,6 +715,9 @@ static MemTable* indexInternalCacheCreate(int8_t type) {
static void doMergeWork(SSchedMsg* msg) { static void doMergeWork(SSchedMsg* msg) {
IndexCache* pCache = msg->ahandle; IndexCache* pCache = msg->ahandle;
SIndex* sidx = (SIndex*)pCache->index; SIndex* sidx = (SIndex*)pCache->index;
sidx->quit = msg->thandle ? true : false;
taosMemoryFree(msg->thandle);
indexFlushCacheToTFile(sidx, pCache); indexFlushCacheToTFile(sidx, pCache);
} }
static bool indexCacheIteratorNext(Iterate* itera) { static bool indexCacheIteratorNext(Iterate* itera) {
......
...@@ -834,7 +834,10 @@ class IndexObj { ...@@ -834,7 +834,10 @@ class IndexObj {
class IndexEnv2 : public ::testing::Test { class IndexEnv2 : public ::testing::Test {
protected: protected:
virtual void SetUp() { index = new IndexObj(); } virtual void SetUp() {
initLog();
index = new IndexObj();
}
virtual void TearDown() { delete index; } virtual void TearDown() { delete index; }
IndexObj* index; IndexObj* index;
}; };
...@@ -906,6 +909,29 @@ TEST_F(IndexEnv2, testIndexOpen) { ...@@ -906,6 +909,29 @@ TEST_F(IndexEnv2, testIndexOpen) {
indexMultiTermQueryDestroy(mq); indexMultiTermQueryDestroy(mq);
} }
} }
TEST_F(IndexEnv2, testEmptyIndexOpen) {
std::string path = "/tmp/test";
if (index->Init(path) != 0) {
std::cout << "failed to init index" << std::endl;
exit(1);
}
int targetSize = 1;
{
std::string colName("tag1"), colVal("Hello");
SIndexTerm* term = indexTermCreate(0, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
for (size_t i = 0; i < targetSize; i++) {
int tableId = i;
int ret = index->Put(terms, tableId);
assert(ret == 0);
}
indexMultiTermDestroy(terms);
}
}
TEST_F(IndexEnv2, testIndex_TrigeFlush) { TEST_F(IndexEnv2, testIndex_TrigeFlush) {
std::string path = "/tmp/testxxx"; std::string path = "/tmp/testxxx";
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册