提交 d77c190e 编写于 作者: W wpan

support filter duplicated unit

上级 e270cf37
......@@ -21,10 +21,12 @@ extern "C" {
#endif
#include "texpr.h"
#include "hash.h"
#define FILTER_DEFAULT_GROUP_SIZE 4
#define FILTER_DEFAULT_UNIT_SIZE 4
#define FILTER_DEFAULT_FIELD_SIZE 4
#define FILTER_DEFAULT_VALUE_SIZE 4
#define FILTER_DEFAULT_GROUP_UNIT_SIZE 2
enum {
......@@ -175,7 +177,7 @@ typedef struct SFilterInfo {
SFilterUnit *units;
uint8_t *unitRes; // result
uint8_t *unitFlags; // got result
SFilterPCtx *pctx;
SFilterPCtx pctx;
} SFilterInfo;
#define COL_FIELD_SIZE (sizeof(SFilterField) + 2 * sizeof(int64_t))
......@@ -196,7 +198,7 @@ typedef struct SFilterInfo {
#define FILTER_CLR_FLAG(st, f) st &= (~f)
#define SIMPLE_COPY_VALUES(dst, src) *((int64_t *)dst) = *((int64_t *)src)
#define FILTER_PACKAGE_UNIT_HASH_KEY(v, optr, idx1, idx2) do { char *_t = (char *)v; _t[0] = optr; *(uint16_t *)(_t + 1) = idx1; *(uint16_t *)(_t + 3) = idx2; } while (0)
#define FILTER_GREATER(cr,sflag,eflag) ((cr > 0) || ((cr == 0) && (FILTER_GET_FLAG(sflag,RA_EXCLUDE) || FILTER_GET_FLAG(eflag,RA_EXCLUDE))))
#define FILTER_COPY_RA(dst, src) do { (dst)->sflag = (src)->sflag; (dst)->eflag = (src)->eflag; (dst)->s = (src)->s; (dst)->e = (src)->e; } while (0)
......
......@@ -576,7 +576,7 @@ int32_t filterDetachCnfGroups(SArray* group, SArray* left, SArray* right) {
return TSDB_CODE_SUCCESS;
}
int32_t filterGetFiled(SFilterFields* fields, int32_t type, void *v) {
int32_t filterGetFiledByDesc(SFilterFields* fields, int32_t type, void *v) {
for (uint16_t i = 0; i < fields->num; ++i) {
if (0 == gDescCompare[type](fields->fields[i].desc, v)) {
return i;
......@@ -586,14 +586,36 @@ int32_t filterGetFiled(SFilterFields* fields, int32_t type, void *v) {
return -1;
}
int32_t filterAddField(SFilterInfo *info, void *desc, void *data, int32_t type, SFilterFieldId *fid) {
int32_t filterGetFiledByData(SFilterInfo *info, int32_t type, void *v, int32_t dataLen) {
if (type == FLD_TYPE_VALUE) {
if (info->pctx.valHash == false) {
qError("value hash is empty");
return -1;
}
void *hv = taosHashGet(info->pctx.valHash, v, dataLen);
if (hv) {
return *(int32_t *)hv;
}
}
return -1;
}
int32_t filterAddField(SFilterInfo *info, void *desc, void *data, int32_t type, SFilterFieldId *fid, int32_t dataLen) {
int32_t idx = -1;
uint16_t *num;
num = &info->fields[type].num;
if (*num > 0 && type != FLD_TYPE_VALUE) {
idx = filterGetFiled(&info->fields[type], type, desc);
if (*num > 0) {
if (type == FLD_TYPE_COLUMN) {
idx = filterGetFiledByDesc(&info->fields[type], type, desc);
} else if (dataLen > 0 && FILTER_GET_FLAG(info->options, FI_OPTION_NEED_UNIQE)) {
idx = filterGetFiledByData(info, type, data, dataLen);
}
}
if (idx < 0) {
......@@ -607,6 +629,14 @@ int32_t filterAddField(SFilterInfo *info, void *desc, void *data, int32_t type,
info->fields[type].fields[idx].desc = desc;
info->fields[type].fields[idx].data = data;
++(*num);
if (data && dataLen > 0 && FILTER_GET_FLAG(info->options, FI_OPTION_NEED_UNIQE)) {
if (info->pctx.valHash == NULL) {
info->pctx.valHash = taosHashInit(FILTER_DEFAULT_GROUP_SIZE * FILTER_DEFAULT_VALUE_SIZE, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, false);
}
taosHashPut(info->pctx.valHash, data, dataLen, &idx, sizeof(idx));
}
}
fid->type = type;
......@@ -616,7 +646,7 @@ int32_t filterAddField(SFilterInfo *info, void *desc, void *data, int32_t type,
}
static FORCE_INLINE int32_t filterAddColFieldFromField(SFilterInfo *info, SFilterField *field, SFilterFieldId *fid) {
filterAddField(info, field->desc, field->data, FILTER_GET_TYPE(field->flag), fid);
filterAddField(info, field->desc, field->data, FILTER_GET_TYPE(field->flag), fid, 0);
FILTER_SET_FLAG(field->flag, FLD_DESC_NO_FREE);
FILTER_SET_FLAG(field->flag, FLD_DATA_NO_FREE);
......@@ -642,12 +672,26 @@ int32_t filterAddFieldFromNode(SFilterInfo *info, tExprNode *node, SFilterFieldI
node->pVal = NULL;
}
filterAddField(info, v, NULL, type, fid);
filterAddField(info, v, NULL, type, fid, 0);
return TSDB_CODE_SUCCESS;
}
int32_t filterAddUnit(SFilterInfo *info, uint8_t optr, SFilterFieldId *left, SFilterFieldId *right) {
int32_t filterAddUnit(SFilterInfo *info, uint8_t optr, SFilterFieldId *left, SFilterFieldId *right, uint16_t *uidx) {
if (FILTER_GET_FLAG(info->options, FI_OPTION_NEED_UNIQE)) {
if (info->pctx.unitHash == NULL) {
info->pctx.unitHash = taosHashInit(FILTER_DEFAULT_GROUP_SIZE * FILTER_DEFAULT_UNIT_SIZE, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, false);
} else {
int64_t v = 0;
FILTER_PACKAGE_UNIT_HASH_KEY(&v, optr, left->idx, right ? right->idx : -1);
void *hu = taosHashGet(info->pctx.unitHash, &v, sizeof(v));
if (hu) {
*uidx = *(uint16_t *)hu;
return TSDB_CODE_SUCCESS;
}
}
}
if (info->unitNum >= info->unitSize) {
uint16_t psize = info->unitSize;
info->unitSize += FILTER_DEFAULT_UNIT_SIZE;
......@@ -674,6 +718,14 @@ int32_t filterAddUnit(SFilterInfo *info, uint8_t optr, SFilterFieldId *left, SFi
assert(FILTER_GET_FLAG(col->flag, FLD_TYPE_COLUMN));
info->units[info->unitNum].compare.type = FILTER_GET_COL_FIELD_TYPE(col);
*uidx = info->unitNum;
if (FILTER_GET_FLAG(info->options, FI_OPTION_NEED_UNIQE)) {
int64_t v = 0;
FILTER_PACKAGE_UNIT_HASH_KEY(&v, optr, left->idx, right ? right->idx : -1);
taosHashPut(info->pctx.unitHash, &v, sizeof(v), uidx, sizeof(*uidx));
}
++info->unitNum;
......@@ -702,8 +754,10 @@ int32_t filterAddGroupUnitFromNode(SFilterInfo *info, tExprNode* tree, SArray *g
tVariant* var = tree->_node.pRight->pVal;
int32_t type = FILTER_GET_COL_FIELD_TYPE(FILTER_GET_FIELD(info, left));
int32_t len = 0;
uint16_t uidx = 0;
if (tree->_node.optr == TSDB_RELATION_IN && (!IS_VAR_DATA_TYPE(type))) {
if (tree->_node.optr == TSDB_RELATION_IN && (!IS_VAR_DATA_TYPE(type))) {
void *data = NULL;
convertFilterSetFromBinary((void **)&data, var->pz, var->nLen, type);
CHK_LRET(data == NULL, TSDB_CODE_QRY_APP_ERROR, "failed to convert in param");
......@@ -714,21 +768,23 @@ int32_t filterAddGroupUnitFromNode(SFilterInfo *info, tExprNode* tree, SArray *g
void *fdata = NULL;
if (IS_VAR_DATA_TYPE(type)) {
uint32_t len = taosHashGetDataKeyLen((SHashObj *)data, p);
len = (int32_t)taosHashGetDataKeyLen((SHashObj *)data, p);
fdata = malloc(len + VARSTR_HEADER_SIZE);
varDataLen(fdata) = len;
memcpy(varDataVal(fdata), key, len);
len += VARSTR_HEADER_SIZE;
} else {
fdata = malloc(sizeof(int64_t));
SIMPLE_COPY_VALUES(fdata, key);
len = tDataTypes[type].bytes;
}
filterAddField(info, NULL, fdata, FLD_TYPE_VALUE, &right);
filterAddField(info, NULL, fdata, FLD_TYPE_VALUE, &right, len);
filterAddUnit(info, TSDB_RELATION_EQUAL, &left, &right);
filterAddUnit(info, TSDB_RELATION_EQUAL, &left, &right, &uidx);
SFilterGroup fgroup = {0};
filterAddUnitToGroup(&fgroup, info->unitNum - 1);
filterAddUnitToGroup(&fgroup, uidx);
taosArrayPush(group, &fgroup);
......@@ -737,10 +793,10 @@ int32_t filterAddGroupUnitFromNode(SFilterInfo *info, tExprNode* tree, SArray *g
} else {
filterAddFieldFromNode(info, tree->_node.pRight, &right);
filterAddUnit(info, tree->_node.optr, &left, &right);
filterAddUnit(info, tree->_node.optr, &left, &right, &uidx);
SFilterGroup fgroup = {0};
filterAddUnitToGroup(&fgroup, info->unitNum - 1);
filterAddUnitToGroup(&fgroup, uidx);
taosArrayPush(group, &fgroup);
}
......@@ -749,15 +805,25 @@ int32_t filterAddGroupUnitFromNode(SFilterInfo *info, tExprNode* tree, SArray *g
}
int32_t filterAddUnitFromUnit(SFilterInfo *dst, SFilterInfo *src, SFilterUnit* u) {
int32_t filterAddUnitFromUnit(SFilterInfo *dst, SFilterInfo *src, SFilterUnit* u, uint16_t *uidx) {
SFilterFieldId left, right, *pright = &right;
int32_t type = FILTER_UNIT_DATA_TYPE(u);
filterAddField(dst, FILTER_UNIT_COL_DESC(src, u), NULL, FLD_TYPE_COLUMN, &left);
filterAddField(dst, FILTER_UNIT_COL_DESC(src, u), NULL, FLD_TYPE_COLUMN, &left, 0);
SFilterField *t = FILTER_UNIT_LEFT_FIELD(src, u);
FILTER_SET_FLAG(t->flag, FLD_DESC_NO_FREE);
if (u->right.type == FLD_TYPE_VALUE) {
filterAddField(dst, NULL, FILTER_UNIT_VAL_DATA(src, u), FLD_TYPE_VALUE, &right);
void *data = FILTER_UNIT_VAL_DATA(src, u);
if (IS_VAR_DATA_TYPE(type)) {
if (FILTER_UNIT_OPTR(u) == TSDB_RELATION_IN) {
filterAddField(dst, NULL, data, FLD_TYPE_VALUE, &right, 0);
} else {
filterAddField(dst, NULL, data, FLD_TYPE_VALUE, &right, varDataTLen(data));
}
} else {
filterAddField(dst, NULL, data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes);
}
t = FILTER_UNIT_RIGHT_FIELD(src, u);
FILTER_SET_FLAG(t->flag, FLD_DATA_NO_FREE);
} else {
......@@ -765,29 +831,32 @@ int32_t filterAddUnitFromUnit(SFilterInfo *dst, SFilterInfo *src, SFilterUnit* u
}
return filterAddUnit(dst, FILTER_UNIT_OPTR(u), &left, pright);
return filterAddUnit(dst, FILTER_UNIT_OPTR(u), &left, pright, uidx);
}
int32_t filterAddGroupUnitFromCtx(SFilterInfo *dst, SFilterInfo *src, SFilterRMCtx *ctx, uint16_t cidx, SFilterGroup *g, int32_t optr, SArray *res) {
SFilterFieldId left, right;
uint16_t uidx = 0;
SFilterField *col = FILTER_GET_COL_FIELD(src, cidx);
filterAddColFieldFromField(dst, col, &left);
int32_t type = FILTER_GET_COL_FIELD_TYPE(FILTER_GET_FIELD(dst, left));
if (optr == TSDB_RELATION_AND) {
if (ctx->isnull) {
assert(ctx->notnull == false && ctx->isrange == false);
filterAddUnit(dst, TSDB_RELATION_ISNULL, &left, NULL);
filterAddUnitToGroup(g, dst->unitNum - 1);
filterAddUnit(dst, TSDB_RELATION_ISNULL, &left, NULL, &uidx);
filterAddUnitToGroup(g, uidx);
return TSDB_CODE_SUCCESS;
}
if (ctx->notnull) {
assert(ctx->isnull == false && ctx->isrange == false);
filterAddUnit(dst, TSDB_RELATION_NOTNULL, &left, NULL);
filterAddUnitToGroup(g, dst->unitNum - 1);
filterAddUnit(dst, TSDB_RELATION_NOTNULL, &left, NULL, &uidx);
filterAddUnitToGroup(g, uidx);
return TSDB_CODE_SUCCESS;
}
......@@ -803,14 +872,13 @@ int32_t filterAddGroupUnitFromCtx(SFilterInfo *dst, SFilterInfo *src, SFilterRMC
assert(!((FILTER_GET_FLAG(ra->sflag, RA_NULL)) && (FILTER_GET_FLAG(ra->eflag, RA_NULL))));
if ((!FILTER_GET_FLAG(ra->sflag, RA_NULL)) && (!FILTER_GET_FLAG(ra->eflag, RA_NULL))) {
int32_t type = FILTER_GET_COL_FIELD_TYPE(FILTER_GET_FIELD(dst, left));
__compar_fn_t func = getComparFunc(type, 0);
if (func(&ra->s, &ra->e) == 0) {
void *data = malloc(sizeof(int64_t));
SIMPLE_COPY_VALUES(data, &ra->s);
filterAddField(dst, NULL, data, FLD_TYPE_VALUE, &right);
filterAddUnit(dst, TSDB_RELATION_EQUAL, &left, &right);
filterAddUnitToGroup(g, dst->unitNum - 1);
filterAddField(dst, NULL, data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes);
filterAddUnit(dst, TSDB_RELATION_EQUAL, &left, &right, &uidx);
filterAddUnitToGroup(g, uidx);
return TSDB_CODE_SUCCESS;
}
}
......@@ -818,17 +886,17 @@ int32_t filterAddGroupUnitFromCtx(SFilterInfo *dst, SFilterInfo *src, SFilterRMC
if (!FILTER_GET_FLAG(ra->sflag, RA_NULL)) {
void *data = malloc(sizeof(int64_t));
SIMPLE_COPY_VALUES(data, &ra->s);
filterAddField(dst, NULL, data, FLD_TYPE_VALUE, &right);
filterAddUnit(dst, FILTER_GET_FLAG(ra->sflag, RA_EXCLUDE) ? TSDB_RELATION_GREATER : TSDB_RELATION_GREATER_EQUAL, &left, &right);
filterAddUnitToGroup(g, dst->unitNum - 1);
filterAddField(dst, NULL, data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes);
filterAddUnit(dst, FILTER_GET_FLAG(ra->sflag, RA_EXCLUDE) ? TSDB_RELATION_GREATER : TSDB_RELATION_GREATER_EQUAL, &left, &right, &uidx);
filterAddUnitToGroup(g, uidx);
}
if (!FILTER_GET_FLAG(ra->eflag, RA_NULL)) {
void *data = malloc(sizeof(int64_t));
SIMPLE_COPY_VALUES(data, &ra->e);
filterAddField(dst, NULL, data, FLD_TYPE_VALUE, &right);
filterAddUnit(dst, FILTER_GET_FLAG(ra->eflag, RA_EXCLUDE) ? TSDB_RELATION_LESS : TSDB_RELATION_LESS_EQUAL, &left, &right);
filterAddUnitToGroup(g, dst->unitNum - 1);
filterAddField(dst, NULL, data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes);
filterAddUnit(dst, FILTER_GET_FLAG(ra->eflag, RA_EXCLUDE) ? TSDB_RELATION_LESS : TSDB_RELATION_LESS_EQUAL, &left, &right, &uidx);
filterAddUnitToGroup(g, uidx);
}
return TSDB_CODE_SUCCESS;
......@@ -842,8 +910,8 @@ int32_t filterAddGroupUnitFromCtx(SFilterInfo *dst, SFilterInfo *src, SFilterRMC
assert(ctx->isnull || ctx->notnull || ctx->isrange);
if (ctx->isnull) {
filterAddUnit(dst, TSDB_RELATION_ISNULL, &left, NULL);
filterAddUnitToGroup(g, dst->unitNum - 1);
filterAddUnit(dst, TSDB_RELATION_ISNULL, &left, NULL, &uidx);
filterAddUnitToGroup(g, uidx);
taosArrayPush(res, g);
}
......@@ -851,8 +919,8 @@ int32_t filterAddGroupUnitFromCtx(SFilterInfo *dst, SFilterInfo *src, SFilterRMC
assert(!ctx->isrange);
memset(g, 0, sizeof(*g));
filterAddUnit(dst, TSDB_RELATION_NOTNULL, &left, NULL);
filterAddUnitToGroup(g, dst->unitNum - 1);
filterAddUnit(dst, TSDB_RELATION_NOTNULL, &left, NULL, &uidx);
filterAddUnitToGroup(g, uidx);
taosArrayPush(res, g);
}
......@@ -868,14 +936,13 @@ int32_t filterAddGroupUnitFromCtx(SFilterInfo *dst, SFilterInfo *src, SFilterRMC
memset(g, 0, sizeof(*g));
if ((!FILTER_GET_FLAG(r->ra.sflag, RA_NULL)) &&(!FILTER_GET_FLAG(r->ra.eflag, RA_NULL))) {
int32_t type = FILTER_GET_COL_FIELD_TYPE(FILTER_GET_FIELD(dst, left));
__compar_fn_t func = getComparFunc(type, 0);
if (func(&r->ra.s, &r->ra.e) == 0) {
void *data = malloc(sizeof(int64_t));
SIMPLE_COPY_VALUES(data, &r->ra.s);
filterAddField(dst, NULL, data, FLD_TYPE_VALUE, &right);
filterAddUnit(dst, TSDB_RELATION_EQUAL, &left, &right);
filterAddUnitToGroup(g, dst->unitNum - 1);
filterAddField(dst, NULL, data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes);
filterAddUnit(dst, TSDB_RELATION_EQUAL, &left, &right, &uidx);
filterAddUnitToGroup(g, uidx);
taosArrayPush(res, g);
......@@ -885,15 +952,19 @@ int32_t filterAddGroupUnitFromCtx(SFilterInfo *dst, SFilterInfo *src, SFilterRMC
}
if (!FILTER_GET_FLAG(r->ra.sflag, RA_NULL)) {
filterAddField(dst, NULL, &r->ra.s, FLD_TYPE_VALUE, &right);
filterAddUnit(dst, FILTER_GET_FLAG(r->ra.sflag, RA_EXCLUDE) ? TSDB_RELATION_GREATER : TSDB_RELATION_GREATER_EQUAL, &left, &right);
filterAddUnitToGroup(g, dst->unitNum - 1);
void *data = malloc(sizeof(int64_t));
SIMPLE_COPY_VALUES(data, &r->ra.s);
filterAddField(dst, NULL, data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes);
filterAddUnit(dst, FILTER_GET_FLAG(r->ra.sflag, RA_EXCLUDE) ? TSDB_RELATION_GREATER : TSDB_RELATION_GREATER_EQUAL, &left, &right, &uidx);
filterAddUnitToGroup(g, uidx);
}
if (!FILTER_GET_FLAG(r->ra.eflag, RA_NULL)) {
filterAddField(dst, NULL, &r->ra.e, FLD_TYPE_VALUE, &right);
filterAddUnit(dst, FILTER_GET_FLAG(r->ra.eflag, RA_EXCLUDE) ? TSDB_RELATION_LESS : TSDB_RELATION_LESS_EQUAL, &left, &right);
filterAddUnitToGroup(g, dst->unitNum - 1);
void *data = malloc(sizeof(int64_t));
SIMPLE_COPY_VALUES(data, &r->ra.e);
filterAddField(dst, NULL, data, FLD_TYPE_VALUE, &right, tDataTypes[type].bytes);
filterAddUnit(dst, FILTER_GET_FLAG(r->ra.eflag, RA_EXCLUDE) ? TSDB_RELATION_LESS : TSDB_RELATION_LESS_EQUAL, &left, &right, &uidx);
filterAddUnitToGroup(g, uidx);
}
assert (g->unitNum > 0);
......@@ -1667,6 +1738,7 @@ int32_t filterRewrite(SFilterInfo *info, SFilterGroupCtx** gRes, int32_t gResNum
SFilterGroupCtx *res = NULL;
SFilterColInfo *colInfo = NULL;
int32_t optr = 0;
uint16_t uidx = 0;
memset(info, 0, sizeof(*info));
......@@ -1690,8 +1762,8 @@ int32_t filterRewrite(SFilterInfo *info, SFilterGroupCtx** gRes, int32_t gResNum
for (int32_t n = 0; n < usize; ++n) {
SFilterUnit* u = taosArrayGetP((SArray *)colInfo->info, n);
filterAddUnitFromUnit(info, &oinfo, u);
filterAddUnitToGroup(&ng, info->unitNum - 1);
filterAddUnitFromUnit(info, &oinfo, u, &uidx);
filterAddUnitToGroup(&ng, uidx);
}
continue;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册