提交 8f75b96e 编写于 作者: dengyihao's avatar dengyihao

enh: refator index/transport code

上级 4d18fa08
......@@ -63,6 +63,7 @@ typedef struct CacheTerm {
IndexCache* indexCacheCreate(SIndex* idx, uint64_t suid, const char* colName, int8_t type);
void indexCacheForceToMerge(void* cache);
void indexCacheDestroy(void* cache);
Iterate* indexCacheIteratorCreate(IndexCache* cache);
......
......@@ -124,15 +124,21 @@ END:
void indexDestroy(void* handle) {
SIndex* sIdx = handle;
void* iter = taosHashIterate(sIdx->colObj, NULL);
while (iter) {
IndexCache** pCache = iter;
if (*pCache) {
// indexAcquireRef(sIdx->refId);
if (sIdx->colObj != NULL) {
void* iter = taosHashIterate(sIdx->colObj, NULL);
while (iter) {
IndexCache** pCache = iter;
indexCacheForceToMerge((void*)(*pCache));
iter = taosHashIterate(sIdx->colObj, iter);
indexCacheUnRef(*pCache);
}
iter = taosHashIterate(sIdx->colObj, iter);
}
taosHashCleanup(sIdx->colObj);
taosHashCleanup(sIdx->colObj);
sIdx->colObj = NULL;
return;
} // indexReleaseRef(sIdx->refId);
taosThreadMutexDestroy(&sIdx->mtx);
indexTFileDestroy(sIdx->tindex);
taosMemoryFree(sIdx->path);
......@@ -177,7 +183,6 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
taosHashPut(index->colObj, buf, sz, &pCache, sizeof(void*));
}
}
taosThreadMutexUnlock(&index->mtx);
for (int i = 0; i < taosArrayGetSize(fVals); i++) {
SIndexTerm* p = taosArrayGetP(fVals, i);
......@@ -193,6 +198,7 @@ int indexPut(SIndex* index, SIndexMultiTerm* fVals, uint64_t uid) {
return ret;
}
}
taosThreadMutexUnlock(&index->mtx);
return 0;
}
int indexSearch(SIndex* index, SIndexMultiTermQuery* multiQuerys, SArray* result) {
......@@ -451,6 +457,14 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
}
// handle flush
Iterate* cacheIter = indexCacheIteratorCreate(pCache);
if (cacheIter == NULL) {
indexError("%p immtable is empty, ignore merge opera", pCache);
indexCacheDestroyImm(pCache);
tfileReaderUnRef(pReader);
indexReleaseRef(sIdx->refId);
return 0;
}
Iterate* tfileIter = tfileIteratorCreate(pReader);
if (tfileIter == NULL) {
indexWarn("empty tfile reader iterator");
......
......@@ -429,11 +429,13 @@ void indexCacheDestroy(void* cache) {
}
Iterate* indexCacheIteratorCreate(IndexCache* cache) {
if (cache->imm == NULL) {
return NULL;
}
Iterate* iiter = taosMemoryCalloc(1, sizeof(Iterate));
if (iiter == NULL) {
return NULL;
}
taosThreadMutexLock(&cache->mtx);
indexMemRef(cache->imm);
......@@ -463,12 +465,9 @@ int indexCacheSchedToMerge(IndexCache* pCache) {
schedMsg.fp = doMergeWork;
schedMsg.ahandle = pCache;
schedMsg.thandle = NULL;
// schedMsg.thandle = taosMemoryCalloc(1, sizeof(int64_t));
// memcpy((char*)(schedMsg.thandle), (char*)&(pCache->index->refId), sizeof(int64_t));
schedMsg.msg = NULL;
indexAcquireRef(pCache->index->refId);
taosScheduleTask(indexQhandle, &schedMsg);
return 0;
}
......@@ -533,6 +532,19 @@ int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
return 0;
// encode end
}
void indexCacheForceToMerge(void* cache) {
IndexCache* pCache = cache;
indexCacheRef(pCache);
taosThreadMutexLock(&pCache->mtx);
indexInfo("%p is forced to merge into tfile", pCache);
pCache->occupiedMem += MEM_THRESHOLD * 5;
indexCacheMakeRoomForWrite(pCache);
taosThreadMutexUnlock(&pCache->mtx);
indexCacheUnRef(pCache);
return;
}
int indexCacheDel(void* cache, const char* fieldValue, int32_t fvlen, uint64_t uid, int8_t operType) {
IndexCache* pCache = cache;
return 0;
......
......@@ -272,9 +272,26 @@ void validateFst() {
}
delete m;
}
static std::string logDir = "/tmp/log";
static void initLog() {
const char* defaultLogFileNamePrefix = "taoslog";
const int32_t maxLogFileNum = 10;
tsAsyncLog = 0;
sDebugFlag = 143;
strcpy(tsLogDir, logDir.c_str());
taosRemoveDir(tsLogDir);
taosMkDir(tsLogDir);
if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) {
printf("failed to open log file in directory:%s\n", tsLogDir);
}
}
class IndexEnv : public ::testing::Test {
protected:
virtual void SetUp() {
initLog();
taosRemoveDir(path);
opts = indexOptsCreate();
int ret = indexOpen(opts, path, &index);
......@@ -804,7 +821,7 @@ class IndexObj {
}
~IndexObj() {
indexCleanUp();
// indexCleanUp();
indexClose(idx);
}
......@@ -884,7 +901,7 @@ TEST_F(IndexEnv2, testIndexOpen) {
SArray* result = (SArray*)taosArrayInit(1, sizeof(uint64_t));
index->Search(mq, result);
std::cout << "target size: " << taosArrayGetSize(result) << std::endl;
assert(taosArrayGetSize(result) == 400);
EXPECT_EQ(400, taosArrayGetSize(result));
taosArrayDestroy(result);
indexMultiTermQueryDestroy(mq);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册