提交 69f25dbf 编写于 作者: W wpan

support condition rewrite

上级 014740de
......@@ -5006,7 +5006,7 @@ static int32_t getQueryTimeRange(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr
goto _ret;
}
ret = filterInitFromTree(p, &filter);
ret = filterInitFromTree(p, &filter, FILTER_NO_REWRITE);
if (ret != TSDB_CODE_SUCCESS) {
goto _ret;
}
......
......@@ -81,7 +81,23 @@ int32_t converToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *le
*(str + bufSize + 1) = '"';
n = bufSize + 2;
break;
case TSDB_DATA_TYPE_UTINYINT:
n = sprintf(str, "%d", *(uint8_t*)buf);
break;
case TSDB_DATA_TYPE_USMALLINT:
n = sprintf(str, "%d", *(uint16_t*)buf);
break;
case TSDB_DATA_TYPE_UINT:
n = sprintf(str, "%u", *(uint32_t*)buf);
break;
case TSDB_DATA_TYPE_UBIGINT:
n = sprintf(str, "%" PRIu64, *(uint64_t*)buf);
break;
default:
tscError("unsupported type:%d", type);
return TSDB_CODE_TSC_INVALID_VALUE;
......
......@@ -22,19 +22,24 @@ extern "C" {
#include "texpr.h"
#define FILTER_DEFAULT_GROUP_SIZE 4
#define FILTER_DEFAULT_UNIT_SIZE 4
#define FILTER_DEFAULT_FIELD_SIZE 4
#define FILTER_DEFAULT_GROUP_UNIT_SIZE 2
enum {
F_FIELD_COLUMN = 0,
F_FIELD_VALUE,
F_FIELD_MAX
FLD_TYPE_COLUMN = 1,
FLD_TYPE_VALUE = 2,
FLD_TYPE_MAX = 3,
FLD_DESC_NO_FREE = 4,
FLD_DATA_NO_FREE = 8,
};
enum {
MR_ST_START = 1,
MR_ST_FIN = 2,
MR_ALL = 4,
MR_NONE = 8,
};
enum {
......@@ -46,17 +51,17 @@ enum {
RA_NULL = 2,
};
enum {
FILTER_ALL = 1,
FILTER_NONE = 2,
FILTER_NO_REWRITE = 4,
};
typedef struct OptrStr {
uint16_t optr;
char *str;
} OptrStr;
typedef struct SFilterColRange {
uint16_t idx; //column field idx
int64_t s;
int64_t e;
} SFilterColRange;
typedef struct SFilterRange {
char sflag;
char eflag;
......@@ -64,6 +69,13 @@ typedef struct SFilterRange {
int64_t e;
} SFilterRange;
typedef struct SFilterColRange {
uint16_t idx; //column field idx
bool isNull;
bool notNull;
SFilterRange ra;
} SFilterColRange;
typedef struct SFilterRangeNode {
struct SFilterRangeNode* prev;
......@@ -81,7 +93,7 @@ typedef struct SFilterRMCtx {
} SFilterRMCtx ;
typedef struct SFilterField {
uint16_t type;
uint16_t flag;
void* desc;
void* data;
int64_t range[];
......@@ -99,13 +111,21 @@ typedef struct SFilterFieldId {
} SFilterFieldId;
typedef struct SFilterGroup {
uint16_t unitSize;
uint16_t unitNum;
uint16_t *unitIdxs;
uint8_t *unitFlags; // !unit result
} SFilterGroup;
typedef struct SFilterGroupCtx {
uint16_t num;
int32_t *col;
SArray *colRange;
} SFilterGroupCtx;
typedef struct SFilterCompare {
__compar_fn_t pCompareFunc;
int32_t type;
uint8_t optr;
} SFilterCompare;
......@@ -116,10 +136,11 @@ typedef struct SFilterUnit {
} SFilterUnit;
typedef struct SFilterInfo {
uint32_t flags;
uint16_t unitSize;
uint16_t unitNum;
uint16_t groupNum;
SFilterFields fields[F_FIELD_MAX];
SFilterFields fields[FLD_TYPE_MAX];
SFilterGroup *groups;
SFilterUnit *units;
uint8_t *unitRes; // result
......@@ -133,16 +154,24 @@ typedef struct SFilterInfo {
#define MR_EMPTY_RES(ctx) (ctx->rs == NULL)
#define MR_GET_FLAG(st, f) (st & f)
#define MR_SET_FLAG(st, f) st |= (f)
#define SET_OPTR(o) do {if (o == TSDB_RELATION_ISNULL) { isnull = true; } else if (o == TSDB_RELATION_NOTNULL) { notnull = true; } } while (0)
#define CHK_OPTR() (isnull == true && notnull == true)
#define FILTER_GET_FLAG(st, f) (st & f)
#define FILTER_SET_FLAG(st, f) st |= (f)
#define FILTER_CLR_FLAG(st, f) st &= (~f)
#define SIMPLE_COPY_VALUES(dst, src) *((int64_t *)dst) = *((int64_t *)src)
#define RESET_RANGE(ctx, r) do { r->next = ctx->rf; ctx->rf = r; } while (0)
#define FREE_RANGE(ctx, r) do { if (r->prev) { r->prev->next = r->next; } else { ctx->rs = r->next;} if (r->next) { r->next->prev = r->prev; } RESET_RANGE(ctx, r); } while (0)
#define FREE_FROM_RANGE(ctx, r) do { if (r->prev) { r->prev->next = NULL; } else { ctx->rs = NULL;} while (r) {SFilterRangeNode *n = r->next; RESET_RANGE(ctx, r); r = n; } } while (0)
#define INSERT_RANGE(ctx, r, t, s, e) do { SFilterRangeNode *n = filterNewRange(ctx, t, s, e); n->prev = r->prev; if (r->prev) { r->prev->next = n; } else { ctx->rs = n; } r->prev = n; n->next = r; } while (0)
#define APPEND_RANGE(ctx, r, t, s, e) do { SFilterRangeNode *n = filterNewRange(ctx, t, s, e); n->prev = r; if (r) { r->next = n; } else { ctx->rs = n; } } while (0)
#define FILTER_GREATER(cr,sflag,eflag) ((cr > 0) || ((cr == 0) && (FILTER_GET_FLAG(sflag,RA_EXCLUDE) || FILTER_GET_FLAG(eflag,RA_EXCLUDE))))
#define FILTER_COPY_RA(dst, src) do { (dst)->sflag = (src)->sflag; (dst)->eflag = (src)->eflag; (dst)->s = (src)->s; (dst)->e = (src)->e; } while (0)
#define RESET_RANGE(ctx, r) do { (r)->next = (ctx)->rf; (ctx)->rf = r; } while (0)
#define FREE_RANGE(ctx, r) do { if ((r)->prev) { (r)->prev->next = (r)->next; } else { (ctx)->rs = (r)->next;} if ((r)->next) { (r)->next->prev = (r)->prev; } RESET_RANGE(ctx, r); } while (0)
#define FREE_FROM_RANGE(ctx, r) do { if ((r)->prev) { (r)->prev->next = NULL; } else { (ctx)->rs = NULL;} while (r) {SFilterRangeNode *n = (r)->next; RESET_RANGE(ctx, r); r = n; } } while (0)
#define INSERT_RANGE(ctx, r, ra) do { SFilterRangeNode *n = filterNewRange(ctx, ra); n->prev = (r)->prev; if ((r)->prev) { (r)->prev->next = n; } else { (ctx)->rs = n; } (r)->prev = n; n->next = r; } while (0)
#define APPEND_RANGE(ctx, r, ra) do { SFilterRangeNode *n = filterNewRange(ctx, ra); n->prev = (r); if (r) { (r)->next = n; } else { (ctx)->rs = n; } } while (0)
#define ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { return _code; } } while (0)
#define ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); return _code; } } while (0)
......@@ -155,16 +184,21 @@ typedef struct SFilterInfo {
#define CHK_LRET(c, r,...) do { if (c) { qError(__VA_ARGS__); return r; } } while (0)
#define FILTER_GET_FIELD(i, id) (&((i)->fields[(id).type].fields[(id).idx]))
#define FILTER_GET_COL_FIELD(i, idx) (&((i)->fields[FLD_TYPE_COLUMN].fields[idx]))
#define FILTER_GET_COL_FIELD_TYPE(fi) (((SSchema *)((fi)->desc))->type)
#define FILTER_GET_COL_FIELD_DESC(fi) ((SSchema *)((fi)->desc))
#define FILTER_GET_COL_FIELD_DATA(fi, ri) ((fi)->data + ((SSchema *)((fi)->desc))->bytes * (ri))
#define FILTER_GET_VAL_FIELD_TYPE(fi) (((tVariant *)((fi)->desc))->nType)
#define FILTER_GET_VAL_FIELD_DATA(fi) ((fi)->data)
#define FILTER_GET_TYPE(fl) ((fl) & FLD_TYPE_MAX)
#define FILTER_GROUP_UNIT(i, g, uid) ((i)->units[(g)->unitIdxs[uid]])
#define FILTER_GROUP_UNIT(i, g, uid) ((i)->units + (g)->unitIdxs[uid])
#define FILTER_UNIT_LEFT_FIELD(i, u) FILTER_GET_FIELD(i, (u)->left)
#define FILTER_UNIT_RIGHT_FIELD(i, u) FILTER_GET_FIELD(i, (u)->right)
#define FILTER_UNIT_DATA_TYPE(i, u) FILTER_GET_COL_FIELD_TYPE(FILTER_UNIT_LEFT_FIELD(i, u))
#define FILTER_UNIT_VAL(i, u) FILTER_GET_VAL_FIELD_DATA(FILTER_UNIT_RIGHT_FIELD(i, u))
#define FILTER_UNIT_DATA_TYPE(u) ((u)->compare.type)
#define FILTER_UNIT_COL_DESC(i, u) FILTER_GET_COL_FIELD_DESC(FILTER_UNIT_LEFT_FIELD(i, u))
#define FILTER_UNIT_COL_DATA(i, u, ri) FILTER_GET_COL_FIELD_DATA(FILTER_UNIT_LEFT_FIELD(i, u), ri)
#define FILTER_UNIT_VAL_DATA(i, u) FILTER_GET_VAL_FIELD_DATA(FILTER_UNIT_RIGHT_FIELD(i, u))
#define FILTER_UNIT_COL_IDX(u) ((u)->left.idx)
#define FILTER_UNIT_OPTR(u) ((u)->compare.optr)
......@@ -177,12 +211,12 @@ typedef struct SFilterInfo {
typedef int32_t(*filter_desc_compare_func)(const void *, const void *);
extern int32_t filterInitFromTree(tExprNode* tree, SFilterInfo **pinfo);
extern int32_t filterInitFromTree(tExprNode* tree, SFilterInfo **pinfo, uint32_t options);
extern bool filterExecute(SFilterInfo *info, int32_t numOfRows, int8_t* p);
extern int32_t filterSetColFieldData(SFilterInfo *info, int16_t colId, void *data);
extern void* filterInitMergeRange(int32_t type, int32_t options);
extern int32_t filterGetMergeRangeNum(void* h, int32_t* num);
extern int32_t filterGetMergeRangeRes(void* h, void *s, void* e);
extern int32_t filterGetMergeRangeRes(void* h, SFilterRange *ra);
extern int32_t filterFreeMergeRange(void* h);
extern int32_t filterGetTimeRange(SFilterInfo *info, STimeWindow *win);
#ifdef __cplusplus
......
......@@ -6966,7 +6966,7 @@ int32_t createQueryFilter(char *data, uint16_t len, SFilterInfo** pFilters) {
return TSDB_CODE_QRY_APP_ERROR;
}
int32_t ret = filterInitFromTree(expr, pFilters);
int32_t ret = filterInitFromTree(expr, pFilters, 0);
tExprTreeDestroy(expr, NULL);
return ret;
......
......@@ -16,6 +16,8 @@
#include "queryLog.h"
#include "qFilter.h"
#include "tcompare.h"
#include "hash.h"
#include "tscUtil.h"
OptrStr gOptrStr[] = {
{TSDB_RELATION_INVALID, "invalid"},
......@@ -49,12 +51,28 @@ static FORCE_INLINE int32_t filterFieldValDescCompare(const void *desc1, const v
}
filter_desc_compare_func gDescCompare [F_FIELD_MAX] = {
filter_desc_compare_func gDescCompare [FLD_TYPE_MAX] = {
NULL,
filterFieldColDescCompare,
filterFieldValDescCompare
};
static FORCE_INLINE SFilterRangeNode* filterNewRange(SFilterRMCtx *ctx, int32_t t, void *s, void *e) {
int32_t filterInitUnitsFields(SFilterInfo *info) {
info->unitSize = FILTER_DEFAULT_UNIT_SIZE;
info->units = calloc(info->unitSize, sizeof(SFilterUnit));
info->fields[FLD_TYPE_COLUMN].num = 0;
info->fields[FLD_TYPE_COLUMN].size = FILTER_DEFAULT_FIELD_SIZE;
info->fields[FLD_TYPE_COLUMN].fields = calloc(info->fields[FLD_TYPE_COLUMN].size, COL_FIELD_SIZE);
info->fields[FLD_TYPE_VALUE].num = 0;
info->fields[FLD_TYPE_VALUE].size = FILTER_DEFAULT_FIELD_SIZE;
info->fields[FLD_TYPE_VALUE].fields = calloc(info->fields[FLD_TYPE_VALUE].size, sizeof(SFilterField));
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE SFilterRangeNode* filterNewRange(SFilterRMCtx *ctx, SFilterRange* ra) {
SFilterRangeNode *r = NULL;
if (ctx->rf) {
......@@ -65,9 +83,8 @@ static FORCE_INLINE SFilterRangeNode* filterNewRange(SFilterRMCtx *ctx, int32_t
} else {
r = calloc(1, sizeof(SFilterRangeNode));
}
SIMPLE_COPY_VALUES((char*)&r->ra.s, s);
SIMPLE_COPY_VALUES((char*)&r->ra.e, e);
FILTER_COPY_RA(&r->ra, ra);
return r;
}
......@@ -119,14 +136,41 @@ int32_t filterReuseMergeRangeCtx(SFilterRMCtx *ctx, int32_t type, int32_t option
}
int32_t filterPostProcessRange(SFilterRMCtx *cur, SFilterRange *ra, bool *notNull) {
if (!FILTER_GET_FLAG(ra->sflag, RA_NULL)) {
int32_t sr = cur->pCompareFunc(&ra->s, getDataMin(cur->type));
if (sr == 0) {
FILTER_SET_FLAG(ra->sflag, RA_NULL);
}
}
int32_t filterAddMergeRangeImpl(void* h, void* s, void* e, char sflag, char eflag, int32_t optr) {
if (!FILTER_GET_FLAG(ra->eflag, RA_NULL)) {
int32_t er = cur->pCompareFunc(&ra->e, getDataMax(cur->type));
if (er == 0) {
FILTER_SET_FLAG(ra->eflag, RA_NULL);
}
}
if (FILTER_GET_FLAG(ra->sflag, RA_NULL) && FILTER_GET_FLAG(ra->eflag, RA_NULL)) {
*notNull = true;
} else {
*notNull = false;
}
return TSDB_CODE_SUCCESS;
}
int32_t filterAddMergeRangeImpl(void* h, SFilterRange* ra, int32_t optr) {
SFilterRMCtx *ctx = (SFilterRMCtx *)h;
if (ctx->rs == NULL) {
if (MR_GET_FLAG(ctx->status, MR_ST_START) == 0 || optr == TSDB_RELATION_OR) {
APPEND_RANGE(ctx, ctx->rs, ctx->type, s, e);
MR_SET_FLAG(ctx->status, MR_ST_START);
if ((FILTER_GET_FLAG(ctx->status, MR_ST_START) == 0)
|| (FILTER_GET_FLAG(ctx->status, MR_ALL) && (optr == TSDB_RELATION_AND))
|| ((!FILTER_GET_FLAG(ctx->status, MR_ALL)) && (optr == TSDB_RELATION_OR))) {
APPEND_RANGE(ctx, ctx->rs, ra);
FILTER_SET_FLAG(ctx->status, MR_ST_START);
}
return TSDB_CODE_SUCCESS;
......@@ -134,27 +178,34 @@ int32_t filterAddMergeRangeImpl(void* h, void* s, void* e, char sflag, char efla
SFilterRangeNode *r = ctx->rs;
SFilterRangeNode *rn = NULL;
int32_t cr = 0;
if (optr == TSDB_RELATION_AND) {
while (r != NULL) {
if (ctx->pCompareFunc(&r->ra.s, e) > 0) {
cr = ctx->pCompareFunc(&r->ra.s, &ra->e);
if (FILTER_GREATER(cr, r->ra.sflag, ra->eflag)) {
FREE_FROM_RANGE(ctx, r);
break;
}
if (ctx->pCompareFunc(s, &r->ra.e) > 0) {
cr = ctx->pCompareFunc(&ra->s, &r->ra.e);
if (FILTER_GREATER(cr, ra->sflag, r->ra.eflag)) {
rn = r->next;
FREE_RANGE(ctx, r);
r = rn;
continue;
}
if (ctx->pCompareFunc(s, &r->ra.s) > 0) {
SIMPLE_COPY_VALUES((char *)&r->ra.s, s);
cr = ctx->pCompareFunc(&ra->s, &r->ra.s);
if (FILTER_GREATER(cr, ra->sflag, r->ra.sflag)) {
SIMPLE_COPY_VALUES((char *)&r->ra.s, &ra->s);
cr == 0 ? (r->ra.sflag = 0) : (r->ra.sflag = ra->sflag);
}
if (ctx->pCompareFunc(&r->ra.e, e) > 0) {
SIMPLE_COPY_VALUES((char *)&r->ra.e, e);
cr = ctx->pCompareFunc(&r->ra.e, &ra->e);
if (FILTER_GREATER(cr, r->ra.eflag, ra->eflag)) {
SIMPLE_COPY_VALUES((char *)&r->ra.e, &ra->e);
cr == 0 ? (r->ra.eflag = 0) : (r->ra.eflag = ra->eflag);
break;
}
......@@ -166,41 +217,52 @@ int32_t filterAddMergeRangeImpl(void* h, void* s, void* e, char sflag, char efla
//TSDB_RELATION_OR
bool smerged = false;
bool emerged = false;
while (r != NULL) {
if (ctx->pCompareFunc(&r->ra.s, e) > 0) {
cr = ctx->pCompareFunc(&r->ra.s, &ra->e);
if (FILTER_GREATER(cr, r->ra.sflag, ra->eflag)) {
if (emerged == false) {
INSERT_RANGE(ctx, r, ctx->type, s, e);
INSERT_RANGE(ctx, r, ra);
}
break;
}
if (ctx->pCompareFunc(s, &r->ra.e) > 0) {
if (r->next) {
r= r->next;
continue;
}
if (smerged == false) {
cr = ctx->pCompareFunc(&ra->s, &r->ra.e);
if (FILTER_GREATER(cr, ra->sflag, r->ra.eflag)) {
if (r->next) {
r= r->next;
continue;
}
APPEND_RANGE(ctx, r, ctx->type, s, e);
break;
}
APPEND_RANGE(ctx, r, ra);
break;
}
if (smerged == false) {
if (ctx->pCompareFunc(&r->ra.s, s) > 0) {
SIMPLE_COPY_VALUES((char *)&r->ra.s, s);
cr = ctx->pCompareFunc(&r->ra.s, &ra->s);
if (FILTER_GREATER(cr, r->ra.sflag, ra->sflag)) {
SIMPLE_COPY_VALUES((char *)&r->ra.s, &ra->s);
cr == 0 ? (r->ra.sflag &= ra->sflag) : (r->ra.sflag = ra->sflag);
}
smerged = true;
}
if (emerged == false) {
if (ctx->pCompareFunc(e, &r->ra.e) > 0) {
SIMPLE_COPY_VALUES((char *)&r->ra.e, e);
cr = ctx->pCompareFunc(&ra->e, &r->ra.e);
if (FILTER_GREATER(cr, ra->eflag, r->ra.eflag)) {
SIMPLE_COPY_VALUES((char *)&r->ra.e, &ra->e);
if (cr == 0) {
r->ra.eflag &= ra->eflag;
break;
}
r->ra.eflag = ra->eflag;
emerged = true;
e = &r->ra.e;
r = r->next;
continue;
}
......@@ -208,43 +270,48 @@ int32_t filterAddMergeRangeImpl(void* h, void* s, void* e, char sflag, char efla
break;
}
if (ctx->pCompareFunc(e, &r->ra.e) > 0) {
cr = ctx->pCompareFunc(&ra->e, &r->ra.e);
if (FILTER_GREATER(cr, ra->eflag, r->ra.eflag)) {
rn = r->next;
FREE_RANGE(ctx, r);
r = rn;
continue;
} else {
SIMPLE_COPY_VALUES(e, (char *)&r->ra.e);
SIMPLE_COPY_VALUES(&r->prev->ra.e, (char *)&r->ra.e);
cr == 0 ? (r->prev->ra.eflag &= r->ra.eflag) : (r->prev->ra.eflag = r->ra.eflag);
FREE_RANGE(ctx, r);
break;
}
}
if (ctx->rs && ctx->rs->next == NULL) {
bool notnull;
filterPostProcessRange(ctx, &ctx->rs->ra, &notnull);
if (notnull) {
FREE_FROM_RANGE(ctx, ctx->rs);
FILTER_SET_FLAG(ctx->status, MR_ALL);
}
}
return TSDB_CODE_SUCCESS;
}
int32_t filterAddMergeRange(void* h, SFilterRange* ra, int32_t optr) {
SFilterRMCtx *ctx = (SFilterRMCtx *)h;
int64_t sv, ev;
void *s, *e;
if (MR_GET_FLAG(ra->sflag, RA_NULL)) {
SIMPLE_COPY_VALUES(&sv, getDataMin(ctx->type));
s = &sv;
} else {
s = &ra->s;
if (FILTER_GET_FLAG(ra->sflag, RA_NULL)) {
SIMPLE_COPY_VALUES(&ra->s, getDataMin(ctx->type));
//FILTER_CLR_FLAG(ra->sflag, RA_NULL);
}
if (MR_GET_FLAG(ra->eflag, RA_NULL)) {
SIMPLE_COPY_VALUES(&ev, getDataMax(ctx->type));
e = &ev;
} else {
e = &ra->e;
if (FILTER_GET_FLAG(ra->eflag, RA_NULL)) {
SIMPLE_COPY_VALUES(&ra->e, getDataMax(ctx->type));
//FILTER_CLR_FLAG(ra->eflag, RA_NULL);
}
return filterAddMergeRangeImpl(h, s, e, ra->sflag, ra->eflag, optr);
return filterAddMergeRangeImpl(h, ra, optr);
}
int32_t filterAddMergeRangeCtx(void *dst, void *src, int32_t optr) {
......@@ -269,11 +336,11 @@ int32_t filterAddMergeRangeCtx(void *dst, void *src, int32_t optr) {
int32_t filterFinMergeRange(void* h) {
SFilterRMCtx *ctx = (SFilterRMCtx *)h;
if (MR_GET_FLAG(ctx->status, MR_ST_FIN)) {
if (FILTER_GET_FLAG(ctx->status, MR_ST_FIN)) {
return TSDB_CODE_SUCCESS;
}
if (MR_GET_FLAG(ctx->options, MR_OPT_TS)) {
if (FILTER_GET_FLAG(ctx->options, MR_OPT_TS)) {
SFilterRangeNode *r = ctx->rs;
SFilterRangeNode *rn = NULL;
......@@ -293,7 +360,7 @@ int32_t filterFinMergeRange(void* h) {
}
}
MR_SET_FLAG(ctx->status, MR_ST_FIN);
FILTER_SET_FLAG(ctx->status, MR_ST_FIN);
return TSDB_CODE_SUCCESS;
}
......@@ -316,7 +383,7 @@ int32_t filterGetMergeRangeNum(void* h, int32_t* num) {
}
int32_t filterGetMergeRangeRes(void* h, void *s, void* e) {
int32_t filterGetMergeRangeRes(void* h, SFilterRange *ra) {
filterFinMergeRange(h);
SFilterRMCtx *ctx = (SFilterRMCtx *)h;
......@@ -324,11 +391,11 @@ int32_t filterGetMergeRangeRes(void* h, void *s, void* e) {
SFilterRangeNode* r = ctx->rs;
while (r) {
assignVal(s + num * tDataTypes[ctx->type].bytes, (char *)&r->ra.s, 0, ctx->type);
assignVal(e + num * tDataTypes[ctx->type].bytes, (char *)&r->ra.e, 0, ctx->type);
FILTER_COPY_RA(ra, &r->ra);
++num;
r = r->next;
++ra;
}
if (num == 0) {
......@@ -409,29 +476,14 @@ int32_t filterGetFiled(SFilterFields* fields, int32_t type, void *v) {
return -1;
}
int32_t filterAddField(SFilterInfo *info, tExprNode *node, SFilterFieldId *fid) {
CHK_LRET(node == NULL, TSDB_CODE_QRY_APP_ERROR, "empty node");
CHK_LRET(node->nodeType != TSQL_NODE_COL && node->nodeType != TSQL_NODE_VALUE, TSDB_CODE_QRY_APP_ERROR, "invalid nodeType:%d", node->nodeType);
int32_t type, idx = -1;
int32_t filterAddField(SFilterInfo *info, void *desc, void *data, int32_t type, SFilterFieldId *fid) {
int32_t idx = -1;
uint16_t *num;
void *v;
if (node->nodeType == TSQL_NODE_COL) {
type = F_FIELD_COLUMN;
v = node->pSchema;
node->pSchema = NULL;
} else {
type = F_FIELD_VALUE;
v = node->pVal;
node->pVal = NULL;
}
num = &info->fields[type].num;
if (*num > 0 && type != F_FIELD_VALUE) {
idx = filterGetFiled(&info->fields[type], type, v);
if (*num > 0 && type != FLD_TYPE_VALUE) {
idx = filterGetFiled(&info->fields[type], type, desc);
}
if (idx < 0) {
......@@ -441,8 +493,9 @@ int32_t filterAddField(SFilterInfo *info, tExprNode *node, SFilterFieldId *fid)
info->fields[type].fields = realloc(info->fields[type].fields, info->fields[type].size * sizeof(SFilterField));
}
info->fields[type].fields[idx].type = type;
info->fields[type].fields[idx].desc = v;
info->fields[type].fields[idx].flag = type;
info->fields[type].fields[idx].desc = desc;
info->fields[type].fields[idx].data = data;
++(*num);
}
......@@ -452,29 +505,186 @@ int32_t filterAddField(SFilterInfo *info, tExprNode *node, SFilterFieldId *fid)
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE int32_t filterAddFieldFromField(SFilterInfo *info, SFilterField *field, SFilterFieldId *fid) {
filterAddField(info, field->desc, field->data, FILTER_GET_TYPE(field->flag), fid);
FILTER_SET_FLAG(field->flag, FLD_DESC_NO_FREE);
FILTER_SET_FLAG(field->flag, FLD_DATA_NO_FREE);
return TSDB_CODE_SUCCESS;
}
int32_t filterAddFieldFromNode(SFilterInfo *info, tExprNode *node, SFilterFieldId *fid) {
CHK_LRET(node == NULL, TSDB_CODE_QRY_APP_ERROR, "empty node");
CHK_LRET(node->nodeType != TSQL_NODE_COL && node->nodeType != TSQL_NODE_VALUE, TSDB_CODE_QRY_APP_ERROR, "invalid nodeType:%d", node->nodeType);
int32_t type;
void *v;
if (node->nodeType == TSQL_NODE_COL) {
type = FLD_TYPE_COLUMN;
v = node->pSchema;
node->pSchema = NULL;
} else {
type = FLD_TYPE_VALUE;
v = node->pVal;
node->pVal = NULL;
}
filterAddField(info, v, NULL, type, fid);
return TSDB_CODE_SUCCESS;
}
int32_t filterAddUnit(SFilterInfo *info, uint8_t optr, SFilterFieldId *left, SFilterFieldId *right) {
if (info->unitNum >= info->unitSize) {
uint16_t psize = info->unitSize;
info->unitSize += FILTER_DEFAULT_UNIT_SIZE;
info->units = realloc(info->units, info->unitSize * sizeof(SFilterUnit));
memset(info->units + psize, 0, sizeof(*info->units) * FILTER_DEFAULT_UNIT_SIZE);
}
SFilterUnit *u = &info->units[info->unitNum];
info->units[info->unitNum].compare.optr = optr;
info->units[info->unitNum].left = *left;
info->units[info->unitNum].right = *right;
u->compare.optr = optr;
u->left = *left;
if (right) {
u->right = *right;
}
if (u->right.type == FLD_TYPE_VALUE) {
SFilterField *val = FILTER_UNIT_RIGHT_FIELD(info, u);
assert(FILTER_GET_FLAG(val->flag, FLD_TYPE_VALUE));
} else {
assert(optr == TSDB_RELATION_ISNULL || optr == TSDB_RELATION_NOTNULL);
}
SFilterField *col = FILTER_UNIT_LEFT_FIELD(info, u);
assert(FILTER_GET_FLAG(col->flag, FLD_TYPE_COLUMN));
info->units[info->unitNum].compare.type = FILTER_GET_COL_FIELD_TYPE(col);
++info->unitNum;
return TSDB_CODE_SUCCESS;
}
int32_t filterAddGroup(SFilterGroup *group, uint16_t unitIdx) {
group->unitNum = 1;
group->unitIdxs= calloc(group->unitNum, sizeof(*group->unitIdxs));
group->unitIdxs[0] = unitIdx;
int32_t filterAddUnitToGroup(SFilterGroup *group, uint16_t unitIdx) {
if (group->unitNum >= group->unitSize) {
group->unitSize += FILTER_DEFAULT_UNIT_SIZE;
group->unitIdxs = realloc(group->unitIdxs, group->unitSize * sizeof(*group->unitIdxs));
}
group->unitIdxs[group->unitNum++] = unitIdx;
return TSDB_CODE_SUCCESS;
}
int32_t filterAddUnitFromNode(SFilterInfo *info, tExprNode* tree) {
SFilterFieldId left = {0}, right = {0};
filterAddFieldFromNode(info, tree->_node.pLeft, &left);
filterAddFieldFromNode(info, tree->_node.pRight, &right);
filterAddUnit(info, tree->_node.optr, &left, &right);
return TSDB_CODE_SUCCESS;
}
int32_t filterAddUnitFromUnit(SFilterInfo *dst, SFilterInfo *src, SFilterUnit* u) {
SFilterFieldId left, right;
filterAddField(dst, FILTER_UNIT_COL_DESC(src, u), NULL, FLD_TYPE_COLUMN, &left);
filterAddField(dst, NULL, FILTER_UNIT_VAL_DATA(src, u), FLD_TYPE_VALUE, &right);
SFilterField *t = FILTER_UNIT_LEFT_FIELD(src, u);
FILTER_SET_FLAG(t->flag, FLD_DESC_NO_FREE);
t = FILTER_UNIT_RIGHT_FIELD(src, u);
FILTER_SET_FLAG(t->flag, FLD_DATA_NO_FREE);
return filterAddUnit(dst, FILTER_UNIT_OPTR(u), &left, &right);
}
int32_t filterAddGroupUnitFromRange(SFilterInfo *dst, SFilterInfo *src, SFilterColRange *cra, SFilterGroup *g, int32_t optr, SArray *res) {
SFilterFieldId left, right;
SFilterField *col = FILTER_GET_COL_FIELD(src, cra->idx);
filterAddFieldFromField(dst, col, &left);
if (optr == TSDB_RELATION_AND) {
if (cra->isNull) {
assert(cra->notNull == false);
filterAddUnit(dst, TSDB_RELATION_ISNULL, &left, NULL);
return filterAddUnitToGroup(g, dst->unitNum - 1);
}
if (cra->notNull) {
assert(cra->isNull == false);
filterAddUnit(dst, TSDB_RELATION_NOTNULL, &left, NULL);
return filterAddUnitToGroup(g, dst->unitNum - 1);
}
assert(!((FILTER_GET_FLAG(cra->ra.sflag, RA_NULL)) && (FILTER_GET_FLAG(cra->ra.eflag, RA_NULL))));
if (!FILTER_GET_FLAG(cra->ra.sflag, RA_NULL)) {
filterAddField(dst, NULL, &cra->ra.s, FLD_TYPE_VALUE, &right);
filterAddUnit(dst, FILTER_GET_FLAG(cra->ra.sflag, RA_EXCLUDE) ? TSDB_RELATION_GREATER : TSDB_RELATION_GREATER_EQUAL, &left, &right);
filterAddUnitToGroup(g, dst->unitNum - 1);
}
if (!FILTER_GET_FLAG(cra->ra.eflag, RA_NULL)) {
filterAddField(dst, NULL, &cra->ra.e, FLD_TYPE_VALUE, &right);
filterAddUnit(dst, FILTER_GET_FLAG(cra->ra.eflag, RA_EXCLUDE) ? TSDB_RELATION_LESS : TSDB_RELATION_LESS_EQUAL, &left, &right);
filterAddUnitToGroup(g, dst->unitNum - 1);
}
} else {
SFilterGroup ng = {0};
g = &ng;
if (cra->isNull) {
filterAddUnit(dst, TSDB_RELATION_ISNULL, &left, NULL);
filterAddUnitToGroup(g, dst->unitNum - 1);
taosArrayPush(res, g);
}
if (cra->notNull) {
memset(g, 0, sizeof(*g));
filterAddUnit(dst, TSDB_RELATION_NOTNULL, &left, NULL);
filterAddUnitToGroup(g, dst->unitNum - 1);
taosArrayPush(res, g);
}
memset(g, 0, sizeof(*g));
if (!FILTER_GET_FLAG(cra->ra.sflag, RA_NULL)) {
filterAddField(dst, NULL, &cra->ra.s, FLD_TYPE_VALUE, &right);
filterAddUnit(dst, FILTER_GET_FLAG(cra->ra.sflag, RA_EXCLUDE) ? TSDB_RELATION_GREATER : TSDB_RELATION_GREATER_EQUAL, &left, &right);
filterAddUnitToGroup(g, dst->unitNum - 1);
}
if (!FILTER_GET_FLAG(cra->ra.eflag, RA_NULL)) {
filterAddField(dst, NULL, &cra->ra.e, FLD_TYPE_VALUE, &right);
filterAddUnit(dst, FILTER_GET_FLAG(cra->ra.eflag, RA_EXCLUDE) ? TSDB_RELATION_LESS : TSDB_RELATION_LESS_EQUAL, &left, &right);
filterAddUnitToGroup(g, dst->unitNum - 1);
}
if (g->unitNum > 0) {
taosArrayPush(res, g);
}
}
return TSDB_CODE_SUCCESS;
}
static void filterFreeGroup(void *pItem) {
SFilterGroup* p = (SFilterGroup*) pItem;
if (p) {
......@@ -515,14 +725,10 @@ int32_t filterTreeToGroup(tExprNode* tree, SFilterInfo *info, SArray* group) {
return TSDB_CODE_SUCCESS;
}
SFilterFieldId left, right;
filterAddField(info, tree->_node.pLeft, &left);
filterAddField(info, tree->_node.pRight, &right);
filterAddUnit(info, tree->_node.optr, &left, &right);
filterAddUnitFromNode(info, tree);
SFilterGroup fgroup = {0};
filterAddGroup(&fgroup, info->unitNum - 1);
filterAddUnitToGroup(&fgroup, info->unitNum - 1);
taosArrayPush(group, &fgroup);
......@@ -537,9 +743,7 @@ _err_return:
int32_t filterInitUnitFunc(SFilterInfo *info) {
for (uint16_t i = 0; i < info->unitNum; ++i) {
SFilterUnit* unit = &info->units[i];
SFilterField *left = FILTER_UNIT_LEFT_FIELD(info, unit);
unit->compare.pCompareFunc = getComparFunc(FILTER_GET_COL_FIELD_TYPE(left), unit->compare.optr);
unit->compare.pCompareFunc = getComparFunc(FILTER_UNIT_DATA_TYPE(unit), unit->compare.optr);
}
return TSDB_CODE_SUCCESS;
......@@ -551,29 +755,44 @@ void filterDumpInfoToString(SFilterInfo *info, const char *msg) {
CHK_LRETV(info == NULL, "%s - FilterInfo: empty", msg);
qDebug("%s - FilterInfo:", msg);
qDebug("COLUMN Field Num:%u", info->fields[F_FIELD_COLUMN].num);
for (uint16_t i = 0; i < info->fields[F_FIELD_COLUMN].num; ++i) {
SFilterField *field = &info->fields[F_FIELD_COLUMN].fields[i];
qDebug("COLUMN Field Num:%u", info->fields[FLD_TYPE_COLUMN].num);
for (uint16_t i = 0; i < info->fields[FLD_TYPE_COLUMN].num; ++i) {
SFilterField *field = &info->fields[FLD_TYPE_COLUMN].fields[i];
SSchema *sch = field->desc;
qDebug("COL%d => [%d][%s]", i, sch->colId, sch->name);
}
qDebug("VALUE Field Num:%u", info->fields[F_FIELD_VALUE].num);
for (uint16_t i = 0; i < info->fields[F_FIELD_VALUE].num; ++i) {
SFilterField *field = &info->fields[F_FIELD_VALUE].fields[i];
tVariant *var = field->desc;
qDebug("VAL%d => [type:%d][val:%" PRIu64"]", i, var->nType, var->u64); //TODO
qDebug("VALUE Field Num:%u", info->fields[FLD_TYPE_VALUE].num);
for (uint16_t i = 0; i < info->fields[FLD_TYPE_VALUE].num; ++i) {
SFilterField *field = &info->fields[FLD_TYPE_VALUE].fields[i];
if (field->desc) {
tVariant *var = field->desc;
qDebug("VAL%d => [type:%d][val:%" PRIi64"]", i, var->nType, var->i64); //TODO
} else {
qDebug("VAL%d => [type:NIL][val:%" PRIi64" or %f]", i, *(int64_t *)field->data, *(double *)field->data); //TODO
}
}
qDebug("Unit Num:%u", info->unitNum);
for (uint16_t i = 0; i < info->unitNum; ++i) {
SFilterUnit *unit = &info->units[i];
int32_t type = FILTER_UNIT_DATA_TYPE(unit);
int32_t len = 0;
char str[128];
SFilterField *left = FILTER_UNIT_LEFT_FIELD(info, unit);
SFilterField *right = FILTER_UNIT_RIGHT_FIELD(info, unit);
SSchema *sch = left->desc;
tVariant *var = right->desc;
qDebug("UNIT%d => [%d][%s] %s %" PRId64, i, sch->colId, sch->name, gOptrStr[unit->compare.optr].str, IS_NUMERIC_TYPE(var->nType) ? var->i64 : -1); //TODO
len = sprintf(str, "UNIT[%d] => [%d][%s] %s [", i, sch->colId, sch->name, gOptrStr[unit->compare.optr].str);
if (unit->right.type== FLD_TYPE_VALUE) {
SFilterField *right = FILTER_UNIT_RIGHT_FIELD(info, unit);
converToStr(str + len, type, right->data, 0, &len);
} else {
strcat(str, "NULL");
}
strcat(str, "]");
qDebug("%s", str); //TODO
}
qDebug("Group Num:%u", info->groupNum);
......@@ -598,25 +817,17 @@ void filterFreeInfo(SFilterInfo *info) {
int32_t filterInitValFieldData(SFilterInfo *info) {
for (uint16_t i = 0; i < info->unitNum; ++i) {
SFilterUnit* unit = &info->units[i];
SFilterField* left = FILTER_UNIT_LEFT_FIELD(info, unit);
SFilterField* right = FILTER_UNIT_RIGHT_FIELD(info, unit);
if (left->type != F_FIELD_VALUE && right->type != F_FIELD_VALUE) {
if (unit->right.type != FLD_TYPE_VALUE) {
assert(unit->compare.optr == TSDB_RELATION_ISNULL || unit->compare.optr == TSDB_RELATION_NOTNULL);
continue;
}
SFilterField* right = FILTER_UNIT_RIGHT_FIELD(info, unit);
uint32_t type = 0;
SFilterField* fi = NULL;
if (left->type == F_FIELD_COLUMN) {
type = FILTER_GET_COL_FIELD_TYPE(left);
fi = right;
} else if (right->type == F_FIELD_COLUMN) {
type = FILTER_GET_COL_FIELD_TYPE(right);
fi = left;
} else {
type = FILTER_GET_VAL_FIELD_TYPE(left);
fi = right;
}
assert(FILTER_GET_FLAG(right->flag, FLD_TYPE_VALUE));
uint32_t type = FILTER_UNIT_DATA_TYPE(unit);
SFilterField* fi = right;
tVariant* var = fi->desc;
......@@ -632,7 +843,7 @@ int32_t filterInitValFieldData(SFilterInfo *info) {
} else if (type == TSDB_DATA_TYPE_NCHAR) {
fi->data = calloc(1, (var->nLen + 1) * TSDB_NCHAR_SIZE);
} else {
if (var->nType == TSDB_DATA_TYPE_VALUE_ARRAY) {
if (var->nType == TSDB_DATA_TYPE_VALUE_ARRAY) { //TIME RANGE
fi->data = calloc(var->nLen, tDataTypes[type].bytes);
for (int32_t a = 0; a < var->nLen; ++a) {
int64_t *v = taosArrayGet(var->arr, a);
......@@ -690,38 +901,73 @@ bool filterDoCompare(SFilterUnit *unit, void *left, void *right) {
}
#if 0
int32_t filterAddUnitRange(SFilterInfo *info, SFilterUnit* u, SFilterRMCtx *ctx, int32_t optr) {
int32_t type = FILTER_UNIT_DATA_TYPE(info, u);
int32_t type = FILTER_UNIT_DATA_TYPE(u);
uint8_t uoptr = FILTER_UNIT_OPTR(u);
void *val = FILTER_UNIT_VAL(info, u);
int64_t s = 0, e = 0;
void *val = FILTER_UNIT_VAL_DATA(info, u);
SFilterRange ra = {0};
int64_t tmp = 0;
switch (uoptr) {
case TSDB_RELATION_GREATER:
SIMPLE_COPY_VALUES(&ra.s, val);
FILTER_SET_FLAG(ra.sflag, RA_EXCLUDE);
FILTER_SET_FLAG(ra.eflag, RA_NULL);
break;
case TSDB_RELATION_GREATER_EQUAL:
SIMPLE_COPY_VALUES(&ra.s, val);
FILTER_SET_FLAG(ra.eflag, RA_NULL);
break;
case TSDB_RELATION_LESS:
SIMPLE_COPY_VALUES(&ra.e, val);
FILTER_SET_FLAG(ra.eflag, RA_EXCLUDE);
FILTER_SET_FLAG(ra.sflag, RA_NULL);
break;
case TSDB_RELATION_LESS_EQUAL:
SIMPLE_COPY_VALUES(&ra.e, val);
FILTER_SET_FLAG(ra.sflag, RA_NULL);
break;
case TSDB_RELATION_NOT_EQUAL:
assert(type == TSDB_DATA_TYPE_BOOL);
if (GET_INT8_VAL(val)) {
SIMPLE_COPY_VALUES(&ra.s, &tmp);
SIMPLE_COPY_VALUES(&ra.e, &tmp);
} else {
*(bool *)&tmp = true;
SIMPLE_COPY_VALUES(&ra.s, &tmp);
SIMPLE_COPY_VALUES(&ra.e, &tmp);
}
break;
case TSDB_RELATION_EQUAL:
case TSDB_RELATION_IN:
SIMPLE_COPY_VALUES(&ra.s, val);
SIMPLE_COPY_VALUES(&ra.e, val);
break;
case TSDB_RELATION_IN: {
void *p = taosHashIterate((SHashObj *)val, NULL);
while(p) {
void *key = taosHashGetDataKey((SHashObj *)val, p);
SIMPLE_COPY_VALUES(&ra.s, key);
SIMPLE_COPY_VALUES(&ra.e, key);
filterAddMergeRange(ctx, &ra, optr);
p = taosHashIterate((SHashObj *)val, p);
}
return TSDB_CODE_SUCCESS;
}
default:
assert(0);
}
filterAddMergeRange(ctx, &s, &e, optr);
filterAddMergeRange(ctx, &ra, optr);
return TSDB_CODE_SUCCESS;
}
int32_t filterMergeSingleGroupUnits(SFilterInfo *info, SFilterGroup* g, uint16_t id1, uint16_t id2, SArray* res) {
int32_t filterProcessUnitsInOneGroup(SFilterInfo *info, SFilterGroup* g, uint16_t id1, uint16_t id2, SArray* res, uint16_t *removedNum) {
bool isnull = false, notnull = false;
int32_t num = 0;
SFilterRMCtx *cur = filterInitMergeRange(type, 0);
SFilterUnit* u1 = FILTER_GROUP_UNIT(info, g, id1);
SFilterUnit* u2 = FILTER_GROUP_UNIT(info, g, id2);
......@@ -729,10 +975,8 @@ int32_t filterMergeSingleGroupUnits(SFilterInfo *info, SFilterGroup* g, uint16_t
uint8_t optr2 = FILTER_UNIT_OPTR(u2);
uint16_t cidx = FILTER_UNIT_COL_IDX(u1);
int32_t type = FILTER_UNIT_DATA_TYPE(info, u1);
#define SET_OPTR(o) ((o == TSDB_RELATION_ISNULL) ? isnull = true : notnull = true)
#define CHK_OPTR() (isnull == true && notnull == true)
int32_t type = FILTER_UNIT_DATA_TYPE(u1);
SFilterRMCtx *cur = filterInitMergeRange(type, 0);
SET_OPTR(optr1);
SET_OPTR(optr2);
......@@ -755,25 +999,37 @@ int32_t filterMergeSingleGroupUnits(SFilterInfo *info, SFilterGroup* g, uint16_t
continue;
}
++(*removedNum);
optr2 = FILTER_UNIT_OPTR(u);
SET_OPTR(optr2);
CHK_JMP(CHK_OPTR());
if (!FILTER_NO_MERGE_OPTR(optr2)) {
filterAddUnitRange(info, u2, cur, TSDB_RELATION_AND);
filterAddUnitRange(info, u, cur, TSDB_RELATION_AND);
CHK_JMP(MR_EMPTY_RES(cur));
}
}
SFilterColRange ra;
ra.idx = cidx;
SFilterColRange cra = {0};
cra.idx = cidx;
filterGetMergeRangeNum(cur, &num);
assert(num == 1);
filterGetMergeRangeRes(cur, &ra.s, &ra.e);
assert(num == 1 || num == 0);
if (num == 1) {
filterGetMergeRangeRes(cur, &cra.ra);
filterPostProcessRange(cur, &cra.ra, &cra.notNull);
} else {
if (isnull) {
cra.isNull = true;
}
if (notnull) {
cra.notNull = true;
}
}
taosArrayPush(res, &ra);
taosArrayPush(res, &cra);
filterFreeMergeRange(cur);
......@@ -782,6 +1038,8 @@ int32_t filterMergeSingleGroupUnits(SFilterInfo *info, SFilterGroup* g, uint16_t
_err_return:
g->unitNum = 0;
*removedNum = g->unitNum;
filterFreeMergeRange(cur);
......@@ -789,39 +1047,50 @@ _err_return:
}
int32_t filterMergeGroupUnits(SFilterInfo *info, SArray** res) {
uint16_t *f = malloc(1, info->fields[F_FIELD_COLUMN].num * sizeof(uint16_t));
SArray *gres = NULL;
int32_t filterProcessUnits(SFilterInfo *info, SFilterGroupCtx*** res) {
int32_t *col = NULL;
SFilterGroupCtx *gctx = NULL;
SArray *colRange = NULL;
bool gresUsed = false;
uint16_t removedNum = 0;
for (uint16_t i = 0; i < info->groupNum; ++i) {
SFilterGroup* g = info->groups + i;
memet(f, -1, info->fields[F_FIELD_COLUMN].num);
if (col == NULL) {
col = calloc(info->fields[FLD_TYPE_COLUMN].num, sizeof(int32_t));
} else {
memset(col, 0, info->fields[FLD_TYPE_COLUMN].num * sizeof(int32_t));
}
gresUsed = false;
removedNum = 0;
for (uint16_t j = 0; j < g->unitNum; ++j) {
SFilterUnit* u = FILTER_GROUP_UNIT(info, g, j);
int32_t type = FILTER_UNIT_DATA_TYPE(info, u);
int32_t type = FILTER_UNIT_DATA_TYPE(u);
if (FILTER_NO_MERGE_DATA_TYPE(type)) {
continue;
}
uint16_t cidx = FILTER_UNIT_COL_IDX(u);
if (f[cidx] == -1) {
f[u->left.idx] = j;
} else if (cidx] == -2) {
if (col[cidx] == 0) {
col[cidx] = j + 1;
} else if (col[cidx] == -1) {
continue;
} else {
f[cidx] = -2;
if (gres == NULL) {
gres = taosArrayInit(4, sizeof(SFilterColRange));
if (colRange == NULL) {
colRange = taosArrayInit(4, sizeof(SFilterColRange));
}
filterMergeSingleGroupUnits(info, g, f[cidx], j, gres);
removedNum += 2;
filterProcessUnitsInOneGroup(info, g, col[cidx] - 1, j, colRange, &removedNum);
col[cidx] = -1;
if (g->unitNum == 0) {
break;
} else {
......@@ -832,44 +1101,409 @@ int32_t filterMergeGroupUnits(SFilterInfo *info, SArray** res) {
if (g->unitNum == 0) {
if (gresUsed) {
taosArrayClear(gres);
taosArrayClear(colRange);
}
continue;
}
if (gresUsed) {
gctx = malloc(sizeof(*gctx));
gctx->num = g->unitNum - removedNum + 1;
gctx->col = col;
gctx->colRange = colRange;
col = NULL;
colRange = NULL;
if (*res == NULL) {
*res = calloc(info->groupNum, sizeof(SFilterGroupCtx *));
}
(*res)[i] = gctx;
}
}
tfree(col);
if (colRange) {
taosArrayDestroy(colRange);
}
return TSDB_CODE_SUCCESS;
}
int32_t filterProcessGroupsSameColumn(SFilterInfo *info, uint16_t id1, uint16_t id2, SArray* res, uint16_t *removedNum, SFilterGroupCtx** unitRes) {
bool isnull = false, notnull = false;
int32_t num = 0;
SFilterColRange *cra = NULL;
SFilterGroup *g = NULL;
SFilterUnit* u1 = FILTER_GROUP_UNIT(info, info->groups + id1, 0);
uint8_t optr = FILTER_UNIT_OPTR(u1);
uint16_t cidx = FILTER_UNIT_COL_IDX(u1);
uint16_t cidx2 = 0;
int32_t type = FILTER_UNIT_DATA_TYPE(u1);
SFilterRMCtx *cur = filterInitMergeRange(type, 0);
for (int32_t i = 0; i < info->groupNum; ++i) {
g = info->groups + i;
uint16_t unitNum = (unitRes && unitRes[i]) ? unitRes[i]->num : g->unitNum;
if (unitNum == 0) {
continue;
}
if (unitNum > 1) {
continue;
}
if (unitRes && unitRes[i]) {
assert(taosArrayGetSize(unitRes[i]->colRange) == 1);
if (notnull) {
continue;
}
cra = taosArrayGet(unitRes[i]->colRange, 0);
if (cra->notNull) {
notnull = true;
CHK_JMP(CHK_OPTR());
}
cidx2 = cra->idx;
} else {
u1 = FILTER_GROUP_UNIT(info, g, 0);
int32_t type = FILTER_UNIT_DATA_TYPE(u1);
if (FILTER_NO_MERGE_DATA_TYPE(type)) {
continue;
}
optr = FILTER_UNIT_OPTR(u1);
cidx2 = FILTER_UNIT_COL_IDX(u1);
}
if (cidx != cidx2) {
continue;
}
g->unitNum = 0;
if (unitRes && unitRes[i]) {
unitRes[i]->num = 0;
if (notnull) {
continue;
}
filterAddMergeRange(cur, &cra->ra, TSDB_RELATION_OR);
if (MR_EMPTY_RES(cur)) {
notnull = true;
CHK_JMP(CHK_OPTR());
}
continue;
}
if (res == NULL) {
res = calloc(info->groupNum, sizeof(SArray *));
res[i] = gres;
gres = NULL;
optr = FILTER_UNIT_OPTR(u1);
SET_OPTR(optr);
CHK_JMP(CHK_OPTR());
if (!FILTER_NO_MERGE_OPTR(optr) && notnull == false) {
filterAddUnitRange(info, u1, cur, TSDB_RELATION_OR);
if (MR_EMPTY_RES(cur)) {
notnull = true;
CHK_JMP(CHK_OPTR());
}
}
}
free(f);
if (gres) {
taosArrayDestroy(gres);
SFilterColRange ncra;
SFilterRange *ra;
ncra.idx = cidx;
ncra.isNull = isnull;
ncra.notNull = notnull;
if (isnull) {
--removedNum;
}
if (!notnull) {
filterGetMergeRangeNum(cur, &num);
if (num > 1) {
ra = calloc(num, sizeof(*ra));
} else {
assert(num == 1);
ra = &ncra.ra;
}
filterGetMergeRangeRes(cur, ra);
SFilterRange *tra = ra;
for (int32_t i = 0; i < num; ++i) {
filterPostProcessRange(cur, tra, &ncra.notNull);
assert(ncra.notNull == false);
++tra;
}
if (num > 1) {
for (int32_t i = 0; i < num; ++i) {
FILTER_COPY_RA(&ncra.ra, ra);
taosArrayPush(res, &ncra);
++ra;
}
} else {
FILTER_COPY_RA(&ncra.ra, ra);
taosArrayPush(res, &ncra);
}
} else {
taosArrayPush(res, &ncra);
}
filterFreeMergeRange(cur);
return TSDB_CODE_SUCCESS;
_err_return:
FILTER_SET_FLAG(info->flags, FILTER_ALL);
filterFreeMergeRange(cur);
return TSDB_CODE_SUCCESS;
}
int32_t filterProcessGroups(SFilterInfo *info, SFilterGroupCtx** unitRes, SFilterGroupCtx* groupRes) {
int32_t *col = NULL;
uint16_t cidx;
uint16_t removedNum = 0;
SArray *colRange = NULL;
for (uint16_t i = 0; i < info->groupNum; ++i) {
SFilterGroup* g = info->groups + i;
uint16_t unitNum = (unitRes && unitRes[i]) ? unitRes[i]->num : g->unitNum;
if (unitNum == 0) {
++removedNum;
continue;
}
if (unitNum > 1) {
continue;
}
if (unitRes && unitRes[i]) {
assert(taosArrayGetSize(unitRes[i]->colRange) == 1);
SFilterColRange *ra = taosArrayGet(unitRes[i]->colRange, 0);
cidx = ra->idx;
} else {
SFilterUnit* u = FILTER_GROUP_UNIT(info, g, 0);
int32_t type = FILTER_UNIT_DATA_TYPE(u);
if (FILTER_NO_MERGE_DATA_TYPE(type)) {
continue;
}
cidx = FILTER_UNIT_COL_IDX(u);
}
if (col == NULL) {
col = calloc(info->fields[FLD_TYPE_COLUMN].num, sizeof(int32_t));
}
if (col[cidx] == 0) {
col[cidx] = i + 1;
continue;
} else if (col[cidx] == -1) {
continue;
} else {
if (colRange == NULL) {
colRange = taosArrayInit(4, sizeof(SFilterColRange));
}
removedNum += 2;
filterProcessGroupsSameColumn(info, col[cidx] - 1, i, colRange, &removedNum, unitRes);
CHK_JMP(FILTER_GET_FLAG(info->flags, FILTER_ALL));
col[cidx] = -1;
}
}
uint16_t num = info->groupNum - removedNum;
assert(num >= 0);
if (colRange) {
num += taosArrayGetSize(colRange);
}
if (num == 0) {
FILTER_SET_FLAG(info->flags, FILTER_NONE);
goto _err_return;
}
if (colRange || removedNum > 0) {
groupRes->num = num;
groupRes->col = col;
groupRes->colRange = colRange;
} else {
tfree(col);
}
return TSDB_CODE_SUCCESS;
_err_return:
tfree(col);
if (colRange) {
taosArrayDestroy(colRange);
}
return TSDB_CODE_SUCCESS;
}
int32_t filterGenerateGroupFromArray(SFilterInfo *info, SArray* group) {
size_t groupSize = taosArrayGetSize(group);
info->groupNum = (uint16_t)groupSize;
if (info->groupNum > 0) {
info->groups = calloc(info->groupNum, sizeof(*info->groups));
}
for (size_t i = 0; i < groupSize; ++i) {
SFilterGroup *pg = taosArrayGet(group, i);
pg->unitFlags = calloc(pg->unitNum, sizeof(*pg->unitFlags));
info->groups[i] = *pg;
}
return TSDB_CODE_SUCCESS;
}
int32_t filterRewrite(SFilterInfo *info, SFilterGroupCtx* gctx, SFilterGroupCtx** uctx) {
if (gctx->num == 0) {
return TSDB_CODE_SUCCESS;
}
SFilterInfo oinfo = *info;
uint16_t gNum = 0;
SArray* group = taosArrayInit(FILTER_DEFAULT_GROUP_SIZE, sizeof(SFilterGroup));
memset(info, 0, sizeof(*info));
filterInitUnitsFields(info);
for (uint16_t i = 0; i < oinfo.groupNum; ++i) {
SFilterGroup* g = oinfo.groups + i;
SFilterGroup ng = {0};
uint16_t unitNum = (uctx && uctx[i]) ? uctx[i]->num : g->unitNum;
if (unitNum == 0) {
continue;
}
++gNum;
if ((uctx == NULL) || (uctx[i] == NULL)) {
for (uint16_t n = 0; n < g->unitNum; ++n) {
SFilterUnit* u = FILTER_GROUP_UNIT(&oinfo, g, n);
filterAddUnitFromUnit(info, &oinfo, u);
filterAddUnitToGroup(&ng, info->unitNum - 1);
}
taosArrayPush(group, &ng);
continue;
}
SFilterGroupCtx* ctx = uctx[i];
for (uint16_t n = 0; n < g->unitNum; ++n) {
SFilterUnit* u = FILTER_GROUP_UNIT(&oinfo, g, n);
int32_t type = FILTER_UNIT_DATA_TYPE(u);
if (FILTER_NO_MERGE_DATA_TYPE(type)) {
filterAddUnitFromUnit(info, &oinfo, u);
filterAddUnitToGroup(&ng, info->unitNum - 1);
continue;
}
uint16_t cidx = FILTER_UNIT_COL_IDX(u);
assert(ctx->col[cidx] > 0 || ctx->col[cidx] == -1);
if (ctx->col[cidx] != -1) {
filterAddUnitFromUnit(info, &oinfo, u);
filterAddUnitToGroup(&ng, info->unitNum - 1);
}
}
if (ctx->colRange && taosArrayGetSize(ctx->colRange) > 0) {
int32_t size = (int32_t)taosArrayGetSize(ctx->colRange);
for (int32_t i = 0; i < size; ++i) {
SFilterColRange *cra = taosArrayGet(ctx->colRange, i);
filterAddGroupUnitFromRange(info, &oinfo, cra, &ng, TSDB_RELATION_AND, NULL);
}
}
taosArrayPush(group, &ng);
}
if (gctx->colRange && taosArrayGetSize(gctx->colRange) > 0) {
int32_t size = (int32_t)taosArrayGetSize(gctx->colRange);
for (int32_t i = 0; i < size; ++i) {
SFilterColRange *cra = taosArrayGet(gctx->colRange, i);
filterAddGroupUnitFromRange(info, &oinfo, cra, NULL, TSDB_RELATION_OR, group);
}
}
filterGenerateGroupFromArray(info, group);
taosArrayDestroy(group);
return TSDB_CODE_SUCCESS;
}
int32_t filterPreprocess(SFilterInfo *info) {
SArray* res = NULL;
SFilterGroupCtx** unitsRes = NULL;
SFilterGroupCtx groupRes = {0};
filterMergeGroupUnits(info, &res);
filterProcessUnits(info, &unitsRes);
filterProcessGroups(info, unitsRes, &groupRes);
if (FILTER_GET_FLAG(info->flags, FILTER_ALL)) {
qInfo("Final - FilterInfo: [ALL]");
return TSDB_CODE_SUCCESS;
}
if (FILTER_GET_FLAG(info->flags, FILTER_NONE)) {
qInfo("Final - FilterInfo: [NONE]");
return TSDB_CODE_SUCCESS;
}
//TODO GET COLUMN RANGE
filterRewrite(info, &groupRes, unitsRes);
return TSDB_CODE_SUCCESS;
}
#endif
int32_t filterSetColFieldData(SFilterInfo *info, int16_t colId, void *data) {
CHK_LRET(info == NULL, TSDB_CODE_QRY_APP_ERROR, "info NULL");
CHK_LRET(info->fields[F_FIELD_COLUMN].num <= 0, TSDB_CODE_QRY_APP_ERROR, "no column fileds");
CHK_LRET(info->fields[FLD_TYPE_COLUMN].num <= 0, TSDB_CODE_QRY_APP_ERROR, "no column fileds");
for (uint16_t i = 0; i < info->fields[F_FIELD_COLUMN].num; ++i) {
SFilterField* fi = &info->fields[F_FIELD_COLUMN].fields[i];
for (uint16_t i = 0; i < info->fields[FLD_TYPE_COLUMN].num; ++i) {
SFilterField* fi = &info->fields[FLD_TYPE_COLUMN].fields[i];
SSchema* sch = fi->desc;
if (sch->colId == colId) {
fi->data = data;
......@@ -884,6 +1518,10 @@ int32_t filterSetColFieldData(SFilterInfo *info, int16_t colId, void *data) {
bool filterExecute(SFilterInfo *info, int32_t numOfRows, int8_t* p) {
bool all = true;
if (FILTER_GET_FLAG(info->flags, FILTER_NONE)) {
return false;
}
for (int32_t i = 0; i < numOfRows; ++i) {
FILTER_UNIT_CLR_F(info);
......@@ -901,10 +1539,8 @@ bool filterExecute(SFilterInfo *info, int32_t numOfRows, int8_t* p) {
ures = FILTER_UNIT_GET_R(info, uidx);
} else {
SFilterUnit *unit = &info->units[uidx];
SFilterField *left = FILTER_UNIT_LEFT_FIELD(info, unit);
SFilterField *right = FILTER_UNIT_RIGHT_FIELD(info, unit);
if (isNull(FILTER_GET_COL_FIELD_DATA(left, i), FILTER_GET_COL_FIELD_TYPE(left))) {
if (isNull(FILTER_UNIT_COL_DATA(info, unit, i), FILTER_UNIT_DATA_TYPE(unit))) {
ures = unit->compare.optr == TSDB_RELATION_ISNULL ? true : false;
} else {
if (unit->compare.optr == TSDB_RELATION_NOTNULL) {
......@@ -912,7 +1548,7 @@ bool filterExecute(SFilterInfo *info, int32_t numOfRows, int8_t* p) {
} else if (unit->compare.optr == TSDB_RELATION_ISNULL) {
ures = false;
} else {
ures = filterDoCompare(unit, FILTER_GET_COL_FIELD_DATA(left, i), FILTER_GET_VAL_FIELD_DATA(right));
ures = filterDoCompare(unit, FILTER_UNIT_COL_DATA(info, unit, i), FILTER_UNIT_VAL_DATA(info, unit));
}
}
......@@ -941,8 +1577,7 @@ bool filterExecute(SFilterInfo *info, int32_t numOfRows, int8_t* p) {
}
int32_t filterInitFromTree(tExprNode* tree, SFilterInfo **pinfo) {
int32_t filterInitFromTree(tExprNode* tree, SFilterInfo **pinfo, uint32_t options) {
int32_t code = TSDB_CODE_SUCCESS;
SFilterInfo *info = NULL;
......@@ -954,42 +1589,33 @@ int32_t filterInitFromTree(tExprNode* tree, SFilterInfo **pinfo) {
info = *pinfo;
SArray* group = taosArrayInit(4, sizeof(SFilterGroup));
info->unitSize = FILTER_DEFAULT_UNIT_SIZE;
info->units = calloc(info->unitSize, sizeof(SFilterUnit));
info->flags = options;
info->fields[F_FIELD_COLUMN].num = 0;
info->fields[F_FIELD_COLUMN].size = FILTER_DEFAULT_FIELD_SIZE;
info->fields[F_FIELD_COLUMN].fields = calloc(info->fields[F_FIELD_COLUMN].size, COL_FIELD_SIZE);
info->fields[F_FIELD_VALUE].num = 0;
info->fields[F_FIELD_VALUE].size = FILTER_DEFAULT_FIELD_SIZE;
info->fields[F_FIELD_VALUE].fields = calloc(info->fields[F_FIELD_VALUE].size, sizeof(SFilterField));
SArray* group = taosArrayInit(FILTER_DEFAULT_GROUP_SIZE, sizeof(SFilterGroup));
filterInitUnitsFields(info);
code = filterTreeToGroup(tree, info, group);
ERR_JRET(code);
size_t groupSize = taosArrayGetSize(group);
info->groupNum = (uint16_t)groupSize;
if (info->groupNum > 0) {
info->groups = calloc(info->groupNum, sizeof(*info->groups));
}
for (size_t i = 0; i < groupSize; ++i) {
SFilterGroup *pg = taosArrayGet(group, i);
pg->unitFlags = calloc(pg->unitNum, sizeof(*pg->unitFlags));
info->groups[i] = *pg;
}
filterGenerateGroupFromArray(info, group);
ERR_JRET(filterInitValFieldData(info));
filterDumpInfoToString(info, "Before preprocess");
if (!FILTER_GET_FLAG(info->flags, FILTER_NO_REWRITE)) {
filterDumpInfoToString(info, "Before preprocess");
//ERR_JRET(filterPreprocess(info));
ERR_JRET(filterPreprocess(info));
CHK_JMP(FILTER_GET_FLAG(info->flags, FILTER_ALL));
if (FILTER_GET_FLAG(info->flags, FILTER_NONE)) {
taosArrayDestroy(group);
return code;
}
}
ERR_JRET(filterInitUnitFunc(info));
info->unitRes = malloc(info->unitNum * sizeof(*info->unitRes));
......@@ -997,10 +1623,19 @@ int32_t filterInitFromTree(tExprNode* tree, SFilterInfo **pinfo) {
filterDumpInfoToString(info, "Final");
taosArrayDestroy(group);
return code;
_err_return:
qInfo("No filter, code:%d", code);
taosArrayDestroy(group);
filterFreeInfo(*pinfo);
*pinfo = NULL;
return code;
}
......@@ -1043,6 +1678,11 @@ int32_t filterGetTimeRange(SFilterInfo *info, STimeWindow *win) {
break;
}
if (FILTER_GET_FLAG(cur->status, MR_ALL)) {
FILTER_SET_FLAG(prev->status, MR_ALL);
break;
}
if (group->unitNum > 1) {
filterAddMergeRangeCtx(prev, cur, TSDB_RELATION_OR);
filterResetMergeRangeCtx(cur);
......@@ -1050,13 +1690,20 @@ int32_t filterGetTimeRange(SFilterInfo *info, STimeWindow *win) {
}
if (code == TSDB_CODE_SUCCESS) {
filterGetMergeRangeNum(prev, &num);
if (num != 1) {
qError("only one time range accepted, num:%d", num);
ERR_JRET(TSDB_CODE_QRY_INVALID_TIME_CONDITION);
}
if (FILTER_GET_FLAG(prev->status, MR_ALL)) {
*win = TSWINDOW_INITIALIZER;
} else {
filterGetMergeRangeNum(prev, &num);
if (num != 1) {
qError("only one time range accepted, num:%d", num);
ERR_JRET(TSDB_CODE_QRY_INVALID_TIME_CONDITION);
}
filterGetMergeRangeRes(prev, &win->skey, &win->ekey);
SFilterRange ra;
filterGetMergeRangeRes(prev, &ra);
win->skey = ra.s;
win->ekey = ra.e;
}
}
_err_return:
......
......@@ -20,7 +20,7 @@ namespace {
void intDataTest() {
printf("running %s\n", __FUNCTION__);
int32_t asize = 0;
SFilterRange ra = {0};
SFilterRange ra[10] = {0};
int64_t *s =NULL;
int64_t *e =NULL;
int64_t s0[3] = {-100, 1, 3};
......@@ -34,179 +34,191 @@ void intDataTest() {
int64_t s4[2] = {10, 0};
int64_t e4[2] = {20, 5};
int64_t s5[3] = {0, 6 ,7};
int64_t e5[5] = {4, 10,20};
int64_t e5[3] = {4, 10,20};
int64_t rs[10];
int64_t re[10];
int32_t num = 0;
void *h = NULL;
s = s0;
e = e0;
asize = sizeof(s0)/sizeof(s[0]);
void *h = filterInitMergeRange(TSDB_DATA_TYPE_BIGINT, 0);
asize = sizeof(s0)/sizeof(s[0]);
memset(ra, 0, sizeof(ra));
h = filterInitMergeRange(TSDB_DATA_TYPE_BIGINT, 0);
for (int32_t i = 0; i < asize; ++i) {
ra.s = s[i];
ra.e = e[i];
filterAddMergeRange(h, &ra, TSDB_RELATION_AND);
ra[0].s = s[i];
ra[0].e = e[i];
filterAddMergeRange(h, ra, TSDB_RELATION_AND);
}
filterGetMergeRangeNum(h, &num);
ASSERT_EQ(num, 0);
filterFreeMergeRange(h);
memset(ra, 0, sizeof(ra));
h = filterInitMergeRange(TSDB_DATA_TYPE_BIGINT, 0);
for (int32_t i = 0; i < asize; ++i) {
ra.s = s[i];
ra.e = e[i];
ra[0].s = s[i];
ra[0].e = e[i];
filterAddMergeRange(h, &ra, TSDB_RELATION_OR);
filterAddMergeRange(h, ra, TSDB_RELATION_OR);
}
filterGetMergeRangeNum(h, &num);
ASSERT_EQ(num, 3);
filterGetMergeRangeRes(h, rs, re);
ASSERT_EQ(rs[0], -100);
ASSERT_EQ(re[0], 0);
ASSERT_EQ(rs[1], 1);
ASSERT_EQ(re[1], 2);
ASSERT_EQ(rs[2], 3);
ASSERT_EQ(re[2], 4);
filterGetMergeRangeRes(h, ra);
ASSERT_EQ(ra[0].s, -100);
ASSERT_EQ(ra[0].e, 0);
ASSERT_EQ(ra[1].s, 1);
ASSERT_EQ(ra[1].e, 2);
ASSERT_EQ(ra[2].s, 3);
ASSERT_EQ(ra[2].e, 4);
filterFreeMergeRange(h);
memset(ra, 0, sizeof(ra));
h = filterInitMergeRange(TSDB_DATA_TYPE_BIGINT, MR_OPT_TS);
for (int32_t i = 0; i < asize; ++i) {
ra.s = s[i];
ra.e = e[i];
ra[0].s = s[i];
ra[0].e = e[i];
filterAddMergeRange(h, &ra, TSDB_RELATION_OR);
filterAddMergeRange(h, ra, TSDB_RELATION_OR);
}
filterGetMergeRangeNum(h, &num);
ASSERT_EQ(num, 1);
filterGetMergeRangeRes(h, rs, re);
ASSERT_EQ(rs[0], -100);
ASSERT_EQ(re[0], 4);
filterGetMergeRangeRes(h, ra);
ASSERT_EQ(ra[0].s, -100);
ASSERT_EQ(ra[0].e, 4);
filterFreeMergeRange(h);
s = s1;
e = e1;
asize = sizeof(s1)/sizeof(s[0]);
memset(ra, 0, sizeof(ra));
h = filterInitMergeRange(TSDB_DATA_TYPE_BIGINT, 0);
for (int32_t i = 0; i < asize; ++i) {
ra.s = s[i];
ra.e = e[i];
ra[0].s = s[i];
ra[0].e = e[i];
filterAddMergeRange(h, &ra, TSDB_RELATION_AND);
filterAddMergeRange(h, ra, TSDB_RELATION_AND);
}
filterGetMergeRangeNum(h, &num);
ASSERT_EQ(num, 1);
filterGetMergeRangeRes(h, rs, re);
ASSERT_EQ(rs[0], 3);
ASSERT_EQ(re[0], 4);
filterGetMergeRangeRes(h, ra);
ASSERT_EQ(ra[0].s, 3);
ASSERT_EQ(ra[0].e, 4);
filterFreeMergeRange(h);
memset(ra, 0, sizeof(ra));
h = filterInitMergeRange(TSDB_DATA_TYPE_BIGINT, 0);
for (int32_t i = 0; i < asize; ++i) {
ra.s = s[i];
ra.e = e[i];
ra[0].s = s[i];
ra[0].e = e[i];
filterAddMergeRange(h, &ra, TSDB_RELATION_OR);
filterAddMergeRange(h, ra, TSDB_RELATION_OR);
}
filterGetMergeRangeNum(h, &num);
ASSERT_EQ(num, 1);
filterGetMergeRangeRes(h, rs, re);
ASSERT_EQ(rs[0], INT64_MIN);
ASSERT_EQ(re[0], 100);
filterGetMergeRangeRes(h, ra);
ASSERT_EQ(ra[0].s, INT64_MIN);
ASSERT_EQ(ra[0].e, 100);
filterFreeMergeRange(h);
s = s2;
e = e2;
asize = sizeof(s2)/sizeof(s[0]);
memset(ra, 0, sizeof(ra));
h = filterInitMergeRange(TSDB_DATA_TYPE_BIGINT, 0);
for (int32_t i = 0; i < asize; ++i) {
ra.s = s[i];
ra.e = e[i];
ra[0].s = s[i];
ra[0].e = e[i];
filterAddMergeRange(h, &ra, TSDB_RELATION_AND);
filterAddMergeRange(h, ra, TSDB_RELATION_AND);
}
filterGetMergeRangeNum(h, &num);
ASSERT_EQ(num, 0);
filterFreeMergeRange(h);
memset(ra, 0, sizeof(ra));
h = filterInitMergeRange(TSDB_DATA_TYPE_BIGINT, 0);
for (int32_t i = 0; i < asize; ++i) {
ra.s = s[i];
ra.e = e[i];
ra[0].s = s[i];
ra[0].e = e[i];
filterAddMergeRange(h, &ra, TSDB_RELATION_OR);
filterAddMergeRange(h, ra, TSDB_RELATION_OR);
}
filterGetMergeRangeNum(h, &num);
ASSERT_EQ(num, 1);
filterGetMergeRangeRes(h, rs, re);
ASSERT_EQ(rs[0], 1);
ASSERT_EQ(re[0], 120);
filterGetMergeRangeRes(h, ra);
ASSERT_EQ(ra[0].s, 1);
ASSERT_EQ(ra[0].e, 120);
filterFreeMergeRange(h);
memset(ra, 0, sizeof(ra));
h = filterInitMergeRange(TSDB_DATA_TYPE_BIGINT, 0);
for (int32_t i = 0; i < asize; ++i) {
ra.s = s[i];
ra.e = e[i];
ra[0].s = s[i];
ra[0].e = e[i];
filterAddMergeRange(h, &ra, i % 2 ? TSDB_RELATION_OR : TSDB_RELATION_AND);
filterAddMergeRange(h, ra, i % 2 ? TSDB_RELATION_OR : TSDB_RELATION_AND);
}
filterGetMergeRangeNum(h, &num);
ASSERT_EQ(num, 0);
filterFreeMergeRange(h);
memset(ra, 0, sizeof(ra));
h = filterInitMergeRange(TSDB_DATA_TYPE_BIGINT, 0);
for (int32_t i = 0; i < asize; ++i) {
ra.s = s[i];
ra.e = e[i];
ra[0].s = s[i];
ra[0].e = e[i];
filterAddMergeRange(h, &ra, i % 2 ? TSDB_RELATION_AND : TSDB_RELATION_OR);
filterAddMergeRange(h, ra, i % 2 ? TSDB_RELATION_AND : TSDB_RELATION_OR);
}
filterGetMergeRangeNum(h, &num);
ASSERT_EQ(num, 1);
filterGetMergeRangeRes(h, rs, re);
ASSERT_EQ(rs[0], 70);
ASSERT_EQ(re[0], 120);
filterGetMergeRangeRes(h, ra);
ASSERT_EQ(ra[0].s, 70);
ASSERT_EQ(ra[0].e, 120);
filterFreeMergeRange(h);
s = s3;
e = e3;
asize = sizeof(s3)/sizeof(s[0]);
memset(ra, 0, sizeof(ra));
h = filterInitMergeRange(TSDB_DATA_TYPE_BIGINT, 0);
for (int32_t i = 0; i < asize; ++i) {
ra.s = s[i];
ra.e = e[i];
ra[0].s = s[i];
ra[0].e = e[i];
filterAddMergeRange(h, &ra, TSDB_RELATION_AND);
filterAddMergeRange(h, ra, TSDB_RELATION_AND);
}
filterGetMergeRangeNum(h, &num);
ASSERT_EQ(num, 0);
filterFreeMergeRange(h);
memset(ra, 0, sizeof(ra));
h = filterInitMergeRange(TSDB_DATA_TYPE_BIGINT, 0);
for (int32_t i = 0; i < asize; ++i) {
ra.s = s[i];
ra.e = e[i];
ra[0].s = s[i];
ra[0].e = e[i];
filterAddMergeRange(h, &ra, TSDB_RELATION_OR);
filterAddMergeRange(h, ra, TSDB_RELATION_OR);
}
filterGetMergeRangeNum(h, &num);
ASSERT_EQ(num, 1);
filterGetMergeRangeRes(h, rs, re);
ASSERT_EQ(rs[0], 1);
ASSERT_EQ(re[0], 100);
filterGetMergeRangeRes(h, ra);
ASSERT_EQ(ra[0].s, 1);
ASSERT_EQ(ra[0].e, 100);
filterFreeMergeRange(h);
......@@ -215,82 +227,131 @@ void intDataTest() {
s = s4;
e = e4;
asize = sizeof(s4)/sizeof(s[0]);
memset(ra, 0, sizeof(ra));
h = filterInitMergeRange(TSDB_DATA_TYPE_BIGINT, 0);
for (int32_t i = 0; i < asize; ++i) {
ra.s = s[i];
ra.e = e[i];
ra[0].s = s[i];
ra[0].e = e[i];
filterAddMergeRange(h, &ra, TSDB_RELATION_AND);
filterAddMergeRange(h, ra, TSDB_RELATION_AND);
}
filterGetMergeRangeNum(h, &num);
ASSERT_EQ(num, 0);
filterFreeMergeRange(h);
memset(ra, 0, sizeof(ra));
h = filterInitMergeRange(TSDB_DATA_TYPE_BIGINT, 0);
for (int32_t i = 0; i < asize; ++i) {
ra.s = s[i];
ra.e = e[i];
ra[0].s = s[i];
ra[0].e = e[i];
filterAddMergeRange(h, &ra, TSDB_RELATION_OR);
filterAddMergeRange(h, ra, TSDB_RELATION_OR);
}
filterGetMergeRangeNum(h, &num);
ASSERT_EQ(num, 2);
filterGetMergeRangeRes(h, rs, re);
ASSERT_EQ(rs[0], 0);
ASSERT_EQ(re[0], 5);
ASSERT_EQ(rs[1], 10);
ASSERT_EQ(re[1], 20);
filterGetMergeRangeRes(h, ra);
ASSERT_EQ(ra[0].s, 0);
ASSERT_EQ(ra[0].e, 5);
ASSERT_EQ(ra[1].s, 10);
ASSERT_EQ(ra[1].e, 20);
filterFreeMergeRange(h);
s = s5;
e = e5;
asize = sizeof(s5)/sizeof(s[0]);
memset(ra, 0, sizeof(ra));
h = filterInitMergeRange(TSDB_DATA_TYPE_BIGINT, 0);
for (int32_t i = 0; i < asize; ++i) {
ra.s = s[i];
ra.e = e[i];
ra[0].s = s[i];
ra[0].e = e[i];
filterAddMergeRange(h, &ra, TSDB_RELATION_AND);
filterAddMergeRange(h, ra, TSDB_RELATION_AND);
}
filterGetMergeRangeNum(h, &num);
ASSERT_EQ(num, 0);
filterFreeMergeRange(h);
memset(ra, 0, sizeof(ra));
h = filterInitMergeRange(TSDB_DATA_TYPE_BIGINT, 0);
for (int32_t i = 0; i < asize; ++i) {
ra.s = s[i];
ra.e = e[i];
ra[0].s = s[i];
ra[0].e = e[i];
filterAddMergeRange(h, &ra, TSDB_RELATION_OR);
filterAddMergeRange(h, ra, TSDB_RELATION_OR);
}
filterGetMergeRangeNum(h, &num);
ASSERT_EQ(num, 2);
filterGetMergeRangeRes(h, rs, re);
ASSERT_EQ(rs[0], 0);
ASSERT_EQ(re[0], 4);
ASSERT_EQ(rs[1], 6);
ASSERT_EQ(re[1], 20);
filterGetMergeRangeRes(h, ra);
ASSERT_EQ(ra[0].s, 0);
ASSERT_EQ(ra[0].e, 4);
ASSERT_EQ(ra[1].s, 6);
ASSERT_EQ(ra[1].e, 20);
filterFreeMergeRange(h);
memset(ra, 0, sizeof(ra));
h = filterInitMergeRange(TSDB_DATA_TYPE_BIGINT, 0);
for (int32_t i = 0; i < asize; ++i) {
ra.s = s[i];
ra.e = e[i];
ra[0].s = s[i];
ra[0].e = e[i];
filterAddMergeRange(h, &ra, (i == (asize -1)) ? TSDB_RELATION_AND : TSDB_RELATION_OR);
filterAddMergeRange(h, ra, (i == (asize -1)) ? TSDB_RELATION_AND : TSDB_RELATION_OR);
}
filterGetMergeRangeNum(h, &num);
ASSERT_EQ(num, 1);
filterGetMergeRangeRes(h, rs, re);
ASSERT_EQ(rs[0], 7);
ASSERT_EQ(re[0], 10);
filterGetMergeRangeRes(h, ra);
ASSERT_EQ(ra[0].s, 7);
ASSERT_EQ(ra[0].e, 10);
filterFreeMergeRange(h);
int64_t s6[2] = {0, 4};
int64_t e6[2] = {4, 6};
s = s6;
e = e6;
asize = sizeof(s6)/sizeof(s[0]);
memset(ra, 0, sizeof(ra));
h = filterInitMergeRange(TSDB_DATA_TYPE_BIGINT, 0);
for (int32_t i = 0; i < asize; ++i) {
ra[0].eflag = 1;
ra[1].sflag = 4;
ra[i].s = s[i];
ra[i].e = e[i];
filterAddMergeRange(h, ra + i, TSDB_RELATION_AND);
}
filterGetMergeRangeNum(h, &num);
ASSERT_EQ(num, 0);
filterFreeMergeRange(h);
memset(ra, 0, sizeof(ra));
h = filterInitMergeRange(TSDB_DATA_TYPE_BIGINT, 0);
for (int32_t i = 0; i < asize; ++i) {
ra[0].eflag = 1;
ra[1].sflag = 1;
ra[i].s = s[i];
ra[i].e = e[i];
filterAddMergeRange(h, ra + i, TSDB_RELATION_OR);
}
filterGetMergeRangeNum(h, &num);
ASSERT_EQ(num, 2);
ASSERT_EQ(ra[0].s, 0);
ASSERT_EQ(ra[0].e, 4);
ASSERT_EQ(ra[0].eflag, 1);
ASSERT_EQ(ra[1].s, 4);
ASSERT_EQ(ra[1].e, 6);
ASSERT_EQ(ra[1].sflag, 1);
filterFreeMergeRange(h);
}
......
......@@ -42,7 +42,7 @@ typedef struct SHashNode {
#define GET_HASH_NODE_KEY(_n) ((char*)(_n) + sizeof(SHashNode) + (_n)->dataLen)
#define GET_HASH_NODE_DATA(_n) ((char*)(_n) + sizeof(SHashNode))
#define GET_HASH_PNODE(_n) ((char*)(_n) - sizeof(SHashNode));
#define GET_HASH_PNODE(_n) ((SHashNode *)((char*)(_n) - sizeof(SHashNode)))
typedef enum SHashLockTypeE {
HASH_NO_LOCK = 0,
......@@ -161,6 +161,8 @@ void *taosHashIterate(SHashObj *pHashObj, void *p);
void taosHashCancelIterate(SHashObj *pHashObj, void *p);
void *taosHashGetDataKey(SHashObj *pHashObj, void *data);
#ifdef __cplusplus
}
#endif
......
......@@ -695,6 +695,10 @@ size_t taosHashGetMemSize(const SHashObj *pHashObj) {
return (pHashObj->capacity * (sizeof(SHashEntry) + POINTER_BYTES)) + sizeof(SHashNode) * taosHashGetSize(pHashObj) + sizeof(SHashObj);
}
FORCE_INLINE void *taosHashGetDataKey(SHashObj *pHashObj, void *data) {
return GET_HASH_NODE_KEY(GET_HASH_PNODE(data));
}
// release the pNode, return next pNode, and lock the current entry
static void *taosHashReleaseNode(SHashObj *pHashObj, void *p, int *slot) {
......
......@@ -64,21 +64,21 @@ int32_t compareInt8Val(const void *pLeft, const void *pRight) {
}
int32_t compareUint32Val(const void *pLeft, const void *pRight) {
int32_t left = GET_UINT32_VAL(pLeft), right = GET_UINT32_VAL(pRight);
uint32_t left = GET_UINT32_VAL(pLeft), right = GET_UINT32_VAL(pRight);
if (left > right) return 1;
if (left < right) return -1;
return 0;
}
int32_t compareUint64Val(const void *pLeft, const void *pRight) {
int64_t left = GET_UINT64_VAL(pLeft), right = GET_UINT64_VAL(pRight);
uint64_t left = GET_UINT64_VAL(pLeft), right = GET_UINT64_VAL(pRight);
if (left > right) return 1;
if (left < right) return -1;
return 0;
}
int32_t compareUint16Val(const void *pLeft, const void *pRight) {
int16_t left = GET_UINT16_VAL(pLeft), right = GET_UINT16_VAL(pRight);
uint16_t left = GET_UINT16_VAL(pLeft), right = GET_UINT16_VAL(pRight);
if (left > right) return 1;
if (left < right) return -1;
return 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册