提交 ee5b67a9 编写于 作者: dengyihao's avatar dengyihao

enh(index): support index filter

上级 60b05c8a
...@@ -8,7 +8,7 @@ add_library(executor STATIC ${EXECUTOR_SRC}) ...@@ -8,7 +8,7 @@ add_library(executor STATIC ${EXECUTOR_SRC})
# ) # )
target_link_libraries(executor target_link_libraries(executor
PRIVATE os util common function parser planner qcom vnode scalar nodes PRIVATE os util common function parser planner qcom vnode scalar nodes index
) )
target_include_directories( target_include_directories(
...@@ -19,4 +19,4 @@ target_include_directories( ...@@ -19,4 +19,4 @@ target_include_directories(
#if(${BUILD_TEST}) #if(${BUILD_TEST})
ADD_SUBDIRECTORY(test) ADD_SUBDIRECTORY(test)
#endif(${BUILD_TEST}) #endif(${BUILD_TEST})
\ No newline at end of file
...@@ -15,7 +15,9 @@ ...@@ -15,7 +15,9 @@
#include "indexoperator.h" #include "indexoperator.h"
#include "executorimpl.h" #include "executorimpl.h"
#include "index.h"
#include "nodes.h" #include "nodes.h"
#include "tdatablock.h"
typedef struct SIFCtx { typedef struct SIFCtx {
int32_t code; int32_t code;
...@@ -48,11 +50,19 @@ typedef struct SIFCtx { ...@@ -48,11 +50,19 @@ typedef struct SIFCtx {
} while (0) } while (0)
typedef struct SIFParam { typedef struct SIFParam {
SArray * result;
SHashObj *pFilter; SHashObj *pFilter;
SArray *result;
char * condValue;
col_id_t colId;
int64_t suid; // add later
char dbName[TSDB_DB_NAME_LEN];
char colName[TSDB_COL_NAME_LEN];
} SIFParam; } SIFParam;
typedef int32_t (*sif_func_t)(SNode *left, SNode *rigth, SIFParam *output); typedef int32_t (*sif_func_t)(SIFParam *left, SIFParam *rigth, SIFParam *output);
// construct tag filter operator later // construct tag filter operator later
static void destroyTagFilterOperatorInfo(void *param) { static void destroyTagFilterOperatorInfo(void *param) {
STagFilterOperatorInfo *pInfo = (STagFilterOperatorInfo *)param; STagFilterOperatorInfo *pInfo = (STagFilterOperatorInfo *)param;
...@@ -60,7 +70,10 @@ static void destroyTagFilterOperatorInfo(void *param) { ...@@ -60,7 +70,10 @@ static void destroyTagFilterOperatorInfo(void *param) {
static void sifFreeParam(SIFParam *param) { static void sifFreeParam(SIFParam *param) {
if (param == NULL) return; if (param == NULL) return;
taosArrayDestroy(param->result); taosArrayDestroy(param->result);
taosMemoryFree(param->condValue);
taosHashCleanup(param->pFilter);
} }
static int32_t sifGetOperParamNum(EOperatorType ty) { static int32_t sifGetOperParamNum(EOperatorType ty) {
...@@ -71,15 +84,70 @@ static int32_t sifGetOperParamNum(EOperatorType ty) { ...@@ -71,15 +84,70 @@ static int32_t sifGetOperParamNum(EOperatorType ty) {
} }
return 2; return 2;
} }
static int32_t sifValidateColumn(SColumnNode *cn) {
// add more check
if (cn == NULL) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
if (cn->colType != COLUMN_TYPE_TAG) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
return TSDB_CODE_SUCCESS;
}
static int32_t sifGetValueFromNode(SNode *node, char **value) {
// covert data From snode;
SValueNode *vn = (SValueNode *)node;
char * pData = nodesGetValueFromNode(vn);
SDataType *pType = &vn->node.resType;
int32_t type = pType->type;
int32_t valLen = 0;
if (IS_VAR_DATA_TYPE(type)) {
int32_t dataLen = varDataTLen(pData);
if (type == TSDB_DATA_TYPE_JSON) {
if (*pData == TSDB_DATA_TYPE_NULL) {
dataLen = 0;
} else if (*pData == TSDB_DATA_TYPE_NCHAR) {
dataLen = varDataTLen(pData + CHAR_BYTES);
} else if (*pData == TSDB_DATA_TYPE_BIGINT || *pData == TSDB_DATA_TYPE_DOUBLE) {
dataLen = LONG_BYTES;
} else if (*pData == TSDB_DATA_TYPE_BOOL) {
dataLen = CHAR_BYTES;
}
dataLen += CHAR_BYTES;
}
valLen = dataLen;
} else {
valLen = pType->bytes;
}
char *tv = taosMemoryCalloc(1, valLen + 1);
if (tv == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
memcpy(tv, pData, valLen);
*value = tv;
return TSDB_CODE_SUCCESS;
}
static int32_t sifInitParam(SNode *node, SIFParam *param, SIFCtx *ctx) { static int32_t sifInitParam(SNode *node, SIFParam *param, SIFCtx *ctx) {
switch (nodeType(node)) { switch (nodeType(node)) {
case QUERY_NODE_VALUE: { case QUERY_NODE_VALUE: {
SValueNode *vn = (SValueNode *)node; SValueNode *vn = (SValueNode *)node;
SIF_ERR_RET(sifGetValueFromNode(node, &param->condValue));
param->colId = -1;
break; break;
} }
case QUERY_NODE_COLUMN: { case QUERY_NODE_COLUMN: {
SColumnNode *cn = (SColumnNode *)node; SColumnNode *cn = (SColumnNode *)node;
/*only support tag column*/
SIF_ERR_RET(sifValidateColumn(cn));
param->colId = cn->colId;
memcpy(param->dbName, cn->dbName, sizeof(cn->dbName));
memcpy(param->colName, cn->colName, sizeof(cn->colName));
break; break;
} }
...@@ -89,7 +157,7 @@ static int32_t sifInitParam(SNode *node, SIFParam *param, SIFCtx *ctx) { ...@@ -89,7 +157,7 @@ static int32_t sifInitParam(SNode *node, SIFParam *param, SIFCtx *ctx) {
qError("invalid length for node:%p, length: %d", node, LIST_LENGTH(nl->pNodeList)); qError("invalid length for node:%p, length: %d", node, LIST_LENGTH(nl->pNodeList));
SIF_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); SIF_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
SIF_ERR_RET(scalarGenerateSetFromList((void **)&param->pFilter, node, nl->dataType.type));
if (taosHashPut(ctx->pRes, &node, POINTER_BYTES, param, sizeof(*param))) { if (taosHashPut(ctx->pRes, &node, POINTER_BYTES, param, sizeof(*param))) {
taosHashCleanup(param->pFilter); taosHashCleanup(param->pFilter);
qError("taosHashPut nodeList failed, size:%d", (int32_t)sizeof(*param)); qError("taosHashPut nodeList failed, size:%d", (int32_t)sizeof(*param));
...@@ -163,58 +231,63 @@ static int32_t sifExecFunction(SFunctionNode *node, SIFCtx *ctx, SIFParam *outpu ...@@ -163,58 +231,63 @@ static int32_t sifExecFunction(SFunctionNode *node, SIFCtx *ctx, SIFParam *outpu
qError("index-filter not support buildin function"); qError("index-filter not support buildin function");
return TSDB_CODE_QRY_INVALID_INPUT; return TSDB_CODE_QRY_INVALID_INPUT;
} }
static int32_t sifIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFParam *output) {
static int32_t sifLessThanFunc(SNode *left, SNode *rigth, SIFParam *output) { SIndexMultiTermQuery *mq = indexMultiTermQueryCreate(MUST);
// impl later
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t sifLessEqualFunc(SNode *left, SNode *rigth, SIFParam *output) {
// impl later static int32_t sifLessThanFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
return TSDB_CODE_SUCCESS; int id = OP_TYPE_LOWER_THAN;
return sifIndex(left, right, id, output);
} }
static int32_t sifGreaterThanFunc(SNode *left, SNode *rigth, SIFParam *output) { static int32_t sifLessEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
// impl later int id = OP_TYPE_LOWER_EQUAL;
return TSDB_CODE_SUCCESS; return sifIndex(left, right, id, output);
} }
static int32_t sifGreaterEqualFunc(SNode *left, SNode *rigth, SIFParam *output) {
// impl later static int32_t sifGreaterThanFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
return TSDB_CODE_SUCCESS; int id = OP_TYPE_GREATER_THAN;
return sifIndex(left, right, id, output);
}
static int32_t sifGreaterEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
int id = OP_TYPE_GREATER_EQUAL;
return sifIndex(left, right, id, output);
} }
static int32_t sifEqualFunc(SNode *left, SNode *rigth, SIFParam *output) { static int32_t sifEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
// impl later int id = OP_TYPE_EQUAL;
return TSDB_CODE_SUCCESS; return sifIndex(left, right, id, output);
} }
static int32_t sifNotEqualFunc(SNode *left, SNode *rigth, SIFParam *output) { static int32_t sifNotEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
// impl later int id = OP_TYPE_NOT_EQUAL;
return TSDB_CODE_SUCCESS; return sifIndex(left, right, id, output);
} }
static int32_t sifInFunc(SNode *left, SNode *rigth, SIFParam *output) { static int32_t sifInFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
// impl later int id = OP_TYPE_IN;
return TSDB_CODE_SUCCESS; return sifIndex(left, right, id, output);
} }
static int32_t sifNotInFunc(SNode *left, SNode *right, SIFParam *output) { static int32_t sifNotInFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
// impl later int id = OP_TYPE_NOT_IN;
return TSDB_CODE_SUCCESS; return sifIndex(left, right, id, output);
} }
static int32_t sifLikeFunc(SNode *left, SNode *right, SIFParam *output) { static int32_t sifLikeFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
// impl later int id = OP_TYPE_LIKE;
return TSDB_CODE_SUCCESS; return sifIndex(left, right, id, output);
} }
static int32_t sifNotLikeFunc(SNode *left, SNode *right, SIFParam *output) { static int32_t sifNotLikeFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
// impl later int id = OP_TYPE_NOT_LIKE;
return TSDB_CODE_SUCCESS; return sifIndex(left, right, id, output);
} }
static int32_t sifMatchFunc(SNode *left, SNode *rigth, SIFParam *output) { static int32_t sifMatchFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
// impl later int id = OP_TYPE_MATCH;
return TSDB_CODE_SUCCESS; return sifIndex(left, right, id, output);
} }
static int32_t sifNotMatchFunc(SNode *left, SNode *rigth, SIFParam *output) { static int32_t sifNotMatchFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
// impl later int id = OP_TYPE_NMATCH;
return TSDB_CODE_SUCCESS; return sifIndex(left, right, id, output);
} }
static int32_t sifDefaultFunc(SNode *left, SNode *rigth, SIFParam *output) { static int32_t sifDefaultFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
// add more except // add more except
return TSDB_CODE_QRY_INVALID_INPUT; return TSDB_CODE_QRY_INVALID_INPUT;
} }
...@@ -252,17 +325,18 @@ static sif_func_t sifGetOperFn(int32_t funcId) { ...@@ -252,17 +325,18 @@ static sif_func_t sifGetOperFn(int32_t funcId) {
return sifDefaultFunc; return sifDefaultFunc;
} }
static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) { static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) {
int32_t code = 0; int32_t code = 0;
SIFParam *params = NULL;
SIF_ERR_RET(sifInitOperParams(&params, node, ctx));
int32_t nParam = sifGetOperParamNum(node->opType); int32_t nParam = sifGetOperParamNum(node->opType);
if (nParam <= 1) { if (nParam <= 1) {
SIF_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); SIF_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
} }
SIFParam *params = NULL;
SIF_ERR_RET(sifInitOperParams(&params, node, ctx));
sif_func_t operFn = sifGetOperFn(node->opType); sif_func_t operFn = sifGetOperFn(node->opType);
return operFn(node->pLeft, node->pRight, output); return operFn(&params[0], nParam > 1 ? &params[1] : NULL, output);
_return: _return:
taosMemoryFree(params); taosMemoryFree(params);
SIF_RET(code); SIF_RET(code);
...@@ -335,7 +409,6 @@ static EDealRes sifWalkOper(SNode *pNode, void *context) { ...@@ -335,7 +409,6 @@ static EDealRes sifWalkOper(SNode *pNode, void *context) {
if (ctx->code) { if (ctx->code) {
return DEAL_RES_ERROR; return DEAL_RES_ERROR;
} }
if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) { if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY; ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return DEAL_RES_ERROR; return DEAL_RES_ERROR;
......
...@@ -258,13 +258,13 @@ void indexOptsDestroy(SIndexOpts* opts) { ...@@ -258,13 +258,13 @@ void indexOptsDestroy(SIndexOpts* opts) {
* *
*/ */
SIndexMultiTermQuery* indexMultiTermQueryCreate(EIndexOperatorType opera) { SIndexMultiTermQuery* indexMultiTermQueryCreate(EIndexOperatorType opera) {
SIndexMultiTermQuery* p = (SIndexMultiTermQuery*)taosMemoryMalloc(sizeof(SIndexMultiTermQuery)); SIndexMultiTermQuery* mtq = (SIndexMultiTermQuery*)taosMemoryMalloc(sizeof(SIndexMultiTermQuery));
if (p == NULL) { if (mtq == NULL) {
return NULL; return NULL;
} }
p->opera = opera; mtq->opera = opera;
p->query = taosArrayInit(4, sizeof(SIndexTermQuery)); mtq->query = taosArrayInit(4, sizeof(SIndexTermQuery));
return p; return mtq;
} }
void indexMultiTermQueryDestroy(SIndexMultiTermQuery* pQuery) { void indexMultiTermQueryDestroy(SIndexMultiTermQuery* pQuery) {
for (int i = 0; i < taosArrayGetSize(pQuery->query); i++) { for (int i = 0; i < taosArrayGetSize(pQuery->query); i++) {
...@@ -282,23 +282,24 @@ int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EInde ...@@ -282,23 +282,24 @@ int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EInde
SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colType, const char* colName, SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colType, const char* colName,
int32_t nColName, const char* colVal, int32_t nColVal) { int32_t nColName, const char* colVal, int32_t nColVal) {
SIndexTerm* t = (SIndexTerm*)taosMemoryCalloc(1, (sizeof(SIndexTerm))); SIndexTerm* tm = (SIndexTerm*)taosMemoryCalloc(1, (sizeof(SIndexTerm)));
if (t == NULL) { if (tm == NULL) {
return NULL; return NULL;
} }
t->suid = suid; tm->suid = suid;
t->operType = oper; tm->operType = oper;
t->colType = colType; tm->colType = colType;
t->colName = (char*)taosMemoryCalloc(1, nColName + 1); tm->colName = (char*)taosMemoryCalloc(1, nColName + 1);
memcpy(t->colName, colName, nColName); memcpy(tm->colName, colName, nColName);
t->nColName = nColName; tm->nColName = nColName;
t->colVal = (char*)taosMemoryCalloc(1, nColVal + 1); tm->colVal = (char*)taosMemoryCalloc(1, nColVal + 1);
memcpy(t->colVal, colVal, nColVal); memcpy(tm->colVal, colVal, nColVal);
t->nColVal = nColVal; tm->nColVal = nColVal;
return t;
return tm;
} }
void indexTermDestroy(SIndexTerm* p) { void indexTermDestroy(SIndexTerm* p) {
taosMemoryFree(p->colName); taosMemoryFree(p->colName);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册