未验证 提交 74fb9433 编写于 作者: dengyihao's avatar dengyihao 提交者: GitHub

Merge pull request #10560 from taosdata/feature/index_oper

merge add/delete/update opera on index
...@@ -96,7 +96,7 @@ typedef struct SIndexTermQuery { ...@@ -96,7 +96,7 @@ typedef struct SIndexTermQuery {
typedef struct Iterate Iterate; typedef struct Iterate Iterate;
typedef struct IterateValue { typedef struct IterateValue {
int8_t type; int8_t type; // opera type, ADD_VALUE/DELETE_VALUE
char* colVal; char* colVal;
SArray* val; SArray* val;
} IterateValue; } IterateValue;
......
...@@ -54,7 +54,22 @@ extern "C" { ...@@ -54,7 +54,22 @@ extern "C" {
* output:[4, 5] * output:[4, 5]
*/ */
void iIntersection(SArray *interResults, SArray *finalResult); void iIntersection(SArray *interResults, SArray *finalResult);
/* multi sorted result intersection
* input: [1, 2, 4, 5]
* [2, 3, 4, 5]
* [1, 4, 5]
* output:[1, 2, 3, 4, 5]
*/
void iUnion(SArray *interResults, SArray *finalResult); void iUnion(SArray *interResults, SArray *finalResult);
/* sorted array
* total: [1, 2, 4, 5, 7, 8]
* except: [4, 5]
* return: [1, 2, 7, 8]
*/
void iExcept(SArray *total, SArray *except);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -32,6 +32,18 @@ void* indexQhandle = NULL; ...@@ -32,6 +32,18 @@ void* indexQhandle = NULL;
static char JSON_COLUMN[] = "JSON"; static char JSON_COLUMN[] = "JSON";
#define INDEX_MERGE_ADD_DEL(src, dst, tgt) \
{ \
bool f = false; \
for (int i = 0; i < taosArrayGetSize(src); i++) { \
if (*(uint64_t*)taosArrayGet(src, i) == tgt) { \
f = true; \
} \
} \
if (f == false) { \
taosArrayPush(dst, &tgt); \
} \
}
void indexInit() { void indexInit() {
// refactor later // refactor later
indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index"); indexQhandle = taosInitScheduler(INDEX_QUEUE_SIZE, INDEX_NUM_OF_THREADS, "index");
...@@ -52,6 +64,14 @@ typedef struct SIdxColInfo { ...@@ -52,6 +64,14 @@ typedef struct SIdxColInfo {
int cVersion; int cVersion;
} SIdxColInfo; } SIdxColInfo;
typedef struct SIdxMergeHelper {
char* colVal;
SArray* total;
SArray* added;
SArray* deled;
bool reset;
} SIdxMergeHelper;
static pthread_once_t isInit = PTHREAD_ONCE_INIT; static pthread_once_t isInit = PTHREAD_ONCE_INIT;
// static void indexInit(); // static void indexInit();
static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* term, SArray** result); static int indexTermSearch(SIndex* sIdx, SIndexTermQuery* term, SArray** result);
...@@ -62,8 +82,8 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oTyp ...@@ -62,8 +82,8 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oTyp
static int indexGenTFile(SIndex* index, IndexCache* cache, SArray* batch); static int indexGenTFile(SIndex* index, IndexCache* cache, SArray* batch);
// merge cache and tfile by opera type // merge cache and tfile by opera type
static void indexMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv); static void indexMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv, SIdxMergeHelper* helper);
static void indexMergeSameKey(SArray* result, TFileValue* tv); static void indexMergeSameKey(SArray* result, TFileValue* tv, SIdxMergeHelper* helper);
// static int32_t indexSerialTermKey(SIndexTerm* itm, char* buf); // static int32_t indexSerialTermKey(SIndexTerm* itm, char* buf);
// int32_t indexSerialKey(ICacheKey* key, char* buf); // int32_t indexSerialKey(ICacheKey* key, char* buf);
...@@ -398,7 +418,32 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType ...@@ -398,7 +418,32 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType
return 0; return 0;
} }
static void indexMergeSameKey(SArray* result, TFileValue* tv) { SIdxMergeHelper* sIdxMergeHelperCreate() {
SIdxMergeHelper* hp = calloc(1, sizeof(SIdxMergeHelper));
hp->total = taosArrayInit(4, sizeof(uint64_t));
hp->added = taosArrayInit(4, sizeof(uint64_t));
hp->deled = taosArrayInit(4, sizeof(uint64_t));
hp->reset = false;
return hp;
}
void sIdxMergeHelperClear(SIdxMergeHelper* hp) {
if (hp == NULL) {
return;
}
hp->reset = false;
taosArrayClear(hp->total);
taosArrayClear(hp->added);
taosArrayClear(hp->deled);
}
void sIdxMergeHelperDestroy(SIdxMergeHelper* hp) {
if (hp == NULL) {
return;
}
taosArrayDestroy(hp->total);
taosArrayDestroy(hp->added);
taosArrayDestroy(hp->deled);
}
static void indexMergeSameKey(SArray* result, TFileValue* tv, SIdxMergeHelper* helper) {
int32_t sz = result ? taosArrayGetSize(result) : 0; int32_t sz = result ? taosArrayGetSize(result) : 0;
if (sz > 0) { if (sz > 0) {
// TODO(yihao): remove duplicate tableid // TODO(yihao): remove duplicate tableid
...@@ -414,26 +459,55 @@ static void indexMergeSameKey(SArray* result, TFileValue* tv) { ...@@ -414,26 +459,55 @@ static void indexMergeSameKey(SArray* result, TFileValue* tv) {
taosArrayPush(result, &tv); taosArrayPush(result, &tv);
} }
} }
static void indexMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue* tv) { static void sIdxMergeResult(SArray* result, SIdxMergeHelper* mh) {
// opt taosArraySort(mh->total, uidCompare);
char* colVal = (cv != NULL) ? cv->colVal : tv->colVal; taosArraySort(mh->added, uidCompare);
// design merge-algorithm later, too complicated to handle all kind of situation taosArraySort(mh->deled, uidCompare);
SArray* arrs = taosArrayInit(2, sizeof(void*));
taosArrayPush(arrs, &mh->total);
taosArrayPush(arrs, &mh->added);
iUnion(arrs, result);
taosArrayDestroy(arrs);
iExcept(result, mh->deled);
}
static void indexMayMergeToFinalResult(SArray* result, TFileValue* tfv, SIdxMergeHelper* help) {
int32_t sz = taosArrayGetSize(result);
if (sz > 0) {
TFileValue* lv = taosArrayGetP(result, sz - 1);
if (tfv != NULL && strcmp(lv->colVal, tfv->colVal) != 0) {
sIdxMergeResult(lv->tableId, help);
sIdxMergeHelperClear(help);
taosArrayPush(result, &tfv);
} else if (tfv == NULL) {
sIdxMergeResult(lv->tableId, help);
} else {
tfileValueDestroy(tfv);
}
} else {
taosArrayPush(result, &tfv);
}
}
static void indexMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue* tv, SIdxMergeHelper* mh) {
char* colVal = (cv != NULL) ? cv->colVal : tv->colVal;
TFileValue* tfv = tfileValueCreate(colVal); TFileValue* tfv = tfileValueCreate(colVal);
indexMayMergeToFinalResult(result, tfv, mh);
if (cv != NULL) { if (cv != NULL) {
uint64_t id = *(uint64_t*)taosArrayGet(cv->val, 0);
if (cv->type == ADD_VALUE) { if (cv->type == ADD_VALUE) {
taosArrayAddAll(tfv->tableId, cv->val); INDEX_MERGE_ADD_DEL(mh->deled, mh->added, id)
} else if (cv->type == DEL_VALUE) { } else if (cv->type == DEL_VALUE) {
} else if (cv->type == UPDATE_VALUE) { INDEX_MERGE_ADD_DEL(mh->added, mh->deled, id)
} else {
// do nothing
} }
} }
if (tv != NULL) { if (tv != NULL) {
// opt later taosArrayAddAll(mh->total, tv->val);
taosArrayAddAll(tfv->tableId, tv->val);
} }
indexMergeSameKey(result, tfv);
} }
static void indexDestroyTempResult(SArray* result) { static void indexDestroyTempResult(SArray* result) {
int32_t sz = result ? taosArrayGetSize(result) : 0; int32_t sz = result ? taosArrayGetSize(result) : 0;
...@@ -443,6 +517,7 @@ static void indexDestroyTempResult(SArray* result) { ...@@ -443,6 +517,7 @@ static void indexDestroyTempResult(SArray* result) {
} }
taosArrayDestroy(result); taosArrayDestroy(result);
} }
int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
if (sIdx == NULL) { if (sIdx == NULL) {
return -1; return -1;
...@@ -467,6 +542,8 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { ...@@ -467,6 +542,8 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
bool cn = cacheIter ? cacheIter->next(cacheIter) : false; bool cn = cacheIter ? cacheIter->next(cacheIter) : false;
bool tn = tfileIter ? tfileIter->next(tfileIter) : false; bool tn = tfileIter ? tfileIter->next(tfileIter) : false;
SIdxMergeHelper* help = sIdxMergeHelperCreate();
while (cn == true || tn == true) { while (cn == true || tn == true) {
IterateValue* cv = (cn == true) ? cacheIter->getValue(cacheIter) : NULL; IterateValue* cv = (cn == true) ? cacheIter->getValue(cacheIter) : NULL;
IterateValue* tv = (tn == true) ? tfileIter->getValue(tfileIter) : NULL; IterateValue* tv = (tn == true) ? tfileIter->getValue(tfileIter) : NULL;
...@@ -480,17 +557,19 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { ...@@ -480,17 +557,19 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
comp = 1; comp = 1;
} }
if (comp == 0) { if (comp == 0) {
indexMergeCacheAndTFile(result, cv, tv); indexMergeCacheAndTFile(result, cv, tv, help);
cn = cacheIter->next(cacheIter); cn = cacheIter->next(cacheIter);
tn = tfileIter->next(tfileIter); tn = tfileIter->next(tfileIter);
} else if (comp < 0) { } else if (comp < 0) {
indexMergeCacheAndTFile(result, cv, NULL); indexMergeCacheAndTFile(result, cv, NULL, help);
cn = cacheIter->next(cacheIter); cn = cacheIter->next(cacheIter);
} else { } else {
indexMergeCacheAndTFile(result, NULL, tv); indexMergeCacheAndTFile(result, NULL, tv, help);
tn = tfileIter->next(tfileIter); tn = tfileIter->next(tfileIter);
} }
} }
indexMayMergeToFinalResult(result, NULL, help);
int ret = indexGenTFile(sIdx, pCache, result); int ret = indexGenTFile(sIdx, pCache, result);
indexDestroyTempResult(result); indexDestroyTempResult(result);
...@@ -502,6 +581,8 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { ...@@ -502,6 +581,8 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
tfileReaderUnRef(pReader); tfileReaderUnRef(pReader);
indexCacheUnRef(pCache); indexCacheUnRef(pCache);
sIdxMergeHelperDestroy(help);
int64_t cost = taosGetTimestampUs() - st; int64_t cost = taosGetTimestampUs() - st;
if (ret != 0) { if (ret != 0) {
indexError("failed to merge, time cost: %" PRId64 "ms", cost / 1000); indexError("failed to merge, time cost: %" PRId64 "ms", cost / 1000);
......
...@@ -267,6 +267,10 @@ static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SA ...@@ -267,6 +267,10 @@ static int indexQueryMem(MemTable* mem, CacheTerm* ct, EIndexQueryType qtype, SA
SSkipListNode* node = tSkipListIterGet(iter); SSkipListNode* node = tSkipListIterGet(iter);
if (node != NULL) { if (node != NULL) {
CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node); CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
// if (c->operaType == ADD_VALUE) {
//} else if (c->operaType == DEL_VALUE) {
//}
if (c->operaType == ADD_VALUE || qtype == QUERY_TERM) { if (c->operaType == ADD_VALUE || qtype == QUERY_TERM) {
if (strcmp(c->colVal, ct->colVal) == 0) { if (strcmp(c->colVal, ct->colVal) == 0) {
taosArrayPush(result, &c->uid); taosArrayPush(result, &c->uid);
...@@ -411,17 +415,8 @@ static bool indexCacheIteratorNext(Iterate* itera) { ...@@ -411,17 +415,8 @@ static bool indexCacheIteratorNext(Iterate* itera) {
SSkipListNode* node = tSkipListIterGet(iter); SSkipListNode* node = tSkipListIterGet(iter);
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node); CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
// equal func
// if (iv->colVal != NULL && ct->colVal != NULL) {
// if (0 == strcmp(iv->colVal, ct->colVal)) { if (iv->type == ADD_VALUE) }
//} else {
// tIterVal.colVal = calloc(1, strlen(ct->colVal) + 1);
// tIterval.colVal = tstrdup(ct->colVal);
//}
iv->type = ct->operaType; iv->type = ct->operaType;
iv->colVal = tstrdup(ct->colVal); iv->colVal = tstrdup(ct->colVal);
// iv->colVal = calloc(1, strlen(ct->colVal) + 1);
// memcpy(iv->colVal, ct->colVal, strlen(ct->colVal));
taosArrayPush(iv->val, &ct->uid); taosArrayPush(iv->val, &ct->uid);
} }
......
...@@ -429,6 +429,7 @@ static bool tfileIteratorNext(Iterate* iiter) { ...@@ -429,6 +429,7 @@ static bool tfileIteratorNext(Iterate* iiter) {
return false; return false;
} }
iv->type = ADD_VALUE; // value in tfile always ADD_VALUE
iv->colVal = colVal; iv->colVal = colVal;
return true; return true;
// std::string key(ch, sz); // std::string key(ch, sz);
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
*/ */
#include "index_util.h" #include "index_util.h"
#include "index.h" #include "index.h"
typedef struct MergeIndex { typedef struct MergeIndex {
int idx; int idx;
int len; int len;
...@@ -111,6 +112,26 @@ void iUnion(SArray *inters, SArray *final) { ...@@ -111,6 +112,26 @@ void iUnion(SArray *inters, SArray *final) {
break; break;
} }
} }
tfree(mi); tfree(mi);
} }
void iExcept(SArray *total, SArray *except) {
int32_t tsz = taosArrayGetSize(total);
int32_t esz = taosArrayGetSize(except);
if (esz == 0 || tsz == 0) {
return;
}
int vIdx = 0;
for (int i = 0; i < tsz; i++) {
uint64_t val = *(uint64_t *)taosArrayGet(total, i);
int idx = iBinarySearch(except, 0, esz - 1, val);
if (idx >= 0 && idx < esz && *(uint64_t *)taosArrayGet(except, idx) == val) {
continue;
}
taosArraySet(total, vIdx, &val);
vIdx += 1;
}
taosArrayPopTailBatch(total, tsz - vIdx);
}
...@@ -224,3 +224,84 @@ TEST_F(UtilEnv, 04union) { ...@@ -224,3 +224,84 @@ TEST_F(UtilEnv, 04union) {
iUnion(src, rslt); iUnion(src, rslt);
assert(taosArrayGetSize(rslt) == 12); assert(taosArrayGetSize(rslt) == 12);
} }
TEST_F(UtilEnv, 01Except) {
SArray *total = taosArrayInit(4, sizeof(uint64_t));
{
uint64_t arr1[] = {1, 4, 5, 6};
for (int i = 0; i < sizeof(arr1) / sizeof(arr1[0]); i++) {
taosArrayPush(total, &arr1[i]);
}
}
SArray *except = taosArrayInit(4, sizeof(uint64_t));
{
uint64_t arr1[] = {1, 4, 5, 6};
for (int i = 0; i < sizeof(arr1) / sizeof(arr1[0]); i++) {
taosArrayPush(except, &arr1[i]);
}
}
iExcept(total, except);
ASSERT_EQ(taosArrayGetSize(total), 0);
taosArrayClear(total);
taosArrayClear(except);
{
uint64_t arr1[] = {1, 4, 5, 6, 7, 8};
for (int i = 0; i < sizeof(arr1) / sizeof(arr1[0]); i++) {
taosArrayPush(total, &arr1[i]);
}
}
{
uint64_t arr1[] = {2, 4, 5, 6};
for (int i = 0; i < sizeof(arr1) / sizeof(arr1[0]); i++) {
taosArrayPush(except, &arr1[i]);
}
}
iExcept(total, except);
ASSERT_EQ(taosArrayGetSize(total), 3);
taosArrayClear(total);
taosArrayClear(except);
{
uint64_t arr1[] = {1, 4, 5, 6, 7, 8, 10, 100};
for (int i = 0; i < sizeof(arr1) / sizeof(arr1[0]); i++) {
taosArrayPush(total, &arr1[i]);
}
}
{
uint64_t arr1[] = {2, 4, 5, 6};
for (int i = 0; i < sizeof(arr1) / sizeof(arr1[0]); i++) {
taosArrayPush(except, &arr1[i]);
}
}
iExcept(total, except);
ASSERT_EQ(taosArrayGetSize(total), 5);
ASSERT_EQ(*(uint64_t *)taosArrayGet(total, 0), 1);
ASSERT_EQ(*(uint64_t *)taosArrayGet(total, 1), 7);
ASSERT_EQ(*(uint64_t *)taosArrayGet(total, 2), 8);
ASSERT_EQ(*(uint64_t *)taosArrayGet(total, 3), 10);
ASSERT_EQ(*(uint64_t *)taosArrayGet(total, 4), 100);
taosArrayClear(total);
taosArrayClear(except);
{
uint64_t arr1[] = {1, 100};
for (int i = 0; i < sizeof(arr1) / sizeof(arr1[0]); i++) {
taosArrayPush(total, &arr1[i]);
}
}
{
uint64_t arr1[] = {2, 4, 5, 6};
for (int i = 0; i < sizeof(arr1) / sizeof(arr1[0]); i++) {
taosArrayPush(except, &arr1[i]);
}
}
iExcept(total, except);
ASSERT_EQ(taosArrayGetSize(total), 2);
ASSERT_EQ(*(uint64_t *)taosArrayGet(total, 0), 1);
ASSERT_EQ(*(uint64_t *)taosArrayGet(total, 1), 100);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册