diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 061ba7653b9ece73b1aacc3ab5a2bd467db4a1ec..92ebc8fbaa9748e7d6a7704ad88894b415c38c79 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -5006,7 +5006,7 @@ static int32_t getQueryTimeRange(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr goto _ret; } - ret = filterInitFromTree(p, &filter); + ret = filterInitFromTree(p, &filter, FILTER_NO_REWRITE); if (ret != TSDB_CODE_SUCCESS) { goto _ret; } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index a6a25269fc84a67120a490ae16fb996f0fd2a2f3..f7bb4fa1bf11a748d2d03122584f181befe9717b 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -81,7 +81,23 @@ int32_t converToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *le *(str + bufSize + 1) = '"'; n = bufSize + 2; break; + + case TSDB_DATA_TYPE_UTINYINT: + n = sprintf(str, "%d", *(uint8_t*)buf); + break; + + case TSDB_DATA_TYPE_USMALLINT: + n = sprintf(str, "%d", *(uint16_t*)buf); + break; + case TSDB_DATA_TYPE_UINT: + n = sprintf(str, "%u", *(uint32_t*)buf); + break; + + case TSDB_DATA_TYPE_UBIGINT: + n = sprintf(str, "%" PRIu64, *(uint64_t*)buf); + break; + default: tscError("unsupported type:%d", type); return TSDB_CODE_TSC_INVALID_VALUE; diff --git a/src/query/inc/qFilter.h b/src/query/inc/qFilter.h index e570623ba1595e38725cdd58709522714cd67af7..092d244e24627af7be5e898bad761bcc0ee1e697 100644 --- a/src/query/inc/qFilter.h +++ b/src/query/inc/qFilter.h @@ -22,19 +22,24 @@ extern "C" { #include "texpr.h" +#define FILTER_DEFAULT_GROUP_SIZE 4 #define FILTER_DEFAULT_UNIT_SIZE 4 #define FILTER_DEFAULT_FIELD_SIZE 4 #define FILTER_DEFAULT_GROUP_UNIT_SIZE 2 enum { - F_FIELD_COLUMN = 0, - F_FIELD_VALUE, - F_FIELD_MAX + FLD_TYPE_COLUMN = 1, + FLD_TYPE_VALUE = 2, + FLD_TYPE_MAX = 3, + FLD_DESC_NO_FREE = 4, + FLD_DATA_NO_FREE = 8, }; enum { MR_ST_START = 1, MR_ST_FIN = 2, + MR_ALL = 4, + MR_NONE = 8, }; enum { @@ -46,17 +51,17 @@ enum { RA_NULL = 2, }; +enum { + FILTER_ALL = 1, + FILTER_NONE = 2, + FILTER_NO_REWRITE = 4, +}; + typedef struct OptrStr { uint16_t optr; char *str; } OptrStr; -typedef struct SFilterColRange { - uint16_t idx; //column field idx - int64_t s; - int64_t e; -} SFilterColRange; - typedef struct SFilterRange { char sflag; char eflag; @@ -64,6 +69,13 @@ typedef struct SFilterRange { int64_t e; } SFilterRange; +typedef struct SFilterColRange { + uint16_t idx; //column field idx + bool isNull; + bool notNull; + SFilterRange ra; +} SFilterColRange; + typedef struct SFilterRangeNode { struct SFilterRangeNode* prev; @@ -81,7 +93,7 @@ typedef struct SFilterRMCtx { } SFilterRMCtx ; typedef struct SFilterField { - uint16_t type; + uint16_t flag; void* desc; void* data; int64_t range[]; @@ -99,13 +111,21 @@ typedef struct SFilterFieldId { } SFilterFieldId; typedef struct SFilterGroup { + uint16_t unitSize; uint16_t unitNum; uint16_t *unitIdxs; uint8_t *unitFlags; // !unit result } SFilterGroup; +typedef struct SFilterGroupCtx { + uint16_t num; + int32_t *col; + SArray *colRange; +} SFilterGroupCtx; + typedef struct SFilterCompare { __compar_fn_t pCompareFunc; + int32_t type; uint8_t optr; } SFilterCompare; @@ -116,10 +136,11 @@ typedef struct SFilterUnit { } SFilterUnit; typedef struct SFilterInfo { + uint32_t flags; uint16_t unitSize; uint16_t unitNum; uint16_t groupNum; - SFilterFields fields[F_FIELD_MAX]; + SFilterFields fields[FLD_TYPE_MAX]; SFilterGroup *groups; SFilterUnit *units; uint8_t *unitRes; // result @@ -133,16 +154,24 @@ typedef struct SFilterInfo { #define MR_EMPTY_RES(ctx) (ctx->rs == NULL) -#define MR_GET_FLAG(st, f) (st & f) -#define MR_SET_FLAG(st, f) st |= (f) +#define SET_OPTR(o) do {if (o == TSDB_RELATION_ISNULL) { isnull = true; } else if (o == TSDB_RELATION_NOTNULL) { notnull = true; } } while (0) +#define CHK_OPTR() (isnull == true && notnull == true) + + +#define FILTER_GET_FLAG(st, f) (st & f) +#define FILTER_SET_FLAG(st, f) st |= (f) +#define FILTER_CLR_FLAG(st, f) st &= (~f) #define SIMPLE_COPY_VALUES(dst, src) *((int64_t *)dst) = *((int64_t *)src) -#define RESET_RANGE(ctx, r) do { r->next = ctx->rf; ctx->rf = r; } while (0) -#define FREE_RANGE(ctx, r) do { if (r->prev) { r->prev->next = r->next; } else { ctx->rs = r->next;} if (r->next) { r->next->prev = r->prev; } RESET_RANGE(ctx, r); } while (0) -#define FREE_FROM_RANGE(ctx, r) do { if (r->prev) { r->prev->next = NULL; } else { ctx->rs = NULL;} while (r) {SFilterRangeNode *n = r->next; RESET_RANGE(ctx, r); r = n; } } while (0) -#define INSERT_RANGE(ctx, r, t, s, e) do { SFilterRangeNode *n = filterNewRange(ctx, t, s, e); n->prev = r->prev; if (r->prev) { r->prev->next = n; } else { ctx->rs = n; } r->prev = n; n->next = r; } while (0) -#define APPEND_RANGE(ctx, r, t, s, e) do { SFilterRangeNode *n = filterNewRange(ctx, t, s, e); n->prev = r; if (r) { r->next = n; } else { ctx->rs = n; } } while (0) +#define FILTER_GREATER(cr,sflag,eflag) ((cr > 0) || ((cr == 0) && (FILTER_GET_FLAG(sflag,RA_EXCLUDE) || FILTER_GET_FLAG(eflag,RA_EXCLUDE)))) +#define FILTER_COPY_RA(dst, src) do { (dst)->sflag = (src)->sflag; (dst)->eflag = (src)->eflag; (dst)->s = (src)->s; (dst)->e = (src)->e; } while (0) + +#define RESET_RANGE(ctx, r) do { (r)->next = (ctx)->rf; (ctx)->rf = r; } while (0) +#define FREE_RANGE(ctx, r) do { if ((r)->prev) { (r)->prev->next = (r)->next; } else { (ctx)->rs = (r)->next;} if ((r)->next) { (r)->next->prev = (r)->prev; } RESET_RANGE(ctx, r); } while (0) +#define FREE_FROM_RANGE(ctx, r) do { if ((r)->prev) { (r)->prev->next = NULL; } else { (ctx)->rs = NULL;} while (r) {SFilterRangeNode *n = (r)->next; RESET_RANGE(ctx, r); r = n; } } while (0) +#define INSERT_RANGE(ctx, r, ra) do { SFilterRangeNode *n = filterNewRange(ctx, ra); n->prev = (r)->prev; if ((r)->prev) { (r)->prev->next = n; } else { (ctx)->rs = n; } (r)->prev = n; n->next = r; } while (0) +#define APPEND_RANGE(ctx, r, ra) do { SFilterRangeNode *n = filterNewRange(ctx, ra); n->prev = (r); if (r) { (r)->next = n; } else { (ctx)->rs = n; } } while (0) #define ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { return _code; } } while (0) #define ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); return _code; } } while (0) @@ -155,16 +184,21 @@ typedef struct SFilterInfo { #define CHK_LRET(c, r,...) do { if (c) { qError(__VA_ARGS__); return r; } } while (0) #define FILTER_GET_FIELD(i, id) (&((i)->fields[(id).type].fields[(id).idx])) +#define FILTER_GET_COL_FIELD(i, idx) (&((i)->fields[FLD_TYPE_COLUMN].fields[idx])) #define FILTER_GET_COL_FIELD_TYPE(fi) (((SSchema *)((fi)->desc))->type) +#define FILTER_GET_COL_FIELD_DESC(fi) ((SSchema *)((fi)->desc)) #define FILTER_GET_COL_FIELD_DATA(fi, ri) ((fi)->data + ((SSchema *)((fi)->desc))->bytes * (ri)) #define FILTER_GET_VAL_FIELD_TYPE(fi) (((tVariant *)((fi)->desc))->nType) #define FILTER_GET_VAL_FIELD_DATA(fi) ((fi)->data) +#define FILTER_GET_TYPE(fl) ((fl) & FLD_TYPE_MAX) -#define FILTER_GROUP_UNIT(i, g, uid) ((i)->units[(g)->unitIdxs[uid]]) +#define FILTER_GROUP_UNIT(i, g, uid) ((i)->units + (g)->unitIdxs[uid]) #define FILTER_UNIT_LEFT_FIELD(i, u) FILTER_GET_FIELD(i, (u)->left) #define FILTER_UNIT_RIGHT_FIELD(i, u) FILTER_GET_FIELD(i, (u)->right) -#define FILTER_UNIT_DATA_TYPE(i, u) FILTER_GET_COL_FIELD_TYPE(FILTER_UNIT_LEFT_FIELD(i, u)) -#define FILTER_UNIT_VAL(i, u) FILTER_GET_VAL_FIELD_DATA(FILTER_UNIT_RIGHT_FIELD(i, u)) +#define FILTER_UNIT_DATA_TYPE(u) ((u)->compare.type) +#define FILTER_UNIT_COL_DESC(i, u) FILTER_GET_COL_FIELD_DESC(FILTER_UNIT_LEFT_FIELD(i, u)) +#define FILTER_UNIT_COL_DATA(i, u, ri) FILTER_GET_COL_FIELD_DATA(FILTER_UNIT_LEFT_FIELD(i, u), ri) +#define FILTER_UNIT_VAL_DATA(i, u) FILTER_GET_VAL_FIELD_DATA(FILTER_UNIT_RIGHT_FIELD(i, u)) #define FILTER_UNIT_COL_IDX(u) ((u)->left.idx) #define FILTER_UNIT_OPTR(u) ((u)->compare.optr) @@ -177,12 +211,12 @@ typedef struct SFilterInfo { typedef int32_t(*filter_desc_compare_func)(const void *, const void *); -extern int32_t filterInitFromTree(tExprNode* tree, SFilterInfo **pinfo); +extern int32_t filterInitFromTree(tExprNode* tree, SFilterInfo **pinfo, uint32_t options); extern bool filterExecute(SFilterInfo *info, int32_t numOfRows, int8_t* p); extern int32_t filterSetColFieldData(SFilterInfo *info, int16_t colId, void *data); extern void* filterInitMergeRange(int32_t type, int32_t options); extern int32_t filterGetMergeRangeNum(void* h, int32_t* num); -extern int32_t filterGetMergeRangeRes(void* h, void *s, void* e); +extern int32_t filterGetMergeRangeRes(void* h, SFilterRange *ra); extern int32_t filterFreeMergeRange(void* h); extern int32_t filterGetTimeRange(SFilterInfo *info, STimeWindow *win); #ifdef __cplusplus diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 63e5c50c50810ddb62b8207f0fd5893ecc7be6c3..f6a5d222877c266a33e9b4f26a06407f7040143b 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -6966,7 +6966,7 @@ int32_t createQueryFilter(char *data, uint16_t len, SFilterInfo** pFilters) { return TSDB_CODE_QRY_APP_ERROR; } - int32_t ret = filterInitFromTree(expr, pFilters); + int32_t ret = filterInitFromTree(expr, pFilters, 0); tExprTreeDestroy(expr, NULL); return ret; diff --git a/src/query/src/qFilter.c b/src/query/src/qFilter.c index f5a04b46e09f1c177f4f862d492979e8f59c711b..aeab5c995b19524d9e77f315487292f2e863b85c 100644 --- a/src/query/src/qFilter.c +++ b/src/query/src/qFilter.c @@ -16,6 +16,8 @@ #include "queryLog.h" #include "qFilter.h" #include "tcompare.h" +#include "hash.h" +#include "tscUtil.h" OptrStr gOptrStr[] = { {TSDB_RELATION_INVALID, "invalid"}, @@ -49,12 +51,28 @@ static FORCE_INLINE int32_t filterFieldValDescCompare(const void *desc1, const v } -filter_desc_compare_func gDescCompare [F_FIELD_MAX] = { +filter_desc_compare_func gDescCompare [FLD_TYPE_MAX] = { + NULL, filterFieldColDescCompare, filterFieldValDescCompare }; -static FORCE_INLINE SFilterRangeNode* filterNewRange(SFilterRMCtx *ctx, int32_t t, void *s, void *e) { + +int32_t filterInitUnitsFields(SFilterInfo *info) { + info->unitSize = FILTER_DEFAULT_UNIT_SIZE; + info->units = calloc(info->unitSize, sizeof(SFilterUnit)); + + info->fields[FLD_TYPE_COLUMN].num = 0; + info->fields[FLD_TYPE_COLUMN].size = FILTER_DEFAULT_FIELD_SIZE; + info->fields[FLD_TYPE_COLUMN].fields = calloc(info->fields[FLD_TYPE_COLUMN].size, COL_FIELD_SIZE); + info->fields[FLD_TYPE_VALUE].num = 0; + info->fields[FLD_TYPE_VALUE].size = FILTER_DEFAULT_FIELD_SIZE; + info->fields[FLD_TYPE_VALUE].fields = calloc(info->fields[FLD_TYPE_VALUE].size, sizeof(SFilterField)); + + return TSDB_CODE_SUCCESS; +} + +static FORCE_INLINE SFilterRangeNode* filterNewRange(SFilterRMCtx *ctx, SFilterRange* ra) { SFilterRangeNode *r = NULL; if (ctx->rf) { @@ -65,9 +83,8 @@ static FORCE_INLINE SFilterRangeNode* filterNewRange(SFilterRMCtx *ctx, int32_t } else { r = calloc(1, sizeof(SFilterRangeNode)); } - - SIMPLE_COPY_VALUES((char*)&r->ra.s, s); - SIMPLE_COPY_VALUES((char*)&r->ra.e, e); + + FILTER_COPY_RA(&r->ra, ra); return r; } @@ -119,14 +136,41 @@ int32_t filterReuseMergeRangeCtx(SFilterRMCtx *ctx, int32_t type, int32_t option } +int32_t filterPostProcessRange(SFilterRMCtx *cur, SFilterRange *ra, bool *notNull) { + if (!FILTER_GET_FLAG(ra->sflag, RA_NULL)) { + int32_t sr = cur->pCompareFunc(&ra->s, getDataMin(cur->type)); + if (sr == 0) { + FILTER_SET_FLAG(ra->sflag, RA_NULL); + } + } -int32_t filterAddMergeRangeImpl(void* h, void* s, void* e, char sflag, char eflag, int32_t optr) { + if (!FILTER_GET_FLAG(ra->eflag, RA_NULL)) { + int32_t er = cur->pCompareFunc(&ra->e, getDataMax(cur->type)); + if (er == 0) { + FILTER_SET_FLAG(ra->eflag, RA_NULL); + } + } + + + if (FILTER_GET_FLAG(ra->sflag, RA_NULL) && FILTER_GET_FLAG(ra->eflag, RA_NULL)) { + *notNull = true; + } else { + *notNull = false; + } + + return TSDB_CODE_SUCCESS; +} + + +int32_t filterAddMergeRangeImpl(void* h, SFilterRange* ra, int32_t optr) { SFilterRMCtx *ctx = (SFilterRMCtx *)h; if (ctx->rs == NULL) { - if (MR_GET_FLAG(ctx->status, MR_ST_START) == 0 || optr == TSDB_RELATION_OR) { - APPEND_RANGE(ctx, ctx->rs, ctx->type, s, e); - MR_SET_FLAG(ctx->status, MR_ST_START); + if ((FILTER_GET_FLAG(ctx->status, MR_ST_START) == 0) + || (FILTER_GET_FLAG(ctx->status, MR_ALL) && (optr == TSDB_RELATION_AND)) + || ((!FILTER_GET_FLAG(ctx->status, MR_ALL)) && (optr == TSDB_RELATION_OR))) { + APPEND_RANGE(ctx, ctx->rs, ra); + FILTER_SET_FLAG(ctx->status, MR_ST_START); } return TSDB_CODE_SUCCESS; @@ -134,27 +178,34 @@ int32_t filterAddMergeRangeImpl(void* h, void* s, void* e, char sflag, char efla SFilterRangeNode *r = ctx->rs; SFilterRangeNode *rn = NULL; + int32_t cr = 0; if (optr == TSDB_RELATION_AND) { while (r != NULL) { - if (ctx->pCompareFunc(&r->ra.s, e) > 0) { + cr = ctx->pCompareFunc(&r->ra.s, &ra->e); + if (FILTER_GREATER(cr, r->ra.sflag, ra->eflag)) { FREE_FROM_RANGE(ctx, r); break; } - if (ctx->pCompareFunc(s, &r->ra.e) > 0) { + cr = ctx->pCompareFunc(&ra->s, &r->ra.e); + if (FILTER_GREATER(cr, ra->sflag, r->ra.eflag)) { rn = r->next; FREE_RANGE(ctx, r); r = rn; continue; } - if (ctx->pCompareFunc(s, &r->ra.s) > 0) { - SIMPLE_COPY_VALUES((char *)&r->ra.s, s); + cr = ctx->pCompareFunc(&ra->s, &r->ra.s); + if (FILTER_GREATER(cr, ra->sflag, r->ra.sflag)) { + SIMPLE_COPY_VALUES((char *)&r->ra.s, &ra->s); + cr == 0 ? (r->ra.sflag = 0) : (r->ra.sflag = ra->sflag); } - if (ctx->pCompareFunc(&r->ra.e, e) > 0) { - SIMPLE_COPY_VALUES((char *)&r->ra.e, e); + cr = ctx->pCompareFunc(&r->ra.e, &ra->e); + if (FILTER_GREATER(cr, r->ra.eflag, ra->eflag)) { + SIMPLE_COPY_VALUES((char *)&r->ra.e, &ra->e); + cr == 0 ? (r->ra.eflag = 0) : (r->ra.eflag = ra->eflag); break; } @@ -166,41 +217,52 @@ int32_t filterAddMergeRangeImpl(void* h, void* s, void* e, char sflag, char efla //TSDB_RELATION_OR + bool smerged = false; bool emerged = false; while (r != NULL) { - if (ctx->pCompareFunc(&r->ra.s, e) > 0) { + cr = ctx->pCompareFunc(&r->ra.s, &ra->e); + if (FILTER_GREATER(cr, r->ra.sflag, ra->eflag)) { if (emerged == false) { - INSERT_RANGE(ctx, r, ctx->type, s, e); + INSERT_RANGE(ctx, r, ra); } break; } - if (ctx->pCompareFunc(s, &r->ra.e) > 0) { - if (r->next) { - r= r->next; - continue; - } + if (smerged == false) { + cr = ctx->pCompareFunc(&ra->s, &r->ra.e); + if (FILTER_GREATER(cr, ra->sflag, r->ra.eflag)) { + if (r->next) { + r= r->next; + continue; + } - APPEND_RANGE(ctx, r, ctx->type, s, e); - break; - } + APPEND_RANGE(ctx, r, ra); + break; + } - if (smerged == false) { - if (ctx->pCompareFunc(&r->ra.s, s) > 0) { - SIMPLE_COPY_VALUES((char *)&r->ra.s, s); + cr = ctx->pCompareFunc(&r->ra.s, &ra->s); + if (FILTER_GREATER(cr, r->ra.sflag, ra->sflag)) { + SIMPLE_COPY_VALUES((char *)&r->ra.s, &ra->s); + cr == 0 ? (r->ra.sflag &= ra->sflag) : (r->ra.sflag = ra->sflag); } smerged = true; } if (emerged == false) { - if (ctx->pCompareFunc(e, &r->ra.e) > 0) { - SIMPLE_COPY_VALUES((char *)&r->ra.e, e); + cr = ctx->pCompareFunc(&ra->e, &r->ra.e); + if (FILTER_GREATER(cr, ra->eflag, r->ra.eflag)) { + SIMPLE_COPY_VALUES((char *)&r->ra.e, &ra->e); + if (cr == 0) { + r->ra.eflag &= ra->eflag; + break; + } + + r->ra.eflag = ra->eflag; emerged = true; - e = &r->ra.e; r = r->next; continue; } @@ -208,43 +270,48 @@ int32_t filterAddMergeRangeImpl(void* h, void* s, void* e, char sflag, char efla break; } - if (ctx->pCompareFunc(e, &r->ra.e) > 0) { + cr = ctx->pCompareFunc(&ra->e, &r->ra.e); + if (FILTER_GREATER(cr, ra->eflag, r->ra.eflag)) { rn = r->next; FREE_RANGE(ctx, r); r = rn; continue; } else { - SIMPLE_COPY_VALUES(e, (char *)&r->ra.e); + SIMPLE_COPY_VALUES(&r->prev->ra.e, (char *)&r->ra.e); + cr == 0 ? (r->prev->ra.eflag &= r->ra.eflag) : (r->prev->ra.eflag = r->ra.eflag); FREE_RANGE(ctx, r); break; } } + if (ctx->rs && ctx->rs->next == NULL) { + bool notnull; + filterPostProcessRange(ctx, &ctx->rs->ra, ¬null); + if (notnull) { + FREE_FROM_RANGE(ctx, ctx->rs); + FILTER_SET_FLAG(ctx->status, MR_ALL); + } + } + return TSDB_CODE_SUCCESS; } int32_t filterAddMergeRange(void* h, SFilterRange* ra, int32_t optr) { SFilterRMCtx *ctx = (SFilterRMCtx *)h; - int64_t sv, ev; - void *s, *e; - if (MR_GET_FLAG(ra->sflag, RA_NULL)) { - SIMPLE_COPY_VALUES(&sv, getDataMin(ctx->type)); - s = &sv; - } else { - s = &ra->s; + if (FILTER_GET_FLAG(ra->sflag, RA_NULL)) { + SIMPLE_COPY_VALUES(&ra->s, getDataMin(ctx->type)); + //FILTER_CLR_FLAG(ra->sflag, RA_NULL); } - if (MR_GET_FLAG(ra->eflag, RA_NULL)) { - SIMPLE_COPY_VALUES(&ev, getDataMax(ctx->type)); - e = &ev; - } else { - e = &ra->e; + if (FILTER_GET_FLAG(ra->eflag, RA_NULL)) { + SIMPLE_COPY_VALUES(&ra->e, getDataMax(ctx->type)); + //FILTER_CLR_FLAG(ra->eflag, RA_NULL); } - return filterAddMergeRangeImpl(h, s, e, ra->sflag, ra->eflag, optr); + return filterAddMergeRangeImpl(h, ra, optr); } int32_t filterAddMergeRangeCtx(void *dst, void *src, int32_t optr) { @@ -269,11 +336,11 @@ int32_t filterAddMergeRangeCtx(void *dst, void *src, int32_t optr) { int32_t filterFinMergeRange(void* h) { SFilterRMCtx *ctx = (SFilterRMCtx *)h; - if (MR_GET_FLAG(ctx->status, MR_ST_FIN)) { + if (FILTER_GET_FLAG(ctx->status, MR_ST_FIN)) { return TSDB_CODE_SUCCESS; } - if (MR_GET_FLAG(ctx->options, MR_OPT_TS)) { + if (FILTER_GET_FLAG(ctx->options, MR_OPT_TS)) { SFilterRangeNode *r = ctx->rs; SFilterRangeNode *rn = NULL; @@ -293,7 +360,7 @@ int32_t filterFinMergeRange(void* h) { } } - MR_SET_FLAG(ctx->status, MR_ST_FIN); + FILTER_SET_FLAG(ctx->status, MR_ST_FIN); return TSDB_CODE_SUCCESS; } @@ -316,7 +383,7 @@ int32_t filterGetMergeRangeNum(void* h, int32_t* num) { } -int32_t filterGetMergeRangeRes(void* h, void *s, void* e) { +int32_t filterGetMergeRangeRes(void* h, SFilterRange *ra) { filterFinMergeRange(h); SFilterRMCtx *ctx = (SFilterRMCtx *)h; @@ -324,11 +391,11 @@ int32_t filterGetMergeRangeRes(void* h, void *s, void* e) { SFilterRangeNode* r = ctx->rs; while (r) { - assignVal(s + num * tDataTypes[ctx->type].bytes, (char *)&r->ra.s, 0, ctx->type); - assignVal(e + num * tDataTypes[ctx->type].bytes, (char *)&r->ra.e, 0, ctx->type); + FILTER_COPY_RA(ra, &r->ra); ++num; r = r->next; + ++ra; } if (num == 0) { @@ -409,29 +476,14 @@ int32_t filterGetFiled(SFilterFields* fields, int32_t type, void *v) { return -1; } - -int32_t filterAddField(SFilterInfo *info, tExprNode *node, SFilterFieldId *fid) { - CHK_LRET(node == NULL, TSDB_CODE_QRY_APP_ERROR, "empty node"); - CHK_LRET(node->nodeType != TSQL_NODE_COL && node->nodeType != TSQL_NODE_VALUE, TSDB_CODE_QRY_APP_ERROR, "invalid nodeType:%d", node->nodeType); - - int32_t type, idx = -1; +int32_t filterAddField(SFilterInfo *info, void *desc, void *data, int32_t type, SFilterFieldId *fid) { + int32_t idx = -1; uint16_t *num; - void *v; - - if (node->nodeType == TSQL_NODE_COL) { - type = F_FIELD_COLUMN; - v = node->pSchema; - node->pSchema = NULL; - } else { - type = F_FIELD_VALUE; - v = node->pVal; - node->pVal = NULL; - } num = &info->fields[type].num; - if (*num > 0 && type != F_FIELD_VALUE) { - idx = filterGetFiled(&info->fields[type], type, v); + if (*num > 0 && type != FLD_TYPE_VALUE) { + idx = filterGetFiled(&info->fields[type], type, desc); } if (idx < 0) { @@ -441,8 +493,9 @@ int32_t filterAddField(SFilterInfo *info, tExprNode *node, SFilterFieldId *fid) info->fields[type].fields = realloc(info->fields[type].fields, info->fields[type].size * sizeof(SFilterField)); } - info->fields[type].fields[idx].type = type; - info->fields[type].fields[idx].desc = v; + info->fields[type].fields[idx].flag = type; + info->fields[type].fields[idx].desc = desc; + info->fields[type].fields[idx].data = data; ++(*num); } @@ -452,29 +505,186 @@ int32_t filterAddField(SFilterInfo *info, tExprNode *node, SFilterFieldId *fid) return TSDB_CODE_SUCCESS; } +static FORCE_INLINE int32_t filterAddFieldFromField(SFilterInfo *info, SFilterField *field, SFilterFieldId *fid) { + filterAddField(info, field->desc, field->data, FILTER_GET_TYPE(field->flag), fid); + + FILTER_SET_FLAG(field->flag, FLD_DESC_NO_FREE); + FILTER_SET_FLAG(field->flag, FLD_DATA_NO_FREE); + + return TSDB_CODE_SUCCESS; +} + + +int32_t filterAddFieldFromNode(SFilterInfo *info, tExprNode *node, SFilterFieldId *fid) { + CHK_LRET(node == NULL, TSDB_CODE_QRY_APP_ERROR, "empty node"); + CHK_LRET(node->nodeType != TSQL_NODE_COL && node->nodeType != TSQL_NODE_VALUE, TSDB_CODE_QRY_APP_ERROR, "invalid nodeType:%d", node->nodeType); + + int32_t type; + void *v; + + if (node->nodeType == TSQL_NODE_COL) { + type = FLD_TYPE_COLUMN; + v = node->pSchema; + node->pSchema = NULL; + } else { + type = FLD_TYPE_VALUE; + v = node->pVal; + node->pVal = NULL; + } + + filterAddField(info, v, NULL, type, fid); + + return TSDB_CODE_SUCCESS; +} + int32_t filterAddUnit(SFilterInfo *info, uint8_t optr, SFilterFieldId *left, SFilterFieldId *right) { if (info->unitNum >= info->unitSize) { + uint16_t psize = info->unitSize; info->unitSize += FILTER_DEFAULT_UNIT_SIZE; info->units = realloc(info->units, info->unitSize * sizeof(SFilterUnit)); + memset(info->units + psize, 0, sizeof(*info->units) * FILTER_DEFAULT_UNIT_SIZE); } + + SFilterUnit *u = &info->units[info->unitNum]; - info->units[info->unitNum].compare.optr = optr; - info->units[info->unitNum].left = *left; - info->units[info->unitNum].right = *right; + u->compare.optr = optr; + u->left = *left; + if (right) { + u->right = *right; + } + if (u->right.type == FLD_TYPE_VALUE) { + SFilterField *val = FILTER_UNIT_RIGHT_FIELD(info, u); + assert(FILTER_GET_FLAG(val->flag, FLD_TYPE_VALUE)); + } else { + assert(optr == TSDB_RELATION_ISNULL || optr == TSDB_RELATION_NOTNULL); + } + + SFilterField *col = FILTER_UNIT_LEFT_FIELD(info, u); + assert(FILTER_GET_FLAG(col->flag, FLD_TYPE_COLUMN)); + + info->units[info->unitNum].compare.type = FILTER_GET_COL_FIELD_TYPE(col); + ++info->unitNum; return TSDB_CODE_SUCCESS; } -int32_t filterAddGroup(SFilterGroup *group, uint16_t unitIdx) { - group->unitNum = 1; - group->unitIdxs= calloc(group->unitNum, sizeof(*group->unitIdxs)); - group->unitIdxs[0] = unitIdx; + +int32_t filterAddUnitToGroup(SFilterGroup *group, uint16_t unitIdx) { + if (group->unitNum >= group->unitSize) { + group->unitSize += FILTER_DEFAULT_UNIT_SIZE; + group->unitIdxs = realloc(group->unitIdxs, group->unitSize * sizeof(*group->unitIdxs)); + } + + group->unitIdxs[group->unitNum++] = unitIdx; + + return TSDB_CODE_SUCCESS; +} + + + +int32_t filterAddUnitFromNode(SFilterInfo *info, tExprNode* tree) { + SFilterFieldId left = {0}, right = {0}; + + filterAddFieldFromNode(info, tree->_node.pLeft, &left); + filterAddFieldFromNode(info, tree->_node.pRight, &right); + + filterAddUnit(info, tree->_node.optr, &left, &right); + return TSDB_CODE_SUCCESS; } + +int32_t filterAddUnitFromUnit(SFilterInfo *dst, SFilterInfo *src, SFilterUnit* u) { + SFilterFieldId left, right; + + filterAddField(dst, FILTER_UNIT_COL_DESC(src, u), NULL, FLD_TYPE_COLUMN, &left); + filterAddField(dst, NULL, FILTER_UNIT_VAL_DATA(src, u), FLD_TYPE_VALUE, &right); + + SFilterField *t = FILTER_UNIT_LEFT_FIELD(src, u); + FILTER_SET_FLAG(t->flag, FLD_DESC_NO_FREE); + t = FILTER_UNIT_RIGHT_FIELD(src, u); + FILTER_SET_FLAG(t->flag, FLD_DATA_NO_FREE); + + return filterAddUnit(dst, FILTER_UNIT_OPTR(u), &left, &right); +} + +int32_t filterAddGroupUnitFromRange(SFilterInfo *dst, SFilterInfo *src, SFilterColRange *cra, SFilterGroup *g, int32_t optr, SArray *res) { + SFilterFieldId left, right; + + SFilterField *col = FILTER_GET_COL_FIELD(src, cra->idx); + + filterAddFieldFromField(dst, col, &left); + + if (optr == TSDB_RELATION_AND) { + if (cra->isNull) { + assert(cra->notNull == false); + filterAddUnit(dst, TSDB_RELATION_ISNULL, &left, NULL); + return filterAddUnitToGroup(g, dst->unitNum - 1); + } + + if (cra->notNull) { + assert(cra->isNull == false); + filterAddUnit(dst, TSDB_RELATION_NOTNULL, &left, NULL); + return filterAddUnitToGroup(g, dst->unitNum - 1); + } + + assert(!((FILTER_GET_FLAG(cra->ra.sflag, RA_NULL)) && (FILTER_GET_FLAG(cra->ra.eflag, RA_NULL)))); + + if (!FILTER_GET_FLAG(cra->ra.sflag, RA_NULL)) { + filterAddField(dst, NULL, &cra->ra.s, FLD_TYPE_VALUE, &right); + filterAddUnit(dst, FILTER_GET_FLAG(cra->ra.sflag, RA_EXCLUDE) ? TSDB_RELATION_GREATER : TSDB_RELATION_GREATER_EQUAL, &left, &right); + filterAddUnitToGroup(g, dst->unitNum - 1); + } + + if (!FILTER_GET_FLAG(cra->ra.eflag, RA_NULL)) { + filterAddField(dst, NULL, &cra->ra.e, FLD_TYPE_VALUE, &right); + filterAddUnit(dst, FILTER_GET_FLAG(cra->ra.eflag, RA_EXCLUDE) ? TSDB_RELATION_LESS : TSDB_RELATION_LESS_EQUAL, &left, &right); + filterAddUnitToGroup(g, dst->unitNum - 1); + } + } else { + SFilterGroup ng = {0}; + g = &ng; + + if (cra->isNull) { + filterAddUnit(dst, TSDB_RELATION_ISNULL, &left, NULL); + filterAddUnitToGroup(g, dst->unitNum - 1); + taosArrayPush(res, g); + } + + if (cra->notNull) { + memset(g, 0, sizeof(*g)); + + filterAddUnit(dst, TSDB_RELATION_NOTNULL, &left, NULL); + filterAddUnitToGroup(g, dst->unitNum - 1); + taosArrayPush(res, g); + } + + memset(g, 0, sizeof(*g)); + + if (!FILTER_GET_FLAG(cra->ra.sflag, RA_NULL)) { + filterAddField(dst, NULL, &cra->ra.s, FLD_TYPE_VALUE, &right); + filterAddUnit(dst, FILTER_GET_FLAG(cra->ra.sflag, RA_EXCLUDE) ? TSDB_RELATION_GREATER : TSDB_RELATION_GREATER_EQUAL, &left, &right); + filterAddUnitToGroup(g, dst->unitNum - 1); + } + + if (!FILTER_GET_FLAG(cra->ra.eflag, RA_NULL)) { + filterAddField(dst, NULL, &cra->ra.e, FLD_TYPE_VALUE, &right); + filterAddUnit(dst, FILTER_GET_FLAG(cra->ra.eflag, RA_EXCLUDE) ? TSDB_RELATION_LESS : TSDB_RELATION_LESS_EQUAL, &left, &right); + filterAddUnitToGroup(g, dst->unitNum - 1); + } + + if (g->unitNum > 0) { + taosArrayPush(res, g); + } + } + + return TSDB_CODE_SUCCESS; +} + + static void filterFreeGroup(void *pItem) { SFilterGroup* p = (SFilterGroup*) pItem; if (p) { @@ -515,14 +725,10 @@ int32_t filterTreeToGroup(tExprNode* tree, SFilterInfo *info, SArray* group) { return TSDB_CODE_SUCCESS; } - SFilterFieldId left, right; - filterAddField(info, tree->_node.pLeft, &left); - filterAddField(info, tree->_node.pRight, &right); - - filterAddUnit(info, tree->_node.optr, &left, &right); + filterAddUnitFromNode(info, tree); SFilterGroup fgroup = {0}; - filterAddGroup(&fgroup, info->unitNum - 1); + filterAddUnitToGroup(&fgroup, info->unitNum - 1); taosArrayPush(group, &fgroup); @@ -537,9 +743,7 @@ _err_return: int32_t filterInitUnitFunc(SFilterInfo *info) { for (uint16_t i = 0; i < info->unitNum; ++i) { SFilterUnit* unit = &info->units[i]; - SFilterField *left = FILTER_UNIT_LEFT_FIELD(info, unit); - - unit->compare.pCompareFunc = getComparFunc(FILTER_GET_COL_FIELD_TYPE(left), unit->compare.optr); + unit->compare.pCompareFunc = getComparFunc(FILTER_UNIT_DATA_TYPE(unit), unit->compare.optr); } return TSDB_CODE_SUCCESS; @@ -551,29 +755,44 @@ void filterDumpInfoToString(SFilterInfo *info, const char *msg) { CHK_LRETV(info == NULL, "%s - FilterInfo: empty", msg); qDebug("%s - FilterInfo:", msg); - qDebug("COLUMN Field Num:%u", info->fields[F_FIELD_COLUMN].num); - for (uint16_t i = 0; i < info->fields[F_FIELD_COLUMN].num; ++i) { - SFilterField *field = &info->fields[F_FIELD_COLUMN].fields[i]; + qDebug("COLUMN Field Num:%u", info->fields[FLD_TYPE_COLUMN].num); + for (uint16_t i = 0; i < info->fields[FLD_TYPE_COLUMN].num; ++i) { + SFilterField *field = &info->fields[FLD_TYPE_COLUMN].fields[i]; SSchema *sch = field->desc; qDebug("COL%d => [%d][%s]", i, sch->colId, sch->name); } - qDebug("VALUE Field Num:%u", info->fields[F_FIELD_VALUE].num); - for (uint16_t i = 0; i < info->fields[F_FIELD_VALUE].num; ++i) { - SFilterField *field = &info->fields[F_FIELD_VALUE].fields[i]; - tVariant *var = field->desc; - qDebug("VAL%d => [type:%d][val:%" PRIu64"]", i, var->nType, var->u64); //TODO + qDebug("VALUE Field Num:%u", info->fields[FLD_TYPE_VALUE].num); + for (uint16_t i = 0; i < info->fields[FLD_TYPE_VALUE].num; ++i) { + SFilterField *field = &info->fields[FLD_TYPE_VALUE].fields[i]; + if (field->desc) { + tVariant *var = field->desc; + qDebug("VAL%d => [type:%d][val:%" PRIi64"]", i, var->nType, var->i64); //TODO + } else { + qDebug("VAL%d => [type:NIL][val:%" PRIi64" or %f]", i, *(int64_t *)field->data, *(double *)field->data); //TODO + } } qDebug("Unit Num:%u", info->unitNum); for (uint16_t i = 0; i < info->unitNum; ++i) { SFilterUnit *unit = &info->units[i]; + int32_t type = FILTER_UNIT_DATA_TYPE(unit); + int32_t len = 0; + char str[128]; + SFilterField *left = FILTER_UNIT_LEFT_FIELD(info, unit); - SFilterField *right = FILTER_UNIT_RIGHT_FIELD(info, unit); - SSchema *sch = left->desc; - tVariant *var = right->desc; - qDebug("UNIT%d => [%d][%s] %s %" PRId64, i, sch->colId, sch->name, gOptrStr[unit->compare.optr].str, IS_NUMERIC_TYPE(var->nType) ? var->i64 : -1); //TODO + len = sprintf(str, "UNIT[%d] => [%d][%s] %s [", i, sch->colId, sch->name, gOptrStr[unit->compare.optr].str); + + if (unit->right.type== FLD_TYPE_VALUE) { + SFilterField *right = FILTER_UNIT_RIGHT_FIELD(info, unit); + converToStr(str + len, type, right->data, 0, &len); + } else { + strcat(str, "NULL"); + } + strcat(str, "]"); + + qDebug("%s", str); //TODO } qDebug("Group Num:%u", info->groupNum); @@ -598,25 +817,17 @@ void filterFreeInfo(SFilterInfo *info) { int32_t filterInitValFieldData(SFilterInfo *info) { for (uint16_t i = 0; i < info->unitNum; ++i) { SFilterUnit* unit = &info->units[i]; - SFilterField* left = FILTER_UNIT_LEFT_FIELD(info, unit); - SFilterField* right = FILTER_UNIT_RIGHT_FIELD(info, unit); - - if (left->type != F_FIELD_VALUE && right->type != F_FIELD_VALUE) { + if (unit->right.type != FLD_TYPE_VALUE) { + assert(unit->compare.optr == TSDB_RELATION_ISNULL || unit->compare.optr == TSDB_RELATION_NOTNULL); continue; } + + SFilterField* right = FILTER_UNIT_RIGHT_FIELD(info, unit); - uint32_t type = 0; - SFilterField* fi = NULL; - if (left->type == F_FIELD_COLUMN) { - type = FILTER_GET_COL_FIELD_TYPE(left); - fi = right; - } else if (right->type == F_FIELD_COLUMN) { - type = FILTER_GET_COL_FIELD_TYPE(right); - fi = left; - } else { - type = FILTER_GET_VAL_FIELD_TYPE(left); - fi = right; - } + assert(FILTER_GET_FLAG(right->flag, FLD_TYPE_VALUE)); + + uint32_t type = FILTER_UNIT_DATA_TYPE(unit); + SFilterField* fi = right; tVariant* var = fi->desc; @@ -632,7 +843,7 @@ int32_t filterInitValFieldData(SFilterInfo *info) { } else if (type == TSDB_DATA_TYPE_NCHAR) { fi->data = calloc(1, (var->nLen + 1) * TSDB_NCHAR_SIZE); } else { - if (var->nType == TSDB_DATA_TYPE_VALUE_ARRAY) { + if (var->nType == TSDB_DATA_TYPE_VALUE_ARRAY) { //TIME RANGE fi->data = calloc(var->nLen, tDataTypes[type].bytes); for (int32_t a = 0; a < var->nLen; ++a) { int64_t *v = taosArrayGet(var->arr, a); @@ -690,38 +901,73 @@ bool filterDoCompare(SFilterUnit *unit, void *left, void *right) { } -#if 0 int32_t filterAddUnitRange(SFilterInfo *info, SFilterUnit* u, SFilterRMCtx *ctx, int32_t optr) { - int32_t type = FILTER_UNIT_DATA_TYPE(info, u); + int32_t type = FILTER_UNIT_DATA_TYPE(u); uint8_t uoptr = FILTER_UNIT_OPTR(u); - void *val = FILTER_UNIT_VAL(info, u); - int64_t s = 0, e = 0; + void *val = FILTER_UNIT_VAL_DATA(info, u); + SFilterRange ra = {0}; + int64_t tmp = 0; switch (uoptr) { case TSDB_RELATION_GREATER: - + SIMPLE_COPY_VALUES(&ra.s, val); + FILTER_SET_FLAG(ra.sflag, RA_EXCLUDE); + FILTER_SET_FLAG(ra.eflag, RA_NULL); break; case TSDB_RELATION_GREATER_EQUAL: + SIMPLE_COPY_VALUES(&ra.s, val); + FILTER_SET_FLAG(ra.eflag, RA_NULL); + break; case TSDB_RELATION_LESS: + SIMPLE_COPY_VALUES(&ra.e, val); + FILTER_SET_FLAG(ra.eflag, RA_EXCLUDE); + FILTER_SET_FLAG(ra.sflag, RA_NULL); + break; case TSDB_RELATION_LESS_EQUAL: + SIMPLE_COPY_VALUES(&ra.e, val); + FILTER_SET_FLAG(ra.sflag, RA_NULL); + break; case TSDB_RELATION_NOT_EQUAL: + assert(type == TSDB_DATA_TYPE_BOOL); + if (GET_INT8_VAL(val)) { + SIMPLE_COPY_VALUES(&ra.s, &tmp); + SIMPLE_COPY_VALUES(&ra.e, &tmp); + } else { + *(bool *)&tmp = true; + SIMPLE_COPY_VALUES(&ra.s, &tmp); + SIMPLE_COPY_VALUES(&ra.e, &tmp); + } + break; case TSDB_RELATION_EQUAL: - case TSDB_RELATION_IN: + SIMPLE_COPY_VALUES(&ra.s, val); + SIMPLE_COPY_VALUES(&ra.e, val); + break; + case TSDB_RELATION_IN: { + void *p = taosHashIterate((SHashObj *)val, NULL); + while(p) { + void *key = taosHashGetDataKey((SHashObj *)val, p); + SIMPLE_COPY_VALUES(&ra.s, key); + SIMPLE_COPY_VALUES(&ra.e, key); + filterAddMergeRange(ctx, &ra, optr); + p = taosHashIterate((SHashObj *)val, p); + } + + return TSDB_CODE_SUCCESS; + } default: assert(0); } - filterAddMergeRange(ctx, &s, &e, optr); + filterAddMergeRange(ctx, &ra, optr); return TSDB_CODE_SUCCESS; } -int32_t filterMergeSingleGroupUnits(SFilterInfo *info, SFilterGroup* g, uint16_t id1, uint16_t id2, SArray* res) { + +int32_t filterProcessUnitsInOneGroup(SFilterInfo *info, SFilterGroup* g, uint16_t id1, uint16_t id2, SArray* res, uint16_t *removedNum) { bool isnull = false, notnull = false; int32_t num = 0; - - SFilterRMCtx *cur = filterInitMergeRange(type, 0); SFilterUnit* u1 = FILTER_GROUP_UNIT(info, g, id1); SFilterUnit* u2 = FILTER_GROUP_UNIT(info, g, id2); @@ -729,10 +975,8 @@ int32_t filterMergeSingleGroupUnits(SFilterInfo *info, SFilterGroup* g, uint16_t uint8_t optr2 = FILTER_UNIT_OPTR(u2); uint16_t cidx = FILTER_UNIT_COL_IDX(u1); - int32_t type = FILTER_UNIT_DATA_TYPE(info, u1); - -#define SET_OPTR(o) ((o == TSDB_RELATION_ISNULL) ? isnull = true : notnull = true) -#define CHK_OPTR() (isnull == true && notnull == true) + int32_t type = FILTER_UNIT_DATA_TYPE(u1); + SFilterRMCtx *cur = filterInitMergeRange(type, 0); SET_OPTR(optr1); SET_OPTR(optr2); @@ -755,25 +999,37 @@ int32_t filterMergeSingleGroupUnits(SFilterInfo *info, SFilterGroup* g, uint16_t continue; } + ++(*removedNum); optr2 = FILTER_UNIT_OPTR(u); SET_OPTR(optr2); CHK_JMP(CHK_OPTR()); if (!FILTER_NO_MERGE_OPTR(optr2)) { - filterAddUnitRange(info, u2, cur, TSDB_RELATION_AND); + filterAddUnitRange(info, u, cur, TSDB_RELATION_AND); CHK_JMP(MR_EMPTY_RES(cur)); } } - SFilterColRange ra; - ra.idx = cidx; + SFilterColRange cra = {0}; + cra.idx = cidx; filterGetMergeRangeNum(cur, &num); - assert(num == 1); - - filterGetMergeRangeRes(cur, &ra.s, &ra.e); + assert(num == 1 || num == 0); + + if (num == 1) { + filterGetMergeRangeRes(cur, &cra.ra); + filterPostProcessRange(cur, &cra.ra, &cra.notNull); + } else { + if (isnull) { + cra.isNull = true; + } + + if (notnull) { + cra.notNull = true; + } + } - taosArrayPush(res, &ra); + taosArrayPush(res, &cra); filterFreeMergeRange(cur); @@ -782,6 +1038,8 @@ int32_t filterMergeSingleGroupUnits(SFilterInfo *info, SFilterGroup* g, uint16_t _err_return: g->unitNum = 0; + + *removedNum = g->unitNum; filterFreeMergeRange(cur); @@ -789,39 +1047,50 @@ _err_return: } -int32_t filterMergeGroupUnits(SFilterInfo *info, SArray** res) { - uint16_t *f = malloc(1, info->fields[F_FIELD_COLUMN].num * sizeof(uint16_t)); - SArray *gres = NULL; + +int32_t filterProcessUnits(SFilterInfo *info, SFilterGroupCtx*** res) { + int32_t *col = NULL; + SFilterGroupCtx *gctx = NULL; + SArray *colRange = NULL; bool gresUsed = false; + uint16_t removedNum = 0; for (uint16_t i = 0; i < info->groupNum; ++i) { SFilterGroup* g = info->groups + i; - - memet(f, -1, info->fields[F_FIELD_COLUMN].num); + if (col == NULL) { + col = calloc(info->fields[FLD_TYPE_COLUMN].num, sizeof(int32_t)); + } else { + memset(col, 0, info->fields[FLD_TYPE_COLUMN].num * sizeof(int32_t)); + } gresUsed = false; + + removedNum = 0; for (uint16_t j = 0; j < g->unitNum; ++j) { SFilterUnit* u = FILTER_GROUP_UNIT(info, g, j); - int32_t type = FILTER_UNIT_DATA_TYPE(info, u); + int32_t type = FILTER_UNIT_DATA_TYPE(u); if (FILTER_NO_MERGE_DATA_TYPE(type)) { continue; } uint16_t cidx = FILTER_UNIT_COL_IDX(u); - if (f[cidx] == -1) { - f[u->left.idx] = j; - } else if (cidx] == -2) { + if (col[cidx] == 0) { + col[cidx] = j + 1; + } else if (col[cidx] == -1) { continue; } else { - f[cidx] = -2; - - if (gres == NULL) { - gres = taosArrayInit(4, sizeof(SFilterColRange)); + if (colRange == NULL) { + colRange = taosArrayInit(4, sizeof(SFilterColRange)); } - filterMergeSingleGroupUnits(info, g, f[cidx], j, gres); + removedNum += 2; + + filterProcessUnitsInOneGroup(info, g, col[cidx] - 1, j, colRange, &removedNum); + + col[cidx] = -1; + if (g->unitNum == 0) { break; } else { @@ -832,44 +1101,409 @@ int32_t filterMergeGroupUnits(SFilterInfo *info, SArray** res) { if (g->unitNum == 0) { if (gresUsed) { - taosArrayClear(gres); + taosArrayClear(colRange); + } + + continue; + } + + if (gresUsed) { + gctx = malloc(sizeof(*gctx)); + gctx->num = g->unitNum - removedNum + 1; + gctx->col = col; + gctx->colRange = colRange; + + col = NULL; + colRange = NULL; + + if (*res == NULL) { + *res = calloc(info->groupNum, sizeof(SFilterGroupCtx *)); + } + + (*res)[i] = gctx; + } + } + + tfree(col); + if (colRange) { + taosArrayDestroy(colRange); + } + + return TSDB_CODE_SUCCESS; +} + + + + +int32_t filterProcessGroupsSameColumn(SFilterInfo *info, uint16_t id1, uint16_t id2, SArray* res, uint16_t *removedNum, SFilterGroupCtx** unitRes) { + bool isnull = false, notnull = false; + int32_t num = 0; + SFilterColRange *cra = NULL; + SFilterGroup *g = NULL; + + SFilterUnit* u1 = FILTER_GROUP_UNIT(info, info->groups + id1, 0); + uint8_t optr = FILTER_UNIT_OPTR(u1); + uint16_t cidx = FILTER_UNIT_COL_IDX(u1); + uint16_t cidx2 = 0; + + int32_t type = FILTER_UNIT_DATA_TYPE(u1); + SFilterRMCtx *cur = filterInitMergeRange(type, 0); + + for (int32_t i = 0; i < info->groupNum; ++i) { + g = info->groups + i; + uint16_t unitNum = (unitRes && unitRes[i]) ? unitRes[i]->num : g->unitNum; + + if (unitNum == 0) { + continue; + } + + if (unitNum > 1) { + continue; + } + + if (unitRes && unitRes[i]) { + assert(taosArrayGetSize(unitRes[i]->colRange) == 1); + if (notnull) { + continue; + } + + cra = taosArrayGet(unitRes[i]->colRange, 0); + + if (cra->notNull) { + notnull = true; + CHK_JMP(CHK_OPTR()); + } + + cidx2 = cra->idx; + } else { + u1 = FILTER_GROUP_UNIT(info, g, 0); + int32_t type = FILTER_UNIT_DATA_TYPE(u1); + if (FILTER_NO_MERGE_DATA_TYPE(type)) { + continue; + } + + optr = FILTER_UNIT_OPTR(u1); + + cidx2 = FILTER_UNIT_COL_IDX(u1); + } + + if (cidx != cidx2) { + continue; + } + + g->unitNum = 0; + + if (unitRes && unitRes[i]) { + unitRes[i]->num = 0; + if (notnull) { + continue; + } + + filterAddMergeRange(cur, &cra->ra, TSDB_RELATION_OR); + if (MR_EMPTY_RES(cur)) { + notnull = true; + CHK_JMP(CHK_OPTR()); } continue; } - if (res == NULL) { - res = calloc(info->groupNum, sizeof(SArray *)); - res[i] = gres; - gres = NULL; + optr = FILTER_UNIT_OPTR(u1); + SET_OPTR(optr); + CHK_JMP(CHK_OPTR()); + + if (!FILTER_NO_MERGE_OPTR(optr) && notnull == false) { + filterAddUnitRange(info, u1, cur, TSDB_RELATION_OR); + if (MR_EMPTY_RES(cur)) { + notnull = true; + CHK_JMP(CHK_OPTR()); + } } } - free(f); - if (gres) { - taosArrayDestroy(gres); + SFilterColRange ncra; + SFilterRange *ra; + ncra.idx = cidx; + + ncra.isNull = isnull; + ncra.notNull = notnull; + + if (isnull) { + --removedNum; } + if (!notnull) { + filterGetMergeRangeNum(cur, &num); + if (num > 1) { + ra = calloc(num, sizeof(*ra)); + } else { + assert(num == 1); + ra = &ncra.ra; + } + + filterGetMergeRangeRes(cur, ra); + + SFilterRange *tra = ra; + for (int32_t i = 0; i < num; ++i) { + filterPostProcessRange(cur, tra, &ncra.notNull); + assert(ncra.notNull == false); + ++tra; + } + + if (num > 1) { + for (int32_t i = 0; i < num; ++i) { + FILTER_COPY_RA(&ncra.ra, ra); + + taosArrayPush(res, &ncra); + + ++ra; + } + } else { + FILTER_COPY_RA(&ncra.ra, ra); + + taosArrayPush(res, &ncra); + } + } else { + taosArrayPush(res, &ncra); + } + + filterFreeMergeRange(cur); + + return TSDB_CODE_SUCCESS; + +_err_return: + + FILTER_SET_FLAG(info->flags, FILTER_ALL); + + filterFreeMergeRange(cur); + + return TSDB_CODE_SUCCESS; + +} + + +int32_t filterProcessGroups(SFilterInfo *info, SFilterGroupCtx** unitRes, SFilterGroupCtx* groupRes) { + int32_t *col = NULL; + uint16_t cidx; + uint16_t removedNum = 0; + SArray *colRange = NULL; + + for (uint16_t i = 0; i < info->groupNum; ++i) { + SFilterGroup* g = info->groups + i; + uint16_t unitNum = (unitRes && unitRes[i]) ? unitRes[i]->num : g->unitNum; + + if (unitNum == 0) { + ++removedNum; + continue; + } + + if (unitNum > 1) { + continue; + } + + if (unitRes && unitRes[i]) { + assert(taosArrayGetSize(unitRes[i]->colRange) == 1); + SFilterColRange *ra = taosArrayGet(unitRes[i]->colRange, 0); + + cidx = ra->idx; + } else { + SFilterUnit* u = FILTER_GROUP_UNIT(info, g, 0); + int32_t type = FILTER_UNIT_DATA_TYPE(u); + if (FILTER_NO_MERGE_DATA_TYPE(type)) { + continue; + } + + cidx = FILTER_UNIT_COL_IDX(u); + } + + if (col == NULL) { + col = calloc(info->fields[FLD_TYPE_COLUMN].num, sizeof(int32_t)); + } + + if (col[cidx] == 0) { + col[cidx] = i + 1; + continue; + } else if (col[cidx] == -1) { + continue; + } else { + if (colRange == NULL) { + colRange = taosArrayInit(4, sizeof(SFilterColRange)); + } + + removedNum += 2; + filterProcessGroupsSameColumn(info, col[cidx] - 1, i, colRange, &removedNum, unitRes); + + CHK_JMP(FILTER_GET_FLAG(info->flags, FILTER_ALL)); + + col[cidx] = -1; + } + } + + uint16_t num = info->groupNum - removedNum; + assert(num >= 0); + + if (colRange) { + num += taosArrayGetSize(colRange); + } + + if (num == 0) { + FILTER_SET_FLAG(info->flags, FILTER_NONE); + goto _err_return; + } + + if (colRange || removedNum > 0) { + groupRes->num = num; + groupRes->col = col; + groupRes->colRange = colRange; + } else { + tfree(col); + } + + return TSDB_CODE_SUCCESS; + +_err_return: + tfree(col); + if (colRange) { + taosArrayDestroy(colRange); + } + + return TSDB_CODE_SUCCESS; +} + +int32_t filterGenerateGroupFromArray(SFilterInfo *info, SArray* group) { + size_t groupSize = taosArrayGetSize(group); + + info->groupNum = (uint16_t)groupSize; + + if (info->groupNum > 0) { + info->groups = calloc(info->groupNum, sizeof(*info->groups)); + } + + for (size_t i = 0; i < groupSize; ++i) { + SFilterGroup *pg = taosArrayGet(group, i); + pg->unitFlags = calloc(pg->unitNum, sizeof(*pg->unitFlags)); + info->groups[i] = *pg; + } + + return TSDB_CODE_SUCCESS; +} + + +int32_t filterRewrite(SFilterInfo *info, SFilterGroupCtx* gctx, SFilterGroupCtx** uctx) { + if (gctx->num == 0) { + return TSDB_CODE_SUCCESS; + } + + SFilterInfo oinfo = *info; + uint16_t gNum = 0; + SArray* group = taosArrayInit(FILTER_DEFAULT_GROUP_SIZE, sizeof(SFilterGroup)); + + memset(info, 0, sizeof(*info)); + + filterInitUnitsFields(info); + + for (uint16_t i = 0; i < oinfo.groupNum; ++i) { + SFilterGroup* g = oinfo.groups + i; + SFilterGroup ng = {0}; + uint16_t unitNum = (uctx && uctx[i]) ? uctx[i]->num : g->unitNum; + + if (unitNum == 0) { + continue; + } + + ++gNum; + + if ((uctx == NULL) || (uctx[i] == NULL)) { + for (uint16_t n = 0; n < g->unitNum; ++n) { + SFilterUnit* u = FILTER_GROUP_UNIT(&oinfo, g, n); + filterAddUnitFromUnit(info, &oinfo, u); + filterAddUnitToGroup(&ng, info->unitNum - 1); + } + + taosArrayPush(group, &ng); + + continue; + } + + SFilterGroupCtx* ctx = uctx[i]; + for (uint16_t n = 0; n < g->unitNum; ++n) { + SFilterUnit* u = FILTER_GROUP_UNIT(&oinfo, g, n); + int32_t type = FILTER_UNIT_DATA_TYPE(u); + if (FILTER_NO_MERGE_DATA_TYPE(type)) { + filterAddUnitFromUnit(info, &oinfo, u); + filterAddUnitToGroup(&ng, info->unitNum - 1); + continue; + } + + uint16_t cidx = FILTER_UNIT_COL_IDX(u); + + assert(ctx->col[cidx] > 0 || ctx->col[cidx] == -1); + + if (ctx->col[cidx] != -1) { + filterAddUnitFromUnit(info, &oinfo, u); + filterAddUnitToGroup(&ng, info->unitNum - 1); + } + } + + if (ctx->colRange && taosArrayGetSize(ctx->colRange) > 0) { + int32_t size = (int32_t)taosArrayGetSize(ctx->colRange); + for (int32_t i = 0; i < size; ++i) { + SFilterColRange *cra = taosArrayGet(ctx->colRange, i); + filterAddGroupUnitFromRange(info, &oinfo, cra, &ng, TSDB_RELATION_AND, NULL); + } + } + + taosArrayPush(group, &ng); + } + + if (gctx->colRange && taosArrayGetSize(gctx->colRange) > 0) { + int32_t size = (int32_t)taosArrayGetSize(gctx->colRange); + for (int32_t i = 0; i < size; ++i) { + SFilterColRange *cra = taosArrayGet(gctx->colRange, i); + filterAddGroupUnitFromRange(info, &oinfo, cra, NULL, TSDB_RELATION_OR, group); + } + } + + filterGenerateGroupFromArray(info, group); + + taosArrayDestroy(group); + return TSDB_CODE_SUCCESS; } int32_t filterPreprocess(SFilterInfo *info) { - SArray* res = NULL; + SFilterGroupCtx** unitsRes = NULL; + SFilterGroupCtx groupRes = {0}; - filterMergeGroupUnits(info, &res); + filterProcessUnits(info, &unitsRes); + + filterProcessGroups(info, unitsRes, &groupRes); + + if (FILTER_GET_FLAG(info->flags, FILTER_ALL)) { + qInfo("Final - FilterInfo: [ALL]"); + return TSDB_CODE_SUCCESS; + } + + if (FILTER_GET_FLAG(info->flags, FILTER_NONE)) { + qInfo("Final - FilterInfo: [NONE]"); + return TSDB_CODE_SUCCESS; + } + //TODO GET COLUMN RANGE + filterRewrite(info, &groupRes, unitsRes); + + return TSDB_CODE_SUCCESS; } -#endif int32_t filterSetColFieldData(SFilterInfo *info, int16_t colId, void *data) { CHK_LRET(info == NULL, TSDB_CODE_QRY_APP_ERROR, "info NULL"); - CHK_LRET(info->fields[F_FIELD_COLUMN].num <= 0, TSDB_CODE_QRY_APP_ERROR, "no column fileds"); + CHK_LRET(info->fields[FLD_TYPE_COLUMN].num <= 0, TSDB_CODE_QRY_APP_ERROR, "no column fileds"); - for (uint16_t i = 0; i < info->fields[F_FIELD_COLUMN].num; ++i) { - SFilterField* fi = &info->fields[F_FIELD_COLUMN].fields[i]; + for (uint16_t i = 0; i < info->fields[FLD_TYPE_COLUMN].num; ++i) { + SFilterField* fi = &info->fields[FLD_TYPE_COLUMN].fields[i]; SSchema* sch = fi->desc; if (sch->colId == colId) { fi->data = data; @@ -884,6 +1518,10 @@ int32_t filterSetColFieldData(SFilterInfo *info, int16_t colId, void *data) { bool filterExecute(SFilterInfo *info, int32_t numOfRows, int8_t* p) { bool all = true; + if (FILTER_GET_FLAG(info->flags, FILTER_NONE)) { + return false; + } + for (int32_t i = 0; i < numOfRows; ++i) { FILTER_UNIT_CLR_F(info); @@ -901,10 +1539,8 @@ bool filterExecute(SFilterInfo *info, int32_t numOfRows, int8_t* p) { ures = FILTER_UNIT_GET_R(info, uidx); } else { SFilterUnit *unit = &info->units[uidx]; - SFilterField *left = FILTER_UNIT_LEFT_FIELD(info, unit); - SFilterField *right = FILTER_UNIT_RIGHT_FIELD(info, unit); - if (isNull(FILTER_GET_COL_FIELD_DATA(left, i), FILTER_GET_COL_FIELD_TYPE(left))) { + if (isNull(FILTER_UNIT_COL_DATA(info, unit, i), FILTER_UNIT_DATA_TYPE(unit))) { ures = unit->compare.optr == TSDB_RELATION_ISNULL ? true : false; } else { if (unit->compare.optr == TSDB_RELATION_NOTNULL) { @@ -912,7 +1548,7 @@ bool filterExecute(SFilterInfo *info, int32_t numOfRows, int8_t* p) { } else if (unit->compare.optr == TSDB_RELATION_ISNULL) { ures = false; } else { - ures = filterDoCompare(unit, FILTER_GET_COL_FIELD_DATA(left, i), FILTER_GET_VAL_FIELD_DATA(right)); + ures = filterDoCompare(unit, FILTER_UNIT_COL_DATA(info, unit, i), FILTER_UNIT_VAL_DATA(info, unit)); } } @@ -941,8 +1577,7 @@ bool filterExecute(SFilterInfo *info, int32_t numOfRows, int8_t* p) { } - -int32_t filterInitFromTree(tExprNode* tree, SFilterInfo **pinfo) { +int32_t filterInitFromTree(tExprNode* tree, SFilterInfo **pinfo, uint32_t options) { int32_t code = TSDB_CODE_SUCCESS; SFilterInfo *info = NULL; @@ -954,42 +1589,33 @@ int32_t filterInitFromTree(tExprNode* tree, SFilterInfo **pinfo) { info = *pinfo; - SArray* group = taosArrayInit(4, sizeof(SFilterGroup)); - - info->unitSize = FILTER_DEFAULT_UNIT_SIZE; - info->units = calloc(info->unitSize, sizeof(SFilterUnit)); + info->flags = options; - info->fields[F_FIELD_COLUMN].num = 0; - info->fields[F_FIELD_COLUMN].size = FILTER_DEFAULT_FIELD_SIZE; - info->fields[F_FIELD_COLUMN].fields = calloc(info->fields[F_FIELD_COLUMN].size, COL_FIELD_SIZE); - info->fields[F_FIELD_VALUE].num = 0; - info->fields[F_FIELD_VALUE].size = FILTER_DEFAULT_FIELD_SIZE; - info->fields[F_FIELD_VALUE].fields = calloc(info->fields[F_FIELD_VALUE].size, sizeof(SFilterField)); + SArray* group = taosArrayInit(FILTER_DEFAULT_GROUP_SIZE, sizeof(SFilterGroup)); + + filterInitUnitsFields(info); code = filterTreeToGroup(tree, info, group); ERR_JRET(code); - size_t groupSize = taosArrayGetSize(group); - - info->groupNum = (uint16_t)groupSize; - - if (info->groupNum > 0) { - info->groups = calloc(info->groupNum, sizeof(*info->groups)); - } - - for (size_t i = 0; i < groupSize; ++i) { - SFilterGroup *pg = taosArrayGet(group, i); - pg->unitFlags = calloc(pg->unitNum, sizeof(*pg->unitFlags)); - info->groups[i] = *pg; - } + filterGenerateGroupFromArray(info, group); ERR_JRET(filterInitValFieldData(info)); - filterDumpInfoToString(info, "Before preprocess"); + if (!FILTER_GET_FLAG(info->flags, FILTER_NO_REWRITE)) { + filterDumpInfoToString(info, "Before preprocess"); - //ERR_JRET(filterPreprocess(info)); + ERR_JRET(filterPreprocess(info)); + + CHK_JMP(FILTER_GET_FLAG(info->flags, FILTER_ALL)); + if (FILTER_GET_FLAG(info->flags, FILTER_NONE)) { + taosArrayDestroy(group); + return code; + } + } + ERR_JRET(filterInitUnitFunc(info)); info->unitRes = malloc(info->unitNum * sizeof(*info->unitRes)); @@ -997,10 +1623,19 @@ int32_t filterInitFromTree(tExprNode* tree, SFilterInfo **pinfo) { filterDumpInfoToString(info, "Final"); + taosArrayDestroy(group); + + return code; + _err_return: + qInfo("No filter, code:%d", code); taosArrayDestroy(group); + filterFreeInfo(*pinfo); + + *pinfo = NULL; + return code; } @@ -1043,6 +1678,11 @@ int32_t filterGetTimeRange(SFilterInfo *info, STimeWindow *win) { break; } + if (FILTER_GET_FLAG(cur->status, MR_ALL)) { + FILTER_SET_FLAG(prev->status, MR_ALL); + break; + } + if (group->unitNum > 1) { filterAddMergeRangeCtx(prev, cur, TSDB_RELATION_OR); filterResetMergeRangeCtx(cur); @@ -1050,13 +1690,20 @@ int32_t filterGetTimeRange(SFilterInfo *info, STimeWindow *win) { } if (code == TSDB_CODE_SUCCESS) { - filterGetMergeRangeNum(prev, &num); - if (num != 1) { - qError("only one time range accepted, num:%d", num); - ERR_JRET(TSDB_CODE_QRY_INVALID_TIME_CONDITION); - } + if (FILTER_GET_FLAG(prev->status, MR_ALL)) { + *win = TSWINDOW_INITIALIZER; + } else { + filterGetMergeRangeNum(prev, &num); + if (num != 1) { + qError("only one time range accepted, num:%d", num); + ERR_JRET(TSDB_CODE_QRY_INVALID_TIME_CONDITION); + } - filterGetMergeRangeRes(prev, &win->skey, &win->ekey); + SFilterRange ra; + filterGetMergeRangeRes(prev, &ra); + win->skey = ra.s; + win->ekey = ra.e; + } } _err_return: diff --git a/src/query/tests/rangeMergeTest.cpp b/src/query/tests/rangeMergeTest.cpp index 202394e94366e034f5d3859b28cea19c61b0d3f6..1f4626e36469a06b64852d128cc3f93fdc1c74a2 100644 --- a/src/query/tests/rangeMergeTest.cpp +++ b/src/query/tests/rangeMergeTest.cpp @@ -20,7 +20,7 @@ namespace { void intDataTest() { printf("running %s\n", __FUNCTION__); int32_t asize = 0; - SFilterRange ra = {0}; + SFilterRange ra[10] = {0}; int64_t *s =NULL; int64_t *e =NULL; int64_t s0[3] = {-100, 1, 3}; @@ -34,179 +34,191 @@ void intDataTest() { int64_t s4[2] = {10, 0}; int64_t e4[2] = {20, 5}; int64_t s5[3] = {0, 6 ,7}; - int64_t e5[5] = {4, 10,20}; + int64_t e5[3] = {4, 10,20}; int64_t rs[10]; int64_t re[10]; int32_t num = 0; + void *h = NULL; s = s0; e = e0; - asize = sizeof(s0)/sizeof(s[0]); - void *h = filterInitMergeRange(TSDB_DATA_TYPE_BIGINT, 0); + asize = sizeof(s0)/sizeof(s[0]); + memset(ra, 0, sizeof(ra)); + h = filterInitMergeRange(TSDB_DATA_TYPE_BIGINT, 0); for (int32_t i = 0; i < asize; ++i) { - ra.s = s[i]; - ra.e = e[i]; - filterAddMergeRange(h, &ra, TSDB_RELATION_AND); + ra[0].s = s[i]; + ra[0].e = e[i]; + filterAddMergeRange(h, ra, TSDB_RELATION_AND); } filterGetMergeRangeNum(h, &num); ASSERT_EQ(num, 0); filterFreeMergeRange(h); + memset(ra, 0, sizeof(ra)); h = filterInitMergeRange(TSDB_DATA_TYPE_BIGINT, 0); for (int32_t i = 0; i < asize; ++i) { - ra.s = s[i]; - ra.e = e[i]; + ra[0].s = s[i]; + ra[0].e = e[i]; - filterAddMergeRange(h, &ra, TSDB_RELATION_OR); + filterAddMergeRange(h, ra, TSDB_RELATION_OR); } filterGetMergeRangeNum(h, &num); ASSERT_EQ(num, 3); - filterGetMergeRangeRes(h, rs, re); - ASSERT_EQ(rs[0], -100); - ASSERT_EQ(re[0], 0); - ASSERT_EQ(rs[1], 1); - ASSERT_EQ(re[1], 2); - ASSERT_EQ(rs[2], 3); - ASSERT_EQ(re[2], 4); + filterGetMergeRangeRes(h, ra); + ASSERT_EQ(ra[0].s, -100); + ASSERT_EQ(ra[0].e, 0); + ASSERT_EQ(ra[1].s, 1); + ASSERT_EQ(ra[1].e, 2); + ASSERT_EQ(ra[2].s, 3); + ASSERT_EQ(ra[2].e, 4); filterFreeMergeRange(h); + memset(ra, 0, sizeof(ra)); h = filterInitMergeRange(TSDB_DATA_TYPE_BIGINT, MR_OPT_TS); for (int32_t i = 0; i < asize; ++i) { - ra.s = s[i]; - ra.e = e[i]; + ra[0].s = s[i]; + ra[0].e = e[i]; - filterAddMergeRange(h, &ra, TSDB_RELATION_OR); + filterAddMergeRange(h, ra, TSDB_RELATION_OR); } filterGetMergeRangeNum(h, &num); ASSERT_EQ(num, 1); - filterGetMergeRangeRes(h, rs, re); - ASSERT_EQ(rs[0], -100); - ASSERT_EQ(re[0], 4); + filterGetMergeRangeRes(h, ra); + ASSERT_EQ(ra[0].s, -100); + ASSERT_EQ(ra[0].e, 4); filterFreeMergeRange(h); s = s1; e = e1; asize = sizeof(s1)/sizeof(s[0]); + memset(ra, 0, sizeof(ra)); h = filterInitMergeRange(TSDB_DATA_TYPE_BIGINT, 0); for (int32_t i = 0; i < asize; ++i) { - ra.s = s[i]; - ra.e = e[i]; + ra[0].s = s[i]; + ra[0].e = e[i]; - filterAddMergeRange(h, &ra, TSDB_RELATION_AND); + filterAddMergeRange(h, ra, TSDB_RELATION_AND); } filterGetMergeRangeNum(h, &num); ASSERT_EQ(num, 1); - filterGetMergeRangeRes(h, rs, re); - ASSERT_EQ(rs[0], 3); - ASSERT_EQ(re[0], 4); + filterGetMergeRangeRes(h, ra); + ASSERT_EQ(ra[0].s, 3); + ASSERT_EQ(ra[0].e, 4); filterFreeMergeRange(h); + memset(ra, 0, sizeof(ra)); h = filterInitMergeRange(TSDB_DATA_TYPE_BIGINT, 0); for (int32_t i = 0; i < asize; ++i) { - ra.s = s[i]; - ra.e = e[i]; + ra[0].s = s[i]; + ra[0].e = e[i]; - filterAddMergeRange(h, &ra, TSDB_RELATION_OR); + filterAddMergeRange(h, ra, TSDB_RELATION_OR); } filterGetMergeRangeNum(h, &num); ASSERT_EQ(num, 1); - filterGetMergeRangeRes(h, rs, re); - ASSERT_EQ(rs[0], INT64_MIN); - ASSERT_EQ(re[0], 100); + filterGetMergeRangeRes(h, ra); + ASSERT_EQ(ra[0].s, INT64_MIN); + ASSERT_EQ(ra[0].e, 100); filterFreeMergeRange(h); + s = s2; e = e2; asize = sizeof(s2)/sizeof(s[0]); + memset(ra, 0, sizeof(ra)); h = filterInitMergeRange(TSDB_DATA_TYPE_BIGINT, 0); for (int32_t i = 0; i < asize; ++i) { - ra.s = s[i]; - ra.e = e[i]; + ra[0].s = s[i]; + ra[0].e = e[i]; - filterAddMergeRange(h, &ra, TSDB_RELATION_AND); + filterAddMergeRange(h, ra, TSDB_RELATION_AND); } filterGetMergeRangeNum(h, &num); ASSERT_EQ(num, 0); filterFreeMergeRange(h); + memset(ra, 0, sizeof(ra)); h = filterInitMergeRange(TSDB_DATA_TYPE_BIGINT, 0); for (int32_t i = 0; i < asize; ++i) { - ra.s = s[i]; - ra.e = e[i]; + ra[0].s = s[i]; + ra[0].e = e[i]; - filterAddMergeRange(h, &ra, TSDB_RELATION_OR); + filterAddMergeRange(h, ra, TSDB_RELATION_OR); } filterGetMergeRangeNum(h, &num); ASSERT_EQ(num, 1); - filterGetMergeRangeRes(h, rs, re); - ASSERT_EQ(rs[0], 1); - ASSERT_EQ(re[0], 120); + filterGetMergeRangeRes(h, ra); + ASSERT_EQ(ra[0].s, 1); + ASSERT_EQ(ra[0].e, 120); filterFreeMergeRange(h); - + memset(ra, 0, sizeof(ra)); h = filterInitMergeRange(TSDB_DATA_TYPE_BIGINT, 0); for (int32_t i = 0; i < asize; ++i) { - ra.s = s[i]; - ra.e = e[i]; + ra[0].s = s[i]; + ra[0].e = e[i]; - filterAddMergeRange(h, &ra, i % 2 ? TSDB_RELATION_OR : TSDB_RELATION_AND); + filterAddMergeRange(h, ra, i % 2 ? TSDB_RELATION_OR : TSDB_RELATION_AND); } filterGetMergeRangeNum(h, &num); ASSERT_EQ(num, 0); filterFreeMergeRange(h); + memset(ra, 0, sizeof(ra)); h = filterInitMergeRange(TSDB_DATA_TYPE_BIGINT, 0); for (int32_t i = 0; i < asize; ++i) { - ra.s = s[i]; - ra.e = e[i]; + ra[0].s = s[i]; + ra[0].e = e[i]; - filterAddMergeRange(h, &ra, i % 2 ? TSDB_RELATION_AND : TSDB_RELATION_OR); + filterAddMergeRange(h, ra, i % 2 ? TSDB_RELATION_AND : TSDB_RELATION_OR); } filterGetMergeRangeNum(h, &num); ASSERT_EQ(num, 1); - filterGetMergeRangeRes(h, rs, re); - ASSERT_EQ(rs[0], 70); - ASSERT_EQ(re[0], 120); + filterGetMergeRangeRes(h, ra); + ASSERT_EQ(ra[0].s, 70); + ASSERT_EQ(ra[0].e, 120); filterFreeMergeRange(h); s = s3; e = e3; asize = sizeof(s3)/sizeof(s[0]); + memset(ra, 0, sizeof(ra)); h = filterInitMergeRange(TSDB_DATA_TYPE_BIGINT, 0); for (int32_t i = 0; i < asize; ++i) { - ra.s = s[i]; - ra.e = e[i]; + ra[0].s = s[i]; + ra[0].e = e[i]; - filterAddMergeRange(h, &ra, TSDB_RELATION_AND); + filterAddMergeRange(h, ra, TSDB_RELATION_AND); } filterGetMergeRangeNum(h, &num); ASSERT_EQ(num, 0); filterFreeMergeRange(h); + memset(ra, 0, sizeof(ra)); h = filterInitMergeRange(TSDB_DATA_TYPE_BIGINT, 0); for (int32_t i = 0; i < asize; ++i) { - ra.s = s[i]; - ra.e = e[i]; + ra[0].s = s[i]; + ra[0].e = e[i]; - filterAddMergeRange(h, &ra, TSDB_RELATION_OR); + filterAddMergeRange(h, ra, TSDB_RELATION_OR); } filterGetMergeRangeNum(h, &num); ASSERT_EQ(num, 1); - filterGetMergeRangeRes(h, rs, re); - ASSERT_EQ(rs[0], 1); - ASSERT_EQ(re[0], 100); + filterGetMergeRangeRes(h, ra); + ASSERT_EQ(ra[0].s, 1); + ASSERT_EQ(ra[0].e, 100); filterFreeMergeRange(h); @@ -215,82 +227,131 @@ void intDataTest() { s = s4; e = e4; asize = sizeof(s4)/sizeof(s[0]); + memset(ra, 0, sizeof(ra)); h = filterInitMergeRange(TSDB_DATA_TYPE_BIGINT, 0); for (int32_t i = 0; i < asize; ++i) { - ra.s = s[i]; - ra.e = e[i]; + ra[0].s = s[i]; + ra[0].e = e[i]; - filterAddMergeRange(h, &ra, TSDB_RELATION_AND); + filterAddMergeRange(h, ra, TSDB_RELATION_AND); } filterGetMergeRangeNum(h, &num); ASSERT_EQ(num, 0); filterFreeMergeRange(h); + memset(ra, 0, sizeof(ra)); h = filterInitMergeRange(TSDB_DATA_TYPE_BIGINT, 0); for (int32_t i = 0; i < asize; ++i) { - ra.s = s[i]; - ra.e = e[i]; + ra[0].s = s[i]; + ra[0].e = e[i]; - filterAddMergeRange(h, &ra, TSDB_RELATION_OR); + filterAddMergeRange(h, ra, TSDB_RELATION_OR); } filterGetMergeRangeNum(h, &num); ASSERT_EQ(num, 2); - filterGetMergeRangeRes(h, rs, re); - ASSERT_EQ(rs[0], 0); - ASSERT_EQ(re[0], 5); - ASSERT_EQ(rs[1], 10); - ASSERT_EQ(re[1], 20); + filterGetMergeRangeRes(h, ra); + ASSERT_EQ(ra[0].s, 0); + ASSERT_EQ(ra[0].e, 5); + ASSERT_EQ(ra[1].s, 10); + ASSERT_EQ(ra[1].e, 20); filterFreeMergeRange(h); s = s5; e = e5; asize = sizeof(s5)/sizeof(s[0]); + memset(ra, 0, sizeof(ra)); h = filterInitMergeRange(TSDB_DATA_TYPE_BIGINT, 0); for (int32_t i = 0; i < asize; ++i) { - ra.s = s[i]; - ra.e = e[i]; + ra[0].s = s[i]; + ra[0].e = e[i]; - filterAddMergeRange(h, &ra, TSDB_RELATION_AND); + filterAddMergeRange(h, ra, TSDB_RELATION_AND); } filterGetMergeRangeNum(h, &num); ASSERT_EQ(num, 0); filterFreeMergeRange(h); + memset(ra, 0, sizeof(ra)); h = filterInitMergeRange(TSDB_DATA_TYPE_BIGINT, 0); for (int32_t i = 0; i < asize; ++i) { - ra.s = s[i]; - ra.e = e[i]; + ra[0].s = s[i]; + ra[0].e = e[i]; - filterAddMergeRange(h, &ra, TSDB_RELATION_OR); + filterAddMergeRange(h, ra, TSDB_RELATION_OR); } filterGetMergeRangeNum(h, &num); ASSERT_EQ(num, 2); - filterGetMergeRangeRes(h, rs, re); - ASSERT_EQ(rs[0], 0); - ASSERT_EQ(re[0], 4); - ASSERT_EQ(rs[1], 6); - ASSERT_EQ(re[1], 20); + filterGetMergeRangeRes(h, ra); + ASSERT_EQ(ra[0].s, 0); + ASSERT_EQ(ra[0].e, 4); + ASSERT_EQ(ra[1].s, 6); + ASSERT_EQ(ra[1].e, 20); filterFreeMergeRange(h); + memset(ra, 0, sizeof(ra)); h = filterInitMergeRange(TSDB_DATA_TYPE_BIGINT, 0); for (int32_t i = 0; i < asize; ++i) { - ra.s = s[i]; - ra.e = e[i]; + ra[0].s = s[i]; + ra[0].e = e[i]; - filterAddMergeRange(h, &ra, (i == (asize -1)) ? TSDB_RELATION_AND : TSDB_RELATION_OR); + filterAddMergeRange(h, ra, (i == (asize -1)) ? TSDB_RELATION_AND : TSDB_RELATION_OR); } filterGetMergeRangeNum(h, &num); ASSERT_EQ(num, 1); - filterGetMergeRangeRes(h, rs, re); - ASSERT_EQ(rs[0], 7); - ASSERT_EQ(re[0], 10); + filterGetMergeRangeRes(h, ra); + ASSERT_EQ(ra[0].s, 7); + ASSERT_EQ(ra[0].e, 10); + filterFreeMergeRange(h); + + + + int64_t s6[2] = {0, 4}; + int64_t e6[2] = {4, 6}; + s = s6; + e = e6; + asize = sizeof(s6)/sizeof(s[0]); + memset(ra, 0, sizeof(ra)); + h = filterInitMergeRange(TSDB_DATA_TYPE_BIGINT, 0); + for (int32_t i = 0; i < asize; ++i) { + ra[0].eflag = 1; + ra[1].sflag = 4; + + ra[i].s = s[i]; + ra[i].e = e[i]; + + filterAddMergeRange(h, ra + i, TSDB_RELATION_AND); + } + filterGetMergeRangeNum(h, &num); + ASSERT_EQ(num, 0); filterFreeMergeRange(h); + + memset(ra, 0, sizeof(ra)); + h = filterInitMergeRange(TSDB_DATA_TYPE_BIGINT, 0); + for (int32_t i = 0; i < asize; ++i) { + ra[0].eflag = 1; + ra[1].sflag = 1; + + ra[i].s = s[i]; + ra[i].e = e[i]; + + filterAddMergeRange(h, ra + i, TSDB_RELATION_OR); + } + filterGetMergeRangeNum(h, &num); + ASSERT_EQ(num, 2); + ASSERT_EQ(ra[0].s, 0); + ASSERT_EQ(ra[0].e, 4); + ASSERT_EQ(ra[0].eflag, 1); + ASSERT_EQ(ra[1].s, 4); + ASSERT_EQ(ra[1].e, 6); + ASSERT_EQ(ra[1].sflag, 1); + filterFreeMergeRange(h); + } diff --git a/src/util/inc/hash.h b/src/util/inc/hash.h index c37426069cf64bb35ac5e1100d49e8103c851625..79eda33e74bb66038d97c02cef0060c87022083e 100644 --- a/src/util/inc/hash.h +++ b/src/util/inc/hash.h @@ -42,7 +42,7 @@ typedef struct SHashNode { #define GET_HASH_NODE_KEY(_n) ((char*)(_n) + sizeof(SHashNode) + (_n)->dataLen) #define GET_HASH_NODE_DATA(_n) ((char*)(_n) + sizeof(SHashNode)) -#define GET_HASH_PNODE(_n) ((char*)(_n) - sizeof(SHashNode)); +#define GET_HASH_PNODE(_n) ((SHashNode *)((char*)(_n) - sizeof(SHashNode))) typedef enum SHashLockTypeE { HASH_NO_LOCK = 0, @@ -161,6 +161,8 @@ void *taosHashIterate(SHashObj *pHashObj, void *p); void taosHashCancelIterate(SHashObj *pHashObj, void *p); +void *taosHashGetDataKey(SHashObj *pHashObj, void *data); + #ifdef __cplusplus } #endif diff --git a/src/util/src/hash.c b/src/util/src/hash.c index 5b3218382fff38382a536c8ece077fd2f350adf2..a3501d47dab778250f11d89419795dc2729ca04d 100644 --- a/src/util/src/hash.c +++ b/src/util/src/hash.c @@ -695,6 +695,10 @@ size_t taosHashGetMemSize(const SHashObj *pHashObj) { return (pHashObj->capacity * (sizeof(SHashEntry) + POINTER_BYTES)) + sizeof(SHashNode) * taosHashGetSize(pHashObj) + sizeof(SHashObj); } +FORCE_INLINE void *taosHashGetDataKey(SHashObj *pHashObj, void *data) { + return GET_HASH_NODE_KEY(GET_HASH_PNODE(data)); +} + // release the pNode, return next pNode, and lock the current entry static void *taosHashReleaseNode(SHashObj *pHashObj, void *p, int *slot) { diff --git a/src/util/src/tcompare.c b/src/util/src/tcompare.c index 4cc8eeece6ae18de2ee3f2f56a8cb3a327dedf3d..6d4d6dc0b8a8f4c3924075784159f81833a54bb4 100644 --- a/src/util/src/tcompare.c +++ b/src/util/src/tcompare.c @@ -64,21 +64,21 @@ int32_t compareInt8Val(const void *pLeft, const void *pRight) { } int32_t compareUint32Val(const void *pLeft, const void *pRight) { - int32_t left = GET_UINT32_VAL(pLeft), right = GET_UINT32_VAL(pRight); + uint32_t left = GET_UINT32_VAL(pLeft), right = GET_UINT32_VAL(pRight); if (left > right) return 1; if (left < right) return -1; return 0; } int32_t compareUint64Val(const void *pLeft, const void *pRight) { - int64_t left = GET_UINT64_VAL(pLeft), right = GET_UINT64_VAL(pRight); + uint64_t left = GET_UINT64_VAL(pLeft), right = GET_UINT64_VAL(pRight); if (left > right) return 1; if (left < right) return -1; return 0; } int32_t compareUint16Val(const void *pLeft, const void *pRight) { - int16_t left = GET_UINT16_VAL(pLeft), right = GET_UINT16_VAL(pRight); + uint16_t left = GET_UINT16_VAL(pLeft), right = GET_UINT16_VAL(pRight); if (left > right) return 1; if (left < right) return -1; return 0;