Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
4bc38236
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
4bc38236
编写于
12月 11, 2020
作者:
P
Ping Xiao
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'develop' into xiaoping/add_test_case
上级
f0506a42
77cf9cec
变更
12
隐藏空白更改
内联
并排
Showing
12 changed file
with
68 addition
and
44 deletion
+68
-44
src/mnode/inc/mnodeMnode.h
src/mnode/inc/mnodeMnode.h
+2
-2
src/mnode/src/mnodeMnode.c
src/mnode/src/mnodeMnode.c
+4
-4
src/mnode/src/mnodePeer.c
src/mnode/src/mnodePeer.c
+1
-1
src/mnode/src/mnodeProfile.c
src/mnode/src/mnodeProfile.c
+4
-4
src/mnode/src/mnodeRead.c
src/mnode/src/mnodeRead.c
+1
-1
src/mnode/src/mnodeShow.c
src/mnode/src/mnodeShow.c
+2
-2
src/mnode/src/mnodeWrite.c
src/mnode/src/mnodeWrite.c
+1
-1
src/sync/inc/syncInt.h
src/sync/inc/syncInt.h
+5
-1
src/sync/src/syncMain.c
src/sync/src/syncMain.c
+8
-7
src/sync/src/syncRestore.c
src/sync/src/syncRestore.c
+8
-7
src/sync/src/syncRetrieve.c
src/sync/src/syncRetrieve.c
+22
-14
tests/pytest/stream/stream2.py
tests/pytest/stream/stream2.py
+10
-0
未找到文件。
src/mnode/inc/mnodeMnode.h
浏览文件 @
4bc38236
...
@@ -43,8 +43,8 @@ void mnodeIncMnodeRef(struct SMnodeObj *pMnode);
...
@@ -43,8 +43,8 @@ void mnodeIncMnodeRef(struct SMnodeObj *pMnode);
void
mnodeDecMnodeRef
(
struct
SMnodeObj
*
pMnode
);
void
mnodeDecMnodeRef
(
struct
SMnodeObj
*
pMnode
);
char
*
mnodeGetMnodeRoleStr
();
char
*
mnodeGetMnodeRoleStr
();
void
mnodeGetMnodeEpSetForPeer
(
SRpcEpSet
*
epSet
);
void
mnodeGetMnodeEpSetForPeer
(
SRpcEpSet
*
epSet
,
bool
redirect
);
void
mnodeGetMnodeEpSetForShell
(
SRpcEpSet
*
epSet
);
void
mnodeGetMnodeEpSetForShell
(
SRpcEpSet
*
epSet
,
bool
redirect
);
char
*
mnodeGetMnodeMasterEp
();
char
*
mnodeGetMnodeMasterEp
();
void
mnodeGetMnodeInfos
(
void
*
mnodes
);
void
mnodeGetMnodeInfos
(
void
*
mnodes
);
...
...
src/mnode/src/mnodeMnode.c
浏览文件 @
4bc38236
...
@@ -273,14 +273,14 @@ void mnodeUpdateMnodeEpSet(SMInfos *pMinfos) {
...
@@ -273,14 +273,14 @@ void mnodeUpdateMnodeEpSet(SMInfos *pMinfos) {
mnodeMnodeUnLock
();
mnodeMnodeUnLock
();
}
}
void
mnodeGetMnodeEpSetForPeer
(
SRpcEpSet
*
epSet
)
{
void
mnodeGetMnodeEpSetForPeer
(
SRpcEpSet
*
epSet
,
bool
redirect
)
{
mnodeMnodeRdLock
();
mnodeMnodeRdLock
();
*
epSet
=
tsMEpForPeer
;
*
epSet
=
tsMEpForPeer
;
mnodeMnodeUnLock
();
mnodeMnodeUnLock
();
mTrace
(
"vgId:1, mnodes epSet for peer is returned, num:%d inUse:%d"
,
tsMEpForPeer
.
numOfEps
,
tsMEpForPeer
.
inUse
);
mTrace
(
"vgId:1, mnodes epSet for peer is returned, num:%d inUse:%d"
,
tsMEpForPeer
.
numOfEps
,
tsMEpForPeer
.
inUse
);
for
(
int32_t
i
=
0
;
i
<
epSet
->
numOfEps
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
epSet
->
numOfEps
;
++
i
)
{
if
(
strcmp
(
epSet
->
fqdn
[
i
],
tsLocalFqdn
)
==
0
&&
htons
(
epSet
->
port
[
i
])
==
tsServerPort
+
TSDB_PORT_DNODEDNODE
)
{
if
(
redirect
&&
strcmp
(
epSet
->
fqdn
[
i
],
tsLocalFqdn
)
==
0
&&
htons
(
epSet
->
port
[
i
])
==
tsServerPort
+
TSDB_PORT_DNODEDNODE
)
{
epSet
->
inUse
=
(
i
+
1
)
%
epSet
->
numOfEps
;
epSet
->
inUse
=
(
i
+
1
)
%
epSet
->
numOfEps
;
mTrace
(
"vgId:1, mnode:%d, for peer ep:%s:%u, set inUse to %d"
,
i
,
epSet
->
fqdn
[
i
],
htons
(
epSet
->
port
[
i
]),
epSet
->
inUse
);
mTrace
(
"vgId:1, mnode:%d, for peer ep:%s:%u, set inUse to %d"
,
i
,
epSet
->
fqdn
[
i
],
htons
(
epSet
->
port
[
i
]),
epSet
->
inUse
);
}
else
{
}
else
{
...
@@ -289,14 +289,14 @@ void mnodeGetMnodeEpSetForPeer(SRpcEpSet *epSet) {
...
@@ -289,14 +289,14 @@ void mnodeGetMnodeEpSetForPeer(SRpcEpSet *epSet) {
}
}
}
}
void
mnodeGetMnodeEpSetForShell
(
SRpcEpSet
*
epSet
)
{
void
mnodeGetMnodeEpSetForShell
(
SRpcEpSet
*
epSet
,
bool
redirect
)
{
mnodeMnodeRdLock
();
mnodeMnodeRdLock
();
*
epSet
=
tsMEpForShell
;
*
epSet
=
tsMEpForShell
;
mnodeMnodeUnLock
();
mnodeMnodeUnLock
();
mTrace
(
"vgId:1, mnodes epSet for shell is returned, num:%d inUse:%d"
,
tsMEpForShell
.
numOfEps
,
tsMEpForShell
.
inUse
);
mTrace
(
"vgId:1, mnodes epSet for shell is returned, num:%d inUse:%d"
,
tsMEpForShell
.
numOfEps
,
tsMEpForShell
.
inUse
);
for
(
int32_t
i
=
0
;
i
<
epSet
->
numOfEps
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
epSet
->
numOfEps
;
++
i
)
{
if
(
strcmp
(
epSet
->
fqdn
[
i
],
tsLocalFqdn
)
==
0
&&
htons
(
epSet
->
port
[
i
])
==
tsServerPort
)
{
if
(
redirect
&&
strcmp
(
epSet
->
fqdn
[
i
],
tsLocalFqdn
)
==
0
&&
htons
(
epSet
->
port
[
i
])
==
tsServerPort
)
{
epSet
->
inUse
=
(
i
+
1
)
%
epSet
->
numOfEps
;
epSet
->
inUse
=
(
i
+
1
)
%
epSet
->
numOfEps
;
mTrace
(
"vgId:1, mnode:%d, for shell ep:%s:%u, set inUse to %d"
,
i
,
epSet
->
fqdn
[
i
],
htons
(
epSet
->
port
[
i
]),
epSet
->
inUse
);
mTrace
(
"vgId:1, mnode:%d, for shell ep:%s:%u, set inUse to %d"
,
i
,
epSet
->
fqdn
[
i
],
htons
(
epSet
->
port
[
i
]),
epSet
->
inUse
);
}
else
{
}
else
{
...
...
src/mnode/src/mnodePeer.c
浏览文件 @
4bc38236
...
@@ -54,7 +54,7 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) {
...
@@ -54,7 +54,7 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) {
if
(
!
sdbIsMaster
())
{
if
(
!
sdbIsMaster
())
{
SMnodeRsp
*
rpcRsp
=
&
pMsg
->
rpcRsp
;
SMnodeRsp
*
rpcRsp
=
&
pMsg
->
rpcRsp
;
SRpcEpSet
*
epSet
=
rpcMallocCont
(
sizeof
(
SRpcEpSet
));
SRpcEpSet
*
epSet
=
rpcMallocCont
(
sizeof
(
SRpcEpSet
));
mnodeGetMnodeEpSetForPeer
(
epSet
);
mnodeGetMnodeEpSetForPeer
(
epSet
,
true
);
rpcRsp
->
rsp
=
epSet
;
rpcRsp
->
rsp
=
epSet
;
rpcRsp
->
len
=
sizeof
(
SRpcEpSet
);
rpcRsp
->
len
=
sizeof
(
SRpcEpSet
);
...
...
src/mnode/src/mnodeProfile.c
浏览文件 @
4bc38236
...
@@ -282,10 +282,11 @@ static int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, voi
...
@@ -282,10 +282,11 @@ static int32_t mnodeRetrieveConns(SShowObj *pShow, char *data, int32_t rows, voi
// not thread safe, need optimized
// not thread safe, need optimized
int32_t
mnodeSaveQueryStreamList
(
SConnObj
*
pConn
,
SHeartBeatMsg
*
pHBMsg
)
{
int32_t
mnodeSaveQueryStreamList
(
SConnObj
*
pConn
,
SHeartBeatMsg
*
pHBMsg
)
{
pConn
->
numOfQueries
=
0
;
pConn
->
numOfQueries
=
0
;
pConn
->
numOfStreams
=
0
;
pConn
->
numOfStreams
=
0
;
int32_t
numOfQueries
=
htonl
(
pHBMsg
->
numOfQueries
);
int32_t
numOfQueries
=
htonl
(
pHBMsg
->
numOfQueries
);
int32_t
numOfStreams
=
htonl
(
pHBMsg
->
numOfStreams
);
if
(
numOfQueries
>
0
)
{
if
(
numOfQueries
>
0
)
{
if
(
pConn
->
pQueries
==
NULL
)
{
if
(
pConn
->
pQueries
==
NULL
)
{
pConn
->
pQueries
=
calloc
(
sizeof
(
SQueryDesc
),
QUERY_STREAM_SAVE_SIZE
);
pConn
->
pQueries
=
calloc
(
sizeof
(
SQueryDesc
),
QUERY_STREAM_SAVE_SIZE
);
...
@@ -299,7 +300,6 @@ int32_t mnodeSaveQueryStreamList(SConnObj *pConn, SHeartBeatMsg *pHBMsg) {
...
@@ -299,7 +300,6 @@ int32_t mnodeSaveQueryStreamList(SConnObj *pConn, SHeartBeatMsg *pHBMsg) {
}
}
}
}
int32_t
numOfStreams
=
htonl
(
pHBMsg
->
numOfStreams
);
if
(
numOfStreams
>
0
)
{
if
(
numOfStreams
>
0
)
{
if
(
pConn
->
pStreams
==
NULL
)
{
if
(
pConn
->
pStreams
==
NULL
)
{
pConn
->
pStreams
=
calloc
(
sizeof
(
SStreamDesc
),
QUERY_STREAM_SAVE_SIZE
);
pConn
->
pStreams
=
calloc
(
sizeof
(
SStreamDesc
),
QUERY_STREAM_SAVE_SIZE
);
...
@@ -309,7 +309,7 @@ int32_t mnodeSaveQueryStreamList(SConnObj *pConn, SHeartBeatMsg *pHBMsg) {
...
@@ -309,7 +309,7 @@ int32_t mnodeSaveQueryStreamList(SConnObj *pConn, SHeartBeatMsg *pHBMsg) {
int32_t
saveSize
=
pConn
->
numOfStreams
*
sizeof
(
SStreamDesc
);
int32_t
saveSize
=
pConn
->
numOfStreams
*
sizeof
(
SStreamDesc
);
if
(
saveSize
>
0
&&
pConn
->
pStreams
!=
NULL
)
{
if
(
saveSize
>
0
&&
pConn
->
pStreams
!=
NULL
)
{
memcpy
(
pConn
->
pStreams
,
pHBMsg
->
pData
+
pConn
->
numOfQueries
*
sizeof
(
SQueryDesc
),
saveSize
);
memcpy
(
pConn
->
pStreams
,
pHBMsg
->
pData
+
numOfQueries
*
sizeof
(
SQueryDesc
),
saveSize
);
}
}
}
}
...
...
src/mnode/src/mnodeRead.c
浏览文件 @
4bc38236
...
@@ -50,7 +50,7 @@ int32_t mnodeProcessRead(SMnodeMsg *pMsg) {
...
@@ -50,7 +50,7 @@ int32_t mnodeProcessRead(SMnodeMsg *pMsg) {
if
(
!
sdbIsMaster
())
{
if
(
!
sdbIsMaster
())
{
SMnodeRsp
*
rpcRsp
=
&
pMsg
->
rpcRsp
;
SMnodeRsp
*
rpcRsp
=
&
pMsg
->
rpcRsp
;
SRpcEpSet
*
epSet
=
rpcMallocCont
(
sizeof
(
SRpcEpSet
));
SRpcEpSet
*
epSet
=
rpcMallocCont
(
sizeof
(
SRpcEpSet
));
mnodeGetMnodeEpSetForShell
(
epSet
);
mnodeGetMnodeEpSetForShell
(
epSet
,
true
);
rpcRsp
->
rsp
=
epSet
;
rpcRsp
->
rsp
=
epSet
;
rpcRsp
->
len
=
sizeof
(
SRpcEpSet
);
rpcRsp
->
len
=
sizeof
(
SRpcEpSet
);
...
...
src/mnode/src/mnodeShow.c
浏览文件 @
4bc38236
...
@@ -282,7 +282,7 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) {
...
@@ -282,7 +282,7 @@ static int32_t mnodeProcessHeartBeatMsg(SMnodeMsg *pMsg) {
pRsp
->
onlineDnodes
=
htonl
(
mnodeGetOnlineDnodesNum
());
pRsp
->
onlineDnodes
=
htonl
(
mnodeGetOnlineDnodesNum
());
pRsp
->
totalDnodes
=
htonl
(
mnodeGetDnodesNum
());
pRsp
->
totalDnodes
=
htonl
(
mnodeGetDnodesNum
());
mnodeGetMnodeEpSetForShell
(
&
pRsp
->
epSet
);
mnodeGetMnodeEpSetForShell
(
&
pRsp
->
epSet
,
false
);
pMsg
->
rpcRsp
.
rsp
=
pRsp
;
pMsg
->
rpcRsp
.
rsp
=
pRsp
;
pMsg
->
rpcRsp
.
len
=
sizeof
(
SHeartBeatRsp
);
pMsg
->
rpcRsp
.
len
=
sizeof
(
SHeartBeatRsp
);
...
@@ -349,7 +349,7 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) {
...
@@ -349,7 +349,7 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) {
pConnectRsp
->
writeAuth
=
pUser
->
writeAuth
;
pConnectRsp
->
writeAuth
=
pUser
->
writeAuth
;
pConnectRsp
->
superAuth
=
pUser
->
superAuth
;
pConnectRsp
->
superAuth
=
pUser
->
superAuth
;
mnodeGetMnodeEpSetForShell
(
&
pConnectRsp
->
epSet
);
mnodeGetMnodeEpSetForShell
(
&
pConnectRsp
->
epSet
,
false
);
connect_over:
connect_over:
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
src/mnode/src/mnodeWrite.c
浏览文件 @
4bc38236
...
@@ -50,7 +50,7 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) {
...
@@ -50,7 +50,7 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) {
if
(
!
sdbIsMaster
())
{
if
(
!
sdbIsMaster
())
{
SMnodeRsp
*
rpcRsp
=
&
pMsg
->
rpcRsp
;
SMnodeRsp
*
rpcRsp
=
&
pMsg
->
rpcRsp
;
SRpcEpSet
*
epSet
=
rpcMallocCont
(
sizeof
(
SRpcEpSet
));
SRpcEpSet
*
epSet
=
rpcMallocCont
(
sizeof
(
SRpcEpSet
));
mnodeGetMnodeEpSetForShell
(
epSet
);
mnodeGetMnodeEpSetForShell
(
epSet
,
true
);
rpcRsp
->
rsp
=
epSet
;
rpcRsp
->
rsp
=
epSet
;
rpcRsp
->
len
=
sizeof
(
SRpcEpSet
);
rpcRsp
->
len
=
sizeof
(
SRpcEpSet
);
...
...
src/sync/inc/syncInt.h
浏览文件 @
4bc38236
...
@@ -62,12 +62,15 @@ typedef struct {
...
@@ -62,12 +62,15 @@ typedef struct {
typedef
struct
{
typedef
struct
{
SSyncHead
syncHead
;
SSyncHead
syncHead
;
uint16_t
port
;
uint16_t
port
;
uint16_t
tranId
;
char
fqdn
[
TSDB_FQDN_LEN
];
char
fqdn
[
TSDB_FQDN_LEN
];
int32_t
sourceId
;
// only for arbitrator
int32_t
sourceId
;
// only for arbitrator
}
SFirstPkt
;
}
SFirstPkt
;
typedef
struct
{
typedef
struct
{
int8_t
sync
;
int8_t
sync
;
int8_t
reserved
;
uint16_t
tranId
;
}
SFirstPktRsp
;
}
SFirstPktRsp
;
typedef
struct
{
typedef
struct
{
...
@@ -187,6 +190,7 @@ void syncRestartConnection(SSyncPeer *pPeer);
...
@@ -187,6 +190,7 @@ void syncRestartConnection(SSyncPeer *pPeer);
void
syncBroadcastStatus
(
SSyncNode
*
pNode
);
void
syncBroadcastStatus
(
SSyncNode
*
pNode
);
void
syncAddPeerRef
(
SSyncPeer
*
pPeer
);
void
syncAddPeerRef
(
SSyncPeer
*
pPeer
);
int32_t
syncDecPeerRef
(
SSyncPeer
*
pPeer
);
int32_t
syncDecPeerRef
(
SSyncPeer
*
pPeer
);
uint16_t
syncGenTranId
();
#ifdef __cplusplus
#ifdef __cplusplus
}
}
...
...
src/sync/src/syncMain.c
浏览文件 @
4bc38236
...
@@ -396,9 +396,7 @@ void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) {
...
@@ -396,9 +396,7 @@ void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) {
pFwdRsp
->
code
=
code
;
pFwdRsp
->
code
=
code
;
int32_t
msgLen
=
sizeof
(
SSyncHead
)
+
sizeof
(
SFwdRsp
);
int32_t
msgLen
=
sizeof
(
SSyncHead
)
+
sizeof
(
SFwdRsp
);
int32_t
retLen
=
taosWriteMsg
(
pPeer
->
peerFd
,
msg
,
msgLen
);
if
(
taosWriteMsg
(
pPeer
->
peerFd
,
msg
,
msgLen
)
==
msgLen
)
{
if
(
retLen
==
msgLen
)
{
sTrace
(
"%s, forward-rsp is sent, code:%x hver:%"
PRIu64
,
pPeer
->
id
,
code
,
version
);
sTrace
(
"%s, forward-rsp is sent, code:%x hver:%"
PRIu64
,
pPeer
->
id
,
code
,
version
);
}
else
{
}
else
{
sDebug
(
"%s, failed to send forward ack, restart"
,
pPeer
->
id
);
sDebug
(
"%s, failed to send forward ack, restart"
,
pPeer
->
id
);
...
@@ -873,6 +871,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) {
...
@@ -873,6 +871,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) {
firstPkt
.
syncHead
.
type
=
TAOS_SMSG_SYNC_REQ
;
firstPkt
.
syncHead
.
type
=
TAOS_SMSG_SYNC_REQ
;
firstPkt
.
syncHead
.
vgId
=
pNode
->
vgId
;
firstPkt
.
syncHead
.
vgId
=
pNode
->
vgId
;
firstPkt
.
syncHead
.
len
=
sizeof
(
firstPkt
)
-
sizeof
(
SSyncHead
);
firstPkt
.
syncHead
.
len
=
sizeof
(
firstPkt
)
-
sizeof
(
SSyncHead
);
firstPkt
.
tranId
=
syncGenTranId
();
tstrncpy
(
firstPkt
.
fqdn
,
tsNodeFqdn
,
sizeof
(
firstPkt
.
fqdn
));
tstrncpy
(
firstPkt
.
fqdn
,
tsNodeFqdn
,
sizeof
(
firstPkt
.
fqdn
));
firstPkt
.
port
=
tsSyncPort
;
firstPkt
.
port
=
tsSyncPort
;
taosTmrReset
(
syncNotStarted
,
tsSyncTimer
*
1000
,
pPeer
,
tsSyncTmrCtrl
,
&
pPeer
->
timer
);
taosTmrReset
(
syncNotStarted
,
tsSyncTimer
*
1000
,
pPeer
,
tsSyncTmrCtrl
,
&
pPeer
->
timer
);
...
@@ -881,7 +880,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) {
...
@@ -881,7 +880,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) {
sError
(
"%s, failed to send sync-req to peer"
,
pPeer
->
id
);
sError
(
"%s, failed to send sync-req to peer"
,
pPeer
->
id
);
}
else
{
}
else
{
nodeSStatus
=
TAOS_SYNC_STATUS_START
;
nodeSStatus
=
TAOS_SYNC_STATUS_START
;
sInfo
(
"%s, sync-req is sent to peer,
set sstatus:%s"
,
pPeer
->
i
d
,
syncStatus
[
nodeSStatus
]);
sInfo
(
"%s, sync-req is sent to peer,
tranId:%u, set sstatus:%s"
,
pPeer
->
id
,
firstPkt
.
tranI
d
,
syncStatus
[
nodeSStatus
]);
}
}
}
}
...
@@ -1018,8 +1017,7 @@ static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type
...
@@ -1018,8 +1017,7 @@ static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type
pPeersStatus
->
peersStatus
[
i
].
version
=
pNode
->
peerInfo
[
i
]
->
version
;
pPeersStatus
->
peersStatus
[
i
].
version
=
pNode
->
peerInfo
[
i
]
->
version
;
}
}
int32_t
retLen
=
taosWriteMsg
(
pPeer
->
peerFd
,
msg
,
statusMsgLen
);
if
(
taosWriteMsg
(
pPeer
->
peerFd
,
msg
,
statusMsgLen
)
==
statusMsgLen
)
{
if
(
retLen
==
statusMsgLen
)
{
sDebug
(
"%s, status is sent, self:%s:%s:%"
PRIu64
", peer:%s:%s:%"
PRIu64
", ack:%d tranId:%u type:%s pfd:%d"
,
sDebug
(
"%s, status is sent, self:%s:%s:%"
PRIu64
", peer:%s:%s:%"
PRIu64
", ack:%d tranId:%u type:%s pfd:%d"
,
pPeer
->
id
,
syncRole
[
nodeRole
],
syncStatus
[
nodeSStatus
],
nodeVersion
,
syncRole
[
pPeer
->
role
],
pPeer
->
id
,
syncRole
[
nodeRole
],
syncStatus
[
nodeSStatus
],
nodeVersion
,
syncRole
[
pPeer
->
role
],
syncStatus
[
pPeer
->
sstatus
],
pPeer
->
version
,
pPeersStatus
->
ack
,
pPeersStatus
->
tranId
,
syncStatus
[
pPeer
->
sstatus
],
pPeer
->
version
,
pPeersStatus
->
ack
,
pPeersStatus
->
tranId
,
...
@@ -1053,10 +1051,11 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
...
@@ -1053,10 +1051,11 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
firstPkt
.
syncHead
.
type
=
TAOS_SMSG_STATUS
;
firstPkt
.
syncHead
.
type
=
TAOS_SMSG_STATUS
;
tstrncpy
(
firstPkt
.
fqdn
,
tsNodeFqdn
,
sizeof
(
firstPkt
.
fqdn
));
tstrncpy
(
firstPkt
.
fqdn
,
tsNodeFqdn
,
sizeof
(
firstPkt
.
fqdn
));
firstPkt
.
port
=
tsSyncPort
;
firstPkt
.
port
=
tsSyncPort
;
firstPkt
.
tranId
=
syncGenTranId
();
firstPkt
.
sourceId
=
pNode
->
vgId
;
// tell arbitrator its vgId
firstPkt
.
sourceId
=
pNode
->
vgId
;
// tell arbitrator its vgId
if
(
taosWriteMsg
(
connFd
,
&
firstPkt
,
sizeof
(
firstPkt
))
==
sizeof
(
firstPkt
))
{
if
(
taosWriteMsg
(
connFd
,
&
firstPkt
,
sizeof
(
firstPkt
))
==
sizeof
(
firstPkt
))
{
sDebug
(
"%s, connection to peer server is setup, pfd:%d sfd:%d
"
,
pPeer
->
id
,
connFd
,
pPeer
->
syncF
d
);
sDebug
(
"%s, connection to peer server is setup, pfd:%d sfd:%d
tranId:%u"
,
pPeer
->
id
,
connFd
,
pPeer
->
syncFd
,
firstPkt
.
tranI
d
);
pPeer
->
peerFd
=
connFd
;
pPeer
->
peerFd
=
connFd
;
pPeer
->
role
=
TAOS_SYNC_ROLE_UNSYNCED
;
pPeer
->
role
=
TAOS_SYNC_ROLE_UNSYNCED
;
pPeer
->
pConn
=
taosAllocateTcpConn
(
tsTcpPool
,
pPeer
,
connFd
);
pPeer
->
pConn
=
taosAllocateTcpConn
(
tsTcpPool
,
pPeer
,
connFd
);
...
@@ -1123,6 +1122,8 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
...
@@ -1123,6 +1122,8 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
return
;
return
;
}
}
sDebug
(
"vgId:%d, firstPkt is received, tranId:%u"
,
vgId
,
firstPkt
.
tranId
);
SSyncNode
*
pNode
=
*
ppNode
;
SSyncNode
*
pNode
=
*
ppNode
;
pthread_mutex_lock
(
&
pNode
->
mutex
);
pthread_mutex_lock
(
&
pNode
->
mutex
);
...
...
src/sync/src/syncRestore.c
浏览文件 @
4bc38236
...
@@ -64,8 +64,8 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
...
@@ -64,8 +64,8 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
sinfo
.
index
=
0
;
sinfo
.
index
=
0
;
while
(
1
)
{
while
(
1
)
{
// read file info
// read file info
int32_t
ret
=
taosReadMsg
(
pPeer
->
syncFd
,
&
(
minfo
),
sizeof
(
mi
nfo
));
int32_t
ret
=
taosReadMsg
(
pPeer
->
syncFd
,
&
(
minfo
),
sizeof
(
SFileI
nfo
));
if
(
ret
<
0
)
{
if
(
ret
!=
sizeof
(
SFileInfo
)
)
{
sError
(
"%s, failed to read file info while restore file since %s"
,
pPeer
->
id
,
strerror
(
errno
));
sError
(
"%s, failed to read file info while restore file since %s"
,
pPeer
->
id
,
strerror
(
errno
));
break
;
break
;
}
}
...
@@ -96,7 +96,7 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
...
@@ -96,7 +96,7 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
// send file ack
// send file ack
ret
=
taosWriteMsg
(
pPeer
->
syncFd
,
&
fileAck
,
sizeof
(
fileAck
));
ret
=
taosWriteMsg
(
pPeer
->
syncFd
,
&
fileAck
,
sizeof
(
fileAck
));
if
(
ret
<
0
)
{
if
(
ret
!=
sizeof
(
fileAck
)
)
{
sError
(
"%s, failed to write file:%s ack while restore file since %s"
,
pPeer
->
id
,
minfo
.
name
,
strerror
(
errno
));
sError
(
"%s, failed to write file:%s ack while restore file since %s"
,
pPeer
->
id
,
minfo
.
name
,
strerror
(
errno
));
break
;
break
;
}
}
...
@@ -154,7 +154,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) {
...
@@ -154,7 +154,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) {
while
(
1
)
{
while
(
1
)
{
ret
=
taosReadMsg
(
pPeer
->
syncFd
,
pHead
,
sizeof
(
SWalHead
));
ret
=
taosReadMsg
(
pPeer
->
syncFd
,
pHead
,
sizeof
(
SWalHead
));
if
(
ret
<
0
)
{
if
(
ret
!=
sizeof
(
SWalHead
)
)
{
sError
(
"%s, failed to read walhead while restore wal since %s"
,
pPeer
->
id
,
strerror
(
errno
));
sError
(
"%s, failed to read walhead while restore wal since %s"
,
pPeer
->
id
,
strerror
(
errno
));
break
;
break
;
}
}
...
@@ -166,7 +166,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) {
...
@@ -166,7 +166,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) {
}
// wal sync over
}
// wal sync over
ret
=
taosReadMsg
(
pPeer
->
syncFd
,
pHead
->
cont
,
pHead
->
len
);
ret
=
taosReadMsg
(
pPeer
->
syncFd
,
pHead
->
cont
,
pHead
->
len
);
if
(
ret
<
0
)
{
if
(
ret
!=
pHead
->
len
)
{
sError
(
"%s, failed to read walcont, len:%d while restore wal since %s"
,
pPeer
->
id
,
pHead
->
len
,
strerror
(
errno
));
sError
(
"%s, failed to read walcont, len:%d while restore wal since %s"
,
pPeer
->
id
,
pHead
->
len
,
strerror
(
errno
));
break
;
break
;
}
}
...
@@ -286,11 +286,12 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) {
...
@@ -286,11 +286,12 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) {
uint64_t
fversion
=
0
;
uint64_t
fversion
=
0
;
sInfo
(
"%s, start to restore, sstatus:%s"
,
pPeer
->
id
,
syncStatus
[
pPeer
->
sstatus
]);
sInfo
(
"%s, start to restore, sstatus:%s"
,
pPeer
->
id
,
syncStatus
[
pPeer
->
sstatus
]);
SFirstPktRsp
firstPktRsp
=
{.
sync
=
1
};
SFirstPktRsp
firstPktRsp
=
{.
sync
=
1
,
.
tranId
=
syncGenTranId
()
};
if
(
taosWriteMsg
(
pPeer
->
syncFd
,
&
firstPktRsp
,
sizeof
(
SFirstPktRsp
))
<
0
)
{
if
(
taosWriteMsg
(
pPeer
->
syncFd
,
&
firstPktRsp
,
sizeof
(
SFirstPktRsp
))
!=
sizeof
(
SFirstPktRsp
)
)
{
sError
(
"%s, failed to send sync firstPkt rsp since %s"
,
pPeer
->
id
,
strerror
(
errno
));
sError
(
"%s, failed to send sync firstPkt rsp since %s"
,
pPeer
->
id
,
strerror
(
errno
));
return
-
1
;
return
-
1
;
}
}
sDebug
(
"%s, send firstPktRsp to peer, tranId:%u"
,
pPeer
->
id
,
firstPktRsp
.
tranId
);
sInfo
(
"%s, start to restore file, set sstatus:%s"
,
pPeer
->
id
,
syncStatus
[
nodeSStatus
]);
sInfo
(
"%s, start to restore file, set sstatus:%s"
,
pPeer
->
id
,
syncStatus
[
nodeSStatus
]);
int32_t
code
=
syncRestoreFile
(
pPeer
,
&
fversion
);
int32_t
code
=
syncRestoreFile
(
pPeer
,
&
fversion
);
...
...
src/sync/src/syncRetrieve.c
浏览文件 @
4bc38236
...
@@ -58,7 +58,7 @@ static int32_t syncGetFileVersion(SSyncNode *pNode, SSyncPeer *pPeer) {
...
@@ -58,7 +58,7 @@ static int32_t syncGetFileVersion(SSyncNode *pNode, SSyncPeer *pPeer) {
uint64_t
fver
,
wver
;
uint64_t
fver
,
wver
;
int32_t
code
=
(
*
pNode
->
getVersion
)(
pNode
->
vgId
,
&
fver
,
&
wver
);
int32_t
code
=
(
*
pNode
->
getVersion
)(
pNode
->
vgId
,
&
fver
,
&
wver
);
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
sDebug
(
"%s, vnode is commiting while retrieve, last fver:%"
PRIu64
,
pPeer
->
id
,
pPeer
->
lastFileVer
);
sDebug
(
"%s, vnode is commiting while
get fver for
retrieve, last fver:%"
PRIu64
,
pPeer
->
id
,
pPeer
->
lastFileVer
);
return
-
1
;
return
-
1
;
}
}
...
@@ -92,7 +92,10 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
...
@@ -92,7 +92,10 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
int32_t
code
=
-
1
;
int32_t
code
=
-
1
;
char
name
[
TSDB_FILENAME_LEN
*
2
]
=
{
0
};
char
name
[
TSDB_FILENAME_LEN
*
2
]
=
{
0
};
if
(
syncGetFileVersion
(
pNode
,
pPeer
)
<
0
)
return
-
1
;
if
(
syncGetFileVersion
(
pNode
,
pPeer
)
<
0
)
{
pPeer
->
fileChanged
=
1
;
return
-
1
;
}
while
(
1
)
{
while
(
1
)
{
// retrieve file info
// retrieve file info
...
@@ -100,12 +103,11 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
...
@@ -100,12 +103,11 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
fileInfo
.
size
=
0
;
fileInfo
.
size
=
0
;
fileInfo
.
magic
=
(
*
pNode
->
getFileInfo
)(
pNode
->
vgId
,
fileInfo
.
name
,
&
fileInfo
.
index
,
TAOS_SYNC_MAX_INDEX
,
fileInfo
.
magic
=
(
*
pNode
->
getFileInfo
)(
pNode
->
vgId
,
fileInfo
.
name
,
&
fileInfo
.
index
,
TAOS_SYNC_MAX_INDEX
,
&
fileInfo
.
size
,
&
fileInfo
.
fversion
);
&
fileInfo
.
size
,
&
fileInfo
.
fversion
);
// fileInfo.size = htonl(size);
sDebug
(
"%s, file:%s info is sent, size:%"
PRId64
,
pPeer
->
id
,
fileInfo
.
name
,
fileInfo
.
size
);
sDebug
(
"%s, file:%s info is sent, size:%"
PRId64
,
pPeer
->
id
,
fileInfo
.
name
,
fileInfo
.
size
);
// send the file info
// send the file info
int32_t
ret
=
taosWriteMsg
(
pPeer
->
syncFd
,
&
(
fileInfo
),
sizeof
(
fileInfo
));
int32_t
ret
=
taosWriteMsg
(
pPeer
->
syncFd
,
&
(
fileInfo
),
sizeof
(
fileInfo
));
if
(
ret
<
0
)
{
if
(
ret
!=
sizeof
(
fileInfo
)
)
{
code
=
-
1
;
code
=
-
1
;
sError
(
"%s, failed to write file:%s info while retrieve file since %s"
,
pPeer
->
id
,
fileInfo
.
name
,
strerror
(
errno
));
sError
(
"%s, failed to write file:%s info while retrieve file since %s"
,
pPeer
->
id
,
fileInfo
.
name
,
strerror
(
errno
));
break
;
break
;
...
@@ -119,8 +121,8 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
...
@@ -119,8 +121,8 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
}
}
// wait for the ack from peer
// wait for the ack from peer
ret
=
taosReadMsg
(
pPeer
->
syncFd
,
&
fileAck
,
sizeof
(
f
ileAck
));
ret
=
taosReadMsg
(
pPeer
->
syncFd
,
&
fileAck
,
sizeof
(
SF
ileAck
));
if
(
ret
<
0
)
{
if
(
ret
!=
sizeof
(
SFileAck
)
)
{
code
=
-
1
;
code
=
-
1
;
sError
(
"%s, failed to read file:%s ack while retrieve file since %s"
,
pPeer
->
id
,
fileInfo
.
name
,
strerror
(
errno
));
sError
(
"%s, failed to read file:%s ack while retrieve file since %s"
,
pPeer
->
id
,
fileInfo
.
name
,
strerror
(
errno
));
break
;
break
;
...
@@ -384,12 +386,15 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
...
@@ -384,12 +386,15 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
}
}
if
(
code
==
0
)
{
if
(
code
==
0
)
{
pPeer
->
sstatus
=
TAOS_SYNC_STATUS_CACHE
;
sInfo
(
"%s, wal retrieve is finished, set sstatus:%s"
,
pPeer
->
id
,
syncStatus
[
pPeer
->
sstatus
]);
SWalHead
walHead
;
SWalHead
walHead
;
memset
(
&
walHead
,
0
,
sizeof
(
walHead
));
memset
(
&
walHead
,
0
,
sizeof
(
walHead
));
taosWriteMsg
(
pPeer
->
syncFd
,
&
walHead
,
sizeof
(
walHead
));
if
(
taosWriteMsg
(
pPeer
->
syncFd
,
&
walHead
,
sizeof
(
walHead
))
==
sizeof
(
walHead
))
{
pPeer
->
sstatus
=
TAOS_SYNC_STATUS_CACHE
;
sInfo
(
"%s, wal retrieve is finished, set sstatus:%s"
,
pPeer
->
id
,
syncStatus
[
pPeer
->
sstatus
]);
}
else
{
sError
(
"%s, failed to send last wal record since %s"
,
pPeer
->
id
,
strerror
(
errno
));
code
=
-
1
;
}
}
else
{
}
else
{
sError
(
"%s, failed to send wal since %s, code:0x%x"
,
pPeer
->
id
,
strerror
(
errno
),
code
);
sError
(
"%s, failed to send wal since %s, code:0x%x"
,
pPeer
->
id
,
strerror
(
errno
),
code
);
}
}
...
@@ -404,20 +409,23 @@ static int32_t syncRetrieveFirstPkt(SSyncPeer *pPeer) {
...
@@ -404,20 +409,23 @@ static int32_t syncRetrieveFirstPkt(SSyncPeer *pPeer) {
memset
(
&
firstPkt
,
0
,
sizeof
(
firstPkt
));
memset
(
&
firstPkt
,
0
,
sizeof
(
firstPkt
));
firstPkt
.
syncHead
.
type
=
TAOS_SMSG_SYNC_DATA
;
firstPkt
.
syncHead
.
type
=
TAOS_SMSG_SYNC_DATA
;
firstPkt
.
syncHead
.
vgId
=
pNode
->
vgId
;
firstPkt
.
syncHead
.
vgId
=
pNode
->
vgId
;
firstPkt
.
tranId
=
syncGenTranId
();
tstrncpy
(
firstPkt
.
fqdn
,
tsNodeFqdn
,
sizeof
(
firstPkt
.
fqdn
));
tstrncpy
(
firstPkt
.
fqdn
,
tsNodeFqdn
,
sizeof
(
firstPkt
.
fqdn
));
firstPkt
.
port
=
tsSyncPort
;
firstPkt
.
port
=
tsSyncPort
;
if
(
taosWriteMsg
(
pPeer
->
syncFd
,
&
firstPkt
,
sizeof
(
firstPkt
))
<
0
)
{
if
(
taosWriteMsg
(
pPeer
->
syncFd
,
&
firstPkt
,
sizeof
(
firstPkt
))
!=
sizeof
(
firstPkt
)
)
{
sError
(
"%s, failed to send sync firstPkt since %s
"
,
pPeer
->
id
,
strerror
(
errno
)
);
sError
(
"%s, failed to send sync firstPkt since %s
, tranId:%u"
,
pPeer
->
id
,
strerror
(
errno
),
firstPkt
.
tranId
);
return
-
1
;
return
-
1
;
}
}
sDebug
(
"%s, send firstPkt to peer, tranId:%u"
,
pPeer
->
id
,
firstPkt
.
tranId
);
SFirstPktRsp
firstPktRsp
;
SFirstPktRsp
firstPktRsp
;
if
(
taosReadMsg
(
pPeer
->
syncFd
,
&
firstPktRsp
,
sizeof
(
SFirstPktRsp
))
<
0
)
{
if
(
taosReadMsg
(
pPeer
->
syncFd
,
&
firstPktRsp
,
sizeof
(
SFirstPktRsp
))
!=
sizeof
(
SFirstPktRsp
)
)
{
sError
(
"%s, failed to read sync firstPkt rsp since %s
"
,
pPeer
->
id
,
strerror
(
errno
)
);
sError
(
"%s, failed to read sync firstPkt rsp since %s
, tranId:%u"
,
pPeer
->
id
,
strerror
(
errno
),
firstPkt
.
tranId
);
return
-
1
;
return
-
1
;
}
}
sDebug
(
"%s, recv firstPktRsp from peer, tranId:%u"
,
pPeer
->
id
,
firstPkt
.
tranId
);
return
0
;
return
0
;
}
}
...
...
tests/pytest/stream/stream2.py
浏览文件 @
4bc38236
...
@@ -87,6 +87,10 @@ class TDTestCase:
...
@@ -87,6 +87,10 @@ class TDTestCase:
tdSql
.
checkData
(
0
,
3
,
rowNum
)
tdSql
.
checkData
(
0
,
3
,
rowNum
)
except
Exception
as
e
:
except
Exception
as
e
:
tdLog
.
info
(
repr
(
e
))
tdLog
.
info
(
repr
(
e
))
tdSql
.
query
(
"show streams"
)
tdSql
.
checkRows
(
1
)
tdSql
.
checkData
(
0
,
2
,
's0'
)
tdLog
.
info
(
"===== step8 ====="
)
tdLog
.
info
(
"===== step8 ====="
)
tdSql
.
query
(
tdSql
.
query
(
...
@@ -142,6 +146,12 @@ class TDTestCase:
...
@@ -142,6 +146,12 @@ class TDTestCase:
except
Exception
as
e
:
except
Exception
as
e
:
tdLog
.
info
(
repr
(
e
))
tdLog
.
info
(
repr
(
e
))
tdSql
.
query
(
"show streams"
)
tdSql
.
checkRows
(
2
)
tdSql
.
checkData
(
0
,
2
,
's1'
)
tdSql
.
checkData
(
1
,
2
,
's0'
)
def
stop
(
self
):
def
stop
(
self
):
tdSql
.
close
()
tdSql
.
close
()
tdLog
.
success
(
"%s successfully executed"
%
__file__
)
tdLog
.
success
(
"%s successfully executed"
%
__file__
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录