Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
8d413f09
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
8d413f09
编写于
2月 18, 2022
作者:
D
dapan1121
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feature/qnode
上级
6d04b954
变更
6
展开全部
显示空白变更内容
内联
并排
Showing
6 changed file
with
737 addition
and
751 deletion
+737
-751
include/libs/scalar/filter.h
include/libs/scalar/filter.h
+7
-16
source/libs/scalar/inc/filterInt.h
source/libs/scalar/inc/filterInt.h
+67
-43
source/libs/scalar/inc/tfilter.h
source/libs/scalar/inc/tfilter.h
+0
-347
source/libs/scalar/src/filter.c
source/libs/scalar/src/filter.c
+657
-344
source/libs/scalar/src/scalar.c
source/libs/scalar/src/scalar.c
+5
-0
source/libs/scalar/src/sclvector.c
source/libs/scalar/src/sclvector.c
+1
-1
未找到文件。
source/libs/scalar/inc/tsclfunc
.h
→
include/libs/scalar/filter
.h
浏览文件 @
8d413f09
...
@@ -12,34 +12,25 @@
...
@@ -12,34 +12,25 @@
* You should have received a copy of the GNU Affero General Public License
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#ifndef TDENGINE_
TSCALARFUNCTION
_H
#ifndef TDENGINE_
SCALAR
_H
#define TDENGINE_
TSCALARFUNCTION
_H
#define TDENGINE_
SCALAR
_H
#ifdef __cplusplus
#ifdef __cplusplus
extern
"C"
{
extern
"C"
{
#endif
#endif
#include "function.h"
#include "function.h"
#include "
scalar
.h"
#include "
nodes
.h"
typedef
struct
SScalarFunctionSupport
{
typedef
struct
SFilterInfo
SFilterInfo
;
struct
SExprInfo
*
pExprInfo
;
int32_t
numOfCols
;
SColumnInfo
*
colList
;
void
*
exprList
;
// client side used
int32_t
offset
;
char
**
data
;
}
SScalarFunctionSupport
;
extern
struct
SScalarFunctionInfo
scalarFunc
[
8
];
int32_t
evaluateExprNodeTree
(
tExprNode
*
pExprs
,
int32_t
numOfRows
,
SScalarParam
*
pOutput
,
void
*
param
,
char
*
(
*
getSourceDataBlock
)(
void
*
,
const
char
*
,
int32_t
));
int32_t
scalarCalculateConstants
(
SNode
*
pNode
,
SNode
**
pRes
);
int32_t
scalarCalculate
(
SNode
*
pNode
,
SSDataBlock
*
pSrc
,
SScalarParam
*
pDst
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
#endif
#endif
#endif // TDENGINE_
TSCALARFUNCTION
_H
#endif // TDENGINE_
SCALAR
_H
source/libs/scalar/inc/filter.h
→
source/libs/scalar/inc/filter
Int
.h
浏览文件 @
8d413f09
...
@@ -13,8 +13,8 @@
...
@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#ifndef TDENGINE_
QFILTER
_H
#ifndef TDENGINE_
FILTER_INT
_H
#define TDENGINE_
QFILTER
_H
#define TDENGINE_
FILTER_INT
_H
#ifdef __cplusplus
#ifdef __cplusplus
extern
"C"
{
extern
"C"
{
...
@@ -85,6 +85,12 @@ enum {
...
@@ -85,6 +85,12 @@ enum {
RANGE_TYPE_MR_CTX
=
3
,
RANGE_TYPE_MR_CTX
=
3
,
};
};
enum
{
FI_ACTION_NO_NEED
=
1
,
FI_ACTION_CONTINUE
,
FI_ACTION_STOP
,
};
typedef
struct
OptrStr
{
typedef
struct
OptrStr
{
uint16_t
optr
;
uint16_t
optr
;
char
*
str
;
char
*
str
;
...
@@ -97,17 +103,11 @@ typedef struct SFilterRange {
...
@@ -97,17 +103,11 @@ typedef struct SFilterRange {
char
eflag
;
char
eflag
;
}
SFilterRange
;
}
SFilterRange
;
typedef
struct
SFilterColRange
{
uint16_t
idx
;
//column field idx
bool
isNull
;
bool
notNull
;
bool
isRange
;
SFilterRange
ra
;
}
SFilterColRange
;
typedef
bool
(
*
rangeCompFunc
)
(
const
void
*
,
const
void
*
,
const
void
*
,
const
void
*
,
__compar_fn_t
);
typedef
bool
(
*
rangeCompFunc
)
(
const
void
*
,
const
void
*
,
const
void
*
,
const
void
*
,
__compar_fn_t
);
typedef
int32_t
(
*
filter_desc_compare_func
)(
const
void
*
,
const
void
*
);
typedef
int32_t
(
*
filter_desc_compare_func
)(
const
void
*
,
const
void
*
);
typedef
bool
(
*
filter_exec_func
)(
void
*
,
int32_t
,
int8_t
**
,
SColumnDataAgg
*
,
int16_t
);
typedef
bool
(
*
filter_exec_func
)(
void
*
,
int32_t
,
int8_t
**
,
SColumnDataAgg
*
,
int16_t
);
typedef
int32_t
(
*
filer_get_col_from_id
)(
void
*
,
int32_t
,
void
**
);
typedef
int32_t
(
*
filer_get_col_from_name
)(
void
*
,
int32_t
,
char
*
,
void
**
);
typedef
struct
SFilterRangeCompare
{
typedef
struct
SFilterRangeCompare
{
int64_t
s
;
int64_t
s
;
...
@@ -155,37 +155,39 @@ typedef struct SFilterField {
...
@@ -155,37 +155,39 @@ typedef struct SFilterField {
}
SFilterField
;
}
SFilterField
;
typedef
struct
SFilterFields
{
typedef
struct
SFilterFields
{
uint
16
_t
size
;
uint
32
_t
size
;
uint
16
_t
num
;
uint
32
_t
num
;
SFilterField
*
fields
;
SFilterField
*
fields
;
}
SFilterFields
;
}
SFilterFields
;
typedef
struct
SFilterFieldId
{
typedef
struct
SFilterFieldId
{
uint16_t
type
;
uint16_t
type
;
uint
16
_t
idx
;
uint
32
_t
idx
;
}
SFilterFieldId
;
}
SFilterFieldId
;
typedef
struct
SFilterGroup
{
typedef
struct
SFilterGroup
{
uint
16
_t
unitSize
;
uint
32
_t
unitSize
;
uint
16
_t
unitNum
;
uint
32
_t
unitNum
;
uint
16
_t
*
unitIdxs
;
uint
32
_t
*
unitIdxs
;
uint8_t
*
unitFlags
;
// !unit result
uint8_t
*
unitFlags
;
// !unit result
}
SFilterGroup
;
}
SFilterGroup
;
typedef
struct
SFilterColInfo
{
typedef
struct
SFilterColInfo
{
uint8_t
type
;
uint8_t
type
;
int32_t
dataType
;
int32_t
dataType
;
uint8_t
optr
;
// for equal operation in the relation of RELATION_IN
int64_t
value
;
// for equal operation in the relation of RELATION_IN
void
*
info
;
void
*
info
;
}
SFilterColInfo
;
}
SFilterColInfo
;
typedef
struct
SFilterGroupCtx
{
typedef
struct
SFilterGroupCtx
{
uint
16
_t
colNum
;
uint
32
_t
colNum
;
uint
16
_t
*
colIdx
;
uint
32
_t
*
colIdx
;
SFilterColInfo
*
colInfo
;
SFilterColInfo
*
colInfo
;
}
SFilterGroupCtx
;
}
SFilterGroupCtx
;
typedef
struct
SFilterColCtx
{
typedef
struct
SFilterColCtx
{
uint
16
_t
colIdx
;
uint
32
_t
colIdx
;
void
*
ctx
;
void
*
ctx
;
}
SFilterColCtx
;
}
SFilterColCtx
;
...
@@ -219,16 +221,31 @@ typedef struct SFilterPCtx {
...
@@ -219,16 +221,31 @@ typedef struct SFilterPCtx {
SHashObj
*
unitHash
;
SHashObj
*
unitHash
;
}
SFilterPCtx
;
}
SFilterPCtx
;
typedef
struct
SFltTreeStat
{
int32_t
code
;
}
SFltTreeStat
;
typedef
struct
SFltScalarCtx
{
SNode
*
node
;
}
SFltScalarCtx
;
typedef
struct
SFltBuildGroupCtx
{
SFilterInfo
*
info
;
SArray
*
group
;
int32_t
code
;
}
SFltBuildGroupCtx
;
typedef
struct
SFilterInfo
{
typedef
struct
SFilterInfo
{
bool
scalarMode
;
SFltScalarCtx
sclCtx
;
uint32_t
options
;
uint32_t
options
;
uint32_t
status
;
uint32_t
status
;
uint
16
_t
unitSize
;
uint
32
_t
unitSize
;
uint
16
_t
unitNum
;
uint
32
_t
unitNum
;
uint
16
_t
groupNum
;
uint
32
_t
groupNum
;
uint
16
_t
colRangeNum
;
uint
32
_t
colRangeNum
;
SFilterFields
fields
[
FLD_TYPE_MAX
];
SFilterFields
fields
[
FLD_TYPE_MAX
];
SFilterGroup
*
groups
;
SFilterGroup
*
groups
;
uint16_t
*
cgroups
;
SFilterUnit
*
units
;
SFilterUnit
*
units
;
SFilterComUnit
*
cunits
;
SFilterComUnit
*
cunits
;
uint8_t
*
unitRes
;
// result
uint8_t
*
unitRes
;
// result
...
@@ -236,16 +253,15 @@ typedef struct SFilterInfo {
...
@@ -236,16 +253,15 @@ typedef struct SFilterInfo {
SFilterRangeCtx
**
colRange
;
SFilterRangeCtx
**
colRange
;
filter_exec_func
func
;
filter_exec_func
func
;
uint8_t
blkFlag
;
uint8_t
blkFlag
;
uint
16
_t
blkGroupNum
;
uint
32
_t
blkGroupNum
;
uint
16
_t
*
blkUnits
;
uint
32
_t
*
blkUnits
;
int8_t
*
blkUnitRes
;
int8_t
*
blkUnitRes
;
void
*
pTable
;
SFilterPCtx
pctx
;
SFilterPCtx
pctx
;
}
SFilterInfo
;
}
SFilterInfo
;
#define COL_FIELD_SIZE (sizeof(SFilterField) + 2 * sizeof(int64_t))
#define FILTER_NO_MERGE_DATA_TYPE(t) ((t) == TSDB_DATA_TYPE_BINARY || (t) == TSDB_DATA_TYPE_NCHAR || (t) == TSDB_DATA_TYPE_JSON)
#define FILTER_NO_MERGE_DATA_TYPE(t) ((t) == TSDB_DATA_TYPE_BINARY || (t) == TSDB_DATA_TYPE_NCHAR)
#define FILTER_NO_MERGE_OPTR(o) ((o) == TSDB_RELATION_ISNULL || (o) == TSDB_RELATION_NOTNULL || (o) == FILTER_DUMMY_EMPTY_OPTR)
#define FILTER_NO_MERGE_OPTR(o) ((o) == TSDB_RELATION_ISNULL || (o) == TSDB_RELATION_NOTNULL || (o) == FILTER_DUMMY_EMPTY_OPTR)
#define MR_EMPTY_RES(ctx) (ctx->rs == NULL)
#define MR_EMPTY_RES(ctx) (ctx->rs == NULL)
...
@@ -261,7 +277,7 @@ typedef struct SFilterInfo {
...
@@ -261,7 +277,7 @@ typedef struct SFilterInfo {
#define FILTER_CLR_FLAG(st, f) st &= (~f)
#define FILTER_CLR_FLAG(st, f) st &= (~f)
#define SIMPLE_COPY_VALUES(dst, src) *((int64_t *)dst) = *((int64_t *)src)
#define SIMPLE_COPY_VALUES(dst, src) *((int64_t *)dst) = *((int64_t *)src)
#define FILTER_PACKAGE_UNIT_HASH_KEY(v, optr, idx1, idx2) do { char *_t = (char *)v; _t[0] = optr; *(uint
16_t *)(_t + 1) = idx1; *(uint16
_t *)(_t + 3) = idx2; } while (0)
#define FILTER_PACKAGE_UNIT_HASH_KEY(v, optr, idx1, idx2) do { char *_t = (char *)v; _t[0] = optr; *(uint
32_t *)(_t + 1) = idx1; *(uint32
_t *)(_t + 3) = idx2; } while (0)
#define FILTER_GREATER(cr,sflag,eflag) ((cr > 0) || ((cr == 0) && (FILTER_GET_FLAG(sflag,RANGE_FLG_EXCLUDE) || FILTER_GET_FLAG(eflag,RANGE_FLG_EXCLUDE))))
#define FILTER_GREATER(cr,sflag,eflag) ((cr > 0) || ((cr == 0) && (FILTER_GET_FLAG(sflag,RANGE_FLG_EXCLUDE) || FILTER_GET_FLAG(eflag,RANGE_FLG_EXCLUDE))))
#define FILTER_COPY_RA(dst, src) do { (dst)->sflag = (src)->sflag; (dst)->eflag = (src)->eflag; (dst)->s = (src)->s; (dst)->e = (src)->e; } while (0)
#define FILTER_COPY_RA(dst, src) do { (dst)->sflag = (src)->sflag; (dst)->eflag = (src)->eflag; (dst)->s = (src)->s; (dst)->e = (src)->e; } while (0)
...
@@ -271,15 +287,20 @@ typedef struct SFilterInfo {
...
@@ -271,15 +287,20 @@ typedef struct SFilterInfo {
#define INSERT_RANGE(ctx, r, ra) do { SFilterRangeNode *n = filterNewRange(ctx, ra); n->prev = (r)->prev; if ((r)->prev) { (r)->prev->next = n; } else { (ctx)->rs = n; } (r)->prev = n; n->next = r; } while (0)
#define INSERT_RANGE(ctx, r, ra) do { SFilterRangeNode *n = filterNewRange(ctx, ra); n->prev = (r)->prev; if ((r)->prev) { (r)->prev->next = n; } else { (ctx)->rs = n; } (r)->prev = n; n->next = r; } while (0)
#define APPEND_RANGE(ctx, r, ra) do { SFilterRangeNode *n = filterNewRange(ctx, ra); n->prev = (r); if (r) { (r)->next = n; } else { (ctx)->rs = n; } } while (0)
#define APPEND_RANGE(ctx, r, ra) do { SFilterRangeNode *n = filterNewRange(ctx, ra); n->prev = (r); if (r) { (r)->next = n; } else { (ctx)->rs = n; } } while (0)
#define ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { return _code; } } while (0)
#define ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); return _code; } } while (0)
#define ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { goto _return; } } while (0)
#define CHK_RETV(c) do { if (c) { return; } } while (0)
#define fltFatal(...) qFatal(__VA_ARGS__)
#define CHK_RET(c, r) do { if (c) { return r; } } while (0)
#define fltError(...) qError(__VA_ARGS__)
#define CHK_JMP(c) do { if (c) { goto _return; } } while (0)
#define fltWarn(...) qWarn(__VA_ARGS__)
#define CHK_LRETV(c,...) do { if (c) { qError(__VA_ARGS__); return; } } while (0)
#define fltInfo(...) qInfo(__VA_ARGS__)
#define CHK_LRET(c, r,...) do { if (c) { if (r) {qError(__VA_ARGS__); } else { qDebug(__VA_ARGS__); } return r; } } while (0)
#define fltDebug(...) qDebug(__VA_ARGS__)
#define fltTrace(...) qTrace(__VA_ARGS__)
#define FLT_CHK_JMP(c) do { if (c) { goto _return; } } while (0)
#define FLT_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define FLT_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define FLT_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#define FILTER_GET_FIELD(i, id) (&((i)->fields[(id).type].fields[(id).idx]))
#define FILTER_GET_FIELD(i, id) (&((i)->fields[(id).type].fields[(id).idx]))
#define FILTER_GET_COL_FIELD(i, idx) (&((i)->fields[FLD_TYPE_COLUMN].fields[idx]))
#define FILTER_GET_COL_FIELD(i, idx) (&((i)->fields[FLD_TYPE_COLUMN].fields[idx]))
...
@@ -290,6 +311,7 @@ typedef struct SFilterInfo {
...
@@ -290,6 +311,7 @@ typedef struct SFilterInfo {
#define FILTER_GET_COL_FIELD_DATA(fi, ri) ((char *)(fi)->data + ((SSchema *)((fi)->desc))->bytes * (ri))
#define FILTER_GET_COL_FIELD_DATA(fi, ri) ((char *)(fi)->data + ((SSchema *)((fi)->desc))->bytes * (ri))
#define FILTER_GET_VAL_FIELD_TYPE(fi) (((tVariant *)((fi)->desc))->nType)
#define FILTER_GET_VAL_FIELD_TYPE(fi) (((tVariant *)((fi)->desc))->nType)
#define FILTER_GET_VAL_FIELD_DATA(fi) ((char *)(fi)->data)
#define FILTER_GET_VAL_FIELD_DATA(fi) ((char *)(fi)->data)
#define FILTER_GET_JSON_VAL_FIELD_DATA(fi) ((char *)(fi)->desc)
#define FILTER_GET_TYPE(fl) ((fl) & FLD_TYPE_MAX)
#define FILTER_GET_TYPE(fl) ((fl) & FLD_TYPE_MAX)
#define FILTER_GROUP_UNIT(i, g, uid) ((i)->units + (g)->unitIdxs[uid])
#define FILTER_GROUP_UNIT(i, g, uid) ((i)->units + (g)->unitIdxs[uid])
...
@@ -302,6 +324,7 @@ typedef struct SFilterInfo {
...
@@ -302,6 +324,7 @@ typedef struct SFilterInfo {
#define FILTER_UNIT_COL_SIZE(i, u) FILTER_GET_COL_FIELD_SIZE(FILTER_UNIT_LEFT_FIELD(i, u))
#define FILTER_UNIT_COL_SIZE(i, u) FILTER_GET_COL_FIELD_SIZE(FILTER_UNIT_LEFT_FIELD(i, u))
#define FILTER_UNIT_COL_ID(i, u) FILTER_GET_COL_FIELD_ID(FILTER_UNIT_LEFT_FIELD(i, u))
#define FILTER_UNIT_COL_ID(i, u) FILTER_GET_COL_FIELD_ID(FILTER_UNIT_LEFT_FIELD(i, u))
#define FILTER_UNIT_VAL_DATA(i, u) FILTER_GET_VAL_FIELD_DATA(FILTER_UNIT_RIGHT_FIELD(i, u))
#define FILTER_UNIT_VAL_DATA(i, u) FILTER_GET_VAL_FIELD_DATA(FILTER_UNIT_RIGHT_FIELD(i, u))
#define FILTER_UNIT_JSON_VAL_DATA(i, u) FILTER_GET_JSON_VAL_FIELD_DATA(FILTER_UNIT_RIGHT_FIELD(i, u))
#define FILTER_UNIT_COL_IDX(u) ((u)->left.idx)
#define FILTER_UNIT_COL_IDX(u) ((u)->left.idx)
#define FILTER_UNIT_OPTR(u) ((u)->compare.optr)
#define FILTER_UNIT_OPTR(u) ((u)->compare.optr)
#define FILTER_UNIT_COMP_FUNC(u) ((u)->compare.func)
#define FILTER_UNIT_COMP_FUNC(u) ((u)->compare.func)
...
@@ -316,7 +339,7 @@ typedef struct SFilterInfo {
...
@@ -316,7 +339,7 @@ typedef struct SFilterInfo {
#define FILTER_PUSH_VAR_HASH(colInfo, ha) do { (colInfo).type = RANGE_TYPE_VAR_HASH; (colInfo).info = ha;} while (0)
#define FILTER_PUSH_VAR_HASH(colInfo, ha) do { (colInfo).type = RANGE_TYPE_VAR_HASH; (colInfo).info = ha;} while (0)
#define FILTER_PUSH_CTX(colInfo, ctx) do { (colInfo).type = RANGE_TYPE_MR_CTX; (colInfo).info = ctx;} while (0)
#define FILTER_PUSH_CTX(colInfo, ctx) do { (colInfo).type = RANGE_TYPE_MR_CTX; (colInfo).info = ctx;} while (0)
#define FILTER_COPY_IDX(dst, src, n) do { *(dst) = malloc(sizeof(uint
16_t) * n); memcpy(*(dst), src, sizeof(uint16
_t) * n);} while (0)
#define FILTER_COPY_IDX(dst, src, n) do { *(dst) = malloc(sizeof(uint
32_t) * n); memcpy(*(dst), src, sizeof(uint32
_t) * n);} while (0)
#define FILTER_ADD_CTX_TO_GRES(gres, idx, ctx) do { if ((gres)->colCtxs == NULL) { (gres)->colCtxs = taosArrayInit(gres->colNum, sizeof(SFilterColCtx)); } SFilterColCtx cCtx = {idx, ctx}; taosArrayPush((gres)->colCtxs, &cCtx); } while (0)
#define FILTER_ADD_CTX_TO_GRES(gres, idx, ctx) do { if ((gres)->colCtxs == NULL) { (gres)->colCtxs = taosArrayInit(gres->colNum, sizeof(SFilterColCtx)); } SFilterColCtx cCtx = {idx, ctx}; taosArrayPush((gres)->colCtxs, &cCtx); } while (0)
...
@@ -325,9 +348,10 @@ typedef struct SFilterInfo {
...
@@ -325,9 +348,10 @@ typedef struct SFilterInfo {
#define FILTER_EMPTY_RES(i) FILTER_GET_FLAG((i)->status, FI_STATUS_EMPTY)
#define FILTER_EMPTY_RES(i) FILTER_GET_FLAG((i)->status, FI_STATUS_EMPTY)
#if 0
#if 0
extern int32_t filterInitFromTree(tExprNode* tree,
SFilterInfo
**pinfo, uint32_t options);
extern int32_t filterInitFromTree(tExprNode* tree,
void
**pinfo, uint32_t options);
extern bool filterExecute(SFilterInfo *info, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols);
extern bool filterExecute(SFilterInfo *info, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols);
extern int32_t filterSetColFieldData(SFilterInfo *info, int32_t numOfCols, SArray* pDataBlock);
extern int32_t filterSetColFieldData(SFilterInfo *info, void *param, filer_get_col_from_id fp);
extern int32_t filterSetJsonColFieldData(SFilterInfo *info, void *param, filer_get_col_from_name fp);
extern int32_t filterGetTimeRange(SFilterInfo *info, STimeWindow *win);
extern int32_t filterGetTimeRange(SFilterInfo *info, STimeWindow *win);
extern int32_t filterConverNcharColumns(SFilterInfo* pFilterInfo, int32_t rows, bool *gotNchar);
extern int32_t filterConverNcharColumns(SFilterInfo* pFilterInfo, int32_t rows, bool *gotNchar);
extern int32_t filterFreeNcharColumns(SFilterInfo* pFilterInfo);
extern int32_t filterFreeNcharColumns(SFilterInfo* pFilterInfo);
...
@@ -344,4 +368,4 @@ extern __compar_fn_t filterGetCompFunc(int32_t type, int32_t optr);
...
@@ -344,4 +368,4 @@ extern __compar_fn_t filterGetCompFunc(int32_t type, int32_t optr);
}
}
#endif
#endif
#endif // TDENGINE_
QFILTER
_H
#endif // TDENGINE_
FILTER_INT
_H
\ No newline at end of file
source/libs/scalar/inc/tfilter.h
已删除
100644 → 0
浏览文件 @
6d04b954
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_QFILTER_H
#define TDENGINE_QFILTER_H
#ifdef __cplusplus
extern
"C"
{
#endif
#include "thash.h"
#include "tname.h"
#include "common.h"
#include "scalar.h"
#include "querynodes.h"
#define FILTER_DEFAULT_GROUP_SIZE 4
#define FILTER_DEFAULT_UNIT_SIZE 4
#define FILTER_DEFAULT_FIELD_SIZE 4
#define FILTER_DEFAULT_VALUE_SIZE 4
#define FILTER_DEFAULT_GROUP_UNIT_SIZE 2
#define FILTER_DUMMY_EMPTY_OPTR 127
#define MAX_NUM_STR_SIZE 40
#define FILTER_RM_UNIT_MIN_ROWS 100
enum
{
FLD_TYPE_COLUMN
=
1
,
FLD_TYPE_VALUE
=
2
,
FLD_TYPE_MAX
=
3
,
FLD_DESC_NO_FREE
=
4
,
FLD_DATA_NO_FREE
=
8
,
FLD_DATA_IS_HASH
=
16
,
};
enum
{
MR_ST_START
=
1
,
MR_ST_FIN
=
2
,
MR_ST_ALL
=
4
,
MR_ST_EMPTY
=
8
,
};
enum
{
RANGE_FLG_EXCLUDE
=
1
,
RANGE_FLG_INCLUDE
=
2
,
RANGE_FLG_NULL
=
4
,
};
enum
{
FI_OPTION_NO_REWRITE
=
1
,
FI_OPTION_TIMESTAMP
=
2
,
FI_OPTION_NEED_UNIQE
=
4
,
};
enum
{
FI_STATUS_ALL
=
1
,
FI_STATUS_EMPTY
=
2
,
FI_STATUS_REWRITE
=
4
,
FI_STATUS_CLONED
=
8
,
};
enum
{
FI_STATUS_BLK_ALL
=
1
,
FI_STATUS_BLK_EMPTY
=
2
,
FI_STATUS_BLK_ACTIVE
=
4
,
};
enum
{
RANGE_TYPE_UNIT
=
1
,
RANGE_TYPE_VAR_HASH
=
2
,
RANGE_TYPE_MR_CTX
=
3
,
};
typedef
struct
OptrStr
{
uint16_t
optr
;
char
*
str
;
}
OptrStr
;
typedef
struct
SFilterRange
{
int64_t
s
;
int64_t
e
;
char
sflag
;
char
eflag
;
}
SFilterRange
;
typedef
struct
SFilterColRange
{
uint16_t
idx
;
//column field idx
bool
isNull
;
bool
notNull
;
bool
isRange
;
SFilterRange
ra
;
}
SFilterColRange
;
typedef
bool
(
*
rangeCompFunc
)
(
const
void
*
,
const
void
*
,
const
void
*
,
const
void
*
,
__compar_fn_t
);
typedef
int32_t
(
*
filter_desc_compare_func
)(
const
void
*
,
const
void
*
);
typedef
bool
(
*
filter_exec_func
)(
void
*
,
int32_t
,
int8_t
**
,
SColumnDataAgg
*
,
int16_t
);
typedef
struct
SFilterRangeCompare
{
int64_t
s
;
int64_t
e
;
rangeCompFunc
func
;
}
SFilterRangeCompare
;
typedef
struct
SFilterRangeNode
{
struct
SFilterRangeNode
*
prev
;
struct
SFilterRangeNode
*
next
;
union
{
SFilterRange
ra
;
SFilterRangeCompare
rc
;
};
}
SFilterRangeNode
;
typedef
struct
SFilterRangeCtx
{
int32_t
type
;
int32_t
options
;
int8_t
status
;
bool
isnull
;
bool
notnull
;
bool
isrange
;
int16_t
colId
;
__compar_fn_t
pCompareFunc
;
SFilterRangeNode
*
rf
;
//freed
SFilterRangeNode
*
rs
;
}
SFilterRangeCtx
;
typedef
struct
SFilterVarCtx
{
int32_t
type
;
int32_t
options
;
int8_t
status
;
bool
isnull
;
bool
notnull
;
bool
isrange
;
SHashObj
*
wild
;
SHashObj
*
value
;
}
SFilterVarCtx
;
typedef
struct
SFilterField
{
uint16_t
flag
;
void
*
desc
;
void
*
data
;
}
SFilterField
;
typedef
struct
SFilterFields
{
uint16_t
size
;
uint16_t
num
;
SFilterField
*
fields
;
}
SFilterFields
;
typedef
struct
SFilterFieldId
{
uint16_t
type
;
uint16_t
idx
;
}
SFilterFieldId
;
typedef
struct
SFilterGroup
{
uint16_t
unitSize
;
uint16_t
unitNum
;
uint16_t
*
unitIdxs
;
uint8_t
*
unitFlags
;
// !unit result
}
SFilterGroup
;
typedef
struct
SFilterColInfo
{
uint8_t
type
;
int32_t
dataType
;
void
*
info
;
}
SFilterColInfo
;
typedef
struct
SFilterGroupCtx
{
uint16_t
colNum
;
uint16_t
*
colIdx
;
SFilterColInfo
*
colInfo
;
}
SFilterGroupCtx
;
typedef
struct
SFilterColCtx
{
uint16_t
colIdx
;
void
*
ctx
;
}
SFilterColCtx
;
typedef
struct
SFilterCompare
{
uint8_t
type
;
uint8_t
optr
;
uint8_t
optr2
;
}
SFilterCompare
;
typedef
struct
SFilterUnit
{
SFilterCompare
compare
;
SFilterFieldId
left
;
SFilterFieldId
right
;
SFilterFieldId
right2
;
}
SFilterUnit
;
typedef
struct
SFilterComUnit
{
void
*
colData
;
void
*
valData
;
void
*
valData2
;
uint16_t
colId
;
uint16_t
dataSize
;
uint8_t
dataType
;
uint8_t
optr
;
int8_t
func
;
int8_t
rfunc
;
}
SFilterComUnit
;
typedef
struct
SFilterPCtx
{
SHashObj
*
valHash
;
SHashObj
*
unitHash
;
}
SFilterPCtx
;
typedef
struct
SFilterInfo
{
uint32_t
options
;
uint32_t
status
;
uint16_t
unitSize
;
uint16_t
unitNum
;
uint16_t
groupNum
;
uint16_t
colRangeNum
;
SFilterFields
fields
[
FLD_TYPE_MAX
];
SFilterGroup
*
groups
;
uint16_t
*
cgroups
;
SFilterUnit
*
units
;
SFilterComUnit
*
cunits
;
uint8_t
*
unitRes
;
// result
uint8_t
*
unitFlags
;
// got result
SFilterRangeCtx
**
colRange
;
filter_exec_func
func
;
uint8_t
blkFlag
;
uint16_t
blkGroupNum
;
uint16_t
*
blkUnits
;
int8_t
*
blkUnitRes
;
SFilterPCtx
pctx
;
}
SFilterInfo
;
#define COL_FIELD_SIZE (sizeof(SFilterField) + 2 * sizeof(int64_t))
#define FILTER_NO_MERGE_DATA_TYPE(t) ((t) == TSDB_DATA_TYPE_BINARY || (t) == TSDB_DATA_TYPE_NCHAR)
#define FILTER_NO_MERGE_OPTR(o) ((o) == TSDB_RELATION_ISNULL || (o) == TSDB_RELATION_NOTNULL || (o) == FILTER_DUMMY_EMPTY_OPTR)
#define MR_EMPTY_RES(ctx) (ctx->rs == NULL)
#define SET_AND_OPTR(ctx, o) do {if (o == TSDB_RELATION_ISNULL) { (ctx)->isnull = true; } else if (o == TSDB_RELATION_NOTNULL) { if (!(ctx)->isrange) { (ctx)->notnull = true; } } else if (o != FILTER_DUMMY_EMPTY_OPTR) { (ctx)->isrange = true; (ctx)->notnull = false; } } while (0)
#define SET_OR_OPTR(ctx,o) do {if (o == TSDB_RELATION_ISNULL) { (ctx)->isnull = true; } else if (o == TSDB_RELATION_NOTNULL) { (ctx)->notnull = true; (ctx)->isrange = false; } else if (o != FILTER_DUMMY_EMPTY_OPTR) { if (!(ctx)->notnull) { (ctx)->isrange = true; } } } while (0)
#define CHK_OR_OPTR(ctx) ((ctx)->isnull == true && (ctx)->notnull == true)
#define CHK_AND_OPTR(ctx) ((ctx)->isnull == true && (((ctx)->notnull == true) || ((ctx)->isrange == true)))
#define FILTER_GET_FLAG(st, f) (st & f)
#define FILTER_SET_FLAG(st, f) st |= (f)
#define FILTER_CLR_FLAG(st, f) st &= (~f)
#define SIMPLE_COPY_VALUES(dst, src) *((int64_t *)dst) = *((int64_t *)src)
#define FILTER_PACKAGE_UNIT_HASH_KEY(v, optr, idx1, idx2) do { char *_t = (char *)v; _t[0] = optr; *(uint16_t *)(_t + 1) = idx1; *(uint16_t *)(_t + 3) = idx2; } while (0)
#define FILTER_GREATER(cr,sflag,eflag) ((cr > 0) || ((cr == 0) && (FILTER_GET_FLAG(sflag,RANGE_FLG_EXCLUDE) || FILTER_GET_FLAG(eflag,RANGE_FLG_EXCLUDE))))
#define FILTER_COPY_RA(dst, src) do { (dst)->sflag = (src)->sflag; (dst)->eflag = (src)->eflag; (dst)->s = (src)->s; (dst)->e = (src)->e; } while (0)
#define RESET_RANGE(ctx, r) do { (r)->next = (ctx)->rf; (ctx)->rf = r; } while (0)
#define FREE_RANGE(ctx, r) do { if ((r)->prev) { (r)->prev->next = (r)->next; } else { (ctx)->rs = (r)->next;} if ((r)->next) { (r)->next->prev = (r)->prev; } RESET_RANGE(ctx, r); } while (0)
#define FREE_FROM_RANGE(ctx, r) do { SFilterRangeNode *_r = r; if ((_r)->prev) { (_r)->prev->next = NULL; } else { (ctx)->rs = NULL;} while (_r) {SFilterRangeNode *n = (_r)->next; RESET_RANGE(ctx, _r); _r = n; } } while (0)
#define INSERT_RANGE(ctx, r, ra) do { SFilterRangeNode *n = filterNewRange(ctx, ra); n->prev = (r)->prev; if ((r)->prev) { (r)->prev->next = n; } else { (ctx)->rs = n; } (r)->prev = n; n->next = r; } while (0)
#define APPEND_RANGE(ctx, r, ra) do { SFilterRangeNode *n = filterNewRange(ctx, ra); n->prev = (r); if (r) { (r)->next = n; } else { (ctx)->rs = n; } } while (0)
#define ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { return _code; } } while (0)
#define ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); return _code; } } while (0)
#define ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { goto _return; } } while (0)
#define CHK_RETV(c) do { if (c) { return; } } while (0)
#define CHK_RET(c, r) do { if (c) { return r; } } while (0)
#define CHK_JMP(c) do { if (c) { goto _return; } } while (0)
#define CHK_LRETV(c,...) do { if (c) { qError(__VA_ARGS__); return; } } while (0)
#define CHK_LRET(c, r,...) do { if (c) { if (r) {qError(__VA_ARGS__); } else { qDebug(__VA_ARGS__); } return r; } } while (0)
#define FILTER_GET_FIELD(i, id) (&((i)->fields[(id).type].fields[(id).idx]))
#define FILTER_GET_COL_FIELD(i, idx) (&((i)->fields[FLD_TYPE_COLUMN].fields[idx]))
#define FILTER_GET_COL_FIELD_TYPE(fi) (((SSchema *)((fi)->desc))->type)
#define FILTER_GET_COL_FIELD_SIZE(fi) (((SSchema *)((fi)->desc))->bytes)
#define FILTER_GET_COL_FIELD_ID(fi) (((SSchema *)((fi)->desc))->colId)
#define FILTER_GET_COL_FIELD_DESC(fi) ((SSchema *)((fi)->desc))
#define FILTER_GET_COL_FIELD_DATA(fi, ri) ((char *)(fi)->data + ((SSchema *)((fi)->desc))->bytes * (ri))
#define FILTER_GET_VAL_FIELD_TYPE(fi) (((tVariant *)((fi)->desc))->nType)
#define FILTER_GET_VAL_FIELD_DATA(fi) ((char *)(fi)->data)
#define FILTER_GET_TYPE(fl) ((fl) & FLD_TYPE_MAX)
#define FILTER_GROUP_UNIT(i, g, uid) ((i)->units + (g)->unitIdxs[uid])
#define FILTER_UNIT_LEFT_FIELD(i, u) FILTER_GET_FIELD(i, (u)->left)
#define FILTER_UNIT_RIGHT_FIELD(i, u) FILTER_GET_FIELD(i, (u)->right)
#define FILTER_UNIT_RIGHT2_FIELD(i, u) FILTER_GET_FIELD(i, (u)->right2)
#define FILTER_UNIT_DATA_TYPE(u) ((u)->compare.type)
#define FILTER_UNIT_COL_DESC(i, u) FILTER_GET_COL_FIELD_DESC(FILTER_UNIT_LEFT_FIELD(i, u))
#define FILTER_UNIT_COL_DATA(i, u, ri) FILTER_GET_COL_FIELD_DATA(FILTER_UNIT_LEFT_FIELD(i, u), ri)
#define FILTER_UNIT_COL_SIZE(i, u) FILTER_GET_COL_FIELD_SIZE(FILTER_UNIT_LEFT_FIELD(i, u))
#define FILTER_UNIT_COL_ID(i, u) FILTER_GET_COL_FIELD_ID(FILTER_UNIT_LEFT_FIELD(i, u))
#define FILTER_UNIT_VAL_DATA(i, u) FILTER_GET_VAL_FIELD_DATA(FILTER_UNIT_RIGHT_FIELD(i, u))
#define FILTER_UNIT_COL_IDX(u) ((u)->left.idx)
#define FILTER_UNIT_OPTR(u) ((u)->compare.optr)
#define FILTER_UNIT_COMP_FUNC(u) ((u)->compare.func)
#define FILTER_UNIT_CLR_F(i) memset((i)->unitFlags, 0, (i)->unitNum * sizeof(*info->unitFlags))
#define FILTER_UNIT_SET_F(i, idx) (i)->unitFlags[idx] = 1
#define FILTER_UNIT_GET_F(i, idx) ((i)->unitFlags[idx])
#define FILTER_UNIT_GET_R(i, idx) ((i)->unitRes[idx])
#define FILTER_UNIT_SET_R(i, idx, v) (i)->unitRes[idx] = (v)
#define FILTER_PUSH_UNIT(colInfo, u) do { (colInfo).type = RANGE_TYPE_UNIT; (colInfo).dataType = FILTER_UNIT_DATA_TYPE(u);taosArrayPush((SArray *)((colInfo).info), &u);} while (0)
#define FILTER_PUSH_VAR_HASH(colInfo, ha) do { (colInfo).type = RANGE_TYPE_VAR_HASH; (colInfo).info = ha;} while (0)
#define FILTER_PUSH_CTX(colInfo, ctx) do { (colInfo).type = RANGE_TYPE_MR_CTX; (colInfo).info = ctx;} while (0)
#define FILTER_COPY_IDX(dst, src, n) do { *(dst) = malloc(sizeof(uint16_t) * n); memcpy(*(dst), src, sizeof(uint16_t) * n);} while (0)
#define FILTER_ADD_CTX_TO_GRES(gres, idx, ctx) do { if ((gres)->colCtxs == NULL) { (gres)->colCtxs = taosArrayInit(gres->colNum, sizeof(SFilterColCtx)); } SFilterColCtx cCtx = {idx, ctx}; taosArrayPush((gres)->colCtxs, &cCtx); } while (0)
#define FILTER_ALL_RES(i) FILTER_GET_FLAG((i)->status, FI_STATUS_ALL)
#define FILTER_EMPTY_RES(i) FILTER_GET_FLAG((i)->status, FI_STATUS_EMPTY)
#if 0
extern int32_t filterInitFromTree(tExprNode* tree, SFilterInfo **pinfo, uint32_t options);
extern bool filterExecute(SFilterInfo *info, int32_t numOfRows, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols);
extern int32_t filterSetColFieldData(SFilterInfo *info, int32_t numOfCols, SArray* pDataBlock);
extern int32_t filterGetTimeRange(SFilterInfo *info, STimeWindow *win);
extern int32_t filterConverNcharColumns(SFilterInfo* pFilterInfo, int32_t rows, bool *gotNchar);
extern int32_t filterFreeNcharColumns(SFilterInfo* pFilterInfo);
extern void filterFreeInfo(SFilterInfo *info);
extern bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pDataStatis, int32_t numOfCols, int32_t numOfRows);
#else
//REMOVE THESE!!!!!!!!!!!!!!!!!!!!
#include "function.h"
#endif
extern
bool
filterDoCompare
(
__compar_fn_t
func
,
uint8_t
optr
,
void
*
left
,
void
*
right
);
extern
__compar_fn_t
filterGetCompFunc
(
int32_t
type
,
int32_t
optr
);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_QFILTER_H
source/libs/scalar/src/filter.c
浏览文件 @
8d413f09
此差异已折叠。
点击以展开。
source/libs/scalar/src/scalar.c
浏览文件 @
8d413f09
...
@@ -45,6 +45,11 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t
...
@@ -45,6 +45,11 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t
break
;
break
;
}
}
case
QUERY_NODE_NODE_LIST
:
{
SNodeListNode
*
nodeList
=
(
SNodeListNode
*
)
node
;
//TODO BUILD HASH
break
;
}
case
QUERY_NODE_COLUMN_REF
:
{
case
QUERY_NODE_COLUMN_REF
:
{
if
(
NULL
==
ctx
)
{
if
(
NULL
==
ctx
)
{
sclError
(
"invalid node type for constant calculating, type:%d, ctx:%p"
,
nodeType
(
node
),
ctx
);
sclError
(
"invalid node type for constant calculating, type:%d, ctx:%p"
,
nodeType
(
node
),
ctx
);
...
...
source/libs/scalar/src/sclvector.c
浏览文件 @
8d413f09
...
@@ -19,7 +19,7 @@
...
@@ -19,7 +19,7 @@
#include "sclvector.h"
#include "sclvector.h"
#include "tcompare.h"
#include "tcompare.h"
#include "querynodes.h"
#include "querynodes.h"
#include "filter.h"
#include "filter
Int
.h"
#include "query.h"
#include "query.h"
//GET_TYPED_DATA(v, double, pRight->type, (char *)&((right)[i]));
//GET_TYPED_DATA(v, double, pRight->type, (char *)&((right)[i]));
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录