未验证 提交 e50e3a17 编写于 作者: dengyihao's avatar dengyihao 提交者: GitHub

Merge pull request #12103 from taosdata/enh/supportIdxFilter

enh(index): support index filter
...@@ -47,7 +47,17 @@ typedef enum { ...@@ -47,7 +47,17 @@ typedef enum {
} SIndexOperOnColumn; } SIndexOperOnColumn;
typedef enum { MUST = 0, SHOULD, NOT } EIndexOperatorType; typedef enum { MUST = 0, SHOULD, NOT } EIndexOperatorType;
typedef enum { QUERY_TERM = 0, QUERY_PREFIX, QUERY_SUFFIX, QUERY_REGEX, QUERY_RANGE } EIndexQueryType; typedef enum {
QUERY_TERM = 0,
QUERY_PREFIX,
QUERY_SUFFIX,
QUERY_REGEX,
QUERY_LESS_THAN,
QUERY_LESS_EQUAL,
QUERY_GREATER_THAN,
QUERY_GREATER_EQUAL,
QUERY_RANGE
} EIndexQueryType;
/* /*
* create multi query * create multi query
......
...@@ -63,9 +63,14 @@ typedef struct SIFParam { ...@@ -63,9 +63,14 @@ typedef struct SIFParam {
} SIFParam; } SIFParam;
static int32_t sifGetFuncFromSql(EOperatorType src, EIndexQueryType *dst) { static int32_t sifGetFuncFromSql(EOperatorType src, EIndexQueryType *dst) {
if (src == OP_TYPE_GREATER_THAN || src == OP_TYPE_GREATER_EQUAL || src == OP_TYPE_LOWER_THAN || if (src == OP_TYPE_GREATER_THAN) {
src == OP_TYPE_LOWER_EQUAL) { *dst = QUERY_GREATER_THAN;
*dst = QUERY_RANGE; } else if (src == OP_TYPE_GREATER_EQUAL) {
*dst = QUERY_GREATER_EQUAL;
} else if (src == OP_TYPE_LOWER_THAN) {
*dst = QUERY_LESS_THAN;
} else if (src == OP_TYPE_LOWER_EQUAL) {
*dst = QUERY_LESS_EQUAL;
} else if (src == OP_TYPE_EQUAL) { } else if (src == OP_TYPE_EQUAL) {
*dst = QUERY_TERM; *dst = QUERY_TERM;
} else if (src == OP_TYPE_LIKE || src == OP_TYPE_MATCH || src == OP_TYPE_NMATCH) { } else if (src == OP_TYPE_LIKE || src == OP_TYPE_MATCH || src == OP_TYPE_NMATCH) {
...@@ -249,9 +254,6 @@ static int32_t sifExecFunction(SFunctionNode *node, SIFCtx *ctx, SIFParam *outpu ...@@ -249,9 +254,6 @@ static int32_t sifExecFunction(SFunctionNode *node, SIFCtx *ctx, SIFParam *outpu
static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFParam *output) { static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFParam *output) {
SIndexTerm *tm = indexTermCreate(left->suid, DEFAULT, operType, left->colValType, left->colName, SIndexTerm *tm = indexTermCreate(left->suid, DEFAULT, operType, left->colValType, left->colName,
strlen(left->colName), right->condValue, strlen(right->condValue)); strlen(left->colName), right->condValue, strlen(right->condValue));
if (operType == OP_TYPE_LOWER_EQUAL || operType == OP_TYPE_GREATER_EQUAL || operType == OP_TYPE_GREATER_THAN ||
operType == OP_TYPE_LOWER_THAN) {
}
if (tm == NULL) { if (tm == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
} }
......
...@@ -52,7 +52,6 @@ typedef struct FstRange { ...@@ -52,7 +52,6 @@ typedef struct FstRange {
uint64_t end; uint64_t end;
} FstRange; } FstRange;
typedef enum { GE, GT, LE, LT } RangeType;
typedef enum { OneTransNext, OneTrans, AnyTrans, EmptyFinal } State; typedef enum { OneTransNext, OneTrans, AnyTrans, EmptyFinal } State;
typedef enum { Ordered, OutOfOrdered, DuplicateKey } OrderType; typedef enum { Ordered, OutOfOrdered, DuplicateKey } OrderType;
...@@ -174,9 +173,9 @@ Output fstStateFinalOutput(FstState* state, uint64_t version, FstSlice* date, ...@@ -174,9 +173,9 @@ Output fstStateFinalOutput(FstState* state, uint64_t version, FstSlice* date,
uint64_t fstStateFindInput(FstState* state, FstNode* node, uint8_t b, bool* null); uint64_t fstStateFindInput(FstState* state, FstNode* node, uint8_t b, bool* null);
#define FST_STATE_ONE_TRNAS_NEXT(node) (node->state.state == OneTransNext) #define FST_STATE_ONE_TRNAS_NEXT(node) (node->state.state == OneTransNext)
#define FST_STATE_ONE_TRNAS(node) (node->state.state == OneTrans) #define FST_STATE_ONE_TRNAS(node) (node->state.state == OneTrans)
#define FST_STATE_ANY_TRANS(node) (node->state.state == AnyTrans) #define FST_STATE_ANY_TRANS(node) (node->state.state == AnyTrans)
#define FST_STATE_EMPTY_FINAL(node) (node->state.state == EmptyFinal) #define FST_STATE_EMPTY_FINAL(node) (node->state.state == EmptyFinal)
typedef struct FstLastTransition { typedef struct FstLastTransition {
uint8_t inp; uint8_t inp;
......
...@@ -34,6 +34,7 @@ ...@@ -34,6 +34,7 @@
extern "C" { extern "C" {
#endif #endif
typedef enum { LT, LE, GT, GE } RangeType;
typedef enum { kTypeValue, kTypeDeletion } STermValueType; typedef enum { kTypeValue, kTypeDeletion } STermValueType;
typedef struct SIndexStat { typedef struct SIndexStat {
...@@ -86,7 +87,6 @@ typedef struct SIndexTerm { ...@@ -86,7 +87,6 @@ typedef struct SIndexTerm {
int32_t nColName; int32_t nColName;
char* colVal; char* colVal;
int32_t nColVal; int32_t nColVal;
int8_t qType; // just use for range
} SIndexTerm; } SIndexTerm;
typedef struct SIndexTermQuery { typedef struct SIndexTermQuery {
......
...@@ -262,7 +262,6 @@ SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, int8_t queryT ...@@ -262,7 +262,6 @@ SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, int8_t queryT
tm->colVal = (char*)taosMemoryCalloc(1, nColVal + 1); tm->colVal = (char*)taosMemoryCalloc(1, nColVal + 1);
memcpy(tm->colVal, colVal, nColVal); memcpy(tm->colVal, colVal, nColVal);
tm->nColVal = nColVal; tm->nColVal = nColVal;
tm->qType = queryType;
return tm; return tm;
} }
......
...@@ -38,10 +38,29 @@ static int32_t cacheSearchTerm(void* cache, CacheTerm* ct, SIdxTempResult* tr, S ...@@ -38,10 +38,29 @@ static int32_t cacheSearchTerm(void* cache, CacheTerm* ct, SIdxTempResult* tr, S
static int32_t cacheSearchPrefix(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s); static int32_t cacheSearchPrefix(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchSuffix(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s); static int32_t cacheSearchSuffix(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchRegex(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s); static int32_t cacheSearchRegex(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchLessThan(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchLessEqual(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchGreaterThan(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchGreaterEqual(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
static int32_t cacheSearchRange(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s); static int32_t cacheSearchRange(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s);
/*comm func of compare, used in (LE/LT/GE/GT compare)*/
static int32_t cacheSearchCompareFunc(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s,
RangeType type);
typedef enum { MATCH, CONTINUE, BREAK } TExeCond;
typedef TExeCond (*_cache_range_compare)(void* a, void* b, int8_t type);
static TExeCond tCompareLessThan(void* a, void* b, int8_t type) { return MATCH; }
static TExeCond tCompareLessEqual(void* a, void* b, int8_t type) { return MATCH; }
static TExeCond tCompareGreaterThan(void* a, void* b, int8_t type) { return MATCH; }
static TExeCond tCompareGreaterEqual(void* a, void* b, int8_t type) { return MATCH; }
static TExeCond (*rangeCompare[])(void* a, void* b, int8_t type) = {tCompareLessThan, tCompareLessEqual,
tCompareGreaterThan, tCompareGreaterEqual};
static int32_t (*cacheSearch[])(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) = { static int32_t (*cacheSearch[])(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) = {
cacheSearchTerm, cacheSearchPrefix, cacheSearchSuffix, cacheSearchRegex, cacheSearchRange}; cacheSearchTerm, cacheSearchPrefix, cacheSearchSuffix, cacheSearchRegex, cacheSearchLessThan,
cacheSearchLessEqual, cacheSearchGreaterThan, cacheSearchGreaterEqual, cacheSearchRange};
static void doMergeWork(SSchedMsg* msg); static void doMergeWork(SSchedMsg* msg);
static bool indexCacheIteratorNext(Iterate* itera); static bool indexCacheIteratorNext(Iterate* itera);
...@@ -88,6 +107,52 @@ static int32_t cacheSearchRegex(void* cache, CacheTerm* ct, SIdxTempResult* tr, ...@@ -88,6 +107,52 @@ static int32_t cacheSearchRegex(void* cache, CacheTerm* ct, SIdxTempResult* tr,
// impl later // impl later
return 0; return 0;
} }
static int32_t cacheSearchCompareFunc(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s,
RangeType type) {
if (cache == NULL) {
return 0;
}
_cache_range_compare cmpFn = rangeCompare[type];
MemTable* mem = cache;
char* key = indexCacheTermGet(ct);
SSkipListIterator* iter = tSkipListCreateIter(mem->mem);
while (tSkipListIterNext(iter)) {
SSkipListNode* node = tSkipListIterGet(iter);
if (node == NULL) {
break;
}
CacheTerm* c = (CacheTerm*)SL_GET_NODE_DATA(node);
TExeCond cond = cmpFn(c->colVal, ct->colVal, ct->colType);
if (cond == MATCH) {
if (c->operaType == ADD_VALUE) {
INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid)
// taosArrayPush(result, &c->uid);
*s = kTypeValue;
} else if (c->operaType == DEL_VALUE) {
INDEX_MERGE_ADD_DEL(tr->added, tr->deled, c->uid)
}
} else if (cond == CONTINUE) {
} else if (cond == BREAK) {
break;
}
}
tSkipListDestroyIter(iter);
return TSDB_CODE_SUCCESS;
}
static int32_t cacheSearchLessThan(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
return cacheSearchCompareFunc(cache, ct, tr, s, LT);
}
static int32_t cacheSearchLessEqual(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
return cacheSearchCompareFunc(cache, ct, tr, s, LE);
}
static int32_t cacheSearchGreaterThan(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
return cacheSearchCompareFunc(cache, ct, tr, s, GT);
}
static int32_t cacheSearchGreaterEqual(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
return cacheSearchCompareFunc(cache, ct, tr, s, GE);
}
static int32_t cacheSearchRange(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) { static int32_t cacheSearchRange(void* cache, CacheTerm* ct, SIdxTempResult* tr, STermValueType* s) {
// impl later // impl later
return 0; return 0;
......
...@@ -64,10 +64,17 @@ static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTempResult* tr); ...@@ -64,10 +64,17 @@ static int32_t tfSearchTerm(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static int32_t tfSearchPrefix(void* reader, SIndexTerm* tem, SIdxTempResult* tr); static int32_t tfSearchPrefix(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static int32_t tfSearchSuffix(void* reader, SIndexTerm* tem, SIdxTempResult* tr); static int32_t tfSearchSuffix(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static int32_t tfSearchRegex(void* reader, SIndexTerm* tem, SIdxTempResult* tr); static int32_t tfSearchRegex(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static int32_t tfSearchLessThan(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static int32_t tfSearchLessEqual(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static int32_t tfSearchGreaterThan(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static int32_t tfSearchGreaterEqual(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static int32_t tfSearchRange(void* reader, SIndexTerm* tem, SIdxTempResult* tr); static int32_t tfSearchRange(void* reader, SIndexTerm* tem, SIdxTempResult* tr);
static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTempResult* tr, RangeType ctype);
static int32_t (*tfSearch[])(void* reader, SIndexTerm* tem, SIdxTempResult* tr) = { static int32_t (*tfSearch[])(void* reader, SIndexTerm* tem, SIdxTempResult* tr) = {
tfSearchTerm, tfSearchPrefix, tfSearchSuffix, tfSearchRegex, tfSearchRange}; tfSearchTerm, tfSearchPrefix, tfSearchSuffix, tfSearchRegex, tfSearchLessThan,
tfSearchLessEqual, tfSearchGreaterThan, tfSearchGreaterEqual, tfSearchRange};
TFileCache* tfileCacheCreate(const char* path) { TFileCache* tfileCacheCreate(const char* path) {
TFileCache* tcache = taosMemoryCalloc(1, sizeof(TFileCache)); TFileCache* tcache = taosMemoryCalloc(1, sizeof(TFileCache));
...@@ -299,6 +306,47 @@ static int32_t tfSearchRegex(void* reader, SIndexTerm* tem, SIdxTempResult* tr) ...@@ -299,6 +306,47 @@ static int32_t tfSearchRegex(void* reader, SIndexTerm* tem, SIdxTempResult* tr)
fstSliceDestroy(&key); fstSliceDestroy(&key);
return 0; return 0;
} }
static int32_t tfSearchCompareFunc(void* reader, SIndexTerm* tem, SIdxTempResult* tr, RangeType type) {
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON);
int ret = 0;
char* p = tem->colVal;
uint64_t sz = tem->nColVal;
if (hasJson) {
p = indexPackJsonData(tem);
sz = strlen(p);
}
SArray* offsets = taosArrayInit(16, sizeof(uint64_t));
AutomationCtx* ctx = automCtxCreate((void*)p, AUTOMATION_ALWAYS);
FstStreamBuilder* sb = fstSearch(((TFileReader*)reader)->fst, ctx);
FstSlice h = fstSliceCreate((uint8_t*)p, sz);
fstStreamBuilderSetRange(sb, &h, type);
fstSliceDestroy(&h);
StreamWithState* st = streamBuilderIntoStream(sb);
StreamWithStateResult* rt = NULL;
while ((rt = streamWithStateNextWith(st, NULL)) != NULL) {
taosArrayPush(offsets, &(rt->out.out));
swsResultDestroy(rt);
}
streamWithStateDestroy(st);
fstStreamBuilderDestroy(sb);
return TSDB_CODE_SUCCESS;
}
static int32_t tfSearchLessThan(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
return tfSearchCompareFunc(reader, tem, tr, LT);
}
static int32_t tfSearchLessEqual(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
return tfSearchCompareFunc(reader, tem, tr, LE);
}
static int32_t tfSearchGreaterThan(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
return tfSearchCompareFunc(reader, tem, tr, GT);
}
static int32_t tfSearchGreaterEqual(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
return tfSearchCompareFunc(reader, tem, tr, GE);
}
static int32_t tfSearchRange(void* reader, SIndexTerm* tem, SIdxTempResult* tr) { static int32_t tfSearchRange(void* reader, SIndexTerm* tem, SIdxTempResult* tr) {
bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON); bool hasJson = INDEX_TYPE_CONTAIN_EXTERN_TYPE(tem->colType, TSDB_DATA_TYPE_JSON);
int ret = 0; int ret = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册