Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
de891966
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
de891966
编写于
6月 14, 2022
作者:
C
Cary Xu
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into feature/TD-14481-3.0
上级
8d182d0c
8b874876
变更
11
隐藏空白更改
内联
并排
Showing
11 changed file
with
563 addition
and
16 deletion
+563
-16
include/common/tmsg.h
include/common/tmsg.h
+8
-0
include/common/tmsgdef.h
include/common/tmsgdef.h
+1
-0
source/common/src/tmsg.c
source/common/src/tmsg.c
+27
-0
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
+2
-0
source/dnode/mnode/impl/src/mndMain.c
source/dnode/mnode/impl/src/mndMain.c
+8
-4
source/dnode/mnode/impl/src/mndMnode.c
source/dnode/mnode/impl/src/mndMnode.c
+27
-0
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+42
-10
source/libs/sync/src/syncSnapshot.c
source/libs/sync/src/syncSnapshot.c
+7
-1
tests/script/jenkins/basic.txt
tests/script/jenkins/basic.txt
+1
-1
tests/system-test/2-query/tail.py
tests/system-test/2-query/tail.py
+439
-0
tests/system-test/fulltest.sh
tests/system-test/fulltest.sh
+1
-0
未找到文件。
include/common/tmsg.h
浏览文件 @
de891966
...
@@ -1294,6 +1294,14 @@ typedef struct {
...
@@ -1294,6 +1294,14 @@ typedef struct {
int32_t
tSerializeSDCreateMnodeReq
(
void
*
buf
,
int32_t
bufLen
,
SDCreateMnodeReq
*
pReq
);
int32_t
tSerializeSDCreateMnodeReq
(
void
*
buf
,
int32_t
bufLen
,
SDCreateMnodeReq
*
pReq
);
int32_t
tDeserializeSDCreateMnodeReq
(
void
*
buf
,
int32_t
bufLen
,
SDCreateMnodeReq
*
pReq
);
int32_t
tDeserializeSDCreateMnodeReq
(
void
*
buf
,
int32_t
bufLen
,
SDCreateMnodeReq
*
pReq
);
typedef
struct
{
int32_t
dnodeId
;
int8_t
standby
;
}
SSetStandbyReq
;
int32_t
tSerializeSSetStandbyReq
(
void
*
buf
,
int32_t
bufLen
,
SSetStandbyReq
*
pReq
);
int32_t
tDeserializeSSetStandbyReq
(
void
*
buf
,
int32_t
bufLen
,
SSetStandbyReq
*
pReq
);
typedef
struct
{
typedef
struct
{
int32_t
connId
;
int32_t
connId
;
int32_t
queryId
;
int32_t
queryId
;
...
...
include/common/tmsgdef.h
浏览文件 @
de891966
...
@@ -97,6 +97,7 @@ enum {
...
@@ -97,6 +97,7 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_MND_CREATE_MNODE
,
"create-mnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_CREATE_MNODE
,
"create-mnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_ALTER_MNODE
,
"alter-mnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_ALTER_MNODE
,
"alter-mnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_DROP_MNODE
,
"drop-mnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_DROP_MNODE
,
"drop-mnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_SET_STANDBY
,
"set-mnode-standby"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_CREATE_QNODE
,
"create-qnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_CREATE_QNODE
,
"create-qnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_ALTER_QNODE
,
"alter-qnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_ALTER_QNODE
,
"alter-qnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_DROP_QNODE
,
"drop-qnode"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_DROP_QNODE
,
"drop-qnode"
,
NULL
,
NULL
)
...
...
source/common/src/tmsg.c
浏览文件 @
de891966
...
@@ -3507,6 +3507,33 @@ int32_t tDeserializeSDCreateMnodeReq(void *buf, int32_t bufLen, SDCreateMnodeReq
...
@@ -3507,6 +3507,33 @@ int32_t tDeserializeSDCreateMnodeReq(void *buf, int32_t bufLen, SDCreateMnodeReq
return
0
;
return
0
;
}
}
int32_t
tSerializeSSetStandbyReq
(
void
*
buf
,
int32_t
bufLen
,
SSetStandbyReq
*
pReq
)
{
SEncoder
encoder
=
{
0
};
tEncoderInit
(
&
encoder
,
buf
,
bufLen
);
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
dnodeId
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
standby
)
<
0
)
return
-
1
;
tEndEncode
(
&
encoder
);
int32_t
tlen
=
encoder
.
pos
;
tEncoderClear
(
&
encoder
);
return
tlen
;
}
int32_t
tDeserializeSSetStandbyReq
(
void
*
buf
,
int32_t
bufLen
,
SSetStandbyReq
*
pReq
)
{
SDecoder
decoder
=
{
0
};
tDecoderInit
(
&
decoder
,
buf
,
bufLen
);
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
dnodeId
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
standby
)
<
0
)
return
-
1
;
tEndDecode
(
&
decoder
);
tDecoderClear
(
&
decoder
);
return
0
;
}
int32_t
tSerializeSAuthReq
(
void
*
buf
,
int32_t
bufLen
,
SAuthReq
*
pReq
)
{
int32_t
tSerializeSAuthReq
(
void
*
buf
,
int32_t
bufLen
,
SAuthReq
*
pReq
)
{
SEncoder
encoder
=
{
0
};
SEncoder
encoder
=
{
0
};
tEncoderInit
(
&
encoder
,
buf
,
bufLen
);
tEncoderInit
(
&
encoder
,
buf
,
bufLen
);
...
...
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
浏览文件 @
de891966
...
@@ -155,6 +155,7 @@ SArray *mmGetMsgHandles() {
...
@@ -155,6 +155,7 @@ SArray *mmGetMsgHandles() {
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_CREATE_MNODE
,
mmPutNodeMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_CREATE_MNODE
,
mmPutNodeMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_ALTER_MNODE
,
mmPutNodeMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_ALTER_MNODE
,
mmPutNodeMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_ALTER_MNODE_RSP
,
mmPutNodeMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_ALTER_MNODE_RSP
,
mmPutNodeMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_SET_STANDBY_RSP
,
mmPutNodeMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_DROP_MNODE
,
mmPutNodeMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_DROP_MNODE
,
mmPutNodeMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_CREATE_QNODE
,
mmPutNodeMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_CREATE_QNODE
,
mmPutNodeMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_DROP_QNODE
,
mmPutNodeMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_DROP_QNODE
,
mmPutNodeMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
...
@@ -235,6 +236,7 @@ SArray *mmGetMsgHandles() {
...
@@ -235,6 +236,7 @@ SArray *mmGetMsgHandles() {
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_APPEND_ENTRIES_REPLY
,
mmPutNodeMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_APPEND_ENTRIES_REPLY
,
mmPutNodeMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_SNAPSHOT_SEND
,
mmPutNodeMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_SNAPSHOT_SEND
,
mmPutNodeMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_SNAPSHOT_RSP
,
mmPutNodeMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_SNAPSHOT_RSP
,
mmPutNodeMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_SET_STANDBY
,
mmPutNodeMsgToSyncQueue
,
0
)
==
NULL
)
goto
_OVER
;
code
=
0
;
code
=
0
;
...
...
source/dnode/mnode/impl/src/mndMain.c
浏览文件 @
de891966
...
@@ -428,7 +428,6 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
...
@@ -428,7 +428,6 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
SyncClientRequest
*
pSyncMsg
=
syncClientRequestFromRpcMsg2
(
pMsg
);
SyncClientRequest
*
pSyncMsg
=
syncClientRequestFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnClientRequestCb
(
pSyncNode
,
pSyncMsg
);
code
=
syncNodeOnClientRequestCb
(
pSyncNode
,
pSyncMsg
);
syncClientRequestDestroy
(
pSyncMsg
);
syncClientRequestDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_REQUEST_VOTE
)
{
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_REQUEST_VOTE
)
{
SyncRequestVote
*
pSyncMsg
=
syncRequestVoteFromRpcMsg2
(
pMsg
);
SyncRequestVote
*
pSyncMsg
=
syncRequestVoteFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnRequestVoteSnapshotCb
(
pSyncNode
,
pSyncMsg
);
code
=
syncNodeOnRequestVoteSnapshotCb
(
pSyncNode
,
pSyncMsg
);
...
@@ -445,7 +444,6 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
...
@@ -445,7 +444,6 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
SyncAppendEntriesReply
*
pSyncMsg
=
syncAppendEntriesReplyFromRpcMsg2
(
pMsg
);
SyncAppendEntriesReply
*
pSyncMsg
=
syncAppendEntriesReplyFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnAppendEntriesReplySnapshotCb
(
pSyncNode
,
pSyncMsg
);
code
=
syncNodeOnAppendEntriesReplySnapshotCb
(
pSyncNode
,
pSyncMsg
);
syncAppendEntriesReplyDestroy
(
pSyncMsg
);
syncAppendEntriesReplyDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_SNAPSHOT_SEND
)
{
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_SNAPSHOT_SEND
)
{
SyncSnapshotSend
*
pSyncMsg
=
syncSnapshotSendFromRpcMsg2
(
pMsg
);
SyncSnapshotSend
*
pSyncMsg
=
syncSnapshotSendFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnSnapshotSendCb
(
pSyncNode
,
pSyncMsg
);
code
=
syncNodeOnSnapshotSendCb
(
pSyncNode
,
pSyncMsg
);
...
@@ -454,12 +452,14 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
...
@@ -454,12 +452,14 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
SyncSnapshotRsp
*
pSyncMsg
=
syncSnapshotRspFromRpcMsg2
(
pMsg
);
SyncSnapshotRsp
*
pSyncMsg
=
syncSnapshotRspFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnSnapshotRspCb
(
pSyncNode
,
pSyncMsg
);
code
=
syncNodeOnSnapshotRspCb
(
pSyncNode
,
pSyncMsg
);
syncSnapshotRspDestroy
(
pSyncMsg
);
syncSnapshotRspDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_MND_SET_STANDBY
)
{
code
=
syncSetStandby
(
pMgmt
->
sync
);
SRpcMsg
rsp
=
{.
code
=
code
,
.
info
=
pMsg
->
info
};
tmsgSendRsp
(
&
rsp
);
}
else
{
}
else
{
mError
(
"failed to process msg:%p since invalid type:%s"
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
));
mError
(
"failed to process msg:%p since invalid type:%s"
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
));
code
=
TAOS_SYNC_PROPOSE_OTHER_ERROR
;
code
=
TAOS_SYNC_PROPOSE_OTHER_ERROR
;
}
}
}
else
{
}
else
{
if
(
pMsg
->
msgType
==
TDMT_SYNC_TIMEOUT
)
{
if
(
pMsg
->
msgType
==
TDMT_SYNC_TIMEOUT
)
{
SyncTimeout
*
pSyncMsg
=
syncTimeoutFromRpcMsg2
(
pMsg
);
SyncTimeout
*
pSyncMsg
=
syncTimeoutFromRpcMsg2
(
pMsg
);
...
@@ -493,6 +493,10 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
...
@@ -493,6 +493,10 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
SyncAppendEntriesReply
*
pSyncMsg
=
syncAppendEntriesReplyFromRpcMsg2
(
pMsg
);
SyncAppendEntriesReply
*
pSyncMsg
=
syncAppendEntriesReplyFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnAppendEntriesReplyCb
(
pSyncNode
,
pSyncMsg
);
code
=
syncNodeOnAppendEntriesReplyCb
(
pSyncNode
,
pSyncMsg
);
syncAppendEntriesReplyDestroy
(
pSyncMsg
);
syncAppendEntriesReplyDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_MND_SET_STANDBY
)
{
code
=
syncSetStandby
(
pMgmt
->
sync
);
SRpcMsg
rsp
=
{.
code
=
code
,
.
info
=
pMsg
->
info
};
tmsgSendRsp
(
&
rsp
);
}
else
{
}
else
{
mError
(
"failed to process msg:%p since invalid type:%s"
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
));
mError
(
"failed to process msg:%p since invalid type:%s"
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
));
code
=
TAOS_SYNC_PROPOSE_OTHER_ERROR
;
code
=
TAOS_SYNC_PROPOSE_OTHER_ERROR
;
...
...
source/dnode/mnode/impl/src/mndMnode.c
浏览文件 @
de891966
...
@@ -55,6 +55,7 @@ int32_t mndInitMnode(SMnode *pMnode) {
...
@@ -55,6 +55,7 @@ int32_t mndInitMnode(SMnode *pMnode) {
mndSetMsgHandle
(
pMnode
,
TDMT_MND_ALTER_MNODE_RSP
,
mndTransProcessRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_ALTER_MNODE_RSP
,
mndTransProcessRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_DROP_MNODE
,
mndProcessDropMnodeReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_DROP_MNODE
,
mndProcessDropMnodeReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_DND_DROP_MNODE_RSP
,
mndTransProcessRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_DND_DROP_MNODE_RSP
,
mndTransProcessRsp
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_SET_STANDBY_RSP
,
mndTransProcessRsp
);
mndAddShowRetrieveHandle
(
pMnode
,
TSDB_MGMT_TABLE_MNODE
,
mndRetrieveMnodes
);
mndAddShowRetrieveHandle
(
pMnode
,
TSDB_MGMT_TABLE_MNODE
,
mndRetrieveMnodes
);
mndAddShowFreeIterHandle
(
pMnode
,
TSDB_MGMT_TABLE_MNODE
,
mndCancelGetNextMnode
);
mndAddShowFreeIterHandle
(
pMnode
,
TSDB_MGMT_TABLE_MNODE
,
mndCancelGetNextMnode
);
...
@@ -460,6 +461,7 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
...
@@ -460,6 +461,7 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
int32_t
numOfReplicas
=
0
;
int32_t
numOfReplicas
=
0
;
SDAlterMnodeReq
alterReq
=
{
0
};
SDAlterMnodeReq
alterReq
=
{
0
};
SDDropMnodeReq
dropReq
=
{
0
};
SDDropMnodeReq
dropReq
=
{
0
};
SSetStandbyReq
standbyReq
=
{
0
};
SEpSet
alterEpset
=
{
0
};
SEpSet
alterEpset
=
{
0
};
SEpSet
dropEpSet
=
{
0
};
SEpSet
dropEpSet
=
{
0
};
...
@@ -494,6 +496,31 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
...
@@ -494,6 +496,31 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
dropEpSet
.
eps
[
0
].
port
=
pDnode
->
port
;
dropEpSet
.
eps
[
0
].
port
=
pDnode
->
port
;
memcpy
(
dropEpSet
.
eps
[
0
].
fqdn
,
pDnode
->
fqdn
,
TSDB_FQDN_LEN
);
memcpy
(
dropEpSet
.
eps
[
0
].
fqdn
,
pDnode
->
fqdn
,
TSDB_FQDN_LEN
);
standbyReq
.
dnodeId
=
pDnode
->
id
;
standbyReq
.
standby
=
1
;
{
int32_t
contLen
=
tSerializeSSetStandbyReq
(
NULL
,
0
,
&
standbyReq
)
+
sizeof
(
SMsgHead
);
void
*
pReq
=
taosMemoryMalloc
(
contLen
);
tSerializeSSetStandbyReq
((
char
*
)
pReq
+
sizeof
(
SMsgHead
),
contLen
,
&
standbyReq
);
SMsgHead
*
pHead
=
pReq
;
pHead
->
contLen
=
htonl
(
contLen
);
pHead
->
vgId
=
htonl
(
MNODE_HANDLE
);
STransAction
action
=
{
.
epSet
=
dropEpSet
,
.
pCont
=
pReq
,
.
contLen
=
contLen
,
.
msgType
=
TDMT_MND_SET_STANDBY
,
.
acceptableCode
=
TSDB_CODE_NODE_NOT_DEPLOYED
,
};
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
taosMemoryFree
(
pReq
);
return
-
1
;
}
}
{
{
int32_t
contLen
=
tSerializeSDCreateMnodeReq
(
NULL
,
0
,
&
alterReq
);
int32_t
contLen
=
tSerializeSDCreateMnodeReq
(
NULL
,
0
,
&
alterReq
);
void
*
pReq
=
taosMemoryMalloc
(
contLen
);
void
*
pReq
=
taosMemoryMalloc
(
contLen
);
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
de891966
...
@@ -152,7 +152,7 @@ int32_t syncSetStandby(int64_t rid) {
...
@@ -152,7 +152,7 @@ int32_t syncSetStandby(int64_t rid) {
return
-
1
;
return
-
1
;
}
}
if
(
pSyncNode
->
state
!
=
TAOS_SYNC_STATE_LEADER
)
{
if
(
pSyncNode
->
state
=
=
TAOS_SYNC_STATE_LEADER
)
{
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
return
-
1
;
return
-
1
;
}
}
...
@@ -170,6 +170,7 @@ int32_t syncSetStandby(int64_t rid) {
...
@@ -170,6 +170,7 @@ int32_t syncSetStandby(int64_t rid) {
raftCfgPersist
(
pSyncNode
->
pRaftCfg
);
raftCfgPersist
(
pSyncNode
->
pRaftCfg
);
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
sInfo
(
"vgId:%d, set to standby"
,
pSyncNode
->
vgId
);
return
0
;
return
0
;
}
}
...
@@ -1157,6 +1158,13 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l
...
@@ -1157,6 +1158,13 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l
int32_t
ret
=
0
;
int32_t
ret
=
0
;
// save snapshot senders
int32_t
oldReplicaNum
=
pSyncNode
->
replicaNum
;
SRaftId
oldReplicasId
[
TSDB_MAX_REPLICA
];
memcpy
(
oldReplicasId
,
pSyncNode
->
replicasId
,
sizeof
(
oldReplicasId
));
SSyncSnapshotSender
*
oldSenders
[
TSDB_MAX_REPLICA
];
memcpy
(
oldSenders
,
pSyncNode
->
senders
,
sizeof
(
oldSenders
));
// init internal
// init internal
pSyncNode
->
myNodeInfo
=
pSyncNode
->
pRaftCfg
->
cfg
.
nodeInfo
[
pSyncNode
->
pRaftCfg
->
cfg
.
myIndex
];
pSyncNode
->
myNodeInfo
=
pSyncNode
->
pRaftCfg
->
cfg
.
nodeInfo
[
pSyncNode
->
pRaftCfg
->
cfg
.
myIndex
];
syncUtilnodeInfo2raftId
(
&
pSyncNode
->
myNodeInfo
,
pSyncNode
->
vgId
,
&
pSyncNode
->
myRaftId
);
syncUtilnodeInfo2raftId
(
&
pSyncNode
->
myNodeInfo
,
pSyncNode
->
vgId
,
&
pSyncNode
->
myRaftId
);
...
@@ -1187,6 +1195,27 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l
...
@@ -1187,6 +1195,27 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l
pSyncNode
->
quorum
=
syncUtilQuorum
(
pSyncNode
->
pRaftCfg
->
cfg
.
replicaNum
);
pSyncNode
->
quorum
=
syncUtilQuorum
(
pSyncNode
->
pRaftCfg
->
cfg
.
replicaNum
);
// reset snapshot senders, memory leak
for
(
int
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
(
pSyncNode
->
senders
)[
i
]
=
NULL
;
}
for
(
int
i
=
0
;
i
<
pSyncNode
->
replicaNum
;
++
i
)
{
for
(
int
j
=
0
;
j
<
TSDB_MAX_REPLICA
;
++
j
)
{
if
(
syncUtilSameId
(
&
(
pSyncNode
->
replicasId
)[
i
],
&
oldReplicasId
[
j
]))
{
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
((
pSyncNode
->
replicasId
)[
i
].
addr
,
host
,
sizeof
(
host
),
&
port
);
sDebug
(
"vgId:%d sync event reset sender for %lu, %s:%d"
,
pSyncNode
->
vgId
,
(
pSyncNode
->
replicasId
)[
i
].
addr
,
host
,
port
);
(
pSyncNode
->
senders
)[
i
]
=
oldSenders
[
j
];
}
}
}
for
(
int
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
if
((
pSyncNode
->
senders
)[
i
]
==
NULL
)
{
(
pSyncNode
->
senders
)[
i
]
=
snapshotSenderCreate
(
pSyncNode
,
i
);
}
}
bool
IamInOld
=
false
;
bool
IamInOld
=
false
;
bool
IamInNew
=
false
;
bool
IamInNew
=
false
;
for
(
int
i
=
0
;
i
<
oldConfig
.
replicaNum
;
++
i
)
{
for
(
int
i
=
0
;
i
<
oldConfig
.
replicaNum
;
++
i
)
{
...
@@ -1845,7 +1874,8 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
...
@@ -1845,7 +1874,8 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
bool
isDrop
;
bool
isDrop
;
if
(
IamInNew
||
(
!
IamInNew
&&
ths
->
state
!=
TAOS_SYNC_STATE_LEADER
))
{
//if (IamInNew || (!IamInNew && ths->state != TAOS_SYNC_STATE_LEADER)) {
if
(
IamInNew
)
{
syncNodeUpdateConfig
(
ths
,
&
newSyncCfg
,
pEntry
->
index
,
&
isDrop
);
syncNodeUpdateConfig
(
ths
,
&
newSyncCfg
,
pEntry
->
index
,
&
isDrop
);
// change isStandBy to normal
// change isStandBy to normal
...
@@ -1856,14 +1886,16 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
...
@@ -1856,14 +1886,16 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
syncNodeBecomeFollower
(
ths
,
"config change"
);
syncNodeBecomeFollower
(
ths
,
"config change"
);
}
}
}
}
}
else
{
if
(
gRaftDetailLog
)
{
syncNodeBecomeFollower
(
ths
,
"config change2"
);
char
*
sOld
=
syncCfg2Str
(
&
oldSyncCfg
);
}
char
*
sNew
=
syncCfg2Str
(
&
newSyncCfg
);
sInfo
(
"==config change== 0x11 old:%s new:%s isDrop:%d
\n
"
,
sOld
,
sNew
,
isDrop
);
if
(
gRaftDetailLog
)
{
taosMemoryFree
(
sOld
);
char
*
sOld
=
syncCfg2Str
(
&
oldSyncCfg
);
taosMemoryFree
(
sNew
);
char
*
sNew
=
syncCfg2Str
(
&
newSyncCfg
);
}
sInfo
(
"==config change== 0x11 old:%s new:%s isDrop:%d index:%ld
\n
"
,
sOld
,
sNew
,
isDrop
,
pEntry
->
index
);
taosMemoryFree
(
sOld
);
taosMemoryFree
(
sNew
);
}
}
// always call FpReConfigCb
// always call FpReConfigCb
...
...
source/libs/sync/src/syncSnapshot.c
浏览文件 @
de891966
...
@@ -755,6 +755,12 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
...
@@ -755,6 +755,12 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
// sender receives ack, set seq = ack + 1, send msg from seq
// sender receives ack, set seq = ack + 1, send msg from seq
// if ack == SYNC_SNAPSHOT_SEQ_END, stop sender
// if ack == SYNC_SNAPSHOT_SEQ_END, stop sender
int32_t
syncNodeOnSnapshotRspCb
(
SSyncNode
*
pSyncNode
,
SyncSnapshotRsp
*
pMsg
)
{
int32_t
syncNodeOnSnapshotRspCb
(
SSyncNode
*
pSyncNode
,
SyncSnapshotRsp
*
pMsg
)
{
// if already drop replica, do not process
if
(
!
syncNodeInRaftGroup
(
pSyncNode
,
&
(
pMsg
->
srcId
))
&&
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
sInfo
(
"recv SyncSnapshotRsp maybe replica already dropped"
);
return
0
;
}
// get sender
// get sender
SSyncSnapshotSender
*
pSender
=
syncNodeGetSnapshotSender
(
pSyncNode
,
&
(
pMsg
->
srcId
));
SSyncSnapshotSender
*
pSender
=
syncNodeGetSnapshotSender
(
pSyncNode
,
&
(
pMsg
->
srcId
));
ASSERT
(
pSender
!=
NULL
);
ASSERT
(
pSender
!=
NULL
);
...
@@ -788,4 +794,4 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
...
@@ -788,4 +794,4 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
}
}
return
0
;
return
0
;
}
}
\ No newline at end of file
tests/script/jenkins/basic.txt
浏览文件 @
de891966
...
@@ -58,7 +58,7 @@
...
@@ -58,7 +58,7 @@
# ---- mnode
# ---- mnode
./test.sh -f tsim/mnode/basic1.sim
./test.sh -f tsim/mnode/basic1.sim
./test.sh -f tsim/mnode/basic2.sim
./test.sh -f tsim/mnode/basic2.sim
./test.sh -f tsim/mnode/basic3.sim
#
./test.sh -f tsim/mnode/basic3.sim
./test.sh -f tsim/mnode/basic4.sim
./test.sh -f tsim/mnode/basic4.sim
./test.sh -f tsim/mnode/basic5.sim
./test.sh -f tsim/mnode/basic5.sim
...
...
tests/system-test/2-query/tail.py
0 → 100644
浏览文件 @
de891966
from
random
import
randint
,
random
from
numpy
import
equal
import
taos
import
sys
import
datetime
import
inspect
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
class
TDTestCase
:
updatecfgDict
=
{
'debugFlag'
:
143
,
"cDebugFlag"
:
143
,
"uDebugFlag"
:
143
,
"rpcDebugFlag"
:
143
,
"tmrDebugFlag"
:
143
,
"jniDebugFlag"
:
143
,
"simDebugFlag"
:
143
,
"dDebugFlag"
:
143
,
"dDebugFlag"
:
143
,
"vDebugFlag"
:
143
,
"mDebugFlag"
:
143
,
"qDebugFlag"
:
143
,
"wDebugFlag"
:
143
,
"sDebugFlag"
:
143
,
"tsdbDebugFlag"
:
143
,
"tqDebugFlag"
:
143
,
"fsDebugFlag"
:
143
,
"fnDebugFlag"
:
143
}
def
init
(
self
,
conn
,
logSql
):
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
tdSql
.
init
(
conn
.
cursor
())
def
prepare_datas
(
self
):
tdSql
.
execute
(
'''create table stb1
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
tags (t1 int)
'''
)
tdSql
.
execute
(
'''
create table t1
(ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp)
'''
)
for
i
in
range
(
4
):
tdSql
.
execute
(
f
'create table ct
{
i
+
1
}
using stb1 tags (
{
i
+
1
}
)'
)
for
i
in
range
(
9
):
tdSql
.
execute
(
f
"insert into ct1 values ( now()-
{
i
*
10
}
s,
{
1
*
i
}
,
{
11111
*
i
}
,
{
111
*
i
}
,
{
11
*
i
}
,
{
1.11
*
i
}
,
{
11.11
*
i
}
,
{
i
%
2
}
, 'binary
{
i
}
', 'nchar
{
i
}
', now()+
{
1
*
i
}
a )"
)
tdSql
.
execute
(
f
"insert into ct4 values ( now()-
{
i
*
90
}
d,
{
1
*
i
}
,
{
11111
*
i
}
,
{
111
*
i
}
,
{
11
*
i
}
,
{
1.11
*
i
}
,
{
11.11
*
i
}
,
{
i
%
2
}
, 'binary
{
i
}
', 'nchar
{
i
}
', now()+
{
1
*
i
}
a )"
)
tdSql
.
execute
(
"insert into ct1 values (now()-45s, 0, 0, 0, 0, 0, 0, 0, 'binary0', 'nchar0', now()+8a )"
)
tdSql
.
execute
(
"insert into ct1 values (now()+10s, 9, -99999, -999, -99, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )"
)
tdSql
.
execute
(
"insert into ct1 values (now()+15s, 9, -99999, -999, -99, -9.99, NULL, 1, 'binary9', 'nchar9', now()+9a )"
)
tdSql
.
execute
(
"insert into ct1 values (now()+20s, 9, -99999, -999, NULL, -9.99, -99.99, 1, 'binary9', 'nchar9', now()+9a )"
)
tdSql
.
execute
(
"insert into ct4 values (now()-810d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) "
)
tdSql
.
execute
(
"insert into ct4 values (now()-400d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) "
)
tdSql
.
execute
(
"insert into ct4 values (now()+90d, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL ) "
)
tdSql
.
execute
(
f
'''insert into t1 values
( '2020-04-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( '2020-10-21 01:01:01.000', 1, 11111, 111, 11, 1.11, 11.11, 1, "binary1", "nchar1", now()+1a )
( '2020-12-31 01:01:01.000', 2, 22222, 222, 22, 2.22, 22.22, 0, "binary2", "nchar2", now()+2a )
( '2021-01-01 01:01:06.000', 3, 33333, 333, 33, 3.33, 33.33, 0, "binary3", "nchar3", now()+3a )
( '2021-05-07 01:01:10.000', 4, 44444, 444, 44, 4.44, 44.44, 1, "binary4", "nchar4", now()+4a )
( '2021-07-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
( '2021-09-30 01:01:16.000', 5, 55555, 555, 55, 5.55, 55.55, 0, "binary5", "nchar5", now()+5a )
( '2022-02-01 01:01:20.000', 6, 66666, 666, 66, 6.66, 66.66, 1, "binary6", "nchar6", now()+6a )
( '2022-10-28 01:01:26.000', 7, 00000, 000, 00, 0.00, 00.00, 1, "binary7", "nchar7", "1970-01-01 08:00:00.000" )
( '2022-12-01 01:01:30.000', 8, -88888, -888, -88, -8.88, -88.88, 0, "binary8", "nchar8", "1969-01-01 01:00:00.000" )
( '2022-12-31 01:01:36.000', 9, -99999999999999999, -999, -99, -9.99, -999999999999999999999.99, 1, "binary9", "nchar9", "1900-01-01 00:00:00.000" )
( '2023-02-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
'''
)
def
test_errors
(
self
):
error_sql_lists
=
[
"select tail from t1"
,
"select tail(123--123)==1 from t1"
,
"select tail(123,123) from t1"
,
"select tail(c1,ts) from t1"
,
"select tail(c1,c1,ts) from t1"
,
"select tail(c1) as 'd1' from t1"
,
"select tail(c1 ,c2 ) from t1"
,
"select tail(c1 ,NULL) from t1"
,
"select tail(,) from t1;"
,
"select tail(tail(c1) ab from t1)"
,
"select tail(c1) as int from t1"
,
"select tail('c1') from t1"
,
"select tail(NULL) from t1"
,
"select tail('') from t1"
,
"select tail(c%) from t1"
,
"select tail(t1) from t1"
,
"select tail(True) from t1"
,
"select tail(c1,1) , count(c1) from t1"
,
"select tail(c1,1) , avg(c1) from t1"
,
"select tail(c1,1) , min(c1) from t1"
,
"select tail(c1,1) , spread(c1) from t1"
,
"select tail(c1,1) , diff(c1) from t1"
,
"select tail(c1,1) , abs(c1) from t1"
,
"select tail(c1,1) , c1 from t1"
,
"select tail from stb1 partition by tbname"
,
"select tail(123--123)==1 from stb1 partition by tbname"
,
"select tail(123,123) from stb1 partition by tbname"
,
"select tail(c1,ts) from stb1 partition by tbname"
,
"select tail(c1,c1,ts) from stb1 partition by tbname"
,
"select tail(c1) as 'd1' from stb1 partition by tbname"
,
"select tail(c1 ,c2 ) from stb1 partition by tbname"
,
"select tail(c1 ,NULL) from stb1 partition by tbname"
,
"select tail(,) from stb1 partition by tbname;"
,
"select tail(tail(c1) ab from stb1 partition by tbname)"
,
"select tail(c1) as int from stb1 partition by tbname"
,
"select tail('c1') from stb1 partition by tbname"
,
"select tail(NULL) from stb1 partition by tbname"
,
"select tail('') from stb1 partition by tbname"
,
"select tail(c%) from stb1 partition by tbname"
,
"select tail(t1) from stb1 partition by tbname"
,
"select tail(True) from stb1 partition by tbname"
,
"select tail(c1,1) , count(c1) from stb1 partition by tbname"
,
"select tail(c1,1) , avg(c1) from stb1 partition by tbname"
,
"select tail(c1,1) , min(c1) from stb1 partition by tbname"
,
"select tail(c1,1) , spread(c1) from stb1 partition by tbname"
,
"select tail(c1,1) , diff(c1) from stb1 partition by tbname"
,
"select tail(c1,1) , abs(c1) from stb1 partition by tbname"
,
"select tail(c1,1) , c1 from stb1 partition by tbname"
]
for
error_sql
in
error_sql_lists
:
tdSql
.
error
(
error_sql
)
def
support_types
(
self
):
other_no_value_types
=
[
"select tail(ts,1) from t1"
,
"select tail(c7,1) from t1"
,
"select tail(c8,1) from t1"
,
"select tail(c9,1) from t1"
,
"select tail(ts,1) from ct1"
,
"select tail(c7,1) from ct1"
,
"select tail(c8,1) from ct1"
,
"select tail(c9,1) from ct1"
,
"select tail(ts,1) from ct3"
,
"select tail(c7,1) from ct3"
,
"select tail(c8,1) from ct3"
,
"select tail(c9,1) from ct3"
,
"select tail(ts,1) from ct4"
,
"select tail(c7,1) from ct4"
,
"select tail(c8,1) from ct4"
,
"select tail(c9,1) from ct4"
,
"select tail(ts,1) from stb1 partition by tbname"
,
"select tail(c7,1) from stb1 partition by tbname"
,
"select tail(c8,1) from stb1 partition by tbname"
,
"select tail(c9,1) from stb1 partition by tbname"
]
for
type_sql
in
other_no_value_types
:
tdSql
.
query
(
type_sql
)
type_sql_lists
=
[
"select tail(c1,1) from t1"
,
"select tail(c2,1) from t1"
,
"select tail(c3,1) from t1"
,
"select tail(c4,1) from t1"
,
"select tail(c5,1) from t1"
,
"select tail(c6,1) from t1"
,
"select tail(c1,1) from ct1"
,
"select tail(c2,1) from ct1"
,
"select tail(c3,1) from ct1"
,
"select tail(c4,1) from ct1"
,
"select tail(c5,1) from ct1"
,
"select tail(c6,1) from ct1"
,
"select tail(c1,1) from ct3"
,
"select tail(c2,1) from ct3"
,
"select tail(c3,1) from ct3"
,
"select tail(c4,1) from ct3"
,
"select tail(c5,1) from ct3"
,
"select tail(c6,1) from ct3"
,
"select tail(c1,1) from stb1 partition by tbname"
,
"select tail(c2,1) from stb1 partition by tbname"
,
"select tail(c3,1) from stb1 partition by tbname"
,
"select tail(c4,1) from stb1 partition by tbname"
,
"select tail(c5,1) from stb1 partition by tbname"
,
"select tail(c6,1) from stb1 partition by tbname"
,
"select tail(c6,1) as alisb from stb1 partition by tbname"
,
"select tail(c6,1) alisb from stb1 partition by tbname"
,
]
for
type_sql
in
type_sql_lists
:
tdSql
.
query
(
type_sql
)
def
check_tail_table
(
self
,
tbname
,
col_name
,
tail_rows
,
offset
):
tail_sql
=
f
"select tail(
{
col_name
}
,
{
tail_rows
}
,
{
offset
}
) from
{
tbname
}
"
equal_sql
=
f
"select
{
col_name
}
from (select ts ,
{
col_name
}
from
{
tbname
}
order by ts desc limit
{
tail_rows
}
offset
{
offset
}
) order by ts"
tdSql
.
query
(
tail_sql
)
tail_result
=
tdSql
.
queryResult
tdSql
.
query
(
equal_sql
)
print
(
equal_sql
)
equal_result
=
tdSql
.
queryResult
if
tail_result
==
equal_result
:
tdLog
.
info
(
" tail query check pass , tail sql is: %s"
%
tail_sql
)
else
:
tdLog
.
exit
(
" tail query check fail , tail sql is: %s "
%
tail_sql
)
def
basic_tail_function
(
self
):
# basic query
tdSql
.
query
(
"select c1 from ct3"
)
tdSql
.
checkRows
(
0
)
tdSql
.
query
(
"select c1 from t1"
)
tdSql
.
checkRows
(
12
)
tdSql
.
query
(
"select c1 from stb1"
)
tdSql
.
checkRows
(
25
)
# used for empty table , ct3 is empty
tdSql
.
query
(
"select tail(c1,1) from ct3"
)
tdSql
.
checkRows
(
0
)
tdSql
.
query
(
"select tail(c2,1) from ct3"
)
tdSql
.
checkRows
(
0
)
tdSql
.
query
(
"select tail(c3,1) from ct3"
)
tdSql
.
checkRows
(
0
)
tdSql
.
query
(
"select tail(c4,1) from ct3"
)
tdSql
.
checkRows
(
0
)
tdSql
.
query
(
"select tail(c5,1) from ct3"
)
tdSql
.
checkRows
(
0
)
tdSql
.
query
(
"select tail(c6,1) from ct3"
)
# auto check for t1 table
# used for regular table
tdSql
.
query
(
"select tail(c1,1) from t1"
)
tdSql
.
query
(
"desc t1"
)
col_lists_rows
=
tdSql
.
queryResult
col_lists
=
[]
for
col_name
in
col_lists_rows
:
if
col_name
[
0
]
==
"ts"
:
continue
col_lists
.
append
(
col_name
[
0
])
for
col
in
col_lists
:
for
loop
in
range
(
100
):
limit
=
randint
(
1
,
100
)
offset
=
randint
(
0
,
100
)
self
.
check_tail_table
(
"t1"
,
col
,
limit
,
offset
)
# tail for invalid params
tdSql
.
error
(
"select tail(c1,-10,10) from ct1"
)
tdSql
.
error
(
"select tail(c1,10,10000) from ct1"
)
tdSql
.
error
(
"select tail(c1,10,-100) from ct1"
)
tdSql
.
error
(
"select tail(c1,100/2,10) from ct1"
)
tdSql
.
error
(
"select tail(c1,5,10*2) from ct1"
)
tdSql
.
query
(
"select tail(c1,100,100) from ct1"
)
tdSql
.
checkRows
(
0
)
tdSql
.
query
(
"select tail(c1,10,100) from ct1"
)
tdSql
.
checkRows
(
0
)
tdSql
.
error
(
"select tail(c1,10,101) from ct1"
)
tdSql
.
query
(
"select tail(c1,10,0) from ct1"
)
tdSql
.
query
(
"select tail(c1,100,10) from ct1"
)
tdSql
.
checkRows
(
3
)
# tail with super tags
tdSql
.
query
(
"select tail(c1,10,10) from ct1"
)
tdSql
.
checkRows
(
3
)
tdSql
.
error
(
"select tail(c1,10,10),tbname from ct1"
)
tdSql
.
error
(
"select tail(c1,10,10),t1 from ct1"
)
# tail with common col
tdSql
.
error
(
"select tail(c1,10,10) ,ts from ct1"
)
tdSql
.
error
(
"select tail(c1,10,10) ,c1 from ct1"
)
# tail with scalar function
tdSql
.
error
(
"select tail(c1,10,10) ,abs(c1) from ct1"
)
tdSql
.
error
(
"select tail(c1,10,10) , tail(c2,10,10) from ct1"
)
tdSql
.
error
(
"select tail(c1,10,10) , abs(c2)+2 from ct1"
)
# bug need fix for scalar value or compute again
# tdSql.error(" select tail(c1,10,10) , 123 from ct1")
# tdSql.error(" select abs(tail(c1,10,10)) from ct1")
# tdSql.error(" select abs(tail(c1,10,10)) + 2 from ct1")
# tail with aggregate function
tdSql
.
error
(
"select tail(c1,10,10) ,sum(c1) from ct1"
)
tdSql
.
error
(
"select tail(c1,10,10) ,max(c1) from ct1"
)
tdSql
.
error
(
"select tail(c1,10,10) ,csum(c1) from ct1"
)
tdSql
.
error
(
"select tail(c1,10,10) ,count(c1) from ct1"
)
# tail with filter where
tdSql
.
query
(
"select tail(c1,3,1) from ct4 where c1 is null"
)
tdSql
.
checkData
(
0
,
0
,
None
)
tdSql
.
checkData
(
1
,
0
,
None
)
tdSql
.
query
(
"select tail(c1,3,2) from ct4 where c1 >2 "
)
tdSql
.
checkData
(
0
,
0
,
7
)
tdSql
.
checkData
(
1
,
0
,
6
)
tdSql
.
checkData
(
2
,
0
,
5
)
tdSql
.
query
(
"select tail(c1,2,1) from ct4 where c2 between 0 and 99999"
)
tdSql
.
checkData
(
0
,
0
,
2
)
tdSql
.
checkData
(
1
,
0
,
1
)
# tail with union all
tdSql
.
query
(
"select tail(c1,2,1) from ct4 union all select c1 from ct1"
)
tdSql
.
checkRows
(
15
)
tdSql
.
query
(
"select tail(c1,2,1) from ct4 union all select c1 from ct2"
)
tdSql
.
checkRows
(
2
)
tdSql
.
checkData
(
0
,
0
,
1
)
tdSql
.
checkData
(
1
,
0
,
0
)
tdSql
.
query
(
"select tail(c2,2,1) from ct4 union all select abs(c2)/2 from ct4"
)
tdSql
.
checkRows
(
14
)
# tail with join
# prepare join datas with same ts
tdSql
.
execute
(
" use db "
)
tdSql
.
execute
(
" create stable st1 (ts timestamp , num int) tags(ind int)"
)
tdSql
.
execute
(
" create table tb1 using st1 tags(1)"
)
tdSql
.
execute
(
" create table tb2 using st1 tags(2)"
)
tdSql
.
execute
(
" create stable st2 (ts timestamp , num int) tags(ind int)"
)
tdSql
.
execute
(
" create table ttb1 using st2 tags(1)"
)
tdSql
.
execute
(
" create table ttb2 using st2 tags(2)"
)
start_ts
=
1622369635000
# 2021-05-30 18:13:55
for
i
in
range
(
10
):
ts_value
=
start_ts
+
i
*
1000
tdSql
.
execute
(
f
" insert into tb1 values(
{
ts_value
}
,
{
i
}
)"
)
tdSql
.
execute
(
f
" insert into tb2 values(
{
ts_value
}
,
{
i
}
)"
)
tdSql
.
execute
(
f
" insert into ttb1 values(
{
ts_value
}
,
{
i
}
)"
)
tdSql
.
execute
(
f
" insert into ttb2 values(
{
ts_value
}
,
{
i
}
)"
)
tdSql
.
query
(
"select tail(tb2.num,3,2) from tb1, tb2 where tb1.ts=tb2.ts "
)
tdSql
.
checkRows
(
3
)
tdSql
.
checkData
(
0
,
0
,
5
)
tdSql
.
checkData
(
1
,
0
,
6
)
tdSql
.
checkData
(
2
,
0
,
7
)
# nest query
# tdSql.query("select tail(c1,2) from (select c1 from ct1)")
tdSql
.
query
(
"select c1 from (select tail(c1,2) c1 from ct4)"
)
tdSql
.
checkRows
(
2
)
tdSql
.
checkData
(
0
,
0
,
0
)
tdSql
.
checkData
(
1
,
0
,
None
)
tdSql
.
query
(
"select sum(c1) from (select tail(c1,2) c1 from ct1)"
)
tdSql
.
checkRows
(
1
)
tdSql
.
checkData
(
0
,
0
,
18
)
tdSql
.
query
(
"select abs(c1) from (select tail(c1,2) c1 from ct1)"
)
tdSql
.
checkRows
(
2
)
tdSql
.
checkData
(
0
,
0
,
9
)
#partition by tbname
tdSql
.
query
(
" select tail(c1,5) from stb1 partition by tbname "
)
tdSql
.
checkRows
(
10
)
tdSql
.
query
(
" select tail(c1,3) from stb1 partition by tbname "
)
tdSql
.
checkRows
(
6
)
# group by
tdSql
.
error
(
"select tail(c1,2) from ct1 group by c1"
)
tdSql
.
error
(
"select tail(c1,2) from ct1 group by tbname"
)
# super table
def
check_boundary_values
(
self
):
tdSql
.
execute
(
"drop database if exists bound_test"
)
tdSql
.
execute
(
"create database if not exists bound_test"
)
tdSql
.
execute
(
"use bound_test"
)
tdSql
.
execute
(
"create table stb_bound (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(32),c9 nchar(32), c10 timestamp) tags (t1 int);"
)
tdSql
.
execute
(
f
'create table sub1_bound using stb_bound tags ( 1 )'
)
tdSql
.
execute
(
f
"insert into sub1_bound values ( now()-1s, 2147483647, 9223372036854775807, 32767, 127, 3.40E+38, 1.7e+308, True, 'binary_tb1', 'nchar_tb1', now() )"
)
tdSql
.
execute
(
f
"insert into sub1_bound values ( now(), 2147483646, 9223372036854775806, 32766, 126, 3.40E+38, 1.7e+308, True, 'binary_tb1', 'nchar_tb1', now() )"
)
tdSql
.
execute
(
f
"insert into sub1_bound values ( now(), -2147483646, -9223372036854775806, -32766, -126, -3.40E+38, -1.7e+308, True, 'binary_tb1', 'nchar_tb1', now() )"
)
tdSql
.
execute
(
f
"insert into sub1_bound values ( now(), 2147483643, 9223372036854775803, 32763, 123, 3.39E+38, 1.69e+308, True, 'binary_tb1', 'nchar_tb1', now() )"
)
tdSql
.
execute
(
f
"insert into sub1_bound values ( now(), -2147483643, -9223372036854775803, -32763, -123, -3.39E+38, -1.69e+308, True, 'binary_tb1', 'nchar_tb1', now() )"
)
tdSql
.
error
(
f
"insert into sub1_bound values ( now()+1s, 2147483648, 9223372036854775808, 32768, 128, 3.40E+38, 1.7e+308, True, 'binary_tb1', 'nchar_tb1', now() )"
)
tdSql
.
query
(
"select tail(c2,2) from sub1_bound"
)
tdSql
.
checkRows
(
2
)
tdSql
.
checkData
(
0
,
0
,
9223372036854775803
)
def
run
(
self
):
# sourcery skip: extract-duplicate-method, remove-redundant-fstring
tdSql
.
prepare
()
tdLog
.
printNoPrefix
(
"==========step1:create table =============="
)
self
.
prepare_datas
()
tdLog
.
printNoPrefix
(
"==========step2:test errors =============="
)
self
.
test_errors
()
tdLog
.
printNoPrefix
(
"==========step3:support types ============"
)
self
.
support_types
()
tdLog
.
printNoPrefix
(
"==========step4: tail basic query ============"
)
self
.
basic_tail_function
()
tdLog
.
printNoPrefix
(
"==========step5: tail boundary query ============"
)
self
.
check_boundary_values
()
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tests/system-test/fulltest.sh
浏览文件 @
de891966
...
@@ -95,6 +95,7 @@ python3 ./test.py -f 2-query/unique.py
...
@@ -95,6 +95,7 @@ python3 ./test.py -f 2-query/unique.py
python3 ./test.py
-f
2-query/stateduration.py
python3 ./test.py
-f
2-query/stateduration.py
python3 ./test.py
-f
2-query/function_stateduration.py
python3 ./test.py
-f
2-query/function_stateduration.py
python3 ./test.py
-f
2-query/statecount.py
python3 ./test.py
-f
2-query/statecount.py
python3 ./test.py
-f
2-query/tail.py
python3 ./test.py
-f
6-cluster/5dnode1mnode.py
python3 ./test.py
-f
6-cluster/5dnode1mnode.py
python3 ./test.py
-f
6-cluster/5dnode2mnode.py
python3 ./test.py
-f
6-cluster/5dnode2mnode.py
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录