Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
e778e824
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
e778e824
编写于
4月 24, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into feature/tq
上级
631d3ede
6d8566ca
变更
24
隐藏空白更改
内联
并排
Showing
24 changed file
with
626 addition
and
234 deletion
+626
-234
cmake/cmake.define
cmake/cmake.define
+1
-1
include/libs/nodes/plannodes.h
include/libs/nodes/plannodes.h
+2
-5
include/libs/planner/planner.h
include/libs/planner/planner.h
+2
-0
include/util/taoserror.h
include/util/taoserror.h
+1
-0
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+3
-1
source/dnode/mgmt/implement/src/dmHandle.c
source/dnode/mgmt/implement/src/dmHandle.c
+3
-3
source/dnode/mgmt/implement/src/dmTransport.c
source/dnode/mgmt/implement/src/dmTransport.c
+6
-1
source/libs/command/src/explain.c
source/libs/command/src/explain.c
+0
-27
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+8
-4
source/libs/function/inc/builtinsimpl.h
source/libs/function/inc/builtinsimpl.h
+5
-0
source/libs/function/src/builtins.c
source/libs/function/src/builtins.c
+11
-1
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+253
-97
source/libs/nodes/src/nodesCodeFuncs.c
source/libs/nodes/src/nodesCodeFuncs.c
+10
-24
source/libs/parser/inc/sql.y
source/libs/parser/inc/sql.y
+1
-0
source/libs/parser/src/parCalcConst.c
source/libs/parser/src/parCalcConst.c
+1
-1
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+45
-20
source/libs/parser/src/parUtil.c
source/libs/parser/src/parUtil.c
+2
-0
source/libs/planner/src/planLogicCreater.c
source/libs/planner/src/planLogicCreater.c
+2
-1
source/libs/planner/src/planOptimizer.c
source/libs/planner/src/planOptimizer.c
+221
-30
source/libs/planner/src/planPhysiCreater.c
source/libs/planner/src/planPhysiCreater.c
+1
-4
source/libs/planner/test/planOptTest.cpp
source/libs/planner/test/planOptTest.cpp
+32
-0
source/libs/scalar/src/sclfunc.c
source/libs/scalar/src/sclfunc.c
+11
-9
tests/script/tsim/tmq/basic2Of2ConsOverlap.sim
tests/script/tsim/tmq/basic2Of2ConsOverlap.sim
+1
-1
tests/test/c/tmqSim.c
tests/test/c/tmqSim.c
+4
-4
未找到文件。
cmake/cmake.define
浏览文件 @
e778e824
cmake_minimum_required(VERSION 3.16)
set(CMAKE_VERBOSE_MAKEFILE O
N
)
set(CMAKE_VERBOSE_MAKEFILE O
FF
)
#set output directory
SET(LIBRARY_OUTPUT_PATH ${PROJECT_BINARY_DIR}/build/lib)
...
...
include/libs/nodes/plannodes.h
浏览文件 @
e778e824
...
...
@@ -46,7 +46,7 @@ typedef struct SScanLogicNode {
struct
STableMeta
*
pMeta
;
SVgroupsInfo
*
pVgroupList
;
EScanType
scanType
;
uint8_t
scan
Flag
;
// denotes reversed scan of data or no
t
uint8_t
scan
Seq
[
2
];
// first is scan count, and second is reverse scan coun
t
STimeWindow
scanRange
;
SName
tableName
;
bool
showRewrite
;
...
...
@@ -189,9 +189,6 @@ typedef struct SScanPhysiNode {
SNodeList
*
pScanCols
;
uint64_t
uid
;
// unique id of the table
int8_t
tableType
;
int32_t
order
;
// scan order: TSDB_ORDER_ASC|TSDB_ORDER_DESC
int32_t
count
;
// repeat count
int32_t
reverse
;
// reverse scan count
SName
tableName
;
}
SScanPhysiNode
;
...
...
@@ -207,7 +204,7 @@ typedef struct SSystemTableScanPhysiNode {
typedef
struct
STableScanPhysiNode
{
SScanPhysiNode
scan
;
uint8_t
scan
Flag
;
// denotes reversed scan of data or no
t
uint8_t
scan
Seq
[
2
];
// first is scan count, and second is reverse scan coun
t
STimeWindow
scanRange
;
double
ratio
;
int32_t
dataRequired
;
...
...
include/libs/planner/planner.h
浏览文件 @
e778e824
...
...
@@ -37,6 +37,8 @@ typedef struct SPlanContext {
bool
isStmtQuery
;
void
*
pTransporter
;
struct
SCatalog
*
pCatalog
;
char
*
pMsg
;
int32_t
msgLen
;
}
SPlanContext
;
// Create the physical plan for the query, according to the AST.
...
...
include/util/taoserror.h
浏览文件 @
e778e824
...
...
@@ -623,6 +623,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_INVALID_DAYS_VALUE TAOS_DEF_ERROR_CODE(0, 0x2636)
#define TSDB_CODE_PAR_OFFSET_LESS_ZERO TAOS_DEF_ERROR_CODE(0, 0x2637)
#define TSDB_CODE_PAR_SLIMIT_LEAK_PARTITION_BY TAOS_DEF_ERROR_CODE(0, 0x2638)
#define TSDB_CODE_PAR_INVALID_TOPIC_QUERY TAOS_DEF_ERROR_CODE(0, 0x2639)
//planner
#define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700)
...
...
source/client/src/clientImpl.c
浏览文件 @
e778e824
...
...
@@ -232,7 +232,9 @@ int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArra
.
mgmtEpSet
=
getEpSet_s
(
&
pRequest
->
pTscObj
->
pAppInfo
->
mgmtEp
),
.
pAstRoot
=
pQuery
->
pRoot
,
.
showRewrite
=
pQuery
->
showRewrite
,
.
pTransporter
=
pRequest
->
pTscObj
->
pAppInfo
->
pTransporter
.
pTransporter
=
pRequest
->
pTscObj
->
pAppInfo
->
pTransporter
,
.
pMsg
=
pRequest
->
msgBuf
,
.
msgLen
=
ERROR_MSG_BUF_DEFAULT_SIZE
};
int32_t
code
=
catalogGetHandle
(
pRequest
->
pTscObj
->
pAppInfo
->
clusterId
,
&
cxt
.
pCatalog
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
...
...
source/dnode/mgmt/implement/src/dmHandle.c
浏览文件 @
e778e824
...
...
@@ -146,10 +146,10 @@ int32_t dmProcessCreateNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMs
dError
(
"node:%s, failed to create since %s"
,
pWrapper
->
name
,
terrstr
());
}
else
{
dDebug
(
"node:%s, has been created"
,
pWrapper
->
name
);
(
void
)
dmOpenNode
(
pWrapper
);
pWrapper
->
required
=
true
;
pWrapper
->
deployed
=
true
;
pWrapper
->
procType
=
pDnode
->
ptype
;
(
void
)
dmOpenNode
(
pWrapper
);
}
taosThreadMutexUnlock
(
&
pDnode
->
mutex
);
...
...
@@ -171,13 +171,13 @@ int32_t dmProcessDropNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg)
dError
(
"node:%s, failed to drop since %s"
,
pWrapper
->
name
,
terrstr
());
}
else
{
dDebug
(
"node:%s, has been dropped"
,
pWrapper
->
name
);
pWrapper
->
required
=
false
;
pWrapper
->
deployed
=
false
;
}
dmReleaseWrapper
(
pWrapper
);
if
(
code
==
0
)
{
pWrapper
->
required
=
false
;
pWrapper
->
deployed
=
false
;
dmCloseNode
(
pWrapper
);
taosRemoveDir
(
pWrapper
->
path
);
}
...
...
source/dnode/mgmt/implement/src/dmTransport.c
浏览文件 @
e778e824
...
...
@@ -71,12 +71,15 @@ static void dmProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSe
SNodeMsg
*
pMsg
=
NULL
;
NodeMsgFp
msgFp
=
NULL
;
uint16_t
msgType
=
pRpc
->
msgType
;
bool
needRelease
=
false
;
if
(
pEpSet
&&
pEpSet
->
numOfEps
>
0
&&
msgType
==
TDMT_MND_STATUS_RSP
)
{
dmSetMnodeEpSet
(
pWrapper
->
pDnode
,
pEpSet
);
}
if
(
dmMarkWrapper
(
pWrapper
)
!=
0
)
goto
_OVER
;
needRelease
=
true
;
if
((
msgFp
=
dmGetMsgFp
(
pWrapper
,
pRpc
))
==
NULL
)
goto
_OVER
;
if
((
pMsg
=
taosAllocateQitem
(
sizeof
(
SNodeMsg
)))
==
NULL
)
goto
_OVER
;
if
(
dmBuildMsg
(
pMsg
,
pRpc
)
!=
0
)
goto
_OVER
;
...
...
@@ -119,7 +122,9 @@ _OVER:
rpcFreeCont
(
pRpc
->
pCont
);
}
dmReleaseWrapper
(
pWrapper
);
if
(
needRelease
)
{
dmReleaseWrapper
(
pWrapper
);
}
}
static
void
dmProcessMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
...
...
source/libs/command/src/explain.c
浏览文件 @
e778e824
...
...
@@ -344,11 +344,6 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_WIDTH_FORMAT
,
pTagScanNode
->
node
.
pOutputDataBlockDesc
->
totalRowSize
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_LOOPS_FORMAT
,
pTagScanNode
->
count
);
if
(
pTagScanNode
->
reverse
)
{
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_REVERSE_FORMAT
,
pTagScanNode
->
reverse
);
}
EXPLAIN_ROW_APPEND
(
EXPLAIN_RIGHT_PARENTHESIS_FORMAT
);
EXPLAIN_ROW_END
();
QRY_ERR_RET
(
qExplainResAppendRow
(
ctx
,
tbuf
,
tlen
,
level
));
...
...
@@ -361,10 +356,6 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_END
();
QRY_ERR_RET
(
qExplainResAppendRow
(
ctx
,
tbuf
,
tlen
,
level
+
1
));
EXPLAIN_ROW_NEW
(
level
+
1
,
EXPLAIN_ORDER_FORMAT
,
EXPLAIN_ORDER_STRING
(
pTagScanNode
->
order
));
EXPLAIN_ROW_END
();
QRY_ERR_RET
(
qExplainResAppendRow
(
ctx
,
tbuf
,
tlen
,
level
+
1
));
if
(
pResNode
->
pExecInfo
)
{
QRY_ERR_RET
(
qExplainBufAppendVerboseExecInfo
(
pResNode
->
pExecInfo
,
tbuf
,
&
tlen
));
if
(
tlen
)
{
...
...
@@ -388,11 +379,6 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_WIDTH_FORMAT
,
pTblScanNode
->
scan
.
node
.
pOutputDataBlockDesc
->
totalRowSize
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_LOOPS_FORMAT
,
pTblScanNode
->
scan
.
count
);
if
(
pTblScanNode
->
scan
.
reverse
)
{
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_REVERSE_FORMAT
,
pTblScanNode
->
scan
.
reverse
);
}
EXPLAIN_ROW_APPEND
(
EXPLAIN_RIGHT_PARENTHESIS_FORMAT
);
EXPLAIN_ROW_END
();
QRY_ERR_RET
(
qExplainResAppendRow
(
ctx
,
tbuf
,
tlen
,
level
));
...
...
@@ -405,10 +391,6 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_END
();
QRY_ERR_RET
(
qExplainResAppendRow
(
ctx
,
tbuf
,
tlen
,
level
+
1
));
EXPLAIN_ROW_NEW
(
level
+
1
,
EXPLAIN_ORDER_FORMAT
,
EXPLAIN_ORDER_STRING
(
pTblScanNode
->
scan
.
order
));
EXPLAIN_ROW_END
();
QRY_ERR_RET
(
qExplainResAppendRow
(
ctx
,
tbuf
,
tlen
,
level
+
1
));
EXPLAIN_ROW_NEW
(
level
+
1
,
EXPLAIN_TIMERANGE_FORMAT
,
pTblScanNode
->
scanRange
.
skey
,
pTblScanNode
->
scanRange
.
ekey
);
EXPLAIN_ROW_END
();
QRY_ERR_RET
(
qExplainResAppendRow
(
ctx
,
tbuf
,
tlen
,
level
+
1
));
...
...
@@ -434,11 +416,6 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_WIDTH_FORMAT
,
pSTblScanNode
->
scan
.
node
.
pOutputDataBlockDesc
->
totalRowSize
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_LOOPS_FORMAT
,
pSTblScanNode
->
scan
.
count
);
if
(
pSTblScanNode
->
scan
.
reverse
)
{
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_REVERSE_FORMAT
,
pSTblScanNode
->
scan
.
reverse
);
}
EXPLAIN_ROW_APPEND
(
EXPLAIN_RIGHT_PARENTHESIS_FORMAT
);
EXPLAIN_ROW_END
();
QRY_ERR_RET
(
qExplainResAppendRow
(
ctx
,
tbuf
,
tlen
,
level
));
...
...
@@ -451,10 +428,6 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_END
();
QRY_ERR_RET
(
qExplainResAppendRow
(
ctx
,
tbuf
,
tlen
,
level
+
1
));
EXPLAIN_ROW_NEW
(
level
+
1
,
EXPLAIN_ORDER_FORMAT
,
EXPLAIN_ORDER_STRING
(
pSTblScanNode
->
scan
.
order
));
EXPLAIN_ROW_END
();
QRY_ERR_RET
(
qExplainResAppendRow
(
ctx
,
tbuf
,
tlen
,
level
+
1
));
if
(
pSTblScanNode
->
scan
.
node
.
pConditions
)
{
EXPLAIN_ROW_NEW
(
level
+
1
,
EXPLAIN_FILTER_FORMAT
);
QRY_ERR_RET
(
nodesNodeToSQL
(
pSTblScanNode
->
scan
.
node
.
pConditions
,
tbuf
+
VARSTR_HEADER_SIZE
,
TSDB_EXPLAIN_RESULT_ROW_SIZE
,
&
tlen
));
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
e778e824
...
...
@@ -1121,6 +1121,10 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
// todo avoid case: top(k, 12), 12 is the value parameter.
// sum(11), 11 is also the value parameter.
if
(
createDummyCol
&&
pOneExpr
->
base
.
numOfParams
==
1
)
{
pInput
->
totalRows
=
pBlock
->
info
.
rows
;
pInput
->
numOfRows
=
pBlock
->
info
.
rows
;
pInput
->
startRowIndex
=
0
;
code
=
doCreateConstantValColumnInfo
(
pInput
,
pFuncParam
,
pFuncParam
->
param
.
nType
,
j
,
pBlock
->
info
.
rows
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
...
...
@@ -6571,9 +6575,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
.
offset
=
pTableScanNode
->
offset
,
};
return
createTableScanOperatorInfo
(
pDataReader
,
p
ScanPhyNode
->
order
,
numOfCols
,
pTableScanNode
->
dataRequired
,
pScanPhyNode
->
count
,
pScanPhyNode
->
reverse
,
pColList
,
pResBlock
,
pScanPhyNode
->
node
.
pConditions
,
&
interval
,
pTableScanNode
->
ratio
,
pTaskInfo
);
return
createTableScanOperatorInfo
(
pDataReader
,
p
TableScanNode
->
scanSeq
[
0
]
>
0
?
TSDB_ORDER_ASC
:
TSDB_ORDER_DESC
,
numOfCols
,
pTableScanNode
->
dataRequired
,
pTableScanNode
->
scanSeq
[
0
],
pTableScanNode
->
scanSeq
[
1
],
pColList
,
pResBlock
,
pScanPhyNode
->
node
.
pConditions
,
&
interval
,
pTableScanNode
->
ratio
,
pTaskInfo
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_EXCHANGE
==
type
)
{
SExchangePhysiNode
*
pExchange
=
(
SExchangePhysiNode
*
)
pPhyNode
;
SSDataBlock
*
pResBlock
=
createResDataBlock
(
pExchange
->
node
.
pOutputDataBlockDesc
);
...
...
@@ -6721,7 +6725,7 @@ static tsdbReaderT createDataReaderImpl(STableScanPhysiNode* pTableScanNode, STa
void
*
readHandle
,
uint64_t
queryId
,
uint64_t
taskId
)
{
STsdbQueryCond
cond
=
{.
loadExternalRows
=
false
};
cond
.
order
=
pTableScanNode
->
scan
.
order
;
cond
.
order
=
pTableScanNode
->
scan
Seq
[
0
]
>
0
?
TSDB_ORDER_ASC
:
TSDB_ORDER_DESC
;
cond
.
numOfCols
=
LIST_LENGTH
(
pTableScanNode
->
scan
.
pScanCols
);
cond
.
colList
=
taosMemoryCalloc
(
cond
.
numOfCols
,
sizeof
(
SColumnInfo
));
if
(
cond
.
colList
==
NULL
)
{
...
...
source/libs/function/inc/builtinsimpl.h
浏览文件 @
e778e824
...
...
@@ -40,6 +40,11 @@ bool getMinmaxFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t
minFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
maxFunction
(
SqlFunctionCtx
*
pCtx
);
bool
getAvgFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
bool
avgFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
int32_t
avgFunction
(
SqlFunctionCtx
*
pCtx
);
int32_t
avgFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
,
int32_t
slotId
);
bool
getStddevFuncEnv
(
struct
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
);
bool
stddevFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
);
int32_t
stddevFunction
(
SqlFunctionCtx
*
pCtx
);
...
...
source/libs/function/src/builtins.c
浏览文件 @
e778e824
...
...
@@ -104,7 +104,7 @@ static int32_t translateCount(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
if
(
1
!=
LIST_LENGTH
(
pFunc
->
pParameterList
))
{
return
invaildFuncParaNumErrMsg
(
pErrBuf
,
len
,
pFunc
->
functionName
);
}
pFunc
->
node
.
resType
=
(
SDataType
){.
bytes
=
sizeof
(
int64_t
)
,
.
type
=
TSDB_DATA_TYPE_BIGINT
};
pFunc
->
node
.
resType
=
(
SDataType
){.
bytes
=
tDataTypes
[
TSDB_DATA_TYPE_BIGINT
].
bytes
,
.
type
=
TSDB_DATA_TYPE_BIGINT
};
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -479,6 +479,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.
processFunc
=
stddevFunction
,
.
finalizeFunc
=
stddevFinalize
},
{
.
name
=
"avg"
,
.
type
=
FUNCTION_TYPE_AVG
,
.
classification
=
FUNC_MGT_AGG_FUNC
,
.
translateFunc
=
translateInNumOutDou
,
.
getEnvFunc
=
getAvgFuncEnv
,
.
initFunc
=
avgFunctionSetup
,
.
processFunc
=
avgFunction
,
.
finalizeFunc
=
avgFinalize
},
{
.
name
=
"percentile"
,
.
type
=
FUNCTION_TYPE_PERCENTILE
,
...
...
source/libs/function/src/builtinsimpl.c
浏览文件 @
e778e824
...
...
@@ -20,6 +20,59 @@
#include "tdatablock.h"
#include "tpercentile.h"
typedef
struct
SSumRes
{
union
{
int64_t
isum
;
uint64_t
usum
;
double
dsum
;
};
}
SSumRes
;
typedef
struct
SAvgRes
{
double
result
;
SSumRes
sum
;
int64_t
count
;
}
SAvgRes
;
typedef
struct
STopBotResItem
{
SVariant
v
;
uint64_t
uid
;
// it is a table uid, used to extract tag data during building of the final result for the tag data
struct
{
int32_t
pageId
;
int32_t
offset
;
}
tuplePos
;
// tuple data of this chosen row
}
STopBotResItem
;
typedef
struct
STopBotRes
{
int32_t
pageId
;
// int32_t num;
STopBotResItem
*
pItems
;
}
STopBotRes
;
typedef
struct
SStddevRes
{
double
result
;
int64_t
count
;
union
{
double
quadraticDSum
;
int64_t
quadraticISum
;};
union
{
double
dsum
;
int64_t
isum
;};
}
SStddevRes
;
typedef
struct
SPercentileInfo
{
double
result
;
tMemBucket
*
pMemBucket
;
int32_t
stage
;
double
minval
;
double
maxval
;
int64_t
numOfElems
;
}
SPercentileInfo
;
typedef
struct
SDiffInfo
{
bool
hasPrev
;
bool
includeNull
;
bool
ignoreNegative
;
bool
firstOutput
;
union
{
int64_t
i64
;
double
d64
;}
prev
;
}
SDiffInfo
;
#define SET_VAL(_info, numOfElem, res) \
do { \
if ((numOfElem) <= 0) { \
...
...
@@ -28,13 +81,50 @@
(_info)->numOfRes = (res); \
} while (0)
typedef
struct
SSumRes
{
union
{
int64_t
isum
;
uint64_t
usum
;
double
dsum
;
};
}
SSumRes
;
#define GET_TS_LIST(x) ((TSKEY*)((x)->ptsList))
#define GET_TS_DATA(x, y) (GET_TS_LIST(x)[(y)])
#define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx) \
do { \
for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \
SqlFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[_i]; \
__ctx->fpSet.process(__ctx); \
} \
} while (0);
#define DO_UPDATE_SUBSID_RES(ctx, ts) \
do { \
for (int32_t _i = 0; _i < (ctx)->subsidiaryRes.numOfCols; ++_i) { \
SqlFunctionCtx *__ctx = (ctx)->subsidiaryRes.pCtx[_i]; \
if (__ctx->functionId == FUNCTION_TS_DUMMY) { \
__ctx->tag.i = (ts); \
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \
} \
__ctx->fpSet.process(__ctx); \
} \
} while (0)
#define UPDATE_DATA(ctx, left, right, num, sign, _ts) \
do { \
if (((left) < (right)) ^ (sign)) { \
(left) = (right); \
DO_UPDATE_SUBSID_RES(ctx, _ts); \
(num) += 1; \
} \
} while (0)
#define LOOPCHECK_N(val, _col, ctx, _t, _nrow, _start, sign, num) \
do { \
_t *d = (_t *)((_col)->pData); \
for (int32_t i = (_start); i < (_nrow) + (_start); ++i) { \
if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \
continue; \
} \
TSKEY ts = (ctx)->ptsList != NULL ? GET_TS_DATA(ctx, i) : 0; \
UPDATE_DATA(ctx, val, d[i], num, sign, ts); \
} \
} while (0)
bool
functionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
)
{
if
(
pResultInfo
->
initialized
)
{
...
...
@@ -135,7 +225,7 @@ int32_t sumFunction(SqlFunctionCtx *pCtx) {
int32_t
type
=
pInput
->
pData
[
0
]
->
info
.
type
;
SSumRes
*
pSumRes
=
GET_ROWCELL_INTERBUF
(
GET_RES_INFO
(
pCtx
));
if
(
pInput
->
colDataAggIsSet
)
{
numOfElem
=
pInput
->
numOfRows
-
pAgg
->
numOfNull
;
ASSERT
(
numOfElem
>=
0
);
...
...
@@ -190,6 +280,145 @@ bool getSumFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
return
true
;
}
bool
getAvgFuncEnv
(
SFunctionNode
*
UNUSED_PARAM
(
pFunc
),
SFuncExecEnv
*
pEnv
)
{
pEnv
->
calcMemSize
=
sizeof
(
double
);
return
true
;
}
bool
avgFunctionSetup
(
SqlFunctionCtx
*
pCtx
,
SResultRowEntryInfo
*
pResultInfo
)
{
if
(
!
functionSetup
(
pCtx
,
pResultInfo
))
{
return
false
;
}
SAvgRes
*
pRes
=
GET_ROWCELL_INTERBUF
(
pResultInfo
);
memset
(
pRes
,
0
,
sizeof
(
SAvgRes
));
return
true
;
}
int32_t
avgFunction
(
SqlFunctionCtx
*
pCtx
)
{
int32_t
numOfElem
=
0
;
// Only the pre-computing information loaded and actual data does not loaded
SInputColumnInfoData
*
pInput
=
&
pCtx
->
input
;
int32_t
type
=
pInput
->
pData
[
0
]
->
info
.
type
;
SAvgRes
*
pAvgRes
=
GET_ROWCELL_INTERBUF
(
GET_RES_INFO
(
pCtx
));
// computing based on the true data block
SColumnInfoData
*
pCol
=
pInput
->
pData
[
0
];
int32_t
start
=
pInput
->
startRowIndex
;
int32_t
numOfRows
=
pInput
->
numOfRows
;
switch
(
type
)
{
case
TSDB_DATA_TYPE_TINYINT
:
{
int8_t
*
plist
=
(
int8_t
*
)
pCol
->
pData
;
for
(
int32_t
i
=
start
;
i
<
numOfRows
+
pInput
->
startRowIndex
;
++
i
)
{
if
(
pCol
->
hasNull
&&
colDataIsNull_f
(
pCol
->
nullbitmap
,
i
))
{
continue
;
}
numOfElem
+=
1
;
pAvgRes
->
count
+=
1
;
pAvgRes
->
sum
.
isum
+=
plist
[
i
];
}
break
;
}
case
TSDB_DATA_TYPE_SMALLINT
:
{
int16_t
*
plist
=
(
int16_t
*
)
pCol
->
pData
;
for
(
int32_t
i
=
start
;
i
<
numOfRows
+
pInput
->
startRowIndex
;
++
i
)
{
if
(
pCol
->
hasNull
&&
colDataIsNull_f
(
pCol
->
nullbitmap
,
i
))
{
continue
;
}
numOfElem
+=
1
;
pAvgRes
->
count
+=
1
;
pAvgRes
->
sum
.
isum
+=
plist
[
i
];
}
break
;
}
case
TSDB_DATA_TYPE_INT
:
{
int32_t
*
plist
=
(
int32_t
*
)
pCol
->
pData
;
for
(
int32_t
i
=
start
;
i
<
numOfRows
+
pInput
->
startRowIndex
;
++
i
)
{
if
(
pCol
->
hasNull
&&
colDataIsNull_f
(
pCol
->
nullbitmap
,
i
))
{
continue
;
}
numOfElem
+=
1
;
pAvgRes
->
count
+=
1
;
pAvgRes
->
sum
.
isum
+=
plist
[
i
];
}
break
;
}
case
TSDB_DATA_TYPE_BIGINT
:
{
int64_t
*
plist
=
(
int64_t
*
)
pCol
->
pData
;
for
(
int32_t
i
=
start
;
i
<
numOfRows
+
pInput
->
startRowIndex
;
++
i
)
{
if
(
pCol
->
hasNull
&&
colDataIsNull_f
(
pCol
->
nullbitmap
,
i
))
{
continue
;
}
numOfElem
+=
1
;
pAvgRes
->
count
+=
1
;
pAvgRes
->
sum
.
isum
+=
plist
[
i
];
}
break
;
}
case
TSDB_DATA_TYPE_FLOAT
:
{
float
*
plist
=
(
float
*
)
pCol
->
pData
;
for
(
int32_t
i
=
start
;
i
<
numOfRows
+
pInput
->
startRowIndex
;
++
i
)
{
if
(
pCol
->
hasNull
&&
colDataIsNull_f
(
pCol
->
nullbitmap
,
i
))
{
continue
;
}
numOfElem
+=
1
;
pAvgRes
->
count
+=
1
;
pAvgRes
->
sum
.
dsum
+=
plist
[
i
];
}
break
;
}
case
TSDB_DATA_TYPE_DOUBLE
:
{
double
*
plist
=
(
double
*
)
pCol
->
pData
;
for
(
int32_t
i
=
start
;
i
<
numOfRows
+
pInput
->
startRowIndex
;
++
i
)
{
if
(
pCol
->
hasNull
&&
colDataIsNull_f
(
pCol
->
nullbitmap
,
i
))
{
continue
;
}
numOfElem
+=
1
;
pAvgRes
->
count
+=
1
;
pAvgRes
->
sum
.
dsum
+=
plist
[
i
];
}
break
;
}
default:
break
;
}
// data in the check operation are all null, not output
SET_VAL
(
GET_RES_INFO
(
pCtx
),
numOfElem
,
1
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
avgFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
,
int32_t
slotId
)
{
SInputColumnInfoData
*
pInput
=
&
pCtx
->
input
;
int32_t
type
=
pInput
->
pData
[
0
]
->
info
.
type
;
SAvgRes
*
pAvgRes
=
GET_ROWCELL_INTERBUF
(
GET_RES_INFO
(
pCtx
));
if
(
IS_INTEGER_TYPE
(
type
))
{
pAvgRes
->
result
=
pAvgRes
->
sum
.
isum
/
((
double
)
pAvgRes
->
count
);
}
else
{
pAvgRes
->
result
=
pAvgRes
->
sum
.
dsum
/
((
double
)
pAvgRes
->
count
);
}
return
functionFinalize
(
pCtx
,
pBlock
,
slotId
);
}
EFuncDataRequired
statisDataRequired
(
SFunctionNode
*
pFunc
,
STimeWindow
*
pTimeWindow
){
return
FUNC_DATA_REQUIRED_STATIS_LOAD
;
}
...
...
@@ -292,49 +521,6 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
return
true
;
}
#define GET_TS_LIST(x) ((TSKEY*)((x)->ptsList))
#define GET_TS_DATA(x, y) (GET_TS_LIST(x)[(y)])
#define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx) \
do { \
for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \
SqlFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[_i]; \
__ctx->fpSet.process(__ctx); \
} \
} while (0);
#define DO_UPDATE_SUBSID_RES(ctx, ts) \
do { \
for (int32_t _i = 0; _i < (ctx)->subsidiaryRes.numOfCols; ++_i) { \
SqlFunctionCtx *__ctx = (ctx)->subsidiaryRes.pCtx[_i]; \
if (__ctx->functionId == FUNCTION_TS_DUMMY) { \
__ctx->tag.i = (ts); \
__ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \
} \
__ctx->fpSet.process(__ctx); \
} \
} while (0)
#define UPDATE_DATA(ctx, left, right, num, sign, _ts) \
do { \
if (((left) < (right)) ^ (sign)) { \
(left) = (right); \
DO_UPDATE_SUBSID_RES(ctx, _ts); \
(num) += 1; \
} \
} while (0)
#define LOOPCHECK_N(val, _col, ctx, _t, _nrow, _start, sign, num) \
do { \
_t *d = (_t *)((_col)->pData); \
for (int32_t i = (_start); i < (_nrow) + (_start); ++i) { \
if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \
continue; \
} \
TSKEY ts = (ctx)->ptsList != NULL ? GET_TS_DATA(ctx, i) : 0; \
UPDATE_DATA(ctx, val, d[i], num, sign, ts); \
} \
} while (0)
int32_t
doMinMaxHelper
(
SqlFunctionCtx
*
pCtx
,
int32_t
isMinFunc
)
{
int32_t
numOfElems
=
0
;
...
...
@@ -479,13 +665,6 @@ int32_t maxFunction(SqlFunctionCtx *pCtx) {
return
TSDB_CODE_SUCCESS
;
}
typedef
struct
SStddevRes
{
double
result
;
int64_t
count
;
union
{
double
quadraticDSum
;
int64_t
quadraticISum
;};
union
{
double
dsum
;
int64_t
isum
;};
}
SStddevRes
;
bool
getStddevFuncEnv
(
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
)
{
pEnv
->
calcMemSize
=
sizeof
(
SStddevRes
);
return
true
;
...
...
@@ -588,8 +767,8 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) {
numOfElem
+=
1
;
pStddevRes
->
count
+=
1
;
pStddevRes
->
i
sum
+=
plist
[
i
];
pStddevRes
->
quadratic
I
Sum
+=
plist
[
i
]
*
plist
[
i
];
pStddevRes
->
d
sum
+=
plist
[
i
];
pStddevRes
->
quadratic
D
Sum
+=
plist
[
i
]
*
plist
[
i
];
}
break
;
}
...
...
@@ -603,8 +782,8 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) {
numOfElem
+=
1
;
pStddevRes
->
count
+=
1
;
pStddevRes
->
i
sum
+=
plist
[
i
];
pStddevRes
->
quadratic
I
Sum
+=
plist
[
i
]
*
plist
[
i
];
pStddevRes
->
d
sum
+=
plist
[
i
];
pStddevRes
->
quadratic
D
Sum
+=
plist
[
i
]
*
plist
[
i
];
}
break
;
}
...
...
@@ -619,21 +798,21 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) {
}
int32_t
stddevFinalize
(
SqlFunctionCtx
*
pCtx
,
SSDataBlock
*
pBlock
,
int32_t
slotId
)
{
SInputColumnInfoData
*
pInput
=
&
pCtx
->
input
;
int32_t
type
=
pInput
->
pData
[
0
]
->
info
.
type
;
SStddevRes
*
pStddevRes
=
GET_ROWCELL_INTERBUF
(
GET_RES_INFO
(
pCtx
));
double
avg
=
pStddevRes
->
isum
/
((
double
)
pStddevRes
->
count
);
pStddevRes
->
result
=
sqrt
(
pStddevRes
->
quadraticISum
/
((
double
)
pStddevRes
->
count
)
-
avg
*
avg
);
double
avg
;
if
(
IS_INTEGER_TYPE
(
type
))
{
avg
=
pStddevRes
->
isum
/
((
double
)
pStddevRes
->
count
);
pStddevRes
->
result
=
sqrt
(
pStddevRes
->
quadraticISum
/
((
double
)
pStddevRes
->
count
)
-
avg
*
avg
);
}
else
{
avg
=
pStddevRes
->
dsum
/
((
double
)
pStddevRes
->
count
);
pStddevRes
->
result
=
sqrt
(
pStddevRes
->
quadraticDSum
/
((
double
)
pStddevRes
->
count
)
-
avg
*
avg
);
}
return
functionFinalize
(
pCtx
,
pBlock
,
slotId
);
}
typedef
struct
SPercentileInfo
{
double
result
;
tMemBucket
*
pMemBucket
;
int32_t
stage
;
double
minval
;
double
maxval
;
int64_t
numOfElems
;
}
SPercentileInfo
;
bool
getPercentileFuncEnv
(
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
)
{
pEnv
->
calcMemSize
=
sizeof
(
SPercentileInfo
);
return
true
;
...
...
@@ -928,14 +1107,6 @@ int32_t lastFunction(SqlFunctionCtx *pCtx) {
return
TSDB_CODE_SUCCESS
;
}
typedef
struct
SDiffInfo
{
bool
hasPrev
;
bool
includeNull
;
bool
ignoreNegative
;
bool
firstOutput
;
union
{
int64_t
i64
;
double
d64
;}
prev
;
}
SDiffInfo
;
bool
getDiffFuncEnv
(
SFunctionNode
*
UNUSED_PARAM
(
pFunc
),
SFuncExecEnv
*
pEnv
)
{
pEnv
->
calcMemSize
=
sizeof
(
SDiffInfo
);
return
true
;
...
...
@@ -1168,21 +1339,6 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) {
}
}
typedef
struct
STopBotResItem
{
SVariant
v
;
uint64_t
uid
;
// it is a table uid, used to extract tag data during building of the final result for the tag data
struct
{
int32_t
pageId
;
int32_t
offset
;
}
tuplePos
;
// tuple data of this chosen row
}
STopBotResItem
;
typedef
struct
STopBotRes
{
int32_t
pageId
;
// int32_t num;
STopBotResItem
*
pItems
;
}
STopBotRes
;
bool
getTopBotFuncEnv
(
SFunctionNode
*
pFunc
,
SFuncExecEnv
*
pEnv
)
{
SValueNode
*
pkNode
=
(
SValueNode
*
)
nodesListGetNode
(
pFunc
->
pParameterList
,
1
);
pEnv
->
calcMemSize
=
sizeof
(
STopBotRes
)
+
pkNode
->
datum
.
i
*
sizeof
(
STopBotResItem
);
...
...
@@ -1335,4 +1491,4 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId
return
pEntryInfo
->
numOfRes
;
// return functionFinalize(pCtx, pBlock, slotId);
}
\ No newline at end of file
}
source/libs/nodes/src/nodesCodeFuncs.c
浏览文件 @
e778e824
...
...
@@ -671,9 +671,6 @@ static int32_t jsonToName(const SJson* pJson, void* pObj) {
static
const
char
*
jkScanPhysiPlanScanCols
=
"ScanCols"
;
static
const
char
*
jkScanPhysiPlanTableId
=
"TableId"
;
static
const
char
*
jkScanPhysiPlanTableType
=
"TableType"
;
static
const
char
*
jkScanPhysiPlanScanOrder
=
"ScanOrder"
;
static
const
char
*
jkScanPhysiPlanScanCount
=
"ScanCount"
;
static
const
char
*
jkScanPhysiPlanReverseScanCount
=
"ReverseScanCount"
;
static
const
char
*
jkScanPhysiPlanTableName
=
"TableName"
;
static
int32_t
physiScanNodeToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
...
...
@@ -689,15 +686,6 @@ static int32_t physiScanNodeToJson(const void* pObj, SJson* pJson) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkScanPhysiPlanTableType
,
pNode
->
tableType
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkScanPhysiPlanScanOrder
,
pNode
->
order
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkScanPhysiPlanScanCount
,
pNode
->
count
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkScanPhysiPlanReverseScanCount
,
pNode
->
reverse
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddObject
(
pJson
,
jkScanPhysiPlanTableName
,
nameToJson
,
&
pNode
->
tableName
);
}
...
...
@@ -718,15 +706,6 @@ static int32_t jsonToPhysiScanNode(const SJson* pJson, void* pObj) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetTinyIntValue
(
pJson
,
jkScanPhysiPlanTableType
,
&
pNode
->
tableType
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetIntValue
(
pJson
,
jkScanPhysiPlanScanOrder
,
&
pNode
->
order
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetIntValue
(
pJson
,
jkScanPhysiPlanScanCount
,
&
pNode
->
count
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetIntValue
(
pJson
,
jkScanPhysiPlanReverseScanCount
,
&
pNode
->
reverse
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonToObject
(
pJson
,
jkScanPhysiPlanTableName
,
jsonToName
,
&
pNode
->
tableName
);
}
...
...
@@ -742,7 +721,8 @@ static int32_t jsonToPhysiTagScanNode(const SJson* pJson, void* pObj) {
return
jsonToPhysiScanNode
(
pJson
,
pObj
);
}
static
const
char
*
jkTableScanPhysiPlanScanFlag
=
"ScanFlag"
;
static
const
char
*
jkTableScanPhysiPlanScanCount
=
"ScanCount"
;
static
const
char
*
jkTableScanPhysiPlanReverseScanCount
=
"ReverseScanCount"
;
static
const
char
*
jkTableScanPhysiPlanStartKey
=
"StartKey"
;
static
const
char
*
jkTableScanPhysiPlanEndKey
=
"EndKey"
;
static
const
char
*
jkTableScanPhysiPlanRatio
=
"Ratio"
;
...
...
@@ -759,7 +739,10 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
int32_t
code
=
physiScanNodeToJson
(
pObj
,
pJson
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkTableScanPhysiPlanScanFlag
,
pNode
->
scanFlag
);
code
=
tjsonAddIntegerToObject
(
pJson
,
jkTableScanPhysiPlanScanCount
,
pNode
->
scanSeq
[
0
]);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkTableScanPhysiPlanReverseScanCount
,
pNode
->
scanSeq
[
1
]);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkTableScanPhysiPlanStartKey
,
pNode
->
scanRange
.
skey
);
...
...
@@ -800,7 +783,10 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
int32_t
code
=
jsonToPhysiScanNode
(
pJson
,
pObj
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetUTinyIntValue
(
pJson
,
jkTableScanPhysiPlanScanFlag
,
&
pNode
->
scanFlag
);
code
=
tjsonGetUTinyIntValue
(
pJson
,
jkTableScanPhysiPlanScanCount
,
&
pNode
->
scanSeq
[
0
]);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetUTinyIntValue
(
pJson
,
jkTableScanPhysiPlanReverseScanCount
,
&
pNode
->
scanSeq
[
1
]);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetBigIntValue
(
pJson
,
jkTableScanPhysiPlanStartKey
,
&
pNode
->
scanRange
.
skey
);
...
...
source/libs/parser/inc/sql.y
浏览文件 @
e778e824
...
...
@@ -611,6 +611,7 @@ function_expression(A) ::= function_name(B) NK_LP expression_list(C) NK_RP(D).
function_expression(A) ::= star_func(B) NK_LP star_func_para_list(C) NK_RP(D). { A = createRawExprNodeExt(pCxt, &B, &D, createFunctionNode(pCxt, &B, C)); }
function_expression(A) ::= CAST(B) NK_LP expression(C) AS type_name(D) NK_RP(E). { A = createRawExprNodeExt(pCxt, &B, &E, createCastFunctionNode(pCxt, releaseRawExprNode(pCxt, C), D)); }
function_expression(A) ::= noarg_func(B) NK_LP NK_RP(C). { A = createRawExprNodeExt(pCxt, &B, &C, createFunctionNodeNoArg(pCxt, &B)); }
//function_expression(A) ::= NOW(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); }
%type noarg_func { SToken }
%destructor noarg_func { }
...
...
source/libs/parser/src/parCalcConst.c
浏览文件 @
e778e824
...
...
@@ -83,7 +83,7 @@ static EDealRes calcConstOperator(SOperatorNode** pNode, void* pContext) {
static
EDealRes
calcConstFunction
(
SFunctionNode
**
pNode
,
void
*
pContext
)
{
SFunctionNode
*
pFunc
=
*
pNode
;
if
(
!
fmIsScalarFunc
(
pFunc
->
funcId
))
{
if
(
!
fmIsScalarFunc
(
pFunc
->
funcId
)
||
fmIsUserDefinedFunc
(
pFunc
->
funcId
)
)
{
return
DEAL_RES_CONTINUE
;
}
SNode
*
pParam
=
NULL
;
...
...
source/libs/parser/src/parTranslater.c
浏览文件 @
e778e824
...
...
@@ -2616,37 +2616,62 @@ static int32_t translateDropComponentNode(STranslateContext* pCxt, SDropComponen
(
FSerializeFunc
)
tSerializeSCreateDropMQSBNodeReq
,
&
dropReq
);
}
static
int32_t
translateCreateTopic
(
STranslateContext
*
pCxt
,
SCreateTopicStmt
*
pStmt
)
{
SCMCreateTopicReq
createReq
=
{
0
};
static
int32_t
buildCreateTopicReq
(
STranslateContext
*
pCxt
,
SCreateTopicStmt
*
pStmt
,
SCMCreateTopicReq
*
pReq
)
{
SName
name
;
// tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->topicName, strlen(pStmt->topicName));
// tNameGetFullDbName(&name, pReq->name);
tNameExtractFullName
(
toName
(
pCxt
->
pParseCxt
->
acctId
,
pCxt
->
pParseCxt
->
db
,
pStmt
->
topicName
,
&
name
),
pReq
->
name
);
pReq
->
igExists
=
pStmt
->
ignoreExists
;
pReq
->
withTbName
=
pStmt
->
pOptions
->
withTable
;
pReq
->
withSchema
=
pStmt
->
pOptions
->
withSchema
;
pReq
->
withTag
=
pStmt
->
pOptions
->
withTag
;
pReq
->
sql
=
strdup
(
pCxt
->
pParseCxt
->
pSql
);
if
(
NULL
==
pReq
->
sql
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
(
NULL
!=
pStmt
->
pQuery
)
{
strcpy
(
pReq
->
subscribeDbName
,
((
SRealTableNode
*
)(((
SSelectStmt
*
)
pStmt
->
pQuery
)
->
pFromTable
))
->
table
.
dbName
);
pCxt
->
pParseCxt
->
topicQuery
=
true
;
int32_t
code
=
translateQuery
(
pCxt
,
pStmt
->
pQuery
);
code
=
translateQuery
(
pCxt
,
pStmt
->
pQuery
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
nodesNodeToString
(
pStmt
->
pQuery
,
false
,
&
createReq
.
ast
,
NULL
);
}
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
return
code
;
code
=
nodesNodeToString
(
pStmt
->
pQuery
,
false
,
&
pReq
->
ast
,
NULL
);
}
}
else
{
strcpy
(
createReq
.
subscribeDbName
,
pStmt
->
subscribeDbName
);
strcpy
(
pReq
->
subscribeDbName
,
pStmt
->
subscribeDbName
);
}
createReq
.
sql
=
strdup
(
pCxt
->
pParseCxt
->
pSql
);
if
(
NULL
==
createReq
.
sql
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
return
code
;
}
static
int32_t
checkCreateTopic
(
STranslateContext
*
pCxt
,
SCreateTopicStmt
*
pStmt
)
{
if
(
NULL
==
pStmt
->
pQuery
)
{
return
TSDB_CODE_SUCCESS
;
}
SName
name
;
// tNameSetDbName(&name, pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, strlen(pCxt->pParseCxt->db));
// tNameGetFullDbName(&name, createReq.name);
tNameExtractFullName
(
toName
(
pCxt
->
pParseCxt
->
acctId
,
pCxt
->
pParseCxt
->
db
,
pStmt
->
topicName
,
&
name
),
createReq
.
name
);
createReq
.
igExists
=
pStmt
->
ignoreExists
;
createReq
.
withTbName
=
pStmt
->
pOptions
->
withTable
;
createReq
.
withSchema
=
pStmt
->
pOptions
->
withSchema
;
createReq
.
withTag
=
pStmt
->
pOptions
->
withTag
;
if
(
QUERY_NODE_SELECT_STMT
==
nodeType
(
pStmt
->
pQuery
))
{
SSelectStmt
*
pSelect
=
(
SSelectStmt
*
)
pStmt
->
pQuery
;
if
(
!
pSelect
->
isDistinct
&&
QUERY_NODE_REAL_TABLE
==
nodeType
(
pSelect
->
pFromTable
)
&&
NULL
==
pSelect
->
pGroupByList
&&
NULL
==
pSelect
->
pLimit
&&
NULL
==
pSelect
->
pSlimit
&&
NULL
==
pSelect
->
pOrderByList
&&
NULL
==
pSelect
->
pPartitionByList
)
{
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
code
=
buildCmdMsg
(
pCxt
,
TDMT_MND_CREATE_TOPIC
,
(
FSerializeFunc
)
tSerializeSCMCreateTopicReq
,
&
createReq
);
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_TOPIC_QUERY
);
}
static
int32_t
translateCreateTopic
(
STranslateContext
*
pCxt
,
SCreateTopicStmt
*
pStmt
)
{
SCMCreateTopicReq
createReq
=
{
0
};
int32_t
code
=
checkCreateTopic
(
pCxt
,
pStmt
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
buildCreateTopicReq
(
pCxt
,
pStmt
,
&
createReq
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
buildCmdMsg
(
pCxt
,
TDMT_MND_CREATE_TOPIC
,
(
FSerializeFunc
)
tSerializeSCMCreateTopicReq
,
&
createReq
);
}
tFreeSCMCreateTopicReq
(
&
createReq
);
return
code
;
}
...
...
source/libs/parser/src/parUtil.c
浏览文件 @
e778e824
...
...
@@ -128,6 +128,8 @@ static char* getSyntaxErrFormat(int32_t errCode) {
return
"soffset/offset can not be less than 0"
;
case
TSDB_CODE_PAR_SLIMIT_LEAK_PARTITION_BY
:
return
"slimit/soffset only available for PARTITION BY query"
;
case
TSDB_CODE_PAR_INVALID_TOPIC_QUERY
:
return
"Invalid topic query"
;
case
TSDB_CODE_OUT_OF_MEMORY
:
return
"Out of memory"
;
default:
...
...
source/libs/planner/src/planLogicCreater.c
浏览文件 @
e778e824
...
...
@@ -199,7 +199,8 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
TSWAP
(
pScan
->
pMeta
,
pRealTable
->
pMeta
,
STableMeta
*
);
TSWAP
(
pScan
->
pVgroupList
,
pRealTable
->
pVgroupList
,
SVgroupsInfo
*
);
pScan
->
scanFlag
=
MAIN_SCAN
;
pScan
->
scanSeq
[
0
]
=
1
;
pScan
->
scanSeq
[
1
]
=
0
;
pScan
->
scanRange
=
TSWINDOW_INITIALIZER
;
pScan
->
tableName
.
type
=
TSDB_TABLE_NAME_T
;
pScan
->
tableName
.
acctId
=
pCxt
->
pPlanCxt
->
acctId
;
...
...
source/libs/planner/src/planOptimizer.c
浏览文件 @
e778e824
...
...
@@ -20,11 +20,14 @@
#define OPTIMIZE_FLAG_MASK(n) (1 << n)
#define OPTIMIZE_FLAG_OSD OPTIMIZE_FLAG_MASK(0)
#define OPTIMIZE_FLAG_CPD OPTIMIZE_FLAG_MASK(1)
#define OPTIMIZE_FLAG_OPK OPTIMIZE_FLAG_MASK(2)
#define OPTIMIZE_FLAG_SET_MASK(val, mask) (val) |= (mask)
#define OPTIMIZE_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0)
typedef
struct
SOptimizeContext
{
SPlanContext
*
pPlanCxt
;
bool
optimized
;
}
SOptimizeContext
;
...
...
@@ -57,7 +60,23 @@ typedef enum ECondAction {
// after supporting outer join, there are other possibilities
}
ECondAction
;
EDealRes
haveNormalColImpl
(
SNode
*
pNode
,
void
*
pContext
)
{
typedef
bool
(
*
FMayBeOptimized
)(
SLogicNode
*
pNode
);
static
SLogicNode
*
optFindPossibleNode
(
SLogicNode
*
pNode
,
FMayBeOptimized
func
)
{
if
(
func
(
pNode
))
{
return
pNode
;
}
SNode
*
pChild
;
FOREACH
(
pChild
,
pNode
->
pChildren
)
{
SLogicNode
*
pScanNode
=
optFindPossibleNode
((
SLogicNode
*
)
pChild
,
func
);
if
(
NULL
!=
pScanNode
)
{
return
pScanNode
;
}
}
return
NULL
;
}
EDealRes
osdHaveNormalColImpl
(
SNode
*
pNode
,
void
*
pContext
)
{
if
(
QUERY_NODE_COLUMN
==
nodeType
(
pNode
))
{
*
((
bool
*
)
pContext
)
=
(
COLUMN_TYPE_TAG
!=
((
SColumnNode
*
)
pNode
)
->
colType
);
return
*
((
bool
*
)
pContext
)
?
DEAL_RES_END
:
DEAL_RES_IGNORE_CHILD
;
...
...
@@ -65,9 +84,9 @@ EDealRes haveNormalColImpl(SNode* pNode, void* pContext) {
return
DEAL_RES_CONTINUE
;
}
static
bool
h
aveNormalCol
(
SNodeList
*
pList
)
{
static
bool
osdH
aveNormalCol
(
SNodeList
*
pList
)
{
bool
res
=
false
;
nodesWalkExprsPostOrder
(
pList
,
h
aveNormalColImpl
,
&
res
);
nodesWalkExprsPostOrder
(
pList
,
osdH
aveNormalColImpl
,
&
res
);
return
res
;
}
...
...
@@ -89,21 +108,7 @@ static bool osdMayBeOptimized(SLogicNode* pNode) {
if
(
QUERY_NODE_LOGIC_PLAN_WINDOW
==
nodeType
(
pNode
->
pParent
))
{
return
(
WINDOW_TYPE_INTERVAL
==
((
SWindowLogicNode
*
)
pNode
->
pParent
)
->
winType
);
}
return
!
haveNormalCol
(((
SAggLogicNode
*
)
pNode
->
pParent
)
->
pGroupKeys
);
}
static
SLogicNode
*
osdFindPossibleScanNode
(
SLogicNode
*
pNode
)
{
if
(
osdMayBeOptimized
(
pNode
))
{
return
pNode
;
}
SNode
*
pChild
;
FOREACH
(
pChild
,
pNode
->
pChildren
)
{
SLogicNode
*
pScanNode
=
osdFindPossibleScanNode
((
SLogicNode
*
)
pChild
);
if
(
NULL
!=
pScanNode
)
{
return
pScanNode
;
}
}
return
NULL
;
return
!
osdHaveNormalCol
(((
SAggLogicNode
*
)
pNode
->
pParent
)
->
pGroupKeys
);
}
static
SNodeList
*
osdGetAllFuncs
(
SLogicNode
*
pNode
)
{
...
...
@@ -138,7 +143,7 @@ static int32_t osdGetRelatedFuncs(SScanLogicNode* pScan, SNodeList** pSdrFuncs,
}
static
int32_t
osdMatch
(
SOptimizeContext
*
pCxt
,
SLogicNode
*
pLogicNode
,
SOsdInfo
*
pInfo
)
{
pInfo
->
pScan
=
(
SScanLogicNode
*
)
o
sdFindPossibleScanNode
(
pLogicNode
);
pInfo
->
pScan
=
(
SScanLogicNode
*
)
o
ptFindPossibleNode
(
pLogicNode
,
osdMayBeOptimized
);
if
(
NULL
==
pInfo
->
pScan
)
{
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -345,7 +350,7 @@ static int32_t cpdCalcTimeRange(SScanLogicNode* pScan, SNode** pPrimaryKeyCond,
}
static
int32_t
cpdOptimizeScanCondition
(
SOptimizeContext
*
pCxt
,
SScanLogicNode
*
pScan
)
{
if
(
NULL
==
pScan
->
node
.
pConditions
)
{
if
(
NULL
==
pScan
->
node
.
pConditions
||
OPTIMIZE_FLAG_TEST_MASK
(
pScan
->
node
.
optimizedFlag
,
OPTIMIZE_FLAG_CPD
)
)
{
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -359,7 +364,10 @@ static int32_t cpdOptimizeScanCondition(SOptimizeContext* pCxt, SScanLogicNode*
pScan
->
node
.
pConditions
=
pOtherCond
;
}
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
if
(
TSDB_CODE_SUCCESS
==
code
)
{
OPTIMIZE_FLAG_SET_MASK
(
pScan
->
node
.
optimizedFlag
,
OPTIMIZE_FLAG_CPD
);
pCxt
->
optimized
=
true
;
}
else
{
nodesDestroyNode
(
pPrimaryKeyCond
);
nodesDestroyNode
(
pOtherCond
);
}
...
...
@@ -367,7 +375,7 @@ static int32_t cpdOptimizeScanCondition(SOptimizeContext* pCxt, SScanLogicNode*
return
code
;
}
static
bool
b
elongThisTable
(
SNode
*
pCondCol
,
SNodeList
*
pTableCols
)
{
static
bool
cpdB
elongThisTable
(
SNode
*
pCondCol
,
SNodeList
*
pTableCols
)
{
SNode
*
pTableCol
=
NULL
;
FOREACH
(
pTableCol
,
pTableCols
)
{
if
(
nodesEqualNode
(
pCondCol
,
pTableCol
))
{
...
...
@@ -380,9 +388,9 @@ static bool belongThisTable(SNode* pCondCol, SNodeList* pTableCols) {
static
EDealRes
cpdIsMultiTableCondImpl
(
SNode
*
pNode
,
void
*
pContext
)
{
SCpdIsMultiTableCondCxt
*
pCxt
=
pContext
;
if
(
QUERY_NODE_COLUMN
==
nodeType
(
pNode
))
{
if
(
b
elongThisTable
(
pNode
,
pCxt
->
pLeftCols
))
{
if
(
cpdB
elongThisTable
(
pNode
,
pCxt
->
pLeftCols
))
{
pCxt
->
havaLeftCol
=
true
;
}
else
if
(
b
elongThisTable
(
pNode
,
pCxt
->
pRightCols
))
{
}
else
if
(
cpdB
elongThisTable
(
pNode
,
pCxt
->
pRightCols
))
{
pCxt
->
haveRightCol
=
true
;
}
return
pCxt
->
havaLeftCol
&&
pCxt
->
haveRightCol
?
DEAL_RES_END
:
DEAL_RES_CONTINUE
;
...
...
@@ -508,11 +516,76 @@ static int32_t cpdPushCondToChild(SOptimizeContext* pCxt, SLogicNode* pChild, SN
return
TSDB_CODE_PLAN_INTERNAL_ERROR
;
}
static
bool
cpdIsPrimaryKey
(
SNode
*
pNode
,
SNodeList
*
pTableCols
)
{
if
(
QUERY_NODE_COLUMN
!=
nodeType
(
pNode
))
{
return
false
;
}
SColumnNode
*
pCol
=
(
SColumnNode
*
)
pNode
;
if
(
PRIMARYKEY_TIMESTAMP_COL_ID
!=
pCol
->
colId
)
{
return
false
;
}
return
cpdBelongThisTable
(
pNode
,
pTableCols
);
}
static
bool
cpdIsPrimaryKeyEqualCond
(
SJoinLogicNode
*
pJoin
,
SNode
*
pCond
)
{
if
(
QUERY_NODE_OPERATOR
!=
nodeType
(
pCond
))
{
return
false
;
}
SNodeList
*
pLeftCols
=
((
SLogicNode
*
)
nodesListGetNode
(
pJoin
->
node
.
pChildren
,
0
))
->
pTargets
;
SNodeList
*
pRightCols
=
((
SLogicNode
*
)
nodesListGetNode
(
pJoin
->
node
.
pChildren
,
1
))
->
pTargets
;
SOperatorNode
*
pOper
=
(
SOperatorNode
*
)
pJoin
->
pOnConditions
;
if
(
cpdIsPrimaryKey
(
pOper
->
pLeft
,
pLeftCols
))
{
return
cpdIsPrimaryKey
(
pOper
->
pRight
,
pRightCols
);
}
else
if
(
cpdIsPrimaryKey
(
pOper
->
pLeft
,
pRightCols
))
{
return
cpdIsPrimaryKey
(
pOper
->
pRight
,
pLeftCols
);
}
return
false
;
}
static
int32_t
cpdCheckOpCond
(
SOptimizeContext
*
pCxt
,
SJoinLogicNode
*
pJoin
,
SNode
*
pOnCond
)
{
if
(
!
cpdIsPrimaryKeyEqualCond
(
pJoin
,
pOnCond
))
{
snprintf
(
pCxt
->
pPlanCxt
->
pMsg
,
pCxt
->
pPlanCxt
->
msgLen
,
"l.ts = r.ts is expected in join expression"
);
return
TSDB_CODE_FAILED
;
}
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
cpdCheckLogicCond
(
SOptimizeContext
*
pCxt
,
SJoinLogicNode
*
pJoin
,
SLogicConditionNode
*
pOnCond
)
{
if
(
LOGIC_COND_TYPE_AND
!=
pOnCond
->
condType
)
{
snprintf
(
pCxt
->
pPlanCxt
->
pMsg
,
pCxt
->
pPlanCxt
->
msgLen
,
"l.ts = r.ts is expected in join expression"
);
return
TSDB_CODE_FAILED
;
}
SNode
*
pCond
=
NULL
;
FOREACH
(
pCond
,
pOnCond
->
pParameterList
)
{
if
(
!
cpdIsPrimaryKeyEqualCond
(
pJoin
,
pCond
))
{
snprintf
(
pCxt
->
pPlanCxt
->
pMsg
,
pCxt
->
pPlanCxt
->
msgLen
,
"l.ts = r.ts is expected in join expression"
);
return
TSDB_CODE_FAILED
;
}
}
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
cpdCheckJoinOnCond
(
SOptimizeContext
*
pCxt
,
SJoinLogicNode
*
pJoin
)
{
if
(
NULL
==
pJoin
->
pOnConditions
)
{
snprintf
(
pCxt
->
pPlanCxt
->
pMsg
,
pCxt
->
pPlanCxt
->
msgLen
,
"not support cross join"
);
return
TSDB_CODE_FAILED
;
}
if
(
QUERY_NODE_LOGIC_CONDITION
==
nodeType
(
pJoin
->
pOnConditions
))
{
return
cpdCheckLogicCond
(
pCxt
,
pJoin
,
(
SLogicConditionNode
*
)
pJoin
->
pOnConditions
);
}
else
{
return
cpdCheckOpCond
(
pCxt
,
pJoin
,
pJoin
->
pOnConditions
);
}
}
static
int32_t
cpdPushJoinCondition
(
SOptimizeContext
*
pCxt
,
SJoinLogicNode
*
pJoin
)
{
if
(
NULL
==
pJoin
->
node
.
pConditions
)
{
if
(
OPTIMIZE_FLAG_TEST_MASK
(
pJoin
->
node
.
optimizedFlag
,
OPTIMIZE_FLAG_CPD
)
)
{
return
TSDB_CODE_SUCCESS
;
}
if
(
NULL
==
pJoin
->
node
.
pConditions
)
{
return
cpdCheckJoinOnCond
(
pCxt
,
pJoin
);
}
SNode
*
pOnCond
=
NULL
;
SNode
*
pLeftChildCond
=
NULL
;
SNode
*
pRightChildCond
=
NULL
;
...
...
@@ -527,7 +600,11 @@ static int32_t cpdPushJoinCondition(SOptimizeContext* pCxt, SJoinLogicNode* pJoi
code
=
cpdPushCondToChild
(
pCxt
,
(
SLogicNode
*
)
nodesListGetNode
(
pJoin
->
node
.
pChildren
,
1
),
&
pRightChildCond
);
}
if
(
TSDB_CODE_SUCCESS
!=
code
)
{
if
(
TSDB_CODE_SUCCESS
==
code
)
{
OPTIMIZE_FLAG_SET_MASK
(
pJoin
->
node
.
optimizedFlag
,
OPTIMIZE_FLAG_CPD
);
pCxt
->
optimized
=
true
;
code
=
cpdCheckJoinOnCond
(
pCxt
,
pJoin
);
}
else
{
nodesDestroyNode
(
pOnCond
);
nodesDestroyNode
(
pLeftChildCond
);
nodesDestroyNode
(
pRightChildCond
);
...
...
@@ -572,15 +649,129 @@ static int32_t cpdOptimize(SOptimizeContext* pCxt, SLogicNode* pLogicNode) {
return
cpdPushCondition
(
pCxt
,
pLogicNode
);
}
static
bool
opkIsPrimaryKeyOrderBy
(
SNodeList
*
pSortKeys
)
{
if
(
1
!=
LIST_LENGTH
(
pSortKeys
))
{
return
false
;
}
SNode
*
pNode
=
((
SOrderByExprNode
*
)
nodesListGetNode
(
pSortKeys
,
0
))
->
pExpr
;
return
(
QUERY_NODE_COLUMN
==
nodeType
(
pNode
)
?
(
PRIMARYKEY_TIMESTAMP_COL_ID
==
((
SColumnNode
*
)
pNode
)
->
colId
)
:
false
);
}
static
bool
opkSortMayBeOptimized
(
SLogicNode
*
pNode
)
{
if
(
QUERY_NODE_LOGIC_PLAN_SORT
!=
nodeType
(
pNode
))
{
return
false
;
}
if
(
OPTIMIZE_FLAG_TEST_MASK
(
pNode
->
optimizedFlag
,
OPTIMIZE_FLAG_OPK
))
{
return
false
;
}
return
true
;
}
static
int32_t
opkGetScanNodesImpl
(
SLogicNode
*
pNode
,
bool
*
pNotOptimize
,
SNodeList
**
pScanNodes
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
switch
(
nodeType
(
pNode
))
{
case
QUERY_NODE_LOGIC_PLAN_SCAN
:
return
nodesListMakeAppend
(
pScanNodes
,
pNode
);
case
QUERY_NODE_LOGIC_PLAN_JOIN
:
code
=
opkGetScanNodesImpl
(
nodesListGetNode
(
pNode
->
pChildren
,
0
),
pNotOptimize
,
pScanNodes
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
opkGetScanNodesImpl
(
nodesListGetNode
(
pNode
->
pChildren
,
1
),
pNotOptimize
,
pScanNodes
);
}
return
code
;
case
QUERY_NODE_LOGIC_PLAN_AGG
:
*
pNotOptimize
=
true
;
return
code
;
default:
break
;
}
if
(
1
!=
LIST_LENGTH
(
pNode
->
pChildren
))
{
*
pNotOptimize
=
true
;
}
return
opkGetScanNodesImpl
(
nodesListGetNode
(
pNode
->
pChildren
,
0
),
pNotOptimize
,
pScanNodes
);
}
static
int32_t
opkGetScanNodes
(
SLogicNode
*
pNode
,
SNodeList
**
pScanNodes
)
{
bool
notOptimize
=
false
;
int32_t
code
=
opkGetScanNodesImpl
(
pNode
,
&
notOptimize
,
pScanNodes
);
if
(
TSDB_CODE_SUCCESS
!=
code
||
notOptimize
)
{
nodesClearList
(
*
pScanNodes
);
}
return
code
;
}
static
EOrder
opkGetPrimaryKeyOrder
(
SSortLogicNode
*
pSort
)
{
return
((
SOrderByExprNode
*
)
nodesListGetNode
(
pSort
->
pSortKeys
,
0
))
->
order
;
}
static
SNode
*
opkRewriteDownNode
(
SSortLogicNode
*
pSort
)
{
SNode
*
pDownNode
=
nodesListGetNode
(
pSort
->
node
.
pChildren
,
0
);
// todo
pSort
->
node
.
pChildren
=
NULL
;
return
pDownNode
;
}
static
int32_t
opkDoOptimized
(
SOptimizeContext
*
pCxt
,
SSortLogicNode
*
pSort
,
SNodeList
*
pScanNodes
)
{
EOrder
order
=
opkGetPrimaryKeyOrder
(
pSort
);
if
(
ORDER_DESC
==
order
)
{
SNode
*
pScan
=
NULL
;
FOREACH
(
pScan
,
pScanNodes
)
{
((
SScanLogicNode
*
)
pScan
)
->
scanSeq
[
0
]
=
0
;
((
SScanLogicNode
*
)
pScan
)
->
scanSeq
[
1
]
=
1
;
}
}
if
(
NULL
==
pSort
->
node
.
pParent
)
{
// todo
return
TSDB_CODE_SUCCESS
;
}
SNode
*
pDownNode
=
opkRewriteDownNode
(
pSort
);
SNode
*
pNode
;
FOREACH
(
pNode
,
pSort
->
node
.
pParent
->
pChildren
)
{
if
(
nodesEqualNode
(
pNode
,
pSort
))
{
REPLACE_NODE
(
pDownNode
);
break
;
}
}
nodesDestroyNode
(
pSort
);
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
opkOptimizeImpl
(
SOptimizeContext
*
pCxt
,
SSortLogicNode
*
pSort
)
{
OPTIMIZE_FLAG_SET_MASK
(
pSort
->
node
.
optimizedFlag
,
OPTIMIZE_FLAG_OPK
);
if
(
!
opkIsPrimaryKeyOrderBy
(
pSort
->
pSortKeys
)
||
1
!=
LIST_LENGTH
(
pSort
->
node
.
pChildren
))
{
return
TSDB_CODE_SUCCESS
;
}
SNodeList
*
pScanNodes
=
NULL
;
int32_t
code
=
opkGetScanNodes
(
nodesListGetNode
(
pSort
->
node
.
pChildren
,
0
),
&
pScanNodes
);
if
(
TSDB_CODE_SUCCESS
==
code
&&
NULL
!=
pScanNodes
)
{
code
=
opkDoOptimized
(
pCxt
,
pSort
,
pScanNodes
);
}
nodesClearList
(
pScanNodes
);
return
code
;
}
static
int32_t
opkOptimize
(
SOptimizeContext
*
pCxt
,
SLogicNode
*
pLogicNode
)
{
SSortLogicNode
*
pSort
=
(
SSortLogicNode
*
)
optFindPossibleNode
(
pLogicNode
,
opkSortMayBeOptimized
);
if
(
NULL
==
pSort
)
{
return
TSDB_CODE_SUCCESS
;
}
return
opkOptimizeImpl
(
pCxt
,
pSort
);
}
static
const
SOptimizeRule
optimizeRuleSet
[]
=
{
{
.
pName
=
"OptimizeScanData"
,
.
optimizeFunc
=
osdOptimize
},
{
.
pName
=
"ConditionPushDown"
,
.
optimizeFunc
=
cpdOptimize
}
{
.
pName
=
"ConditionPushDown"
,
.
optimizeFunc
=
cpdOptimize
},
{
.
pName
=
"OrderByPrimaryKey"
,
.
optimizeFunc
=
opkOptimize
}
};
static
const
int32_t
optimizeRuleNum
=
(
sizeof
(
optimizeRuleSet
)
/
sizeof
(
SOptimizeRule
));
static
int32_t
applyOptimizeRule
(
SLogicNode
*
pLogicNode
)
{
SOptimizeContext
cxt
=
{
.
optimized
=
false
};
static
int32_t
applyOptimizeRule
(
S
PlanContext
*
pCxt
,
S
LogicNode
*
pLogicNode
)
{
SOptimizeContext
cxt
=
{
.
pPlanCxt
=
pCxt
,
.
optimized
=
false
};
do
{
cxt
.
optimized
=
false
;
for
(
int32_t
i
=
0
;
i
<
optimizeRuleNum
;
++
i
)
{
...
...
@@ -594,5 +785,5 @@ static int32_t applyOptimizeRule(SLogicNode* pLogicNode) {
}
int32_t
optimizeLogicPlan
(
SPlanContext
*
pCxt
,
SLogicNode
*
pLogicNode
)
{
return
applyOptimizeRule
(
pLogicNode
);
return
applyOptimizeRule
(
p
Cxt
,
p
LogicNode
);
}
source/libs/planner/src/planPhysiCreater.c
浏览文件 @
e778e824
...
...
@@ -398,9 +398,6 @@ static int32_t createScanPhysiNodeFinalize(SPhysiPlanContext* pCxt, SScanLogicNo
if
(
TSDB_CODE_SUCCESS
==
code
)
{
pScanPhysiNode
->
uid
=
pScanLogicNode
->
pMeta
->
uid
;
pScanPhysiNode
->
tableType
=
pScanLogicNode
->
pMeta
->
tableType
;
pScanPhysiNode
->
order
=
TSDB_ORDER_ASC
;
pScanPhysiNode
->
count
=
1
;
pScanPhysiNode
->
reverse
=
0
;
memcpy
(
&
pScanPhysiNode
->
tableName
,
&
pScanLogicNode
->
tableName
,
sizeof
(
SName
));
}
...
...
@@ -432,7 +429,7 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pTableScan
->
scanFlag
=
pScanLogicNode
->
scanFlag
;
memcpy
(
pTableScan
->
scanSeq
,
pScanLogicNode
->
scanSeq
,
sizeof
(
pScanLogicNode
->
scanSeq
))
;
pTableScan
->
scanRange
=
pScanLogicNode
->
scanRange
;
pTableScan
->
ratio
=
pScanLogicNode
->
ratio
;
vgroupInfoToNodeAddr
(
pScanLogicNode
->
pVgroupList
->
vgroups
,
&
pSubplan
->
execNode
);
...
...
source/libs/planner/test/planOptTest.cpp
0 → 100644
浏览文件 @
e778e824
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "planTestUtil.h"
#include "planner.h"
using
namespace
std
;
class
PlanOptimizeTest
:
public
PlannerTestBase
{
};
TEST_F
(
PlanOptimizeTest
,
orderByPrimaryKey
)
{
useDb
(
"root"
,
"test"
);
run
(
"select * from t1 order by ts"
);
run
(
"select * from t1 order by ts desc"
);
run
(
"select c1 from t1 order by ts"
);
run
(
"select c1 from t1 order by ts desc"
);
}
source/libs/scalar/src/sclfunc.c
浏览文件 @
e778e824
...
...
@@ -1247,26 +1247,28 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
}
int32_t
nowFunction
(
SScalarParam
*
pInput
,
int32_t
inputNum
,
SScalarParam
*
pOutput
)
{
if
(
inputNum
!=
1
)
{
return
TSDB_CODE_FAILED
;
int64_t
ts
=
taosGetTimestamp
(
TSDB_TIME_PRECISION_MILLI
);
for
(
int32_t
i
=
0
;
i
<
pInput
->
numOfRows
;
++
i
)
{
colDataAppendInt64
(
pOutput
->
columnData
,
i
,
&
ts
);
}
colDataAppendInt64
(
pOutput
->
columnData
,
pOutput
->
numOfRows
,
(
int64_t
*
)
colDataGetData
(
pInput
->
columnData
,
0
))
;
pOutput
->
numOfRows
=
pInput
->
numOfRows
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
todayFunction
(
SScalarParam
*
pInput
,
int32_t
inputNum
,
SScalarParam
*
pOutput
)
{
if
(
inputNum
!=
1
)
{
return
TSDB_CODE_FAILED
;
int64_t
ts
=
taosGetTimestampToday
(
TSDB_TIME_PRECISION_MILLI
);
for
(
int32_t
i
=
0
;
i
<
pInput
->
numOfRows
;
++
i
)
{
colDataAppendInt64
(
pOutput
->
columnData
,
i
,
&
ts
);
}
colDataAppendInt64
(
pOutput
->
columnData
,
pOutput
->
numOfRows
,
(
int64_t
*
)
colDataGetData
(
pInput
->
columnData
,
0
))
;
pOutput
->
numOfRows
=
pInput
->
numOfRows
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
timezoneFunction
(
SScalarParam
*
pInput
,
int32_t
inputNum
,
SScalarParam
*
pOutput
)
{
if
(
inputNum
!=
1
)
{
return
TSDB_CODE_FAILED
;
for
(
int32_t
i
=
0
;
i
<
pInput
->
numOfRows
;
++
i
)
{
colDataAppend
(
pOutput
->
columnData
,
i
,
tsTimezoneStr
,
false
)
;
}
colDataAppend
(
pOutput
->
columnData
,
pOutput
->
numOfRows
,
(
char
*
)
colDataGetData
(
pInput
->
columnData
,
0
),
false
)
;
pOutput
->
numOfRows
=
pInput
->
numOfRows
;
return
TSDB_CODE_SUCCESS
;
}
...
...
tests/script/tsim/tmq/basic2Of2ConsOverlap.sim
浏览文件 @
e778e824
...
...
@@ -292,7 +292,7 @@ print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $
print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
if $rows != 2 then
sleep 1000
goto wait_consumer_end_from_
c
tb
goto wait_consumer_end_from_
n
tb
endi
if $data[0][1] == 0 then
if $data[1][1] != 1 then
...
...
tests/test/c/tmqSim.c
浏览文件 @
e778e824
...
...
@@ -125,13 +125,13 @@ void saveConfigToLogFile() {
for
(
int32_t
i
=
0
;
i
<
g_stConfInfo
.
numOfThread
;
i
++
)
{
taosFprintfFile
(
g_fp
,
"# consumer %d info:
\n
"
,
g_stConfInfo
.
stThreads
[
i
].
consumerId
);
taosFprintfFile
(
g_fp
,
" Topics: "
);
for
(
int
i
=
0
;
i
<
g_stConfInfo
.
stThreads
[
i
].
numOfTopic
;
i
++
)
{
taosFprintfFile
(
g_fp
,
"%s, "
,
g_stConfInfo
.
stThreads
[
i
].
topics
[
i
]);
for
(
int
j
=
0
;
j
<
g_stConfInfo
.
stThreads
[
i
].
numOfTopic
;
j
++
)
{
taosFprintfFile
(
g_fp
,
"%s, "
,
g_stConfInfo
.
stThreads
[
i
].
topics
[
j
]);
}
taosFprintfFile
(
g_fp
,
"
\n
"
);
taosFprintfFile
(
g_fp
,
" Key: "
);
for
(
int
i
=
0
;
i
<
g_stConfInfo
.
stThreads
[
i
].
numOfKey
;
i
++
)
{
taosFprintfFile
(
g_fp
,
"%s:%s, "
,
g_stConfInfo
.
stThreads
[
i
].
key
[
i
],
g_stConfInfo
.
stThreads
[
i
].
value
[
i
]);
for
(
int
k
=
0
;
k
<
g_stConfInfo
.
stThreads
[
i
].
numOfKey
;
k
++
)
{
taosFprintfFile
(
g_fp
,
"%s:%s, "
,
g_stConfInfo
.
stThreads
[
i
].
key
[
k
],
g_stConfInfo
.
stThreads
[
i
].
value
[
k
]);
}
taosFprintfFile
(
g_fp
,
"
\n
"
);
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录