Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
9e3e3513
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
9e3e3513
编写于
6月 20, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/3.0' into fix/dnode
上级
003be444
3155a228
变更
26
展开全部
隐藏空白更改
内联
并排
Showing
26 changed file
with
770 addition
and
428 deletion
+770
-428
include/dnode/mnode/mnode.h
include/dnode/mnode/mnode.h
+1
-0
include/libs/nodes/plannodes.h
include/libs/nodes/plannodes.h
+6
-0
include/libs/qworker/qworker.h
include/libs/qworker/qworker.h
+2
-0
include/util/taoserror.h
include/util/taoserror.h
+1
-0
source/dnode/mnode/impl/src/mndMain.c
source/dnode/mnode/impl/src/mndMain.c
+2
-0
source/dnode/mnode/impl/src/mndQuery.c
source/dnode/mnode/impl/src/mndQuery.c
+7
-0
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+4
-1
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+5
-3
source/libs/nodes/src/nodesCloneFuncs.c
source/libs/nodes/src/nodesCloneFuncs.c
+3
-0
source/libs/nodes/src/nodesCodeFuncs.c
source/libs/nodes/src/nodesCodeFuncs.c
+21
-0
source/libs/parser/src/parAstCreater.c
source/libs/parser/src/parAstCreater.c
+9
-6
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+90
-23
source/libs/parser/src/parUtil.c
source/libs/parser/src/parUtil.c
+2
-0
source/libs/parser/test/parSelectTest.cpp
source/libs/parser/test/parSelectTest.cpp
+0
-2
source/libs/planner/src/planLogicCreater.c
source/libs/planner/src/planLogicCreater.c
+9
-4
source/libs/planner/src/planPhysiCreater.c
source/libs/planner/src/planPhysiCreater.c
+6
-0
source/libs/qworker/inc/qwMsg.h
source/libs/qworker/inc/qwMsg.h
+1
-0
source/libs/qworker/src/qwMsg.c
source/libs/qworker/src/qwMsg.c
+20
-0
source/libs/qworker/src/qworker.c
source/libs/qworker/src/qworker.c
+7
-0
tests/pytest/util/common.py
tests/pytest/util/common.py
+127
-2
tests/script/jenkins/basic.txt
tests/script/jenkins/basic.txt
+1
-0
tests/script/tsim/stream/distributesession0.sim
tests/script/tsim/stream/distributesession0.sim
+58
-0
tests/system-test/2-query/Today.py
tests/system-test/2-query/Today.py
+144
-375
tests/system-test/7-tmq/subscribeDb4.py
tests/system-test/7-tmq/subscribeDb4.py
+116
-0
tests/system-test/7-tmq/subscribeStb4.py
tests/system-test/7-tmq/subscribeStb4.py
+10
-12
tests/system-test/7-tmq/tmqCommon.py
tests/system-test/7-tmq/tmqCommon.py
+118
-0
未找到文件。
include/dnode/mnode/mnode.h
浏览文件 @
9e3e3513
...
...
@@ -96,6 +96,7 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad);
int32_t
mndProcessRpcMsg
(
SRpcMsg
*
pMsg
);
int32_t
mndProcessSyncMsg
(
SRpcMsg
*
pMsg
);
int32_t
mndPreProcessMsg
(
SRpcMsg
*
pMsg
);
void
mndAbortPreprocessMsg
(
SRpcMsg
*
pMsg
);
/**
* @brief Generate machine code
...
...
include/libs/nodes/plannodes.h
浏览文件 @
9e3e3513
...
...
@@ -106,6 +106,9 @@ typedef struct SInterpFuncLogicNode {
SNodeList
*
pFuncs
;
STimeWindow
timeRange
;
int64_t
interval
;
EFillMode
fillMode
;
SNode
*
pFillValues
;
// SNodeListNode
SNode
*
pTimeSeries
;
// SColumnNode
}
SInterpFuncLogicNode
;
typedef
enum
EModifyTableType
{
MODIFY_TABLE_TYPE_INSERT
=
1
,
MODIFY_TABLE_TYPE_DELETE
}
EModifyTableType
;
...
...
@@ -309,6 +312,9 @@ typedef struct SInterpFuncPhysiNode {
SNodeList
*
pFuncs
;
STimeWindow
timeRange
;
int64_t
interval
;
EFillMode
fillMode
;
SNode
*
pFillValues
;
// SNodeListNode
SNode
*
pTimeSeries
;
// SColumnNode
}
SInterpFuncPhysiNode
;
typedef
struct
SJoinPhysiNode
{
...
...
include/libs/qworker/qworker.h
浏览文件 @
9e3e3513
...
...
@@ -64,6 +64,8 @@ typedef struct {
int32_t
qWorkerInit
(
int8_t
nodeType
,
int32_t
nodeId
,
SQWorkerCfg
*
cfg
,
void
**
qWorkerMgmt
,
const
SMsgCb
*
pMsgCb
);
int32_t
qWorkerAbortPreprocessQueryMsg
(
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
);
int32_t
qWorkerPreprocessQueryMsg
(
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
);
int32_t
qWorkerProcessQueryMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
,
int64_t
ts
);
...
...
include/util/taoserror.h
浏览文件 @
9e3e3513
...
...
@@ -564,6 +564,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_STREAM_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x265A)
#define TSDB_CODE_PAR_GROUP_BY_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x265B)
#define TSDB_CODE_PAR_INVALID_TABLE_OPTION TAOS_DEF_ERROR_CODE(0, 0x265C)
#define TSDB_CODE_PAR_INVALID_INTERP_CLAUSE TAOS_DEF_ERROR_CODE(0, 0x265D)
//planner
#define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700)
...
...
source/dnode/mnode/impl/src/mndMain.c
浏览文件 @
9e3e3513
...
...
@@ -550,6 +550,8 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
pMsg
->
msgType
!=
TDMT_MND_TRANS_TIMER
)
{
mError
(
"msg:%p, failed to check mnode state since %s, type:%s"
,
pMsg
,
terrstr
(),
TMSG_INFO
(
pMsg
->
msgType
));
mndAbortPreprocessMsg
(
pMsg
);
SEpSet
epSet
=
{
0
};
mndGetMnodeEpSet
(
pMsg
->
info
.
node
,
&
epSet
);
...
...
source/dnode/mnode/impl/src/mndQuery.c
浏览文件 @
9e3e3513
...
...
@@ -25,6 +25,13 @@ int32_t mndPreProcessMsg(SRpcMsg *pMsg) {
return
qWorkerPreprocessQueryMsg
(
pMnode
->
pQuery
,
pMsg
);
}
void
mndAbortPreprocessMsg
(
SRpcMsg
*
pMsg
)
{
if
(
TDMT_VND_QUERY
!=
pMsg
->
msgType
)
return
;
SMnode
*
pMnode
=
pMsg
->
info
.
node
;
qWorkerAbortPreprocessQueryMsg
(
pMnode
->
pQuery
,
pMsg
);
}
int32_t
mndProcessQueryMsg
(
SRpcMsg
*
pMsg
)
{
int32_t
code
=
-
1
;
SMnode
*
pMnode
=
pMsg
->
info
.
node
;
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
9e3e3513
...
...
@@ -2892,7 +2892,10 @@ static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWin
SArray
*
pChWins
=
getWinInfos
(
&
pChInfo
->
streamAggSup
,
groupId
);
int32_t
chWinSize
=
taosArrayGetSize
(
pChWins
);
int32_t
index
=
binarySearch
(
pChWins
,
chWinSize
,
pParentWin
->
win
.
skey
,
TSDB_ORDER_DESC
,
getSessionWindowEndkey
);
for
(
int32_t
k
=
index
;
k
>
0
&&
k
<
chWinSize
;
k
++
)
{
if
(
index
<
0
)
{
index
=
0
;
}
for
(
int32_t
k
=
index
;
k
<
chWinSize
;
k
++
)
{
SResultWindowInfo
*
pcw
=
taosArrayGet
(
pChWins
,
k
);
if
(
pParentWin
->
win
.
skey
<=
pcw
->
win
.
skey
&&
pcw
->
win
.
ekey
<=
pParentWin
->
win
.
ekey
)
{
SResultRow
*
pChResult
=
NULL
;
...
...
source/libs/function/src/builtinsimpl.c
浏览文件 @
9e3e3513
...
...
@@ -1481,11 +1481,13 @@ int32_t minMaxCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3
if
(
pSBuf
->
assign
&&
((((
*
(
double
*
)
&
pDBuf
->
v
)
<
(
*
(
double
*
)
&
pSBuf
->
v
))
^
isMinFunc
)
||
!
pDBuf
->
assign
))
{
*
(
double
*
)
&
pDBuf
->
v
=
*
(
double
*
)
&
pSBuf
->
v
;
replaceTupleData
(
&
pDBuf
->
tuplePos
,
&
pSBuf
->
tuplePos
);
pDBuf
->
assign
=
true
;
}
}
else
{
if
(
pSBuf
->
assign
&&
(((
pDBuf
->
v
<
pSBuf
->
v
)
^
isMinFunc
)
||
!
pDBuf
->
assign
))
{
pDBuf
->
v
=
pSBuf
->
v
;
replaceTupleData
(
&
pDBuf
->
tuplePos
,
&
pSBuf
->
tuplePos
);
pDBuf
->
assign
=
true
;
}
}
pDResInfo
->
numOfRes
=
TMAX
(
pDResInfo
->
numOfRes
,
pSResInfo
->
numOfRes
);
...
...
@@ -1745,10 +1747,10 @@ int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
if
(
IS_INTEGER_TYPE
(
type
))
{
avg
=
pStddevRes
->
isum
/
((
double
)
pStddevRes
->
count
);
pStddevRes
->
result
=
sqrt
(
pStddevRes
->
quadraticISum
/
((
double
)
pStddevRes
->
count
)
-
avg
*
avg
);
pStddevRes
->
result
=
sqrt
(
fabs
(
pStddevRes
->
quadraticISum
/
((
double
)
pStddevRes
->
count
)
-
avg
*
avg
)
);
}
else
{
avg
=
pStddevRes
->
dsum
/
((
double
)
pStddevRes
->
count
);
pStddevRes
->
result
=
sqrt
(
pStddevRes
->
quadraticDSum
/
((
double
)
pStddevRes
->
count
)
-
avg
*
avg
);
pStddevRes
->
result
=
sqrt
(
fabs
(
pStddevRes
->
quadraticDSum
/
((
double
)
pStddevRes
->
count
)
-
avg
*
avg
)
);
}
return
functionFinalize
(
pCtx
,
pBlock
);
...
...
@@ -5362,4 +5364,4 @@ int32_t interpFunction(SqlFunctionCtx* pCtx) {
#endif
return
TSDB_CODE_SUCCESS
;
}
\ No newline at end of file
}
source/libs/nodes/src/nodesCloneFuncs.c
浏览文件 @
9e3e3513
...
...
@@ -464,6 +464,9 @@ static SNode* logicInterpFuncCopy(const SInterpFuncLogicNode* pSrc, SInterpFuncL
CLONE_NODE_LIST_FIELD
(
pFuncs
);
COPY_OBJECT_FIELD
(
timeRange
,
sizeof
(
STimeWindow
));
COPY_SCALAR_FIELD
(
interval
);
COPY_SCALAR_FIELD
(
fillMode
);
CLONE_NODE_FIELD
(
pFillValues
);
CLONE_NODE_FIELD
(
pTimeSeries
);
return
(
SNode
*
)
pDst
;
}
...
...
source/libs/nodes/src/nodesCodeFuncs.c
浏览文件 @
9e3e3513
...
...
@@ -2133,6 +2133,9 @@ static const char* jkInterpFuncPhysiPlanFuncs = "Funcs";
static
const
char
*
jkInterpFuncPhysiPlanStartTime
=
"StartTime"
;
static
const
char
*
jkInterpFuncPhysiPlanEndTime
=
"EndTime"
;
static
const
char
*
jkInterpFuncPhysiPlanInterval
=
"Interval"
;
static
const
char
*
jkInterpFuncPhysiPlanFillMode
=
"FillMode"
;
static
const
char
*
jkInterpFuncPhysiPlanFillValues
=
"FillValues"
;
static
const
char
*
jkInterpFuncPhysiPlanTimeSeries
=
"TimeSeries"
;
static
int32_t
physiInterpFuncNodeToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
const
SInterpFuncPhysiNode
*
pNode
=
(
const
SInterpFuncPhysiNode
*
)
pObj
;
...
...
@@ -2153,6 +2156,15 @@ static int32_t physiInterpFuncNodeToJson(const void* pObj, SJson* pJson) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkInterpFuncPhysiPlanInterval
,
pNode
->
interval
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkInterpFuncPhysiPlanFillMode
,
pNode
->
fillMode
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddObject
(
pJson
,
jkInterpFuncPhysiPlanFillValues
,
nodeToJson
,
pNode
->
pFillValues
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddObject
(
pJson
,
jkInterpFuncPhysiPlanTimeSeries
,
nodeToJson
,
pNode
->
pTimeSeries
);
}
return
code
;
}
...
...
@@ -2176,6 +2188,15 @@ static int32_t jsonToPhysiInterpFuncNode(const SJson* pJson, void* pObj) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetBigIntValue
(
pJson
,
jkInterpFuncPhysiPlanInterval
,
&
pNode
->
interval
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
tjsonGetNumberValue
(
pJson
,
jkInterpFuncPhysiPlanFillMode
,
pNode
->
fillMode
,
code
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeObject
(
pJson
,
jkInterpFuncPhysiPlanFillValues
,
&
pNode
->
pFillValues
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeObject
(
pJson
,
jkInterpFuncPhysiPlanTimeSeries
,
&
pNode
->
pTimeSeries
);
}
return
code
;
}
...
...
source/libs/parser/src/parAstCreater.c
浏览文件 @
9e3e3513
...
...
@@ -700,8 +700,11 @@ SNode* addEveryClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pEvery) {
SNode
*
addFillClause
(
SAstCreateContext
*
pCxt
,
SNode
*
pStmt
,
SNode
*
pFill
)
{
CHECK_PARSER_STATUS
(
pCxt
);
if
(
QUERY_NODE_SELECT_STMT
==
nodeType
(
pStmt
))
{
((
SSelectStmt
*
)
pStmt
)
->
pFill
=
pFill
;
if
(
QUERY_NODE_SELECT_STMT
==
nodeType
(
pStmt
)
&&
NULL
!=
pFill
)
{
SFillNode
*
pFillClause
=
(
SFillNode
*
)
pFill
;
nodesDestroyNode
(
pFillClause
->
pWStartTs
);
pFillClause
->
pWStartTs
=
createPrimaryKeyCol
(
pCxt
);
((
SSelectStmt
*
)
pStmt
)
->
pFill
=
(
SNode
*
)
pFillClause
;
}
return
pStmt
;
}
...
...
@@ -909,7 +912,7 @@ SNode* createDefaultTableOptions(SAstCreateContext* pCxt) {
pOptions
->
watermark1
=
TSDB_DEFAULT_ROLLUP_WATERMARK
;
pOptions
->
watermark2
=
TSDB_DEFAULT_ROLLUP_WATERMARK
;
pOptions
->
ttl
=
TSDB_DEFAULT_TABLE_TTL
;
pOptions
->
commentNull
=
true
;
// mark null
pOptions
->
commentNull
=
true
;
// mark null
return
(
SNode
*
)
pOptions
;
}
...
...
@@ -918,7 +921,7 @@ SNode* createAlterTableOptions(SAstCreateContext* pCxt) {
STableOptions
*
pOptions
=
(
STableOptions
*
)
nodesMakeNode
(
QUERY_NODE_TABLE_OPTIONS
);
CHECK_OUT_OF_MEM
(
pOptions
);
pOptions
->
ttl
=
-
1
;
pOptions
->
commentNull
=
true
;
// mark null
pOptions
->
commentNull
=
true
;
// mark null
return
(
SNode
*
)
pOptions
;
}
...
...
@@ -940,9 +943,9 @@ SNode* setTableOption(SAstCreateContext* pCxt, SNode* pOptions, ETableOptionType
case
TABLE_OPTION_ROLLUP
:
((
STableOptions
*
)
pOptions
)
->
pRollupFuncs
=
pVal
;
break
;
case
TABLE_OPTION_TTL
:{
case
TABLE_OPTION_TTL
:
{
int64_t
ttl
=
taosStr2Int64
(((
SToken
*
)
pVal
)
->
z
,
NULL
,
10
);
if
(
ttl
>
INT32_MAX
){
if
(
ttl
>
INT32_MAX
)
{
ttl
=
INT32_MAX
;
}
// ttl can not be smaller than 0, because there is a limitation in sql.y (TTL NK_INTEGER)
...
...
source/libs/parser/src/parTranslater.c
浏览文件 @
9e3e3513
...
...
@@ -1932,26 +1932,33 @@ static int32_t getFillTimeRange(STranslateContext* pCxt, SNode* pWhere, STimeWin
return
code
;
}
static
int32_t
checkFill
(
STranslateContext
*
pCxt
,
SIntervalWindowNode
*
pInterval
)
{
SFillNode
*
pFill
=
(
SFillNode
*
)
pInterval
->
pFill
;
static
int32_t
checkFill
(
STranslateContext
*
pCxt
,
SFillNode
*
pFill
,
SValueNode
*
pInterval
)
{
if
(
FILL_MODE_NONE
==
pFill
->
mode
)
{
return
TSDB_CODE_SUCCESS
;
}
if
(
TSWINDOW_IS_EQUAL
(
pFill
->
timeRange
,
TSWINDOW_INITIALIZER
)
||
TSWINDOW_IS_EQUAL
(
pFill
->
timeRange
,
TSWINDOW_DESC_INITIALIZER
))
{
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE
);
}
int64_t
timeRange
=
TABS
(
pFill
->
timeRange
.
skey
-
pFill
->
timeRange
.
ekey
);
int64_t
intervalRange
=
0
;
SValueNode
*
pInter
=
(
SValueNode
*
)
pInterval
->
pInterval
;
if
(
TIME_IS_VAR_DURATION
(
pInter
->
unit
))
{
// interp FILL clause
if
(
NULL
==
pInterval
)
{
return
TSDB_CODE_SUCCESS
;
}
int64_t
timeRange
=
TABS
(
pFill
->
timeRange
.
skey
-
pFill
->
timeRange
.
ekey
);
int64_t
intervalRange
=
0
;
if
(
TIME_IS_VAR_DURATION
(
pInterval
->
unit
))
{
int64_t
f
=
1
;
if
(
pInter
->
unit
==
'n'
)
{
if
(
pInter
val
->
unit
==
'n'
)
{
f
=
30L
*
MILLISECOND_PER_DAY
;
}
else
if
(
pInter
->
unit
==
'y'
)
{
}
else
if
(
pInter
val
->
unit
==
'y'
)
{
f
=
365L
*
MILLISECOND_PER_DAY
;
}
intervalRange
=
pInter
->
datum
.
i
*
f
;
intervalRange
=
pInter
val
->
datum
.
i
*
f
;
}
else
{
intervalRange
=
pInter
->
datum
.
i
;
intervalRange
=
pInter
val
->
datum
.
i
;
}
if
((
timeRange
==
0
)
||
(
timeRange
/
intervalRange
)
>=
MAX_INTERVAL_TIME_WINDOW
)
{
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE
);
...
...
@@ -1967,7 +1974,7 @@ static int32_t translateFill(STranslateContext* pCxt, SNode* pWhere, SIntervalWi
int32_t
code
=
getFillTimeRange
(
pCxt
,
pWhere
,
&
(((
SFillNode
*
)
pInterval
->
pFill
)
->
timeRange
));
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
checkFill
(
pCxt
,
pInterval
);
code
=
checkFill
(
pCxt
,
(
SFillNode
*
)
pInterval
->
pFill
,
(
SValueNode
*
)
pInterval
->
pInterval
);
}
return
code
;
}
...
...
@@ -2109,6 +2116,64 @@ static int32_t translateWindow(STranslateContext* pCxt, SSelectStmt* pSelect) {
return
code
;
}
static
int32_t
createDefaultFillNode
(
STranslateContext
*
pCxt
,
SNode
**
pOutput
)
{
SFillNode
*
pFill
=
(
SFillNode
*
)
nodesMakeNode
(
QUERY_NODE_FILL
);
if
(
NULL
==
pFill
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pFill
->
mode
=
FILL_MODE_NONE
;
SColumnNode
*
pCol
=
(
SColumnNode
*
)
nodesMakeNode
(
QUERY_NODE_COLUMN
);
if
(
NULL
==
pCol
)
{
nodesDestroyNode
((
SNode
*
)
pFill
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pCol
->
colId
=
PRIMARYKEY_TIMESTAMP_COL_ID
;
strcpy
(
pCol
->
colName
,
PK_TS_COL_INTERNAL_NAME
);
pFill
->
pWStartTs
=
(
SNode
*
)
pCol
;
*
pOutput
=
(
SNode
*
)
pFill
;
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
translateInterpFill
(
STranslateContext
*
pCxt
,
SSelectStmt
*
pSelect
)
{
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
(
NULL
==
pSelect
->
pFill
)
{
code
=
createDefaultFillNode
(
pCxt
,
&
pSelect
->
pFill
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
translateExpr
(
pCxt
,
&
pSelect
->
pFill
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
getFillTimeRange
(
pCxt
,
pSelect
->
pRange
,
&
(((
SFillNode
*
)
pSelect
->
pFill
)
->
timeRange
));
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
checkFill
(
pCxt
,
(
SFillNode
*
)
pSelect
->
pFill
,
(
SValueNode
*
)
pSelect
->
pEvery
);
}
return
code
;
}
static
int32_t
translateInterp
(
STranslateContext
*
pCxt
,
SSelectStmt
*
pSelect
)
{
if
(
!
pSelect
->
hasInterpFunc
)
{
if
(
NULL
!=
pSelect
->
pRange
||
NULL
!=
pSelect
->
pEvery
||
NULL
!=
pSelect
->
pFill
)
{
return
generateSyntaxErrMsg
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_INVALID_INTERP_CLAUSE
);
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
code
=
translateExpr
(
pCxt
,
&
pSelect
->
pRange
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
translateExpr
(
pCxt
,
&
pSelect
->
pEvery
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
translateInterpFill
(
pCxt
,
pSelect
);
}
return
code
;
}
static
int32_t
translatePartitionBy
(
STranslateContext
*
pCxt
,
SNodeList
*
pPartitionByList
)
{
pCxt
->
currClause
=
SQL_CLAUSE_PARTITION_BY
;
return
translateExprList
(
pCxt
,
pPartitionByList
);
...
...
@@ -2378,6 +2443,9 @@ static int32_t translateSelect(STranslateContext* pCxt, SSelectStmt* pSelect) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
checkLimit
(
pCxt
,
pSelect
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
translateInterp
(
pCxt
,
pSelect
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
rewriteUniqueStmt
(
pCxt
,
pSelect
);
}
...
...
@@ -3388,18 +3456,18 @@ static int32_t buildCreateStbReq(STranslateContext* pCxt, SCreateTableStmt* pStm
pReq
->
delay2
=
pStmt
->
pOptions
->
maxDelay2
;
pReq
->
watermark1
=
pStmt
->
pOptions
->
watermark1
;
pReq
->
watermark2
=
pStmt
->
pOptions
->
watermark2
;
// pReq->ttl = pStmt->pOptions->ttl;
// pReq->ttl = pStmt->pOptions->ttl;
columnDefNodeToField
(
pStmt
->
pCols
,
&
pReq
->
pColumns
);
columnDefNodeToField
(
pStmt
->
pTags
,
&
pReq
->
pTags
);
pReq
->
numOfColumns
=
LIST_LENGTH
(
pStmt
->
pCols
);
pReq
->
numOfTags
=
LIST_LENGTH
(
pStmt
->
pTags
);
if
(
pStmt
->
pOptions
->
commentNull
==
false
)
{
if
(
pStmt
->
pOptions
->
commentNull
==
false
)
{
pReq
->
comment
=
strdup
(
pStmt
->
pOptions
->
comment
);
if
(
NULL
==
pReq
->
comment
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pReq
->
commentLen
=
strlen
(
pStmt
->
pOptions
->
comment
);
}
else
{
}
else
{
pReq
->
commentLen
=
-
1
;
}
...
...
@@ -3452,14 +3520,14 @@ static int32_t buildAlterSuperTableReq(STranslateContext* pCxt, SAlterTableStmt*
pAlterReq
->
alterType
=
pStmt
->
alterType
;
if
(
TSDB_ALTER_TABLE_UPDATE_OPTIONS
==
pStmt
->
alterType
)
{
// pAlterReq->ttl = pStmt->pOptions->ttl;
// pAlterReq->ttl = pStmt->pOptions->ttl;
if
(
pStmt
->
pOptions
->
commentNull
==
false
)
{
pAlterReq
->
comment
=
strdup
(
pStmt
->
pOptions
->
comment
);
if
(
NULL
==
pAlterReq
->
comment
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pAlterReq
->
commentLen
=
strlen
(
pStmt
->
pOptions
->
comment
);
}
else
{
}
else
{
pAlterReq
->
commentLen
=
-
1
;
}
...
...
@@ -4725,7 +4793,7 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt*
return
TSDB_CODE_OUT_OF_MEMORY
;
}
req
.
commentLen
=
strlen
(
pStmt
->
pOptions
->
comment
);
}
else
{
}
else
{
req
.
commentLen
=
-
1
;
}
req
.
ntb
.
schemaRow
.
nCols
=
LIST_LENGTH
(
pStmt
->
pCols
);
...
...
@@ -4878,11 +4946,11 @@ static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, S
struct
SVCreateTbReq
req
=
{
0
};
req
.
type
=
TD_CHILD_TABLE
;
req
.
name
=
strdup
(
pStmt
->
tableName
);
req
.
ttl
=
pStmt
->
pOptions
->
ttl
;
req
.
ttl
=
pStmt
->
pOptions
->
ttl
;
if
(
pStmt
->
pOptions
->
commentNull
==
false
)
{
req
.
comment
=
strdup
(
pStmt
->
pOptions
->
comment
);
req
.
commentLen
=
strlen
(
pStmt
->
pOptions
->
comment
);
}
else
{
}
else
{
req
.
commentLen
=
-
1
;
}
req
.
ctb
.
suid
=
suid
;
...
...
@@ -5460,15 +5528,14 @@ static int32_t buildUpdateOptionsReq(STranslateContext* pCxt, SAlterTableStmt* p
pReq
->
newTTL
=
pStmt
->
pOptions
->
ttl
;
}
if
(
TSDB_CODE_SUCCESS
==
code
){
if
(
pStmt
->
pOptions
->
commentNull
==
false
)
{
if
(
TSDB_CODE_SUCCESS
==
code
)
{
if
(
pStmt
->
pOptions
->
commentNull
==
false
)
{
pReq
->
newComment
=
strdup
(
pStmt
->
pOptions
->
comment
);
if
(
NULL
==
pReq
->
newComment
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
}
pReq
->
newCommentLen
=
strlen
(
pReq
->
newComment
);
}
else
{
}
else
{
pReq
->
newCommentLen
=
-
1
;
}
}
...
...
source/libs/parser/src/parUtil.c
浏览文件 @
9e3e3513
...
...
@@ -196,6 +196,8 @@ static char* getSyntaxErrFormat(int32_t errCode) {
return
"%s function does not supportted in group query"
;
case
TSDB_CODE_PAR_INVALID_TABLE_OPTION
:
return
"Invalid option %s"
;
case
TSDB_CODE_PAR_INVALID_INTERP_CLAUSE
:
return
"Invalid usage of RANGE clause, EVERY clause or FILL clause"
;
case
TSDB_CODE_OUT_OF_MEMORY
:
return
"Out of memory"
;
default:
...
...
source/libs/parser/test/parSelectTest.cpp
浏览文件 @
9e3e3513
...
...
@@ -267,8 +267,6 @@ TEST_F(ParserSelectTest, interp) {
run
(
"SELECT INTERP(c1) FROM t1 EVERY(5s)"
);
run
(
"SELECT INTERP(c1) FROM t1 EVERY(5s) FILL(LINEAR)"
);
run
(
"SELECT INTERP(c1) FROM t1 RANGE('2017-7-14 18:00:00', '2017-7-14 19:00:00') EVERY(5s)"
);
run
(
"SELECT INTERP(c1) FROM t1 RANGE('2017-7-14 18:00:00', '2017-7-14 19:00:00') EVERY(5s) FILL(LINEAR)"
);
...
...
source/libs/planner/src/planLogicCreater.c
浏览文件 @
9e3e3513
...
...
@@ -508,10 +508,15 @@ static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p
code
=
rewriteExprsForSelect
(
pInterpFunc
->
pFuncs
,
pSelect
,
SQL_CLAUSE_SELECT
);
}
if
(
TSDB_CODE_SUCCESS
==
code
&&
NULL
!=
pSelect
->
pRange
)
{
// SRangeNode* pRange = (SRangeNode*)pSelect->pRange;
// pInterpFunc->timeRange.skey = ((SValueNode*)pRange->pStart)->datum.i;
// pInterpFunc->timeRange.ekey = ((SValueNode*)pRange->pEnd)->datum.i;
if
(
TSDB_CODE_SUCCESS
==
code
&&
NULL
!=
pSelect
->
pFill
)
{
SFillNode
*
pFill
=
(
SFillNode
*
)
pSelect
->
pFill
;
pInterpFunc
->
timeRange
=
pFill
->
timeRange
;
pInterpFunc
->
fillMode
=
pFill
->
mode
;
pInterpFunc
->
pTimeSeries
=
nodesCloneNode
(
pFill
->
pWStartTs
);
pInterpFunc
->
pFillValues
=
nodesCloneNode
(
pFill
->
pValues
);
if
(
NULL
==
pInterpFunc
->
pTimeSeries
||
(
NULL
!=
pFill
->
pValues
&&
NULL
==
pInterpFunc
->
pFillValues
))
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
}
}
if
(
TSDB_CODE_SUCCESS
==
code
&&
NULL
!=
pSelect
->
pEvery
)
{
...
...
source/libs/planner/src/planPhysiCreater.c
浏览文件 @
9e3e3513
...
...
@@ -877,6 +877,12 @@ static int32_t createInterpFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh
if
(
TSDB_CODE_SUCCESS
==
code
)
{
pInterpFunc
->
timeRange
=
pFuncLogicNode
->
timeRange
;
pInterpFunc
->
interval
=
pFuncLogicNode
->
interval
;
pInterpFunc
->
fillMode
=
pFuncLogicNode
->
fillMode
;
pInterpFunc
->
pFillValues
=
nodesCloneNode
(
pFuncLogicNode
->
pFillValues
);
pInterpFunc
->
pTimeSeries
=
nodesCloneNode
(
pFuncLogicNode
->
pTimeSeries
);
if
(
NULL
==
pInterpFunc
->
pTimeSeries
||
(
NULL
!=
pFuncLogicNode
->
pFillValues
&&
NULL
==
pInterpFunc
->
pFillValues
))
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
}
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
...
...
source/libs/qworker/inc/qwMsg.h
浏览文件 @
9e3e3513
...
...
@@ -23,6 +23,7 @@ extern "C" {
#include "qwInt.h"
#include "dataSinkMgt.h"
int32_t
qwAbortPrerocessQuery
(
QW_FPARAMS_DEF
);
int32_t
qwPrerocessQuery
(
QW_FPARAMS_DEF
,
SQWMsg
*
qwMsg
);
int32_t
qwProcessQuery
(
QW_FPARAMS_DEF
,
SQWMsg
*
qwMsg
,
int8_t
taskType
,
int8_t
explain
,
const
char
*
sql
);
int32_t
qwProcessCQuery
(
QW_FPARAMS_DEF
,
SQWMsg
*
qwMsg
);
...
...
source/libs/qworker/src/qwMsg.c
浏览文件 @
9e3e3513
...
...
@@ -283,6 +283,26 @@ int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
qWorkerAbortPreprocessQueryMsg
(
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
)
{
if
(
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
}
SSubQueryMsg
*
msg
=
pMsg
->
pCont
;
SQWorker
*
mgmt
=
(
SQWorker
*
)
qWorkerMgmt
;
uint64_t
sId
=
msg
->
sId
;
uint64_t
qId
=
msg
->
queryId
;
uint64_t
tId
=
msg
->
taskId
;
int64_t
rId
=
msg
->
refId
;
QW_SCH_TASK_DLOG
(
"Abort prerocessQuery start, handle:%p"
,
pMsg
->
info
.
handle
);
qwAbortPrerocessQuery
(
QW_FPARAMS
());
QW_SCH_TASK_DLOG
(
"Abort prerocessQuery end, handle:%p"
,
pMsg
->
info
.
handle
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
qWorkerProcessQueryMsg
(
void
*
node
,
void
*
qWorkerMgmt
,
SRpcMsg
*
pMsg
,
int64_t
ts
)
{
if
(
NULL
==
node
||
NULL
==
qWorkerMgmt
||
NULL
==
pMsg
)
{
QW_ERR_RET
(
TSDB_CODE_QRY_INVALID_INPUT
);
...
...
source/libs/qworker/src/qworker.c
浏览文件 @
9e3e3513
...
...
@@ -482,6 +482,13 @@ _return:
QW_RET
(
code
);
}
int32_t
qwAbortPrerocessQuery
(
QW_FPARAMS_DEF
)
{
QW_ERR_RET
(
qwDropTask
(
QW_FPARAMS
()));
QW_RET
(
TSDB_CODE_SUCCESS
);
}
int32_t
qwPrerocessQuery
(
QW_FPARAMS_DEF
,
SQWMsg
*
qwMsg
)
{
int32_t
code
=
0
;
bool
queryRsped
=
false
;
...
...
tests/pytest/util/common.py
浏览文件 @
9e3e3513
...
...
@@ -11,13 +11,20 @@
# -*- coding: utf-8 -*-
from
collections
import
defaultdict
import
random
import
string
from
util.sql
import
tdSql
from
util.dnodes
import
tdDnodes
import
requests
import
time
import
socket
import
taos
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
*
from
util.common
import
*
class
TDCom
:
def
init
(
self
,
conn
,
logSql
):
tdSql
.
init
(
conn
.
cursor
(),
logSql
)
...
...
@@ -170,4 +177,122 @@ class TDCom:
def
close
(
self
):
self
.
cursor
.
close
()
def
create_database
(
self
,
tsql
,
dbName
=
'test'
,
dropFlag
=
1
,
precision
=
"ms"
,
**
kwargs
):
if
dropFlag
==
1
:
tsql
.
execute
(
"drop database if exists %s"
%
(
dbName
))
'''
vgroups replica precision strict wal fsync comp cachelast single_stable buffer pagesize pages minrows maxrows duration keep retentions
'''
sqlString
=
f
'create database if not exists
{
dbName
}
precision "
{
precision
}
" vgroups 4'
if
len
(
kwargs
)
>
0
:
dbParams
=
""
for
param
,
value
in
kwargs
.
items
():
dbParams
+=
f
'
{
param
}
{
value
}
'
sqlString
+=
f
'
{
dbParams
}
'
tsql
.
execute
(
sqlString
)
tdLog
.
debug
(
"complete to create database %s"
%
(
dbName
))
return
def
create_stable
(
self
,
tsql
,
dbName
,
stbName
,
columnDict
,
tagDict
):
colSchema
=
''
for
i
in
range
(
columnDict
[
'int'
]):
colSchema
+=
', c%d int'
%
i
tagSchema
=
''
for
i
in
range
(
tagDict
[
'int'
]):
if
i
>
0
:
tagSchema
+=
','
tagSchema
+=
't%d int'
%
i
tsql
.
execute
(
"create table if not exists %s.%s (ts timestamp %s) tags(%s)"
%
(
dbName
,
stbName
,
colSchema
,
tagSchema
))
tdLog
.
debug
(
"complete to create %s.%s"
%
(
dbName
,
stbName
))
return
def
create_ctables
(
self
,
tsql
,
dbName
,
stbName
,
ctbNum
,
tagDict
):
tsql
.
execute
(
"use %s"
%
dbName
)
tagsValues
=
''
for
i
in
range
(
tagDict
[
'int'
]):
if
i
>
0
:
tagsValues
+=
','
tagsValues
+=
'%d'
%
i
pre_create
=
"create table"
sql
=
pre_create
#tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname))
for
i
in
range
(
ctbNum
):
sql
+=
" %s_%d using %s tags(%s)"
%
(
stbName
,
i
,
stbName
,
tagsValues
)
if
(
i
>
0
)
and
(
i
%
100
==
0
):
tsql
.
execute
(
sql
)
sql
=
pre_create
if
sql
!=
pre_create
:
tsql
.
execute
(
sql
)
tdLog
.
debug
(
"complete to create %d child tables in %s.%s"
%
(
ctbNum
,
dbName
,
stbName
))
return
def
insert_data
(
self
,
tsql
,
dbName
,
stbName
,
ctbNum
,
rowsPerTbl
,
batchNum
,
startTs
=
0
):
tdLog
.
debug
(
"start to insert data ............"
)
tsql
.
execute
(
"use %s"
%
dbName
)
pre_insert
=
"insert into "
sql
=
pre_insert
if
startTs
==
0
:
t
=
time
.
time
()
startTs
=
int
(
round
(
t
*
1000
))
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
for
i
in
range
(
ctbNum
):
sql
+=
" %s_%d values "
%
(
stbName
,
i
)
for
j
in
range
(
rowsPerTbl
):
sql
+=
"(%d, %d, %d)"
%
(
startTs
+
j
,
j
,
j
)
if
(
j
>
0
)
and
((
j
%
batchNum
==
0
)
or
(
j
==
rowsPerTbl
-
1
)):
tsql
.
execute
(
sql
)
if
j
<
rowsPerTbl
-
1
:
sql
=
"insert into %s_%d values "
%
(
stbName
,
i
)
else
:
sql
=
"insert into "
#end sql
if
sql
!=
pre_insert
:
#print("insert sql:%s"%sql)
tsql
.
execute
(
sql
)
tdLog
.
debug
(
"insert data ............ [OK]"
)
return
def
getBuildPath
(
self
):
selfPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
__file__
))
if
(
"community"
in
selfPath
):
projPath
=
selfPath
[:
selfPath
.
find
(
"community"
)]
else
:
projPath
=
selfPath
[:
selfPath
.
find
(
"tests"
)]
for
root
,
dirs
,
files
in
os
.
walk
(
projPath
):
if
(
"taosd"
in
files
or
"taosd.exe"
in
files
):
rootRealPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
root
))
if
(
"packaging"
not
in
rootRealPath
):
buildPath
=
root
[:
len
(
root
)
-
len
(
"/build/bin"
)]
break
return
buildPath
def
getClientCfgPath
(
self
):
buildPath
=
self
.
getBuildPath
()
if
(
buildPath
==
""
):
tdLog
.
exit
(
"taosd not found!"
)
else
:
tdLog
.
info
(
"taosd found in %s"
%
buildPath
)
cfgPath
=
buildPath
+
"/../sim/psim/cfg"
tdLog
.
info
(
"cfgPath: %s"
%
cfgPath
)
return
cfgPath
def
newcur
(
self
,
host
=
'localhost'
,
port
=
6030
,
user
=
'root'
,
password
=
'taosdata'
):
cfgPath
=
self
.
getClientCfgPath
()
con
=
taos
.
connect
(
host
=
host
,
user
=
user
,
password
=
password
,
config
=
cfgPath
,
port
=
port
)
cur
=
con
.
cursor
()
print
(
cur
)
return
cur
def
newTdSql
(
self
,
host
=
'localhost'
,
port
=
6030
,
user
=
'root'
,
password
=
'taosdata'
):
newTdSql
=
TDSql
()
cur
=
self
.
newcur
(
host
=
host
,
port
=
port
,
user
=
user
,
password
=
password
)
newTdSql
.
init
(
cur
,
False
)
return
newTdSql
tdCom
=
TDCom
()
tests/script/jenkins/basic.txt
浏览文件 @
9e3e3513
...
...
@@ -73,6 +73,7 @@
./test.sh -f tsim/stream/basic1.sim
./test.sh -f tsim/stream/basic2.sim
# ./test.sh -f tsim/stream/distributeInterval0.sim
# ./test.sh -f tsim/stream/distributesession0.sim
# ./test.sh -f tsim/stream/session0.sim
# ./test.sh -f tsim/stream/session1.sim
# ./test.sh -f tsim/stream/state0.sim
...
...
tests/script/tsim/stream/distributesession0.sim
0 → 100644
浏览文件 @
9e3e3513
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
sql create dnode $hostname2 port 7200
system sh/exec.sh -n dnode2 -s start
sql create database test vgroups 4;
sql use test;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table ts1 using st tags(1,1,1);
sql create table ts2 using st tags(2,2,2);
sql create stream stream_t1 trigger at_once into streamtST as select _wstartts, count(*) c1, sum(a) c2 , max(b) c3 from st session(ts, 10s) ;
sleep 1000
sql insert into ts1 values(1648791211000,1,1,1) (1648791211005,1,1,1);
sql insert into ts2 values(1648791221004,1,2,3) (1648791221008,2,2,3);
sql insert into ts1 values(1648791211005,1,1,1);
sql insert into ts2 values(1648791221006,5,5,5) (1648791221007,5,5,5);
sql insert into ts2 values(1648791221008,5,5,5) (1648791221008,5,5,5)(1648791221006,5,5,5);
sql insert into ts1 values(1648791231000,1,1,1) (1648791231002,1,1,1) (1648791231006,1,1,1);
sql insert into ts1 values(1648791211000,6,6,6) (1648791231002,2,2,2);
sql insert into ts1 values(1648791211002,7,7,7);
sql insert into ts1 values(1648791211002,7,7,7) ts2 values(1648791221008,5,5,5) ;
$loop_count = 0
loop1:
sql select * from streamtST;
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 0
if $data01 != 10 then
print =====data01=$data01
goto loop1
endi
if $data02 != 34 then
print =====data02=$data02
goto loop1
endi
if $data03 != 7 then
print ======$data03
return -1
endi
system sh/stop_dnodes.sh
\ No newline at end of file
tests/system-test/2-query/Today.py
浏览文件 @
9e3e3513
此差异已折叠。
点击以展开。
tests/system-test/7-tmq/subscribeDb4.py
0 → 100644
浏览文件 @
9e3e3513
import
sys
import
time
import
socket
import
os
import
threading
import
taos
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
*
from
util.common
import
*
sys
.
path
.
append
(
"./7-tmq"
)
from
tmqCommon
import
*
class
TDTestCase
:
paraDict
=
{
'dbName'
:
'db12'
,
'dropFlag'
:
1
,
'vgroups'
:
4
,
'precision'
:
'ms'
,
'stbName'
:
'stb0'
,
'ctbNum'
:
10
,
'rowsPerTbl'
:
10000
,
'batchNum'
:
10
,
'startTs'
:
0
,
# 1640966400000 ----> 2022-01-01 00:00:00.000
'event'
:
''
,
'columnDict'
:
{
'int'
:
2
},
'tagDict'
:
{
'int'
:
1
}
}
cdbName
=
'cdb'
# some parameter to consumer processor
consumerId
=
0
expectrowcnt
=
0
topicList
=
''
ifcheckdata
=
0
ifManualCommit
=
1
groupId
=
'group.id:cgrp1'
autoCommit
=
'enable.auto.commit:false'
autoCommitInterval
=
'auto.commit.interval.ms:1000'
autoOffset
=
'auto.offset.reset:earliest'
pollDelay
=
20
showMsg
=
1
showRow
=
1
hostname
=
socket
.
gethostname
()
def
init
(
self
,
conn
,
logSql
):
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
logSql
=
False
tdSql
.
init
(
conn
.
cursor
(),
logSql
)
def
tmqCase12
(
self
):
tdLog
.
printNoPrefix
(
"======== test case 12: "
)
tdLog
.
info
(
"step 1: create database, stb, ctb and insert data"
)
tmqCom
.
initConsumerTable
(
self
.
cdbName
)
tdCom
.
create_database
(
tdSql
,
self
.
paraDict
[
"dbName"
],
self
.
paraDict
[
"dropFlag"
],
self
.
paraDict
[
'precision'
])
self
.
paraDict
[
"stbName"
]
=
'stb1'
tdCom
.
create_stable
(
tdSql
,
self
.
paraDict
[
"dbName"
],
self
.
paraDict
[
"stbName"
],
self
.
paraDict
[
"columnDict"
],
self
.
paraDict
[
"tagDict"
])
tdCom
.
create_ctables
(
tdSql
,
self
.
paraDict
[
"dbName"
],
self
.
paraDict
[
"stbName"
],
self
.
paraDict
[
"ctbNum"
],
self
.
paraDict
[
"tagDict"
])
tdCom
.
insert_data
(
tdSql
,
self
.
paraDict
[
"dbName"
],
self
.
paraDict
[
"stbName"
],
self
.
paraDict
[
"ctbNum"
],
self
.
paraDict
[
"rowsPerTbl"
],
self
.
paraDict
[
"batchNum"
])
self
.
paraDict
[
"stbName"
]
=
'stb2'
tdCom
.
create_stable
(
tdSql
,
self
.
paraDict
[
"dbName"
],
self
.
paraDict
[
"stbName"
],
self
.
paraDict
[
"columnDict"
],
self
.
paraDict
[
"tagDict"
])
tdCom
.
create_ctables
(
tdSql
,
self
.
paraDict
[
"dbName"
],
self
.
paraDict
[
"stbName"
],
self
.
paraDict
[
"ctbNum"
],
self
.
paraDict
[
"tagDict"
])
tdCom
.
insert_data
(
tdSql
,
self
.
paraDict
[
"dbName"
],
self
.
paraDict
[
"stbName"
],
self
.
paraDict
[
"ctbNum"
],
self
.
paraDict
[
"rowsPerTbl"
],
self
.
paraDict
[
"batchNum"
])
tdLog
.
info
(
"create topics from db"
)
topicName1
=
'topic_%s'
%
(
self
.
paraDict
[
'dbName'
])
tdSql
.
execute
(
"create topic %s as database %s"
%
(
topicName1
,
self
.
paraDict
[
'dbName'
]))
topicList
=
topicName1
keyList
=
'%s,%s,%s,%s'
%
(
self
.
groupId
,
self
.
autoCommit
,
self
.
autoCommitInterval
,
self
.
autoOffset
)
self
.
expectrowcnt
=
self
.
paraDict
[
"rowsPerTbl"
]
*
self
.
paraDict
[
"ctbNum"
]
*
2
tmqCom
.
insertConsumerInfo
(
self
.
consumerId
,
self
.
expectrowcnt
,
topicList
,
keyList
,
self
.
ifcheckdata
,
self
.
ifManualCommit
)
tdLog
.
info
(
"start consume processor"
)
tmqCom
.
startTmqSimProcess
(
self
.
pollDelay
,
self
.
paraDict
[
"dbName"
],
self
.
showMsg
,
self
.
showRow
,
self
.
cdbName
)
tdLog
.
info
(
"After waiting for a period of time, drop one stable"
)
time
.
sleep
(
10
)
tdSql
.
execute
(
"drop table %s.%s"
%
(
self
.
paraDict
[
'dbName'
],
self
.
paraDict
[
'stbName'
]))
tdLog
.
info
(
"wait result from consumer, then check it"
)
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
totalConsumeRows
=
0
for
i
in
range
(
expectRows
):
totalConsumeRows
+=
resultList
[
i
]
if
not
(
totalConsumeRows
>=
self
.
expectrowcnt
/
2
and
totalConsumeRows
<=
self
.
expectrowcnt
):
tdLog
.
info
(
"act consume rows: %d, expect consume rows: between %d and %d"
%
(
totalConsumeRows
,
self
.
expectrowcnt
/
2
,
self
.
expectrowcnt
))
tdLog
.
exit
(
"tmq consume rows error!"
)
time
.
sleep
(
15
)
tdSql
.
query
(
"drop topic %s"
%
topicName1
)
tdLog
.
printNoPrefix
(
"======== test case 12 end ...... "
)
def
run
(
self
):
tdSql
.
prepare
()
self
.
tmqCase12
()
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
event
=
threading
.
Event
()
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tests/system-test/7-tmq/subscribeStb4.py
浏览文件 @
9e3e3513
...
...
@@ -196,16 +196,16 @@ class TDTestCase:
auotCtbPrefix
=
'autoCtb'
# create and start thread
parameterDict
=
{
'cfg'
:
''
,
\
'actionType'
:
0
,
\
'dbName'
:
'db1'
,
\
'dropFlag'
:
1
,
\
'vgroups'
:
4
,
\
'replica'
:
1
,
\
'stbName'
:
'stb1'
,
\
'ctbNum'
:
10
,
\
'rowsPerTbl'
:
10000
,
\
'batchNum'
:
100
,
\
parameterDict
=
{
'cfg'
:
''
,
'actionType'
:
0
,
'dbName'
:
'db1'
,
'dropFlag'
:
1
,
'vgroups'
:
4
,
'replica'
:
1
,
'stbName'
:
'stb1'
,
'ctbNum'
:
10
,
'rowsPerTbl'
:
10000
,
'batchNum'
:
100
,
'startTs'
:
1640966400000
}
# 2022-01-01 00:00:00.000
parameterDict
[
'cfg'
]
=
cfgPath
...
...
@@ -349,7 +349,5 @@ class TDTestCase:
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
event
=
threading
.
Event
()
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tests/system-test/7-tmq/tmqCommon.py
0 → 100644
浏览文件 @
9e3e3513
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
from
collections
import
defaultdict
import
random
import
string
import
threading
import
requests
import
time
# import socketfrom
import
taos
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
*
from
util.common
import
*
# class actionType(Enum):
# CREATE_DATABASE = 0
# CREATE_STABLE = 1
# CREATE_CTABLE = 2
# INSERT_DATA = 3
class
TMQCom
:
def
init
(
self
,
conn
,
logSql
):
tdSql
.
init
(
conn
.
cursor
())
# tdSql.init(conn.cursor(), logSql) # output sql.txt file
def
initConsumerTable
(
self
,
cdbName
=
'cdb'
):
tdLog
.
info
(
"create consume database, and consume info table, and consume result table"
)
tdSql
.
query
(
"create database if not exists %s vgroups 1"
%
(
cdbName
))
tdSql
.
query
(
"drop table if exists %s.consumeinfo "
%
(
cdbName
))
tdSql
.
query
(
"drop table if exists %s.consumeresult "
%
(
cdbName
))
tdSql
.
query
(
"create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"
%
cdbName
)
tdSql
.
query
(
"create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"
%
cdbName
)
def
initConsumerInfoTable
(
self
,
cdbName
=
'cdb'
):
tdLog
.
info
(
"drop consumeinfo table"
)
tdSql
.
query
(
"drop table if exists %s.consumeinfo "
%
(
cdbName
))
tdSql
.
query
(
"create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"
%
cdbName
)
def
insertConsumerInfo
(
self
,
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifmanualcommit
,
cdbName
=
'cdb'
):
sql
=
"insert into %s.consumeinfo values "
%
cdbName
sql
+=
"(now, %d, '%s', '%s', %d, %d, %d)"
%
(
consumerId
,
topicList
,
keyList
,
expectrowcnt
,
ifcheckdata
,
ifmanualcommit
)
tdLog
.
info
(
"consume info sql: %s"
%
sql
)
tdSql
.
query
(
sql
)
def
selectConsumeResult
(
self
,
expectRows
,
cdbName
=
'cdb'
):
resultList
=
[]
while
1
:
tdSql
.
query
(
"select * from %s.consumeresult"
%
cdbName
)
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
if
tdSql
.
getRows
()
==
expectRows
:
break
else
:
time
.
sleep
(
5
)
for
i
in
range
(
expectRows
):
tdLog
.
info
(
"consume id: %d, consume msgs: %d, consume rows: %d"
%
(
tdSql
.
getData
(
i
,
1
),
tdSql
.
getData
(
i
,
2
),
tdSql
.
getData
(
i
,
3
)))
resultList
.
append
(
tdSql
.
getData
(
i
,
3
))
return
resultList
def
startTmqSimProcess
(
self
,
pollDelay
,
dbName
,
showMsg
=
1
,
showRow
=
1
,
cdbName
=
'cdb'
,
valgrind
=
0
):
buildPath
=
tdCom
.
getBuildPath
()
cfgPath
=
tdCom
.
getClientCfgPath
()
if
valgrind
==
1
:
logFile
=
cfgPath
+
'/../log/valgrind-tmq.log'
shellCmd
=
'nohup valgrind --log-file='
+
logFile
shellCmd
+=
'--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes '
if
(
platform
.
system
().
lower
()
==
'windows'
):
shellCmd
=
'mintty -h never -w hide '
+
buildPath
+
'
\\
build
\\
bin
\\
tmq_sim.exe -c '
+
cfgPath
shellCmd
+=
" -y %d -d %s -g %d -r %d -w %s "
%
(
pollDelay
,
dbName
,
showMsg
,
showRow
,
cdbName
)
shellCmd
+=
"> nul 2>&1 &"
else
:
shellCmd
=
'nohup '
+
buildPath
+
'/build/bin/tmq_sim -c '
+
cfgPath
shellCmd
+=
" -y %d -d %s -g %d -r %d -w %s "
%
(
pollDelay
,
dbName
,
showMsg
,
showRow
,
cdbName
)
shellCmd
+=
"> /dev/null 2>&1 &"
tdLog
.
info
(
shellCmd
)
os
.
system
(
shellCmd
)
def
syncCreateDbStbCtbInsertData
(
self
,
tsql
,
paraDict
):
tdCom
.
create_database
(
tsql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
paraDict
[
'precision'
])
tdCom
.
create_stable
(
tsql
,
paraDict
[
"dbName"
],
paraDict
[
"stbName"
],
paraDict
[
"columnDict"
],
paraDict
[
"tagDict"
])
tdCom
.
create_ctables
(
tsql
,
paraDict
[
"dbName"
],
paraDict
[
"stbName"
],
paraDict
[
"ctbNum"
],
paraDict
[
"tagDict"
])
if
"event"
in
paraDict
and
type
(
paraDict
[
'event'
])
==
type
(
threading
.
Event
()):
paraDict
[
"event"
].
set
()
tdCom
.
insert_data
(
tsql
,
paraDict
[
"dbName"
],
paraDict
[
"stbName"
],
paraDict
[
"ctbNum"
],
paraDict
[
"rowsPerTbl"
],
paraDict
[
"batchNum"
],
paraDict
[
"startTs"
])
return
def
threadFunction
(
self
,
**
paraDict
):
# create new connector for new tdSql instance in my thread
newTdSql
=
tdCom
.
newTdSql
()
self
.
syncCreateDbStbCtbInsertData
(
self
,
newTdSql
,
paraDict
)
return
def
asyncCreateDbStbCtbInsertData
(
self
,
paraDict
):
pThread
=
threading
.
Thread
(
target
=
self
.
threadFunction
,
kwargs
=
paraDict
)
pThread
.
start
()
return
pThread
def
close
(
self
):
self
.
cursor
.
close
()
tmqCom
=
TMQCom
()
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录