未验证 提交 23521373 编写于 作者: dengyihao's avatar dengyihao 提交者: GitHub

Merge pull request #10563 from taosdata/feature/index_oper

add update on index
......@@ -16,6 +16,7 @@
#include "index.h"
#include "indexInt.h"
#include "index_cache.h"
#include "index_comm.h"
#include "index_tfile.h"
#include "index_util.h"
#include "tdef.h"
......@@ -30,8 +31,6 @@
void* indexQhandle = NULL;
static char JSON_COLUMN[] = "JSON";
#define INDEX_MERGE_ADD_DEL(src, dst, tgt) \
{ \
bool f = false; \
......@@ -64,13 +63,11 @@ typedef struct SIdxColInfo {
int cVersion;
} SIdxColInfo;
typedef struct SIdxMergeHelper {
char* colVal;
typedef struct SIdxTempResult {
SArray* total;
SArray* added;
SArray* deled;
bool reset;
} SIdxMergeHelper;
} SIdxTempResult;
static pthread_once_t isInit = PTHREAD_ONCE_INIT;
// static void indexInit();
......@@ -82,8 +79,7 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oTyp
static int indexGenTFile(SIndex* index, IndexCache* cache, SArray* batch);
// merge cache and tfile by opera type
static void indexMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv, SIdxMergeHelper* helper);
static void indexMergeSameKey(SArray* result, TFileValue* tv, SIdxMergeHelper* helper);
static void indexMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv, SIdxTempResult* helper);
// static int32_t indexSerialTermKey(SIndexTerm* itm, char* buf);
// int32_t indexSerialKey(ICacheKey* key, char* buf);
......@@ -399,7 +395,6 @@ static void indexInterResultsDestroy(SArray* results) {
static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType, SArray* fResults) {
// refactor, merge interResults into fResults by oType
for (int i = 0; i < taosArrayGetSize(interResults); i--) {
SArray* t = taosArrayGetP(interResults, i);
taosArraySort(t, uidCompare);
......@@ -418,98 +413,82 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType
return 0;
}
SIdxMergeHelper* sIdxMergeHelperCreate() {
SIdxMergeHelper* hp = calloc(1, sizeof(SIdxMergeHelper));
hp->total = taosArrayInit(4, sizeof(uint64_t));
hp->added = taosArrayInit(4, sizeof(uint64_t));
hp->deled = taosArrayInit(4, sizeof(uint64_t));
hp->reset = false;
return hp;
SIdxTempResult* sIdxTempResultCreate() {
SIdxTempResult* tr = calloc(1, sizeof(SIdxTempResult));
tr->total = taosArrayInit(4, sizeof(uint64_t));
tr->added = taosArrayInit(4, sizeof(uint64_t));
tr->deled = taosArrayInit(4, sizeof(uint64_t));
return tr;
}
void sIdxMergeHelperClear(SIdxMergeHelper* hp) {
if (hp == NULL) {
void sIdxTempResultClear(SIdxTempResult* tr) {
if (tr == NULL) {
return;
}
hp->reset = false;
taosArrayClear(hp->total);
taosArrayClear(hp->added);
taosArrayClear(hp->deled);
taosArrayClear(tr->total);
taosArrayClear(tr->added);
taosArrayClear(tr->deled);
}
void sIdxMergeHelperDestroy(SIdxMergeHelper* hp) {
if (hp == NULL) {
void sIdxTempResultDestroy(SIdxTempResult* tr) {
if (tr == NULL) {
return;
}
taosArrayDestroy(hp->total);
taosArrayDestroy(hp->added);
taosArrayDestroy(hp->deled);
taosArrayDestroy(tr->total);
taosArrayDestroy(tr->added);
taosArrayDestroy(tr->deled);
}
static void indexMergeSameKey(SArray* result, TFileValue* tv, SIdxMergeHelper* helper) {
int32_t sz = result ? taosArrayGetSize(result) : 0;
if (sz > 0) {
// TODO(yihao): remove duplicate tableid
TFileValue* lv = taosArrayGetP(result, sz - 1);
// indexError("merge colVal: %s", lv->colVal);
if (strcmp(lv->colVal, tv->colVal) == 0) {
taosArrayAddAll(lv->tableId, tv->tableId);
tfileValueDestroy(tv);
} else {
taosArrayPush(result, &tv);
}
} else {
taosArrayPush(result, &tv);
}
}
static void sIdxMergeResult(SArray* result, SIdxMergeHelper* mh) {
taosArraySort(mh->total, uidCompare);
taosArraySort(mh->added, uidCompare);
taosArraySort(mh->deled, uidCompare);
static void sIdxTempResultMergeTo(SArray* result, SIdxTempResult* tr) {
taosArraySort(tr->total, uidCompare);
taosArraySort(tr->added, uidCompare);
taosArraySort(tr->deled, uidCompare);
SArray* arrs = taosArrayInit(2, sizeof(void*));
taosArrayPush(arrs, &mh->total);
taosArrayPush(arrs, &mh->added);
taosArrayPush(arrs, &tr->total);
taosArrayPush(arrs, &tr->added);
iUnion(arrs, result);
taosArrayDestroy(arrs);
iExcept(result, mh->deled);
iExcept(result, tr->deled);
}
static void indexMayMergeToFinalResult(SArray* result, TFileValue* tfv, SIdxMergeHelper* help) {
static void indexMayMergeTempToFinalResult(SArray* result, TFileValue* tfv, SIdxTempResult* tr) {
int32_t sz = taosArrayGetSize(result);
if (sz > 0) {
TFileValue* lv = taosArrayGetP(result, sz - 1);
if (tfv != NULL && strcmp(lv->colVal, tfv->colVal) != 0) {
sIdxMergeResult(lv->tableId, help);
sIdxMergeHelperClear(help);
sIdxTempResultMergeTo(lv->tableId, tr);
sIdxTempResultClear(tr);
taosArrayPush(result, &tfv);
} else if (tfv == NULL) {
sIdxMergeResult(lv->tableId, help);
// handle last iterator
sIdxTempResultMergeTo(lv->tableId, tr);
} else {
// temp result saved in help
tfileValueDestroy(tfv);
}
} else {
taosArrayPush(result, &tfv);
}
}
static void indexMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue* tv, SIdxMergeHelper* mh) {
static void indexMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue* tv, SIdxTempResult* tr) {
char* colVal = (cv != NULL) ? cv->colVal : tv->colVal;
TFileValue* tfv = tfileValueCreate(colVal);
indexMayMergeToFinalResult(result, tfv, mh);
indexMayMergeTempToFinalResult(result, tfv, tr);
if (cv != NULL) {
uint64_t id = *(uint64_t*)taosArrayGet(cv->val, 0);
if (cv->type == ADD_VALUE) {
INDEX_MERGE_ADD_DEL(mh->deled, mh->added, id)
INDEX_MERGE_ADD_DEL(tr->deled, tr->added, id)
} else if (cv->type == DEL_VALUE) {
INDEX_MERGE_ADD_DEL(mh->added, mh->deled, id)
INDEX_MERGE_ADD_DEL(tr->added, tr->deled, id)
}
}
if (tv != NULL) {
taosArrayAddAll(mh->total, tv->val);
taosArrayAddAll(tr->total, tv->val);
}
}
static void indexDestroyTempResult(SArray* result) {
static void indexDestroyFinalResult(SArray* result) {
int32_t sz = result ? taosArrayGetSize(result) : 0;
for (size_t i = 0; i < sz; i++) {
TFileValue* tv = taosArrayGetP(result, i);
......@@ -543,7 +522,7 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
bool cn = cacheIter ? cacheIter->next(cacheIter) : false;
bool tn = tfileIter ? tfileIter->next(tfileIter) : false;
SIdxMergeHelper* help = sIdxMergeHelperCreate();
SIdxTempResult* tr = sIdxTempResultCreate();
while (cn == true || tn == true) {
IterateValue* cv = (cn == true) ? cacheIter->getValue(cacheIter) : NULL;
IterateValue* tv = (tn == true) ? tfileIter->getValue(tfileIter) : NULL;
......@@ -557,21 +536,22 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
comp = 1;
}
if (comp == 0) {
indexMergeCacheAndTFile(result, cv, tv, help);
indexMergeCacheAndTFile(result, cv, tv, tr);
cn = cacheIter->next(cacheIter);
tn = tfileIter->next(tfileIter);
} else if (comp < 0) {
indexMergeCacheAndTFile(result, cv, NULL, help);
indexMergeCacheAndTFile(result, cv, NULL, tr);
cn = cacheIter->next(cacheIter);
} else {
indexMergeCacheAndTFile(result, NULL, tv, help);
indexMergeCacheAndTFile(result, NULL, tv, tr);
tn = tfileIter->next(tfileIter);
}
}
indexMayMergeToFinalResult(result, NULL, help);
indexMayMergeTempToFinalResult(result, NULL, tr);
sIdxTempResultDestroy(tr);
int ret = indexGenTFile(sIdx, pCache, result);
indexDestroyTempResult(result);
indexDestroyFinalResult(result);
indexCacheDestroyImm(pCache);
......@@ -581,8 +561,6 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
tfileReaderUnRef(pReader);
indexCacheUnRef(pCache);
sIdxMergeHelperDestroy(help);
int64_t cost = taosGetTimestampUs() - st;
if (ret != 0) {
indexError("failed to merge, time cost: %" PRId64 "ms", cost / 1000);
......
......@@ -102,7 +102,6 @@ void tfileCacheDestroy(TFileCache* tcache) {
if (tcache == NULL) {
return;
}
// free table cache
TFileReader** reader = taosHashIterate(tcache->tableCache, NULL);
while (reader) {
......
......@@ -105,6 +105,22 @@ TEST_F(JsonEnv, testWriteMillonData) {
}
indexMultiTermDestroy(terms);
}
{
std::string colName("voltagefdadfa");
std::string colVal("abxxxxxxxxxxxx");
for (uint i = 0; i < 1000; i++) {
colVal[i % colVal.size()] = '0' + i % 128;
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
for (size_t i = 0; i < 1000; i++) {
tIndexJsonPut(index, terms, i);
}
indexMultiTermDestroy(terms);
}
}
{
std::string colName("voltagefdadfa");
std::string colVal("abxxxxxxxxxxxx");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册