diff --git a/include/libs/index/index.h b/include/libs/index/index.h index bf260d4899d3b51a53939be5677e71bf4cc0626a..c94d75338aa97c5c5eb790ffbcf9b835d71f18e1 100644 --- a/include/libs/index/index.h +++ b/include/libs/index/index.h @@ -47,7 +47,17 @@ typedef enum { } SIndexOperOnColumn; typedef enum { MUST = 0, SHOULD, NOT } EIndexOperatorType; -typedef enum { QUERY_TERM = 0, QUERY_PREFIX, QUERY_SUFFIX, QUERY_REGEX, QUERY_RANGE } EIndexQueryType; +typedef enum { + QUERY_TERM = 0, + QUERY_PREFIX, + QUERY_SUFFIX, + QUERY_REGEX, + QUERY_LESS_THAN, + QUERY_LESS_EQUAL, + QUERY_GREATER_THAN, + QUERY_GREATER_EQUAL, + QUERY_RANGE +} EIndexQueryType; /* * create multi query diff --git a/source/libs/executor/src/indexoperator.c b/source/libs/executor/src/indexoperator.c index 0f075c5de2ff44527cde203bec19f7c22540939f..3b62bdd664747a8b145a5bb4e883a3a327f3e146 100644 --- a/source/libs/executor/src/indexoperator.c +++ b/source/libs/executor/src/indexoperator.c @@ -63,9 +63,14 @@ typedef struct SIFParam { } SIFParam; static int32_t sifGetFuncFromSql(EOperatorType src, EIndexQueryType *dst) { - if (src == OP_TYPE_GREATER_THAN || src == OP_TYPE_GREATER_EQUAL || src == OP_TYPE_LOWER_THAN || - src == OP_TYPE_LOWER_EQUAL) { - *dst = QUERY_RANGE; + if (src == OP_TYPE_GREATER_THAN) { + *dst = QUERY_GREATER_THAN; + } else if (src == OP_TYPE_GREATER_EQUAL) { + *dst = QUERY_GREATER_EQUAL; + } else if (src == OP_TYPE_LOWER_THAN) { + *dst = QUERY_LESS_THAN; + } else if (src == OP_TYPE_LOWER_EQUAL) { + *dst = QUERY_LESS_EQUAL; } else if (src == OP_TYPE_EQUAL) { *dst = QUERY_TERM; } else if (src == OP_TYPE_LIKE || src == OP_TYPE_MATCH || src == OP_TYPE_NMATCH) { @@ -249,9 +254,6 @@ static int32_t sifExecFunction(SFunctionNode *node, SIFCtx *ctx, SIFParam *outpu static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFParam *output) { SIndexTerm *tm = indexTermCreate(left->suid, DEFAULT, operType, left->colValType, left->colName, strlen(left->colName), right->condValue, strlen(right->condValue)); - if (operType == OP_TYPE_LOWER_EQUAL || operType == OP_TYPE_GREATER_EQUAL || operType == OP_TYPE_GREATER_THAN || - operType == OP_TYPE_LOWER_THAN) { - } if (tm == NULL) { return TSDB_CODE_QRY_OUT_OF_MEMORY; } diff --git a/source/libs/index/inc/indexFst.h b/source/libs/index/inc/indexFst.h index 39ad5ffa8c5acddf543982ebba540b7d422f4183..0a360c1c72e621615796e8c5f383110e8ec11db6 100644 --- a/source/libs/index/inc/indexFst.h +++ b/source/libs/index/inc/indexFst.h @@ -52,7 +52,6 @@ typedef struct FstRange { uint64_t end; } FstRange; -typedef enum { GE, GT, LE, LT } RangeType; typedef enum { OneTransNext, OneTrans, AnyTrans, EmptyFinal } State; typedef enum { Ordered, OutOfOrdered, DuplicateKey } OrderType; @@ -174,9 +173,9 @@ Output fstStateFinalOutput(FstState* state, uint64_t version, FstSlice* date, uint64_t fstStateFindInput(FstState* state, FstNode* node, uint8_t b, bool* null); #define FST_STATE_ONE_TRNAS_NEXT(node) (node->state.state == OneTransNext) -#define FST_STATE_ONE_TRNAS(node) (node->state.state == OneTrans) -#define FST_STATE_ANY_TRANS(node) (node->state.state == AnyTrans) -#define FST_STATE_EMPTY_FINAL(node) (node->state.state == EmptyFinal) +#define FST_STATE_ONE_TRNAS(node) (node->state.state == OneTrans) +#define FST_STATE_ANY_TRANS(node) (node->state.state == AnyTrans) +#define FST_STATE_EMPTY_FINAL(node) (node->state.state == EmptyFinal) typedef struct FstLastTransition { uint8_t inp; diff --git a/source/libs/index/inc/indexInt.h b/source/libs/index/inc/indexInt.h index 62ddf8598528159a409ed7fcd6554d5d470edb29..5c7b8b9afe39f8e59b63e65f93c30e51c8b2d60c 100644 --- a/source/libs/index/inc/indexInt.h +++ b/source/libs/index/inc/indexInt.h @@ -34,6 +34,7 @@ extern "C" { #endif +typedef enum { LT, LE, GT, GE } RangeType; typedef enum { kTypeValue, kTypeDeletion } STermValueType; typedef struct SIndexStat { @@ -86,7 +87,6 @@ typedef struct SIndexTerm { int32_t nColName; char* colVal; int32_t nColVal; - int8_t qType; // just use for range } SIndexTerm; typedef struct SIndexTermQuery { diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index ee77aa92e9cd9c593f8e292a6b9487a0f87b4148..83b5025ad0b3f50cad1c1272d19bdb1bfd95e78b 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -262,7 +262,6 @@ SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, int8_t queryT tm->colVal = (char*)taosMemoryCalloc(1, nColVal + 1); memcpy(tm->colVal, colVal, nColVal); tm->nColVal = nColVal; - tm->qType = queryType; return tm; } diff --git a/source/libs/index/src/indexCache.c b/source/libs/index/src/indexCache.c index 78368981b354f7b5da99d6a571a4bab4fcd3e1e4..13768ce68248b736f3d362d0822fab73889f443c 100644 --- a/source/libs/index/src/indexCache.c +++ b/source/libs/index/src/indexCache.c @@ -38,10 +38,29 @@ static int32_t cacheSearchTerm(void* cache, CacheTerm* ct, SIdxTempResult* tr, S static int32_t cacheSearchPrefix(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s); static int32_t cacheSearchSuffix(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s); static int32_t cacheSearchRegex(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s); +static int32_t cacheSearchLessThan(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s); +static int32_t cacheSearchLessEqual(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s); +static int32_t cacheSearchGreaterThan(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s); +static int32_t cacheSearchGreaterEqual(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s); static int32_t cacheSearchRange(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s); +/*comm func of compare, used in (LE/LT/GE/GT compare)*/ +static int32_t cacheSearchCompareFunc(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s, + RangeType type); + +typedef enum { MATCH, CONTINUE, BREAK } TExeCond; +typedef TExeCond (*_cache_range_compare)(void* a, void* b, int8_t type); + +static TExeCond tCompareLessThan(void* a, void* b, int8_t type) { return MATCH; } +static TExeCond tCompareLessEqual(void* a, void* b, int8_t type) { return MATCH; } +static TExeCond tCompareGreaterThan(void* a, void* b, int8_t type) { return MATCH; } +static TExeCond tCompareGreaterEqual(void* a, void* b, int8_t type) { return MATCH; } + +static TExeCond (*rangeCompare[])(void* a, void* b, int8_t type) = {tCompareLessThan, tCompareLessEqual, + tCompareGreaterThan, tCompareGreaterEqual}; static int32_t (*cacheSearch[])(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) = { - cacheSearchTerm, cacheSearchPrefix, cacheSearchSuffix, cacheSearchRegex, cacheSearchRange}; + cacheSearchTerm, cacheSearchPrefix, cacheSearchSuffix, cacheSearchRegex, cacheSearchLessThan, + cacheSearchLessEqual, cacheSearchGreaterThan, cacheSearchGreaterEqual, cacheSearchRange}; static void doMergeWork(SSchedMsg* msg); static bool indexCacheIteratorNext(Iterate* itera); @@ -88,6 +107,52 @@ static int32_t cacheSearchRegex(void* cache, CacheTerm* ct, SIdxTempResult* tr, // impl later return 0; } +static int32_t cacheSearchCompareFunc(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s, + RangeType type) { + if (cache == NULL) { + return 0; + } + _cache_range_compare cmpFn = rangeCompare[type]; + + MemTable* mem = cache; + char* key = indexCacheTermGet(ct); + + SSkipListIterator* iter = tSkipListCreateIter(mem->mem); + while (tSkipListIterNext(iter)) { + SSkipListNode* node = tSkipListIterGet(iter); + if (node == NULL) { + break; + } + CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node); + TExeCond cond = cmpFn(c->colVal, ct->colVal, ct->colType); + if (cond == MATCH) { + if (c->operaType == ADD_VALUE) { + INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid) + // taosArrayPush(result, &c->uid); + *s = kTypeValue; + } else if (c->operaType == DEL_VALUE) { + INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid) + } + } else if (cond == CONTINUE) { + } else if (cond == BREAK) { + break; + } + } + tSkipListDestroyIter(iter); + return TSDB_CODE_SUCCESS; +} +static int32_t cacheSearchLessThan(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) { + return cacheSearchCompareFunc(cache, ct, tr, s, LT); +} +static int32_t cacheSearchLessEqual(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) { + return cacheSearchCompareFunc(cache, ct, tr, s, LE); +} +static int32_t cacheSearchGreaterThan(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) { + return cacheSearchCompareFunc(cache, ct, tr, s, GT); +} +static int32_t cacheSearchGreaterEqual(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) { + return cacheSearchCompareFunc(cache, ct, tr, s, GE); +} static int32_t cacheSearchRange(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) { // impl later return 0; diff --git a/source/libs/index/src/indexTfile.c b/source/libs/index/src/indexTfile.c index 7072baf57470d438ae0e6142c9c832182d0a1998..5aed2bd6b0e2df673cc796278a3bb6baf899cd88 100644 --- a/source/libs/index/src/indexTfile.c +++ b/source/libs/index/src/indexTfile.c @@ -64,10 +64,17 @@ static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTempResult* tr); static int32_t tfSearchPrefix(void* reader, SIndexTerm* tem, SIdxTempResult* tr); static int32_t tfSearchSuffix(void* reader, SIndexTerm* tem, SIdxTempResult* tr); static int32_t tfSearchRegex(void* reader, SIndexTerm* tem, SIdxTempResult* tr); +static int32_t tfSearchLessThan(void* reader, SIndexTerm* tem, SIdxTempResult* tr); +static int32_t tfSearchLessEqual(void* reader, SIndexTerm* tem, SIdxTempResult* tr); +static int32_t tfSearchGreaterThan(void* reader, SIndexTerm* tem, SIdxTempResult* tr); +static int32_t tfSearchGreaterEqual(void* reader, SIndexTerm* tem, SIdxTempResult* tr); static int32_t tfSearchRange(void* reader, SIndexTerm* tem, SIdxTempResult* tr); +static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTempResult* tr, RangeType ctype); + static int32_t (*tfSearch[])(void* reader, SIndexTerm* tem, SIdxTempResult* tr) = { - tfSearchTerm, tfSearchPrefix, tfSearchSuffix, tfSearchRegex, tfSearchRange}; + tfSearchTerm, tfSearchPrefix, tfSearchSuffix, tfSearchRegex, tfSearchLessThan, + tfSearchLessEqual, tfSearchGreaterThan, tfSearchGreaterEqual, tfSearchRange}; TFileCache* tfileCacheCreate(const char* path) { TFileCache* tcache = taosMemoryCalloc(1, sizeof(TFileCache)); @@ -299,6 +306,47 @@ static int32_t tfSearchRegex(void* reader, SIndexTerm* tem, SIdxTempResult* tr) fstSliceDestroy(&key); return 0; } + +static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTempResult* tr, RangeType type) { + bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON); + int ret = 0; + char* p = tem->colVal; + uint64_t sz = tem->nColVal; + if (hasJson) { + p = indexPackJsonData(tem); + sz = strlen(p); + } + SArray* offsets = taosArrayInit(16, sizeof(uint64_t)); + + AutomationCtx* ctx = automCtxCreate((void*)p, AUTOMATION_ALWAYS); + FstStreamBuilder* sb = fstSearch(((TFileReader*)reader)->fst, ctx); + + FstSlice h = fstSliceCreate((uint8_t*)p, sz); + fstStreamBuilderSetRange(sb, &h, type); + fstSliceDestroy(&h); + + StreamWithState* st = streamBuilderIntoStream(sb); + StreamWithStateResult* rt = NULL; + while ((rt = streamWithStateNextWith(st, NULL)) != NULL) { + taosArrayPush(offsets, &(rt->out.out)); + swsResultDestroy(rt); + } + streamWithStateDestroy(st); + fstStreamBuilderDestroy(sb); + return TSDB_CODE_SUCCESS; +} +static int32_t tfSearchLessThan(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { + return tfSearchCompareFunc(reader, tem, tr, LT); +} +static int32_t tfSearchLessEqual(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { + return tfSearchCompareFunc(reader, tem, tr, LE); +} +static int32_t tfSearchGreaterThan(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { + return tfSearchCompareFunc(reader, tem, tr, GT); +} +static int32_t tfSearchGreaterEqual(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { + return tfSearchCompareFunc(reader, tem, tr, GE); +} static int32_t tfSearchRange(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON); int ret = 0;