Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
bf18385e
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
bf18385e
编写于
7月 25, 2022
作者:
X
Xiaoyu Wang
浏览文件
操作
浏览文件
下载
差异文件
merge szhou/fix/udf
上级
015193e8
f208b507
变更
22
展开全部
隐藏空白更改
内联
并排
Showing
22 changed file
with
816 addition
and
693 deletion
+816
-693
.gitignore
.gitignore
+1
-0
include/common/tcommon.h
include/common/tcommon.h
+4
-5
source/dnode/mnode/impl/src/mndProfile.c
source/dnode/mnode/impl/src/mndProfile.c
+1
-1
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+2
-3
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+226
-537
source/libs/executor/inc/executil.h
source/libs/executor/inc/executil.h
+0
-2
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+2
-4
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+4
-12
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+36
-32
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+4
-2
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+3
-8
source/libs/parser/inc/parAst.h
source/libs/parser/inc/parAst.h
+1
-1
source/libs/parser/src/parAstCreater.c
source/libs/parser/src/parAstCreater.c
+2
-1
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+10
-6
tests/pytest/crash_gen/crash_gen_main.py
tests/pytest/crash_gen/crash_gen_main.py
+2
-0
tests/system-test/7-tmq/dataFromTsdbNWal-multiCtb.py
tests/system-test/7-tmq/dataFromTsdbNWal-multiCtb.py
+250
-0
tests/system-test/7-tmq/dataFromTsdbNWal.py
tests/system-test/7-tmq/dataFromTsdbNWal.py
+24
-27
tests/system-test/7-tmq/tmqDnodeRestart.py
tests/system-test/7-tmq/tmqDnodeRestart.py
+1
-36
tests/system-test/7-tmq/tmqDropNtb-snapshot0.py
tests/system-test/7-tmq/tmqDropNtb-snapshot0.py
+225
-0
tests/system-test/7-tmq/tmqDropNtb-snapshot1.py
tests/system-test/7-tmq/tmqDropNtb-snapshot1.py
+12
-12
tests/system-test/fulltest.sh
tests/system-test/fulltest.sh
+5
-3
tools/taos-tools
tools/taos-tools
+1
-1
未找到文件。
.gitignore
浏览文件 @
bf18385e
...
...
@@ -22,6 +22,7 @@ mac/
.mypy_cache
*.tmp
*.swp
*.swo
*.orig
src/connector/nodejs/node_modules/
src/connector/nodejs/out/
...
...
include/common/tcommon.h
浏览文件 @
bf18385e
...
...
@@ -153,11 +153,10 @@ typedef struct SQueryTableDataCond {
int32_t
order
;
// desc|asc order to iterate the data block
int32_t
numOfCols
;
SColumnInfo
*
colList
;
int32_t
type
;
// data block load type:
// int32_t numOfTWindows;
STimeWindow
twindows
;
int64_t
startVersion
;
int64_t
endVersion
;
int32_t
type
;
// data block load type:
STimeWindow
twindows
;
int64_t
startVersion
;
int64_t
endVersion
;
}
SQueryTableDataCond
;
int32_t
tEncodeDataBlock
(
void
**
buf
,
const
SSDataBlock
*
pBlock
);
...
...
source/dnode/mnode/impl/src/mndProfile.c
浏览文件 @
bf18385e
...
...
@@ -312,7 +312,7 @@ static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) {
pConn
->
numOfQueries
=
pBasic
->
queryDesc
?
taosArrayGetSize
(
pBasic
->
queryDesc
)
:
0
;
pBasic
->
queryDesc
=
NULL
;
mDebug
(
"queries updated in conn %
d
, num:%d"
,
pConn
->
id
,
pConn
->
numOfQueries
);
mDebug
(
"queries updated in conn %
u
, num:%d"
,
pConn
->
id
,
pConn
->
numOfQueries
);
taosWUnLockLatch
(
&
pConn
->
queryLock
);
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
bf18385e
...
...
@@ -118,9 +118,8 @@ int32_t metaTbCursorNext(SMTbCursor *pTbCur);
// typedef struct STsdb STsdb;
typedef
struct
STsdbReader
STsdbReader
;
#define BLOCK_LOAD_OFFSET_ORDER 1
#define BLOCK_LOAD_TABLESEQ_ORDER 2
#define BLOCK_LOAD_EXTERN_ORDER 3
#define TIMEWINDOW_RANGE_CONTAINED 1
#define TIMEWINDOW_RANGE_EXTERNAL 2
#define LASTROW_RETRIEVE_TYPE_ALL 0x1
#define LASTROW_RETRIEVE_TYPE_SINGLE 0x2
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
bf18385e
此差异已折叠。
点击以展开。
source/libs/executor/inc/executil.h
浏览文件 @
bf18385e
...
...
@@ -82,8 +82,6 @@ size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput);
void
initResultRowInfo
(
SResultRowInfo
*
pResultRowInfo
);
void
cleanupResultRowInfo
(
SResultRowInfo
*
pResultRowInfo
);
void
closeAllResultRows
(
SResultRowInfo
*
pResultRowInfo
);
void
initResultRow
(
SResultRow
*
pResultRow
);
void
closeResultRow
(
SResultRow
*
pResultRow
);
bool
isResultRowClosed
(
SResultRow
*
pResultRow
);
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
bf18385e
...
...
@@ -108,7 +108,6 @@ typedef struct STaskCostInfo {
SFileBlockLoadRecorder
*
pRecoder
;
uint64_t
elapsedTime
;
uint64_t
firstStageMergeTime
;
uint64_t
winInfoSize
;
uint64_t
tableInfoSize
;
uint64_t
hashSize
;
...
...
@@ -351,9 +350,7 @@ typedef struct STableMergeScanInfo {
SArray
*
pColMatchInfo
;
int32_t
numOfOutput
;
SExprInfo
*
pPseudoExpr
;
int32_t
numOfPseudoExpr
;
SqlFunctionCtx
*
pPseudoCtx
;
SExprSupp
pseudoSup
;
SQueryTableDataCond
cond
;
int32_t
scanFlag
;
// table scan flag to denote if it is a repeat/reverse/main scan
...
...
@@ -592,6 +589,7 @@ typedef struct SProjectOperatorInfo {
SLimitInfo
limitInfo
;
bool
mergeDataBlocks
;
SSDataBlock
*
pFinalRes
;
SNode
*
pCondition
;
}
SProjectOperatorInfo
;
typedef
struct
SIndefOperatorInfo
{
...
...
source/libs/executor/src/executil.c
浏览文件 @
bf18385e
...
...
@@ -43,10 +43,6 @@ void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo) {
}
}
void
closeAllResultRows
(
SResultRowInfo
*
pResultRowInfo
)
{
// do nothing
}
bool
isResultRowClosed
(
SResultRow
*
pRow
)
{
return
(
pRow
->
closed
==
true
);
}
void
closeResultRow
(
SResultRow
*
pResultRow
)
{
pResultRow
->
closed
=
true
;
}
...
...
@@ -160,11 +156,13 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) {
SArray
*
createSortInfo
(
SNodeList
*
pNodeList
)
{
size_t
numOfCols
=
0
;
if
(
pNodeList
!=
NULL
)
{
numOfCols
=
LIST_LENGTH
(
pNodeList
);
}
else
{
numOfCols
=
0
;
}
SArray
*
pList
=
taosArrayInit
(
numOfCols
,
sizeof
(
SBlockOrderInfo
));
if
(
pList
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -196,10 +194,6 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) {
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SSlotDescNode
*
pDescNode
=
(
SSlotDescNode
*
)
nodesListGetNode
(
pNode
->
pSlots
,
i
);
/*if (!pDescNode->output) { // todo disable it temporarily*/
/*continue;*/
/*}*/
SColumnInfoData
idata
=
createColumnInfoData
(
pDescNode
->
dataType
.
type
,
pDescNode
->
dataType
.
bytes
,
pDescNode
->
slotId
);
idata
.
info
.
scale
=
pDescNode
->
dataType
.
scale
;
...
...
@@ -701,9 +695,6 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu
}
}
#ifdef BUF_PAGE_DEBUG
qDebug
(
"page_setSelect num:%d"
,
num
);
#endif
if
(
p
!=
NULL
)
{
p
->
subsidiaries
.
pCtx
=
pValCtx
;
p
->
subsidiaries
.
num
=
num
;
...
...
@@ -852,7 +843,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
// TODO: get it from stable scan node
pCond
->
twindows
=
pTableScanNode
->
scanRange
;
pCond
->
suid
=
pTableScanNode
->
scan
.
suid
;
pCond
->
type
=
BLOCK_LOAD_OFFSET_ORDER
;
pCond
->
type
=
TIMEWINDOW_RANGE_CONTAINED
;
pCond
->
startVersion
=
-
1
;
pCond
->
endVersion
=
-
1
;
// pCond->type = pTableScanNode->scanFlag;
...
...
@@ -947,6 +938,7 @@ STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInter
}
// get the correct time window according to the handled timestamp
// todo refactor
STimeWindow
getActiveTimeWindow
(
SDiskbasedBuf
*
pBuf
,
SResultRowInfo
*
pResultRowInfo
,
int64_t
ts
,
SInterval
*
pInterval
,
int32_t
order
)
{
STimeWindow
w
=
{
0
};
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
bf18385e
...
...
@@ -1665,9 +1665,6 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo) {
// hashSize += taosHashGetMemSize(pRuntimeEnv->tableqinfoGroupInfo.map);
// pSummary->hashSize = hashSize;
// add the merge time
pSummary
->
elapsedTime
+=
pSummary
->
firstStageMergeTime
;
// SResultRowPool* p = pTaskInfo->pool;
// if (p != NULL) {
// pSummary->winInfoSize = getResultRowPoolMemSize(p);
...
...
@@ -1676,17 +1673,16 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo) {
// pSummary->winInfoSize = 0;
// pSummary->numOfTimeWindows = 0;
// }
//
// calculateOperatorProfResults(pQInfo);
SFileBlockLoadRecorder
*
pRecorder
=
pSummary
->
pRecoder
;
if
(
pSummary
->
pRecoder
!=
NULL
)
{
qDebug
(
"%s :cost summary: elapsed time:%"
PRId64
" us, first merge:%"
PRId64
" us, total blocks:%d,
"
"load block statis:%d, load data block:%d, total rows:%"
PRId64
", check rows:%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
pSummary
->
elapsedTime
,
pSummary
->
firstStageMergeTime
,
pRecorder
->
totalBlock
s
,
pRecorder
->
loadBlockStatis
,
pRecorder
->
loadBlocks
,
pRecorder
->
totalRows
,
pRecorder
->
totalCheckedRows
);
qDebug
(
"%s :cost summary: elapsed time:%.2f ms, total blocks:%d, load block SMA:%d, load data block:%d, total rows:%
"
PRId64
", check rows:%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
pSummary
->
elapsedTime
/
1000
.
0
,
pRecorder
->
totalBlocks
,
pRecorder
->
loadBlockStatis
,
pRecorder
->
loadBlocks
,
pRecorder
->
totalRow
s
,
pRecorder
->
totalCheckedRows
);
}
// qDebug("QInfo:0x%"PRIx64" :cost summary: winResPool size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb,
// hashTable:%.2f Kb", pQInfo->qId, pSummary->winInfoSize/1024.0,
// pSummary->numOfTimeWindows, pSummary->tableInfoSize/1024.0, pSummary->hashSize/1024.0);
...
...
@@ -3036,7 +3032,6 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
}
}
closeAllResultRows
(
&
pAggInfo
->
binfo
.
resultRowInfo
);
initGroupedResultInfo
(
&
pAggInfo
->
groupResInfo
,
pAggInfo
->
aggSup
.
pResultRowHashTable
,
0
);
OPTR_SET_OPENED
(
pOperator
);
...
...
@@ -3334,6 +3329,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
if
(
pLimitInfo
->
currentGroupId
==
0
||
pLimitInfo
->
currentGroupId
==
pBlock
->
info
.
groupId
)
{
// it is the first group
pLimitInfo
->
currentGroupId
=
pBlock
->
info
.
groupId
;
ASSERT
(
pTaskInfo
->
execModel
!=
OPTR_EXEC_MODEL_STREAM
);
continue
;
}
else
if
(
pLimitInfo
->
currentGroupId
!=
pBlock
->
info
.
groupId
)
{
// now it is the data from a new group
...
...
@@ -3342,6 +3338,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
// ignore data block in current group
if
(
pLimitInfo
->
remainGroupOffset
>
0
)
{
ASSERT
(
pTaskInfo
->
execModel
!=
OPTR_EXEC_MODEL_STREAM
);
continue
;
}
}
...
...
@@ -3386,10 +3383,12 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
if
(
pLimitInfo
->
remainOffset
>=
pInfo
->
pRes
->
info
.
rows
)
{
pLimitInfo
->
remainOffset
-=
pInfo
->
pRes
->
info
.
rows
;
blockDataCleanup
(
pInfo
->
pRes
);
ASSERT
(
pTaskInfo
->
execModel
!=
OPTR_EXEC_MODEL_STREAM
);
continue
;
}
else
if
(
pLimitInfo
->
remainOffset
<
pInfo
->
pRes
->
info
.
rows
&&
pLimitInfo
->
remainOffset
>
0
)
{
blockDataTrimFirstNRows
(
pInfo
->
pRes
,
pLimitInfo
->
remainOffset
);
pLimitInfo
->
remainOffset
=
0
;
ASSERT
(
pTaskInfo
->
execModel
!=
OPTR_EXEC_MODEL_STREAM
);
}
// check for the limitation in each group
...
...
@@ -3397,6 +3396,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
pLimitInfo
->
numOfOutputRows
+
pInfo
->
pRes
->
info
.
rows
>=
pLimitInfo
->
limit
.
limit
)
{
int32_t
keepRows
=
(
int32_t
)(
pLimitInfo
->
limit
.
limit
-
pLimitInfo
->
numOfOutputRows
);
blockDataKeepFirstNRows
(
pInfo
->
pRes
,
keepRows
);
ASSERT
(
pTaskInfo
->
execModel
!=
OPTR_EXEC_MODEL_STREAM
);
if
(
pLimitInfo
->
slimit
.
limit
>
0
&&
pLimitInfo
->
slimit
.
limit
<=
pLimitInfo
->
numOfOutputGroups
)
{
pOperator
->
status
=
OP_EXEC_DONE
;
}
...
...
@@ -3406,27 +3406,32 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
break
;
}
// no results generated
if
(
pInfo
->
pRes
->
info
.
rows
==
0
||
(
!
pProjectInfo
->
mergeDataBlocks
))
{
break
;
}
if
(
pProjectInfo
->
mergeDataBlocks
)
{
pFinalRes
->
info
.
groupId
=
pInfo
->
pRes
->
info
.
groupId
;
pFinalRes
->
info
.
version
=
pInfo
->
pRes
->
info
.
version
;
if
(
pProjectInfo
->
mergeDataBlocks
&&
pTaskInfo
->
execModel
!=
OPTR_EXEC_MODEL_STREAM
)
{
if
(
pRes
->
info
.
rows
>
0
)
{
pFinalRes
->
info
.
groupId
=
pRes
->
info
.
groupId
;
pFinalRes
->
info
.
version
=
pRes
->
info
.
version
;
// continue merge data, ignore the group id
blockDataMerge
(
pFinalRes
,
pInfo
->
pRes
);
if
(
pFinalRes
->
info
.
rows
+
pInfo
->
pRes
->
info
.
rows
<=
pOperator
->
resultInfo
.
threshold
)
{
continue
;
// continue merge data, ignore the group id
blockDataMerge
(
pFinalRes
,
pRes
);
if
(
pFinalRes
->
info
.
rows
+
pRes
->
info
.
rows
<=
pOperator
->
resultInfo
.
threshold
)
{
continue
;
}
}
}
// do apply filter
SSDataBlock
*
p
=
pProjectInfo
->
mergeDataBlocks
?
pFinalRes
:
pRes
;
doFilter
(
pProjectInfo
->
pFilterNode
,
p
,
NULL
);
if
(
p
->
info
.
rows
>
0
)
{
// do apply filter
doFilter
(
pProjectInfo
->
pFilterNode
,
pFinalRes
,
NULL
);
if
(
pFinalRes
->
info
.
rows
>
0
||
pRes
->
info
.
rows
==
0
)
{
break
;
}
}
else
{
// do apply filter
if
(
pRes
->
info
.
rows
>
0
)
{
doFilter
(
pProjectInfo
->
pFilterNode
,
pRes
,
NULL
);
if
(
pRes
->
info
.
rows
==
0
)
{
continue
;
}
}
break
;
}
}
...
...
@@ -3890,8 +3895,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
initLimitInfo
(
pProjPhyNode
->
node
.
pLimit
,
pProjPhyNode
->
node
.
pSlimit
,
&
pInfo
->
limitInfo
);
pInfo
->
binfo
.
pRes
=
pResBlock
;
pInfo
->
pFinalRes
=
createOneDataBlock
(
pResBlock
,
false
);
pInfo
->
pFinalRes
=
createOneDataBlock
(
pResBlock
,
false
);
pInfo
->
pFilterNode
=
pProjPhyNode
->
node
.
pConditions
;
pInfo
->
mergeDataBlocks
=
pProjPhyNode
->
mergeDataBlock
;
...
...
@@ -4422,7 +4426,7 @@ static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pC
pCond
->
twindows
=
(
STimeWindow
){.
skey
=
INT64_MIN
,
.
ekey
=
INT64_MAX
};
pCond
->
suid
=
uid
;
pCond
->
type
=
BLOCK_LOAD_OFFSET_ORDER
;
pCond
->
type
=
TIMEWINDOW_RANGE_CONTAINED
;
pCond
->
startVersion
=
-
1
;
pCond
->
endVersion
=
-
1
;
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
bf18385e
...
...
@@ -2934,6 +2934,7 @@ void destroyTableMergeScanOperatorInfo(void* param, int32_t numOfOutput) {
pTableScanInfo
->
pSortInputBlock
=
blockDataDestroy
(
pTableScanInfo
->
pSortInputBlock
);
taosArrayDestroy
(
pTableScanInfo
->
pSortInfo
);
cleanupExprSupp
(
&
pTableScanInfo
->
pseudoSup
);
taosMemoryFreeClear
(
pTableScanInfo
->
rowEntryInfoOffset
);
taosMemoryFreeClear
(
param
);
...
...
@@ -2992,8 +2993,9 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
}
if
(
pTableScanNode
->
scan
.
pScanPseudoCols
!=
NULL
)
{
pInfo
->
pPseudoExpr
=
createExprInfo
(
pTableScanNode
->
scan
.
pScanPseudoCols
,
NULL
,
&
pInfo
->
numOfPseudoExpr
);
pInfo
->
pPseudoCtx
=
createSqlFunctionCtx
(
pInfo
->
pPseudoExpr
,
pInfo
->
numOfPseudoExpr
,
&
pInfo
->
rowEntryInfoOffset
);
SExprSupp
*
pSup
=
&
pInfo
->
pseudoSup
;
pSup
->
pExprInfo
=
createExprInfo
(
pTableScanNode
->
scan
.
pScanPseudoCols
,
NULL
,
&
pSup
->
numOfExprs
);
pSup
->
pCtx
=
createSqlFunctionCtx
(
pSup
->
pExprInfo
,
pSup
->
numOfExprs
,
&
pSup
->
rowEntryInfoOffset
);
}
pInfo
->
scanInfo
=
(
SScanInfo
){.
numOfAsc
=
pTableScanNode
->
scanSeq
[
0
],
.
numOfDesc
=
pTableScanNode
->
scanSeq
[
1
]};
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
bf18385e
...
...
@@ -1094,7 +1094,6 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
hashIntervalAgg
(
pOperator
,
&
pInfo
->
binfo
.
resultRowInfo
,
pBlock
,
scanFlag
,
NULL
);
}
closeAllResultRows
(
&
pInfo
->
binfo
.
resultRowInfo
);
initGroupedResultInfo
(
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultRowHashTable
,
pInfo
->
order
);
OPTR_SET_OPENED
(
pOperator
);
...
...
@@ -1250,7 +1249,6 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
pOperator
->
cost
.
openCost
=
(
taosGetTimestampUs
()
-
st
)
/
1000
.
0
;
pOperator
->
status
=
OP_RES_TO_RETURN
;
closeAllResultRows
(
&
pBInfo
->
resultRowInfo
);
initGroupedResultInfo
(
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultRowHashTable
,
TSDB_ORDER_ASC
);
blockDataEnsureCapacity
(
pBInfo
->
pRes
,
pOperator
->
resultInfo
.
capacity
);
...
...
@@ -2045,7 +2043,6 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
// restore the value
pOperator
->
status
=
OP_RES_TO_RETURN
;
closeAllResultRows
(
&
pBInfo
->
resultRowInfo
);
initGroupedResultInfo
(
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultRowHashTable
,
TSDB_ORDER_ASC
);
blockDataEnsureCapacity
(
pBInfo
->
pRes
,
pOperator
->
resultInfo
.
capacity
);
...
...
@@ -2209,8 +2206,6 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
SSDataBlock
*
pResBlock
=
pSliceInfo
->
pRes
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
blockDataEnsureCapacity
(
pResBlock
,
pOperator
->
resultInfo
.
capacity
);
// if (pOperator->status == OP_RES_TO_RETURN) {
// // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
// if (pResBlock->info.rows == 0 || !hasDataInGroupInfo(&pSliceInfo->groupResInfo)) {
...
...
@@ -2350,10 +2345,10 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
initResultSizeInfo
(
&
pOperator
->
resultInfo
,
4096
);
pInfo
->
pFillColInfo
=
createFillColInfo
(
pExprInfo
,
numOfExprs
,
(
SNodeListNode
*
)
pInterpPhyNode
->
pFillValues
);
pInfo
->
pRes
=
createResDataBlock
(
pPhyNode
->
pOutputDataBlockDesc
);
pInfo
->
win
=
pInterpPhyNode
->
timeRange
;
pInfo
->
pRes
=
createResDataBlock
(
pPhyNode
->
pOutputDataBlockDesc
);
pInfo
->
win
=
pInterpPhyNode
->
timeRange
;
pInfo
->
interval
.
interval
=
pInterpPhyNode
->
interval
;
pInfo
->
current
=
pInfo
->
win
.
skey
;
pInfo
->
current
=
pInfo
->
win
.
skey
;
pOperator
->
name
=
"TimeSliceOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC
;
...
...
source/libs/parser/inc/parAst.h
浏览文件 @
bf18385e
...
...
@@ -90,7 +90,7 @@ SNode* createValueNode(SAstCreateContext* pCxt, int32_t dataType, const SToken*
SNode
*
createDurationValueNode
(
SAstCreateContext
*
pCxt
,
const
SToken
*
pLiteral
);
SNode
*
createDefaultDatabaseCondValue
(
SAstCreateContext
*
pCxt
);
SNode
*
createPlaceholderValueNode
(
SAstCreateContext
*
pCxt
,
const
SToken
*
pLiteral
);
SNode
*
setProjectionAlias
(
SAstCreateContext
*
pCxt
,
SNode
*
pNode
,
const
SToken
*
pAlias
);
SNode
*
setProjectionAlias
(
SAstCreateContext
*
pCxt
,
SNode
*
pNode
,
SToken
*
pAlias
);
SNode
*
createLogicConditionNode
(
SAstCreateContext
*
pCxt
,
ELogicConditionType
type
,
SNode
*
pParam1
,
SNode
*
pParam2
);
SNode
*
createOperatorNode
(
SAstCreateContext
*
pCxt
,
EOperatorType
type
,
SNode
*
pLeft
,
SNode
*
pRight
);
SNode
*
createBetweenAnd
(
SAstCreateContext
*
pCxt
,
SNode
*
pExpr
,
SNode
*
pLeft
,
SNode
*
pRight
);
...
...
source/libs/parser/src/parAstCreater.c
浏览文件 @
bf18385e
...
...
@@ -637,8 +637,9 @@ SNode* createInterpTimeRange(SAstCreateContext* pCxt, SNode* pStart, SNode* pEnd
return
createBetweenAnd
(
pCxt
,
createPrimaryKeyCol
(
pCxt
),
pStart
,
pEnd
);
}
SNode
*
setProjectionAlias
(
SAstCreateContext
*
pCxt
,
SNode
*
pNode
,
const
SToken
*
pAlias
)
{
SNode
*
setProjectionAlias
(
SAstCreateContext
*
pCxt
,
SNode
*
pNode
,
SToken
*
pAlias
)
{
CHECK_PARSER_STATUS
(
pCxt
);
trimEscape
(
pAlias
);
int32_t
len
=
TMIN
(
sizeof
(((
SExprNode
*
)
pNode
)
->
aliasName
)
-
1
,
pAlias
->
n
);
strncpy
(((
SExprNode
*
)
pNode
)
->
aliasName
,
pAlias
->
z
,
len
);
((
SExprNode
*
)
pNode
)
->
aliasName
[
len
]
=
'\0'
;
...
...
source/libs/qworker/src/qworker.c
浏览文件 @
bf18385e
...
...
@@ -66,7 +66,7 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
qwExecTask
(
QW_FPARAMS_DEF
,
SQWTaskCtx
*
ctx
,
bool
*
query
End
)
{
int32_t
qwExecTask
(
QW_FPARAMS_DEF
,
SQWTaskCtx
*
ctx
,
bool
*
query
Stop
)
{
int32_t
code
=
0
;
bool
qcontinue
=
true
;
SSDataBlock
*
pRes
=
NULL
;
...
...
@@ -104,8 +104,8 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
QW_ERR_RET
(
qwHandleTaskComplete
(
QW_FPARAMS
(),
ctx
));
if
(
query
End
)
{
*
query
End
=
true
;
if
(
query
Stop
)
{
*
query
Stop
=
true
;
}
break
;
...
...
@@ -125,6 +125,10 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
QW_TASK_DLOG
(
"data put into sink, rows:%d, continueExecTask:%d"
,
rows
,
qcontinue
);
if
(
!
qcontinue
)
{
if
(
queryStop
)
{
*
queryStop
=
true
;
}
break
;
}
...
...
@@ -566,7 +570,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
SQWPhaseInput
input
=
{
0
};
void
*
rsp
=
NULL
;
int32_t
dataLen
=
0
;
bool
query
End
=
false
;
bool
query
Stop
=
false
;
do
{
QW_ERR_JRET
(
qwHandlePrePhaseEvents
(
QW_FPARAMS
(),
QW_PHASE_PRE_CQUERY
,
&
input
,
NULL
));
...
...
@@ -576,7 +580,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
atomic_store_8
((
int8_t
*
)
&
ctx
->
queryInQueue
,
0
);
atomic_store_8
((
int8_t
*
)
&
ctx
->
queryContinue
,
0
);
QW_ERR_JRET
(
qwExecTask
(
QW_FPARAMS
(),
ctx
,
&
query
End
));
QW_ERR_JRET
(
qwExecTask
(
QW_FPARAMS
(),
ctx
,
&
query
Stop
));
if
(
QW_EVENT_RECEIVED
(
ctx
,
QW_EVENT_FETCH
))
{
SOutputData
sOutput
=
{
0
};
...
...
@@ -627,7 +631,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
}
QW_LOCK
(
QW_WRITE
,
&
ctx
->
lock
);
if
(
query
End
||
code
||
0
==
atomic_load_8
((
int8_t
*
)
&
ctx
->
queryContinue
))
{
if
(
query
Stop
||
code
||
0
==
atomic_load_8
((
int8_t
*
)
&
ctx
->
queryContinue
))
{
// Note: query is not running anymore
QW_SET_PHASE
(
ctx
,
0
);
QW_UNLOCK
(
QW_WRITE
,
&
ctx
->
lock
);
...
...
tests/pytest/crash_gen/crash_gen_main.py
浏览文件 @
bf18385e
...
...
@@ -1327,6 +1327,8 @@ class Task():
# TDengine 3.0 Error Codes:
0x0333
,
# Object is creating # TODO: this really is NOT an acceptable error
0x0369
,
# Tag already exists
0x0388
,
# Database not exist
0x03A0
,
# STable already exists
0x03A1
,
# STable [does] not exist
0x03AA
,
# Tag already exists
...
...
tests/system-test/7-tmq/dataFromTsdbNWal-multiCtb.py
0 → 100644
浏览文件 @
bf18385e
import
taos
import
sys
import
time
import
socket
import
os
import
threading
import
math
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
*
from
util.common
import
*
sys
.
path
.
append
(
"./7-tmq"
)
from
tmqCommon
import
*
class
TDTestCase
:
def
__init__
(
self
):
self
.
vgroups
=
4
self
.
ctbNum
=
100
self
.
rowsPerTbl
=
1000
def
init
(
self
,
conn
,
logSql
):
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
tdSql
.
init
(
conn
.
cursor
(),
False
)
def
prepareTestEnv
(
self
):
tdLog
.
printNoPrefix
(
"======== prepare test env include database, stable, ctables, and insert data: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
1
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
100
,
'rowsPerTbl'
:
1000
,
'batchNum'
:
100
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
10
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
1
}
paraDict
[
'vgroups'
]
=
self
.
vgroups
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
tmqCom
.
initConsumerTable
()
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
paraDict
[
"vgroups"
],
replica
=
1
)
tdLog
.
info
(
"create stb"
)
tmqCom
.
create_stable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
])
tdLog
.
info
(
"create ctb"
)
tmqCom
.
create_ctable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
],
ctbPrefix
=
paraDict
[
'ctbPrefix'
],
ctbNum
=
paraDict
[
"ctbNum"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"insert data"
)
tmqCom
.
insert_data_interlaceByMultiTbl
(
tsql
=
tdSql
,
dbName
=
paraDict
[
"dbName"
],
ctbPrefix
=
paraDict
[
"ctbPrefix"
],
ctbNum
=
paraDict
[
"ctbNum"
],
rowsPerTbl
=
paraDict
[
"rowsPerTbl"
],
batchNum
=
paraDict
[
"batchNum"
],
startTs
=
paraDict
[
"startTs"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"flush db to let data falls into the disk"
)
tdSql
.
query
(
"flush database %s"
%
(
paraDict
[
'dbName'
]))
return
def
tmqCase1
(
self
):
tdLog
.
printNoPrefix
(
"======== test case 1: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
1
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
100
,
'rowsPerTbl'
:
1000
,
'batchNum'
:
500
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
5
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
1
}
paraDict
[
'vgroups'
]
=
self
.
vgroups
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
topicNameList
=
[
'topic1'
]
expectRowsList
=
[]
tmqCom
.
initConsumerTable
()
tdLog
.
info
(
"create topics from stb with filter"
)
queryString
=
"select * from %s.%s"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
])
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
sqlString
=
"create topic %s as %s"
%
(
topicNameList
[
0
],
queryString
)
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
# tdSql.query(queryString)
# expectRowsList.append(tdSql.getRows())
# init consume info, and start tmq_sim, then check consume result
tdLog
.
info
(
"insert consume info to consume processor"
)
consumerId
=
0
expectrowcnt
=
paraDict
[
"rowsPerTbl"
]
*
paraDict
[
"ctbNum"
]
*
2
topicList
=
topicNameList
[
0
]
ifcheckdata
=
1
ifManualCommit
=
1
keyList
=
'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
# after start consume, continue insert some data
paraDict
[
'batchNum'
]
=
100
paraDict
[
'startTs'
]
=
paraDict
[
'startTs'
]
+
self
.
rowsPerTbl
pInsertThread
=
tmqCom
.
asyncInsertDataByInterlace
(
paraDict
)
tdLog
.
info
(
"start consume processor"
)
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
paraDict
[
"dbName"
],
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
pInsertThread
.
join
()
tdSql
.
query
(
queryString
)
expectRowsList
.
append
(
tdSql
.
getRows
())
tdLog
.
info
(
"wait the consume result"
)
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
tdLog
.
info
(
"expect consume rows: %d, act consume rows: %d"
%
(
expectRowsList
[
0
],
resultList
[
0
]))
if
expectRowsList
[
0
]
!=
resultList
[
0
]:
tdLog
.
exit
(
"%d tmq consume rows error!"
%
consumerId
)
# tmqCom.checkFileContent(consumerId, queryString)
tdSql
.
query
(
"flush database %s"
%
(
paraDict
[
'dbName'
]))
for
i
in
range
(
len
(
topicNameList
)):
tmqCom
.
waitSubscriptionExit
(
tdSql
,
topicNameList
[
i
])
tdSql
.
query
(
"drop topic %s"
%
topicNameList
[
i
])
tdLog
.
printNoPrefix
(
"======== test case 1 end ...... "
)
def
tmqCase2
(
self
):
tdLog
.
printNoPrefix
(
"======== test case 2: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
1
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
100
,
'rowsPerTbl'
:
1000
,
'batchNum'
:
500
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
3
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
1
}
paraDict
[
'vgroups'
]
=
self
.
vgroups
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
topicNameList
=
[
'topic1'
]
expectRowsList
=
[]
tmqCom
.
initConsumerTable
()
tdLog
.
info
(
"create topics from stb with filter"
)
queryString
=
"select * from %s.%s"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
])
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
sqlString
=
"create topic %s as %s"
%
(
topicNameList
[
0
],
queryString
)
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
tdSql
.
query
(
queryString
)
expectRowsList
.
append
(
tdSql
.
getRows
())
totalRowsInserted
=
expectRowsList
[
0
]
# init consume info, and start tmq_sim, then check consume result
tdLog
.
info
(
"insert consume info to consume processor"
)
consumerId
=
1
expectrowcnt
=
math
.
ceil
(
paraDict
[
"rowsPerTbl"
]
*
paraDict
[
"ctbNum"
]
/
3
)
topicList
=
topicNameList
[
0
]
ifcheckdata
=
1
ifManualCommit
=
1
keyList
=
'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
tdLog
.
info
(
"start consume processor 0"
)
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
paraDict
[
"dbName"
],
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
tdLog
.
info
(
"wait the consume result"
)
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
actConsumeRows
=
resultList
[
0
]
tdLog
.
info
(
"act consume rows: %d, expect consume rows between %d and %d"
%
(
actConsumeRows
,
expectrowcnt
,
totalRowsInserted
))
if
not
(
expectrowcnt
<=
actConsumeRows
and
totalRowsInserted
>=
actConsumeRows
):
tdLog
.
exit
(
"%d tmq consume rows error!"
%
consumerId
)
# reinit consume info, and start tmq_sim, then check consume result
tmqCom
.
initConsumerTable
()
consumerId
=
2
expectrowcnt
=
math
.
ceil
(
paraDict
[
"rowsPerTbl"
]
*
paraDict
[
"ctbNum"
]
*
2
/
3
)
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
tdLog
.
info
(
"start consume processor 1"
)
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
paraDict
[
"dbName"
],
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
tdLog
.
info
(
"wait the consume result"
)
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
actConsumeRows
=
resultList
[
0
]
tdLog
.
info
(
"act consume rows: %d, expect rows: %d, act insert rows: %d"
%
(
actConsumeRows
,
expectrowcnt
,
totalRowsInserted
))
if
not
((
actConsumeRows
>=
expectrowcnt
)
and
(
totalRowsInserted
>
actConsumeRows
)):
tdLog
.
exit
(
"%d tmq consume rows error!"
%
consumerId
)
for
i
in
range
(
len
(
topicNameList
)):
tmqCom
.
waitSubscriptionExit
(
tdSql
,
topicNameList
[
i
])
tdSql
.
query
(
"drop topic %s"
%
topicNameList
[
i
])
tdLog
.
printNoPrefix
(
"======== test case 2 end ...... "
)
def
run
(
self
):
tdSql
.
prepare
()
self
.
prepareTestEnv
()
self
.
tmqCase1
()
self
.
tmqCase2
()
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
event
=
threading
.
Event
()
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tests/system-test/7-tmq/dataFromTsdbNWal.py
浏览文件 @
bf18385e
...
...
@@ -17,8 +17,8 @@ from tmqCommon import *
class
TDTestCase
:
def
__init__
(
self
):
self
.
vgroups
=
1
self
.
ctbNum
=
1
00
self
.
vgroups
=
4
self
.
ctbNum
=
1
self
.
rowsPerTbl
=
10000
def
init
(
self
,
conn
,
logSql
):
...
...
@@ -38,9 +38,9 @@ class TDTestCase:
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
1
00
,
'ctbNum'
:
1
,
'rowsPerTbl'
:
10000
,
'batchNum'
:
30
00
,
'batchNum'
:
1
00
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
10
,
'showMsg'
:
1
,
...
...
@@ -85,7 +85,7 @@ class TDTestCase:
'rowsPerTbl'
:
10000
,
'batchNum'
:
100
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
3
,
'pollDelay'
:
5
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
1
}
...
...
@@ -117,17 +117,16 @@ class TDTestCase:
keyList
=
'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
tdLog
.
info
(
"start consume processor"
)
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
paraDict
[
"dbName"
],
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
# after start consume, continue insert some data
paraDict
[
'batchNum'
]
=
100
paraDict
[
'startTs'
]
=
paraDict
[
'startTs'
]
+
self
.
rowsPerTbl
tmqCom
.
insert_data_interlaceByMultiTbl
(
tsql
=
tdSql
,
dbName
=
paraDict
[
"dbName"
],
ctbPrefix
=
paraDict
[
"ctbPrefix"
],
ctbNum
=
paraDict
[
"ctbNum"
],
rowsPerTbl
=
paraDict
[
"rowsPerTbl"
],
batchNum
=
paraDict
[
"batchNum"
],
startTs
=
paraDict
[
"startTs"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
pInsertThread
=
tmqCom
.
asyncInsertDataByInterlace
(
paraDict
)
tdLog
.
info
(
"start consume processor"
)
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
paraDict
[
"dbName"
],
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
#
pInsertThread
.
join
()
tdSql
.
query
(
queryString
)
expectRowsList
.
append
(
tdSql
.
getRows
())
...
...
@@ -135,15 +134,16 @@ class TDTestCase:
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
tdLog
.
info
(
"expect consume rows: %d, act consume rows: %d"
%
(
expectRowsList
[
0
],
resultList
[
0
]))
if
expectRowsList
[
0
]
!=
resultList
[
0
]:
tdLog
.
exit
(
"%d tmq consume rows error!"
%
consumerId
)
tmqCom
.
checkFileContent
(
consumerId
,
queryString
)
time
.
sleep
(
10
)
tdSql
.
query
(
"flush database %s"
%
(
paraDict
[
'dbName'
]))
for
i
in
range
(
len
(
topicNameList
)):
tmqCom
.
waitSubscriptionExit
(
tdSql
,
topicNameList
[
i
])
tdSql
.
query
(
"drop topic %s"
%
topicNameList
[
i
])
tdLog
.
printNoPrefix
(
"======== test case 1 end ...... "
)
...
...
@@ -204,13 +204,12 @@ class TDTestCase:
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
if
not
(
expectrowcnt
<=
resultList
[
0
]
and
totalRowsInserted
>=
resultList
[
0
]):
tdLog
.
info
(
"act consume rows: %d, expect consume rows between %d and %d"
%
(
resultList
[
0
],
expectrowcnt
,
totalRowsInserted
))
actConsumeRows
=
resultList
[
0
]
tdLog
.
info
(
"act consume rows: %d, expect consume rows between %d and %d"
%
(
actConsumeRows
,
expectrowcnt
,
totalRowsInserted
))
if
not
(
expectrowcnt
<=
actConsumeRows
and
totalRowsInserted
>=
actConsumeRows
):
tdLog
.
exit
(
"%d tmq consume rows error!"
%
consumerId
)
firstConsumeRows
=
resultList
[
0
]
# reinit consume info, and start tmq_sim, then check consume result
tmqCom
.
initConsumerTable
()
consumerId
=
2
...
...
@@ -224,15 +223,13 @@ class TDTestCase:
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
actConsumeTotalRows
=
firstConsumeRows
+
resultList
[
0
]
if
not
(
expectrowcnt
>=
resultList
[
0
]
and
totalRowsInserted
==
actConsumeTotalRows
):
tdLog
.
info
(
"act consume rows, first: %d, second: %d "
%
(
firstConsumeRows
,
resultList
[
0
]))
tdLog
.
info
(
"and sum of two consume rows: %d should be equal to total inserted rows: %d"
%
(
actConsumeTotalRows
,
totalRowsInserted
))
actConsumeRows
=
resultList
[
0
]
tdLog
.
info
(
"act consume rows: %d, expect rows: %d, act insert rows: %d"
%
(
actConsumeRows
,
expectrowcnt
,
totalRowsInserted
))
if
not
((
actConsumeRows
>=
expectrowcnt
)
and
(
totalRowsInserted
>
actConsumeRows
)):
tdLog
.
exit
(
"%d tmq consume rows error!"
%
consumerId
)
time
.
sleep
(
10
)
for
i
in
range
(
len
(
topicNameList
)):
tmqCom
.
waitSubscriptionExit
(
tdSql
,
topicNameList
[
i
])
tdSql
.
query
(
"drop topic %s"
%
topicNameList
[
i
])
tdLog
.
printNoPrefix
(
"======== test case 2 end ...... "
)
...
...
@@ -241,7 +238,7 @@ class TDTestCase:
tdSql
.
prepare
()
self
.
prepareTestEnv
()
self
.
tmqCase1
()
#
self.tmqCase2()
self
.
tmqCase2
()
def
stop
(
self
):
tdSql
.
close
()
...
...
tests/system-test/7-tmq/tmqDnodeRestart.py
浏览文件 @
bf18385e
...
...
@@ -151,41 +151,6 @@ class TDTestCase:
if
not
(
totalConsumeRows
==
totalRowsFromQury
):
tdLog
.
exit
(
"tmq consume rows error!"
)
# tdLog.info("****************************************************************************")
# tmqCom.initConsumerTable()
# consumerId = 1
# expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2
# topicList = topicFromStb1
# ifcheckdata = 0
# ifManualCommit = 0
# keyList = 'group.id:cgrp2,\
# enable.auto.commit:true,\
# auto.commit.interval.ms:3000,\
# auto.offset.reset:earliest'
# tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
# tdLog.info("start consume processor")
# tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
# expectRows = 1
# resultList = tmqCom.selectConsumeResult(expectRows)
# totalConsumeRows = 0
# for i in range(expectRows):
# totalConsumeRows += resultList[i]
# tdSql.query(queryString)
# totalRowsFromQury = tdSql.getRows()
# tdLog.info("act consume rows: %d, act query rows: %d"%(totalConsumeRows, totalRowsFromQury))
# if not (totalConsumeRows == totalRowsFromQury):
# tdLog.exit("tmq consume rows error!")
# tdLog.info("****************************************************************************")
tmqCom
.
waitSubscriptionExit
(
tdSql
,
topicFromStb1
)
tdSql
.
query
(
"drop topic %s"
%
topicFromStb1
)
...
...
@@ -259,7 +224,7 @@ class TDTestCase:
tdLog
.
info
(
"create some new child table and insert data "
)
paraDict
[
"batchNum"
]
=
100
paraDict
[
"ctbPrefix"
]
=
'newCtb'
#
tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"])
tmqCom
.
insert_data_with_autoCreateTbl
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"stbName"
],
paraDict
[
"ctbPrefix"
],
paraDict
[
"ctbNum"
],
paraDict
[
"rowsPerTbl"
],
paraDict
[
"batchNum"
])
tdLog
.
info
(
"insert process end, and start to check consume result"
)
expectRows
=
1
...
...
tests/system-test/7-tmq/tmqDropNtb-snapshot0.py
0 → 100644
浏览文件 @
bf18385e
import
taos
import
sys
import
time
import
socket
import
os
import
threading
from
enum
import
Enum
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
*
sys
.
path
.
append
(
"./7-tmq"
)
from
tmqCommon
import
*
class
TDTestCase
:
def
__init__
(
self
):
self
.
snapshot
=
0
self
.
vgroups
=
4
self
.
ctbNum
=
1000
self
.
rowsPerTbl
=
10
def
init
(
self
,
conn
,
logSql
):
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
tdSql
.
init
(
conn
.
cursor
(),
False
)
# drop some ntbs
def
tmqCase1
(
self
):
tdLog
.
printNoPrefix
(
"======== test case 1: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
4
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ntb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
1000
,
'rowsPerTbl'
:
100
,
'batchNum'
:
100
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'endTs'
:
0
,
'pollDelay'
:
5
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
0
}
paraDict
[
'snapshot'
]
=
self
.
snapshot
paraDict
[
'vgroups'
]
=
self
.
vgroups
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
tmqCom
.
initConsumerTable
()
tdLog
.
info
(
"start create database...."
)
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
paraDict
[
"vgroups"
],
replica
=
1
)
tdLog
.
info
(
"start create normal tables...."
)
tmqCom
.
create_ntable
(
tsql
=
tdSql
,
dbname
=
paraDict
[
"dbName"
],
tbname_prefix
=
paraDict
[
"ctbPrefix"
],
tbname_index_start_num
=
1
,
column_elm_list
=
paraDict
[
"colSchema"
],
colPrefix
=
'c'
,
tblNum
=
paraDict
[
"ctbNum"
])
tdLog
.
info
(
"start insert data into normal tables...."
)
tmqCom
.
insert_rows_into_ntbl
(
tsql
=
tdSql
,
dbname
=
paraDict
[
"dbName"
],
tbname_prefix
=
paraDict
[
"ctbPrefix"
],
tbname_index_start_num
=
1
,
column_ele_list
=
paraDict
[
"colSchema"
],
startTs
=
paraDict
[
"startTs"
],
tblNum
=
paraDict
[
"ctbNum"
],
rows
=
paraDict
[
"rowsPerTbl"
])
tdLog
.
info
(
"create topics from database"
)
topicFromDb
=
'topic_dbt'
tdSql
.
execute
(
"create topic %s as database %s"
%
(
topicFromDb
,
paraDict
[
'dbName'
]))
if
self
.
snapshot
==
0
:
consumerId
=
0
elif
self
.
snapshot
==
1
:
consumerId
=
1
expectrowcnt
=
int
(
paraDict
[
"rowsPerTbl"
]
*
paraDict
[
"ctbNum"
])
topicList
=
topicFromDb
ifcheckdata
=
1
ifManualCommit
=
1
keyList
=
'group.id:cgrp1,\
enable.auto.commit:true,\
auto.commit.interval.ms:1000,\
auto.offset.reset:earliest'
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
tdLog
.
info
(
"start consume processor"
)
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
paraDict
[
"dbName"
],
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
tmqCom
.
getStartConsumeNotifyFromTmqsim
()
tdLog
.
info
(
"drop some ntables"
)
# drop 1/4 ctbls from half offset
paraDict
[
"ctbStartIdx"
]
=
paraDict
[
"ctbStartIdx"
]
+
int
(
paraDict
[
"ctbNum"
]
*
1
/
2
)
paraDict
[
"ctbNum"
]
=
int
(
paraDict
[
"ctbNum"
]
/
4
)
tmqCom
.
drop_ctable
(
tdSql
,
dbname
=
paraDict
[
'dbName'
],
count
=
paraDict
[
"ctbNum"
],
default_ctbname_prefix
=
paraDict
[
"ctbPrefix"
],
ctbStartIdx
=
paraDict
[
"ctbStartIdx"
])
tdLog
.
info
(
"start to check consume result"
)
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
totalConsumeRows
=
0
for
i
in
range
(
expectRows
):
totalConsumeRows
+=
resultList
[
i
]
tdLog
.
info
(
"act consume rows: %d, expect consume rows: %d"
%
(
totalConsumeRows
,
expectrowcnt
))
if
not
((
totalConsumeRows
>=
expectrowcnt
*
3
/
4
)
and
(
totalConsumeRows
<
expectrowcnt
)):
tdLog
.
exit
(
"tmq consume rows error with snapshot = 0!"
)
tdLog
.
info
(
"wait subscriptions exit ...."
)
tmqCom
.
waitSubscriptionExit
(
tdSql
,
topicFromDb
)
tdSql
.
query
(
"drop topic %s"
%
topicFromDb
)
tdLog
.
info
(
"success dorp topic: %s"
%
topicFromDb
)
tdLog
.
printNoPrefix
(
"======== test case 1 end ...... "
)
# drop some ntbs and create some new ntbs
def
tmqCase2
(
self
):
tdLog
.
printNoPrefix
(
"======== test case 2: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
4
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ntb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
1000
,
'rowsPerTbl'
:
100
,
'batchNum'
:
100
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'endTs'
:
0
,
'pollDelay'
:
10
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
0
}
paraDict
[
'snapshot'
]
=
self
.
snapshot
paraDict
[
'vgroups'
]
=
self
.
vgroups
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
tmqCom
.
initConsumerTable
()
tdLog
.
info
(
"start create database...."
)
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
paraDict
[
"vgroups"
],
replica
=
1
)
tdLog
.
info
(
"start create normal tables...."
)
tmqCom
.
create_ntable
(
tsql
=
tdSql
,
dbname
=
paraDict
[
"dbName"
],
tbname_prefix
=
paraDict
[
"ctbPrefix"
],
tbname_index_start_num
=
1
,
column_elm_list
=
paraDict
[
"colSchema"
],
colPrefix
=
'c'
,
tblNum
=
paraDict
[
"ctbNum"
])
tdLog
.
info
(
"start insert data into normal tables...."
)
tmqCom
.
insert_rows_into_ntbl
(
tsql
=
tdSql
,
dbname
=
paraDict
[
"dbName"
],
tbname_prefix
=
paraDict
[
"ctbPrefix"
],
tbname_index_start_num
=
1
,
column_ele_list
=
paraDict
[
"colSchema"
],
startTs
=
paraDict
[
"startTs"
],
tblNum
=
paraDict
[
"ctbNum"
],
rows
=
paraDict
[
"rowsPerTbl"
])
tdLog
.
info
(
"create topics from database"
)
topicFromDb
=
'topic_dbt'
tdSql
.
execute
(
"create topic %s as database %s"
%
(
topicFromDb
,
paraDict
[
'dbName'
]))
if
self
.
snapshot
==
0
:
consumerId
=
2
elif
self
.
snapshot
==
1
:
consumerId
=
3
expectrowcnt
=
int
(
paraDict
[
"rowsPerTbl"
]
*
paraDict
[
"ctbNum"
]
*
2
)
topicList
=
topicFromDb
ifcheckdata
=
1
ifManualCommit
=
1
keyList
=
'group.id:cgrp1,\
enable.auto.commit:true,\
auto.commit.interval.ms:1000,\
auto.offset.reset:earliest'
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
tdLog
.
info
(
"start consume processor"
)
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
paraDict
[
"dbName"
],
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
tmqCom
.
getStartConsumeNotifyFromTmqsim
()
tdLog
.
info
(
"drop some ntables"
)
# drop 1/4 ctbls from half offset
paraDict
[
"ctbStartIdx"
]
=
paraDict
[
"ctbStartIdx"
]
+
int
(
paraDict
[
"ctbNum"
]
*
1
/
2
)
paraDict
[
"ctbNum"
]
=
int
(
paraDict
[
"ctbNum"
]
/
4
)
tmqCom
.
drop_ctable
(
tdSql
,
dbname
=
paraDict
[
'dbName'
],
count
=
paraDict
[
"ctbNum"
],
default_ctbname_prefix
=
paraDict
[
"ctbPrefix"
],
ctbStartIdx
=
paraDict
[
"ctbStartIdx"
])
tdLog
.
info
(
"start create some new normal tables...."
)
paraDict
[
"ctbPrefix"
]
=
'newCtb'
paraDict
[
"ctbNum"
]
=
self
.
ctbNum
tmqCom
.
create_ntable
(
tsql
=
tdSql
,
dbname
=
paraDict
[
"dbName"
],
tbname_prefix
=
paraDict
[
"ctbPrefix"
],
tbname_index_start_num
=
1
,
column_elm_list
=
paraDict
[
"colSchema"
],
colPrefix
=
'c'
,
tblNum
=
paraDict
[
"ctbNum"
])
tdLog
.
info
(
"start insert data into these new normal tables...."
)
tmqCom
.
insert_rows_into_ntbl
(
tsql
=
tdSql
,
dbname
=
paraDict
[
"dbName"
],
tbname_prefix
=
paraDict
[
"ctbPrefix"
],
tbname_index_start_num
=
1
,
column_ele_list
=
paraDict
[
"colSchema"
],
startTs
=
paraDict
[
"startTs"
],
tblNum
=
paraDict
[
"ctbNum"
],
rows
=
paraDict
[
"rowsPerTbl"
])
tdLog
.
info
(
"start to check consume result"
)
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
totalConsumeRows
=
0
for
i
in
range
(
expectRows
):
totalConsumeRows
+=
resultList
[
i
]
tdLog
.
info
(
"act consume rows: %d, expect consume rows: %d"
%
(
totalConsumeRows
,
expectrowcnt
))
if
not
((
totalConsumeRows
>=
expectrowcnt
/
2
*
(
1
+
3
/
4
))
and
(
totalConsumeRows
<
expectrowcnt
)):
tdLog
.
exit
(
"tmq consume rows error with snapshot = 0!"
)
tdLog
.
info
(
"wait subscriptions exit ...."
)
tmqCom
.
waitSubscriptionExit
(
tdSql
,
topicFromDb
)
tdSql
.
query
(
"drop topic %s"
%
topicFromDb
)
tdLog
.
info
(
"success dorp topic: %s"
%
topicFromDb
)
tdLog
.
printNoPrefix
(
"======== test case 2 end ...... "
)
def
run
(
self
):
tdLog
.
printNoPrefix
(
"============================================="
)
tdLog
.
printNoPrefix
(
"======== snapshot is 0: only consume from wal"
)
self
.
snapshot
=
0
self
.
tmqCase1
()
self
.
tmqCase2
()
# tdLog.printNoPrefix("====================================================================")
# tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
# self.snapshot = 1
# self.tmqCase1()
# self.tmqCase2()
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
event
=
threading
.
Event
()
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tests/system-test/7-tmq/tmqDropNtb.py
→
tests/system-test/7-tmq/tmqDropNtb
-snapshot1
.py
浏览文件 @
bf18385e
...
...
@@ -18,7 +18,7 @@ class TDTestCase:
def
__init__
(
self
):
self
.
snapshot
=
0
self
.
vgroups
=
4
self
.
ctbNum
=
100
self
.
ctbNum
=
100
0
self
.
rowsPerTbl
=
10
def
init
(
self
,
conn
,
logSql
):
...
...
@@ -39,9 +39,9 @@ class TDTestCase:
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ntb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
100
,
'rowsPerTbl'
:
100
0
,
'batchNum'
:
100
0
,
'ctbNum'
:
100
0
,
'rowsPerTbl'
:
100
,
'batchNum'
:
100
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'endTs'
:
0
,
'pollDelay'
:
5
,
...
...
@@ -125,9 +125,9 @@ class TDTestCase:
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ntb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
100
,
'rowsPerTbl'
:
100
0
,
'batchNum'
:
100
0
,
'ctbNum'
:
100
0
,
'rowsPerTbl'
:
100
,
'batchNum'
:
100
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'endTs'
:
0
,
'pollDelay'
:
10
,
...
...
@@ -203,16 +203,16 @@ class TDTestCase:
tdLog
.
printNoPrefix
(
"======== test case 2 end ...... "
)
def
run
(
self
):
tdLog
.
printNoPrefix
(
"============================================="
)
tdLog
.
printNoPrefix
(
"======== snapshot is 0: only consume from wal"
)
self
.
snapshot
=
0
#
tdLog.printNoPrefix("=============================================")
#
tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
#
self.snapshot = 0
# self.tmqCase1()
self
.
tmqCase2
()
#
self.tmqCase2()
tdLog
.
printNoPrefix
(
"===================================================================="
)
tdLog
.
printNoPrefix
(
"======== snapshot is 1: firstly consume from tsbs, and then from wal"
)
self
.
snapshot
=
1
#
self.tmqCase1()
self
.
tmqCase1
()
self
.
tmqCase2
()
def
stop
(
self
):
...
...
tests/system-test/fulltest.sh
浏览文件 @
bf18385e
...
...
@@ -210,7 +210,7 @@ python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py
python3 ./test.py
-f
7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py
python3 ./test.py
-f
7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py
python3 ./test.py
-f
7-tmq/tmqAutoCreateTbl.py
#
python3 ./test.py -f 7-tmq/tmqDnodeRestart.py
python3 ./test.py
-f
7-tmq/tmqDnodeRestart.py
python3 ./test.py
-f
7-tmq/tmqUpdate-1ctb.py
python3 ./test.py
-f
7-tmq/tmqUpdateWithConsume.py
python3 ./test.py
-f
7-tmq/tmqUpdate-multiCtb-snapshot0.py
...
...
@@ -219,12 +219,14 @@ python3 ./test.py -f 7-tmq/tmqDelete-1ctb.py
python3 ./test.py
-f
7-tmq/tmqDelete-multiCtb.py
python3 ./test.py
-f
7-tmq/tmqDropStb.py
python3 ./test.py
-f
7-tmq/tmqDropStbCtb.py
python3 ./test.py
-f
7-tmq/tmqDropNtb.py
python3 ./test.py
-f
7-tmq/tmqDropNtb-snapshot0.py
python3 ./test.py
-f
7-tmq/tmqDropNtb-snapshot1.py
python3 ./test.py
-f
7-tmq/tmqUdf.py
python3 ./test.py
-f
7-tmq/tmqUdf-multCtb-snapshot0.py
python3 ./test.py
-f
7-tmq/tmqUdf-multCtb-snapshot1.py
python3 ./test.py
-f
7-tmq/stbTagFilter-1ctb.py
python3 ./test.py
-f
7-tmq/dataFromTsdbNWal.py
python3 ./test.py
-f
7-tmq/dataFromTsdbNWal-multiCtb.py
# python3 ./test.py -f 7-tmq/stbTagFilter-multiCtb.py
#------------querPolicy 2-----------
...
...
taos-tools
@
0b8a3373
比较
9cfa1957
...
0b8a3373
Subproject commit
9cfa195713d1cae9edf417a8d49bde87dd971016
Subproject commit
0b8a3373bb7548f8106d13e7d3b0a988d3c4d48a
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录