diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 57e965a753c08ca323a364d2ac016209ddf4045d..75dc05fc5b8cfdf254eebf9383723f82a3507bed 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -16,6 +16,7 @@ #include "index.h" #include "indexInt.h" #include "index_cache.h" +#include "index_comm.h" #include "index_tfile.h" #include "index_util.h" #include "tdef.h" @@ -30,8 +31,6 @@ void* indexQhandle = NULL; -static char JSON_COLUMN[] = "JSON"; - #define INDEX_MERGE_ADD_DEL(src, dst, tgt) \ { \ bool f = false; \ @@ -64,13 +63,11 @@ typedef struct SIdxColInfo { int cVersion; } SIdxColInfo; -typedef struct SIdxMergeHelper { - char* colVal; +typedef struct SIdxTempResult { SArray* total; SArray* added; SArray* deled; - bool reset; -} SIdxMergeHelper; +} SIdxTempResult; static pthread_once_t isInit = PTHREAD_ONCE_INIT; // static void indexInit(); @@ -82,8 +79,7 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oTyp static int indexGenTFile(SIndex* index, IndexCache* cache, SArray* batch); // merge cache and tfile by opera type -static void indexMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv, SIdxMergeHelper* helper); -static void indexMergeSameKey(SArray* result, TFileValue* tv, SIdxMergeHelper* helper); +static void indexMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv, SIdxTempResult* helper); // static int32_t indexSerialTermKey(SIndexTerm* itm, char* buf); // int32_t indexSerialKey(ICacheKey* key, char* buf); @@ -399,7 +395,6 @@ static void indexInterResultsDestroy(SArray* results) { static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType, SArray* fResults) { // refactor, merge interResults into fResults by oType - for (int i = 0; i < taosArrayGetSize(interResults); i--) { SArray* t = taosArrayGetP(interResults, i); taosArraySort(t, uidCompare); @@ -418,98 +413,82 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType return 0; } -SIdxMergeHelper* sIdxMergeHelperCreate() { - SIdxMergeHelper* hp = calloc(1, sizeof(SIdxMergeHelper)); - hp->total = taosArrayInit(4, sizeof(uint64_t)); - hp->added = taosArrayInit(4, sizeof(uint64_t)); - hp->deled = taosArrayInit(4, sizeof(uint64_t)); - hp->reset = false; - return hp; +SIdxTempResult* sIdxTempResultCreate() { + SIdxTempResult* tr = calloc(1, sizeof(SIdxTempResult)); + tr->total = taosArrayInit(4, sizeof(uint64_t)); + tr->added = taosArrayInit(4, sizeof(uint64_t)); + tr->deled = taosArrayInit(4, sizeof(uint64_t)); + return tr; } -void sIdxMergeHelperClear(SIdxMergeHelper* hp) { - if (hp == NULL) { +void sIdxTempResultClear(SIdxTempResult* tr) { + if (tr == NULL) { return; } - hp->reset = false; - taosArrayClear(hp->total); - taosArrayClear(hp->added); - taosArrayClear(hp->deled); + taosArrayClear(tr->total); + taosArrayClear(tr->added); + taosArrayClear(tr->deled); } -void sIdxMergeHelperDestroy(SIdxMergeHelper* hp) { - if (hp == NULL) { +void sIdxTempResultDestroy(SIdxTempResult* tr) { + if (tr == NULL) { return; } - taosArrayDestroy(hp->total); - taosArrayDestroy(hp->added); - taosArrayDestroy(hp->deled); + taosArrayDestroy(tr->total); + taosArrayDestroy(tr->added); + taosArrayDestroy(tr->deled); } -static void indexMergeSameKey(SArray* result, TFileValue* tv, SIdxMergeHelper* helper) { - int32_t sz = result ? taosArrayGetSize(result) : 0; - if (sz > 0) { - // TODO(yihao): remove duplicate tableid - TFileValue* lv = taosArrayGetP(result, sz - 1); - // indexError("merge colVal: %s", lv->colVal); - if (strcmp(lv->colVal, tv->colVal) == 0) { - taosArrayAddAll(lv->tableId, tv->tableId); - tfileValueDestroy(tv); - } else { - taosArrayPush(result, &tv); - } - } else { - taosArrayPush(result, &tv); - } -} -static void sIdxMergeResult(SArray* result, SIdxMergeHelper* mh) { - taosArraySort(mh->total, uidCompare); - taosArraySort(mh->added, uidCompare); - taosArraySort(mh->deled, uidCompare); +static void sIdxTempResultMergeTo(SArray* result, SIdxTempResult* tr) { + taosArraySort(tr->total, uidCompare); + taosArraySort(tr->added, uidCompare); + taosArraySort(tr->deled, uidCompare); SArray* arrs = taosArrayInit(2, sizeof(void*)); - taosArrayPush(arrs, &mh->total); - taosArrayPush(arrs, &mh->added); + taosArrayPush(arrs, &tr->total); + taosArrayPush(arrs, &tr->added); iUnion(arrs, result); taosArrayDestroy(arrs); - iExcept(result, mh->deled); + iExcept(result, tr->deled); } -static void indexMayMergeToFinalResult(SArray* result, TFileValue* tfv, SIdxMergeHelper* help) { +static void indexMayMergeTempToFinalResult(SArray* result, TFileValue* tfv, SIdxTempResult* tr) { int32_t sz = taosArrayGetSize(result); if (sz > 0) { TFileValue* lv = taosArrayGetP(result, sz - 1); if (tfv != NULL && strcmp(lv->colVal, tfv->colVal) != 0) { - sIdxMergeResult(lv->tableId, help); - sIdxMergeHelperClear(help); + sIdxTempResultMergeTo(lv->tableId, tr); + sIdxTempResultClear(tr); taosArrayPush(result, &tfv); } else if (tfv == NULL) { - sIdxMergeResult(lv->tableId, help); + // handle last iterator + sIdxTempResultMergeTo(lv->tableId, tr); } else { + // temp result saved in help tfileValueDestroy(tfv); } } else { taosArrayPush(result, &tfv); } } -static void indexMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue* tv, SIdxMergeHelper* mh) { +static void indexMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue* tv, SIdxTempResult* tr) { char* colVal = (cv != NULL) ? cv->colVal : tv->colVal; TFileValue* tfv = tfileValueCreate(colVal); - indexMayMergeToFinalResult(result, tfv, mh); + indexMayMergeTempToFinalResult(result, tfv, tr); if (cv != NULL) { uint64_t id = *(uint64_t*)taosArrayGet(cv->val, 0); if (cv->type == ADD_VALUE) { - INDEX_MERGE_ADD_DEL(mh->deled, mh->added, id) + INDEX_MERGE_ADD_DEL(tr->deled, tr->added, id) } else if (cv->type == DEL_VALUE) { - INDEX_MERGE_ADD_DEL(mh->added, mh->deled, id) + INDEX_MERGE_ADD_DEL(tr->added, tr->deled, id) } } if (tv != NULL) { - taosArrayAddAll(mh->total, tv->val); + taosArrayAddAll(tr->total, tv->val); } } -static void indexDestroyTempResult(SArray* result) { +static void indexDestroyFinalResult(SArray* result) { int32_t sz = result ? taosArrayGetSize(result) : 0; for (size_t i = 0; i < sz; i++) { TFileValue* tv = taosArrayGetP(result, i); @@ -543,7 +522,7 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { bool cn = cacheIter ? cacheIter->next(cacheIter) : false; bool tn = tfileIter ? tfileIter->next(tfileIter) : false; - SIdxMergeHelper* help = sIdxMergeHelperCreate(); + SIdxTempResult* tr = sIdxTempResultCreate(); while (cn == true || tn == true) { IterateValue* cv = (cn == true) ? cacheIter->getValue(cacheIter) : NULL; IterateValue* tv = (tn == true) ? tfileIter->getValue(tfileIter) : NULL; @@ -557,21 +536,22 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { comp = 1; } if (comp == 0) { - indexMergeCacheAndTFile(result, cv, tv, help); + indexMergeCacheAndTFile(result, cv, tv, tr); cn = cacheIter->next(cacheIter); tn = tfileIter->next(tfileIter); } else if (comp < 0) { - indexMergeCacheAndTFile(result, cv, NULL, help); + indexMergeCacheAndTFile(result, cv, NULL, tr); cn = cacheIter->next(cacheIter); } else { - indexMergeCacheAndTFile(result, NULL, tv, help); + indexMergeCacheAndTFile(result, NULL, tv, tr); tn = tfileIter->next(tfileIter); } } - indexMayMergeToFinalResult(result, NULL, help); + indexMayMergeTempToFinalResult(result, NULL, tr); + sIdxTempResultDestroy(tr); int ret = indexGenTFile(sIdx, pCache, result); - indexDestroyTempResult(result); + indexDestroyFinalResult(result); indexCacheDestroyImm(pCache); @@ -581,8 +561,6 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { tfileReaderUnRef(pReader); indexCacheUnRef(pCache); - sIdxMergeHelperDestroy(help); - int64_t cost = taosGetTimestampUs() - st; if (ret != 0) { indexError("failed to merge, time cost: %" PRId64 "ms", cost / 1000); diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index 6cfc199afd7fefcef3267806989402bb62da324c..f5f46b061714fa372a2aa52607598cf69214990e 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -102,7 +102,6 @@ void tfileCacheDestroy(TFileCache* tcache) { if (tcache == NULL) { return; } - // free table cache TFileReader** reader = taosHashIterate(tcache->tableCache, NULL); while (reader) { diff --git a/source/libs/index/test/jsonUT.cc b/source/libs/index/test/jsonUT.cc index e5c79d137f43cb7350396211d81c71ec734e3957..df9f8b8439db379e99cfc96b69ea150b7a51577a 100644 --- a/source/libs/index/test/jsonUT.cc +++ b/source/libs/index/test/jsonUT.cc @@ -105,6 +105,22 @@ TEST_F(JsonEnv, testWriteMillonData) { } indexMultiTermDestroy(terms); } + { + std::string colName("voltagefdadfa"); + std::string colVal("abxxxxxxxxxxxx"); + for (uint i = 0; i < 1000; i++) { + colVal[i % colVal.size()] = '0' + i % 128; + SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); + + SIndexMultiTerm* terms = indexMultiTermCreate(); + indexMultiTermAdd(terms, term); + for (size_t i = 0; i < 1000; i++) { + tIndexJsonPut(index, terms, i); + } + indexMultiTermDestroy(terms); + } + } { std::string colName("voltagefdadfa"); std::string colVal("abxxxxxxxxxxxx");