Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
f208b507
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
f208b507
编写于
7月 25, 2022
作者:
S
shenglian zhou
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '2.0' of github.com:taosdata/TDengine into szhou/fix/udf
上级
9e9a12cb
e82b1260
变更
26
展开全部
隐藏空白更改
内联
并排
Showing
26 changed file
with
884 addition
and
729 deletion
+884
-729
.gitignore
.gitignore
+1
-0
include/common/tcommon.h
include/common/tcommon.h
+4
-5
source/common/src/tglobal.c
source/common/src/tglobal.c
+3
-6
source/dnode/mnode/impl/src/mndProfile.c
source/dnode/mnode/impl/src/mndProfile.c
+1
-1
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+2
-3
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+226
-537
source/libs/executor/inc/executil.h
source/libs/executor/inc/executil.h
+5
-2
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+1
-1
source/libs/executor/src/executil.c
source/libs/executor/src/executil.c
+4
-12
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+36
-32
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+7
-8
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+10
-6
source/libs/transport/inc/transComm.h
source/libs/transport/inc/transComm.h
+2
-0
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+26
-12
source/libs/transport/src/transComm.c
source/libs/transport/src/transComm.c
+7
-0
source/libs/transport/src/transSvr.c
source/libs/transport/src/transSvr.c
+25
-21
tests/pytest/crash_gen/crash_gen_main.py
tests/pytest/crash_gen/crash_gen_main.py
+2
-0
tests/script/jenkins/basic.txt
tests/script/jenkins/basic.txt
+1
-1
tests/script/tsim/parser/create_tb_with_tag_name.sim
tests/script/tsim/parser/create_tb_with_tag_name.sim
+3
-3
tests/system-test/7-tmq/dataFromTsdbNWal-multiCtb.py
tests/system-test/7-tmq/dataFromTsdbNWal-multiCtb.py
+250
-0
tests/system-test/7-tmq/dataFromTsdbNWal.py
tests/system-test/7-tmq/dataFromTsdbNWal.py
+24
-27
tests/system-test/7-tmq/tmqDnodeRestart.py
tests/system-test/7-tmq/tmqDnodeRestart.py
+1
-36
tests/system-test/7-tmq/tmqDropNtb-snapshot0.py
tests/system-test/7-tmq/tmqDropNtb-snapshot0.py
+225
-0
tests/system-test/7-tmq/tmqDropNtb-snapshot1.py
tests/system-test/7-tmq/tmqDropNtb-snapshot1.py
+12
-12
tests/system-test/fulltest.sh
tests/system-test/fulltest.sh
+5
-3
tools/taos-tools
tools/taos-tools
+1
-1
未找到文件。
.gitignore
浏览文件 @
f208b507
...
@@ -22,6 +22,7 @@ mac/
...
@@ -22,6 +22,7 @@ mac/
.mypy_cache
.mypy_cache
*.tmp
*.tmp
*.swp
*.swp
*.swo
*.orig
*.orig
src/connector/nodejs/node_modules/
src/connector/nodejs/node_modules/
src/connector/nodejs/out/
src/connector/nodejs/out/
...
...
include/common/tcommon.h
浏览文件 @
f208b507
...
@@ -153,11 +153,10 @@ typedef struct SQueryTableDataCond {
...
@@ -153,11 +153,10 @@ typedef struct SQueryTableDataCond {
int32_t
order
;
// desc|asc order to iterate the data block
int32_t
order
;
// desc|asc order to iterate the data block
int32_t
numOfCols
;
int32_t
numOfCols
;
SColumnInfo
*
colList
;
SColumnInfo
*
colList
;
int32_t
type
;
// data block load type:
int32_t
type
;
// data block load type:
// int32_t numOfTWindows;
STimeWindow
twindows
;
STimeWindow
twindows
;
int64_t
startVersion
;
int64_t
startVersion
;
int64_t
endVersion
;
int64_t
endVersion
;
}
SQueryTableDataCond
;
}
SQueryTableDataCond
;
int32_t
tEncodeDataBlock
(
void
**
buf
,
const
SSDataBlock
*
pBlock
);
int32_t
tEncodeDataBlock
(
void
**
buf
,
const
SSDataBlock
*
pBlock
);
...
...
source/common/src/tglobal.c
浏览文件 @
f208b507
...
@@ -213,10 +213,6 @@ static int32_t taosSetTfsCfg(SConfig *pCfg) {
...
@@ -213,10 +213,6 @@ static int32_t taosSetTfsCfg(SConfig *pCfg) {
memcpy
(
&
tsDiskCfg
[
index
],
pCfg
,
sizeof
(
SDiskCfg
));
memcpy
(
&
tsDiskCfg
[
index
],
pCfg
,
sizeof
(
SDiskCfg
));
if
(
pCfg
->
level
==
0
&&
pCfg
->
primary
==
1
)
{
if
(
pCfg
->
level
==
0
&&
pCfg
->
primary
==
1
)
{
tstrncpy
(
tsDataDir
,
pCfg
->
dir
,
PATH_MAX
);
tstrncpy
(
tsDataDir
,
pCfg
->
dir
,
PATH_MAX
);
if
(
taosMulMkDir
(
tsDataDir
)
!=
0
)
{
uError
(
"failed to create dataDir:%s since %s"
,
tsDataDir
,
terrstr
());
return
-
1
;
}
}
}
if
(
taosMulMkDir
(
pCfg
->
dir
)
!=
0
)
{
if
(
taosMulMkDir
(
pCfg
->
dir
)
!=
0
)
{
uError
(
"failed to create tfsDir:%s since %s"
,
tsDataDir
,
terrstr
());
uError
(
"failed to create tfsDir:%s since %s"
,
tsDataDir
,
terrstr
());
...
@@ -227,12 +223,13 @@ static int32_t taosSetTfsCfg(SConfig *pCfg) {
...
@@ -227,12 +223,13 @@ static int32_t taosSetTfsCfg(SConfig *pCfg) {
if
(
tsDataDir
[
0
]
==
0
)
{
if
(
tsDataDir
[
0
]
==
0
)
{
if
(
pItem
->
str
!=
NULL
)
{
if
(
pItem
->
str
!=
NULL
)
{
taosAddDataDir
(
0
,
pItem
->
str
,
0
,
1
);
taosAddDataDir
(
tsDiskCfgNum
,
pItem
->
str
,
0
,
1
);
tstrncpy
(
tsDataDir
,
pItem
->
str
,
PATH_MAX
);
tstrncpy
(
tsDataDir
,
pItem
->
str
,
PATH_MAX
);
if
(
taosMulMkDir
(
tsDataDir
)
!=
0
)
{
if
(
taosMulMkDir
(
tsDataDir
)
!=
0
)
{
uError
(
"failed to create
data
Dir:%s since %s"
,
tsDataDir
,
terrstr
());
uError
(
"failed to create
tfs
Dir:%s since %s"
,
tsDataDir
,
terrstr
());
return
-
1
;
return
-
1
;
}
}
tsDiskCfgNum
++
;
}
else
{
}
else
{
uError
(
"datadir not set"
);
uError
(
"datadir not set"
);
return
-
1
;
return
-
1
;
...
...
source/dnode/mnode/impl/src/mndProfile.c
浏览文件 @
f208b507
...
@@ -312,7 +312,7 @@ static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) {
...
@@ -312,7 +312,7 @@ static int32_t mndSaveQueryList(SConnObj *pConn, SQueryHbReqBasic *pBasic) {
pConn
->
numOfQueries
=
pBasic
->
queryDesc
?
taosArrayGetSize
(
pBasic
->
queryDesc
)
:
0
;
pConn
->
numOfQueries
=
pBasic
->
queryDesc
?
taosArrayGetSize
(
pBasic
->
queryDesc
)
:
0
;
pBasic
->
queryDesc
=
NULL
;
pBasic
->
queryDesc
=
NULL
;
mDebug
(
"queries updated in conn %
d
, num:%d"
,
pConn
->
id
,
pConn
->
numOfQueries
);
mDebug
(
"queries updated in conn %
u
, num:%d"
,
pConn
->
id
,
pConn
->
numOfQueries
);
taosWUnLockLatch
(
&
pConn
->
queryLock
);
taosWUnLockLatch
(
&
pConn
->
queryLock
);
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
f208b507
...
@@ -118,9 +118,8 @@ int32_t metaTbCursorNext(SMTbCursor *pTbCur);
...
@@ -118,9 +118,8 @@ int32_t metaTbCursorNext(SMTbCursor *pTbCur);
// typedef struct STsdb STsdb;
// typedef struct STsdb STsdb;
typedef
struct
STsdbReader
STsdbReader
;
typedef
struct
STsdbReader
STsdbReader
;
#define BLOCK_LOAD_OFFSET_ORDER 1
#define TIMEWINDOW_RANGE_CONTAINED 1
#define BLOCK_LOAD_TABLESEQ_ORDER 2
#define TIMEWINDOW_RANGE_EXTERNAL 2
#define BLOCK_LOAD_EXTERN_ORDER 3
#define LASTROW_RETRIEVE_TYPE_ALL 0x1
#define LASTROW_RETRIEVE_TYPE_ALL 0x1
#define LASTROW_RETRIEVE_TYPE_SINGLE 0x2
#define LASTROW_RETRIEVE_TYPE_SINGLE 0x2
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
f208b507
此差异已折叠。
点击以展开。
source/libs/executor/inc/executil.h
浏览文件 @
f208b507
...
@@ -82,8 +82,6 @@ size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput);
...
@@ -82,8 +82,6 @@ size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput);
void
initResultRowInfo
(
SResultRowInfo
*
pResultRowInfo
);
void
initResultRowInfo
(
SResultRowInfo
*
pResultRowInfo
);
void
cleanupResultRowInfo
(
SResultRowInfo
*
pResultRowInfo
);
void
cleanupResultRowInfo
(
SResultRowInfo
*
pResultRowInfo
);
void
closeAllResultRows
(
SResultRowInfo
*
pResultRowInfo
);
void
initResultRow
(
SResultRow
*
pResultRow
);
void
initResultRow
(
SResultRow
*
pResultRow
);
void
closeResultRow
(
SResultRow
*
pResultRow
);
void
closeResultRow
(
SResultRow
*
pResultRow
);
bool
isResultRowClosed
(
SResultRow
*
pResultRow
);
bool
isResultRowClosed
(
SResultRow
*
pResultRow
);
...
@@ -96,6 +94,11 @@ static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRo
...
@@ -96,6 +94,11 @@ static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRo
return
pRow
;
return
pRow
;
}
}
static
FORCE_INLINE
void
setResultBufPageDirty
(
SDiskbasedBuf
*
pBuf
,
SResultRowPosition
*
pos
)
{
void
*
pPage
=
getBufPage
(
pBuf
,
pos
->
pageId
);
setBufPageDirty
(
pPage
,
true
);
}
void
initGroupedResultInfo
(
SGroupResInfo
*
pGroupResInfo
,
SHashObj
*
pHashmap
,
int32_t
order
);
void
initGroupedResultInfo
(
SGroupResInfo
*
pGroupResInfo
,
SHashObj
*
pHashmap
,
int32_t
order
);
void
cleanupGroupResInfo
(
SGroupResInfo
*
pGroupResInfo
);
void
cleanupGroupResInfo
(
SGroupResInfo
*
pGroupResInfo
);
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
f208b507
...
@@ -108,7 +108,6 @@ typedef struct STaskCostInfo {
...
@@ -108,7 +108,6 @@ typedef struct STaskCostInfo {
SFileBlockLoadRecorder
*
pRecoder
;
SFileBlockLoadRecorder
*
pRecoder
;
uint64_t
elapsedTime
;
uint64_t
elapsedTime
;
uint64_t
firstStageMergeTime
;
uint64_t
winInfoSize
;
uint64_t
winInfoSize
;
uint64_t
tableInfoSize
;
uint64_t
tableInfoSize
;
uint64_t
hashSize
;
uint64_t
hashSize
;
...
@@ -549,6 +548,7 @@ typedef struct SProjectOperatorInfo {
...
@@ -549,6 +548,7 @@ typedef struct SProjectOperatorInfo {
SLimitInfo
limitInfo
;
SLimitInfo
limitInfo
;
bool
mergeDataBlocks
;
bool
mergeDataBlocks
;
SSDataBlock
*
pFinalRes
;
SSDataBlock
*
pFinalRes
;
SNode
*
pCondition
;
}
SProjectOperatorInfo
;
}
SProjectOperatorInfo
;
typedef
struct
SIndefOperatorInfo
{
typedef
struct
SIndefOperatorInfo
{
...
...
source/libs/executor/src/executil.c
浏览文件 @
f208b507
...
@@ -43,10 +43,6 @@ void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo) {
...
@@ -43,10 +43,6 @@ void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo) {
}
}
}
}
void
closeAllResultRows
(
SResultRowInfo
*
pResultRowInfo
)
{
// do nothing
}
bool
isResultRowClosed
(
SResultRow
*
pRow
)
{
return
(
pRow
->
closed
==
true
);
}
bool
isResultRowClosed
(
SResultRow
*
pRow
)
{
return
(
pRow
->
closed
==
true
);
}
void
closeResultRow
(
SResultRow
*
pResultRow
)
{
pResultRow
->
closed
=
true
;
}
void
closeResultRow
(
SResultRow
*
pResultRow
)
{
pResultRow
->
closed
=
true
;
}
...
@@ -160,11 +156,13 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) {
...
@@ -160,11 +156,13 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) {
SArray
*
createSortInfo
(
SNodeList
*
pNodeList
)
{
SArray
*
createSortInfo
(
SNodeList
*
pNodeList
)
{
size_t
numOfCols
=
0
;
size_t
numOfCols
=
0
;
if
(
pNodeList
!=
NULL
)
{
if
(
pNodeList
!=
NULL
)
{
numOfCols
=
LIST_LENGTH
(
pNodeList
);
numOfCols
=
LIST_LENGTH
(
pNodeList
);
}
else
{
}
else
{
numOfCols
=
0
;
numOfCols
=
0
;
}
}
SArray
*
pList
=
taosArrayInit
(
numOfCols
,
sizeof
(
SBlockOrderInfo
));
SArray
*
pList
=
taosArrayInit
(
numOfCols
,
sizeof
(
SBlockOrderInfo
));
if
(
pList
==
NULL
)
{
if
(
pList
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
...
@@ -196,10 +194,6 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) {
...
@@ -196,10 +194,6 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) {
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
numOfCols
;
++
i
)
{
SSlotDescNode
*
pDescNode
=
(
SSlotDescNode
*
)
nodesListGetNode
(
pNode
->
pSlots
,
i
);
SSlotDescNode
*
pDescNode
=
(
SSlotDescNode
*
)
nodesListGetNode
(
pNode
->
pSlots
,
i
);
/*if (!pDescNode->output) { // todo disable it temporarily*/
/*continue;*/
/*}*/
SColumnInfoData
idata
=
SColumnInfoData
idata
=
createColumnInfoData
(
pDescNode
->
dataType
.
type
,
pDescNode
->
dataType
.
bytes
,
pDescNode
->
slotId
);
createColumnInfoData
(
pDescNode
->
dataType
.
type
,
pDescNode
->
dataType
.
bytes
,
pDescNode
->
slotId
);
idata
.
info
.
scale
=
pDescNode
->
dataType
.
scale
;
idata
.
info
.
scale
=
pDescNode
->
dataType
.
scale
;
...
@@ -701,9 +695,6 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu
...
@@ -701,9 +695,6 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu
}
}
}
}
#ifdef BUF_PAGE_DEBUG
qDebug
(
"page_setSelect num:%d"
,
num
);
#endif
if
(
p
!=
NULL
)
{
if
(
p
!=
NULL
)
{
p
->
subsidiaries
.
pCtx
=
pValCtx
;
p
->
subsidiaries
.
pCtx
=
pValCtx
;
p
->
subsidiaries
.
num
=
num
;
p
->
subsidiaries
.
num
=
num
;
...
@@ -852,7 +843,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
...
@@ -852,7 +843,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
// TODO: get it from stable scan node
// TODO: get it from stable scan node
pCond
->
twindows
=
pTableScanNode
->
scanRange
;
pCond
->
twindows
=
pTableScanNode
->
scanRange
;
pCond
->
suid
=
pTableScanNode
->
scan
.
suid
;
pCond
->
suid
=
pTableScanNode
->
scan
.
suid
;
pCond
->
type
=
BLOCK_LOAD_OFFSET_ORDER
;
pCond
->
type
=
TIMEWINDOW_RANGE_CONTAINED
;
pCond
->
startVersion
=
-
1
;
pCond
->
startVersion
=
-
1
;
pCond
->
endVersion
=
-
1
;
pCond
->
endVersion
=
-
1
;
// pCond->type = pTableScanNode->scanFlag;
// pCond->type = pTableScanNode->scanFlag;
...
@@ -947,6 +938,7 @@ STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInter
...
@@ -947,6 +938,7 @@ STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInter
}
}
// get the correct time window according to the handled timestamp
// get the correct time window according to the handled timestamp
// todo refactor
STimeWindow
getActiveTimeWindow
(
SDiskbasedBuf
*
pBuf
,
SResultRowInfo
*
pResultRowInfo
,
int64_t
ts
,
SInterval
*
pInterval
,
STimeWindow
getActiveTimeWindow
(
SDiskbasedBuf
*
pBuf
,
SResultRowInfo
*
pResultRowInfo
,
int64_t
ts
,
SInterval
*
pInterval
,
int32_t
order
)
{
int32_t
order
)
{
STimeWindow
w
=
{
0
};
STimeWindow
w
=
{
0
};
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
f208b507
...
@@ -1665,9 +1665,6 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo) {
...
@@ -1665,9 +1665,6 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo) {
// hashSize += taosHashGetMemSize(pRuntimeEnv->tableqinfoGroupInfo.map);
// hashSize += taosHashGetMemSize(pRuntimeEnv->tableqinfoGroupInfo.map);
// pSummary->hashSize = hashSize;
// pSummary->hashSize = hashSize;
// add the merge time
pSummary
->
elapsedTime
+=
pSummary
->
firstStageMergeTime
;
// SResultRowPool* p = pTaskInfo->pool;
// SResultRowPool* p = pTaskInfo->pool;
// if (p != NULL) {
// if (p != NULL) {
// pSummary->winInfoSize = getResultRowPoolMemSize(p);
// pSummary->winInfoSize = getResultRowPoolMemSize(p);
...
@@ -1676,17 +1673,16 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo) {
...
@@ -1676,17 +1673,16 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo) {
// pSummary->winInfoSize = 0;
// pSummary->winInfoSize = 0;
// pSummary->numOfTimeWindows = 0;
// pSummary->numOfTimeWindows = 0;
// }
// }
//
// calculateOperatorProfResults(pQInfo);
SFileBlockLoadRecorder
*
pRecorder
=
pSummary
->
pRecoder
;
SFileBlockLoadRecorder
*
pRecorder
=
pSummary
->
pRecoder
;
if
(
pSummary
->
pRecoder
!=
NULL
)
{
if
(
pSummary
->
pRecoder
!=
NULL
)
{
qDebug
(
"%s :cost summary: elapsed time:%"
PRId64
" us, first merge:%"
PRId64
qDebug
(
" us, total blocks:%d,
"
"%s :cost summary: elapsed time:%.2f ms, total blocks:%d, load block SMA:%d, load data block:%d, total rows:%
"
"load block statis:%d, load data block:%d, total rows:%"
PRId64
", check rows:%"
PRId64
,
PRId64
", check rows:%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
pSummary
->
elapsedTime
/
1000
.
0
,
GET_TASKID
(
pTaskInfo
),
pSummary
->
elapsedTime
,
pSummary
->
firstStageMergeTime
,
pRecorder
->
totalBlock
s
,
pRecorder
->
totalBlocks
,
pRecorder
->
loadBlockStatis
,
pRecorder
->
loadBlocks
,
pRecorder
->
totalRow
s
,
pRecorder
->
loadBlockStatis
,
pRecorder
->
loadBlocks
,
pRecorder
->
totalRows
,
pRecorder
->
totalCheckedRows
);
pRecorder
->
totalCheckedRows
);
}
}
// qDebug("QInfo:0x%"PRIx64" :cost summary: winResPool size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb,
// qDebug("QInfo:0x%"PRIx64" :cost summary: winResPool size:%.2f Kb, numOfWin:%"PRId64", tableInfoSize:%.2f Kb,
// hashTable:%.2f Kb", pQInfo->qId, pSummary->winInfoSize/1024.0,
// hashTable:%.2f Kb", pQInfo->qId, pSummary->winInfoSize/1024.0,
// pSummary->numOfTimeWindows, pSummary->tableInfoSize/1024.0, pSummary->hashSize/1024.0);
// pSummary->numOfTimeWindows, pSummary->tableInfoSize/1024.0, pSummary->hashSize/1024.0);
...
@@ -3031,7 +3027,6 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
...
@@ -3031,7 +3027,6 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
}
}
}
}
closeAllResultRows
(
&
pAggInfo
->
binfo
.
resultRowInfo
);
initGroupedResultInfo
(
&
pAggInfo
->
groupResInfo
,
pAggInfo
->
aggSup
.
pResultRowHashTable
,
0
);
initGroupedResultInfo
(
&
pAggInfo
->
groupResInfo
,
pAggInfo
->
aggSup
.
pResultRowHashTable
,
0
);
OPTR_SET_OPENED
(
pOperator
);
OPTR_SET_OPENED
(
pOperator
);
...
@@ -3328,6 +3323,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
...
@@ -3328,6 +3323,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
if
(
pLimitInfo
->
remainGroupOffset
>
0
)
{
if
(
pLimitInfo
->
remainGroupOffset
>
0
)
{
if
(
pLimitInfo
->
currentGroupId
==
0
||
pLimitInfo
->
currentGroupId
==
pBlock
->
info
.
groupId
)
{
// it is the first group
if
(
pLimitInfo
->
currentGroupId
==
0
||
pLimitInfo
->
currentGroupId
==
pBlock
->
info
.
groupId
)
{
// it is the first group
pLimitInfo
->
currentGroupId
=
pBlock
->
info
.
groupId
;
pLimitInfo
->
currentGroupId
=
pBlock
->
info
.
groupId
;
ASSERT
(
pTaskInfo
->
execModel
!=
OPTR_EXEC_MODEL_STREAM
);
continue
;
continue
;
}
else
if
(
pLimitInfo
->
currentGroupId
!=
pBlock
->
info
.
groupId
)
{
}
else
if
(
pLimitInfo
->
currentGroupId
!=
pBlock
->
info
.
groupId
)
{
// now it is the data from a new group
// now it is the data from a new group
...
@@ -3336,6 +3332,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
...
@@ -3336,6 +3332,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
// ignore data block in current group
// ignore data block in current group
if
(
pLimitInfo
->
remainGroupOffset
>
0
)
{
if
(
pLimitInfo
->
remainGroupOffset
>
0
)
{
ASSERT
(
pTaskInfo
->
execModel
!=
OPTR_EXEC_MODEL_STREAM
);
continue
;
continue
;
}
}
}
}
...
@@ -3380,10 +3377,12 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
...
@@ -3380,10 +3377,12 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
if
(
pLimitInfo
->
remainOffset
>=
pInfo
->
pRes
->
info
.
rows
)
{
if
(
pLimitInfo
->
remainOffset
>=
pInfo
->
pRes
->
info
.
rows
)
{
pLimitInfo
->
remainOffset
-=
pInfo
->
pRes
->
info
.
rows
;
pLimitInfo
->
remainOffset
-=
pInfo
->
pRes
->
info
.
rows
;
blockDataCleanup
(
pInfo
->
pRes
);
blockDataCleanup
(
pInfo
->
pRes
);
ASSERT
(
pTaskInfo
->
execModel
!=
OPTR_EXEC_MODEL_STREAM
);
continue
;
continue
;
}
else
if
(
pLimitInfo
->
remainOffset
<
pInfo
->
pRes
->
info
.
rows
&&
pLimitInfo
->
remainOffset
>
0
)
{
}
else
if
(
pLimitInfo
->
remainOffset
<
pInfo
->
pRes
->
info
.
rows
&&
pLimitInfo
->
remainOffset
>
0
)
{
blockDataTrimFirstNRows
(
pInfo
->
pRes
,
pLimitInfo
->
remainOffset
);
blockDataTrimFirstNRows
(
pInfo
->
pRes
,
pLimitInfo
->
remainOffset
);
pLimitInfo
->
remainOffset
=
0
;
pLimitInfo
->
remainOffset
=
0
;
ASSERT
(
pTaskInfo
->
execModel
!=
OPTR_EXEC_MODEL_STREAM
);
}
}
// check for the limitation in each group
// check for the limitation in each group
...
@@ -3391,6 +3390,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
...
@@ -3391,6 +3390,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
pLimitInfo
->
numOfOutputRows
+
pInfo
->
pRes
->
info
.
rows
>=
pLimitInfo
->
limit
.
limit
)
{
pLimitInfo
->
numOfOutputRows
+
pInfo
->
pRes
->
info
.
rows
>=
pLimitInfo
->
limit
.
limit
)
{
int32_t
keepRows
=
(
int32_t
)(
pLimitInfo
->
limit
.
limit
-
pLimitInfo
->
numOfOutputRows
);
int32_t
keepRows
=
(
int32_t
)(
pLimitInfo
->
limit
.
limit
-
pLimitInfo
->
numOfOutputRows
);
blockDataKeepFirstNRows
(
pInfo
->
pRes
,
keepRows
);
blockDataKeepFirstNRows
(
pInfo
->
pRes
,
keepRows
);
ASSERT
(
pTaskInfo
->
execModel
!=
OPTR_EXEC_MODEL_STREAM
);
if
(
pLimitInfo
->
slimit
.
limit
>
0
&&
pLimitInfo
->
slimit
.
limit
<=
pLimitInfo
->
numOfOutputGroups
)
{
if
(
pLimitInfo
->
slimit
.
limit
>
0
&&
pLimitInfo
->
slimit
.
limit
<=
pLimitInfo
->
numOfOutputGroups
)
{
pOperator
->
status
=
OP_EXEC_DONE
;
pOperator
->
status
=
OP_EXEC_DONE
;
}
}
...
@@ -3400,27 +3400,32 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
...
@@ -3400,27 +3400,32 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
break
;
break
;
}
}
// no results generated
if
(
pProjectInfo
->
mergeDataBlocks
&&
pTaskInfo
->
execModel
!=
OPTR_EXEC_MODEL_STREAM
)
{
if
(
pInfo
->
pRes
->
info
.
rows
==
0
||
(
!
pProjectInfo
->
mergeDataBlocks
))
{
if
(
pRes
->
info
.
rows
>
0
)
{
break
;
pFinalRes
->
info
.
groupId
=
pRes
->
info
.
groupId
;
}
pFinalRes
->
info
.
version
=
pRes
->
info
.
version
;
if
(
pProjectInfo
->
mergeDataBlocks
)
{
pFinalRes
->
info
.
groupId
=
pInfo
->
pRes
->
info
.
groupId
;
pFinalRes
->
info
.
version
=
pInfo
->
pRes
->
info
.
version
;
// continue merge data, ignore the group id
// continue merge data, ignore the group id
blockDataMerge
(
pFinalRes
,
pInfo
->
pRes
);
blockDataMerge
(
pFinalRes
,
pRes
);
if
(
pFinalRes
->
info
.
rows
+
pRes
->
info
.
rows
<=
pOperator
->
resultInfo
.
threshold
)
{
if
(
pFinalRes
->
info
.
rows
+
pInfo
->
pRes
->
info
.
rows
<=
pOperator
->
resultInfo
.
threshold
)
{
continue
;
continue
;
}
}
}
}
// do apply filter
// do apply filter
SSDataBlock
*
p
=
pProjectInfo
->
mergeDataBlocks
?
pFinalRes
:
pRes
;
doFilter
(
pProjectInfo
->
pFilterNode
,
pFinalRes
,
NULL
);
doFilter
(
pProjectInfo
->
pFilterNode
,
p
,
NULL
);
if
(
pFinalRes
->
info
.
rows
>
0
||
pRes
->
info
.
rows
==
0
)
{
if
(
p
->
info
.
rows
>
0
)
{
break
;
}
}
else
{
// do apply filter
if
(
pRes
->
info
.
rows
>
0
)
{
doFilter
(
pProjectInfo
->
pFilterNode
,
pRes
,
NULL
);
if
(
pRes
->
info
.
rows
==
0
)
{
continue
;
}
}
break
;
break
;
}
}
}
}
...
@@ -3884,8 +3889,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
...
@@ -3884,8 +3889,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
initLimitInfo
(
pProjPhyNode
->
node
.
pLimit
,
pProjPhyNode
->
node
.
pSlimit
,
&
pInfo
->
limitInfo
);
initLimitInfo
(
pProjPhyNode
->
node
.
pLimit
,
pProjPhyNode
->
node
.
pSlimit
,
&
pInfo
->
limitInfo
);
pInfo
->
binfo
.
pRes
=
pResBlock
;
pInfo
->
binfo
.
pRes
=
pResBlock
;
pInfo
->
pFinalRes
=
createOneDataBlock
(
pResBlock
,
false
);
pInfo
->
pFinalRes
=
createOneDataBlock
(
pResBlock
,
false
);
pInfo
->
pFilterNode
=
pProjPhyNode
->
node
.
pConditions
;
pInfo
->
pFilterNode
=
pProjPhyNode
->
node
.
pConditions
;
pInfo
->
mergeDataBlocks
=
pProjPhyNode
->
mergeDataBlock
;
pInfo
->
mergeDataBlocks
=
pProjPhyNode
->
mergeDataBlock
;
...
@@ -4416,7 +4420,7 @@ static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pC
...
@@ -4416,7 +4420,7 @@ static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pC
pCond
->
twindows
=
(
STimeWindow
){.
skey
=
INT64_MIN
,
.
ekey
=
INT64_MAX
};
pCond
->
twindows
=
(
STimeWindow
){.
skey
=
INT64_MIN
,
.
ekey
=
INT64_MAX
};
pCond
->
suid
=
uid
;
pCond
->
suid
=
uid
;
pCond
->
type
=
BLOCK_LOAD_OFFSET_ORDER
;
pCond
->
type
=
TIMEWINDOW_RANGE_CONTAINED
;
pCond
->
startVersion
=
-
1
;
pCond
->
startVersion
=
-
1
;
pCond
->
endVersion
=
-
1
;
pCond
->
endVersion
=
-
1
;
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
f208b507
...
@@ -940,6 +940,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
...
@@ -940,6 +940,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
if
(
pInfo
->
execModel
==
OPTR_EXEC_MODEL_STREAM
&&
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_AT_ONCE
)
{
if
(
pInfo
->
execModel
==
OPTR_EXEC_MODEL_STREAM
&&
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_AT_ONCE
)
{
saveResultRow
(
pResult
,
tableGroupId
,
pUpdated
);
saveResultRow
(
pResult
,
tableGroupId
,
pUpdated
);
setResultBufPageDirty
(
pInfo
->
aggSup
.
pResultBuf
,
&
pResultRowInfo
->
cur
);
}
}
}
}
...
@@ -996,6 +997,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
...
@@ -996,6 +997,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
if
(
pInfo
->
execModel
==
OPTR_EXEC_MODEL_STREAM
&&
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_AT_ONCE
)
{
if
(
pInfo
->
execModel
==
OPTR_EXEC_MODEL_STREAM
&&
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_AT_ONCE
)
{
saveResultRow
(
pResult
,
tableGroupId
,
pUpdated
);
saveResultRow
(
pResult
,
tableGroupId
,
pUpdated
);
setResultBufPageDirty
(
pInfo
->
aggSup
.
pResultBuf
,
&
pResultRowInfo
->
cur
);
}
}
ekey
=
ascScan
?
nextWin
.
ekey
:
nextWin
.
skey
;
ekey
=
ascScan
?
nextWin
.
ekey
:
nextWin
.
skey
;
...
@@ -1092,7 +1094,6 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
...
@@ -1092,7 +1094,6 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
hashIntervalAgg
(
pOperator
,
&
pInfo
->
binfo
.
resultRowInfo
,
pBlock
,
scanFlag
,
NULL
);
hashIntervalAgg
(
pOperator
,
&
pInfo
->
binfo
.
resultRowInfo
,
pBlock
,
scanFlag
,
NULL
);
}
}
closeAllResultRows
(
&
pInfo
->
binfo
.
resultRowInfo
);
initGroupedResultInfo
(
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultRowHashTable
,
pInfo
->
order
);
initGroupedResultInfo
(
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultRowHashTable
,
pInfo
->
order
);
OPTR_SET_OPENED
(
pOperator
);
OPTR_SET_OPENED
(
pOperator
);
...
@@ -1248,7 +1249,6 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
...
@@ -1248,7 +1249,6 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
pOperator
->
cost
.
openCost
=
(
taosGetTimestampUs
()
-
st
)
/
1000
.
0
;
pOperator
->
cost
.
openCost
=
(
taosGetTimestampUs
()
-
st
)
/
1000
.
0
;
pOperator
->
status
=
OP_RES_TO_RETURN
;
pOperator
->
status
=
OP_RES_TO_RETURN
;
closeAllResultRows
(
&
pBInfo
->
resultRowInfo
);
initGroupedResultInfo
(
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultRowHashTable
,
TSDB_ORDER_ASC
);
initGroupedResultInfo
(
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultRowHashTable
,
TSDB_ORDER_ASC
);
blockDataEnsureCapacity
(
pBInfo
->
pRes
,
pOperator
->
resultInfo
.
capacity
);
blockDataEnsureCapacity
(
pBInfo
->
pRes
,
pOperator
->
resultInfo
.
capacity
);
...
@@ -2043,7 +2043,6 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
...
@@ -2043,7 +2043,6 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
// restore the value
// restore the value
pOperator
->
status
=
OP_RES_TO_RETURN
;
pOperator
->
status
=
OP_RES_TO_RETURN
;
closeAllResultRows
(
&
pBInfo
->
resultRowInfo
);
initGroupedResultInfo
(
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultRowHashTable
,
TSDB_ORDER_ASC
);
initGroupedResultInfo
(
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultRowHashTable
,
TSDB_ORDER_ASC
);
blockDataEnsureCapacity
(
pBInfo
->
pRes
,
pOperator
->
resultInfo
.
capacity
);
blockDataEnsureCapacity
(
pBInfo
->
pRes
,
pOperator
->
resultInfo
.
capacity
);
...
@@ -2207,8 +2206,6 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
...
@@ -2207,8 +2206,6 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
SSDataBlock
*
pResBlock
=
pSliceInfo
->
pRes
;
SSDataBlock
*
pResBlock
=
pSliceInfo
->
pRes
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
blockDataEnsureCapacity
(
pResBlock
,
pOperator
->
resultInfo
.
capacity
);
// if (pOperator->status == OP_RES_TO_RETURN) {
// if (pOperator->status == OP_RES_TO_RETURN) {
// // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
// // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
// if (pResBlock->info.rows == 0 || !hasDataInGroupInfo(&pSliceInfo->groupResInfo)) {
// if (pResBlock->info.rows == 0 || !hasDataInGroupInfo(&pSliceInfo->groupResInfo)) {
...
@@ -2348,10 +2345,10 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
...
@@ -2348,10 +2345,10 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
initResultSizeInfo
(
&
pOperator
->
resultInfo
,
4096
);
initResultSizeInfo
(
&
pOperator
->
resultInfo
,
4096
);
pInfo
->
pFillColInfo
=
createFillColInfo
(
pExprInfo
,
numOfExprs
,
(
SNodeListNode
*
)
pInterpPhyNode
->
pFillValues
);
pInfo
->
pFillColInfo
=
createFillColInfo
(
pExprInfo
,
numOfExprs
,
(
SNodeListNode
*
)
pInterpPhyNode
->
pFillValues
);
pInfo
->
pRes
=
createResDataBlock
(
pPhyNode
->
pOutputDataBlockDesc
);
pInfo
->
pRes
=
createResDataBlock
(
pPhyNode
->
pOutputDataBlockDesc
);
pInfo
->
win
=
pInterpPhyNode
->
timeRange
;
pInfo
->
win
=
pInterpPhyNode
->
timeRange
;
pInfo
->
interval
.
interval
=
pInterpPhyNode
->
interval
;
pInfo
->
interval
.
interval
=
pInterpPhyNode
->
interval
;
pInfo
->
current
=
pInfo
->
win
.
skey
;
pInfo
->
current
=
pInfo
->
win
.
skey
;
pOperator
->
name
=
"TimeSliceOperator"
;
pOperator
->
name
=
"TimeSliceOperator"
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC
;
...
@@ -2542,6 +2539,7 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExpr
...
@@ -2542,6 +2539,7 @@ static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExpr
}
}
if
(
find
&&
pUpdated
)
{
if
(
find
&&
pUpdated
)
{
saveResultRow
(
pCurResult
,
pWinRes
->
groupId
,
pUpdated
);
saveResultRow
(
pCurResult
,
pWinRes
->
groupId
,
pUpdated
);
setResultBufPageDirty
(
pInfo
->
aggSup
.
pResultBuf
,
&
pInfo
->
binfo
.
resultRowInfo
.
cur
);
}
}
}
}
}
}
...
@@ -2662,6 +2660,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
...
@@ -2662,6 +2660,7 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
}
}
if
(
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_AT_ONCE
&&
pUpdated
)
{
if
(
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_AT_ONCE
&&
pUpdated
)
{
saveResultRow
(
pResult
,
tableGroupId
,
pUpdated
);
saveResultRow
(
pResult
,
tableGroupId
,
pUpdated
);
setResultBufPageDirty
(
pInfo
->
aggSup
.
pResultBuf
,
&
pResultRowInfo
->
cur
);
}
}
updateTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
nextWin
,
true
);
updateTimeWindowInfo
(
&
pInfo
->
twAggSup
.
timeWindowData
,
&
nextWin
,
true
);
doApplyFunctions
(
pTaskInfo
,
pSup
->
pCtx
,
&
nextWin
,
&
pInfo
->
twAggSup
.
timeWindowData
,
startPos
,
forwardRows
,
tsCols
,
doApplyFunctions
(
pTaskInfo
,
pSup
->
pCtx
,
&
nextWin
,
&
pInfo
->
twAggSup
.
timeWindowData
,
startPos
,
forwardRows
,
tsCols
,
...
...
source/libs/qworker/src/qworker.c
浏览文件 @
f208b507
...
@@ -66,7 +66,7 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
...
@@ -66,7 +66,7 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
qwExecTask
(
QW_FPARAMS_DEF
,
SQWTaskCtx
*
ctx
,
bool
*
query
End
)
{
int32_t
qwExecTask
(
QW_FPARAMS_DEF
,
SQWTaskCtx
*
ctx
,
bool
*
query
Stop
)
{
int32_t
code
=
0
;
int32_t
code
=
0
;
bool
qcontinue
=
true
;
bool
qcontinue
=
true
;
SSDataBlock
*
pRes
=
NULL
;
SSDataBlock
*
pRes
=
NULL
;
...
@@ -104,8 +104,8 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
...
@@ -104,8 +104,8 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
QW_ERR_RET
(
qwHandleTaskComplete
(
QW_FPARAMS
(),
ctx
));
QW_ERR_RET
(
qwHandleTaskComplete
(
QW_FPARAMS
(),
ctx
));
if
(
query
End
)
{
if
(
query
Stop
)
{
*
query
End
=
true
;
*
query
Stop
=
true
;
}
}
break
;
break
;
...
@@ -125,6 +125,10 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
...
@@ -125,6 +125,10 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
QW_TASK_DLOG
(
"data put into sink, rows:%d, continueExecTask:%d"
,
rows
,
qcontinue
);
QW_TASK_DLOG
(
"data put into sink, rows:%d, continueExecTask:%d"
,
rows
,
qcontinue
);
if
(
!
qcontinue
)
{
if
(
!
qcontinue
)
{
if
(
queryStop
)
{
*
queryStop
=
true
;
}
break
;
break
;
}
}
...
@@ -566,7 +570,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
...
@@ -566,7 +570,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
SQWPhaseInput
input
=
{
0
};
SQWPhaseInput
input
=
{
0
};
void
*
rsp
=
NULL
;
void
*
rsp
=
NULL
;
int32_t
dataLen
=
0
;
int32_t
dataLen
=
0
;
bool
query
End
=
false
;
bool
query
Stop
=
false
;
do
{
do
{
QW_ERR_JRET
(
qwHandlePrePhaseEvents
(
QW_FPARAMS
(),
QW_PHASE_PRE_CQUERY
,
&
input
,
NULL
));
QW_ERR_JRET
(
qwHandlePrePhaseEvents
(
QW_FPARAMS
(),
QW_PHASE_PRE_CQUERY
,
&
input
,
NULL
));
...
@@ -576,7 +580,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
...
@@ -576,7 +580,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
atomic_store_8
((
int8_t
*
)
&
ctx
->
queryInQueue
,
0
);
atomic_store_8
((
int8_t
*
)
&
ctx
->
queryInQueue
,
0
);
atomic_store_8
((
int8_t
*
)
&
ctx
->
queryContinue
,
0
);
atomic_store_8
((
int8_t
*
)
&
ctx
->
queryContinue
,
0
);
QW_ERR_JRET
(
qwExecTask
(
QW_FPARAMS
(),
ctx
,
&
query
End
));
QW_ERR_JRET
(
qwExecTask
(
QW_FPARAMS
(),
ctx
,
&
query
Stop
));
if
(
QW_EVENT_RECEIVED
(
ctx
,
QW_EVENT_FETCH
))
{
if
(
QW_EVENT_RECEIVED
(
ctx
,
QW_EVENT_FETCH
))
{
SOutputData
sOutput
=
{
0
};
SOutputData
sOutput
=
{
0
};
...
@@ -627,7 +631,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
...
@@ -627,7 +631,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
}
}
QW_LOCK
(
QW_WRITE
,
&
ctx
->
lock
);
QW_LOCK
(
QW_WRITE
,
&
ctx
->
lock
);
if
(
query
End
||
code
||
0
==
atomic_load_8
((
int8_t
*
)
&
ctx
->
queryContinue
))
{
if
(
query
Stop
||
code
||
0
==
atomic_load_8
((
int8_t
*
)
&
ctx
->
queryContinue
))
{
// Note: query is not running anymore
// Note: query is not running anymore
QW_SET_PHASE
(
ctx
,
0
);
QW_SET_PHASE
(
ctx
,
0
);
QW_UNLOCK
(
QW_WRITE
,
&
ctx
->
lock
);
QW_UNLOCK
(
QW_WRITE
,
&
ctx
->
lock
);
...
...
source/libs/transport/inc/transComm.h
浏览文件 @
f208b507
...
@@ -300,6 +300,8 @@ int transSendResponse(const STransMsg* msg);
...
@@ -300,6 +300,8 @@ int transSendResponse(const STransMsg* msg);
int
transRegisterMsg
(
const
STransMsg
*
msg
);
int
transRegisterMsg
(
const
STransMsg
*
msg
);
int
transSetDefaultAddr
(
void
*
shandle
,
const
char
*
ip
,
const
char
*
fqdn
);
int
transSetDefaultAddr
(
void
*
shandle
,
const
char
*
ip
,
const
char
*
fqdn
);
int
transGetSockDebugInfo
(
struct
sockaddr
*
sockname
,
char
*
dst
);
int64_t
transAllocHandle
();
int64_t
transAllocHandle
();
void
*
transInitServer
(
uint32_t
ip
,
uint32_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
);
void
*
transInitServer
(
uint32_t
ip
,
uint32_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
);
...
...
source/libs/transport/src/transCli.c
浏览文件 @
f208b507
...
@@ -37,9 +37,11 @@ typedef struct SCliConn {
...
@@ -37,9 +37,11 @@ typedef struct SCliConn {
uint32_t
port
;
uint32_t
port
;
SDelayTask
*
task
;
SDelayTask
*
task
;
// debug and log info
// debug and log info
struct
sockaddr_in
addr
;
char
src
[
32
];
struct
sockaddr_in
localAddr
;
char
dst
[
32
];
}
SCliConn
;
}
SCliConn
;
typedef
struct
SCliMsg
{
typedef
struct
SCliMsg
{
...
@@ -95,6 +97,14 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port);
...
@@ -95,6 +97,14 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port);
static
void
addConnToPool
(
void
*
pool
,
SCliConn
*
conn
);
static
void
addConnToPool
(
void
*
pool
,
SCliConn
*
conn
);
static
void
doCloseIdleConn
(
void
*
param
);
static
void
doCloseIdleConn
(
void
*
param
);
static
int
sockDebugInfo
(
struct
sockaddr
*
sockname
,
char
*
dst
)
{
struct
sockaddr_in
addr
=
*
(
struct
sockaddr_in
*
)
sockname
;
char
buf
[
20
]
=
{
0
};
int
r
=
uv_ip4_name
(
&
addr
,
(
char
*
)
buf
,
sizeof
(
buf
));
sprintf
(
dst
,
"%s:%d"
,
buf
,
ntohs
(
addr
.
sin_port
));
return
r
;
}
// register timer in each thread to clear expire conn
// register timer in each thread to clear expire conn
// static void cliTimeoutCb(uv_timer_t* handle);
// static void cliTimeoutCb(uv_timer_t* handle);
// alloc buf for recv
// alloc buf for recv
...
@@ -363,9 +373,9 @@ void cliHandleResp(SCliConn* conn) {
...
@@ -363,9 +373,9 @@ void cliHandleResp(SCliConn* conn) {
}
}
STraceId
*
trace
=
&
transMsg
.
info
.
traceId
;
STraceId
*
trace
=
&
transMsg
.
info
.
traceId
;
tGTrace
(
"%s conn %p %s received from %s:%d, local info:%s:%d, msg size:%d, code:0x%x"
,
CONN_GET_INST_LABEL
(
conn
),
conn
,
TMSG_INFO
(
pHead
->
msgType
),
taosInetNtoa
(
conn
->
addr
.
sin_addr
),
ntohs
(
conn
->
addr
.
sin_port
)
,
tGTrace
(
"%s conn %p %s received from %s, local info:%s, msg size:%d, code:0x%x"
,
CONN_GET_INST_LABEL
(
conn
),
conn
,
taosInetNtoa
(
conn
->
localAddr
.
sin_addr
),
ntohs
(
conn
->
localAddr
.
sin_port
)
,
transMsg
.
contLen
,
transMsg
.
code
);
TMSG_INFO
(
pHead
->
msgType
),
conn
->
dst
,
conn
->
src
,
transMsg
.
contLen
,
transMsg
.
code
);
if
(
pCtx
==
NULL
&&
CONN_NO_PERSIST_BY_APP
(
conn
))
{
if
(
pCtx
==
NULL
&&
CONN_NO_PERSIST_BY_APP
(
conn
))
{
tDebug
(
"%s except, conn %p read while cli ignore it"
,
CONN_GET_INST_LABEL
(
conn
),
conn
);
tDebug
(
"%s except, conn %p read while cli ignore it"
,
CONN_GET_INST_LABEL
(
conn
),
conn
);
...
@@ -741,9 +751,8 @@ void cliSend(SCliConn* pConn) {
...
@@ -741,9 +751,8 @@ void cliSend(SCliConn* pConn) {
uv_buf_t
wb
=
uv_buf_init
((
char
*
)
pHead
,
msgLen
);
uv_buf_t
wb
=
uv_buf_init
((
char
*
)
pHead
,
msgLen
);
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
tGTrace
(
"%s conn %p %s is sent to %s:%d, local info %s:%d"
,
CONN_GET_INST_LABEL
(
pConn
),
pConn
,
tGTrace
(
"%s conn %p %s is sent to %s, local info %s"
,
CONN_GET_INST_LABEL
(
pConn
),
pConn
,
TMSG_INFO
(
pHead
->
msgType
),
TMSG_INFO
(
pHead
->
msgType
),
taosInetNtoa
(
pConn
->
addr
.
sin_addr
),
ntohs
(
pConn
->
addr
.
sin_port
),
pConn
->
dst
,
pConn
->
src
);
taosInetNtoa
(
pConn
->
localAddr
.
sin_addr
),
ntohs
(
pConn
->
localAddr
.
sin_port
));
if
(
pHead
->
persist
==
1
)
{
if
(
pHead
->
persist
==
1
)
{
CONN_SET_PERSIST_BY_APP
(
pConn
);
CONN_SET_PERSIST_BY_APP
(
pConn
);
...
@@ -764,11 +773,16 @@ void cliConnCb(uv_connect_t* req, int status) {
...
@@ -764,11 +773,16 @@ void cliConnCb(uv_connect_t* req, int status) {
cliHandleExcept
(
pConn
);
cliHandleExcept
(
pConn
);
return
;
return
;
}
}
int
addrlen
=
sizeof
(
pConn
->
addr
);
// int addrlen = sizeof(pConn->addr);
uv_tcp_getpeername
((
uv_tcp_t
*
)
pConn
->
stream
,
(
struct
sockaddr
*
)
&
pConn
->
addr
,
&
addrlen
);
struct
sockaddr
peername
,
sockname
;
int
addrlen
=
sizeof
(
peername
);
uv_tcp_getpeername
((
uv_tcp_t
*
)
pConn
->
stream
,
&
peername
,
&
addrlen
);
transGetSockDebugInfo
(
&
peername
,
pConn
->
dst
);
addrlen
=
sizeof
(
pConn
->
localAddr
);
addrlen
=
sizeof
(
sockname
);
uv_tcp_getsockname
((
uv_tcp_t
*
)
pConn
->
stream
,
(
struct
sockaddr
*
)
&
pConn
->
localAddr
,
&
addrlen
);
uv_tcp_getsockname
((
uv_tcp_t
*
)
pConn
->
stream
,
&
sockname
,
&
addrlen
);
transGetSockDebugInfo
(
&
sockname
,
pConn
->
src
);
tTrace
(
"%s conn %p connect to server successfully"
,
CONN_GET_INST_LABEL
(
pConn
),
pConn
);
tTrace
(
"%s conn %p connect to server successfully"
,
CONN_GET_INST_LABEL
(
pConn
),
pConn
);
assert
(
pConn
->
stream
==
req
->
handle
);
assert
(
pConn
->
stream
==
req
->
handle
);
...
...
source/libs/transport/src/transComm.c
浏览文件 @
f208b507
...
@@ -102,7 +102,14 @@ void transFreeMsg(void* msg) {
...
@@ -102,7 +102,14 @@ void transFreeMsg(void* msg) {
}
}
taosMemoryFree
((
char
*
)
msg
-
sizeof
(
STransMsgHead
));
taosMemoryFree
((
char
*
)
msg
-
sizeof
(
STransMsgHead
));
}
}
int
transGetSockDebugInfo
(
struct
sockaddr
*
sockname
,
char
*
dst
)
{
struct
sockaddr_in
addr
=
*
(
struct
sockaddr_in
*
)
sockname
;
char
buf
[
20
]
=
{
0
};
int
r
=
uv_ip4_name
(
&
addr
,
(
char
*
)
buf
,
sizeof
(
buf
));
sprintf
(
dst
,
"%s:%d"
,
buf
,
ntohs
(
addr
.
sin_port
));
return
r
;
}
int
transInitBuffer
(
SConnBuffer
*
buf
)
{
int
transInitBuffer
(
SConnBuffer
*
buf
)
{
transClearBuffer
(
buf
);
transClearBuffer
(
buf
);
return
0
;
return
0
;
...
...
source/libs/transport/src/transSvr.c
浏览文件 @
f208b507
...
@@ -43,9 +43,13 @@ typedef struct SSvrConn {
...
@@ -43,9 +43,13 @@ typedef struct SSvrConn {
SSvrRegArg
regArg
;
SSvrRegArg
regArg
;
bool
broken
;
// conn broken;
bool
broken
;
// conn broken;
ConnStatus
status
;
ConnStatus
status
;
struct
sockaddr_in
addr
;
struct
sockaddr_in
localAddr
;
uint32_t
clientIp
;
uint16_t
port
;
char
src
[
32
];
char
dst
[
32
];
int64_t
refId
;
int64_t
refId
;
int
spi
;
int
spi
;
...
@@ -248,15 +252,11 @@ static void uvHandleReq(SSvrConn* pConn) {
...
@@ -248,15 +252,11 @@ static void uvHandleReq(SSvrConn* pConn) {
if
(
pConn
->
status
==
ConnNormal
&&
pHead
->
noResp
==
0
)
{
if
(
pConn
->
status
==
ConnNormal
&&
pHead
->
noResp
==
0
)
{
transRefSrvHandle
(
pConn
);
transRefSrvHandle
(
pConn
);
tGTrace
(
"%s conn %p %s received from %s:%d, local info:%s:%d, msg size:%d"
,
transLabel
(
pTransInst
),
pConn
,
tGTrace
(
"%s conn %p %s received from %s, local info:%s, msg size:%d"
,
transLabel
(
pTransInst
),
pConn
,
TMSG_INFO
(
transMsg
.
msgType
),
taosInetNtoa
(
pConn
->
addr
.
sin_addr
),
ntohs
(
pConn
->
addr
.
sin_port
),
TMSG_INFO
(
transMsg
.
msgType
),
pConn
->
dst
,
pConn
->
src
,
transMsg
.
contLen
);
taosInetNtoa
(
pConn
->
localAddr
.
sin_addr
),
ntohs
(
pConn
->
localAddr
.
sin_port
),
transMsg
.
contLen
);
}
else
{
}
else
{
tGTrace
(
"%s conn %p %s received from %s:%d, local info:%s:%d, msg size:%d, resp:%d, code:%d"
,
tGTrace
(
"%s conn %p %s received from %s, local info:%s, msg size:%d, resp:%d, code:%d"
,
transLabel
(
pTransInst
),
transLabel
(
pTransInst
),
pConn
,
TMSG_INFO
(
transMsg
.
msgType
),
taosInetNtoa
(
pConn
->
addr
.
sin_addr
),
pConn
,
TMSG_INFO
(
transMsg
.
msgType
),
pConn
->
dst
,
pConn
->
src
,
transMsg
.
contLen
,
pHead
->
noResp
,
transMsg
.
code
);
ntohs
(
pConn
->
addr
.
sin_port
),
taosInetNtoa
(
pConn
->
localAddr
.
sin_addr
),
ntohs
(
pConn
->
localAddr
.
sin_port
),
transMsg
.
contLen
,
pHead
->
noResp
,
transMsg
.
code
);
// no ref here
}
}
// pHead->noResp = 1,
// pHead->noResp = 1,
...
@@ -278,14 +278,13 @@ static void uvHandleReq(SSvrConn* pConn) {
...
@@ -278,14 +278,13 @@ static void uvHandleReq(SSvrConn* pConn) {
// set up conn info
// set up conn info
SRpcConnInfo
*
pConnInfo
=
&
(
transMsg
.
info
.
conn
);
SRpcConnInfo
*
pConnInfo
=
&
(
transMsg
.
info
.
conn
);
pConnInfo
->
clientIp
=
(
uint32_t
)(
pConn
->
addr
.
sin_addr
.
s_addr
)
;
pConnInfo
->
clientIp
=
pConn
->
clientIp
;
pConnInfo
->
clientPort
=
ntohs
(
pConn
->
addr
.
sin_port
)
;
pConnInfo
->
clientPort
=
pConn
->
port
;
tstrncpy
(
pConnInfo
->
user
,
pConn
->
user
,
sizeof
(
pConnInfo
->
user
));
tstrncpy
(
pConnInfo
->
user
,
pConn
->
user
,
sizeof
(
pConnInfo
->
user
));
transReleaseExHandle
(
transGetRefMgt
(),
pConn
->
refId
);
transReleaseExHandle
(
transGetRefMgt
(),
pConn
->
refId
);
(
*
pTransInst
->
cfp
)(
pTransInst
->
parent
,
&
transMsg
,
NULL
);
(
*
pTransInst
->
cfp
)(
pTransInst
->
parent
,
&
transMsg
,
NULL
);
// uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0);
}
}
void
uvOnRecvCb
(
uv_stream_t
*
cli
,
ssize_t
nread
,
const
uv_buf_t
*
buf
)
{
void
uvOnRecvCb
(
uv_stream_t
*
cli
,
ssize_t
nread
,
const
uv_buf_t
*
buf
)
{
...
@@ -418,9 +417,8 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
...
@@ -418,9 +417,8 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
STrans
*
pTransInst
=
pConn
->
pTransInst
;
STrans
*
pTransInst
=
pConn
->
pTransInst
;
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
tGTrace
(
"%s conn %p %s is sent to %s:%d, local info:%s:%d, msglen:%d"
,
transLabel
(
pTransInst
),
pConn
,
tGTrace
(
"%s conn %p %s is sent to %s, local info:%s, msglen:%d"
,
transLabel
(
pTransInst
),
pConn
,
TMSG_INFO
(
pHead
->
msgType
),
taosInetNtoa
(
pConn
->
addr
.
sin_addr
),
ntohs
(
pConn
->
addr
.
sin_port
),
TMSG_INFO
(
pHead
->
msgType
),
pConn
->
dst
,
pConn
->
src
,
len
);
taosInetNtoa
(
pConn
->
localAddr
.
sin_addr
),
ntohs
(
pConn
->
localAddr
.
sin_port
),
len
);
pHead
->
msgLen
=
htonl
(
len
);
pHead
->
msgLen
=
htonl
(
len
);
wb
->
base
=
msg
;
wb
->
base
=
msg
;
...
@@ -646,20 +644,26 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
...
@@ -646,20 +644,26 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
uv_fileno
((
const
uv_handle_t
*
)
pConn
->
pTcp
,
&
fd
);
uv_fileno
((
const
uv_handle_t
*
)
pConn
->
pTcp
,
&
fd
);
tTrace
(
"conn %p created, fd:%d"
,
pConn
,
fd
);
tTrace
(
"conn %p created, fd:%d"
,
pConn
,
fd
);
int
addrlen
=
sizeof
(
pConn
->
addr
);
struct
sockaddr
peername
,
sockname
;
if
(
0
!=
uv_tcp_getpeername
(
pConn
->
pTcp
,
(
struct
sockaddr
*
)
&
pConn
->
addr
,
&
addrlen
))
{
int
addrlen
=
sizeof
(
peername
);
if
(
0
!=
uv_tcp_getpeername
(
pConn
->
pTcp
,
(
struct
sockaddr
*
)
&
peername
,
&
addrlen
))
{
tError
(
"conn %p failed to get peer info"
,
pConn
);
tError
(
"conn %p failed to get peer info"
,
pConn
);
transUnrefSrvHandle
(
pConn
);
transUnrefSrvHandle
(
pConn
);
return
;
return
;
}
}
transGetSockDebugInfo
(
&
peername
,
pConn
->
dst
);
addrlen
=
sizeof
(
pConn
->
localAddr
);
addrlen
=
sizeof
(
sockname
);
if
(
0
!=
uv_tcp_getsockname
(
pConn
->
pTcp
,
(
struct
sockaddr
*
)
&
pConn
->
localAddr
,
&
addrlen
))
{
if
(
0
!=
uv_tcp_getsockname
(
pConn
->
pTcp
,
(
struct
sockaddr
*
)
&
sockname
,
&
addrlen
))
{
tError
(
"conn %p failed to get local info"
,
pConn
);
tError
(
"conn %p failed to get local info"
,
pConn
);
transUnrefSrvHandle
(
pConn
);
transUnrefSrvHandle
(
pConn
);
return
;
return
;
}
}
transGetSockDebugInfo
(
&
sockname
,
pConn
->
src
);
struct
sockaddr_in
addr
=
*
(
struct
sockaddr_in
*
)
&
sockname
;
pConn
->
clientIp
=
addr
.
sin_addr
.
s_addr
;
pConn
->
port
=
ntohs
(
addr
.
sin_port
);
uv_read_start
((
uv_stream_t
*
)(
pConn
->
pTcp
),
uvAllocRecvBufferCb
,
uvOnRecvCb
);
uv_read_start
((
uv_stream_t
*
)(
pConn
->
pTcp
),
uvAllocRecvBufferCb
,
uvOnRecvCb
);
}
else
{
}
else
{
...
...
tests/pytest/crash_gen/crash_gen_main.py
浏览文件 @
f208b507
...
@@ -1327,6 +1327,8 @@ class Task():
...
@@ -1327,6 +1327,8 @@ class Task():
# TDengine 3.0 Error Codes:
# TDengine 3.0 Error Codes:
0x0333
,
# Object is creating # TODO: this really is NOT an acceptable error
0x0333
,
# Object is creating # TODO: this really is NOT an acceptable error
0x0369
,
# Tag already exists
0x0388
,
# Database not exist
0x03A0
,
# STable already exists
0x03A0
,
# STable already exists
0x03A1
,
# STable [does] not exist
0x03A1
,
# STable [does] not exist
0x03AA
,
# Tag already exists
0x03AA
,
# Tag already exists
...
...
tests/script/jenkins/basic.txt
浏览文件 @
f208b507
...
@@ -101,7 +101,7 @@
...
@@ -101,7 +101,7 @@
./test.sh -f tsim/parser/constCol.sim
./test.sh -f tsim/parser/constCol.sim
#./test.sh -f tsim/parser/create_db.sim
#./test.sh -f tsim/parser/create_db.sim
./test.sh -f tsim/parser/create_mt.sim
./test.sh -f tsim/parser/create_mt.sim
# TD-17653
./test.sh -f tsim/parser/create_tb_with_tag_name.sim
./test.sh -f tsim/parser/create_tb_with_tag_name.sim
./test.sh -f tsim/parser/create_tb.sim
./test.sh -f tsim/parser/create_tb.sim
./test.sh -f tsim/parser/dbtbnameValidate.sim
./test.sh -f tsim/parser/dbtbnameValidate.sim
./test.sh -f tsim/parser/distinct.sim
./test.sh -f tsim/parser/distinct.sim
...
...
tests/script/tsim/parser/create_tb_with_tag_name.sim
浏览文件 @
f208b507
...
@@ -93,7 +93,7 @@ sql_error create table tb11 using st2 (id,t1,) tags (1,1,1);
...
@@ -93,7 +93,7 @@ sql_error create table tb11 using st2 (id,t1,) tags (1,1,1);
sql create table tb12 using st2 (t1,id) tags (2,1);
sql create table tb12 using st2 (t1,id) tags (2,1);
sql show tags from tb12;
sql show tags from tb12;
if $rows !=
5
then
if $rows !=
4
then
return -1
return -1
endi
endi
if $data05 != 1 then
if $data05 != 1 then
...
@@ -109,9 +109,9 @@ if $data35 != NULL then
...
@@ -109,9 +109,9 @@ if $data35 != NULL then
return -1
return -1
endi
endi
sql create table tb13 using st2 (
"t1",'id'
) tags (2,1);
sql create table tb13 using st2 (
t1,id
) tags (2,1);
sql show tags from tb13;
sql show tags from tb13;
if $rows !=
2
then
if $rows !=
4
then
return -1
return -1
endi
endi
if $data05 != 1 then
if $data05 != 1 then
...
...
tests/system-test/7-tmq/dataFromTsdbNWal-multiCtb.py
0 → 100644
浏览文件 @
f208b507
import
taos
import
sys
import
time
import
socket
import
os
import
threading
import
math
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
*
from
util.common
import
*
sys
.
path
.
append
(
"./7-tmq"
)
from
tmqCommon
import
*
class
TDTestCase
:
def
__init__
(
self
):
self
.
vgroups
=
4
self
.
ctbNum
=
100
self
.
rowsPerTbl
=
1000
def
init
(
self
,
conn
,
logSql
):
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
tdSql
.
init
(
conn
.
cursor
(),
False
)
def
prepareTestEnv
(
self
):
tdLog
.
printNoPrefix
(
"======== prepare test env include database, stable, ctables, and insert data: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
1
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
100
,
'rowsPerTbl'
:
1000
,
'batchNum'
:
100
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
10
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
1
}
paraDict
[
'vgroups'
]
=
self
.
vgroups
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
tmqCom
.
initConsumerTable
()
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
paraDict
[
"vgroups"
],
replica
=
1
)
tdLog
.
info
(
"create stb"
)
tmqCom
.
create_stable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
])
tdLog
.
info
(
"create ctb"
)
tmqCom
.
create_ctable
(
tdSql
,
dbName
=
paraDict
[
"dbName"
],
stbName
=
paraDict
[
"stbName"
],
ctbPrefix
=
paraDict
[
'ctbPrefix'
],
ctbNum
=
paraDict
[
"ctbNum"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"insert data"
)
tmqCom
.
insert_data_interlaceByMultiTbl
(
tsql
=
tdSql
,
dbName
=
paraDict
[
"dbName"
],
ctbPrefix
=
paraDict
[
"ctbPrefix"
],
ctbNum
=
paraDict
[
"ctbNum"
],
rowsPerTbl
=
paraDict
[
"rowsPerTbl"
],
batchNum
=
paraDict
[
"batchNum"
],
startTs
=
paraDict
[
"startTs"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"flush db to let data falls into the disk"
)
tdSql
.
query
(
"flush database %s"
%
(
paraDict
[
'dbName'
]))
return
def
tmqCase1
(
self
):
tdLog
.
printNoPrefix
(
"======== test case 1: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
1
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
100
,
'rowsPerTbl'
:
1000
,
'batchNum'
:
500
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
5
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
1
}
paraDict
[
'vgroups'
]
=
self
.
vgroups
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
topicNameList
=
[
'topic1'
]
expectRowsList
=
[]
tmqCom
.
initConsumerTable
()
tdLog
.
info
(
"create topics from stb with filter"
)
queryString
=
"select * from %s.%s"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
])
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
sqlString
=
"create topic %s as %s"
%
(
topicNameList
[
0
],
queryString
)
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
# tdSql.query(queryString)
# expectRowsList.append(tdSql.getRows())
# init consume info, and start tmq_sim, then check consume result
tdLog
.
info
(
"insert consume info to consume processor"
)
consumerId
=
0
expectrowcnt
=
paraDict
[
"rowsPerTbl"
]
*
paraDict
[
"ctbNum"
]
*
2
topicList
=
topicNameList
[
0
]
ifcheckdata
=
1
ifManualCommit
=
1
keyList
=
'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
# after start consume, continue insert some data
paraDict
[
'batchNum'
]
=
100
paraDict
[
'startTs'
]
=
paraDict
[
'startTs'
]
+
self
.
rowsPerTbl
pInsertThread
=
tmqCom
.
asyncInsertDataByInterlace
(
paraDict
)
tdLog
.
info
(
"start consume processor"
)
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
paraDict
[
"dbName"
],
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
pInsertThread
.
join
()
tdSql
.
query
(
queryString
)
expectRowsList
.
append
(
tdSql
.
getRows
())
tdLog
.
info
(
"wait the consume result"
)
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
tdLog
.
info
(
"expect consume rows: %d, act consume rows: %d"
%
(
expectRowsList
[
0
],
resultList
[
0
]))
if
expectRowsList
[
0
]
!=
resultList
[
0
]:
tdLog
.
exit
(
"%d tmq consume rows error!"
%
consumerId
)
# tmqCom.checkFileContent(consumerId, queryString)
tdSql
.
query
(
"flush database %s"
%
(
paraDict
[
'dbName'
]))
for
i
in
range
(
len
(
topicNameList
)):
tmqCom
.
waitSubscriptionExit
(
tdSql
,
topicNameList
[
i
])
tdSql
.
query
(
"drop topic %s"
%
topicNameList
[
i
])
tdLog
.
printNoPrefix
(
"======== test case 1 end ...... "
)
def
tmqCase2
(
self
):
tdLog
.
printNoPrefix
(
"======== test case 2: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
1
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
100
,
'rowsPerTbl'
:
1000
,
'batchNum'
:
500
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
3
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
1
}
paraDict
[
'vgroups'
]
=
self
.
vgroups
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
topicNameList
=
[
'topic1'
]
expectRowsList
=
[]
tmqCom
.
initConsumerTable
()
tdLog
.
info
(
"create topics from stb with filter"
)
queryString
=
"select * from %s.%s"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
])
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
sqlString
=
"create topic %s as %s"
%
(
topicNameList
[
0
],
queryString
)
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
tdSql
.
query
(
queryString
)
expectRowsList
.
append
(
tdSql
.
getRows
())
totalRowsInserted
=
expectRowsList
[
0
]
# init consume info, and start tmq_sim, then check consume result
tdLog
.
info
(
"insert consume info to consume processor"
)
consumerId
=
1
expectrowcnt
=
math
.
ceil
(
paraDict
[
"rowsPerTbl"
]
*
paraDict
[
"ctbNum"
]
/
3
)
topicList
=
topicNameList
[
0
]
ifcheckdata
=
1
ifManualCommit
=
1
keyList
=
'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
tdLog
.
info
(
"start consume processor 0"
)
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
paraDict
[
"dbName"
],
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
tdLog
.
info
(
"wait the consume result"
)
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
actConsumeRows
=
resultList
[
0
]
tdLog
.
info
(
"act consume rows: %d, expect consume rows between %d and %d"
%
(
actConsumeRows
,
expectrowcnt
,
totalRowsInserted
))
if
not
(
expectrowcnt
<=
actConsumeRows
and
totalRowsInserted
>=
actConsumeRows
):
tdLog
.
exit
(
"%d tmq consume rows error!"
%
consumerId
)
# reinit consume info, and start tmq_sim, then check consume result
tmqCom
.
initConsumerTable
()
consumerId
=
2
expectrowcnt
=
math
.
ceil
(
paraDict
[
"rowsPerTbl"
]
*
paraDict
[
"ctbNum"
]
*
2
/
3
)
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
tdLog
.
info
(
"start consume processor 1"
)
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
paraDict
[
"dbName"
],
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
tdLog
.
info
(
"wait the consume result"
)
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
actConsumeRows
=
resultList
[
0
]
tdLog
.
info
(
"act consume rows: %d, expect rows: %d, act insert rows: %d"
%
(
actConsumeRows
,
expectrowcnt
,
totalRowsInserted
))
if
not
((
actConsumeRows
>=
expectrowcnt
)
and
(
totalRowsInserted
>
actConsumeRows
)):
tdLog
.
exit
(
"%d tmq consume rows error!"
%
consumerId
)
for
i
in
range
(
len
(
topicNameList
)):
tmqCom
.
waitSubscriptionExit
(
tdSql
,
topicNameList
[
i
])
tdSql
.
query
(
"drop topic %s"
%
topicNameList
[
i
])
tdLog
.
printNoPrefix
(
"======== test case 2 end ...... "
)
def
run
(
self
):
tdSql
.
prepare
()
self
.
prepareTestEnv
()
self
.
tmqCase1
()
self
.
tmqCase2
()
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
event
=
threading
.
Event
()
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tests/system-test/7-tmq/dataFromTsdbNWal.py
浏览文件 @
f208b507
...
@@ -17,8 +17,8 @@ from tmqCommon import *
...
@@ -17,8 +17,8 @@ from tmqCommon import *
class
TDTestCase
:
class
TDTestCase
:
def
__init__
(
self
):
def
__init__
(
self
):
self
.
vgroups
=
1
self
.
vgroups
=
4
self
.
ctbNum
=
1
00
self
.
ctbNum
=
1
self
.
rowsPerTbl
=
10000
self
.
rowsPerTbl
=
10000
def
init
(
self
,
conn
,
logSql
):
def
init
(
self
,
conn
,
logSql
):
...
@@ -38,9 +38,9 @@ class TDTestCase:
...
@@ -38,9 +38,9 @@ class TDTestCase:
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbPrefix'
:
'ctb'
,
'ctbStartIdx'
:
0
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
1
00
,
'ctbNum'
:
1
,
'rowsPerTbl'
:
10000
,
'rowsPerTbl'
:
10000
,
'batchNum'
:
30
00
,
'batchNum'
:
1
00
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
10
,
'pollDelay'
:
10
,
'showMsg'
:
1
,
'showMsg'
:
1
,
...
@@ -85,7 +85,7 @@ class TDTestCase:
...
@@ -85,7 +85,7 @@ class TDTestCase:
'rowsPerTbl'
:
10000
,
'rowsPerTbl'
:
10000
,
'batchNum'
:
100
,
'batchNum'
:
100
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
3
,
'pollDelay'
:
5
,
'showMsg'
:
1
,
'showMsg'
:
1
,
'showRow'
:
1
,
'showRow'
:
1
,
'snapshot'
:
1
}
'snapshot'
:
1
}
...
@@ -117,17 +117,16 @@ class TDTestCase:
...
@@ -117,17 +117,16 @@ class TDTestCase:
keyList
=
'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
keyList
=
'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:1000, auto.offset.reset:earliest'
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
tdLog
.
info
(
"start consume processor"
)
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
paraDict
[
"dbName"
],
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
# after start consume, continue insert some data
# after start consume, continue insert some data
paraDict
[
'batchNum'
]
=
100
paraDict
[
'batchNum'
]
=
100
paraDict
[
'startTs'
]
=
paraDict
[
'startTs'
]
+
self
.
rowsPerTbl
paraDict
[
'startTs'
]
=
paraDict
[
'startTs'
]
+
self
.
rowsPerTbl
tmqCom
.
insert_data_interlaceByMultiTbl
(
tsql
=
tdSql
,
dbName
=
paraDict
[
"dbName"
],
ctbPrefix
=
paraDict
[
"ctbPrefix"
],
pInsertThread
=
tmqCom
.
asyncInsertDataByInterlace
(
paraDict
)
ctbNum
=
paraDict
[
"ctbNum"
],
rowsPerTbl
=
paraDict
[
"rowsPerTbl"
],
batchNum
=
paraDict
[
"batchNum"
],
startTs
=
paraDict
[
"startTs"
],
ctbStartIdx
=
paraDict
[
'ctbStartIdx'
])
tdLog
.
info
(
"start consume processor"
)
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
paraDict
[
"dbName"
],
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
#
pInsertThread
.
join
()
tdSql
.
query
(
queryString
)
tdSql
.
query
(
queryString
)
expectRowsList
.
append
(
tdSql
.
getRows
())
expectRowsList
.
append
(
tdSql
.
getRows
())
...
@@ -135,15 +134,16 @@ class TDTestCase:
...
@@ -135,15 +134,16 @@ class TDTestCase:
expectRows
=
1
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
tdLog
.
info
(
"expect consume rows: %d, act consume rows: %d"
%
(
expectRowsList
[
0
],
resultList
[
0
]))
tdLog
.
info
(
"expect consume rows: %d, act consume rows: %d"
%
(
expectRowsList
[
0
],
resultList
[
0
]))
if
expectRowsList
[
0
]
!=
resultList
[
0
]:
if
expectRowsList
[
0
]
!=
resultList
[
0
]:
tdLog
.
exit
(
"%d tmq consume rows error!"
%
consumerId
)
tdLog
.
exit
(
"%d tmq consume rows error!"
%
consumerId
)
tmqCom
.
checkFileContent
(
consumerId
,
queryString
)
tmqCom
.
checkFileContent
(
consumerId
,
queryString
)
time
.
sleep
(
10
)
tdSql
.
query
(
"flush database %s"
%
(
paraDict
[
'dbName'
]))
for
i
in
range
(
len
(
topicNameList
)):
for
i
in
range
(
len
(
topicNameList
)):
tmqCom
.
waitSubscriptionExit
(
tdSql
,
topicNameList
[
i
])
tdSql
.
query
(
"drop topic %s"
%
topicNameList
[
i
])
tdSql
.
query
(
"drop topic %s"
%
topicNameList
[
i
])
tdLog
.
printNoPrefix
(
"======== test case 1 end ...... "
)
tdLog
.
printNoPrefix
(
"======== test case 1 end ...... "
)
...
@@ -204,13 +204,12 @@ class TDTestCase:
...
@@ -204,13 +204,12 @@ class TDTestCase:
expectRows
=
1
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
actConsumeRows
=
resultList
[
0
]
if
not
(
expectrowcnt
<=
resultList
[
0
]
and
totalRowsInserted
>=
resultList
[
0
]):
tdLog
.
info
(
"act consume rows: %d, expect consume rows between %d and %d"
%
(
resultList
[
0
],
expectrowcnt
,
totalRowsInserted
))
tdLog
.
info
(
"act consume rows: %d, expect consume rows between %d and %d"
%
(
actConsumeRows
,
expectrowcnt
,
totalRowsInserted
))
if
not
(
expectrowcnt
<=
actConsumeRows
and
totalRowsInserted
>=
actConsumeRows
):
tdLog
.
exit
(
"%d tmq consume rows error!"
%
consumerId
)
tdLog
.
exit
(
"%d tmq consume rows error!"
%
consumerId
)
firstConsumeRows
=
resultList
[
0
]
# reinit consume info, and start tmq_sim, then check consume result
# reinit consume info, and start tmq_sim, then check consume result
tmqCom
.
initConsumerTable
()
tmqCom
.
initConsumerTable
()
consumerId
=
2
consumerId
=
2
...
@@ -224,15 +223,13 @@ class TDTestCase:
...
@@ -224,15 +223,13 @@ class TDTestCase:
expectRows
=
1
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
actConsumeTotalRows
=
firstConsumeRows
+
resultList
[
0
]
actConsumeRows
=
resultList
[
0
]
tdLog
.
info
(
"act consume rows: %d, expect rows: %d, act insert rows: %d"
%
(
actConsumeRows
,
expectrowcnt
,
totalRowsInserted
))
if
not
(
expectrowcnt
>=
resultList
[
0
]
and
totalRowsInserted
==
actConsumeTotalRows
):
if
not
((
actConsumeRows
>=
expectrowcnt
)
and
(
totalRowsInserted
>
actConsumeRows
)):
tdLog
.
info
(
"act consume rows, first: %d, second: %d "
%
(
firstConsumeRows
,
resultList
[
0
]))
tdLog
.
info
(
"and sum of two consume rows: %d should be equal to total inserted rows: %d"
%
(
actConsumeTotalRows
,
totalRowsInserted
))
tdLog
.
exit
(
"%d tmq consume rows error!"
%
consumerId
)
tdLog
.
exit
(
"%d tmq consume rows error!"
%
consumerId
)
time
.
sleep
(
10
)
for
i
in
range
(
len
(
topicNameList
)):
for
i
in
range
(
len
(
topicNameList
)):
tmqCom
.
waitSubscriptionExit
(
tdSql
,
topicNameList
[
i
])
tdSql
.
query
(
"drop topic %s"
%
topicNameList
[
i
])
tdSql
.
query
(
"drop topic %s"
%
topicNameList
[
i
])
tdLog
.
printNoPrefix
(
"======== test case 2 end ...... "
)
tdLog
.
printNoPrefix
(
"======== test case 2 end ...... "
)
...
@@ -241,7 +238,7 @@ class TDTestCase:
...
@@ -241,7 +238,7 @@ class TDTestCase:
tdSql
.
prepare
()
tdSql
.
prepare
()
self
.
prepareTestEnv
()
self
.
prepareTestEnv
()
self
.
tmqCase1
()
self
.
tmqCase1
()
#
self.tmqCase2()
self
.
tmqCase2
()
def
stop
(
self
):
def
stop
(
self
):
tdSql
.
close
()
tdSql
.
close
()
...
...
tests/system-test/7-tmq/tmqDnodeRestart.py
浏览文件 @
f208b507
...
@@ -151,41 +151,6 @@ class TDTestCase:
...
@@ -151,41 +151,6 @@ class TDTestCase:
if
not
(
totalConsumeRows
==
totalRowsFromQury
):
if
not
(
totalConsumeRows
==
totalRowsFromQury
):
tdLog
.
exit
(
"tmq consume rows error!"
)
tdLog
.
exit
(
"tmq consume rows error!"
)
# tdLog.info("****************************************************************************")
# tmqCom.initConsumerTable()
# consumerId = 1
# expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2
# topicList = topicFromStb1
# ifcheckdata = 0
# ifManualCommit = 0
# keyList = 'group.id:cgrp2,\
# enable.auto.commit:true,\
# auto.commit.interval.ms:3000,\
# auto.offset.reset:earliest'
# tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
# tdLog.info("start consume processor")
# tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
# expectRows = 1
# resultList = tmqCom.selectConsumeResult(expectRows)
# totalConsumeRows = 0
# for i in range(expectRows):
# totalConsumeRows += resultList[i]
# tdSql.query(queryString)
# totalRowsFromQury = tdSql.getRows()
# tdLog.info("act consume rows: %d, act query rows: %d"%(totalConsumeRows, totalRowsFromQury))
# if not (totalConsumeRows == totalRowsFromQury):
# tdLog.exit("tmq consume rows error!")
# tdLog.info("****************************************************************************")
tmqCom
.
waitSubscriptionExit
(
tdSql
,
topicFromStb1
)
tmqCom
.
waitSubscriptionExit
(
tdSql
,
topicFromStb1
)
tdSql
.
query
(
"drop topic %s"
%
topicFromStb1
)
tdSql
.
query
(
"drop topic %s"
%
topicFromStb1
)
...
@@ -259,7 +224,7 @@ class TDTestCase:
...
@@ -259,7 +224,7 @@ class TDTestCase:
tdLog
.
info
(
"create some new child table and insert data "
)
tdLog
.
info
(
"create some new child table and insert data "
)
paraDict
[
"batchNum"
]
=
100
paraDict
[
"batchNum"
]
=
100
paraDict
[
"ctbPrefix"
]
=
'newCtb'
paraDict
[
"ctbPrefix"
]
=
'newCtb'
#
tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"])
tmqCom
.
insert_data_with_autoCreateTbl
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"stbName"
],
paraDict
[
"ctbPrefix"
],
paraDict
[
"ctbNum"
],
paraDict
[
"rowsPerTbl"
],
paraDict
[
"batchNum"
])
tdLog
.
info
(
"insert process end, and start to check consume result"
)
tdLog
.
info
(
"insert process end, and start to check consume result"
)
expectRows
=
1
expectRows
=
1
...
...
tests/system-test/7-tmq/tmqDropNtb-snapshot0.py
0 → 100644
浏览文件 @
f208b507
import
taos
import
sys
import
time
import
socket
import
os
import
threading
from
enum
import
Enum
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
*
sys
.
path
.
append
(
"./7-tmq"
)
from
tmqCommon
import
*
class
TDTestCase
:
def
__init__
(
self
):
self
.
snapshot
=
0
self
.
vgroups
=
4
self
.
ctbNum
=
1000
self
.
rowsPerTbl
=
10
def
init
(
self
,
conn
,
logSql
):
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
tdSql
.
init
(
conn
.
cursor
(),
False
)
# drop some ntbs
def
tmqCase1
(
self
):
tdLog
.
printNoPrefix
(
"======== test case 1: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
4
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ntb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
1000
,
'rowsPerTbl'
:
100
,
'batchNum'
:
100
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'endTs'
:
0
,
'pollDelay'
:
5
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
0
}
paraDict
[
'snapshot'
]
=
self
.
snapshot
paraDict
[
'vgroups'
]
=
self
.
vgroups
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
tmqCom
.
initConsumerTable
()
tdLog
.
info
(
"start create database...."
)
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
paraDict
[
"vgroups"
],
replica
=
1
)
tdLog
.
info
(
"start create normal tables...."
)
tmqCom
.
create_ntable
(
tsql
=
tdSql
,
dbname
=
paraDict
[
"dbName"
],
tbname_prefix
=
paraDict
[
"ctbPrefix"
],
tbname_index_start_num
=
1
,
column_elm_list
=
paraDict
[
"colSchema"
],
colPrefix
=
'c'
,
tblNum
=
paraDict
[
"ctbNum"
])
tdLog
.
info
(
"start insert data into normal tables...."
)
tmqCom
.
insert_rows_into_ntbl
(
tsql
=
tdSql
,
dbname
=
paraDict
[
"dbName"
],
tbname_prefix
=
paraDict
[
"ctbPrefix"
],
tbname_index_start_num
=
1
,
column_ele_list
=
paraDict
[
"colSchema"
],
startTs
=
paraDict
[
"startTs"
],
tblNum
=
paraDict
[
"ctbNum"
],
rows
=
paraDict
[
"rowsPerTbl"
])
tdLog
.
info
(
"create topics from database"
)
topicFromDb
=
'topic_dbt'
tdSql
.
execute
(
"create topic %s as database %s"
%
(
topicFromDb
,
paraDict
[
'dbName'
]))
if
self
.
snapshot
==
0
:
consumerId
=
0
elif
self
.
snapshot
==
1
:
consumerId
=
1
expectrowcnt
=
int
(
paraDict
[
"rowsPerTbl"
]
*
paraDict
[
"ctbNum"
])
topicList
=
topicFromDb
ifcheckdata
=
1
ifManualCommit
=
1
keyList
=
'group.id:cgrp1,\
enable.auto.commit:true,\
auto.commit.interval.ms:1000,\
auto.offset.reset:earliest'
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
tdLog
.
info
(
"start consume processor"
)
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
paraDict
[
"dbName"
],
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
tmqCom
.
getStartConsumeNotifyFromTmqsim
()
tdLog
.
info
(
"drop some ntables"
)
# drop 1/4 ctbls from half offset
paraDict
[
"ctbStartIdx"
]
=
paraDict
[
"ctbStartIdx"
]
+
int
(
paraDict
[
"ctbNum"
]
*
1
/
2
)
paraDict
[
"ctbNum"
]
=
int
(
paraDict
[
"ctbNum"
]
/
4
)
tmqCom
.
drop_ctable
(
tdSql
,
dbname
=
paraDict
[
'dbName'
],
count
=
paraDict
[
"ctbNum"
],
default_ctbname_prefix
=
paraDict
[
"ctbPrefix"
],
ctbStartIdx
=
paraDict
[
"ctbStartIdx"
])
tdLog
.
info
(
"start to check consume result"
)
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
totalConsumeRows
=
0
for
i
in
range
(
expectRows
):
totalConsumeRows
+=
resultList
[
i
]
tdLog
.
info
(
"act consume rows: %d, expect consume rows: %d"
%
(
totalConsumeRows
,
expectrowcnt
))
if
not
((
totalConsumeRows
>=
expectrowcnt
*
3
/
4
)
and
(
totalConsumeRows
<
expectrowcnt
)):
tdLog
.
exit
(
"tmq consume rows error with snapshot = 0!"
)
tdLog
.
info
(
"wait subscriptions exit ...."
)
tmqCom
.
waitSubscriptionExit
(
tdSql
,
topicFromDb
)
tdSql
.
query
(
"drop topic %s"
%
topicFromDb
)
tdLog
.
info
(
"success dorp topic: %s"
%
topicFromDb
)
tdLog
.
printNoPrefix
(
"======== test case 1 end ...... "
)
# drop some ntbs and create some new ntbs
def
tmqCase2
(
self
):
tdLog
.
printNoPrefix
(
"======== test case 2: "
)
paraDict
=
{
'dbName'
:
'dbt'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
4
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'TIMESTAMP'
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ntb'
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
1000
,
'rowsPerTbl'
:
100
,
'batchNum'
:
100
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'endTs'
:
0
,
'pollDelay'
:
10
,
'showMsg'
:
1
,
'showRow'
:
1
,
'snapshot'
:
0
}
paraDict
[
'snapshot'
]
=
self
.
snapshot
paraDict
[
'vgroups'
]
=
self
.
vgroups
paraDict
[
'ctbNum'
]
=
self
.
ctbNum
paraDict
[
'rowsPerTbl'
]
=
self
.
rowsPerTbl
tmqCom
.
initConsumerTable
()
tdLog
.
info
(
"start create database...."
)
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
paraDict
[
"vgroups"
],
replica
=
1
)
tdLog
.
info
(
"start create normal tables...."
)
tmqCom
.
create_ntable
(
tsql
=
tdSql
,
dbname
=
paraDict
[
"dbName"
],
tbname_prefix
=
paraDict
[
"ctbPrefix"
],
tbname_index_start_num
=
1
,
column_elm_list
=
paraDict
[
"colSchema"
],
colPrefix
=
'c'
,
tblNum
=
paraDict
[
"ctbNum"
])
tdLog
.
info
(
"start insert data into normal tables...."
)
tmqCom
.
insert_rows_into_ntbl
(
tsql
=
tdSql
,
dbname
=
paraDict
[
"dbName"
],
tbname_prefix
=
paraDict
[
"ctbPrefix"
],
tbname_index_start_num
=
1
,
column_ele_list
=
paraDict
[
"colSchema"
],
startTs
=
paraDict
[
"startTs"
],
tblNum
=
paraDict
[
"ctbNum"
],
rows
=
paraDict
[
"rowsPerTbl"
])
tdLog
.
info
(
"create topics from database"
)
topicFromDb
=
'topic_dbt'
tdSql
.
execute
(
"create topic %s as database %s"
%
(
topicFromDb
,
paraDict
[
'dbName'
]))
if
self
.
snapshot
==
0
:
consumerId
=
2
elif
self
.
snapshot
==
1
:
consumerId
=
3
expectrowcnt
=
int
(
paraDict
[
"rowsPerTbl"
]
*
paraDict
[
"ctbNum"
]
*
2
)
topicList
=
topicFromDb
ifcheckdata
=
1
ifManualCommit
=
1
keyList
=
'group.id:cgrp1,\
enable.auto.commit:true,\
auto.commit.interval.ms:1000,\
auto.offset.reset:earliest'
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
tdLog
.
info
(
"start consume processor"
)
tmqCom
.
startTmqSimProcess
(
pollDelay
=
paraDict
[
'pollDelay'
],
dbName
=
paraDict
[
"dbName"
],
showMsg
=
paraDict
[
'showMsg'
],
showRow
=
paraDict
[
'showRow'
],
snapshot
=
paraDict
[
'snapshot'
])
tmqCom
.
getStartConsumeNotifyFromTmqsim
()
tdLog
.
info
(
"drop some ntables"
)
# drop 1/4 ctbls from half offset
paraDict
[
"ctbStartIdx"
]
=
paraDict
[
"ctbStartIdx"
]
+
int
(
paraDict
[
"ctbNum"
]
*
1
/
2
)
paraDict
[
"ctbNum"
]
=
int
(
paraDict
[
"ctbNum"
]
/
4
)
tmqCom
.
drop_ctable
(
tdSql
,
dbname
=
paraDict
[
'dbName'
],
count
=
paraDict
[
"ctbNum"
],
default_ctbname_prefix
=
paraDict
[
"ctbPrefix"
],
ctbStartIdx
=
paraDict
[
"ctbStartIdx"
])
tdLog
.
info
(
"start create some new normal tables...."
)
paraDict
[
"ctbPrefix"
]
=
'newCtb'
paraDict
[
"ctbNum"
]
=
self
.
ctbNum
tmqCom
.
create_ntable
(
tsql
=
tdSql
,
dbname
=
paraDict
[
"dbName"
],
tbname_prefix
=
paraDict
[
"ctbPrefix"
],
tbname_index_start_num
=
1
,
column_elm_list
=
paraDict
[
"colSchema"
],
colPrefix
=
'c'
,
tblNum
=
paraDict
[
"ctbNum"
])
tdLog
.
info
(
"start insert data into these new normal tables...."
)
tmqCom
.
insert_rows_into_ntbl
(
tsql
=
tdSql
,
dbname
=
paraDict
[
"dbName"
],
tbname_prefix
=
paraDict
[
"ctbPrefix"
],
tbname_index_start_num
=
1
,
column_ele_list
=
paraDict
[
"colSchema"
],
startTs
=
paraDict
[
"startTs"
],
tblNum
=
paraDict
[
"ctbNum"
],
rows
=
paraDict
[
"rowsPerTbl"
])
tdLog
.
info
(
"start to check consume result"
)
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
totalConsumeRows
=
0
for
i
in
range
(
expectRows
):
totalConsumeRows
+=
resultList
[
i
]
tdLog
.
info
(
"act consume rows: %d, expect consume rows: %d"
%
(
totalConsumeRows
,
expectrowcnt
))
if
not
((
totalConsumeRows
>=
expectrowcnt
/
2
*
(
1
+
3
/
4
))
and
(
totalConsumeRows
<
expectrowcnt
)):
tdLog
.
exit
(
"tmq consume rows error with snapshot = 0!"
)
tdLog
.
info
(
"wait subscriptions exit ...."
)
tmqCom
.
waitSubscriptionExit
(
tdSql
,
topicFromDb
)
tdSql
.
query
(
"drop topic %s"
%
topicFromDb
)
tdLog
.
info
(
"success dorp topic: %s"
%
topicFromDb
)
tdLog
.
printNoPrefix
(
"======== test case 2 end ...... "
)
def
run
(
self
):
tdLog
.
printNoPrefix
(
"============================================="
)
tdLog
.
printNoPrefix
(
"======== snapshot is 0: only consume from wal"
)
self
.
snapshot
=
0
self
.
tmqCase1
()
self
.
tmqCase2
()
# tdLog.printNoPrefix("====================================================================")
# tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
# self.snapshot = 1
# self.tmqCase1()
# self.tmqCase2()
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
event
=
threading
.
Event
()
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tests/system-test/7-tmq/tmqDropNtb.py
→
tests/system-test/7-tmq/tmqDropNtb
-snapshot1
.py
浏览文件 @
f208b507
...
@@ -18,7 +18,7 @@ class TDTestCase:
...
@@ -18,7 +18,7 @@ class TDTestCase:
def
__init__
(
self
):
def
__init__
(
self
):
self
.
snapshot
=
0
self
.
snapshot
=
0
self
.
vgroups
=
4
self
.
vgroups
=
4
self
.
ctbNum
=
100
self
.
ctbNum
=
100
0
self
.
rowsPerTbl
=
10
self
.
rowsPerTbl
=
10
def
init
(
self
,
conn
,
logSql
):
def
init
(
self
,
conn
,
logSql
):
...
@@ -39,9 +39,9 @@ class TDTestCase:
...
@@ -39,9 +39,9 @@ class TDTestCase:
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ntb'
,
'ctbPrefix'
:
'ntb'
,
'ctbStartIdx'
:
0
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
100
,
'ctbNum'
:
100
0
,
'rowsPerTbl'
:
100
0
,
'rowsPerTbl'
:
100
,
'batchNum'
:
100
0
,
'batchNum'
:
100
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'endTs'
:
0
,
'endTs'
:
0
,
'pollDelay'
:
5
,
'pollDelay'
:
5
,
...
@@ -125,9 +125,9 @@ class TDTestCase:
...
@@ -125,9 +125,9 @@ class TDTestCase:
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},{
'type'
:
'BIGINT'
,
'count'
:
1
},{
'type'
:
'DOUBLE'
,
'count'
:
1
},{
'type'
:
'BINARY'
,
'len'
:
32
,
'count'
:
1
},{
'type'
:
'NCHAR'
,
'len'
:
32
,
'count'
:
1
}],
'ctbPrefix'
:
'ntb'
,
'ctbPrefix'
:
'ntb'
,
'ctbStartIdx'
:
0
,
'ctbStartIdx'
:
0
,
'ctbNum'
:
100
,
'ctbNum'
:
100
0
,
'rowsPerTbl'
:
100
0
,
'rowsPerTbl'
:
100
,
'batchNum'
:
100
0
,
'batchNum'
:
100
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'endTs'
:
0
,
'endTs'
:
0
,
'pollDelay'
:
10
,
'pollDelay'
:
10
,
...
@@ -203,16 +203,16 @@ class TDTestCase:
...
@@ -203,16 +203,16 @@ class TDTestCase:
tdLog
.
printNoPrefix
(
"======== test case 2 end ...... "
)
tdLog
.
printNoPrefix
(
"======== test case 2 end ...... "
)
def
run
(
self
):
def
run
(
self
):
tdLog
.
printNoPrefix
(
"============================================="
)
#
tdLog.printNoPrefix("=============================================")
tdLog
.
printNoPrefix
(
"======== snapshot is 0: only consume from wal"
)
#
tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
self
.
snapshot
=
0
#
self.snapshot = 0
# self.tmqCase1()
# self.tmqCase1()
self
.
tmqCase2
()
#
self.tmqCase2()
tdLog
.
printNoPrefix
(
"===================================================================="
)
tdLog
.
printNoPrefix
(
"===================================================================="
)
tdLog
.
printNoPrefix
(
"======== snapshot is 1: firstly consume from tsbs, and then from wal"
)
tdLog
.
printNoPrefix
(
"======== snapshot is 1: firstly consume from tsbs, and then from wal"
)
self
.
snapshot
=
1
self
.
snapshot
=
1
#
self.tmqCase1()
self
.
tmqCase1
()
self
.
tmqCase2
()
self
.
tmqCase2
()
def
stop
(
self
):
def
stop
(
self
):
...
...
tests/system-test/fulltest.sh
浏览文件 @
f208b507
...
@@ -210,7 +210,7 @@ python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py
...
@@ -210,7 +210,7 @@ python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py
python3 ./test.py
-f
7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py
python3 ./test.py
-f
7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py
python3 ./test.py
-f
7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py
python3 ./test.py
-f
7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py
python3 ./test.py
-f
7-tmq/tmqAutoCreateTbl.py
python3 ./test.py
-f
7-tmq/tmqAutoCreateTbl.py
#
python3 ./test.py -f 7-tmq/tmqDnodeRestart.py
python3 ./test.py
-f
7-tmq/tmqDnodeRestart.py
python3 ./test.py
-f
7-tmq/tmqUpdate-1ctb.py
python3 ./test.py
-f
7-tmq/tmqUpdate-1ctb.py
python3 ./test.py
-f
7-tmq/tmqUpdateWithConsume.py
python3 ./test.py
-f
7-tmq/tmqUpdateWithConsume.py
python3 ./test.py
-f
7-tmq/tmqUpdate-multiCtb-snapshot0.py
python3 ./test.py
-f
7-tmq/tmqUpdate-multiCtb-snapshot0.py
...
@@ -219,12 +219,14 @@ python3 ./test.py -f 7-tmq/tmqDelete-1ctb.py
...
@@ -219,12 +219,14 @@ python3 ./test.py -f 7-tmq/tmqDelete-1ctb.py
python3 ./test.py
-f
7-tmq/tmqDelete-multiCtb.py
python3 ./test.py
-f
7-tmq/tmqDelete-multiCtb.py
python3 ./test.py
-f
7-tmq/tmqDropStb.py
python3 ./test.py
-f
7-tmq/tmqDropStb.py
python3 ./test.py
-f
7-tmq/tmqDropStbCtb.py
python3 ./test.py
-f
7-tmq/tmqDropStbCtb.py
python3 ./test.py
-f
7-tmq/tmqDropNtb.py
python3 ./test.py
-f
7-tmq/tmqDropNtb-snapshot0.py
python3 ./test.py
-f
7-tmq/tmqDropNtb-snapshot1.py
python3 ./test.py
-f
7-tmq/tmqUdf.py
python3 ./test.py
-f
7-tmq/tmqUdf.py
python3 ./test.py
-f
7-tmq/tmqUdf-multCtb-snapshot0.py
python3 ./test.py
-f
7-tmq/tmqUdf-multCtb-snapshot0.py
python3 ./test.py
-f
7-tmq/tmqUdf-multCtb-snapshot1.py
python3 ./test.py
-f
7-tmq/tmqUdf-multCtb-snapshot1.py
python3 ./test.py
-f
7-tmq/stbTagFilter-1ctb.py
python3 ./test.py
-f
7-tmq/stbTagFilter-1ctb.py
python3 ./test.py
-f
7-tmq/dataFromTsdbNWal.py
python3 ./test.py
-f
7-tmq/dataFromTsdbNWal-multiCtb.py
# python3 ./test.py -f 7-tmq/stbTagFilter-multiCtb.py
# python3 ./test.py -f 7-tmq/stbTagFilter-multiCtb.py
#------------querPolicy 2-----------
#------------querPolicy 2-----------
...
...
taos-tools
@
0b8a3373
比较
9cfa1957
...
0b8a3373
Subproject commit
9cfa195713d1cae9edf417a8d49bde87dd971016
Subproject commit
0b8a3373bb7548f8106d13e7d3b0a988d3c4d48a
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录