diff --git a/source/libs/index/inc/indexInt.h b/source/libs/index/inc/indexInt.h index 6cb5142dd83da3c3d5f3153a5d8a34bb82d35256..d47954ac1fe672ebcc9d9698864fc0d43c0f035a 100644 --- a/source/libs/index/inc/indexInt.h +++ b/source/libs/index/inc/indexInt.h @@ -96,7 +96,7 @@ typedef struct SIndexTermQuery { typedef struct Iterate Iterate; typedef struct IterateValue { - int8_t type; + int8_t type; // operation type: ADD_VALUE/DEL_VALUE char* colVal; SArray* val; } IterateValue; diff --git a/source/libs/index/inc/index_util.h b/source/libs/index/inc/index_util.h index 985dd657d088c08b36ffac1f2f169420fe8e76b7..313839bf1da56bc40fa9cabb86cca9d28b6af89f 100644 --- a/source/libs/index/inc/index_util.h +++ b/source/libs/index/inc/index_util.h @@ -54,7 +54,22 @@ extern "C" { * output:[4, 5] */ void iIntersection(SArray *interResults, SArray *finalResult); + +/* multi sorted result union + * input: [1, 2, 4, 5] + * [2, 3, 4, 5] + * [1, 4, 5] + * output:[1, 2, 3, 4, 5] + */ void iUnion(SArray *interResults, SArray *finalResult); + +/* sorted array + * total: [1, 2, 4, 5, 7, 8] + * except: [4, 5] + * return: [1, 2, 7, 8] + */ + +void iExcept(SArray *total, SArray *except); #ifdef __cplusplus } #endif diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 168e819073ac236cfb83007878b8e9c5cb8a4566..57e965a753c08ca323a364d2ac016209ddf4045d 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -32,6 +32,18 @@ void* indexQhandle = NULL; static char JSON_COLUMN[] = "JSON"; +#define INDEX_MERGE_ADD_DEL(src, dst, tgt) \ + { \ + bool f = false; \ + for (int i = 0; i < taosArrayGetSize(src); i++) { \ + if (*(uint64_t*)taosArrayGet(src, i) == tgt) { \ + f = true; \ + } \ + } \ + if (f == false) { \ + taosArrayPush(dst, &tgt); \ + } \ + } void indexInit() { // refactor later indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index"); @@ -52,6 +64,14 @@ typedef struct SIdxColInfo { int cVersion; } SIdxColInfo; 
+typedef struct SIdxMergeHelper { + char* colVal; + SArray* total; + SArray* added; + SArray* deled; + bool reset; +} SIdxMergeHelper; + static pthread_once_t isInit = PTHREAD_ONCE_INIT; // static void indexInit(); static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* term, SArray** result); @@ -62,8 +82,8 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oTyp static int indexGenTFile(SIndex* index, IndexCache* cache, SArray* batch); // merge cache and tfile by opera type -static void indexMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv); -static void indexMergeSameKey(SArray* result, TFileValue* tv); +static void indexMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv, SIdxMergeHelper* helper); +static void indexMergeSameKey(SArray* result, TFileValue* tv, SIdxMergeHelper* helper); // static int32_t indexSerialTermKey(SIndexTerm* itm, char* buf); // int32_t indexSerialKey(ICacheKey* key, char* buf); @@ -398,7 +418,32 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType return 0; } -static void indexMergeSameKey(SArray* result, TFileValue* tv) { +SIdxMergeHelper* sIdxMergeHelperCreate() { + SIdxMergeHelper* hp = calloc(1, sizeof(SIdxMergeHelper)); + hp->total = taosArrayInit(4, sizeof(uint64_t)); + hp->added = taosArrayInit(4, sizeof(uint64_t)); + hp->deled = taosArrayInit(4, sizeof(uint64_t)); + hp->reset = false; + return hp; +} +void sIdxMergeHelperClear(SIdxMergeHelper* hp) { + if (hp == NULL) { + return; + } + hp->reset = false; + taosArrayClear(hp->total); + taosArrayClear(hp->added); + taosArrayClear(hp->deled); +} +void sIdxMergeHelperDestroy(SIdxMergeHelper* hp) { + if (hp == NULL) { + return; + } + taosArrayDestroy(hp->total); + taosArrayDestroy(hp->added); + taosArrayDestroy(hp->deled); +} +static void indexMergeSameKey(SArray* result, TFileValue* tv, SIdxMergeHelper* helper) { int32_t sz = result ? 
taosArrayGetSize(result) : 0; if (sz > 0) { // TODO(yihao): remove duplicate tableid @@ -414,26 +459,55 @@ static void indexMergeSameKey(SArray* result, TFileValue* tv) { taosArrayPush(result, &tv); } } -static void indexMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue* tv) { - // opt - char* colVal = (cv != NULL) ? cv->colVal : tv->colVal; - // design merge-algorithm later, too complicated to handle all kind of situation +static void sIdxMergeResult(SArray* result, SIdxMergeHelper* mh) { + taosArraySort(mh->total, uidCompare); + taosArraySort(mh->added, uidCompare); + taosArraySort(mh->deled, uidCompare); + + SArray* arrs = taosArrayInit(2, sizeof(void*)); + taosArrayPush(arrs, &mh->total); + taosArrayPush(arrs, &mh->added); + + iUnion(arrs, result); + taosArrayDestroy(arrs); + + iExcept(result, mh->deled); +} +static void indexMayMergeToFinalResult(SArray* result, TFileValue* tfv, SIdxMergeHelper* help) { + int32_t sz = taosArrayGetSize(result); + if (sz > 0) { + TFileValue* lv = taosArrayGetP(result, sz - 1); + if (tfv != NULL && strcmp(lv->colVal, tfv->colVal) != 0) { + sIdxMergeResult(lv->tableId, help); + sIdxMergeHelperClear(help); + + taosArrayPush(result, &tfv); + } else if (tfv == NULL) { + sIdxMergeResult(lv->tableId, help); + } else { + tfileValueDestroy(tfv); + } + } else { + taosArrayPush(result, &tfv); + } +} +static void indexMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue* tv, SIdxMergeHelper* mh) { + char* colVal = (cv != NULL) ? 
cv->colVal : tv->colVal; TFileValue* tfv = tfileValueCreate(colVal); + + indexMayMergeToFinalResult(result, tfv, mh); + if (cv != NULL) { + uint64_t id = *(uint64_t*)taosArrayGet(cv->val, 0); if (cv->type == ADD_VALUE) { - taosArrayAddAll(tfv->tableId, cv->val); + INDEX_MERGE_ADD_DEL(mh->deled, mh->added, id) } else if (cv->type == DEL_VALUE) { - } else if (cv->type == UPDATE_VALUE) { - } else { - // do nothing + INDEX_MERGE_ADD_DEL(mh->added, mh->deled, id) } } if (tv != NULL) { - // opt later - taosArrayAddAll(tfv->tableId, tv->val); + taosArrayAddAll(mh->total, tv->val); } - - indexMergeSameKey(result, tfv); } static void indexDestroyTempResult(SArray* result) { int32_t sz = result ? taosArrayGetSize(result) : 0; @@ -443,6 +517,7 @@ static void indexDestroyTempResult(SArray* result) { } taosArrayDestroy(result); } + int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { if (sIdx == NULL) { return -1; @@ -467,6 +542,8 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { bool cn = cacheIter ? cacheIter->next(cacheIter) : false; bool tn = tfileIter ? tfileIter->next(tfileIter) : false; + + SIdxMergeHelper* help = sIdxMergeHelperCreate(); while (cn == true || tn == true) { IterateValue* cv = (cn == true) ? cacheIter->getValue(cacheIter) : NULL; IterateValue* tv = (tn == true) ? 
tfileIter->getValue(tfileIter) : NULL; @@ -480,17 +557,19 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { comp = 1; } if (comp == 0) { - indexMergeCacheAndTFile(result, cv, tv); + indexMergeCacheAndTFile(result, cv, tv, help); cn = cacheIter->next(cacheIter); tn = tfileIter->next(tfileIter); } else if (comp < 0) { - indexMergeCacheAndTFile(result, cv, NULL); + indexMergeCacheAndTFile(result, cv, NULL, help); cn = cacheIter->next(cacheIter); } else { - indexMergeCacheAndTFile(result, NULL, tv); + indexMergeCacheAndTFile(result, NULL, tv, help); tn = tfileIter->next(tfileIter); } } + indexMayMergeToFinalResult(result, NULL, help); + int ret = indexGenTFile(sIdx, pCache, result); indexDestroyTempResult(result); @@ -502,6 +581,8 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { tfileReaderUnRef(pReader); indexCacheUnRef(pCache); + sIdxMergeHelperDestroy(help); + int64_t cost = taosGetTimestampUs() - st; if (ret != 0) { indexError("failed to merge, time cost: %" PRId64 "ms", cost / 1000); diff --git a/source/libs/index/src/index_cache.c b/source/libs/index/src/index_cache.c index 599bac3fe6534ed719a08c147d2417015333bcfa..1ac72e10a9bdb207b551578e0c34d367adde7fe1 100644 --- a/source/libs/index/src/index_cache.c +++ b/source/libs/index/src/index_cache.c @@ -267,6 +267,10 @@ static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SA SSkipListNode* node = tSkipListIterGet(iter); if (node != NULL) { CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node); + // if (c->operaType == ADD_VALUE) { + //} else if (c->operaType == DEL_VALUE) { + //} + if (c->operaType == ADD_VALUE || qtype == QUERY_TERM) { if (strcmp(c->colVal, ct->colVal) == 0) { taosArrayPush(result, &c->uid); @@ -411,17 +415,8 @@ static bool indexCacheIteratorNext(Iterate* itera) { SSkipListNode* node = tSkipListIterGet(iter); CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node); - // equal func - // if (iv->colVal != NULL && ct->colVal != NULL) { - // if (0 == strcmp(iv->colVal, 
ct->colVal)) { if (iv->type == ADD_VALUE) } - //} else { - // tIterVal.colVal = calloc(1, strlen(ct->colVal) + 1); - // tIterval.colVal = tstrdup(ct->colVal); - //} iv->type = ct->operaType; iv->colVal = tstrdup(ct->colVal); - // iv->colVal = calloc(1, strlen(ct->colVal) + 1); - // memcpy(iv->colVal, ct->colVal, strlen(ct->colVal)); taosArrayPush(iv->val, &ct->uid); } diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index 0947c796b2fb79d5c7eabd17e0d23d35f53e0c84..6cfc199afd7fefcef3267806989402bb62da324c 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -429,6 +429,7 @@ static bool tfileIteratorNext(Iterate* iiter) { return false; } + iv->type = ADD_VALUE; // value in tfile always ADD_VALUE iv->colVal = colVal; return true; // std::string key(ch, sz); diff --git a/source/libs/index/src/index_util.c b/source/libs/index/src/index_util.c index fcaab968c24a296431f2a25039050947887e0aac..fc28484de9dd80fe9a609e457785bc4f035dabd1 100644 --- a/source/libs/index/src/index_util.c +++ b/source/libs/index/src/index_util.c @@ -14,6 +14,7 @@ */ #include "index_util.h" #include "index.h" + typedef struct MergeIndex { int idx; int len; @@ -111,6 +112,26 @@ void iUnion(SArray *inters, SArray *final) { break; } } - tfree(mi); } + +void iExcept(SArray *total, SArray *except) { + int32_t tsz = taosArrayGetSize(total); + int32_t esz = taosArrayGetSize(except); + if (esz == 0 || tsz == 0) { + return; + } + + int vIdx = 0; + for (int i = 0; i < tsz; i++) { + uint64_t val = *(uint64_t *)taosArrayGet(total, i); + int idx = iBinarySearch(except, 0, esz - 1, val); + if (idx >= 0 && idx < esz && *(uint64_t *)taosArrayGet(except, idx) == val) { + continue; + } + taosArraySet(total, vIdx, &val); + vIdx += 1; + } + + taosArrayPopTailBatch(total, tsz - vIdx); +} diff --git a/source/libs/index/test/utilUT.cc b/source/libs/index/test/utilUT.cc index 
aeff20d488d9c99805a3cdb250309d7fdb038cf3..6ba8cc9525d83de1e8ab8b352f9e81e0153cc772 100644 --- a/source/libs/index/test/utilUT.cc +++ b/source/libs/index/test/utilUT.cc @@ -224,3 +224,84 @@ TEST_F(UtilEnv, 04union) { iUnion(src, rslt); assert(taosArrayGetSize(rslt) == 12); } +TEST_F(UtilEnv, 01Except) { + SArray *total = taosArrayInit(4, sizeof(uint64_t)); + { + uint64_t arr1[] = {1, 4, 5, 6}; + for (int i = 0; i < sizeof(arr1) / sizeof(arr1[0]); i++) { + taosArrayPush(total, &arr1[i]); + } + } + + SArray *except = taosArrayInit(4, sizeof(uint64_t)); + { + uint64_t arr1[] = {1, 4, 5, 6}; + for (int i = 0; i < sizeof(arr1) / sizeof(arr1[0]); i++) { + taosArrayPush(except, &arr1[i]); + } + } + iExcept(total, except); + ASSERT_EQ(taosArrayGetSize(total), 0); + + taosArrayClear(total); + taosArrayClear(except); + + { + uint64_t arr1[] = {1, 4, 5, 6, 7, 8}; + for (int i = 0; i < sizeof(arr1) / sizeof(arr1[0]); i++) { + taosArrayPush(total, &arr1[i]); + } + } + + { + uint64_t arr1[] = {2, 4, 5, 6}; + for (int i = 0; i < sizeof(arr1) / sizeof(arr1[0]); i++) { + taosArrayPush(except, &arr1[i]); + } + } + iExcept(total, except); + ASSERT_EQ(taosArrayGetSize(total), 3); + + taosArrayClear(total); + taosArrayClear(except); + { + uint64_t arr1[] = {1, 4, 5, 6, 7, 8, 10, 100}; + for (int i = 0; i < sizeof(arr1) / sizeof(arr1[0]); i++) { + taosArrayPush(total, &arr1[i]); + } + } + + { + uint64_t arr1[] = {2, 4, 5, 6}; + for (int i = 0; i < sizeof(arr1) / sizeof(arr1[0]); i++) { + taosArrayPush(except, &arr1[i]); + } + } + iExcept(total, except); + ASSERT_EQ(taosArrayGetSize(total), 5); + ASSERT_EQ(*(uint64_t *)taosArrayGet(total, 0), 1); + ASSERT_EQ(*(uint64_t *)taosArrayGet(total, 1), 7); + ASSERT_EQ(*(uint64_t *)taosArrayGet(total, 2), 8); + ASSERT_EQ(*(uint64_t *)taosArrayGet(total, 3), 10); + ASSERT_EQ(*(uint64_t *)taosArrayGet(total, 4), 100); + + taosArrayClear(total); + taosArrayClear(except); + { + uint64_t arr1[] = {1, 100}; + for (int i = 0; i < sizeof(arr1) / 
sizeof(arr1[0]); i++) { + taosArrayPush(total, &arr1[i]); + } + } + + { + uint64_t arr1[] = {2, 4, 5, 6}; + for (int i = 0; i < sizeof(arr1) / sizeof(arr1[0]); i++) { + taosArrayPush(except, &arr1[i]); + } + } + iExcept(total, except); + ASSERT_EQ(taosArrayGetSize(total), 2); + ASSERT_EQ(*(uint64_t *)taosArrayGet(total, 0), 1); + ASSERT_EQ(*(uint64_t *)taosArrayGet(total, 1), 100); +}