Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
4364e7d4
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
4364e7d4
编写于
5月 23, 2022
作者:
wmmhello
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into feature/TD-13041
上级
aca3da38
6f302c2e
变更
24
显示空白变更内容
内联
并排
Showing
24 changed file
with
298 addition
and
208 deletion
+298
-208
include/common/ttypes.h
include/common/ttypes.h
+1
-1
include/libs/function/functionMgt.h
include/libs/function/functionMgt.h
+2
-1
include/libs/nodes/plannodes.h
include/libs/nodes/plannodes.h
+2
-0
include/libs/nodes/querynodes.h
include/libs/nodes/querynodes.h
+1
-1
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+1
-1
source/common/src/tmsg.c
source/common/src/tmsg.c
+1
-1
source/common/src/trow.c
source/common/src/trow.c
+1
-1
source/dnode/vnode/src/meta/metaQuery.c
source/dnode/vnode/src/meta/metaQuery.c
+2
-1
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+2
-6
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+106
-96
source/libs/function/inc/functionMgtInt.h
source/libs/function/inc/functionMgtInt.h
+1
-1
source/libs/function/src/builtins.c
source/libs/function/src/builtins.c
+10
-10
source/libs/function/src/functionMgt.c
source/libs/function/src/functionMgt.c
+3
-1
source/libs/function/src/udfd.c
source/libs/function/src/udfd.c
+22
-16
source/libs/nodes/src/nodesCodeFuncs.c
source/libs/nodes/src/nodesCodeFuncs.c
+14
-0
source/libs/parser/src/parAstCreater.c
source/libs/parser/src/parAstCreater.c
+11
-17
source/libs/parser/src/parInsert.c
source/libs/parser/src/parInsert.c
+1
-1
source/libs/planner/CMakeLists.txt
source/libs/planner/CMakeLists.txt
+1
-1
source/libs/planner/src/planLogicCreater.c
source/libs/planner/src/planLogicCreater.c
+3
-3
source/libs/planner/src/planOptimizer.c
source/libs/planner/src/planOptimizer.c
+77
-35
source/libs/planner/src/planPhysiCreater.c
source/libs/planner/src/planPhysiCreater.c
+10
-4
source/libs/planner/src/planner.c
source/libs/planner/src/planner.c
+10
-0
source/libs/planner/test/planOptimizeTest.cpp
source/libs/planner/test/planOptimizeTest.cpp
+6
-0
source/libs/planner/test/planTestUtil.cpp
source/libs/planner/test/planTestUtil.cpp
+10
-10
未找到文件。
include/common/ttypes.h
浏览文件 @
4364e7d4
...
...
@@ -30,7 +30,7 @@ typedef uint64_t TDRowVerT;
typedef
int16_t
col_id_t
;
typedef
int8_t
col_type_t
;
typedef
int32_t
col_bytes_t
;
typedef
uint16
_t
schema_ver_t
;
typedef
int32
_t
schema_ver_t
;
typedef
int32_t
func_id_t
;
#pragma pack(push, 1)
...
...
include/libs/function/functionMgt.h
浏览文件 @
4364e7d4
...
...
@@ -146,7 +146,8 @@ bool fmIsBuiltinFunc(const char* pFunc);
bool
fmIsAggFunc
(
int32_t
funcId
);
bool
fmIsScalarFunc
(
int32_t
funcId
);
bool
fmIsNonstandardSQLFunc
(
int32_t
funcId
);
bool
fmIsVectorFunc
(
int32_t
funcId
);
bool
fmIsIndefiniteRowsFunc
(
int32_t
funcId
);
bool
fmIsStringFunc
(
int32_t
funcId
);
bool
fmIsDatetimeFunc
(
int32_t
funcId
);
bool
fmIsSelectFunc
(
int32_t
funcId
);
...
...
include/libs/nodes/plannodes.h
浏览文件 @
4364e7d4
...
...
@@ -54,6 +54,7 @@ typedef struct SScanLogicNode {
int64_t
sliding
;
int8_t
intervalUnit
;
int8_t
slidingUnit
;
SNode
*
pTagCond
;
}
SScanLogicNode
;
typedef
struct
SJoinLogicNode
{
...
...
@@ -343,6 +344,7 @@ typedef struct SSubplan {
SNodeList
*
pParents
;
// the data destination subplan, get data from current subplan
SPhysiNode
*
pNode
;
// physical plan of current subplan
SDataSinkNode
*
pDataSink
;
// data of the subplan flow into the datasink
SNode
*
pTagCond
;
}
SSubplan
;
typedef
enum
EExplainMode
{
EXPLAIN_MODE_DISABLE
=
1
,
EXPLAIN_MODE_STATIC
,
EXPLAIN_MODE_ANALYZE
}
EExplainMode
;
...
...
include/libs/nodes/querynodes.h
浏览文件 @
4364e7d4
...
...
@@ -241,7 +241,7 @@ typedef struct SSelectStmt {
bool
isTimeOrderQuery
;
bool
hasAggFuncs
;
bool
hasRepeatScanFuncs
;
bool
has
NonstdSQL
Func
;
bool
has
IndefiniteRows
Func
;
}
SSelectStmt
;
typedef
enum
ESetOperatorType
{
SET_OP_TYPE_UNION_ALL
=
1
,
SET_OP_TYPE_UNION
}
ESetOperatorType
;
...
...
source/common/src/tdatablock.c
浏览文件 @
4364e7d4
...
...
@@ -1538,7 +1538,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
int32_t
msgLen
=
sizeof
(
SSubmitReq
);
int32_t
numOfBlks
=
0
;
SRowBuilder
rb
=
{
0
};
tdSRowInit
(
&
rb
,
0
);
// TODO: use the latest version
tdSRowInit
(
&
rb
,
pTSchema
->
version
);
// TODO: use the latest version
for
(
int32_t
i
=
0
;
i
<
sz
;
++
i
)
{
SSDataBlock
*
pDataBlock
=
taosArrayGet
(
pDataBlocks
,
i
);
...
...
source/common/src/tmsg.c
浏览文件 @
4364e7d4
...
...
@@ -3817,7 +3817,7 @@ int tDecodeSVCreateStbReq(SDecoder *pCoder, SVCreateStbReq *pReq) {
STSchema
*
tdGetSTSChemaFromSSChema
(
SSchema
**
pSchema
,
int32_t
nCols
)
{
STSchemaBuilder
schemaBuilder
=
{
0
};
if
(
tdInitTSchemaBuilder
(
&
schemaBuilder
,
0
)
<
0
)
{
if
(
tdInitTSchemaBuilder
(
&
schemaBuilder
,
1
)
<
0
)
{
return
NULL
;
}
...
...
source/common/src/trow.c
浏览文件 @
4364e7d4
...
...
@@ -924,7 +924,7 @@ void tdSRowPrint(STSRow *row, STSchema *pSchema, const char *tag) {
STSRowIter
iter
=
{
0
};
tdSTSRowIterInit
(
&
iter
,
pSchema
);
tdSTSRowIterReset
(
&
iter
,
row
);
printf
(
"%s >>>
"
,
tag
);
printf
(
"%s >>>
type:%d,sver:%d "
,
tag
,
(
int32_t
)
TD_ROW_TYPE
(
row
),
(
int32_t
)
TD_ROW_SVER
(
row
)
);
for
(
int
i
=
0
;
i
<
pSchema
->
numOfCols
;
++
i
)
{
STColumn
*
stCol
=
pSchema
->
columns
+
i
;
SCellVal
sVal
=
{
255
,
NULL
};
...
...
source/dnode/vnode/src/meta/metaQuery.c
浏览文件 @
4364e7d4
...
...
@@ -278,12 +278,13 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
pSW
=
metaGetTableSchema
(
pMeta
,
quid
,
sver
,
0
);
if
(
!
pSW
)
return
NULL
;
tdInitTSchemaBuilder
(
&
sb
,
0
);
tdInitTSchemaBuilder
(
&
sb
,
sver
);
for
(
int
i
=
0
;
i
<
pSW
->
nCols
;
i
++
)
{
pSchema
=
pSW
->
pSchema
+
i
;
tdAddColToSchema
(
&
sb
,
pSchema
->
type
,
pSchema
->
flags
,
pSchema
->
colId
,
pSchema
->
bytes
);
}
pTSchema
=
tdGetSchemaFromBuilder
(
&
sb
);
tdDestroyTSchemaBuilder
(
&
sb
);
taosMemoryFree
(
pSW
->
pSchema
);
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
4364e7d4
...
...
@@ -1638,9 +1638,7 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa
int32_t
numOfColsOfRow1
=
0
;
if
(
pSchema1
==
NULL
)
{
// pSchema1 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, TD_ROW_SVER(row1));
// TODO: use the real schemaVersion
pSchema1
=
metaGetTbTSchema
(
REPO_META
(
pTsdbReadHandle
->
pTsdb
),
uid
,
1
);
pSchema1
=
metaGetTbTSchema
(
REPO_META
(
pTsdbReadHandle
->
pTsdb
),
uid
,
TD_ROW_SVER
(
row1
));
}
#ifdef TD_DEBUG_PRINT_ROW
...
...
@@ -1657,9 +1655,7 @@ static int32_t mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capa
if
(
row2
)
{
isRow2DataRow
=
TD_IS_TP_ROW
(
row2
);
if
(
pSchema2
==
NULL
)
{
// pSchema2 = metaGetTbTSchema(REPO_META(pTsdbReadHandle->pTsdb), uid, TD_ROW_SVER(row2));
// TODO: use the real schemaVersion
pSchema2
=
metaGetTbTSchema
(
REPO_META
(
pTsdbReadHandle
->
pTsdb
),
uid
,
1
);
pSchema2
=
metaGetTbTSchema
(
REPO_META
(
pTsdbReadHandle
->
pTsdb
),
uid
,
TD_ROW_SVER
(
row2
));
}
if
(
isRow2DataRow
)
{
numOfColsOfRow2
=
schemaNCols
(
pSchema2
);
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
4364e7d4
...
...
@@ -154,8 +154,9 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn,
void
operatorDummyCloseFn
(
void
*
param
,
int32_t
numOfCols
)
{}
static
int32_t
doCopyToSDataBlock
(
SExecTaskInfo
*
taskInfo
,
SSDataBlock
*
pBlock
,
SExprInfo
*
pExprInfo
,
SDiskbasedBuf
*
pBuf
,
SGroupResInfo
*
pGroupResInfo
,
const
int32_t
*
rowCellOffset
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfExprs
);
static
int32_t
doCopyToSDataBlock
(
SExecTaskInfo
*
taskInfo
,
SSDataBlock
*
pBlock
,
SExprInfo
*
pExprInfo
,
SDiskbasedBuf
*
pBuf
,
SGroupResInfo
*
pGroupResInfo
,
const
int32_t
*
rowCellOffset
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfExprs
);
static
void
initCtxOutputBuffer
(
SqlFunctionCtx
*
pCtx
,
int32_t
size
);
static
void
setResultBufSize
(
STaskAttr
*
pQueryAttr
,
SResultInfo
*
pResultInfo
);
...
...
@@ -182,10 +183,10 @@ static int compareRowData(const void* a, const void* b, const void* userData) {
int16_t
offset
=
supporter
->
dataOffset
;
return
0
;
// char* in1 = getPosInResultPage(pRuntimeEnv->pQueryAttr, page1, pRow1->offset, offset);
// char* in2 = getPosInResultPage(pRuntimeEnv->pQueryAttr, page2, pRow2->offset, offset);
// char* in1 = getPosInResultPage(pRuntimeEnv->pQueryAttr, page1, pRow1->offset, offset);
// char* in2 = getPosInResultPage(pRuntimeEnv->pQueryAttr, page2, pRow2->offset, offset);
// return (in1 != NULL && in2 != NULL) ? supporter->comFunc(in1, in2) : 0;
// return (in1 != NULL && in2 != NULL) ? supporter->comFunc(in1, in2) : 0;
}
// setup the output buffer for each operator
...
...
@@ -582,8 +583,9 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow
colDataAppendInt64
(
pColData
,
4
,
&
pQueryWindow
->
ekey
);
}
void
doApplyFunctions
(
SExecTaskInfo
*
taskInfo
,
SqlFunctionCtx
*
pCtx
,
STimeWindow
*
pWin
,
SColumnInfoData
*
pTimeWindowData
,
int32_t
offset
,
int32_t
forwardStep
,
TSKEY
*
tsCol
,
int32_t
numOfTotal
,
int32_t
numOfOutput
,
int32_t
order
)
{
void
doApplyFunctions
(
SExecTaskInfo
*
taskInfo
,
SqlFunctionCtx
*
pCtx
,
STimeWindow
*
pWin
,
SColumnInfoData
*
pTimeWindowData
,
int32_t
offset
,
int32_t
forwardStep
,
TSKEY
*
tsCol
,
int32_t
numOfTotal
,
int32_t
numOfOutput
,
int32_t
order
)
{
for
(
int32_t
k
=
0
;
k
<
numOfOutput
;
++
k
)
{
// keep it temporarily
bool
hasAgg
=
pCtx
[
k
].
input
.
colDataAggIsSet
;
...
...
@@ -665,8 +667,8 @@ static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pC
}
}
void
setInputDataBlock
(
SOperatorInfo
*
pOperator
,
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
,
int32_t
order
,
int32_t
scanFlag
,
bool
createDummyCol
)
{
void
setInputDataBlock
(
SOperatorInfo
*
pOperator
,
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
,
int32_t
order
,
int32_t
scanFlag
,
bool
createDummyCol
)
{
if
(
pBlock
->
pBlockAgg
!=
NULL
)
{
doSetInputDataBlockInfo
(
pOperator
,
pCtx
,
pBlock
,
order
);
}
else
{
...
...
@@ -852,7 +854,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
// _rowts/_c0, not tbname column
if
(
fmIsPseudoColumnFunc
(
pfCtx
->
functionId
)
&&
(
!
fmIsScanPseudoColumnFunc
(
pfCtx
->
functionId
)))
{
// do nothing
}
else
if
(
fmIs
NonstandardSQL
Func
(
pfCtx
->
functionId
))
{
}
else
if
(
fmIs
IndefiniteRows
Func
(
pfCtx
->
functionId
))
{
SResultRowEntryInfo
*
pResInfo
=
GET_RES_INFO
(
&
pCtx
[
k
]);
pfCtx
->
fpSet
.
init
(
&
pCtx
[
k
],
pResInfo
);
...
...
@@ -950,14 +952,14 @@ static bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
return
false
;
}
// if (functionId == FUNCTION_FIRST_DST || functionId == FUNCTION_FIRST) {
// // return QUERY_IS_ASC_QUERY(pQueryAttr);
// }
//
// // denote the order type
// if ((functionId == FUNCTION_LAST_DST || functionId == FUNCTION_LAST)) {
// // return pCtx->param[0].i == pQueryAttr->order.order;
// }
// if (functionId == FUNCTION_FIRST_DST || functionId == FUNCTION_FIRST) {
// // return QUERY_IS_ASC_QUERY(pQueryAttr);
// }
//
// // denote the order type
// if ((functionId == FUNCTION_LAST_DST || functionId == FUNCTION_LAST)) {
// // return pCtx->param[0].i == pQueryAttr->order.order;
// }
// in the reverse table scan, only the following functions need to be executed
// if (IS_REVERSE_SCAN(pRuntimeEnv) ||
...
...
@@ -1072,19 +1074,19 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu
for
(
int32_t
i
=
0
;
i
<
numOfOutput
;
++
i
)
{
if
(
strcmp
(
pCtx
[
i
].
pExpr
->
pExpr
->
_function
.
functionName
,
"_select_value"
)
==
0
)
{
pValCtx
[
num
++
]
=
&
pCtx
[
i
];
}
else
if
(
fmIs
Agg
Func
(
pCtx
[
i
].
functionId
))
{
}
else
if
(
fmIs
Select
Func
(
pCtx
[
i
].
functionId
))
{
p
=
&
pCtx
[
i
];
}
// if (functionId == FUNCTION_TAG_DUMMY || functionId == FUNCTION_TS_DUMMY) {
// tagLen += pCtx[i].resDataInfo.bytes;
// pTagCtx[num++] = &pCtx[i];
// } else if (functionId == FUNCTION_TS || functionId == FUNCTION_TAG) {
// // tag function may be the group by tag column
// // ts may be the required primary timestamp column
// continue;
// } else {
// // the column may be the normal column, group by normal_column, the functionId is FUNCTION_PRJ
// }
// if (functionId == FUNCTION_TAG_DUMMY || functionId == FUNCTION_TS_DUMMY) {
// tagLen += pCtx[i].resDataInfo.bytes;
// pTagCtx[num++] = &pCtx[i];
// } else if (functionId == FUNCTION_TS || functionId == FUNCTION_TAG) {
// // tag function may be the group by tag column
// // ts may be the required primary timestamp column
// continue;
// } else {
// // the column may be the normal column, group by normal_column, the functionId is FUNCTION_PRJ
// }
}
if
(
p
!=
NULL
)
{
...
...
@@ -1123,7 +1125,7 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
SFuncExecEnv
env
=
{
0
};
pCtx
->
functionId
=
pExpr
->
pExpr
->
_function
.
pFunctNode
->
funcId
;
if
(
fmIsAggFunc
(
pCtx
->
functionId
)
||
fmIs
NonstandardSQL
Func
(
pCtx
->
functionId
))
{
if
(
fmIsAggFunc
(
pCtx
->
functionId
)
||
fmIs
IndefiniteRows
Func
(
pCtx
->
functionId
))
{
bool
isUdaf
=
fmIsUserDefinedFunc
(
pCtx
->
functionId
);
if
(
!
isUdaf
)
{
fmGetFuncExecFuncs
(
pCtx
->
functionId
,
&
pCtx
->
fpSet
);
...
...
@@ -2005,8 +2007,9 @@ static void doUpdateNumOfRows(SResultRow* pRow, int32_t numOfExprs, const int32_
}
}
int32_t
doCopyToSDataBlock
(
SExecTaskInfo
*
pTaskInfo
,
SSDataBlock
*
pBlock
,
SExprInfo
*
pExprInfo
,
SDiskbasedBuf
*
pBuf
,
SGroupResInfo
*
pGroupResInfo
,
const
int32_t
*
rowCellOffset
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfExprs
)
{
int32_t
doCopyToSDataBlock
(
SExecTaskInfo
*
pTaskInfo
,
SSDataBlock
*
pBlock
,
SExprInfo
*
pExprInfo
,
SDiskbasedBuf
*
pBuf
,
SGroupResInfo
*
pGroupResInfo
,
const
int32_t
*
rowCellOffset
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfExprs
)
{
int32_t
numOfRows
=
getNumOfTotalRes
(
pGroupResInfo
);
int32_t
start
=
pGroupResInfo
->
index
;
...
...
@@ -2057,7 +2060,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI
// the _wstartts needs to copy to 20 following rows, since the results of top-k expands to 20 different rows.
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
slotId
);
char
*
in
=
GET_ROWCELL_INTERBUF
(
pCtx
[
j
].
resultInfo
);
for
(
int32_t
k
=
0
;
k
<
pRow
->
numOfRows
;
++
k
)
{
for
(
int32_t
k
=
0
;
k
<
pRow
->
numOfRows
;
++
k
)
{
colDataAppend
(
pColInfoData
,
pBlock
->
info
.
rows
+
k
,
in
,
pCtx
[
j
].
resultInfo
->
isNullRes
);
}
}
...
...
@@ -2070,12 +2073,14 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI
}
}
qDebug
(
"%s result generated, rows:%d, groupId:%"
PRIu64
,
GET_TASKID
(
pTaskInfo
),
pBlock
->
info
.
rows
,
pBlock
->
info
.
groupId
);
qDebug
(
"%s result generated, rows:%d, groupId:%"
PRIu64
,
GET_TASKID
(
pTaskInfo
),
pBlock
->
info
.
rows
,
pBlock
->
info
.
groupId
);
blockDataUpdateTsWindow
(
pBlock
,
0
);
return
0
;
}
void
doBuildResultDatablock
(
SOperatorInfo
*
pOperator
,
SOptrBasicInfo
*
pbInfo
,
SGroupResInfo
*
pGroupResInfo
,
SDiskbasedBuf
*
pBuf
)
{
void
doBuildResultDatablock
(
SOperatorInfo
*
pOperator
,
SOptrBasicInfo
*
pbInfo
,
SGroupResInfo
*
pGroupResInfo
,
SDiskbasedBuf
*
pBuf
)
{
SExprInfo
*
pExprInfo
=
pOperator
->
pExpr
;
int32_t
numOfExprs
=
pOperator
->
numOfExprs
;
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
...
...
@@ -3099,7 +3104,7 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) {
return
TSDB_CODE_SUCCESS
;
}
SOperatorInfo
*
createExchangeOperatorInfo
(
void
*
pTransporter
,
const
SNodeList
*
pSources
,
SSDataBlock
*
pBlock
,
SOperatorInfo
*
createExchangeOperatorInfo
(
void
*
pTransporter
,
const
SNodeList
*
pSources
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
)
{
SExchangeInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SExchangeInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
...
...
@@ -3212,7 +3217,7 @@ static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int3
static
void
doMergeResultImpl
(
SSortedMergeOperatorInfo
*
pInfo
,
SqlFunctionCtx
*
pCtx
,
int32_t
numOfExpr
,
int32_t
rowIndex
)
{
for
(
int32_t
j
=
0
;
j
<
numOfExpr
;
++
j
)
{
// TODO set row index
// pCtx[j].startRow = rowIndex;
// pCtx[j].startRow = rowIndex;
}
for
(
int32_t
j
=
0
;
j
<
numOfExpr
;
++
j
)
{
...
...
@@ -3263,7 +3268,7 @@ static void doMergeImpl(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock
SqlFunctionCtx
*
pCtx
=
pInfo
->
binfo
.
pCtx
;
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
numOfCols
;
++
i
)
{
// pCtx[i].size = 1;
// pCtx[i].size = 1;
}
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
++
i
)
{
...
...
@@ -3489,10 +3494,11 @@ _error:
return
NULL
;
}
int32_t
getTableScanInfo
(
SOperatorInfo
*
pOperator
,
int32_t
*
order
,
int32_t
*
scanFlag
)
{
int32_t
getTableScanInfo
(
SOperatorInfo
*
pOperator
,
int32_t
*
order
,
int32_t
*
scanFlag
)
{
// todo add more information about exchange operation
int32_t
type
=
pOperator
->
operatorType
;
if
(
type
==
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
||
type
==
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN
||
type
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
if
(
type
==
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
||
type
==
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN
||
type
==
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
*
order
=
TSDB_ORDER_ASC
;
*
scanFlag
=
MAIN_SCAN
;
return
TSDB_CODE_SUCCESS
;
...
...
@@ -3859,7 +3865,8 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
setInputDataBlock
(
pOperator
,
pInfo
->
pCtx
,
pBlock
,
order
,
scanFlag
,
false
);
blockDataEnsureCapacity
(
pInfo
->
pRes
,
pInfo
->
pRes
->
info
.
rows
+
pBlock
->
info
.
rows
);
code
=
projectApplyFunctions
(
pOperator
->
pExpr
,
pInfo
->
pRes
,
pBlock
,
pInfo
->
pCtx
,
pOperator
->
numOfExprs
,
pProjectInfo
->
pPseudoColInfo
);
code
=
projectApplyFunctions
(
pOperator
->
pExpr
,
pInfo
->
pRes
,
pBlock
,
pInfo
->
pCtx
,
pOperator
->
numOfExprs
,
pProjectInfo
->
pPseudoColInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
code
);
}
...
...
@@ -4471,10 +4478,10 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
if
(
strcmp
(
pExp
->
pExpr
->
_function
.
functionName
,
"tbname"
)
==
0
)
{
pFuncNode
->
pParameterList
=
nodesMakeList
();
ASSERT
(
LIST_LENGTH
(
pFuncNode
->
pParameterList
)
==
0
);
SValueNode
*
res
=
(
SValueNode
*
)
nodesMakeNode
(
QUERY_NODE_VALUE
);
SValueNode
*
res
=
(
SValueNode
*
)
nodesMakeNode
(
QUERY_NODE_VALUE
);
if
(
NULL
==
res
)
{
// todo handle error
}
else
{
res
->
node
.
resType
=
(
SDataType
)
{.
bytes
=
sizeof
(
int64_t
),
.
type
=
TSDB_DATA_TYPE_BIGINT
};
res
->
node
.
resType
=
(
SDataType
){.
bytes
=
sizeof
(
int64_t
),
.
type
=
TSDB_DATA_TYPE_BIGINT
};
nodesListAppend
(
pFuncNode
->
pParameterList
,
res
);
}
}
...
...
@@ -4544,7 +4551,7 @@ static SArray* extractColumnInfo(SNodeList* pNodeList);
static
SArray
*
createSortInfo
(
SNodeList
*
pNodeList
);
static
SArray
*
extractPartitionColInfo
(
SNodeList
*
pNodeList
);
void
extractTableSchemaVersion
(
SReadHandle
*
pHandle
,
uint64_t
uid
,
SExecTaskInfo
*
pTaskInfo
)
{
void
extractTableSchemaVersion
(
SReadHandle
*
pHandle
,
uint64_t
uid
,
SExecTaskInfo
*
pTaskInfo
)
{
SMetaReader
mr
=
{
0
};
metaReaderInit
(
&
mr
,
pHandle
->
meta
,
0
);
metaGetTableEntryByUid
(
&
mr
,
uid
);
...
...
@@ -4600,8 +4607,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
if
(
pHandle
->
vnode
)
{
pDataReader
=
doCreateDataReader
(
pTableScanNode
,
pHandle
,
pTableGroupInfo
,
(
uint64_t
)
queryId
,
taskId
);
}
else
{
doCreateTableGroup
(
pHandle
->
meta
,
pScanPhyNode
->
tableType
,
pScanPhyNode
->
uid
,
pTableGroupInfo
,
queryId
,
taskId
);
doCreateTableGroup
(
pHandle
->
meta
,
pScanPhyNode
->
tableType
,
pScanPhyNode
->
uid
,
pTableGroupInfo
,
queryId
,
taskId
);
}
if
(
pDataReader
==
NULL
&&
terrno
!=
0
)
{
...
...
@@ -4617,10 +4623,12 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SArray
*
tableIdList
=
extractTableIdList
(
pTableGroupInfo
);
SSDataBlock
*
pResBlock
=
createResDataBlock
(
pDescNode
);
SArray
*
pCols
=
extractColMatchInfo
(
pScanPhyNode
->
pScanCols
,
pDescNode
,
&
numOfCols
,
pTaskInfo
,
COL_MATCH_FROM_COL_ID
);
SArray
*
pCols
=
extractColMatchInfo
(
pScanPhyNode
->
pScanCols
,
pDescNode
,
&
numOfCols
,
pTaskInfo
,
COL_MATCH_FROM_COL_ID
);
SOperatorInfo
*
pOperator
=
createStreamScanOperatorInfo
(
pHandle
->
reader
,
pDataReader
,
pHandle
,
pScanPhyNode
->
uid
,
pResBlock
,
pCols
,
tableIdList
,
pTaskInfo
,
pScanPhyNode
->
node
.
pConditions
,
pOperatorDumy
);
SOperatorInfo
*
pOperator
=
createStreamScanOperatorInfo
(
pHandle
->
reader
,
pDataReader
,
pHandle
,
pScanPhyNode
->
uid
,
pResBlock
,
pCols
,
tableIdList
,
pTaskInfo
,
pScanPhyNode
->
node
.
pConditions
,
pOperatorDumy
);
taosArrayDestroy
(
tableIdList
);
return
pOperator
;
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN
==
type
)
{
...
...
@@ -4632,7 +4640,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SSDataBlock
*
pResBlock
=
createResDataBlock
(
pDescNode
);
int32_t
numOfOutputCols
=
0
;
SArray
*
colList
=
extractColMatchInfo
(
pScanNode
->
pScanCols
,
pDescNode
,
&
numOfOutputCols
,
pTaskInfo
,
COL_MATCH_FROM_COL_ID
);
SArray
*
colList
=
extractColMatchInfo
(
pScanNode
->
pScanCols
,
pDescNode
,
&
numOfOutputCols
,
pTaskInfo
,
COL_MATCH_FROM_COL_ID
);
SOperatorInfo
*
pOperator
=
createSysTableScanOperatorInfo
(
pHandle
,
pResBlock
,
&
pScanNode
->
tableName
,
pScanNode
->
node
.
pConditions
,
pSysScanPhyNode
->
mgmtEpSet
,
colList
,
pTaskInfo
,
pSysScanPhyNode
->
showRewrite
,
pSysScanPhyNode
->
accountId
);
...
...
@@ -4654,8 +4663,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SExprInfo
*
pExprInfo
=
createExprInfo
(
pScanPhyNode
->
pScanPseudoCols
,
NULL
,
&
num
);
int32_t
numOfOutputCols
=
0
;
SArray
*
colList
=
extractColMatchInfo
(
pScanPhyNode
->
pScanPseudoCols
,
pDescNode
,
&
numOfOutputCols
,
pTaskInfo
,
COL_MATCH_FROM_COL_ID
);
SArray
*
colList
=
extractColMatchInfo
(
pScanPhyNode
->
pScanPseudoCols
,
pDescNode
,
&
numOfOutputCols
,
pTaskInfo
,
COL_MATCH_FROM_COL_ID
);
SOperatorInfo
*
pOperator
=
createTagScanOperatorInfo
(
pHandle
,
pExprInfo
,
num
,
pResBlock
,
colList
,
pTableGroupInfo
,
pTaskInfo
);
...
...
@@ -4737,7 +4746,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SExprInfo
*
pExprInfo
=
createExprInfo
(
pSortPhyNode
->
pExprs
,
NULL
,
&
numOfCols
);
int32_t
numOfOutputCols
=
0
;
SArray
*
pColList
=
extractColMatchInfo
(
pSortPhyNode
->
pTargets
,
pDescNode
,
&
numOfOutputCols
,
pTaskInfo
,
COL_MATCH_FROM_SLOT_ID
);
SArray
*
pColList
=
extractColMatchInfo
(
pSortPhyNode
->
pTargets
,
pDescNode
,
&
numOfOutputCols
,
pTaskInfo
,
COL_MATCH_FROM_SLOT_ID
);
pOptr
=
createSortOperatorInfo
(
ops
[
0
],
pResBlock
,
info
,
pExprInfo
,
numOfCols
,
pColList
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW
==
type
)
{
...
...
@@ -5238,15 +5248,15 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo
return
TSDB_CODE_SUCCESS
;
}
int32_t
initCatchSupporter
(
SCatchSupporter
*
pCatchSup
,
size_t
rowSize
,
size_t
keyBufSize
,
const
char
*
pKey
,
const
char
*
pDir
)
{
int32_t
initCatchSupporter
(
SCatchSupporter
*
pCatchSup
,
size_t
rowSize
,
size_t
keyBufSize
,
const
char
*
pKey
,
const
char
*
pDir
)
{
pCatchSup
->
keySize
=
sizeof
(
int64_t
)
+
sizeof
(
int64_t
)
+
sizeof
(
TSKEY
);
pCatchSup
->
pKeyBuf
=
taosMemoryCalloc
(
1
,
pCatchSup
->
keySize
);
int32_t
pageSize
=
rowSize
*
32
;
int32_t
bufSize
=
pageSize
*
4096
;
createDiskbasedBuf
(
&
pCatchSup
->
pDataBuf
,
pageSize
,
bufSize
,
pKey
,
pDir
);
_hash_fn_t
hashFn
=
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BINARY
);
pCatchSup
->
pWindowHashTable
=
taosHashInit
(
10000
,
hashFn
,
true
,
HASH_NO_LOCK
);;
pCatchSup
->
pWindowHashTable
=
taosHashInit
(
10000
,
hashFn
,
true
,
HASH_NO_LOCK
);
;
return
TSDB_CODE_SUCCESS
;
}
source/libs/function/inc/functionMgtInt.h
浏览文件 @
4364e7d4
...
...
@@ -28,7 +28,7 @@ extern "C" {
#define FUNC_MGT_AGG_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(0)
#define FUNC_MGT_SCALAR_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(1)
#define FUNC_MGT_
NONSTANDARD_SQL
_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(2)
#define FUNC_MGT_
INDEFINITE_ROWS
_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(2)
#define FUNC_MGT_STRING_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(3)
#define FUNC_MGT_DATETIME_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(4)
#define FUNC_MGT_TIMELINE_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(5)
...
...
source/libs/function/src/builtins.c
浏览文件 @
4364e7d4
...
...
@@ -837,7 +837,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.
name
=
"top"
,
.
type
=
FUNCTION_TYPE_TOP
,
.
classification
=
FUNC_MGT_
AGG_FUNC
|
FUNC_MGT_SELECT
_FUNC
,
.
classification
=
FUNC_MGT_
SELECT_FUNC
|
FUNC_MGT_INDEFINITE_ROWS
_FUNC
,
.
translateFunc
=
translateTop
,
.
getEnvFunc
=
getTopBotFuncEnv
,
.
initFunc
=
functionSetup
,
...
...
@@ -847,7 +847,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.
name
=
"bottom"
,
.
type
=
FUNCTION_TYPE_BOTTOM
,
.
classification
=
FUNC_MGT_
AGG_FUNC
|
FUNC_MGT_SELECT
_FUNC
,
.
classification
=
FUNC_MGT_
SELECT_FUNC
|
FUNC_MGT_INDEFINITE_ROWS
_FUNC
,
.
translateFunc
=
translateBottom
,
.
getEnvFunc
=
getTopBotFuncEnv
,
.
initFunc
=
functionSetup
,
...
...
@@ -929,7 +929,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.
name
=
"diff"
,
.
type
=
FUNCTION_TYPE_DIFF
,
.
classification
=
FUNC_MGT_
NONSTANDARD_SQL
_FUNC
|
FUNC_MGT_TIMELINE_FUNC
,
.
classification
=
FUNC_MGT_
INDEFINITE_ROWS
_FUNC
|
FUNC_MGT_TIMELINE_FUNC
,
.
translateFunc
=
translateDiff
,
.
getEnvFunc
=
getDiffFuncEnv
,
.
initFunc
=
diffFunctionSetup
,
...
...
@@ -939,7 +939,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.
name
=
"state_count"
,
.
type
=
FUNCTION_TYPE_STATE_COUNT
,
.
classification
=
FUNC_MGT_
NONSTANDARD_SQL
_FUNC
,
.
classification
=
FUNC_MGT_
INDEFINITE_ROWS
_FUNC
,
.
translateFunc
=
translateStateCount
,
.
getEnvFunc
=
getStateFuncEnv
,
.
initFunc
=
functionSetup
,
...
...
@@ -949,7 +949,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.
name
=
"state_duration"
,
.
type
=
FUNCTION_TYPE_STATE_DURATION
,
.
classification
=
FUNC_MGT_
NONSTANDARD_SQL
_FUNC
|
FUNC_MGT_TIMELINE_FUNC
,
.
classification
=
FUNC_MGT_
INDEFINITE_ROWS
_FUNC
|
FUNC_MGT_TIMELINE_FUNC
,
.
translateFunc
=
translateStateDuration
,
.
getEnvFunc
=
getStateFuncEnv
,
.
initFunc
=
functionSetup
,
...
...
@@ -959,7 +959,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.
name
=
"csum"
,
.
type
=
FUNCTION_TYPE_CSUM
,
.
classification
=
FUNC_MGT_
NONSTANDARD_SQL
_FUNC
|
FUNC_MGT_TIMELINE_FUNC
,
.
classification
=
FUNC_MGT_
INDEFINITE_ROWS
_FUNC
|
FUNC_MGT_TIMELINE_FUNC
,
.
translateFunc
=
translateCsum
,
.
getEnvFunc
=
getCsumFuncEnv
,
.
initFunc
=
functionSetup
,
...
...
@@ -969,7 +969,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.
name
=
"mavg"
,
.
type
=
FUNCTION_TYPE_MAVG
,
.
classification
=
FUNC_MGT_
NONSTANDARD_SQL
_FUNC
|
FUNC_MGT_TIMELINE_FUNC
,
.
classification
=
FUNC_MGT_
INDEFINITE_ROWS
_FUNC
|
FUNC_MGT_TIMELINE_FUNC
,
.
translateFunc
=
translateMavg
,
.
getEnvFunc
=
getMavgFuncEnv
,
.
initFunc
=
mavgFunctionSetup
,
...
...
@@ -979,7 +979,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.
name
=
"sample"
,
.
type
=
FUNCTION_TYPE_SAMPLE
,
.
classification
=
FUNC_MGT_
NONSTANDARD_SQL
_FUNC
|
FUNC_MGT_TIMELINE_FUNC
,
.
classification
=
FUNC_MGT_
INDEFINITE_ROWS
_FUNC
|
FUNC_MGT_TIMELINE_FUNC
,
.
translateFunc
=
translateSample
,
.
getEnvFunc
=
getSampleFuncEnv
,
.
initFunc
=
sampleFunctionSetup
,
...
...
@@ -989,7 +989,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.
name
=
"tail"
,
.
type
=
FUNCTION_TYPE_TAIL
,
.
classification
=
FUNC_MGT_
NONSTANDARD_SQL
_FUNC
|
FUNC_MGT_TIMELINE_FUNC
,
.
classification
=
FUNC_MGT_
INDEFINITE_ROWS
_FUNC
|
FUNC_MGT_TIMELINE_FUNC
,
.
translateFunc
=
translateTail
,
.
getEnvFunc
=
getTailFuncEnv
,
.
initFunc
=
tailFunctionSetup
,
...
...
@@ -999,7 +999,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.
name
=
"unique"
,
.
type
=
FUNCTION_TYPE_UNIQUE
,
.
classification
=
FUNC_MGT_
NONSTANDARD_SQL
_FUNC
|
FUNC_MGT_TIMELINE_FUNC
,
.
classification
=
FUNC_MGT_
INDEFINITE_ROWS
_FUNC
|
FUNC_MGT_TIMELINE_FUNC
,
.
translateFunc
=
translateUnique
,
.
getEnvFunc
=
getUniqueFuncEnv
,
.
initFunc
=
uniqueFunctionSetup
,
...
...
source/libs/function/src/functionMgt.c
浏览文件 @
4364e7d4
...
...
@@ -145,6 +145,8 @@ bool fmIsAggFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MG
bool
fmIsScalarFunc
(
int32_t
funcId
)
{
return
isSpecificClassifyFunc
(
funcId
,
FUNC_MGT_SCALAR_FUNC
);
}
bool
fmIsVectorFunc
(
int32_t
funcId
)
{
return
!
fmIsScalarFunc
(
funcId
);
}
bool
fmIsSelectFunc
(
int32_t
funcId
)
{
return
isSpecificClassifyFunc
(
funcId
,
FUNC_MGT_SELECT_FUNC
);
}
bool
fmIsTimelineFunc
(
int32_t
funcId
)
{
return
isSpecificClassifyFunc
(
funcId
,
FUNC_MGT_TIMELINE_FUNC
);
}
...
...
@@ -157,7 +159,7 @@ bool fmIsWindowPseudoColumnFunc(int32_t funcId) { return isSpecificClassifyFunc(
bool
fmIsWindowClauseFunc
(
int32_t
funcId
)
{
return
fmIsAggFunc
(
funcId
)
||
fmIsWindowPseudoColumnFunc
(
funcId
);
}
bool
fmIs
NonstandardSQLFunc
(
int32_t
funcId
)
{
return
isSpecificClassifyFunc
(
funcId
,
FUNC_MGT_NONSTANDARD_SQL
_FUNC
);
}
bool
fmIs
IndefiniteRowsFunc
(
int32_t
funcId
)
{
return
isSpecificClassifyFunc
(
funcId
,
FUNC_MGT_INDEFINITE_ROWS
_FUNC
);
}
bool
fmIsSpecialDataRequiredFunc
(
int32_t
funcId
)
{
return
isSpecificClassifyFunc
(
funcId
,
FUNC_MGT_SPECIAL_DATA_REQUIRED
);
...
...
source/libs/function/src/udfd.c
浏览文件 @
4364e7d4
...
...
@@ -138,6 +138,7 @@ static void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf);
static
int32_t
udfdUvInit
();
static
void
udfdCloseWalkCb
(
uv_handle_t
*
handle
,
void
*
arg
);
static
int32_t
udfdRun
();
static
void
udfdConnectMnodeThreadFunc
(
void
*
args
);
void
udfdProcessRequest
(
uv_work_t
*
req
)
{
SUvUdfWork
*
uvUdf
=
(
SUvUdfWork
*
)(
req
->
data
);
...
...
@@ -870,6 +871,23 @@ static int32_t udfdRun() {
return
0
;
}
void
udfdConnectMnodeThreadFunc
(
void
*
args
)
{
int32_t
retryMnodeTimes
=
0
;
int32_t
code
=
0
;
while
(
retryMnodeTimes
++
<=
TSDB_MAX_REPLICA
)
{
uv_sleep
(
100
*
(
1
<<
retryMnodeTimes
));
code
=
udfdConnectToMnode
();
if
(
code
==
0
)
{
break
;
}
fnError
(
"udfd can not connect to mnode, code: %s. retry"
,
tstrerror
(
code
));
}
if
(
code
!=
0
)
{
fnError
(
"udfd can not connect to mnode"
);
}
}
int
main
(
int
argc
,
char
*
argv
[])
{
if
(
!
taosCheckSystemIsSmallEnd
())
{
printf
(
"failed to start since on non-small-end machines
\n
"
);
...
...
@@ -902,30 +920,18 @@ int main(int argc, char *argv[]) {
return
-
3
;
}
int32_t
retryMnodeTimes
=
0
;
int32_t
code
=
0
;
while
(
retryMnodeTimes
++
<=
TSDB_MAX_REPLICA
)
{
uv_sleep
(
100
*
(
1
<<
retryMnodeTimes
));
code
=
udfdConnectToMnode
();
if
(
code
==
0
)
{
break
;
}
fnError
(
"can not connect to mnode, code: %s. retry"
,
tstrerror
(
code
));
}
if
(
code
!=
0
)
{
fnError
(
"failed to start since can not connect to mnode"
);
return
-
4
;
}
if
(
udfdUvInit
()
!=
0
)
{
fnError
(
"uv init failure"
);
return
-
5
;
}
uv_thread_t
mnodeConnectThread
;
uv_thread_create
(
&
mnodeConnectThread
,
udfdConnectMnodeThreadFunc
,
NULL
);
udfdRun
();
removeListeningPipe
();
uv_thread_join
(
&
mnodeConnectThread
);
udfdCloseClientRpc
();
return
0
;
...
...
source/libs/nodes/src/nodesCodeFuncs.c
浏览文件 @
4364e7d4
...
...
@@ -492,6 +492,7 @@ static const char* jkScanLogicPlanScanCols = "ScanCols";
static
const
char
*
jkScanLogicPlanScanPseudoCols
=
"ScanPseudoCols"
;
static
const
char
*
jkScanLogicPlanTableMetaSize
=
"TableMetaSize"
;
static
const
char
*
jkScanLogicPlanTableMeta
=
"TableMeta"
;
static
const
char
*
jkScanLogicPlanTagCond
=
"TagCond"
;
static
int32_t
logicScanNodeToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
const
SScanLogicNode
*
pNode
=
(
const
SScanLogicNode
*
)
pObj
;
...
...
@@ -509,6 +510,9 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddObject
(
pJson
,
jkScanLogicPlanTableMeta
,
tableMetaToJson
,
pNode
->
pMeta
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddObject
(
pJson
,
jkScanLogicPlanTagCond
,
nodeToJson
,
pNode
->
pTagCond
);
}
return
code
;
}
...
...
@@ -530,6 +534,9 @@ static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonMakeObject
(
pJson
,
jkScanLogicPlanTableMeta
,
jsonToTableMeta
,
(
void
**
)
&
pNode
->
pMeta
,
objSize
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeObject
(
pJson
,
jkScanLogicPlanTagCond
,
&
pNode
->
pTagCond
);
}
return
code
;
}
...
...
@@ -1777,6 +1784,7 @@ static const char* jkSubplanDbFName = "DbFName";
static
const
char
*
jkSubplanNodeAddr
=
"NodeAddr"
;
static
const
char
*
jkSubplanRootNode
=
"RootNode"
;
static
const
char
*
jkSubplanDataSink
=
"DataSink"
;
static
const
char
*
jkSubplanTagCond
=
"TagCond"
;
static
int32_t
subplanToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
const
SSubplan
*
pNode
=
(
const
SSubplan
*
)
pObj
;
...
...
@@ -1803,6 +1811,9 @@ static int32_t subplanToJson(const void* pObj, SJson* pJson) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddObject
(
pJson
,
jkSubplanDataSink
,
nodeToJson
,
pNode
->
pDataSink
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddObject
(
pJson
,
jkSubplanTagCond
,
nodeToJson
,
pNode
->
pTagCond
);
}
return
code
;
}
...
...
@@ -1833,6 +1844,9 @@ static int32_t jsonToSubplan(const SJson* pJson, void* pObj) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeObject
(
pJson
,
jkSubplanDataSink
,
(
SNode
**
)
&
pNode
->
pDataSink
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeObject
(
pJson
,
jkSubplanTagCond
,
(
SNode
**
)
&
pNode
->
pTagCond
);
}
return
code
;
}
...
...
source/libs/parser/src/parAstCreater.c
浏览文件 @
4364e7d4
...
...
@@ -342,26 +342,20 @@ SNode* createLogicConditionNode(SAstCreateContext* pCxt, ELogicConditionType typ
CHECK_OUT_OF_MEM
(
cond
);
cond
->
condType
=
type
;
cond
->
pParameterList
=
nodesMakeList
();
if
((
QUERY_NODE_LOGIC_CONDITION
==
nodeType
(
pParam1
)
&&
type
!=
((
SLogicConditionNode
*
)
pParam1
)
->
condType
)
||
(
QUERY_NODE_LOGIC_CONDITION
==
nodeType
(
pParam2
)
&&
type
!=
((
SLogicConditionNode
*
)
pParam2
)
->
condType
))
{
nodesListAppend
(
cond
->
pParameterList
,
pParam1
);
nodesListAppend
(
cond
->
pParameterList
,
pParam2
);
}
else
{
if
(
QUERY_NODE_LOGIC_CONDITION
==
nodeType
(
pParam1
))
{
if
(
QUERY_NODE_LOGIC_CONDITION
==
nodeType
(
pParam1
)
&&
type
==
((
SLogicConditionNode
*
)
pParam1
)
->
condType
)
{
nodesListAppendList
(
cond
->
pParameterList
,
((
SLogicConditionNode
*
)
pParam1
)
->
pParameterList
);
((
SLogicConditionNode
*
)
pParam1
)
->
pParameterList
=
NULL
;
nodesDestroyNode
(
pParam1
);
}
else
{
nodesListAppend
(
cond
->
pParameterList
,
pParam1
);
}
if
(
QUERY_NODE_LOGIC_CONDITION
==
nodeType
(
pParam2
)
)
{
if
(
QUERY_NODE_LOGIC_CONDITION
==
nodeType
(
pParam2
)
&&
type
==
((
SLogicConditionNode
*
)
pParam2
)
->
condType
)
{
nodesListAppendList
(
cond
->
pParameterList
,
((
SLogicConditionNode
*
)
pParam2
)
->
pParameterList
);
((
SLogicConditionNode
*
)
pParam2
)
->
pParameterList
=
NULL
;
nodesDestroyNode
(
pParam2
);
}
else
{
nodesListAppend
(
cond
->
pParameterList
,
pParam2
);
}
}
return
(
SNode
*
)
cond
;
}
...
...
source/libs/parser/src/parInsert.c
浏览文件 @
4364e7d4
...
...
@@ -647,7 +647,7 @@ static FORCE_INLINE int32_t MemRowAppend(SMsgBuf* pMsgBuf, const void* value, in
if
(
TSDB_DATA_TYPE_BINARY
==
pa
->
schema
->
type
)
{
const
char
*
rowEnd
=
tdRowEnd
(
rb
->
pBuf
);
STR_WITH_SIZE_TO_VARSTR
(
rowEnd
,
value
,
len
);
tdAppendColValToRow
(
rb
,
pa
->
schema
->
colId
,
pa
->
schema
->
type
,
TD_VTYPE_NORM
,
rowEnd
,
tru
e
,
pa
->
toffset
,
pa
->
colIdx
);
tdAppendColValToRow
(
rb
,
pa
->
schema
->
colId
,
pa
->
schema
->
type
,
TD_VTYPE_NORM
,
rowEnd
,
fals
e
,
pa
->
toffset
,
pa
->
colIdx
);
}
else
if
(
TSDB_DATA_TYPE_NCHAR
==
pa
->
schema
->
type
)
{
// if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
int32_t
output
=
0
;
...
...
source/libs/planner/CMakeLists.txt
浏览文件 @
4364e7d4
...
...
@@ -8,7 +8,7 @@ target_include_directories(
target_link_libraries
(
planner
PRIVATE os util nodes catalog cjson parser function qcom scalar
PRIVATE os util nodes catalog cjson parser function qcom scalar
index
PUBLIC transport
)
...
...
source/libs/planner/src/planLogicCreater.c
浏览文件 @
4364e7d4
...
...
@@ -418,7 +418,7 @@ static SColumnNode* createColumnByExpr(const char* pStmtName, SExprNode* pExpr)
}
static
int32_t
createAggLogicNode
(
SLogicPlanContext
*
pCxt
,
SSelectStmt
*
pSelect
,
SLogicNode
**
pLogicNode
)
{
if
(
!
pSelect
->
hasAggFuncs
&&
NULL
==
pSelect
->
pGroupByList
)
{
if
(
!
pSelect
->
hasAggFuncs
&&
!
pSelect
->
hasIndefiniteRowsFunc
&&
NULL
==
pSelect
->
pGroupByList
)
{
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -442,8 +442,8 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect,
code
=
rewriteExprForSelect
(
pAgg
->
pGroupKeys
,
pSelect
,
SQL_CLAUSE_GROUP_BY
);
}
if
(
TSDB_CODE_SUCCESS
==
code
&&
pSelect
->
hasAggFuncs
)
{
code
=
nodesCollectFuncs
(
pSelect
,
SQL_CLAUSE_GROUP_BY
,
fmIs
Agg
Func
,
&
pAgg
->
pAggFuncs
);
if
(
TSDB_CODE_SUCCESS
==
code
&&
(
pSelect
->
hasAggFuncs
||
pSelect
->
hasIndefiniteRowsFunc
)
)
{
code
=
nodesCollectFuncs
(
pSelect
,
SQL_CLAUSE_GROUP_BY
,
fmIs
Vector
Func
,
&
pAgg
->
pAggFuncs
);
}
// rewrite the expression in subsequent clauses
...
...
source/libs/planner/src/planOptimizer.c
浏览文件 @
4364e7d4
...
...
@@ -15,6 +15,7 @@
#include "filter.h"
#include "functionMgt.h"
#include "index.h"
#include "planInt.h"
#define OPTIMIZE_FLAG_MASK(n) (1 << n)
...
...
@@ -313,22 +314,53 @@ static EDealRes cpdIsPrimaryKeyCondImpl(SNode* pNode, void* pContext) {
}
static
bool
cpdIsPrimaryKeyCond
(
SNode
*
pNode
)
{
if
(
QUERY_NODE_LOGIC_CONDITION
==
nodeType
(
pNode
))
{
return
false
;
}
bool
isPrimaryKeyCond
=
false
;
nodesWalkExpr
(
pNode
,
cpdIsPrimaryKeyCondImpl
,
&
isPrimaryKeyCond
);
return
isPrimaryKeyCond
;
}
static
int32_t
cpdPartitionScanLogicCond
(
SScanLogicNode
*
pScan
,
SNode
**
pPrimaryKeyCond
,
SNode
**
pOtherCond
)
{
static
EDealRes
cpdIsTagCondImpl
(
SNode
*
pNode
,
void
*
pContext
)
{
if
(
QUERY_NODE_COLUMN
==
nodeType
(
pNode
))
{
*
((
bool
*
)
pContext
)
=
((
COLUMN_TYPE_TAG
==
((
SColumnNode
*
)
pNode
)
->
colType
)
?
true
:
false
);
return
*
((
bool
*
)
pContext
)
?
DEAL_RES_CONTINUE
:
DEAL_RES_END
;
}
return
DEAL_RES_CONTINUE
;
}
static
bool
cpdIsTagCond
(
SNode
*
pNode
)
{
if
(
QUERY_NODE_LOGIC_CONDITION
==
nodeType
(
pNode
))
{
return
false
;
}
bool
isTagCond
=
false
;
nodesWalkExpr
(
pNode
,
cpdIsTagCondImpl
,
&
isTagCond
);
return
isTagCond
;
}
static
int32_t
cpdPartitionScanLogicCond
(
SScanLogicNode
*
pScan
,
SNode
**
pPrimaryKeyCond
,
SNode
**
pTagCond
,
SNode
**
pOtherCond
)
{
SLogicConditionNode
*
pLogicCond
=
(
SLogicConditionNode
*
)
pScan
->
node
.
pConditions
;
if
(
LOGIC_COND_TYPE_AND
!=
pLogicCond
->
condType
)
{
*
pPrimaryKeyCond
=
NULL
;
*
pOtherCond
=
pScan
->
node
.
pConditions
;
pScan
->
node
.
pConditions
=
NULL
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
code
=
TSDB_CODE_SUCCESS
;
SNodeList
*
pPrimaryKeyConds
=
NULL
;
SNodeList
*
pTagConds
=
NULL
;
SNodeList
*
pOtherConds
=
NULL
;
SNode
*
pCond
=
NULL
;
FOREACH
(
pCond
,
pLogicCond
->
pParameterList
)
{
if
(
cpdIsPrimaryKeyCond
(
pCond
))
{
code
=
nodesListMakeAppend
(
&
pPrimaryKeyConds
,
nodesCloneNode
(
pCond
));
}
else
if
(
cpdIsTagCond
(
pScan
->
node
.
pConditions
))
{
code
=
nodesListMakeAppend
(
&
pTagConds
,
nodesCloneNode
(
pCond
));
}
else
{
code
=
nodesListMakeAppend
(
&
pOtherConds
,
nodesCloneNode
(
pCond
));
}
...
...
@@ -338,37 +370,46 @@ static int32_t cpdPartitionScanLogicCond(SScanLogicNode* pScan, SNode** pPrimary
}
SNode
*
pTempPrimaryKeyCond
=
NULL
;
SNode
*
pTempTagCond
=
NULL
;
SNode
*
pTempOtherCond
=
NULL
;
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
cpdMergeConds
(
&
pTempPrimaryKeyCond
,
&
pPrimaryKeyConds
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
cpdMergeConds
(
&
pTempTagCond
,
&
pTagConds
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
cpdMergeConds
(
&
pTempOtherCond
,
&
pOtherConds
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
*
pPrimaryKeyCond
=
pTempPrimaryKeyCond
;
*
pTagCond
=
pTempTagCond
;
*
pOtherCond
=
pTempOtherCond
;
nodesDestroyNode
(
pScan
->
node
.
pConditions
);
pScan
->
node
.
pConditions
=
NULL
;
}
else
{
nodesDestroyList
(
pPrimaryKeyConds
);
nodesDestroyList
(
pTagConds
);
nodesDestroyList
(
pOtherConds
);
nodesDestroyNode
(
pTempPrimaryKeyCond
);
nodesDestroyNode
(
pTempTagCond
);
nodesDestroyNode
(
pTempOtherCond
);
}
return
code
;
}
static
int32_t
cpdPartitionScanCond
(
SScanLogicNode
*
pScan
,
SNode
**
pPrimaryKeyCond
,
SNode
**
p
OtherCond
)
{
if
(
QUERY_NODE_LOGIC_CONDITION
==
nodeType
(
pScan
->
node
.
pConditions
)
&&
LOGIC_COND_TYPE_AND
==
((
SLogicConditionNode
*
)
pScan
->
node
.
pConditions
)
->
condType
)
{
return
cpdPartitionScanLogicCond
(
pScan
,
pPrimaryKeyCond
,
pOtherCond
);
static
int32_t
cpdPartitionScanCond
(
SScanLogicNode
*
pScan
,
SNode
**
pPrimaryKeyCond
,
SNode
**
p
TagCond
,
SNode
**
pOtherCond
)
{
if
(
QUERY_NODE_LOGIC_CONDITION
==
nodeType
(
pScan
->
node
.
pConditions
)
)
{
return
cpdPartitionScanLogicCond
(
pScan
,
pPrimaryKeyCond
,
p
TagCond
,
p
OtherCond
);
}
if
(
cpdIsPrimaryKeyCond
(
pScan
->
node
.
pConditions
))
{
*
pPrimaryKeyCond
=
pScan
->
node
.
pConditions
;
}
else
if
(
cpdIsTagCond
(
pScan
->
node
.
pConditions
))
{
*
pTagCond
=
pScan
->
node
.
pConditions
;
}
else
{
*
pOtherCond
=
pScan
->
node
.
pConditions
;
}
...
...
@@ -391,6 +432,32 @@ static int32_t cpdCalcTimeRange(SScanLogicNode* pScan, SNode** pPrimaryKeyCond,
return
code
;
}
static
int32_t
cpdApplyTagIndex
(
SScanLogicNode
*
pScan
,
SNode
**
pTagCond
,
SNode
**
pOtherCond
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
SIdxFltStatus
idxStatus
=
idxGetFltStatus
(
*
pTagCond
);
switch
(
idxStatus
)
{
case
SFLT_NOT_INDEX
:
code
=
cpdCondAppend
(
pOtherCond
,
pTagCond
);
break
;
case
SFLT_COARSE_INDEX
:
pScan
->
pTagCond
=
nodesCloneNode
(
*
pTagCond
);
if
(
NULL
==
pScan
->
pTagCond
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
break
;
}
code
=
cpdCondAppend
(
pOtherCond
,
pTagCond
);
break
;
case
SFLT_ACCURATE_INDEX
:
pScan
->
pTagCond
=
*
pTagCond
;
*
pTagCond
=
NULL
;
break
;
default:
code
=
TSDB_CODE_FAILED
;
break
;
}
return
code
;
}
static
int32_t
cpdOptimizeScanCondition
(
SOptimizeContext
*
pCxt
,
SScanLogicNode
*
pScan
)
{
if
(
NULL
==
pScan
->
node
.
pConditions
||
OPTIMIZE_FLAG_TEST_MASK
(
pScan
->
node
.
optimizedFlag
,
OPTIMIZE_FLAG_CPD
)
||
TSDB_SYSTEM_TABLE
==
pScan
->
pMeta
->
tableType
)
{
...
...
@@ -398,11 +465,15 @@ static int32_t cpdOptimizeScanCondition(SOptimizeContext* pCxt, SScanLogicNode*
}
SNode
*
pPrimaryKeyCond
=
NULL
;
SNode
*
pTagCond
=
NULL
;
SNode
*
pOtherCond
=
NULL
;
int32_t
code
=
cpdPartitionScanCond
(
pScan
,
&
pPrimaryKeyCond
,
&
pOtherCond
);
int32_t
code
=
cpdPartitionScanCond
(
pScan
,
&
pPrimaryKeyCond
,
&
p
TagCond
,
&
p
OtherCond
);
if
(
TSDB_CODE_SUCCESS
==
code
&&
NULL
!=
pPrimaryKeyCond
)
{
code
=
cpdCalcTimeRange
(
pScan
,
&
pPrimaryKeyCond
,
&
pOtherCond
);
}
if
(
TSDB_CODE_SUCCESS
==
code
&&
NULL
!=
pTagCond
)
{
code
=
cpdApplyTagIndex
(
pScan
,
&
pTagCond
,
&
pOtherCond
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
pScan
->
node
.
pConditions
=
pOtherCond
;
}
...
...
@@ -618,30 +689,6 @@ static bool cpdContainPrimaryKeyEqualCond(SJoinLogicNode* pJoin, SNode* pCond) {
}
}
// static int32_t cpdCheckOpCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SNode* pOnCond) {
// if (!cpdIsPrimaryKeyEqualCond(pJoin, pOnCond)) {
// return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_EXPECTED_TS_EQUAL);
// }
// return TSDB_CODE_SUCCESS;
// }
// static int32_t cpdCheckLogicCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SLogicConditionNode* pOnCond) {
// if (LOGIC_COND_TYPE_AND != pOnCond->condType) {
// return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_EXPECTED_TS_EQUAL);
// }
// bool hasPrimaryKeyEqualCond = false;
// SNode* pCond = NULL;
// FOREACH(pCond, pOnCond->pParameterList) {
// if (cpdIsPrimaryKeyEqualCond(pJoin, pCond)) {
// hasPrimaryKeyEqualCond = true;
// }
// }
// if (!hasPrimaryKeyEqualCond) {
// return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_EXPECTED_TS_EQUAL);
// }
// return TSDB_CODE_SUCCESS;
// }
static
int32_t
cpdCheckJoinOnCond
(
SOptimizeContext
*
pCxt
,
SJoinLogicNode
*
pJoin
)
{
if
(
NULL
==
pJoin
->
pOnConditions
)
{
return
generateUsageErrMsg
(
pCxt
->
pPlanCxt
->
pMsg
,
pCxt
->
pPlanCxt
->
msgLen
,
TSDB_CODE_PLAN_NOT_SUPPORT_CROSS_JOIN
);
...
...
@@ -650,11 +697,6 @@ static int32_t cpdCheckJoinOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin)
return
generateUsageErrMsg
(
pCxt
->
pPlanCxt
->
pMsg
,
pCxt
->
pPlanCxt
->
msgLen
,
TSDB_CODE_PLAN_EXPECTED_TS_EQUAL
);
}
return
TSDB_CODE_SUCCESS
;
// if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pOnConditions)) {
// return cpdCheckLogicCond(pCxt, pJoin, (SLogicConditionNode*)pJoin->pOnConditions);
// } else {
// return cpdCheckOpCond(pCxt, pJoin, pJoin->pOnConditions);
// }
}
static
int32_t
cpdPushJoinCondition
(
SOptimizeContext
*
pCxt
,
SJoinLogicNode
*
pJoin
)
{
...
...
source/libs/planner/src/planPhysiCreater.c
浏览文件 @
4364e7d4
...
...
@@ -411,7 +411,7 @@ static int32_t createScanCols(SPhysiPlanContext* pCxt, SScanPhysiNode* pScanPhys
return
sortScanCols
(
pScanPhysiNode
->
pScanCols
);
}
static
int32_t
createScanPhysiNodeFinalize
(
SPhysiPlanContext
*
pCxt
,
SScanLogicNode
*
pScanLogicNode
,
static
int32_t
createScanPhysiNodeFinalize
(
SPhysiPlanContext
*
pCxt
,
SS
ubplan
*
pSubplan
,
SS
canLogicNode
*
pScanLogicNode
,
SScanPhysiNode
*
pScanPhysiNode
,
SPhysiNode
**
pPhyNode
)
{
int32_t
code
=
createScanCols
(
pCxt
,
pScanPhysiNode
,
pScanLogicNode
->
pScanCols
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
...
...
@@ -438,6 +438,12 @@ static int32_t createScanPhysiNodeFinalize(SPhysiPlanContext* pCxt, SScanLogicNo
pScanPhysiNode
->
uid
=
pScanLogicNode
->
pMeta
->
uid
;
pScanPhysiNode
->
tableType
=
pScanLogicNode
->
pMeta
->
tableType
;
memcpy
(
&
pScanPhysiNode
->
tableName
,
&
pScanLogicNode
->
tableName
,
sizeof
(
SName
));
if
(
NULL
!=
pScanLogicNode
->
pTagCond
)
{
pSubplan
->
pTagCond
=
nodesCloneNode
(
pScanLogicNode
->
pTagCond
);
if
(
NULL
==
pSubplan
->
pTagCond
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
}
}
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
...
...
@@ -463,7 +469,7 @@ static int32_t createTagScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubpla
}
vgroupInfoToNodeAddr
(
pScanLogicNode
->
pVgroupList
->
vgroups
,
&
pSubplan
->
execNode
);
taosArrayPush
(
pCxt
->
pExecNodeList
,
&
pSubplan
->
execNode
);
return
createScanPhysiNodeFinalize
(
pCxt
,
pScanLogicNode
,
(
SScanPhysiNode
*
)
pTagScan
,
pPhyNode
);
return
createScanPhysiNodeFinalize
(
pCxt
,
pS
ubplan
,
pS
canLogicNode
,
(
SScanPhysiNode
*
)
pTagScan
,
pPhyNode
);
}
static
int32_t
createTableScanPhysiNode
(
SPhysiPlanContext
*
pCxt
,
SSubplan
*
pSubplan
,
SScanLogicNode
*
pScanLogicNode
,
...
...
@@ -498,7 +504,7 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
pTableScan
->
intervalUnit
=
pScanLogicNode
->
intervalUnit
;
pTableScan
->
slidingUnit
=
pScanLogicNode
->
slidingUnit
;
return
createScanPhysiNodeFinalize
(
pCxt
,
pScanLogicNode
,
(
SScanPhysiNode
*
)
pTableScan
,
pPhyNode
);
return
createScanPhysiNodeFinalize
(
pCxt
,
pS
ubplan
,
pS
canLogicNode
,
(
SScanPhysiNode
*
)
pTableScan
,
pPhyNode
);
}
static
int32_t
createSystemTableScanPhysiNode
(
SPhysiPlanContext
*
pCxt
,
SSubplan
*
pSubplan
,
...
...
@@ -522,7 +528,7 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
pScan
->
mgmtEpSet
=
pCxt
->
pPlanCxt
->
mgmtEpSet
;
tNameGetFullDbName
(
&
pScanLogicNode
->
tableName
,
pSubplan
->
dbFName
);
return
createScanPhysiNodeFinalize
(
pCxt
,
pScanLogicNode
,
(
SScanPhysiNode
*
)
pScan
,
pPhyNode
);
return
createScanPhysiNodeFinalize
(
pCxt
,
pS
ubplan
,
pS
canLogicNode
,
(
SScanPhysiNode
*
)
pScan
,
pPhyNode
);
}
static
int32_t
createStreamScanPhysiNode
(
SPhysiPlanContext
*
pCxt
,
SSubplan
*
pSubplan
,
SScanLogicNode
*
pScanLogicNode
,
...
...
source/libs/planner/src/planner.c
浏览文件 @
4364e7d4
...
...
@@ -18,6 +18,13 @@
#include "planInt.h"
#include "scalar.h"
static
void
dumpQueryPlan
(
SQueryPlan
*
pPlan
)
{
char
*
pStr
=
NULL
;
nodesNodeToString
(
pPlan
,
false
,
&
pStr
,
NULL
);
planDebugL
(
"Query Plan: %s"
,
pStr
);
taosMemoryFree
(
pStr
);
}
int32_t
qCreateQueryPlan
(
SPlanContext
*
pCxt
,
SQueryPlan
**
pPlan
,
SArray
*
pExecNodeList
)
{
SLogicNode
*
pLogicNode
=
NULL
;
SLogicSubplan
*
pLogicSubplan
=
NULL
;
...
...
@@ -36,6 +43,9 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
createPhysiPlan
(
pCxt
,
pLogicPlan
,
pPlan
,
pExecNodeList
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
dumpQueryPlan
(
*
pPlan
);
}
nodesDestroyNode
(
pLogicNode
);
nodesDestroyNode
(
pLogicSubplan
);
...
...
source/libs/planner/test/planOptimizeTest.cpp
浏览文件 @
4364e7d4
...
...
@@ -32,6 +32,12 @@ TEST_F(PlanOptimizeTest, optimizeScanData) {
run
(
"SELECT PERCENTILE(c1, 40), COUNT(*) FROM t1"
);
}
TEST_F
(
PlanOptimizeTest
,
ConditionPushDown
)
{
useDb
(
"root"
,
"test"
);
run
(
"SELECT ts, c1 FROM st1 WHERE tag1 > 4"
);
}
TEST_F
(
PlanOptimizeTest
,
orderByPrimaryKey
)
{
useDb
(
"root"
,
"test"
);
...
...
source/libs/planner/test/planTestUtil.cpp
浏览文件 @
4364e7d4
...
...
@@ -233,45 +233,45 @@ class PlannerTestBaseImpl {
if
(
DUMP_MODULE_ALL
==
module
||
DUMP_MODULE_PARSER
==
module
)
{
if
(
res_
.
prepareAst_
.
empty
())
{
cout
<<
"syntax tree : "
<<
endl
;
cout
<<
"
+++++++++++++++++++++
syntax tree : "
<<
endl
;
cout
<<
res_
.
ast_
<<
endl
;
}
else
{
cout
<<
"prepare syntax tree : "
<<
endl
;
cout
<<
"
+++++++++++++++++++++
prepare syntax tree : "
<<
endl
;
cout
<<
res_
.
prepareAst_
<<
endl
;
cout
<<
"bound syntax tree : "
<<
endl
;
cout
<<
"
+++++++++++++++++++++
bound syntax tree : "
<<
endl
;
cout
<<
res_
.
boundAst_
<<
endl
;
cout
<<
"syntax tree : "
<<
endl
;
cout
<<
"
+++++++++++++++++++++
syntax tree : "
<<
endl
;
cout
<<
res_
.
ast_
<<
endl
;
}
}
if
(
DUMP_MODULE_ALL
==
module
||
DUMP_MODULE_LOGIC
==
module
)
{
cout
<<
"raw logic plan : "
<<
endl
;
cout
<<
"
+++++++++++++++++++++
raw logic plan : "
<<
endl
;
cout
<<
res_
.
rawLogicPlan_
<<
endl
;
}
if
(
DUMP_MODULE_ALL
==
module
||
DUMP_MODULE_OPTIMIZED
==
module
)
{
cout
<<
"optimized logic plan : "
<<
endl
;
cout
<<
"
+++++++++++++++++++++
optimized logic plan : "
<<
endl
;
cout
<<
res_
.
optimizedLogicPlan_
<<
endl
;
}
if
(
DUMP_MODULE_ALL
==
module
||
DUMP_MODULE_SPLIT
==
module
)
{
cout
<<
"split logic plan : "
<<
endl
;
cout
<<
"
+++++++++++++++++++++
split logic plan : "
<<
endl
;
cout
<<
res_
.
splitLogicPlan_
<<
endl
;
}
if
(
DUMP_MODULE_ALL
==
module
||
DUMP_MODULE_SCALED
==
module
)
{
cout
<<
"scaled logic plan : "
<<
endl
;
cout
<<
"
+++++++++++++++++++++
scaled logic plan : "
<<
endl
;
cout
<<
res_
.
scaledLogicPlan_
<<
endl
;
}
if
(
DUMP_MODULE_ALL
==
module
||
DUMP_MODULE_PHYSICAL
==
module
)
{
cout
<<
"physical plan : "
<<
endl
;
cout
<<
"
+++++++++++++++++++++
physical plan : "
<<
endl
;
cout
<<
res_
.
physiPlan_
<<
endl
;
}
if
(
DUMP_MODULE_ALL
==
module
||
DUMP_MODULE_SUBPLAN
==
module
)
{
cout
<<
"physical subplan : "
<<
endl
;
cout
<<
"
+++++++++++++++++++++
physical subplan : "
<<
endl
;
for
(
const
auto
&
subplan
:
res_
.
physiSubplans_
)
{
cout
<<
subplan
<<
endl
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录