taosdata / TDengine

Unverified commit 68e0fb92
Authored on Aug 08, 2023 by dapan1121; committed via GitHub on Aug 08, 2023
Parents: 0ea34286, fba43e17

Merge pull request #22165 from taosdata/feat/3.0/TD-25271

feature: optimize interval with limit
Showing 11 changed files with 602 additions and 158 deletions (+602, -158).
source/libs/executor/inc/executorInt.h            +9    -0
source/libs/executor/src/executil.c               +2    -1
source/libs/executor/src/operator.c               +0    -1
source/libs/executor/src/scanoperator.c           +18   -19
source/libs/executor/src/timewindowoperator.c     +89   -3
source/libs/planner/src/planLogicCreater.c        +0    -1
source/libs/planner/src/planOptimizer.c           +99   -27
source/libs/planner/src/planSpliter.c             +12   -0
tests/parallel_test/cases.task                    +1    -0
tests/script/tsim/query/r/explain_tsorder.result  +106  -106
tests/system-test/2-query/interval_limit_opt.py   +266  -0
source/libs/executor/inc/executorInt.h

@@ -25,6 +25,7 @@ extern "C" {
 #include "tsort.h"
 #include "ttszip.h"
 #include "tvariant.h"
+#include "theap.h"
 #include "dataSinkMgt.h"
 #include "executil.h"

@@ -417,6 +418,14 @@ typedef struct SIntervalAggOperatorInfo {
   EOPTR_EXEC_MODEL   execModel;    // operator execution model [batch model|stream model]
   STimeWindowAggSupp twAggSup;
   SArray*            pPrevValues;  //  SArray<SGroupKeys> used to keep the previous not null value for interpolation.
+  // for limit optimization
+  bool               limited;
+  int64_t            limit;
+  bool               slimited;
+  int64_t            slimit;
+  uint64_t           curGroupId;   // initialize to UINT64_MAX
+  uint64_t           handledGroupNum;
+  BoundedQueue*      pBQ;
 } SIntervalAggOperatorInfo;

 typedef struct SMergeAlignedIntervalAggOperatorInfo {
...
source/libs/executor/src/executil.c

@@ -2118,8 +2118,9 @@ int32_t buildGroupIdMapForAllTables(STableListInfo* pTableListInfo, SReadHandle*
     if (code != TSDB_CODE_SUCCESS) {
       return code;
     }
-    if (groupSort) {
+    if (pScanNode->groupOrderScan) pTableListInfo->numOfOuputGroups = taosArrayGetSize(pTableListInfo->pTableList);
+    if (groupSort || pScanNode->groupOrderScan) {
       code = sortTableGroup(pTableListInfo);
     }
   }
...
source/libs/executor/src/operator.c

@@ -275,7 +275,6 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR
                               SNode* pTagIndexCond, const char* pUser, const char* dbname) {
   int32_t     type = nodeType(pPhyNode);
   const char* idstr = GET_TASKID(pTaskInfo);

   if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
     SOperatorInfo* pOperator = NULL;
     if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
...
source/libs/executor/src/scanoperator.c

@@ -848,30 +848,29 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
       return result;
     }

-    if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
-      setOperatorCompleted(pOperator);
-      return NULL;
-    }
-
-    // reset value for the next group data output
-    pOperator->status = OP_OPENED;
-    resetLimitInfoForNextGroup(&pInfo->base.limitInfo);
-
-    int32_t        num = 0;
-    STableKeyInfo* pList = NULL;
-    tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num);
-
-    pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, pList, num);
-    pAPI->tsdReader.tsdReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond);
-    pInfo->scanTimes = 0;
-
-    result = doGroupedTableScan(pOperator);
-    if (result != NULL) {
-      return result;
-    }
-
-    setOperatorCompleted(pOperator);
-    return NULL;
+    while (1) {
+      if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
+        setOperatorCompleted(pOperator);
+        return NULL;
+      }
+
+      // reset value for the next group data output
+      pOperator->status = OP_OPENED;
+      resetLimitInfoForNextGroup(&pInfo->base.limitInfo);
+
+      int32_t        num = 0;
+      STableKeyInfo* pList = NULL;
+      tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num);
+
+      pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, pList, num);
+      pAPI->tsdReader.tsdReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond);
+      pInfo->scanTimes = 0;
+
+      result = doGroupedTableScan(pOperator);
+      if (result != NULL) {
+        return result;
+      }
+    }
   }
 }
...
source/libs/executor/src/timewindowoperator.c

@@ -876,7 +876,67 @@ bool needDeleteWindowBuf(STimeWindow* pWin, STimeWindowAggSupp* pTwSup) {
   return pTwSup->maxTs != INT64_MIN && pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark;
 }

-static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
+static bool tsKeyCompFn(void* l, void* r, void* param) {
+  TSKEY*                    lTS = (TSKEY*)l;
+  TSKEY*                    rTS = (TSKEY*)r;
+  SIntervalAggOperatorInfo* pInfo = param;
+  return pInfo->binfo.outputTsOrder == ORDER_ASC ? *lTS < *rTS : *lTS > *rTS;
+}
+
+static bool isCalculatedWin(SIntervalAggOperatorInfo* pInfo, const STimeWindow* win, uint64_t tableGroupId) {
+  char keyBuf[sizeof(TSKEY) + sizeof(uint64_t)] = {0};
+  SET_RES_WINDOW_KEY(keyBuf, (char*)&win->skey, sizeof(TSKEY), tableGroupId);
+  return tSimpleHashGet(pInfo->aggSup.pResultRowHashTable, keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))) != NULL;
+}
+
+/**
+ * @brief check if cur window should be filtered out by limit info
+ * @retval true if should be filtered out
+ * @retval false if not filtering out
+ * @note If no limit info, we skip filtering.
+ *       If input/output ts order mismatch, we skip filtering too.
+ *       eg. input ts order: desc, and output ts order: asc, limit: 10
+ *       IntervalOperator should output the first 10 windows, however, we can't find the first 10 windows until we scan
+ *       every tuple in every block.
+ *       And the boundedQueue keeps refreshing all records with smaller ts key.
+ */
+static bool filterWindowWithLimit(SIntervalAggOperatorInfo* pOperatorInfo, STimeWindow* win, uint64_t groupId) {
+  if (!pOperatorInfo->limited  // if no limit info, no filter will be applied
+      || pOperatorInfo->binfo.inputTsOrder !=
+             pOperatorInfo->binfo.outputTsOrder  // if input/output ts order mismatch, no filter
+  ) {
+    return false;
+  }
+  if (pOperatorInfo->limit == 0) return true;
+
+  if (pOperatorInfo->pBQ == NULL) {
+    pOperatorInfo->pBQ = createBoundedQueue(pOperatorInfo->limit - 1, tsKeyCompFn, taosMemoryFree, pOperatorInfo);
+  }
+
+  bool shouldFilter = false;
+  // if BQ has been full, compare it with top of BQ
+  if (taosBQSize(pOperatorInfo->pBQ) == taosBQMaxSize(pOperatorInfo->pBQ) + 1) {
+    PriorityQueueNode* top = taosBQTop(pOperatorInfo->pBQ);
+    shouldFilter = tsKeyCompFn(top->data, &win->skey, pOperatorInfo);
+  }
+
+  if (shouldFilter) {
+    return true;
+  } else if (isCalculatedWin(pOperatorInfo, win, groupId)) {
+    return false;
+  }
+
+  // cur win not been filtered out and not been pushed into BQ yet, push it into BQ
+  PriorityQueueNode node = {.data = taosMemoryMalloc(sizeof(TSKEY))};
+  *((TSKEY*)node.data) = win->skey;
+  if (NULL == taosBQPush(pOperatorInfo->pBQ, &node)) {
+    taosMemoryFree(node.data);
+    return true;
+  }
+  return false;
+}
+
+static bool hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
                             int32_t scanFlag) {
   SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
...
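The doc comment above pins down when the filter is legal; the mechanics are easier to see in isolation. Below is a minimal, self-contained sketch of the same idea, assuming LIMIT n with matching input/output ts order. WindowKeeper and filterBySkey are hypothetical stand-ins for TDengine's heap-backed BoundedQueue from theap.h; a sorted array is used here purely for clarity.

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

typedef int64_t TSKEY;

// Keeps the n best window start keys seen so far, sorted best-first.
typedef struct {
  TSKEY*  keys;
  int32_t size;
  int32_t cap;  // == LIMIT n
  bool    asc;  // output order; with asc, a smaller skey is "better"
} WindowKeeper;

static bool better(const WindowKeeper* wk, TSKEY a, TSKEY b) { return wk->asc ? a < b : a > b; }

// Returns true when the window starting at skey cannot be among the first n
// windows, so it can be skipped before any aggregation work is done.
static bool filterBySkey(WindowKeeper* wk, TSKEY skey) {
  for (int32_t i = 0; i < wk->size; i++) {
    if (wk->keys[i] == skey) return false;  // already kept (cf. isCalculatedWin)
  }
  if (wk->size == wk->cap) {
    if (!better(wk, skey, wk->keys[wk->size - 1])) return true;  // worse than the worst kept key
    wk->size--;                                                  // evict the worst
  }
  int32_t i = wk->size++;
  for (; i > 0 && better(wk, skey, wk->keys[i - 1]); i--) wk->keys[i] = wk->keys[i - 1];
  wk->keys[i] = skey;  // insertion-sort step keeps the array best-first
  return false;
}

int main(void) {
  TSKEY        buf[3];
  WindowKeeper wk = {.keys = buf, .size = 0, .cap = 3, .asc = true};  // LIMIT 3, ascending
  TSKEY        wins[] = {40, 10, 30, 20, 50, 10};
  for (int i = 0; i < 6; i++) {
    printf("skey=%2lld filtered=%d\n", (long long)wins[i], filterBySkey(&wk, wins[i]));
  }
  // 50 is rejected outright; the repeated 10 is recognized as already kept.
  return 0;
}

This also shows why the real filter disables itself on an order mismatch: with descending input and ascending output, the keeper would keep evicting entries as ever-smaller keys arrive, so nothing could be skipped safely until the whole scan finishes.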
@@ -891,8 +951,21 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
   TSKEY       ts = getStartTsKey(&pBlock->info.window, tsCols);
   SResultRow* pResult = NULL;

+  if (tableGroupId != pInfo->curGroupId) {
+    pInfo->handledGroupNum += 1;
+    if (pInfo->slimited && pInfo->handledGroupNum > pInfo->slimit) {
+      return true;
+    } else {
+      pInfo->curGroupId = tableGroupId;
+      destroyBoundedQueue(pInfo->pBQ);
+      pInfo->pBQ = NULL;
+    }
+  }
+
   STimeWindow win =
       getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, pInfo->binfo.inputTsOrder);
+  if (filterWindowWithLimit(pInfo, &win, tableGroupId)) return false;

   int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId,
                                        pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo);
   if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
...
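The group-change branch added above is what makes SLIMIT cheap: because the planner forces groupOrderScan whenever an slimit is present, all blocks of one group arrive adjacently, so counting group transitions suffices. A small sketch of that counting, under the same group-ordered assumption (GroupLimiter and startBlockOfGroup are illustrative names, not TDengine APIs):

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

typedef struct {
  uint64_t curGroupId;       // initialized to UINT64_MAX, as in the operator
  uint64_t handledGroupNum;
  bool     slimited;
  int64_t  slimit;           // slimit + offset, folded at operator creation
} GroupLimiter;

// Returns true when aggregation can stop: enough groups have been handled.
static bool startBlockOfGroup(GroupLimiter* gl, uint64_t groupId) {
  if (groupId != gl->curGroupId) {
    gl->handledGroupNum += 1;
    if (gl->slimited && gl->handledGroupNum > (uint64_t)gl->slimit) return true;
    gl->curGroupId = groupId;  // new group: per-group state (the BQ) is reset here
  }
  return false;
}

int main(void) {
  GroupLimiter gl = {UINT64_MAX, 0, true, 2};       // SLIMIT 2
  uint64_t     blocks[] = {7, 7, 9, 9, 9, 12, 12};  // group ids of incoming blocks
  for (int i = 0; i < 7; i++) {
    if (startBlockOfGroup(&gl, blocks[i])) {
      printf("stop before group %llu: slimit satisfied\n", (unsigned long long)blocks[i]);
      return 0;
    }
    printf("aggregate block of group %llu\n", (unsigned long long)blocks[i]);
  }
  return 0;
}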
@@ -929,7 +1002,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
   while (1) {
     int32_t prevEndPos = forwardRows - 1 + startPos;
     startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, pInfo->binfo.inputTsOrder);
-    if (startPos < 0) {
+    if (startPos < 0 || filterWindowWithLimit(pInfo, &nextWin, tableGroupId)) {
       break;
     }
     // null data, failed to allocate more memory buffer
...
@@ -963,6 +1036,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
   if (pInfo->timeWindowInterpo) {
     saveDataBlockLastRow(pInfo->pPrevValues, pBlock, pInfo->pInterpCols);
   }
+  return false;
 }

 void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult) {
...
@@ -1043,7 +1117,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
     // the pDataBlock are always the same one, no need to call this again
     setInputDataBlock(pSup, pBlock, pInfo->binfo.inputTsOrder, scanFlag, true);
-    hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag);
+    if (hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag)) break;
   }

   initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->binfo.outputTsOrder);
...
@@ -1495,6 +1569,7 @@ void destroyIntervalOperatorInfo(void* param) {
   cleanupGroupResInfo(&pInfo->groupResInfo);
   colDataDestroy(&pInfo->twAggSup.timeWindowData);
+  destroyBoundedQueue(pInfo->pBQ);
   taosMemoryFreeClear(param);
 }
...
@@ -1658,6 +1733,17 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
   pInfo->interval = interval;
   pInfo->twAggSup = as;
   pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock;
+  if (pPhyNode->window.node.pLimit) {
+    SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pLimit;
+    pInfo->limited = true;
+    pInfo->limit = pLimit->limit + pLimit->offset;
+  }
+  if (pPhyNode->window.node.pSlimit) {
+    SLimitNode* pLimit = (SLimitNode*)pPhyNode->window.node.pSlimit;
+    pInfo->slimited = true;
+    pInfo->slimit = pLimit->limit + pLimit->offset;
+    pInfo->curGroupId = UINT64_MAX;
+  }

   if (pPhyNode->window.pExprs != NULL) {
     int32_t numOfScalar = 0;
...
source/libs/planner/src/planLogicCreater.c

@@ -847,7 +847,6 @@ static int32_t createWindowLogicNodeByInterval(SLogicPlanContext* pCxt, SInterva
           : (pSelect->hasTimeLineFunc ? getRequireDataOrder(true, pSelect) : DATA_ORDER_LEVEL_IN_BLOCK);
   pWindow->node.resultDataOrder =
       pCxt->pPlanCxt->streamQuery ? DATA_ORDER_LEVEL_GLOBAL : getRequireDataOrder(true, pSelect);
   pWindow->pTspk = nodesCloneNode(pInterval->pCol);
   if (NULL == pWindow->pTspk) {
     nodesDestroyNode((SNode*)pWindow);
...
source/libs/planner/src/planOptimizer.c

@@ -368,7 +368,7 @@ static void scanPathOptSetGroupOrderScan(SScanLogicNode* pScan) {
   if (pScan->node.pParent && nodeType(pScan->node.pParent) == QUERY_NODE_LOGIC_PLAN_AGG) {
     SAggLogicNode* pAgg = (SAggLogicNode*)pScan->node.pParent;
     bool withSlimit = pAgg->node.pSlimit != NULL || (pAgg->node.pParent && pAgg->node.pParent->pSlimit);
     if (withSlimit && isPartTableAgg(pAgg)) {
       pScan->groupOrderScan = pAgg->node.forceCreateNonBlockingOptr = true;
     }
...
@@ -1562,11 +1562,33 @@ static bool planOptNodeListHasTbname(SNodeList* pKeys) {
 }

 static bool partTagsIsOptimizableNode(SLogicNode* pNode) {
-  return ((QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode) ||
-           (QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode) && NULL != ((SAggLogicNode*)pNode)->pGroupKeys &&
-            NULL != ((SAggLogicNode*)pNode)->pAggFuncs)) &&
-          1 == LIST_LENGTH(pNode->pChildren) &&
-          QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(nodesListGetNode(pNode->pChildren, 0)));
+  bool ret = 1 == LIST_LENGTH(pNode->pChildren) &&
+             QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(nodesListGetNode(pNode->pChildren, 0));
+  if (!ret) return ret;
+  switch (nodeType(pNode)) {
+    case QUERY_NODE_LOGIC_PLAN_PARTITION: {
+      if (pNode->pParent && nodeType(pNode->pParent) == QUERY_NODE_LOGIC_PLAN_WINDOW) {
+        SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode->pParent;
+        if (pWindow->winType == WINDOW_TYPE_INTERVAL) {
+          // if interval has slimit, we push down partition node to scan, and scan will set groupOrderScan to true
+          // we want to skip groups of blocks after slimit satisfied
+          // if interval only has limit, we do not push down partition node to scan
+          // we want to get grouped output from partition node and make use of limit
+          // if no slimit and no limit, we push down partition node and groupOrderScan is false, cause we do not need
+          // group ordered output
+          if (!pWindow->node.pSlimit && pWindow->node.pLimit) ret = false;
+        }
+      }
+    } break;
+    case QUERY_NODE_LOGIC_PLAN_AGG: {
+      SAggLogicNode* pAgg = (SAggLogicNode*)pNode;
+      ret = pAgg->pGroupKeys && pAgg->pAggFuncs;
+    } break;
+    default:
+      ret = false;
+      break;
+  }
+  return ret;
 }

 static SNodeList* partTagsGetPartKeys(SLogicNode* pNode) {
...
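The comments inside the new PARTITION case encode a three-way decision. For illustration, here is a hypothetical rendering of that decision table as a standalone function (shouldPushPartitionToScan and WindowNode are made-up names; in the real code the decision is the single ret = false assignment above):

#include <stdbool.h>
#include <stdio.h>

typedef struct { bool hasLimit, hasSlimit; } WindowNode;

static bool shouldPushPartitionToScan(const WindowNode* w) {
  // slimit present: push down; the group-ordered scan lets later groups be
  //   skipped once slimit is satisfied.
  // limit only: keep the partition node; its grouped output lets the interval
  //   operator apply the per-group limit.
  // neither: push down; no group-ordered output is needed.
  return !(w->hasLimit && !w->hasSlimit);
}

int main(void) {
  WindowNode cases[] = {{false, false}, {true, false}, {false, true}, {true, true}};
  for (int i = 0; i < 4; i++)
    printf("limit=%d slimit=%d -> push down partition: %d\n",
           cases[i].hasLimit, cases[i].hasSlimit, shouldPushPartitionToScan(&cases[i]));
  return 0;
}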
@@ -1707,6 +1729,8 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
     scanPathOptSetGroupOrderScan(pScan);
     pParent->hasGroupKeyOptimized = true;
   }
+  if (pNode->pParent->pSlimit)
+    pScan->groupOrderScan = true;

   NODES_CLEAR_LIST(pNode->pChildren);
   nodesDestroyNode((SNode*)pNode);
...
@@ -2660,23 +2684,79 @@ static int32_t tagScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubp
 }

 static bool pushDownLimitOptShouldBeOptimized(SLogicNode* pNode) {
-  if (NULL == pNode->pLimit || 1 != LIST_LENGTH(pNode->pChildren)) {
+  if ((NULL == pNode->pLimit && pNode->pSlimit == NULL) || 1 != LIST_LENGTH(pNode->pChildren)) {
     return false;
   }

   SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
-  // push down to sort node
-  if (QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pChild)) {
-    // if we have pushed down, we skip it
-    if (pChild->pLimit) return false;
-  } else if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pChild) || QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pNode)) {
-    // push down to table scan node
-    // if pNode is sortNode, we skip push down limit info to table scan node
-    return false;
-  }
+  if (pChild->pLimit || pChild->pSlimit) return false;
   return true;
 }

+static void swapLimit(SLogicNode* pParent, SLogicNode* pChild) {
+  pChild->pLimit = pParent->pLimit;
+  pParent->pLimit = NULL;
+}
+
+static void cloneLimit(SLogicNode* pParent, SLogicNode* pChild) {
+  SLimitNode* pLimit = NULL;
+  if (pParent->pLimit) {
+    pChild->pLimit = nodesCloneNode(pParent->pLimit);
+    pLimit = (SLimitNode*)pChild->pLimit;
+    pLimit->limit += pLimit->offset;
+    pLimit->offset = 0;
+  }
+  if (pParent->pSlimit) {
+    pChild->pSlimit = nodesCloneNode(pParent->pSlimit);
+    pLimit = (SLimitNode*)pChild->pSlimit;
+    pLimit->limit += pLimit->offset;
+    pLimit->offset = 0;
+  }
+}
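cloneLimit copies the parent's limit onto the child rather than moving it, and folds the offset into the copied limit: the child cannot know which rows the parent will later skip, so it must produce offset + limit rows and leave the actual skipping to the parent. A standalone sketch of the folding, assuming simplified stand-ins for SLimitNode/SLogicNode:

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

typedef struct {
  int64_t limit;
  int64_t offset;
} Limit;

typedef struct {
  Limit* pLimit;  // NULL when the node has no LIMIT
} Node;

// Copy (not move) the parent's limit to the child, folding OFFSET into LIMIT,
// mirroring the pLimit half of cloneLimit() above.
static void cloneLimitDown(const Node* parent, Node* child) {
  if (parent->pLimit == NULL) return;
  child->pLimit = malloc(sizeof(Limit));
  *child->pLimit = *parent->pLimit;
  child->pLimit->limit += child->pLimit->offset;  // child must produce offset+limit rows
  child->pLimit->offset = 0;                      // skipping stays with the parent
}

int main(void) {
  Limit parentLimit = {.limit = 2, .offset = 3};  // e.g. LIMIT 3,2
  Node  parent = {.pLimit = &parentLimit}, child = {.pLimit = NULL};
  cloneLimitDown(&parent, &child);
  printf("parent keeps LIMIT %lld OFFSET %lld; child gets LIMIT %lld OFFSET %lld\n",
         (long long)parent.pLimit->limit, (long long)parent.pLimit->offset,
         (long long)child.pLimit->limit, (long long)child.pLimit->offset);
  free(child.pLimit);
  return 0;
}

The same folding is visible at the executor boundary: createIntervalOperatorInfo sets pInfo->limit (and pInfo->slimit) to pLimit->limit + pLimit->offset, so the interval operator always works against the folded budget.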
+static bool pushDownLimitHow(SLogicNode* pNodeWithLimit, SLogicNode* pNodeLimitPushTo);
+static bool pushDownLimitTo(SLogicNode* pNodeWithLimit, SLogicNode* pNodeLimitPushTo) {
+  switch (nodeType(pNodeLimitPushTo)) {
+    case QUERY_NODE_LOGIC_PLAN_WINDOW: {
+      SWindowLogicNode* pWindow = (SWindowLogicNode*)pNodeLimitPushTo;
+      if (pWindow->winType != WINDOW_TYPE_INTERVAL) break;
+      cloneLimit(pNodeWithLimit, pNodeLimitPushTo);
+      return true;
+    }
+    case QUERY_NODE_LOGIC_PLAN_FILL:
+    case QUERY_NODE_LOGIC_PLAN_SORT: {
+      cloneLimit(pNodeWithLimit, pNodeLimitPushTo);
+      SNode* pChild = NULL;
+      FOREACH(pChild, pNodeLimitPushTo->pChildren) { pushDownLimitHow(pNodeLimitPushTo, (SLogicNode*)pChild); }
+      return true;
+    }
+    case QUERY_NODE_LOGIC_PLAN_SCAN:
+      if (nodeType(pNodeWithLimit) == QUERY_NODE_LOGIC_PLAN_PROJECT && pNodeWithLimit->pLimit) {
+        swapLimit(pNodeWithLimit, pNodeLimitPushTo);
+        return true;
+      }
+    default:
+      break;
+  }
+  return false;
+}
+
+static bool pushDownLimitHow(SLogicNode* pNodeWithLimit, SLogicNode* pNodeLimitPushTo) {
+  switch (nodeType(pNodeWithLimit)) {
+    case QUERY_NODE_LOGIC_PLAN_PROJECT:
+    case QUERY_NODE_LOGIC_PLAN_FILL:
+      return pushDownLimitTo(pNodeWithLimit, pNodeLimitPushTo);
+    case QUERY_NODE_LOGIC_PLAN_SORT: {
+      SSortLogicNode* pSort = (SSortLogicNode*)pNodeWithLimit;
+      if (sortPriKeyOptIsPriKeyOrderBy(pSort->pSortKeys)) return pushDownLimitTo(pNodeWithLimit, pNodeLimitPushTo);
+    }
+    default:
+      break;
+  }
+  return false;
+}
+
 static int32_t pushDownLimitOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
   SLogicNode* pNode = optFindPossibleNode(pLogicSubplan->pNode, pushDownLimitOptShouldBeOptimized);
   if (NULL == pNode) {
...

@@ -2685,17 +2765,9 @@ static int32_t pushDownLimitOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLog
   SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
   nodesDestroyNode(pChild->pLimit);
-  if (QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pChild)) {
-    pChild->pLimit = nodesCloneNode(pNode->pLimit);
-    SLimitNode* pLimit = (SLimitNode*)pChild->pLimit;
-    pLimit->limit += pLimit->offset;
-    pLimit->offset = 0;
-  } else {
-    pChild->pLimit = pNode->pLimit;
-    pNode->pLimit = NULL;
-  }
-  pCxt->optimized = true;
+  if (pushDownLimitHow(pNode, pChild)) {
+    pCxt->optimized = true;
+  }

   return TSDB_CODE_SUCCESS;
 }
...
@@ -2996,6 +3068,7 @@ static const SOptimizeRule optimizeRuleSet[] = {
   {.pName = "sortNonPriKeyOptimize", .optimizeFunc = sortNonPriKeyOptimize},
   {.pName = "SortPrimaryKey", .optimizeFunc = sortPrimaryKeyOptimize},
   {.pName = "SmaIndex", .optimizeFunc = smaIndexOptimize},
+  {.pName = "PushDownLimit", .optimizeFunc = pushDownLimitOptimize},
   {.pName = "PartitionTags", .optimizeFunc = partTagsOptimize},
   {.pName = "MergeProjects", .optimizeFunc = mergeProjectsOptimize},
   {.pName = "EliminateProject", .optimizeFunc = eliminateProjOptimize},
...

@@ -3004,7 +3077,6 @@ static const SOptimizeRule optimizeRuleSet[] = {
   {.pName = "RewriteUnique", .optimizeFunc = rewriteUniqueOptimize},
   {.pName = "LastRowScan", .optimizeFunc = lastRowScanOptimize},
   {.pName = "TagScan", .optimizeFunc = tagScanOptimize},
-  {.pName = "PushDownLimit", .optimizeFunc = pushDownLimitOptimize},
   {.pName = "TableCountScan", .optimizeFunc = tableCountScanOptimize},
 };
 // clang-format on
...
source/libs/planner/src/planSpliter.c

@@ -498,6 +498,18 @@ static int32_t stbSplRewriteFromMergeNode(SMergeLogicNode* pMerge, SLogicNode* p
       }
       break;
     }
+    case QUERY_NODE_LOGIC_PLAN_WINDOW: {
+      SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode;
+      if (pMerge->node.pLimit) {
+        nodesDestroyNode(pMerge->node.pLimit);
+        pMerge->node.pLimit = NULL;
+      }
+      if (pMerge->node.pSlimit) {
+        nodesDestroyNode(pMerge->node.pSlimit);
+        pMerge->node.pSlimit = NULL;
+      }
+      break;
+    }
     default:
       break;
   }
...
tests/parallel_test/cases.task

@@ -25,6 +25,7 @@
 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 4
 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 4
 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_26.py -Q 4
+,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/interval_limit_opt.py -Q 4
 ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqShow.py
 ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDropStb.py
 ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb0.py
...
tests/script/tsim/query/r/explain_tsorder.result (+106, -106) — this diff is collapsed.
tests/system-test/2-query/interval_limit_opt.py (new file, 0 → 100644)

import taos
import sys
import time
import socket
import os
import threading
import math
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
# from tmqCommon import *

class TDTestCase:
    def __init__(self):
        self.vgroups = 4
        self.ctbNum = 10
        self.rowsPerTbl = 10000
        self.duraion = '1h'

    def init(self, conn, logSql, replicaVar=1):
        self.replicaVar = int(replicaVar)
        tdLog.debug(f"start to excute {__file__}")
        tdSql.init(conn.cursor(), False)

    def create_database(self, tsql, dbName, dropFlag=1, vgroups=2, replica=1, duration: str = '1d'):
        if dropFlag == 1:
            tsql.execute("drop database if exists %s" % (dbName))

        tsql.execute("create database if not exists %s vgroups %d replica %d duration %s" % (dbName, vgroups, replica, duration))
        tdLog.debug("complete to create database %s" % (dbName))
        return

    def create_stable(self, tsql, paraDict):
        colString = tdCom.gen_column_type_str(colname_prefix=paraDict["colPrefix"], column_elm_list=paraDict["colSchema"])
        tagString = tdCom.gen_tag_type_str(tagname_prefix=paraDict["tagPrefix"], tag_elm_list=paraDict["tagSchema"])
        sqlString = f"create table if not exists %s.%s (%s) tags (%s)" % (paraDict["dbName"], paraDict["stbName"], colString, tagString)
        tdLog.debug("%s" % (sqlString))
        tsql.execute(sqlString)
        return

    def create_ctable(self, tsql=None, dbName='dbx', stbName='stb', ctbPrefix='ctb', ctbNum=1, ctbStartIdx=0):
        for i in range(ctbNum):
            sqlString = "create table %s.%s%d using %s.%s tags(%d, 'tb%d', 'tb%d', %d, %d, %d)" % \
                (dbName, ctbPrefix, i + ctbStartIdx, dbName, stbName, (i + ctbStartIdx) % 5, i + ctbStartIdx, i + ctbStartIdx, i + ctbStartIdx, i + ctbStartIdx, i + ctbStartIdx)
            tsql.execute(sqlString)

        tdLog.debug("complete to create %d child tables by %s.%s" % (ctbNum, dbName, stbName))
        return

    def insert_data(self, tsql, dbName, ctbPrefix, ctbNum, rowsPerTbl, batchNum, startTs, tsStep):
        tdLog.debug("start to insert data ............")
        tsql.execute("use %s" % dbName)
        pre_insert = "insert into "
        sql = pre_insert

        for i in range(ctbNum):
            rowsBatched = 0
            sql += " %s%d values " % (ctbPrefix, i)
            for j in range(rowsPerTbl):
                if (i < ctbNum / 2):
                    sql += "(%d, %d, %d, %d,%d,%d,%d,true,'binary%d', 'nchar%d') " % (startTs + j * tsStep, j % 10, j % 10, j % 10, j % 10, j % 10, j % 10, j % 10, j % 10)
                else:
                    sql += "(%d, %d, NULL, %d,NULL,%d,%d,true,'binary%d', 'nchar%d') " % (startTs + j * tsStep, j % 10, j % 10, j % 10, j % 10, j % 10, j % 10)
                rowsBatched += 1
                if ((rowsBatched == batchNum) or (j == rowsPerTbl - 1)):
                    tsql.execute(sql)
                    rowsBatched = 0
                    if j < rowsPerTbl - 1:
                        sql = "insert into %s%d values " % (ctbPrefix, i)
                    else:
                        sql = "insert into "
        if sql != pre_insert:
            tsql.execute(sql)
        tdLog.debug("insert data ............ [OK]")
        return

    def prepareTestEnv(self):
        tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
        paraDict = {'dbName':     'test',
                    'dropFlag':   1,
                    'vgroups':    2,
                    'stbName':    'meters',
                    'colPrefix':  'c',
                    'tagPrefix':  't',
                    'colSchema':   [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'FLOAT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'smallint', 'count':1},{'type': 'tinyint', 'count':1},{'type': 'bool', 'count':1},{'type': 'binary', 'len':10, 'count':1},{'type': 'nchar', 'len':10, 'count':1}],
                    'tagSchema':   [{'type': 'INT', 'count':1},{'type': 'nchar', 'len':20, 'count':1},{'type': 'binary', 'len':20, 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'smallint', 'count':1},{'type': 'DOUBLE', 'count':1}],
                    'ctbPrefix':  't',
                    'ctbStartIdx': 0,
                    'ctbNum':     100,
                    'rowsPerTbl': 10000,
                    'batchNum':   3000,
                    'startTs':    1537146000000,
                    'tsStep':     600000}

        paraDict['vgroups'] = self.vgroups
        paraDict['ctbNum'] = self.ctbNum
        paraDict['rowsPerTbl'] = self.rowsPerTbl

        tdLog.info("create database")
        self.create_database(tsql=tdSql, dbName=paraDict["dbName"], dropFlag=paraDict["dropFlag"], vgroups=paraDict["vgroups"], replica=self.replicaVar, duration=self.duraion)

        tdLog.info("create stb")
        self.create_stable(tsql=tdSql, paraDict=paraDict)

        tdLog.info("create child tables")
        self.create_ctable(tsql=tdSql, dbName=paraDict["dbName"], \
                stbName=paraDict["stbName"], ctbPrefix=paraDict["ctbPrefix"], \
                ctbNum=paraDict["ctbNum"], ctbStartIdx=paraDict["ctbStartIdx"])
        self.insert_data(tsql=tdSql, dbName=paraDict["dbName"], \
                ctbPrefix=paraDict["ctbPrefix"], ctbNum=paraDict["ctbNum"], \
                rowsPerTbl=paraDict["rowsPerTbl"], batchNum=paraDict["batchNum"], \
                startTs=paraDict["startTs"], tsStep=paraDict["tsStep"])
        return

    def check_first_rows(self, all_rows, limited_rows, offset: int = 0):
        for i in range(0, len(limited_rows) - 1):
            if limited_rows[i] != all_rows[i + offset]:
                tdLog.info("row: %d, row in all: %s" % (i + offset + 1, str(all_rows[i + offset])))
                tdLog.info("row: %d, row in limted: %s" % (i + 1, str(limited_rows[i])))
                tdLog.exit("row data check failed")
        tdLog.info("all rows are the same as query without limit..")

    def query_and_check_with_slimit(self, sql: str, max_limit: int, step: int, offset: int = 0):
        self.query_and_check_with_limit(sql, max_limit, step, offset, ' slimit ')

    def query_and_check_with_limit(self, sql: str, max_limit: int, step: int, offset: int = 0, limit_str: str = ' limit '):
        for limit in range(0, max_limit, step):
            limited_sql = sql + limit_str + str(offset) + "," + str(limit)
            tdLog.info("query with sql: %s " % (sql) + limit_str + " %d,%d" % (offset, limit))
            all_rows = tdSql.getResult(sql)
            limited_rows = tdSql.getResult(limited_sql)
            tdLog.info("all rows: %d, limited rows: %d" % (len(all_rows), len(limited_rows)))
            if limit_str == ' limit ':
                if limit + offset <= len(all_rows) and len(limited_rows) != limit:
                    tdLog.exit("limited sql has less rows than limit value which is not right, \
                            limit: %d, limited_rows: %d, all_rows: %d, offset: %d" % (limit, len(limited_rows), len(all_rows), offset))
                elif limit + offset > len(all_rows) and offset < len(all_rows) and offset + len(limited_rows) != len(all_rows):
                    tdLog.exit("limited sql has less rows than all_rows which is not right, \
                            limit: %d, limited_rows: %d, all_rows: %d, offset: %d" % (limit, len(limited_rows), len(all_rows), offset))
                elif offset >= len(all_rows) and len(limited_rows) != 0:
                    tdLog.exit("limited rows should be zero, \
                            limit: %d, limited_rows: %d, all_rows: %d, offset: %d" % (limit, len(limited_rows), len(all_rows), offset))

            self.check_first_rows(all_rows, limited_rows, offset)

    def test_interval_limit_asc(self, offset: int = 0):
        sqls = ["select _wstart, _wend, count(*), sum(c1), avg(c2), first(ts) from meters interval(1s) ",
                "select _wstart, _wend, count(*), sum(c1), avg(c2), first(ts) from meters interval(1m) ",
                "select _wstart, _wend, count(*), sum(c1), avg(c2), first(ts) from meters interval(1h) ",
                "select _wstart, _wend, count(*), sum(c1), avg(c2), first(ts) from meters interval(1d) ",
                "select _wstart, _wend, count(*), sum(c1), avg(c2), first(ts) from t1 interval(1s) ",
                "select _wstart, _wend, count(*), sum(c1), avg(c2), first(ts) from t1 interval(1m) ",
                "select _wstart, _wend, count(*), sum(c1), avg(c2), first(ts) from t1 interval(1h) ",
                "select _wstart, _wend, count(*), sum(c1), avg(c2), first(ts) from t1 interval(1d) "]
        for sql in sqls:
            self.query_and_check_with_limit(sql, 5000, 500, offset)

    def test_interval_limit_desc(self, offset: int = 0):
        sqls = ["select _wstart, _wend, count(*), sum(c1), avg(c2), last(ts) from meters interval(1s) ",
                "select _wstart, _wend, count(*), sum(c1), avg(c2), last(ts) from meters interval(1m) ",
                "select _wstart, _wend, count(*), sum(c1), avg(c2), last(ts) from meters interval(1h) ",
                "select _wstart, _wend, count(*), sum(c1), avg(c2), last(ts) from meters interval(1d) ",
                "select _wstart, _wend, count(*), sum(c1), avg(c2), last(ts) from t1 interval(1s) ",
                "select _wstart, _wend, count(*), sum(c1), avg(c2), last(ts) from t1 interval(1m) ",
                "select _wstart, _wend, count(*), sum(c1), avg(c2), last(ts) from t1 interval(1h) ",
                "select _wstart, _wend, count(*), sum(c1), avg(c2), last(ts) from t1 interval(1d) "]
        for sql in sqls:
            self.query_and_check_with_limit(sql, 5000, 500, offset)

    def test_interval_limit_offset(self):
        for offset in range(0, 1000, 500):
            self.test_interval_limit_asc(offset)
            self.test_interval_limit_desc(offset)
            self.test_interval_fill_limit(offset)
            self.test_interval_order_by_limit(offset)
            self.test_interval_partition_by_slimit(offset)

    def test_interval_fill_limit(self, offset: int = 0):
        sqls = [
            "select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \
                where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-09-17 09:30:00.000' interval(1s) fill(linear)",
            "select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \
                where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-09-17 09:30:00.000' interval(1m) fill(linear)",
            "select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \
                where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-09-17 09:30:00.000' interval(1h) fill(linear)",
            "select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \
                where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-09-17 09:30:00.000' interval(1d) fill(linear)"
        ]
        for sql in sqls:
            self.query_and_check_with_limit(sql, 5000, 1000, offset)

    def test_interval_order_by_limit(self, offset: int = 0):
        sqls = [
            "select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \
                where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' interval(1m) order by b",
            "select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \
                where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' interval(1m) order by a desc",
            "select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), last(ts) from meters \
                where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' interval(1m) order by a desc",
            "select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \
                where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' interval(1m) order by count(*), sum(c1), a",
            "select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \
                where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' interval(1m) order by a, count(*), sum(c1)",
            "select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \
                where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' interval(1m) fill(linear) order by b",
            "select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \
                where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' interval(1m) fill(linear) order by a desc",
            "select _wstart as a, _wend as b, count(*), sum(c1), last(c2), first(ts) from meters \
                where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' interval(1m) fill(linear) order by a desc",
            "select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \
                where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' interval(1m) fill(linear) order by count(*), sum(c1), a",
            "select _wstart as a, _wend as b, count(*), sum(c1), avg(c2), first(ts) from meters \
                where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' interval(1m) fill(linear) order by a, count(*), sum(c1)",
        ]
        for sql in sqls:
            self.query_and_check_with_limit(sql, 6000, 2000, offset)

    def test_interval_partition_by_slimit(self, offset: int = 0):
        sqls = [
            "select _wstart as a, _wend as b, count(*), sum(c1), last(c2), first(ts) from meters "
            "where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' partition by t1 interval(1m)",
            "select _wstart as a, _wend as b, count(*), sum(c1), last(c2), first(ts) from meters "
            "where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' partition by t1 interval(1h)",
            "select _wstart as a, _wend as b, count(*), sum(c1), last(c2), first(ts) from meters "
            "where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' partition by c3 interval(1m)",
        ]
        for sql in sqls:
            self.query_and_check_with_slimit(sql, 10, 2, offset)

    def test_interval_partition_by_slimit_limit(self):
        sql = "select * from (select _wstart as a, _wend as b, count(*), sum(c1), last(c2), first(ts),c3 from meters " \
            "where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' partition by c3 interval(1m) slimit 10 limit 2) order by c3 asc"
        tdSql.query(sql)
        tdSql.checkRows(20)
        tdSql.checkData(0, 4, 0)
        tdSql.checkData(1, 4, 0)
        tdSql.checkData(2, 4, 1)
        tdSql.checkData(3, 4, 1)
        tdSql.checkData(18, 4, 9)
        tdSql.checkData(19, 4, 9)

        sql = "select * from (select _wstart as a, _wend as b, count(*), sum(c1), last(c2), first(ts),c3 from meters " \
            "where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-10-17 09:30:00.000' partition by c3 interval(1m) slimit 2,2 limit 2) order by c3 asc"
        tdSql.query(sql)
        tdSql.checkRows(4)
        tdSql.checkData(0, 4, 2)
        tdSql.checkData(1, 4, 2)
        tdSql.checkData(2, 4, 9)
        tdSql.checkData(3, 4, 9)

    def run(self):
        self.prepareTestEnv()
        self.test_interval_limit_offset()
        self.test_interval_partition_by_slimit_limit()

    def stop(self):
        tdSql.close()
        tdLog.success(f"{__file__} successfully executed")

event = threading.Event()

tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
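Usage note: the test compares every LIMIT/SLIMIT query against its unlimited counterpart over varying limits and offsets. It is registered in tests/parallel_test/cases.task above, so the parallel runner picks it up via the entry ",,y,system-test,./pytest.sh python3 ./test.py -f 2-query/interval_limit_opt.py -Q 4" alongside the other 2-query cases.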