Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
2d58e1fe
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
2d58e1fe
编写于
2月 27, 2022
作者:
D
dapan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feature/qnode
上级
7129ba9c
变更
10
展开全部
隐藏空白更改
内联
并排
Showing
10 changed file
with
537 addition
and
658 deletion
+537
-658
include/common/tep.h
include/common/tep.h
+3
-0
include/libs/function/function.h
include/libs/function/function.h
+4
-1
include/libs/scalar/scalar.h
include/libs/scalar/scalar.h
+1
-1
source/common/src/tep.c
source/common/src/tep.c
+0
-1
source/libs/scalar/inc/filterInt.h
source/libs/scalar/inc/filterInt.h
+1
-0
source/libs/scalar/inc/sclInt.h
source/libs/scalar/inc/sclInt.h
+6
-3
source/libs/scalar/inc/sclvector.h
source/libs/scalar/inc/sclvector.h
+1
-1
source/libs/scalar/src/filter.c
source/libs/scalar/src/filter.c
+6
-1
source/libs/scalar/src/scalar.c
source/libs/scalar/src/scalar.c
+146
-69
source/libs/scalar/src/sclvector.c
source/libs/scalar/src/sclvector.c
+369
-581
未找到文件。
include/common/tep.h
浏览文件 @
2d58e1fe
...
...
@@ -67,6 +67,9 @@ static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, u
}
}
#define BitmapLen(_n) (((_n) + ((1<<NBIT)-1)) >> NBIT)
#define colDataGet(p1_, r_) \
((IS_VAR_DATA_TYPE((p1_)->info.type)) ? ((p1_)->pData + (p1_)->varmeta.offset[(r_)]) \
: ((p1_)->pData + ((r_) * (p1_)->info.bytes)))
...
...
include/libs/function/function.h
浏览文件 @
2d58e1fe
...
...
@@ -228,7 +228,10 @@ typedef struct SAggFunctionInfo {
typedef
struct
SScalarParam
{
void
*
data
;
SColumnInfoData
*
columnData
;
union
{
SColumnInfoData
*
columnData
;
void
*
data
;
}
orig
;
char
*
bitmap
;
bool
dataInBlock
;
int32_t
num
;
...
...
include/libs/scalar/scalar.h
浏览文件 @
2d58e1fe
...
...
@@ -27,7 +27,7 @@ typedef struct SFilterInfo SFilterInfo;
int32_t
scalarCalculateConstants
(
SNode
*
pNode
,
SNode
**
pRes
);
int32_t
scalarCalculate
(
SNode
*
pNode
,
S
SDataBlock
*
pSrc
,
SScalarParam
*
pDst
);
int32_t
scalarCalculate
(
SNode
*
pNode
,
S
Array
*
pBlockList
,
SScalarParam
*
pDst
);
int32_t
scalarGetOperatorParamNum
(
EOperatorType
type
);
int32_t
scalarGenerateSetFromList
(
void
**
data
,
void
*
pNode
,
uint32_t
type
);
...
...
source/common/src/tep.c
浏览文件 @
2d58e1fe
...
...
@@ -61,7 +61,6 @@ SEpSet getEpSet_s(SCorEpSet *pEpSet) {
return
ep
;
}
#define BitmapLen(_n) (((_n) + ((1<<NBIT)-1)) >> NBIT)
int32_t
colDataGetSize
(
const
SColumnInfoData
*
pColumnInfoData
,
int32_t
numOfRows
)
{
ASSERT
(
pColumnInfoData
!=
NULL
);
...
...
source/libs/scalar/inc/filterInt.h
浏览文件 @
2d58e1fe
...
...
@@ -253,6 +253,7 @@ typedef struct SFilterInfo {
uint32_t
*
blkUnits
;
int8_t
*
blkUnitRes
;
void
*
pTable
;
SArray
*
blkList
;
SFilterPCtx
pctx
;
}
SFilterInfo
;
...
...
source/libs/scalar/inc/sclInt.h
浏览文件 @
2d58e1fe
...
...
@@ -24,7 +24,7 @@ extern "C" {
typedef
struct
SScalarCtx
{
int32_t
code
;
S
SDataBlock
*
pSrc
;
S
Array
*
pBlockList
;
/* element is SSDataBlock* */
SHashObj
*
pRes
;
/* element is SScalarParam */
}
SScalarCtx
;
...
...
@@ -44,10 +44,13 @@ typedef struct SScalarCtx {
#define SCL_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
int32_t
sclMoveParamListData
(
SScalarParam
*
params
,
int32_t
listNum
,
int32_t
idx
);
bool
sclIsNull
(
SScalarParam
*
param
,
int32_t
idx
);
void
sclSetNull
(
SScalarParam
*
param
,
int32_t
idx
);
void
sclFreeParam
(
SScalarParam
*
param
);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_SCALARINT_H
\ No newline at end of file
#endif // TDENGINE_SCALARINT_H
source/libs/scalar/inc/sclvector.h
浏览文件 @
2d58e1fe
...
...
@@ -23,7 +23,7 @@ extern "C" {
#include "sclfunc.h"
typedef
void
(
*
_bufConverteFunc
)(
char
*
buf
,
SScalarParam
*
pOut
,
int32_t
outType
);
typedef
void
(
*
_bin_scalar_fn_t
)(
SScalarParam
*
pLeft
,
SScalarParam
*
pRight
,
void
*
output
,
int32_t
order
);
typedef
void
(
*
_bin_scalar_fn_t
)(
SScalarParam
*
pLeft
,
SScalarParam
*
pRight
,
SScalarParam
*
output
,
int32_t
order
);
_bin_scalar_fn_t
getBinScalarOperatorFn
(
int32_t
binOperator
);
#ifdef __cplusplus
...
...
source/libs/scalar/src/filter.c
浏览文件 @
2d58e1fe
...
...
@@ -3668,7 +3668,12 @@ _return:
bool
filterExecute
(
SFilterInfo
*
info
,
SSDataBlock
*
pSrc
,
int8_t
**
p
,
SColumnDataAgg
*
statis
,
int16_t
numOfCols
)
{
if
(
info
->
scalarMode
)
{
SScalarParam
output
=
{
0
};
FLT_ERR_RET
(
scalarCalculate
(
info
->
sclCtx
.
node
,
pSrc
,
&
output
));
SArray
*
pList
=
taosArrayInit
(
1
,
POINTER_BYTES
);
taosArrayPush
(
pList
,
&
pSrc
);
FLT_ERR_RET
(
scalarCalculate
(
info
->
sclCtx
.
node
,
pList
,
&
output
));
taosArrayDestroy
(
pList
);
*
p
=
output
.
data
;
...
...
source/libs/scalar/src/scalar.c
浏览文件 @
2d58e1fe
...
...
@@ -5,6 +5,7 @@
#include "functionMgt.h"
#include "sclvector.h"
#include "sclInt.h"
#include "tep.h"
int32_t
scalarGetOperatorParamNum
(
EOperatorType
type
)
{
if
(
OP_TYPE_IS_NULL
==
type
||
OP_TYPE_IS_NOT_NULL
==
type
||
OP_TYPE_IS_TRUE
==
type
||
OP_TYPE_IS_NOT_TRUE
==
type
...
...
@@ -87,15 +88,23 @@ _return:
SCL_RET
(
code
);
}
bool
sclIsNull
(
SScalarParam
*
param
,
int32_t
idx
)
{
FORCE_INLINE
bool
sclIsNull
(
SScalarParam
*
param
,
int32_t
idx
)
{
if
(
param
->
dataInBlock
)
{
return
colDataIsNull
(
param
->
columnData
,
0
,
idx
,
NULL
);
return
colDataIsNull
(
param
->
orig
.
columnData
,
0
,
idx
,
NULL
);
}
return
colDataIsNull_f
(
param
->
bitmap
,
idx
)
;
return
param
->
bitmap
?
colDataIsNull_f
(
param
->
bitmap
,
idx
)
:
false
;
}
void
sclSetNull
(
SScalarParam
*
param
,
int32_t
idx
)
{
FORCE_INLINE
void
sclSetNull
(
SScalarParam
*
param
,
int32_t
idx
)
{
if
(
NULL
==
param
->
bitmap
)
{
param
->
bitmap
=
calloc
(
BitmapLen
(
param
->
num
),
sizeof
(
char
));
if
(
NULL
==
param
->
bitmap
)
{
sclError
(
"calloc %d failed"
,
param
->
num
);
return
;
}
}
colDataSetNull_f
(
param
->
bitmap
,
idx
);
}
...
...
@@ -107,7 +116,7 @@ void sclFreeRes(SHashObj *res) {
p
=
(
SScalarParam
*
)
pIter
;
if
(
p
)
{
tfree
(
p
->
data
);
sclFreeParam
(
p
);
}
pIter
=
taosHashIterate
(
res
,
pIter
);
...
...
@@ -117,7 +126,15 @@ void sclFreeRes(SHashObj *res) {
}
void
sclFreeParam
(
SScalarParam
*
param
)
{
tfree
(
param
->
data
);
tfree
(
param
->
bitmap
);
if
(
!
param
->
dataInBlock
)
{
if
(
SCL_DATA_TYPE_DUMMY_HASH
==
param
->
type
)
{
taosHashCleanup
((
SHashObj
*
)
param
->
orig
.
data
);
}
else
{
tfree
(
param
->
orig
.
data
);
}
}
}
int32_t
sclInitParam
(
SNode
*
node
,
SScalarParam
*
param
,
SScalarCtx
*
ctx
,
int32_t
*
rowNum
)
{
...
...
@@ -140,30 +157,45 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t
}
SCL_ERR_RET
(
scalarGenerateSetFromList
(
&
param
->
data
,
node
,
nodeList
->
dataType
.
type
));
param
->
orig
.
data
=
param
->
data
;
param
->
num
=
1
;
param
->
type
=
SCL_DATA_TYPE_DUMMY_HASH
;
param
->
dataInBlock
=
false
;
if
(
taosHashPut
(
ctx
->
pRes
,
&
node
,
POINTER_BYTES
,
param
,
sizeof
(
*
param
)))
{
taosHashCleanup
(
param
->
orig
.
data
);
param
->
orig
.
data
=
NULL
;
sclError
(
"taosHashPut nodeList failed, size:%d"
,
(
int32_t
)
sizeof
(
*
param
));
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
break
;
}
case
QUERY_NODE_COLUMN
:
{
if
(
NULL
==
ctx
)
{
sclError
(
"invalid node type for constant calculating, type:%d,
ctx:%p"
,
nodeType
(
node
),
ctx
);
if
(
NULL
==
ctx
->
pBlockList
)
{
sclError
(
"invalid node type for constant calculating, type:%d,
src:%p"
,
nodeType
(
node
),
ctx
->
pBlockList
);
SCL_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
}
SColumnNode
*
ref
=
(
SColumnNode
*
)
node
;
if
(
ref
->
slotId
>=
taosArrayGetSize
(
ctx
->
pSrc
->
pDataBlock
))
{
sclError
(
"column ref slotId is too big, slodId:%d, dataBlockSize:%d"
,
ref
->
slotId
,
(
int32_t
)
taosArrayGetSize
(
ctx
->
pSrc
->
pDataBlock
));
if
(
ref
->
tupleId
>=
taosArrayGetSize
(
ctx
->
pBlockList
))
{
sclError
(
"column tupleId is too big, tupleId:%d, dataBlockNum:%d"
,
ref
->
tupleId
,
(
int32_t
)
taosArrayGetSize
(
ctx
->
pBlockList
));
SCL_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
SSDataBlock
*
block
=
taosArrayGet
(
ctx
->
pBlockList
,
ref
->
tupleId
);
if
(
NULL
==
block
||
ref
->
slotId
>=
taosArrayGetSize
(
block
->
pDataBlock
))
{
sclError
(
"column slotId is too big, slodId:%d, dataBlockSize:%d"
,
ref
->
slotId
,
(
int32_t
)
taosArrayGetSize
(
block
->
pDataBlock
));
SCL_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
SColumnInfoData
*
columnData
=
(
SColumnInfoData
*
)
taosArrayGet
(
ctx
->
pSrc
->
pDataBlock
,
ref
->
slotId
);
SColumnInfoData
*
columnData
=
(
SColumnInfoData
*
)
taosArrayGet
(
block
->
pDataBlock
,
ref
->
slotId
);
param
->
data
=
NULL
;
param
->
columnData
=
columnData
;
param
->
orig
.
columnData
=
columnData
;
param
->
dataInBlock
=
true
;
param
->
num
=
ctx
->
pSrc
->
info
.
rows
;
param
->
num
=
block
->
info
.
rows
;
param
->
type
=
columnData
->
info
.
type
;
param
->
bytes
=
columnData
->
info
.
bytes
;
...
...
@@ -171,11 +203,6 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t
}
case
QUERY_NODE_LOGIC_CONDITION
:
case
QUERY_NODE_OPERATOR
:
{
if
(
NULL
==
ctx
)
{
sclError
(
"invalid node type for constant calculating, type:%d, ctx:%p"
,
nodeType
(
node
),
ctx
);
SCL_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
}
SScalarParam
*
res
=
(
SScalarParam
*
)
taosHashGet
(
ctx
->
pRes
,
&
node
,
POINTER_BYTES
);
if
(
NULL
==
res
)
{
sclError
(
"no result for node, type:%d, node:%p"
,
nodeType
(
node
),
node
);
...
...
@@ -211,13 +238,15 @@ int32_t sclMoveParamListData(SScalarParam *params, int32_t listNum, int32_t idx)
}
if
(
param
->
dataInBlock
)
{
param
->
data
=
colDataGet
(
param
->
columnData
,
idx
);
param
->
data
=
colDataGet
(
param
->
orig
.
columnData
,
idx
);
}
else
if
(
idx
)
{
if
(
IS_VAR_DATA_TYPE
(
param
->
type
))
{
param
->
data
=
(
char
*
)(
param
->
data
)
+
varDataTLen
(
param
->
data
);
}
else
{
param
->
data
=
(
char
*
)(
param
->
data
)
+
tDataTypes
[
param
->
type
].
bytes
;
}
}
else
{
param
->
data
=
param
->
orig
.
data
;
}
}
...
...
@@ -307,6 +336,7 @@ int32_t sclExecFuncion(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outpu
sclError
(
"calloc %d failed"
,
(
int32_t
)(
rowNum
*
sizeof
(
tDataTypes
[
output
->
type
].
bytes
)));
SCL_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
output
->
orig
.
data
=
output
->
data
;
for
(
int32_t
i
=
0
;
i
<
rowNum
;
++
i
)
{
sclMoveParamListData
(
output
,
1
,
i
);
...
...
@@ -358,9 +388,8 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o
sclError
(
"calloc %d failed"
,
(
int32_t
)(
rowNum
*
sizeof
(
bool
)));
SCL_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
output
->
orig
.
data
=
output
->
data
;
void
*
data
=
output
->
data
;
bool
value
=
false
;
for
(
int32_t
i
=
0
;
i
<
rowNum
;
++
i
)
{
...
...
@@ -382,8 +411,6 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o
*
(
bool
*
)
output
->
data
=
value
;
}
output
->
data
=
data
;
return
TSDB_CODE_SUCCESS
;
_return:
...
...
@@ -407,6 +434,7 @@ int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *outp
sclError
(
"calloc %d failed"
,
(
int32_t
)
rowNum
*
tDataTypes
[
output
->
type
].
bytes
);
SCL_ERR_JRET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
output
->
orig
.
data
=
output
->
data
;
_bin_scalar_fn_t
OperatorFn
=
getBinScalarOperatorFn
(
node
->
opType
);
...
...
@@ -414,7 +442,7 @@ int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *outp
SScalarParam
*
pLeft
=
&
params
[
0
];
SScalarParam
*
pRight
=
paramNum
>
1
?
&
params
[
1
]
:
NULL
;
OperatorFn
(
pLeft
,
pRight
,
output
->
data
,
TSDB_ORDER_ASC
);
OperatorFn
(
pLeft
,
pRight
,
output
,
TSDB_ORDER_ASC
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -425,12 +453,12 @@ _return:
}
EDealRes
sclRewriteFunction
(
SNode
**
pNode
,
void
*
pContext
)
{
EDealRes
sclRewriteFunction
(
SNode
**
pNode
,
SScalarCtx
*
ctx
)
{
SFunctionNode
*
node
=
(
SFunctionNode
*
)
*
pNode
;
SScalarParam
output
=
{
0
};
*
(
int32_t
*
)
pContext
=
sclExecFuncion
(
node
,
NULL
,
&
output
);
if
(
*
(
int32_t
*
)
pContext
)
{
ctx
->
code
=
sclExecFuncion
(
node
,
ctx
,
&
output
);
if
(
ctx
->
code
)
{
return
DEAL_RES_ERROR
;
}
...
...
@@ -438,7 +466,7 @@ EDealRes sclRewriteFunction(SNode** pNode, void* pContext) {
if
(
NULL
==
res
)
{
sclError
(
"make value node failed"
);
sclFreeParam
(
&
output
);
*
(
int32_t
*
)
pContext
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
ctx
->
code
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
return
DEAL_RES_ERROR
;
}
...
...
@@ -459,12 +487,12 @@ EDealRes sclRewriteFunction(SNode** pNode, void* pContext) {
return
DEAL_RES_CONTINUE
;
}
EDealRes
sclRewriteLogic
(
SNode
**
pNode
,
void
*
pContext
)
{
EDealRes
sclRewriteLogic
(
SNode
**
pNode
,
SScalarCtx
*
ctx
)
{
SLogicConditionNode
*
node
=
(
SLogicConditionNode
*
)
*
pNode
;
SScalarParam
output
=
{
0
};
*
(
int32_t
*
)
pContext
=
sclExecLogic
(
node
,
NULL
,
&
output
);
if
(
*
(
int32_t
*
)
pContext
)
{
ctx
->
code
=
sclExecLogic
(
node
,
ctx
,
&
output
);
if
(
ctx
->
code
)
{
return
DEAL_RES_ERROR
;
}
...
...
@@ -472,7 +500,7 @@ EDealRes sclRewriteLogic(SNode** pNode, void* pContext) {
if
(
NULL
==
res
)
{
sclError
(
"make value node failed"
);
sclFreeParam
(
&
output
);
*
(
int32_t
*
)
pContext
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
ctx
->
code
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
return
DEAL_RES_ERROR
;
}
...
...
@@ -493,12 +521,12 @@ EDealRes sclRewriteLogic(SNode** pNode, void* pContext) {
return
DEAL_RES_CONTINUE
;
}
EDealRes
sclRewriteOperator
(
SNode
**
pNode
,
void
*
pContext
)
{
EDealRes
sclRewriteOperator
(
SNode
**
pNode
,
SScalarCtx
*
ctx
)
{
SOperatorNode
*
node
=
(
SOperatorNode
*
)
*
pNode
;
SScalarParam
output
=
{
0
};
*
(
int32_t
*
)
pContext
=
sclExecOperator
(
node
,
NULL
,
&
output
);
if
(
*
(
int32_t
*
)
pContext
)
{
ctx
->
code
=
sclExecOperator
(
node
,
ctx
,
&
output
);
if
(
ctx
->
code
)
{
return
DEAL_RES_ERROR
;
}
...
...
@@ -506,7 +534,7 @@ EDealRes sclRewriteOperator(SNode** pNode, void* pContext) {
if
(
NULL
==
res
)
{
sclError
(
"make value node failed"
);
sclFreeParam
(
&
output
);
*
(
int32_t
*
)
pContext
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
ctx
->
code
=
TSDB_CODE_QRY_OUT_OF_MEMORY
;
return
DEAL_RES_ERROR
;
}
...
...
@@ -533,28 +561,29 @@ EDealRes sclConstantsRewriter(SNode** pNode, void* pContext) {
return
DEAL_RES_CONTINUE
;
}
SScalarCtx
*
ctx
=
(
SScalarCtx
*
)
pContext
;
if
(
QUERY_NODE_FUNCTION
==
nodeType
(
*
pNode
))
{
return
sclRewriteFunction
(
pNode
,
pContext
);
return
sclRewriteFunction
(
pNode
,
ctx
);
}
if
(
QUERY_NODE_LOGIC_CONDITION
==
nodeType
(
*
pNode
))
{
return
sclRewriteLogic
(
pNode
,
pContext
);
return
sclRewriteLogic
(
pNode
,
ctx
);
}
if
(
QUERY_NODE_OPERATOR
==
nodeType
(
*
pNode
))
{
return
sclRewriteOperator
(
pNode
,
pContext
);
return
sclRewriteOperator
(
pNode
,
ctx
);
}
sclError
(
"invalid node type for calculating constants, type:%d"
,
nodeType
(
*
pNode
));
*
(
int32_t
*
)
pContext
=
TSDB_CODE_QRY_INVALID_INPUT
;
ctx
->
code
=
TSDB_CODE_QRY_INVALID_INPUT
;
return
DEAL_RES_ERROR
;
}
EDealRes
sclWalkFunction
(
SNode
*
pNode
,
void
*
pContext
)
{
SScalarCtx
*
ctx
=
(
SScalarCtx
*
)
pContext
;
EDealRes
sclWalkFunction
(
SNode
*
pNode
,
SScalarCtx
*
ctx
)
{
SFunctionNode
*
node
=
(
SFunctionNode
*
)
pNode
;
SScalarParam
output
=
{
0
};
...
...
@@ -572,8 +601,7 @@ EDealRes sclWalkFunction(SNode* pNode, void* pContext) {
}
EDealRes
sclWalkLogic
(
SNode
*
pNode
,
void
*
pContext
)
{
SScalarCtx
*
ctx
=
(
SScalarCtx
*
)
pContext
;
EDealRes
sclWalkLogic
(
SNode
*
pNode
,
SScalarCtx
*
ctx
)
{
SLogicConditionNode
*
node
=
(
SLogicConditionNode
*
)
pNode
;
SScalarParam
output
=
{
0
};
...
...
@@ -591,8 +619,7 @@ EDealRes sclWalkLogic(SNode* pNode, void* pContext) {
}
EDealRes
sclWalkOperator
(
SNode
*
pNode
,
void
*
pContext
)
{
SScalarCtx
*
ctx
=
(
SScalarCtx
*
)
pContext
;
EDealRes
sclWalkOperator
(
SNode
*
pNode
,
SScalarCtx
*
ctx
)
{
SOperatorNode
*
node
=
(
SOperatorNode
*
)
pNode
;
SScalarParam
output
=
{
0
};
...
...
@@ -609,27 +636,69 @@ EDealRes sclWalkOperator(SNode* pNode, void* pContext) {
return
DEAL_RES_CONTINUE
;
}
EDealRes
sclWalkTarget
(
SNode
*
pNode
,
SScalarCtx
*
ctx
)
{
STargetNode
*
target
=
(
STargetNode
*
)
pNode
;
if
(
target
->
tupleId
>=
taosArrayGetSize
(
ctx
->
pBlockList
))
{
sclError
(
"target tupleId is too big, tupleId:%d, dataBlockNum:%d"
,
target
->
tupleId
,
(
int32_t
)
taosArrayGetSize
(
ctx
->
pBlockList
));
ctx
->
code
=
TSDB_CODE_QRY_INVALID_INPUT
;
return
DEAL_RES_ERROR
;
}
SSDataBlock
*
block
=
taosArrayGet
(
ctx
->
pBlockList
,
target
->
tupleId
);
if
(
target
->
slotId
>=
taosArrayGetSize
(
block
->
pDataBlock
))
{
sclError
(
"target slot not exist, slotId:%d, dataBlockNum:%d"
,
target
->
slotId
,
(
int32_t
)
taosArrayGetSize
(
block
->
pDataBlock
));
ctx
->
code
=
TSDB_CODE_QRY_INVALID_INPUT
;
return
DEAL_RES_ERROR
;
}
SColumnInfoData
*
col
=
taosArrayGet
(
block
->
pDataBlock
,
target
->
slotId
);
SScalarParam
*
res
=
(
SScalarParam
*
)
taosHashGet
(
ctx
->
pRes
,
(
void
*
)
&
target
->
pExpr
,
POINTER_BYTES
);
if
(
NULL
==
res
)
{
sclError
(
"no valid res in hash, node:%p, type:%d"
,
target
->
pExpr
,
nodeType
(
target
->
pExpr
));
ctx
->
code
=
TSDB_CODE_QRY_APP_ERROR
;
return
DEAL_RES_ERROR
;
}
taosHashRemove
(
ctx
->
pRes
,
(
void
*
)
&
target
->
pExpr
,
POINTER_BYTES
);
for
(
int32_t
i
=
0
;
i
<
res
->
num
;
++
i
)
{
sclMoveParamListData
(
res
,
1
,
i
);
colDataAppend
(
col
,
i
,
res
->
data
,
sclIsNull
(
res
,
i
));
}
sclFreeParam
(
res
);
return
DEAL_RES_CONTINUE
;
}
EDealRes
sclCalcWalker
(
SNode
*
pNode
,
void
*
pContext
)
{
if
(
QUERY_NODE_VALUE
==
nodeType
(
pNode
)
||
QUERY_NODE_NODE_LIST
==
nodeType
(
pNode
)
||
QUERY_NODE_COLUMN
==
nodeType
(
pNode
))
{
return
DEAL_RES_CONTINUE
;
}
SScalarCtx
*
ctx
=
(
SScalarCtx
*
)
pContext
;
if
(
QUERY_NODE_FUNCTION
==
nodeType
(
pNode
))
{
return
sclWalkFunction
(
pNode
,
pContext
);
return
sclWalkFunction
(
pNode
,
ctx
);
}
if
(
QUERY_NODE_LOGIC_CONDITION
==
nodeType
(
pNode
))
{
return
sclWalkLogic
(
pNode
,
pContext
);
return
sclWalkLogic
(
pNode
,
ctx
);
}
if
(
QUERY_NODE_OPERATOR
==
nodeType
(
pNode
))
{
return
sclWalkOperator
(
pNode
,
pContext
);
return
sclWalkOperator
(
pNode
,
ctx
);
}
sclError
(
"invalid node type for scalar calculating, type:%d"
,
nodeType
(
pNode
));
if
(
QUERY_NODE_TARGET
==
nodeType
(
pNode
))
{
return
sclWalkTarget
(
pNode
,
ctx
);
}
SScalarCtx
*
ctx
=
(
SScalarCtx
*
)
pContext
;
sclError
(
"invalid node type for scalar calculating, type:%d"
,
nodeType
(
pNode
))
;
ctx
->
code
=
TSDB_CODE_QRY_INVALID_INPUT
;
...
...
@@ -644,26 +713,33 @@ int32_t scalarCalculateConstants(SNode *pNode, SNode **pRes) {
}
int32_t
code
=
0
;
SScalarCtx
ctx
=
{
0
};
ctx
.
pRes
=
taosHashInit
(
SCL_DEFAULT_OP_NUM
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_NO_LOCK
);
if
(
NULL
==
ctx
.
pRes
)
{
sclError
(
"taosHashInit failed, num:%d"
,
SCL_DEFAULT_OP_NUM
);
SCL_ERR_RET
(
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
nodesRewriteNodePostOrder
(
&
pNode
,
sclConstantsRewriter
,
(
void
*
)
&
c
ode
);
nodesRewriteNodePostOrder
(
&
pNode
,
sclConstantsRewriter
,
(
void
*
)
&
c
tx
);
if
(
code
)
{
nodesDestroyNode
(
pNode
);
SCL_ERR_RET
(
code
);
}
SCL_ERR_JRET
(
ctx
.
code
);
*
pRes
=
pNode
;
SCL_RET
(
code
);
_return:
sclFreeRes
(
ctx
.
pRes
);
return
code
;
}
int32_t
scalarCalculate
(
SNode
*
pNode
,
S
SDataBlock
*
pSrc
,
SScalarParam
*
pDst
)
{
if
(
NULL
==
pNode
||
NULL
==
p
Src
||
NULL
==
pDst
)
{
int32_t
scalarCalculate
(
SNode
*
pNode
,
S
Array
*
pBlockList
,
SScalarParam
*
pDst
)
{
if
(
NULL
==
pNode
||
NULL
==
p
BlockList
||
NULL
==
pDst
)
{
SCL_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
int32_t
code
=
0
;
SScalarCtx
ctx
=
{.
code
=
0
,
.
p
Src
=
pSrc
};
SScalarCtx
ctx
=
{.
code
=
0
,
.
p
BlockList
=
pBlockList
};
ctx
.
pRes
=
taosHashInit
(
SCL_DEFAULT_OP_NUM
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
),
false
,
HASH_NO_LOCK
);
if
(
NULL
==
ctx
.
pRes
)
{
...
...
@@ -673,23 +749,24 @@ int32_t scalarCalculate(SNode *pNode, SSDataBlock *pSrc, SScalarParam *pDst) {
nodesWalkNodePostOrder
(
pNode
,
sclCalcWalker
,
(
void
*
)
&
ctx
);
if
(
ctx
.
code
)
{
nodesDestroyNode
(
pNode
);
sclFreeRes
(
ctx
.
pRes
);
SCL_ERR_RET
(
ctx
.
code
);
}
SCL_ERR_JRET
(
ctx
.
code
);
SScalarParam
*
res
=
(
SScalarParam
*
)
taosHashGet
(
ctx
.
pRes
,
(
void
*
)
&
pNode
,
POINTER_BYTES
);
if
(
NULL
==
res
)
{
sclError
(
"no
res for calculating
, node:%p, type:%d"
,
pNode
,
nodeType
(
pNode
));
SCL_ERR_RET
(
TSDB_CODE_QRY_APP_ERROR
);
sclError
(
"no
valid res in hash
, node:%p, type:%d"
,
pNode
,
nodeType
(
pNode
));
SCL_ERR_
J
RET
(
TSDB_CODE_QRY_APP_ERROR
);
}
taosHashRemove
(
ctx
.
pRes
,
(
void
*
)
&
pNode
,
POINTER_BYTES
);
*
pDst
=
*
res
;
_return:
nodesDestroyNode
(
pNode
);
sclFreeRes
(
ctx
.
pRes
);
return
TSDB_CODE_SUCCESS
;
return
code
;
}
...
...
source/libs/scalar/src/sclvector.c
浏览文件 @
2d58e1fe
此差异已折叠。
点击以展开。
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录