提交 e270cf37 编写于 作者: W wpan

code refact

上级 d5a8aedb
......@@ -5051,7 +5051,7 @@ static int32_t getQueryTimeRange(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr
goto _ret;
}
ret = filterInitFromTree(p, &filter, FILTER_NO_REWRITE);
ret = filterInitFromTree(p, &filter, FI_OPTION_NO_REWRITE|FI_OPTION_TIMESTAMP);
if (ret != TSDB_CODE_SUCCESS) {
goto _ret;
}
......
......@@ -38,28 +38,31 @@ enum {
enum {
MR_ST_START = 1,
MR_ST_FIN = 2,
MR_ALL = 4,
MR_NONE = 8,
MR_ST_ALL = 4,
MR_ST_EMPTY = 8,
};
enum {
MR_OPT_TS = 1,
RA_EXCLUDE = 1,
RA_NULL = 2,
};
enum {
RA_EXCLUDE = 1,
RA_NULL = 2,
FI_OPTION_NO_REWRITE = 1,
FI_OPTION_TIMESTAMP = 2,
FI_OPTION_NEED_UNIQE = 4,
};
enum {
FILTER_ALL = 1,
FILTER_NONE = 2,
FILTER_NO_REWRITE = 4,
FI_STATUS_ALL = 1,
FI_STATUS_EMPTY = 2,
FI_STATUS_REWRITE = 4,
};
enum {
RANGE_TYPE_UNIT = 1,
RANGE_TYPE_COL_RANGE,
RANGE_TYPE_COL_RANGE = 2,
RANGE_TYPE_MR_CTX = 3,
};
typedef struct OptrStr {
......@@ -78,6 +81,7 @@ typedef struct SFilterColRange {
uint16_t idx; //column field idx
bool isNull;
bool notNull;
bool isRange;
SFilterRange ra;
} SFilterColRange;
......@@ -92,6 +96,9 @@ typedef struct SFilterRMCtx {
int32_t type;
int32_t options;
int8_t status;
bool isnull;
bool notnull;
bool isrange;
__compar_fn_t pCompareFunc;
SFilterRangeNode *rf; //freed
SFilterRangeNode *rs;
......@@ -124,15 +131,20 @@ typedef struct SFilterGroup {
typedef struct SFilterColInfo {
uint8_t type;
int32_t dataType;
void *info;
} SFilterColInfo;
typedef struct SFilterGroupCtx {
uint16_t colNum;
uint16_t *colIdx;
SArray **colInfo;
SFilterColInfo *colInfo;
} SFilterGroupCtx;
typedef struct SFilterColCtx {
uint16_t colIdx;
void* ctx;
} SFilterColCtx;
typedef struct SFilterCompare {
__compar_fn_t pCompareFunc;
......@@ -146,8 +158,15 @@ typedef struct SFilterUnit {
SFilterFieldId right;
} SFilterUnit;
typedef struct SFilterPCtx {
SHashObj *colHash;
SHashObj *valHash;
SHashObj *unitHash;
} SFilterPCtx;
typedef struct SFilterInfo {
uint32_t flags;
uint32_t options;
uint32_t status;
uint16_t unitSize;
uint16_t unitNum;
uint16_t groupNum;
......@@ -156,6 +175,7 @@ typedef struct SFilterInfo {
SFilterUnit *units;
uint8_t *unitRes; // result
uint8_t *unitFlags; // got result
SFilterPCtx *pctx;
} SFilterInfo;
#define COL_FIELD_SIZE (sizeof(SFilterField) + 2 * sizeof(int64_t))
......@@ -165,10 +185,10 @@ typedef struct SFilterInfo {
#define MR_EMPTY_RES(ctx) (ctx->rs == NULL)
#define SET_AND_OPTR(o) do {if (o == TSDB_RELATION_ISNULL) { isnull = true; } else if (o == TSDB_RELATION_NOTNULL) { if (!isrange) { notnull = true; } } else { isrange = true; notnull = false; } } while (0)
#define SET_OR_OPTR(o) do {if (o == TSDB_RELATION_ISNULL) { isnull = true; } else if (o == TSDB_RELATION_NOTNULL) { notnull = true; } } while (0)
#define CHK_OR_OPTR() (isnull == true && notnull == true)
#define CHK_AND_OPTR() (isnull == true && ((notnull == true) || (isrange == true)))
#define SET_AND_OPTR(ctx, o) do {if (o == TSDB_RELATION_ISNULL) { (ctx)->isnull = true; } else if (o == TSDB_RELATION_NOTNULL) { if (!(ctx)->isrange) { (ctx)->notnull = true; } } else { (ctx)->isrange = true; (ctx)->notnull = false; } } while (0)
#define SET_OR_OPTR(ctx,o) do {if (o == TSDB_RELATION_ISNULL) { (ctx)->isnull = true; } else if (o == TSDB_RELATION_NOTNULL) { (ctx)->notnull = true; (ctx)->isrange = false; } else { if (!(ctx)->notnull) { (ctx)->isrange = true; } } } while (0)
#define CHK_OR_OPTR(ctx) ((ctx)->isnull == true && (ctx)->notnull == true)
#define CHK_AND_OPTR(ctx) ((ctx)->isnull == true && (((ctx)->notnull == true) || ((ctx)->isrange == true)))
#define FILTER_GET_FLAG(st, f) (st & f)
......@@ -206,11 +226,6 @@ typedef struct SFilterInfo {
#define FILTER_GET_VAL_FIELD_DATA(fi) ((fi)->data)
#define FILTER_GET_TYPE(fl) ((fl) & FLD_TYPE_MAX)
#define FILTER_PUSH_UNIT(colInfo, u) do { SFilterColInfo* _info = malloc(sizeof(SFilterColInfo)); _info->type = RANGE_TYPE_UNIT; _info->info = u; taosArrayPush((SArray *)(colInfo), &_info);} while (0)
#define FILTER_PUSH_RANGE(colInfo, cra) do { SFilterColInfo* _info = malloc(sizeof(SFilterColInfo)); _info->type = RANGE_TYPE_COL_RANGE; _info->info = cra; taosArrayPush((SArray *)(colInfo), &_info);} while (0)
#define FILTER_COPY_IDX(dst, src, n) do { *(dst) = malloc(sizeof(uint16_t) * n); memcpy(*(dst), src, sizeof(uint16_t) * n);} while (0)
#define FILTER_GROUP_UNIT(i, g, uid) ((i)->units + (g)->unitIdxs[uid])
#define FILTER_UNIT_LEFT_FIELD(i, u) FILTER_GET_FIELD(i, (u)->left)
#define FILTER_UNIT_RIGHT_FIELD(i, u) FILTER_GET_FIELD(i, (u)->right)
......@@ -227,6 +242,14 @@ typedef struct SFilterInfo {
#define FILTER_UNIT_GET_R(i, idx) ((i)->unitRes[idx])
#define FILTER_UNIT_SET_R(i, idx, v) (i)->unitRes[idx] = (v)
#define FILTER_PUSH_UNIT(colInfo, u) do { (colInfo).type = RANGE_TYPE_UNIT; (colInfo).dataType = FILTER_UNIT_DATA_TYPE(u);taosArrayPush((SArray *)((colInfo).info), &u);} while (0)
#define FILTER_PUSH_RANGE(colInfo, cra) do { SFilterColInfo* _info = malloc(sizeof(SFilterColInfo)); _info->type = RANGE_TYPE_COL_RANGE; _info->info = cra; taosArrayPush((SArray *)(colInfo), &_info);} while (0)
#define FILTER_PUSH_CTX(colInfo, ctx) do { (colInfo).type = RANGE_TYPE_MR_CTX; (colInfo).info = ctx;} while (0)
#define FILTER_COPY_IDX(dst, src, n) do { *(dst) = malloc(sizeof(uint16_t) * n); memcpy(*(dst), src, sizeof(uint16_t) * n);} while (0)
#define FILTER_ADD_CTX_TO_GRES(gres, idx, ctx) do { if ((gres)->colCtxs == NULL) { (gres)->colCtxs = taosArrayInit(gres->colNum, sizeof(SFilterColCtx)); } SFilterColCtx cCtx = {idx, ctx}; taosArrayPush((gres)->colCtxs, &cCtx); } while (0)
typedef int32_t(*filter_desc_compare_func)(const void *, const void *);
......
......@@ -122,6 +122,10 @@ int32_t filterResetMergeRangeCtx(SFilterRMCtx *ctx) {
return TSDB_CODE_SUCCESS;
}
ctx->isnull = false;
ctx->notnull = false;
ctx->isrange = false;
SFilterRangeNode *r = ctx->rf;
while (r && r->next) {
......@@ -169,14 +173,35 @@ int32_t filterPostProcessRange(SFilterRMCtx *cur, SFilterRange *ra, bool *notNul
return TSDB_CODE_SUCCESS;
}
int32_t filterAddMergeOptr(void* h, uint8_t raOptr, int32_t optr, bool *empty, bool *all) {
SFilterRMCtx *ctx = (SFilterRMCtx *)h;
if (optr == TSDB_RELATION_AND) {
SET_AND_OPTR(ctx, raOptr);
if (CHK_AND_OPTR(ctx)) {
FILTER_SET_FLAG(ctx->status, MR_ST_EMPTY);
*empty = true;
}
} else {
SET_OR_OPTR(ctx, raOptr);
if (CHK_OR_OPTR(ctx)) {
FILTER_SET_FLAG(ctx->status, MR_ST_ALL);
*all = true;
}
}
return TSDB_CODE_SUCCESS;
}
int32_t filterAddMergeRangeImpl(void* h, SFilterRange* ra, int32_t optr) {
SFilterRMCtx *ctx = (SFilterRMCtx *)h;
if (ctx->rs == NULL) {
if ((FILTER_GET_FLAG(ctx->status, MR_ST_START) == 0)
|| (FILTER_GET_FLAG(ctx->status, MR_ALL) && (optr == TSDB_RELATION_AND))
|| ((!FILTER_GET_FLAG(ctx->status, MR_ALL)) && (optr == TSDB_RELATION_OR))) {
|| (FILTER_GET_FLAG(ctx->status, MR_ST_ALL) && (optr == TSDB_RELATION_AND))
|| ((!FILTER_GET_FLAG(ctx->status, MR_ST_ALL)) && (optr == TSDB_RELATION_OR))) {
APPEND_RANGE(ctx, ctx->rs, ra);
FILTER_SET_FLAG(ctx->status, MR_ST_START);
}
......@@ -298,8 +323,12 @@ int32_t filterAddMergeRangeImpl(void* h, SFilterRange* ra, int32_t optr) {
bool notnull;
filterPostProcessRange(ctx, &ctx->rs->ra, &notnull);
if (notnull) {
bool all = false;
FREE_FROM_RANGE(ctx, ctx->rs);
FILTER_SET_FLAG(ctx->status, MR_ALL);
filterAddMergeOptr(h, TSDB_RELATION_NOTNULL, optr, NULL, &all);
if (all) {
FILTER_SET_FLAG(ctx->status, MR_ST_ALL);
}
}
}
......@@ -322,10 +351,21 @@ int32_t filterAddMergeRange(void* h, SFilterRange* ra, int32_t optr) {
return filterAddMergeRangeImpl(h, ra, optr);
}
int32_t filterAddMergeRangeCtx(void *dst, void *src, int32_t optr) {
SFilterRMCtx *dctx = (SFilterRMCtx *)dst;
SFilterRMCtx *sctx = (SFilterRMCtx *)src;
if (optr == TSDB_RELATION_AND) {
dctx->isnull &= sctx->isnull;
dctx->notnull &= sctx->notnull;
dctx->isrange &= sctx->isrange;
} else {
dctx->isnull |= sctx->isnull;
dctx->notnull |= sctx->notnull;
dctx->isrange |= sctx->isrange;
}
if (sctx->rs == NULL) {
return TSDB_CODE_SUCCESS;
}
......@@ -340,6 +380,33 @@ int32_t filterAddMergeRangeCtx(void *dst, void *src, int32_t optr) {
return TSDB_CODE_SUCCESS;
}
int32_t filterCopyMergeRangeCtx(void *dst, void *src) {
SFilterRMCtx *dctx = (SFilterRMCtx *)dst;
SFilterRMCtx *sctx = (SFilterRMCtx *)src;
dctx->status = sctx->status;
dctx->isnull = sctx->isnull;
dctx->notnull = sctx->notnull;
dctx->isrange = sctx->isrange;
SFilterRangeNode *r = sctx->rs;
SFilterRangeNode *dr = dctx->rs;
while (r) {
APPEND_RANGE(dctx, dr, &r->ra);
if (dr == NULL) {
dr = dctx->rs;
} else {
dr = dr->next;
}
r = r->next;
}
return TSDB_CODE_SUCCESS;
}
int32_t filterFinMergeRange(void* h) {
SFilterRMCtx *ctx = (SFilterRMCtx *)h;
......@@ -348,7 +415,7 @@ int32_t filterFinMergeRange(void* h) {
return TSDB_CODE_SUCCESS;
}
if (FILTER_GET_FLAG(ctx->options, MR_OPT_TS)) {
if (FILTER_GET_FLAG(ctx->options, FI_OPTION_TIMESTAMP)) {
SFilterRangeNode *r = ctx->rs;
SFilterRangeNode *rn = NULL;
......@@ -414,6 +481,41 @@ int32_t filterGetMergeRangeRes(void* h, SFilterRange *ra) {
return TSDB_CODE_SUCCESS;
}
int32_t filterSourceRangeFromCtx(SFilterRMCtx *ctx, void *sctx, int32_t optr, bool *empty, bool *all) {
SFilterRMCtx *src = (SFilterRMCtx *)sctx;
if (src->isnull){
filterAddMergeOptr(ctx, TSDB_RELATION_ISNULL, optr, empty, all);
if (FILTER_GET_FLAG(ctx->status, MR_ST_ALL)) {
*all = true;
}
}
if (src->notnull) {
filterAddMergeOptr(ctx, TSDB_RELATION_NOTNULL, optr, empty, all);
if (FILTER_GET_FLAG(ctx->status, MR_ST_ALL)) {
*all = true;
}
}
if (src->isrange) {
filterAddMergeOptr(ctx, 0, optr, empty, all);
if (!(optr == TSDB_RELATION_OR && ctx->notnull)) {
filterAddMergeRangeCtx(ctx, src, optr);
}
if (FILTER_GET_FLAG(ctx->status, MR_ST_ALL)) {
*all = true;
}
}
return TSDB_CODE_SUCCESS;
}
int32_t filterFreeMergeRange(void* h) {
if (h == NULL) {
return TSDB_CODE_SUCCESS;
......@@ -435,7 +537,7 @@ int32_t filterFreeMergeRange(void* h) {
}
int32_t filterMergeGroup(SFilterGroup *gp1, SFilterGroup *gp2, SArray* group) {
int32_t filterDetachCnfGroup(SFilterGroup *gp1, SFilterGroup *gp2, SArray* group) {
SFilterGroup gp = {0};
//TODO CHECK DUP
......@@ -453,7 +555,7 @@ int32_t filterMergeGroup(SFilterGroup *gp1, SFilterGroup *gp2, SArray* group) {
}
int32_t filterMergeGroups(SArray* group, SArray* left, SArray* right) {
int32_t filterDetachCnfGroups(SArray* group, SArray* left, SArray* right) {
int32_t leftSize = (int32_t)taosArrayGetSize(left);
int32_t rightSize = (int32_t)taosArrayGetSize(right);
......@@ -466,7 +568,7 @@ int32_t filterMergeGroups(SArray* group, SArray* left, SArray* right) {
for (int32_t r = 0; r < rightSize; ++r) {
SFilterGroup *gp2 = taosArrayGet(right, r);
filterMergeGroup(gp1, gp2, group);
filterDetachCnfGroup(gp1, gp2, group);
}
}
......@@ -513,7 +615,7 @@ int32_t filterAddField(SFilterInfo *info, void *desc, void *data, int32_t type,
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE int32_t filterAddFieldFromField(SFilterInfo *info, SFilterField *field, SFilterFieldId *fid) {
static FORCE_INLINE int32_t filterAddColFieldFromField(SFilterInfo *info, SFilterField *field, SFilterFieldId *fid) {
filterAddField(info, field->desc, field->data, FILTER_GET_TYPE(field->flag), fid);
FILTER_SET_FLAG(field->flag, FLD_DESC_NO_FREE);
......@@ -648,67 +750,84 @@ int32_t filterAddGroupUnitFromNode(SFilterInfo *info, tExprNode* tree, SArray *g
int32_t filterAddUnitFromUnit(SFilterInfo *dst, SFilterInfo *src, SFilterUnit* u) {
SFilterFieldId left, right;
SFilterFieldId left, right, *pright = &right;
filterAddField(dst, FILTER_UNIT_COL_DESC(src, u), NULL, FLD_TYPE_COLUMN, &left);
filterAddField(dst, NULL, FILTER_UNIT_VAL_DATA(src, u), FLD_TYPE_VALUE, &right);
SFilterField *t = FILTER_UNIT_LEFT_FIELD(src, u);
FILTER_SET_FLAG(t->flag, FLD_DESC_NO_FREE);
if (u->right.type == FLD_TYPE_VALUE) {
filterAddField(dst, NULL, FILTER_UNIT_VAL_DATA(src, u), FLD_TYPE_VALUE, &right);
t = FILTER_UNIT_RIGHT_FIELD(src, u);
FILTER_SET_FLAG(t->flag, FLD_DATA_NO_FREE);
} else {
pright = NULL;
}
return filterAddUnit(dst, FILTER_UNIT_OPTR(u), &left, &right);
return filterAddUnit(dst, FILTER_UNIT_OPTR(u), &left, pright);
}
int32_t filterAddGroupUnitFromRange(SFilterInfo *dst, SFilterInfo *src, SFilterColRange *cra, SFilterGroup *g, int32_t optr, SArray *res) {
int32_t filterAddGroupUnitFromCtx(SFilterInfo *dst, SFilterInfo *src, SFilterRMCtx *ctx, uint16_t cidx, SFilterGroup *g, int32_t optr, SArray *res) {
SFilterFieldId left, right;
SFilterField *col = FILTER_GET_COL_FIELD(src, cra->idx);
SFilterField *col = FILTER_GET_COL_FIELD(src, cidx);
filterAddFieldFromField(dst, col, &left);
filterAddColFieldFromField(dst, col, &left);
if (optr == TSDB_RELATION_AND) {
if (cra->isNull) {
assert(cra->notNull == false);
if (ctx->isnull) {
assert(ctx->notnull == false && ctx->isrange == false);
filterAddUnit(dst, TSDB_RELATION_ISNULL, &left, NULL);
return filterAddUnitToGroup(g, dst->unitNum - 1);
filterAddUnitToGroup(g, dst->unitNum - 1);
return TSDB_CODE_SUCCESS;
}
if (cra->notNull) {
assert(cra->isNull == false);
if (ctx->notnull) {
assert(ctx->isnull == false && ctx->isrange == false);
filterAddUnit(dst, TSDB_RELATION_NOTNULL, &left, NULL);
return filterAddUnitToGroup(g, dst->unitNum - 1);
filterAddUnitToGroup(g, dst->unitNum - 1);
return TSDB_CODE_SUCCESS;
}
assert(!((FILTER_GET_FLAG(cra->ra.sflag, RA_NULL)) && (FILTER_GET_FLAG(cra->ra.eflag, RA_NULL))));
if (!ctx->isrange) {
assert(ctx->isnull || ctx->notnull);
return TSDB_CODE_SUCCESS;
}
assert(ctx->rs && ctx->rs->next == NULL);
if ((!FILTER_GET_FLAG(cra->ra.sflag, RA_NULL)) &&(!FILTER_GET_FLAG(cra->ra.eflag, RA_NULL))) {
SFilterRange *ra = &ctx->rs->ra;
assert(!((FILTER_GET_FLAG(ra->sflag, RA_NULL)) && (FILTER_GET_FLAG(ra->eflag, RA_NULL))));
if ((!FILTER_GET_FLAG(ra->sflag, RA_NULL)) && (!FILTER_GET_FLAG(ra->eflag, RA_NULL))) {
int32_t type = FILTER_GET_COL_FIELD_TYPE(FILTER_GET_FIELD(dst, left));
__compar_fn_t func = getComparFunc(type, 0);
if (func(&cra->ra.s, &cra->ra.e) == 0) {
if (func(&ra->s, &ra->e) == 0) {
void *data = malloc(sizeof(int64_t));
SIMPLE_COPY_VALUES(data, &cra->ra.s);
SIMPLE_COPY_VALUES(data, &ra->s);
filterAddField(dst, NULL, data, FLD_TYPE_VALUE, &right);
filterAddUnit(dst, TSDB_RELATION_EQUAL, &left, &right);
return filterAddUnitToGroup(g, dst->unitNum - 1);
filterAddUnitToGroup(g, dst->unitNum - 1);
return TSDB_CODE_SUCCESS;
}
}
if (!FILTER_GET_FLAG(cra->ra.sflag, RA_NULL)) {
if (!FILTER_GET_FLAG(ra->sflag, RA_NULL)) {
void *data = malloc(sizeof(int64_t));
SIMPLE_COPY_VALUES(data, &cra->ra.s);
SIMPLE_COPY_VALUES(data, &ra->s);
filterAddField(dst, NULL, data, FLD_TYPE_VALUE, &right);
filterAddUnit(dst, FILTER_GET_FLAG(cra->ra.sflag, RA_EXCLUDE) ? TSDB_RELATION_GREATER : TSDB_RELATION_GREATER_EQUAL, &left, &right);
filterAddUnit(dst, FILTER_GET_FLAG(ra->sflag, RA_EXCLUDE) ? TSDB_RELATION_GREATER : TSDB_RELATION_GREATER_EQUAL, &left, &right);
filterAddUnitToGroup(g, dst->unitNum - 1);
}
if (!FILTER_GET_FLAG(cra->ra.eflag, RA_NULL)) {
if (!FILTER_GET_FLAG(ra->eflag, RA_NULL)) {
void *data = malloc(sizeof(int64_t));
SIMPLE_COPY_VALUES(data, &cra->ra.e);
SIMPLE_COPY_VALUES(data, &ra->e);
filterAddField(dst, NULL, data, FLD_TYPE_VALUE, &right);
filterAddUnit(dst, FILTER_GET_FLAG(cra->ra.eflag, RA_EXCLUDE) ? TSDB_RELATION_LESS : TSDB_RELATION_LESS_EQUAL, &left, &right);
filterAddUnit(dst, FILTER_GET_FLAG(ra->eflag, RA_EXCLUDE) ? TSDB_RELATION_LESS : TSDB_RELATION_LESS_EQUAL, &left, &right);
filterAddUnitToGroup(g, dst->unitNum - 1);
}
......@@ -720,13 +839,16 @@ int32_t filterAddGroupUnitFromRange(SFilterInfo *dst, SFilterInfo *src, SFilterC
SFilterGroup ng = {0};
g = &ng;
if (cra->isNull) {
assert(ctx->isnull || ctx->notnull || ctx->isrange);
if (ctx->isnull) {
filterAddUnit(dst, TSDB_RELATION_ISNULL, &left, NULL);
filterAddUnitToGroup(g, dst->unitNum - 1);
taosArrayPush(res, g);
}
if (cra->notNull) {
if (ctx->notnull) {
assert(!ctx->isrange);
memset(g, 0, sizeof(*g));
filterAddUnit(dst, TSDB_RELATION_NOTNULL, &left, NULL);
......@@ -734,39 +856,55 @@ int32_t filterAddGroupUnitFromRange(SFilterInfo *dst, SFilterInfo *src, SFilterC
taosArrayPush(res, g);
}
if (!ctx->isrange) {
assert(ctx->isnull || ctx->notnull);
g->unitNum = 0;
return TSDB_CODE_SUCCESS;
}
SFilterRangeNode *r = ctx->rs;
while (r) {
memset(g, 0, sizeof(*g));
if ((!FILTER_GET_FLAG(cra->ra.sflag, RA_NULL)) &&(!FILTER_GET_FLAG(cra->ra.eflag, RA_NULL))) {
if ((!FILTER_GET_FLAG(r->ra.sflag, RA_NULL)) &&(!FILTER_GET_FLAG(r->ra.eflag, RA_NULL))) {
int32_t type = FILTER_GET_COL_FIELD_TYPE(FILTER_GET_FIELD(dst, left));
__compar_fn_t func = getComparFunc(type, 0);
if (func(&cra->ra.s, &cra->ra.e) == 0) {
if (func(&r->ra.s, &r->ra.e) == 0) {
void *data = malloc(sizeof(int64_t));
SIMPLE_COPY_VALUES(data, &cra->ra.s);
SIMPLE_COPY_VALUES(data, &r->ra.s);
filterAddField(dst, NULL, data, FLD_TYPE_VALUE, &right);
filterAddUnit(dst, TSDB_RELATION_EQUAL, &left, &right);
filterAddUnitToGroup(g, dst->unitNum - 1);
taosArrayPush(res, g);
return TSDB_CODE_SUCCESS;
r = r->next;
continue;
}
}
if (!FILTER_GET_FLAG(cra->ra.sflag, RA_NULL)) {
filterAddField(dst, NULL, &cra->ra.s, FLD_TYPE_VALUE, &right);
filterAddUnit(dst, FILTER_GET_FLAG(cra->ra.sflag, RA_EXCLUDE) ? TSDB_RELATION_GREATER : TSDB_RELATION_GREATER_EQUAL, &left, &right);
if (!FILTER_GET_FLAG(r->ra.sflag, RA_NULL)) {
filterAddField(dst, NULL, &r->ra.s, FLD_TYPE_VALUE, &right);
filterAddUnit(dst, FILTER_GET_FLAG(r->ra.sflag, RA_EXCLUDE) ? TSDB_RELATION_GREATER : TSDB_RELATION_GREATER_EQUAL, &left, &right);
filterAddUnitToGroup(g, dst->unitNum - 1);
}
if (!FILTER_GET_FLAG(cra->ra.eflag, RA_NULL)) {
filterAddField(dst, NULL, &cra->ra.e, FLD_TYPE_VALUE, &right);
filterAddUnit(dst, FILTER_GET_FLAG(cra->ra.eflag, RA_EXCLUDE) ? TSDB_RELATION_LESS : TSDB_RELATION_LESS_EQUAL, &left, &right);
if (!FILTER_GET_FLAG(r->ra.eflag, RA_NULL)) {
filterAddField(dst, NULL, &r->ra.e, FLD_TYPE_VALUE, &right);
filterAddUnit(dst, FILTER_GET_FLAG(r->ra.eflag, RA_EXCLUDE) ? TSDB_RELATION_LESS : TSDB_RELATION_LESS_EQUAL, &left, &right);
filterAddUnitToGroup(g, dst->unitNum - 1);
}
if (g->unitNum > 0) {
assert (g->unitNum > 0);
taosArrayPush(res, g);
r = r->next;
}
g->unitNum = 0;
return TSDB_CODE_SUCCESS;
}
......@@ -796,7 +934,7 @@ int32_t filterTreeToGroup(tExprNode* tree, SFilterInfo *info, SArray* group) {
ERR_JRET(filterTreeToGroup(tree->_node.pLeft, info, leftGroup));
ERR_JRET(filterTreeToGroup(tree->_node.pRight, info, rightGroup));
ERR_JRET(filterMergeGroups(group, leftGroup, rightGroup));
ERR_JRET(filterDetachCnfGroups(group, leftGroup, rightGroup));
taosArrayDestroyEx(leftGroup, filterFreeGroup);
taosArrayDestroyEx(rightGroup, filterFreeGroup);
......@@ -849,9 +987,13 @@ void filterDumpInfoToString(SFilterInfo *info, const char *msg) {
SFilterField *field = &info->fields[FLD_TYPE_VALUE].fields[i];
if (field->desc) {
tVariant *var = field->desc;
if (var->nType == TSDB_DATA_TYPE_VALUE_ARRAY) {
qDebug("VAL%d => [type:TS][val:[%" PRIi64"] - [%" PRId64 "]]", i, *(int64_t *)field->data, *(((int64_t *)field->data) + 1));
} else {
qDebug("VAL%d => [type:%d][val:%" PRIi64"]", i, var->nType, var->i64); //TODO
}
} else {
qDebug("VAL%d => [type:NIL][val:%" PRIi64" or %f]", i, *(int64_t *)field->data, *(double *)field->data); //TODO
qDebug("VAL%d => [type:NIL][val:0x%" PRIx64"]", i, *(int64_t *)field->data); //TODO
}
}
......@@ -894,6 +1036,36 @@ void filterDumpInfoToString(SFilterInfo *info, const char *msg) {
}
}
void filterFreeColInfo(void *data) {
SFilterColInfo* info = (SFilterColInfo *)data;
if (info->info == NULL) {
return;
}
if (info->type == RANGE_TYPE_COL_RANGE) {
tfree(info->info);
} else if (info->type == RANGE_TYPE_MR_CTX) {
filterFreeMergeRange(info->info);
} else if (info->type == RANGE_TYPE_UNIT) {
taosArrayDestroy((SArray *)info->info);
}
//NO NEED TO FREE UNIT
info->type = 0;
info->info = NULL;
}
void filterFreeColCtx(void *data) {
SFilterColCtx* ctx = (SFilterColCtx *)data;
if (ctx->ctx) {
filterFreeMergeRange(ctx->ctx);
}
}
void filterFreeGroupCtx(SFilterGroupCtx* gRes) {
if (gRes == NULL) {
return;
......@@ -904,8 +1076,8 @@ void filterFreeGroupCtx(SFilterGroupCtx* gRes) {
int16_t i = 0, j = 0;
while (i < gRes->colNum) {
if (gRes->colInfo[j]) {
taosArrayDestroy(gRes->colInfo[j]);
if (gRes->colInfo[j].info) {
filterFreeColInfo(&gRes->colInfo[j]);
++i;
}
......@@ -1066,71 +1238,75 @@ int32_t filterAddUnitRange(SFilterInfo *info, SFilterUnit* u, SFilterRMCtx *ctx,
return TSDB_CODE_SUCCESS;
}
int32_t filterCompareRMCtx(SFilterRMCtx *ctx1, SFilterRMCtx *ctx2, bool *equal) {
CHK_JMP(ctx1->status != ctx2->status);
CHK_JMP(ctx1->isnull != ctx2->isnull);
CHK_JMP(ctx1->notnull != ctx2->notnull);
CHK_JMP(ctx1->isrange != ctx2->isrange);
SFilterRangeNode *r1 = ctx1->rs;
SFilterRangeNode *r2 = ctx2->rs;
int32_t filterMergeBetweenColumns(SFilterInfo *info, SFilterGroupCtx* gRes, uint16_t colIdx) {
bool isnull = false, notnull = false, isrange = false;
int32_t num = 0;
SArray* colArray = gRes->colInfo[colIdx];
int32_t size = (int32_t)taosArrayGetSize(colArray);
SFilterRMCtx* cur = filterInitMergeRange(type, 0);
while (r1 && r2) {
CHK_JMP(r1->ra.sflag != r2->ra.sflag);
CHK_JMP(r1->ra.eflag != r2->ra.eflag);
CHK_JMP(r1->ra.s != r2->ra.s);
CHK_JMP(r1->ra.e != r2->ra.e);
for (uint32_t i = 0; i < size; ++i) {
SFilterColInfo* colInfo = taosArrayGet(colArray, i);
SFilterUnit* u = colInfo->info;
int32_t type = FILTER_UNIT_DATA_TYPE(u);
uint8_t optr = FILTER_UNIT_OPTR(u);
r1 = r1->next;
r2 = r2->next;
}
SET_AND_OPTR(optr);
CHK_JMP(CHK_AND_OPTR());
CHK_JMP(r1 != r2);
if (!FILTER_NO_MERGE_OPTR(optr)) {
filterAddUnitRange(info, u, cur, TSDB_RELATION_AND);
CHK_JMP(MR_EMPTY_RES(cur));
}
}
*equal = true;
SFilterColRange *cra = calloc(1, sizeof(*cra));
cra->idx = colIdx;
return TSDB_CODE_SUCCESS;
filterGetMergeRangeNum(cur, &num);
assert(num == 1 || num == 0);
_err_return:
*equal = false;
return TSDB_CODE_SUCCESS;
}
if (num == 1) {
filterGetMergeRangeRes(cur, &cra->ra);
filterPostProcessRange(cur, &cra->ra, &cra->notNull);
} else {
if (isnull) {
cra->isNull = true;
}
if (notnull) {
cra->notNull = true;
int32_t filterMergeUnits(SFilterInfo *info, SFilterGroupCtx* gRes, uint16_t colIdx, bool *empty) {
SArray* colArray = (SArray *)gRes->colInfo[colIdx].info;
int32_t size = (int32_t)taosArrayGetSize(colArray);
int32_t type = gRes->colInfo[colIdx].dataType;
SFilterRMCtx* ctx = filterInitMergeRange(type, 0);
for (uint32_t i = 0; i < size; ++i) {
SFilterUnit* u = taosArrayGetP(colArray, i);
uint8_t optr = FILTER_UNIT_OPTR(u);
filterAddMergeOptr(ctx, optr, TSDB_RELATION_AND, empty, NULL);
CHK_JMP(*empty);
if (!FILTER_NO_MERGE_OPTR(optr)) {
filterAddUnitRange(info, u, ctx, TSDB_RELATION_AND);
CHK_JMP(MR_EMPTY_RES(ctx));
}
}
taosArrayClear(colArray);
FILTER_PUSH_RANGE(colArray, cra);
taosArrayDestroy(colArray);
filterFreeMergeRange(cur);
FILTER_PUSH_CTX(gRes->colInfo[colIdx], ctx);
return TSDB_CODE_SUCCESS;
_err_return:
taosArrayDestroy(colArray);
gRes->colInfo[colIdx] = NULL;
*empty = true;
filterFreeMergeRange(cur);
filterFreeMergeRange(ctx);
return TSDB_CODE_SUCCESS;
}
int32_t filterMergeBetweenUnits(SFilterInfo *info, SFilterGroupCtx** gRes, int32_t* gResNum) {
bool emptyGroup = false;
int32_t filterMergeGroupUnits(SFilterInfo *info, SFilterGroupCtx** gRes, int32_t* gResNum) {
bool empty = false;
uint16_t *colIdx = malloc(info->fields[FLD_TYPE_COLUMN].num * sizeof(uint16_t));
uint16_t colIdxi = 0;
uint16_t gResIdx = 0;
......@@ -1139,52 +1315,44 @@ int32_t filterMergeBetweenUnits(SFilterInfo *info, SFilterGroupCtx** gRes, int32
SFilterGroup* g = info->groups + i;
gRes[gResIdx] = calloc(1, sizeof(SFilterGroupCtx));
gRes[gResIdx]->colInfo = calloc(info->fields[FLD_TYPE_COLUMN].num, POINTER_BYTES);
gRes[gResIdx]->colInfo = calloc(info->fields[FLD_TYPE_COLUMN].num, sizeof(SFilterColInfo));
colIdxi = 0;
emptyGroup = false;
empty = false;
for (uint16_t j = 0; j < g->unitNum; ++j) {
SFilterUnit* u = FILTER_GROUP_UNIT(info, g, j);
uint16_t cidx = FILTER_UNIT_COL_IDX(u);
if (gRes[gResIdx]->colInfo[cidx] == NULL) {
gRes[gResIdx]->colInfo[cidx] = (SArray *)taosArrayInit(4, POINTER_BYTES);
if (gRes[gResIdx]->colInfo[cidx].info == NULL) {
gRes[gResIdx]->colInfo[cidx].info = (SArray *)taosArrayInit(4, POINTER_BYTES);
colIdx[colIdxi++] = cidx;
++gRes[gResIdx]->colNum;
} else {
FILTER_SET_FLAG(info->status, FI_STATUS_REWRITE);
}
FILTER_PUSH_UNIT(gRes[gResIdx]->colInfo[cidx], u);
}
if (colIdxi > 1) {
qsort(colIdx, colIdxi, sizeof(uint16_t), compareUint16Val);
qsort(colIdx, colIdxi, sizeof(uint16_t), getComparFunc(TSDB_DATA_TYPE_USMALLINT, 0));
}
for (uint16_t l = 0; l < colIdxi; ++l) {
SArray* colArray = gRes[gResIdx]->colInfo[colIdx[l]];
int32_t size = (int32_t)taosArrayGetSize(colArray);
if (size <= 1) {
continue;
}
SFilterColInfo* colInfo = taosArrayGet(colArray, 0);
SFilterUnit* u = colInfo->info;
int32_t type = FILTER_UNIT_DATA_TYPE(u);
int32_t type = gRes[gResIdx]->colInfo[colIdx[l]].dataType;
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
continue;
}
filterMergeBetweenColumns(info, gRes[gResIdx], colIdx[l]);
filterMergeUnits(info, gRes[gResIdx], colIdx[l], &empty);
if (gRes[gResIdx]->colInfo[colIdx[l]] == NULL) {
emptyGroup = true;
if (empty) {
break;
}
}
if (emptyGroup) {
if (empty) {
filterFreeGroupCtx(gRes[gResIdx]);
gRes[gResIdx] = NULL;
......@@ -1200,165 +1368,190 @@ int32_t filterMergeBetweenUnits(SFilterInfo *info, SFilterGroupCtx** gRes, int32
*gResNum = gResIdx;
if (gResIdx == 0) {
FILTER_SET_FLAG(info->status, FI_STATUS_EMPTY);
}
return TSDB_CODE_SUCCESS;
}
void filterCheckColumns(SFilterGroupCtx* gRes1, SFilterGroupCtx* gRes2, bool *conflict) {
uint16_t idx1 = 0, idx2 = 0, m = 0, n = 0;
bool equal = false;
for (; m < gRes1->colNum; ++m) {
idx1 = gRes1->colIdx[m];
equal = false;
int32_t filterMergeTwoGroups(SFilterInfo *info, uint16_t id1, uint16_t id2, SArray* res, uint16_t *removedNum, SFilterGroupCtx** unitRes) {
bool isnull = false, notnull = false;
int32_t num = 0;
SFilterColRange *cra = NULL;
SFilterGroup *g = NULL;
SFilterUnit* u1 = FILTER_GROUP_UNIT(info, info->groups + id1, 0);
uint8_t optr = FILTER_UNIT_OPTR(u1);
uint16_t cidx = FILTER_UNIT_COL_IDX(u1);
uint16_t cidx2 = 0;
int32_t type = FILTER_UNIT_DATA_TYPE(u1);
SFilterRMCtx *cur = filterInitMergeRange(type, 0);
for (int32_t i = 0; i < info->groupNum; ++i) {
g = info->groups + i;
uint16_t unitNum = (unitRes && unitRes[i]) ? unitRes[i]->num : g->unitNum;
if (unitNum == 0) {
continue;
for (; n < gRes2->colNum; ++n) {
idx2 = gRes2->colIdx[n];
if (idx1 < idx2) {
*conflict = true;
return;
}
if (unitNum > 1) {
if (idx1 > idx2) {
continue;
}
if (unitRes && unitRes[i]) {
assert(taosArrayGetSize(unitRes[i]->colRange) == 1);
if (notnull) {
continue;
if (FILTER_NO_MERGE_DATA_TYPE(gRes1->colInfo[idx1].dataType)) {
*conflict = true;
return;
}
cra = taosArrayGet(unitRes[i]->colRange, 0);
++n;
equal = true;
break;
}
if (cra->notNull) {
notnull = true;
CHK_JMP(CHK_OR_OPTR());
if (!equal) {
*conflict = true;
return;
}
}
cidx2 = cra->idx;
*conflict = false;
return;
}
int32_t filterMergeTwoGroupsImpl(SFilterInfo *info, SFilterRMCtx **ctx, int32_t optr, uint16_t cidx, SFilterGroupCtx* gRes1, SFilterGroupCtx* gRes2, bool *empty, bool *all) {
SFilterField *fi = FILTER_GET_COL_FIELD(info, cidx);
int32_t type = FILTER_GET_COL_FIELD_TYPE(fi);
if ((*ctx) == NULL) {
*ctx = filterInitMergeRange(type, 0);
} else {
u1 = FILTER_GROUP_UNIT(info, g, 0);
type = FILTER_UNIT_DATA_TYPE(u1);
if (FILTER_NO_MERGE_DATA_TYPE(type)) {
continue;
filterReuseMergeRangeCtx(*ctx, type, 0);
}
optr = FILTER_UNIT_OPTR(u1);
assert(gRes2->colInfo[cidx].type == RANGE_TYPE_MR_CTX);
assert(gRes1->colInfo[cidx].type == RANGE_TYPE_MR_CTX);
cidx2 = FILTER_UNIT_COL_IDX(u1);
}
filterCopyMergeRangeCtx(*ctx, gRes2->colInfo[cidx].info);
filterSourceRangeFromCtx(*ctx, gRes1->colInfo[cidx].info, optr, empty, all);
if (cidx != cidx2) {
continue;
}
return TSDB_CODE_SUCCESS;
}
g->unitNum = 0;
if (unitRes && unitRes[i]) {
unitRes[i]->num = 0;
if (notnull) {
continue;
}
int32_t filterMergeTwoGroups(SFilterInfo *info, SFilterGroupCtx** gRes1, SFilterGroupCtx** gRes2, bool *all) {
bool conflict = false;
filterAddMergeRange(cur, &cra->ra, TSDB_RELATION_OR);
if (MR_EMPTY_RES(cur)) {
notnull = true;
CHK_JMP(CHK_OR_OPTR());
filterCheckColumns(*gRes1, *gRes2, &conflict);
if (conflict) {
return TSDB_CODE_SUCCESS;
}
FILTER_SET_FLAG(info->status, FI_STATUS_REWRITE);
uint16_t idx1 = 0, idx2 = 0, m = 0, n = 0;
bool numEqual = (*gRes1)->colNum == (*gRes2)->colNum;
bool equal = false;
uint16_t equal1 = 0, equal2 = 0, merNum = 0;
SFilterRMCtx *ctx = NULL;
SFilterColCtx colCtx = {0};
SArray* colCtxs = taosArrayInit((*gRes2)->colNum, sizeof(SFilterColCtx));
for (; m < (*gRes1)->colNum; ++m) {
idx1 = (*gRes1)->colIdx[m];
for (; n < (*gRes2)->colNum; ++n) {
idx2 = (*gRes2)->colIdx[n];
if (idx1 > idx2) {
continue;
}
optr = FILTER_UNIT_OPTR(u1);
SET_OR_OPTR(optr);
CHK_JMP(CHK_OR_OPTR());
assert(idx1 == idx2);
if (!FILTER_NO_MERGE_OPTR(optr) && notnull == false) {
filterAddUnitRange(info, u1, cur, TSDB_RELATION_OR);
if (MR_EMPTY_RES(cur)) {
notnull = true;
CHK_JMP(CHK_OR_OPTR());
}
}
}
++merNum;
filterMergeTwoGroupsImpl(info, &ctx, TSDB_RELATION_OR, idx1, *gRes1, *gRes2, NULL, all);
SFilterColRange ncra;
SFilterRange *ra;
ncra.idx = cidx;
CHK_JMP(*all);
ncra.isNull = isnull;
ncra.notNull = notnull;
if (numEqual) {
if ((*gRes1)->colNum == 1) {
++equal1;
colCtx.colIdx = idx1;
colCtx.ctx = ctx;
taosArrayPush(colCtxs, &colCtx);
break;
} else {
filterCompareRMCtx(ctx, (*gRes1)->colInfo[idx1].info, &equal);
if (equal) {
++equal1;
}
if (isnull) {
--removedNum;
filterCompareRMCtx(ctx, (*gRes2)->colInfo[idx2].info, &equal);
if (equal) {
++equal2;
}
if (!notnull) {
filterGetMergeRangeNum(cur, &num);
if (num > 0) {
if (num > 1) {
ra = calloc(num, sizeof(*ra));
CHK_JMP(equal1 != merNum && equal2 != merNum);
colCtx.colIdx = idx1;
colCtx.ctx = ctx;
ctx = NULL;
taosArrayPush(colCtxs, &colCtx);
}
} else {
ra = &ncra.ra;
filterCompareRMCtx(ctx, (*gRes1)->colInfo[idx1].info, &equal);
if (equal) {
++equal1;
}
filterGetMergeRangeRes(cur, ra);
CHK_JMP(equal1 != merNum);
colCtx.colIdx = idx1;
colCtx.ctx = ctx;
ctx = NULL;
taosArrayPush(colCtxs, &colCtx);
}
SFilterRange *tra = ra;
for (int32_t i = 0; i < num; ++i) {
filterPostProcessRange(cur, tra, &ncra.notNull);
assert(ncra.notNull == false);
++tra;
++n;
break;
}
}
if (num > 1) {
for (int32_t i = 0; i < num; ++i) {
FILTER_COPY_RA(&ncra.ra, ra);
assert(merNum > 0);
taosArrayPush(res, &ncra);
SFilterColInfo *colInfo = NULL;
assert (merNum == equal1 || merNum == equal2);
++ra;
}
} else {
taosArrayPush(res, &ncra);
}
} else {
ncra.ra.sflag = RA_NULL;
ncra.ra.eflag = RA_NULL;
taosArrayPush(res, &ncra);
}
} else {
ncra.ra.sflag = RA_NULL;
ncra.ra.eflag = RA_NULL;
taosArrayPush(res, &ncra);
filterFreeGroupCtx(*gRes2);
*gRes2 = NULL;
assert(colCtxs && taosArrayGetSize(colCtxs) > 0);
int32_t ctxSize = (int32_t)taosArrayGetSize(colCtxs);
SFilterColCtx *pctx = NULL;
for (int32_t i = 0; i < ctxSize; ++i) {
pctx = taosArrayGet(colCtxs, i);
colInfo = &(*gRes1)->colInfo[pctx->colIdx];
filterFreeColInfo(colInfo);
FILTER_PUSH_CTX((*gRes1)->colInfo[pctx->colIdx], pctx->ctx);
}
filterFreeMergeRange(cur);
taosArrayDestroy(colCtxs);
return TSDB_CODE_SUCCESS;
_err_return:
FILTER_SET_FLAG(info->flags, FILTER_ALL);
if (colCtxs && taosArrayGetSize(colCtxs) > 0) {
taosArrayDestroyEx(colCtxs, filterFreeColCtx);
}
filterFreeMergeRange(cur);
filterFreeMergeRange(ctx);
return TSDB_CODE_SUCCESS;
}
int32_t filterMergeBetweenGroups(SFilterInfo *info, SFilterGroupCtx** gRes, int32_t *gResNum) {
int32_t filterMergeGroups(SFilterInfo *info, SFilterGroupCtx** gRes, int32_t *gResNum) {
if (*gResNum <= 1) {
return TSDB_CODE_SUCCESS;
}
......@@ -1368,11 +1561,12 @@ int32_t filterMergeBetweenGroups(SFilterInfo *info, SFilterGroupCtx** gRes, int3
int32_t pEnd = 0, cStart = 0, cEnd = 0;
uint16_t pColNum = 0, cColNum = 0;
int32_t movedNum = 0;
bool all = false;
cColNum = gRes[0]->colNum;
for (int32_t i = 1; i < *gResNum; ++i) {
if (gRes[i]->colNum == cColNum) {
for (int32_t i = 1; i <= *gResNum; ++i) {
if (i < (*gResNum) && gRes[i]->colNum == cColNum) {
continue;
}
......@@ -1382,124 +1576,64 @@ int32_t filterMergeBetweenGroups(SFilterInfo *info, SFilterGroupCtx** gRes, int3
if (pColNum > 0) {
for (int32_t m = 0; m <= pEnd; ++m) {
for (int32_t n = cStart; n <= cEnd; ++n) {
filterMergeTwoGroups(&gRes[m], &gRes[n]);
assert(m < n);
filterMergeTwoGroups(info, &gRes[m], &gRes[n], &all);
CHK_JMP(all);
if (gRes[n] == NULL) {
if (n < ((*gResNum) - 1)) {
memmove(&gRes[n], &gRes[n+1], (*gResNum-n-1) * POINTER_BYTES);
}
--cEnd;
--(*gResNum);
++movedNum;
--n;
}
}
}
}
for (int32_t m = cStart; m <= cEnd; ++m) {
for (int32_t n = cStart + 1; n <= cEnd; ++n) {
filterMergeTwoGroups(&gRes[m], &gRes[n]);
}
}
for (int32_t m = cStart; m < cEnd; ++m) {
for (int32_t n = m + 1; n <= cEnd; ++n) {
assert(m < n);
filterMergeTwoGroups(info, &gRes[m], &gRes[n], &all);
pColNum = cColNum;
pEnd = cEnd;
CHK_JMP(all);
i -= movedNum;
cStart = i;
cEnd = i;
cColNum = gRes[i]->colNum;
if (gRes[n] == NULL) {
if (n < ((*gResNum) - 1)) {
memmove(&gRes[n], &gRes[n+1], (*gResNum-n-1) * POINTER_BYTES);
}
int32_t *col = NULL;
uint16_t cidx;
uint16_t removedNum = 0;
bool merged = false;
SArray *colRange = NULL;
for (uint16_t i = 0; i < info->groupNum; ++i) {
SFilterGroup* g = info->groups + i;
uint16_t unitNum = g->unitNum;
if (unitRes && unitRes[i]) {
unitNum = unitRes[i]->num;
merged = true;
--cEnd;
--(*gResNum);
++movedNum;
--n;
}
if (unitNum == 0) {
++removedNum;
continue;
}
if (unitNum > 1) {
continue;
}
if (unitRes && unitRes[i]) {
assert(taosArrayGetSize(unitRes[i]->colRange) == 1);
SFilterColRange *ra = taosArrayGet(unitRes[i]->colRange, 0);
cidx = ra->idx;
} else {
SFilterUnit* u = FILTER_GROUP_UNIT(info, g, 0);
int32_t type = FILTER_UNIT_DATA_TYPE(u);
if (FILTER_NO_MERGE_DATA_TYPE(type)) {
continue;
}
pColNum = cColNum;
pEnd = cEnd;
cidx = FILTER_UNIT_COL_IDX(u);
}
i -= movedNum;
if (col == NULL) {
if (i < (info->groupNum - 1)) {
col = calloc(info->fields[FLD_TYPE_COLUMN].num, sizeof(int32_t));
} else {
if (i >= (*gResNum)) {
break;
}
}
if (col[cidx] == 0) {
col[cidx] = i + 1;
continue;
} else if (col[cidx] == -1) {
continue;
} else {
if (colRange == NULL) {
colRange = taosArrayInit(4, sizeof(SFilterColRange));
}
removedNum += 2;
filterProcessGroupsSameColumn(info, col[cidx] - 1, i, colRange, &removedNum, unitRes);
CHK_JMP(FILTER_GET_FLAG(info->flags, FILTER_ALL));
col[cidx] = -1;
}
}
uint16_t num = info->groupNum - removedNum;
assert(num >= 0);
if (colRange) {
num += taosArrayGetSize(colRange);
}
if (num == 0) {
FILTER_SET_FLAG(info->flags, FILTER_NONE);
goto _err_return;
}
if (colRange || removedNum > 0 || merged) {
groupRes->num = num;
groupRes->col = col;
groupRes->colRange = colRange;
} else {
tfree(col);
cStart = i;
cEnd = i;
cColNum = gRes[i]->colNum;
}
return TSDB_CODE_SUCCESS;
_err_return:
tfree(gColNum);
FILTER_SET_FLAG(info->status, FI_STATUS_ALL);
return TSDB_CODE_SUCCESS;
}
......@@ -1522,163 +1656,55 @@ int32_t filterConvertGroupFromArray(SFilterInfo *info, SArray* group) {
return TSDB_CODE_SUCCESS;
}
int32_t filterCheckRangeCoverage(SFilterInfo *info, SFilterGroupCtx* gctx, SFilterGroupCtx** uctx) {
if (gctx->num == 0) {
int32_t filterRewrite(SFilterInfo *info, SFilterGroupCtx** gRes, int32_t gResNum) {
if (!FILTER_GET_FLAG(info->status, FI_STATUS_REWRITE)) {
qDebug("no need rewrite");
return TSDB_CODE_SUCCESS;
}
SFilterInfo oinfo = *info;
uint16_t gNum = 0;
SArray* group = taosArrayInit(FILTER_DEFAULT_GROUP_SIZE, sizeof(SFilterGroup));
SFilterGroupCtx *res = NULL;
SFilterColInfo *colInfo = NULL;
int32_t optr = 0;
memset(info, 0, sizeof(*info));
filterInitUnitsFields(info);
for (uint16_t i = 0; i < oinfo.groupNum; ++i) {
SFilterGroup* g = oinfo.groups + i;
SFilterGroup ng = {0};
uint16_t unitNum = (uctx && uctx[i]) ? uctx[i]->num : g->unitNum;
if (unitNum == 0) {
continue;
}
++gNum;
FILTER_SET_FLAG(info->options, FI_OPTION_NEED_UNIQE);
if ((uctx == NULL) || (uctx[i] == NULL)) {
for (uint16_t n = 0; n < g->unitNum; ++n) {
SFilterUnit* u = FILTER_GROUP_UNIT(&oinfo, g, n);
filterAddUnitFromUnit(info, &oinfo, u);
filterAddUnitToGroup(&ng, info->unitNum - 1);
}
taosArrayPush(group, &ng);
continue;
}
SFilterGroupCtx* ctx = uctx[i];
for (uint16_t n = 0; n < g->unitNum; ++n) {
SFilterUnit* u = FILTER_GROUP_UNIT(&oinfo, g, n);
int32_t type = FILTER_UNIT_DATA_TYPE(u);
if (FILTER_NO_MERGE_DATA_TYPE(type)) {
filterAddUnitFromUnit(info, &oinfo, u);
filterAddUnitToGroup(&ng, info->unitNum - 1);
continue;
}
uint16_t cidx = FILTER_UNIT_COL_IDX(u);
assert(ctx->col[cidx] > 0 || ctx->col[cidx] == -1);
if (ctx->col[cidx] != -1) {
filterAddUnitFromUnit(info, &oinfo, u);
filterAddUnitToGroup(&ng, info->unitNum - 1);
}
}
if (ctx->colRange && taosArrayGetSize(ctx->colRange) > 0) {
int32_t size = (int32_t)taosArrayGetSize(ctx->colRange);
for (int32_t m = 0; m < size; ++m) {
SFilterColRange *cra = taosArrayGet(ctx->colRange, m);
filterAddGroupUnitFromRange(info, &oinfo, cra, &ng, TSDB_RELATION_AND, NULL);
}
}
taosArrayPush(group, &ng);
}
if (gctx->colRange && taosArrayGetSize(gctx->colRange) > 0) {
int32_t size = (int32_t)taosArrayGetSize(gctx->colRange);
for (int32_t i = 0; i < size; ++i) {
SFilterColRange *cra = taosArrayGet(gctx->colRange, i);
filterAddGroupUnitFromRange(info, &oinfo, cra, NULL, TSDB_RELATION_OR, group);
}
}
filterConvertGroupFromArray(info, group);
taosArrayDestroy(group);
return TSDB_CODE_SUCCESS;
}
int32_t filterRewrite(SFilterInfo *info, SFilterGroupCtx* gctx, SFilterGroupCtx** uctx) {
if (gctx->num == 0) {
return TSDB_CODE_SUCCESS;
}
SFilterInfo oinfo = *info;
uint16_t gNum = 0;
SArray* group = taosArrayInit(FILTER_DEFAULT_GROUP_SIZE, sizeof(SFilterGroup));
filterInitUnitsFields(info);
memset(info, 0, sizeof(*info));
for (int32_t i = 0; i < gResNum; ++i) {
res = gRes[i];
filterInitUnitsFields(info);
optr = (res->colNum > 1) ? TSDB_RELATION_AND : TSDB_RELATION_OR;
for (uint16_t i = 0; i < oinfo.groupNum; ++i) {
SFilterGroup* g = oinfo.groups + i;
SFilterGroup ng = {0};
uint16_t unitNum = (uctx && uctx[i]) ? uctx[i]->num : g->unitNum;
if (unitNum == 0) {
continue;
}
for (uint16_t m = 0; m < res->colNum; ++m) {
colInfo = &res->colInfo[res->colIdx[m]];
if (FILTER_NO_MERGE_DATA_TYPE(colInfo->dataType)) {
assert(colInfo->type == RANGE_TYPE_UNIT);
int32_t usize = (int32_t)taosArrayGetSize((SArray *)colInfo->info);
++gNum;
for (int32_t n = 0; n < usize; ++n) {
SFilterUnit* u = taosArrayGetP((SArray *)colInfo->info, n);
if ((uctx == NULL) || (uctx[i] == NULL)) {
for (uint16_t n = 0; n < g->unitNum; ++n) {
SFilterUnit* u = FILTER_GROUP_UNIT(&oinfo, g, n);
filterAddUnitFromUnit(info, &oinfo, u);
filterAddUnitToGroup(&ng, info->unitNum - 1);
}
taosArrayPush(group, &ng);
continue;
}
SFilterGroupCtx* ctx = uctx[i];
for (uint16_t n = 0; n < g->unitNum; ++n) {
SFilterUnit* u = FILTER_GROUP_UNIT(&oinfo, g, n);
int32_t type = FILTER_UNIT_DATA_TYPE(u);
if (FILTER_NO_MERGE_DATA_TYPE(type)) {
filterAddUnitFromUnit(info, &oinfo, u);
filterAddUnitToGroup(&ng, info->unitNum - 1);
continue;
}
uint16_t cidx = FILTER_UNIT_COL_IDX(u);
assert(ctx->col[cidx] > 0 || ctx->col[cidx] == -1);
assert(colInfo->type == RANGE_TYPE_MR_CTX);
if (ctx->col[cidx] != -1) {
filterAddUnitFromUnit(info, &oinfo, u);
filterAddUnitToGroup(&ng, info->unitNum - 1);
}
}
if (ctx->colRange && taosArrayGetSize(ctx->colRange) > 0) {
int32_t size = (int32_t)taosArrayGetSize(ctx->colRange);
for (int32_t m = 0; m < size; ++m) {
SFilterColRange *cra = taosArrayGet(ctx->colRange, m);
filterAddGroupUnitFromRange(info, &oinfo, cra, &ng, TSDB_RELATION_AND, NULL);
}
filterAddGroupUnitFromCtx(info, &oinfo, colInfo->info, res->colIdx[m], &ng, optr, group);
}
if (ng.unitNum > 0) {
taosArrayPush(group, &ng);
}
if (gctx->colRange && taosArrayGetSize(gctx->colRange) > 0) {
int32_t size = (int32_t)taosArrayGetSize(gctx->colRange);
for (int32_t i = 0; i < size; ++i) {
SFilterColRange *cra = taosArrayGet(gctx->colRange, i);
filterAddGroupUnitFromRange(info, &oinfo, cra, NULL, TSDB_RELATION_OR, group);
}
}
filterConvertGroupFromArray(info, group);
......@@ -1692,25 +1718,24 @@ int32_t filterRewrite(SFilterInfo *info, SFilterGroupCtx* gctx, SFilterGroupCtx*
int32_t filterPreprocess(SFilterInfo *info) {
SFilterGroupCtx** gRes = calloc(info->groupNum, sizeof(SFilterGroupCtx *));
int32_t gResNum = 0;
SFilterGroupCtx groupRes = {0};
filterMergeBetweenUnits(info, gRes, &gResNum);
filterMergeGroupUnits(info, gRes, &gResNum);
filterMergeBetweenGroups(info, gRes, gResNum);
filterMergeGroups(info, gRes, &gResNum);
if (FILTER_GET_FLAG(info->flags, FILTER_ALL)) {
if (FILTER_GET_FLAG(info->status, FI_STATUS_ALL)) {
qInfo("Final - FilterInfo: [ALL]");
return TSDB_CODE_SUCCESS;
}
if (FILTER_GET_FLAG(info->flags, FILTER_NONE)) {
qInfo("Final - FilterInfo: [NONE]");
if (FILTER_GET_FLAG(info->status, FI_STATUS_EMPTY)) {
qInfo("Final - FilterInfo: [EMPTY]");
return TSDB_CODE_SUCCESS;
}
//TODO GET COLUMN RANGE
filterRewrite(info, &groupRes, unitsRes);
filterRewrite(info, gRes, gResNum);
return TSDB_CODE_SUCCESS;
}
......@@ -1736,7 +1761,7 @@ int32_t filterSetColFieldData(SFilterInfo *info, int16_t colId, void *data) {
bool filterExecute(SFilterInfo *info, int32_t numOfRows, int8_t* p) {
bool all = true;
if (FILTER_GET_FLAG(info->flags, FILTER_NONE)) {
if (FILTER_GET_FLAG(info->status, FI_STATUS_EMPTY)) {
return false;
}
......@@ -1807,7 +1832,7 @@ int32_t filterInitFromTree(tExprNode* tree, SFilterInfo **pinfo, uint32_t option
info = *pinfo;
info->flags = options;
info->options = options;
SArray* group = taosArrayInit(FILTER_DEFAULT_GROUP_SIZE, sizeof(SFilterGroup));
......@@ -1821,14 +1846,14 @@ int32_t filterInitFromTree(tExprNode* tree, SFilterInfo **pinfo, uint32_t option
ERR_JRET(filterInitValFieldData(info));
if (!FILTER_GET_FLAG(info->flags, FILTER_NO_REWRITE)) {
if (!FILTER_GET_FLAG(info->options, FI_OPTION_NO_REWRITE)) {
filterDumpInfoToString(info, "Before preprocess");
ERR_JRET(filterPreprocess(info));
CHK_JMP(FILTER_GET_FLAG(info->flags, FILTER_ALL));
CHK_JMP(FILTER_GET_FLAG(info->status, FI_STATUS_ALL));
if (FILTER_GET_FLAG(info->flags, FILTER_NONE)) {
if (FILTER_GET_FLAG(info->status, FI_STATUS_EMPTY)) {
taosArrayDestroy(group);
return code;
}
......@@ -1859,12 +1884,13 @@ _err_return:
int32_t filterGetTimeRange(SFilterInfo *info, STimeWindow *win) {
SFilterRange ra = {0};
SFilterRMCtx *prev = filterInitMergeRange(TSDB_DATA_TYPE_TIMESTAMP, MR_OPT_TS);
SFilterRMCtx *tmpc = filterInitMergeRange(TSDB_DATA_TYPE_TIMESTAMP, MR_OPT_TS);
SFilterRMCtx *prev = filterInitMergeRange(TSDB_DATA_TYPE_TIMESTAMP, FI_OPTION_TIMESTAMP);
SFilterRMCtx *tmpc = filterInitMergeRange(TSDB_DATA_TYPE_TIMESTAMP, FI_OPTION_TIMESTAMP);
SFilterRMCtx *cur = NULL;
int32_t num = 0;
int32_t optr = 0;
int32_t code = 0;
int32_t code = TSDB_CODE_QRY_INVALID_TIME_CONDITION;
bool empty = false;
for (int32_t i = 0; i < info->groupNum; ++i) {
SFilterGroup *group = &info->groups[i];
......@@ -1879,6 +1905,16 @@ int32_t filterGetTimeRange(SFilterInfo *info, STimeWindow *win) {
for (int32_t u = 0; u < group->unitNum; ++u) {
uint16_t uidx = group->unitIdxs[u];
SFilterUnit *unit = &info->units[uidx];
uint8_t raOptr = FILTER_UNIT_OPTR(unit);
filterAddMergeOptr(cur, raOptr, TSDB_RELATION_AND, &empty, NULL);
CHK_JMP(empty);
if (FILTER_NO_MERGE_OPTR(raOptr)) {
continue;
}
SFilterField *right = FILTER_UNIT_RIGHT_FIELD(info, unit);
void *s = FILTER_GET_VAL_FIELD_DATA(right);
void *e = FILTER_GET_VAL_FIELD_DATA(right) + tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes;
......@@ -1886,18 +1922,11 @@ int32_t filterGetTimeRange(SFilterInfo *info, STimeWindow *win) {
SIMPLE_COPY_VALUES(&ra.s, s);
SIMPLE_COPY_VALUES(&ra.e, e);
code = filterAddMergeRange(cur, &ra, optr);
if (code) {
break;
}
}
if (code != TSDB_CODE_SUCCESS) {
break;
filterAddMergeRange(cur, &ra, optr);
}
if (FILTER_GET_FLAG(cur->status, MR_ALL)) {
FILTER_SET_FLAG(prev->status, MR_ALL);
if (cur->notnull) {
prev->notnull = true;
break;
}
......@@ -1907,8 +1936,7 @@ int32_t filterGetTimeRange(SFilterInfo *info, STimeWindow *win) {
}
}
if (code == TSDB_CODE_SUCCESS) {
if (FILTER_GET_FLAG(prev->status, MR_ALL)) {
if (prev->notnull) {
*win = TSWINDOW_INITIALIZER;
} else {
filterGetMergeRangeNum(prev, &num);
......@@ -1922,13 +1950,22 @@ int32_t filterGetTimeRange(SFilterInfo *info, STimeWindow *win) {
win->skey = tra.s;
win->ekey = tra.e;
}
}
filterFreeMergeRange(prev);
filterFreeMergeRange(tmpc);
qDebug("qFilter time range:[%"PRId64 "]-[%"PRId64 "]", win->skey, win->ekey);
return TSDB_CODE_SUCCESS;
_err_return:
*win = TSWINDOW_DESC_INITIALIZER;
filterFreeMergeRange(prev);
filterFreeMergeRange(tmpc);
qDebug("qFilter time range:[%"PRId64 "]-[%"PRId64 "]", win->skey, win->ekey);
return code;
}
......@@ -2000,7 +2037,6 @@ int32_t filterFreeNcharColumns(SFilterInfo* info) {
}
}
return TSDB_CODE_SUCCESS;
}
......
......@@ -78,7 +78,7 @@ void intDataTest() {
memset(ra, 0, sizeof(ra));
h = filterInitMergeRange(TSDB_DATA_TYPE_BIGINT, MR_OPT_TS);
h = filterInitMergeRange(TSDB_DATA_TYPE_BIGINT, FI_OPTION_TIMESTAMP);
for (int32_t i = 0; i < asize; ++i) {
ra[0].s = s[i];
ra[0].e = e[i];
......
......@@ -168,6 +168,7 @@ if $rows != 27 then
return -1
endi
#xxx
sql select * from stb1 where c8 = '51' and c8 != '51';
if $rows != 0 then
return -1
......@@ -1247,7 +1248,7 @@ if $data00 != @21-05-05 18:19:04.000@ then
return -1
endi
xxx sql select * from stb1 where (c1 > 60 and c2 > 40) or (c1 > 62 and c2 > 50);
sql select * from stb1 where (c1 > 60 and c2 > 40) or (c1 > 62 and c2 > 50);
if $rows != 4 then
return -1
endi
......@@ -1828,6 +1829,17 @@ sql_error select ts,c1,c2 from stb1 where (ts > '2021-05-05 18:19:20.000' or ts
sql_error select ts,c1,c2 from stb1 where ((ts >= '2021-05-05 18:19:05.000' and ts <= '2021-05-05 18:19:10.000') or (ts >= '2021-05-05 18:19:15.000' and ts <= '2021-05-05 18:19:20.000') or (ts >= '2021-05-05 18:19:11.000' and ts <= '2021-05-05 18:19:14.000'));
sql_error select ts,c1,c2 from stb1 where ts >= '2021-05-05 18:19:25.000' or ts < '2021-05-05 18:19:24.000';
sql_error select ts,c1,c2 from stb1 where ts >= '2021-05-05 18:19:25.000' and ts < '2021-05-05 18:19:10.000';
sql_error select * from stb1 where ts is null;
sql_error select * from stb1 where ts is not null and ts is null;
sql select * from stb1 where ts is not null;
if $rows != 29 then
return -1
endi
sql select * from stb1 where ts is not null or ts is null;
if $rows != 29 then
return -1
endi
sql select ts,c1,c2 from stb1 where ts >= '2021-05-05 18:19:25.000' or ts < '2021-05-05 18:19:25.000';
if $rows != 29 then
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册