Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
3117ea56
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
3117ea56
编写于
11月 23, 2020
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'develop' into feature/TD-1413
上级
3da2af00
dcd389e1
变更
4
隐藏空白更改
内联
并排
Showing
4 changed file
with
19 addition
and
21 deletion
+19
-21
src/mnode/src/mnodeSdb.c
src/mnode/src/mnodeSdb.c
+1
-1
src/sync/inc/syncInt.h
src/sync/inc/syncInt.h
+7
-7
src/sync/src/syncMain.c
src/sync/src/syncMain.c
+10
-12
src/sync/src/syncRetrieve.c
src/sync/src/syncRetrieve.c
+1
-1
未找到文件。
src/mnode/src/mnodeSdb.c
浏览文件 @
3117ea56
...
@@ -211,7 +211,7 @@ void sdbUpdateMnodeRoles() {
...
@@ -211,7 +211,7 @@ void sdbUpdateMnodeRoles() {
if
(
tsSdbMgmt
.
sync
<=
0
)
return
;
if
(
tsSdbMgmt
.
sync
<=
0
)
return
;
SNodesRole
roles
=
{
0
};
SNodesRole
roles
=
{
0
};
syncGetNodesRole
(
tsSdbMgmt
.
sync
,
&
roles
)
;
if
(
syncGetNodesRole
(
tsSdbMgmt
.
sync
,
&
roles
)
!=
0
)
return
;
sdbInfo
(
"vgId:1, update mnodes role, replica:%d"
,
tsSdbMgmt
.
cfg
.
replica
);
sdbInfo
(
"vgId:1, update mnodes role, replica:%d"
,
tsSdbMgmt
.
cfg
.
replica
);
for
(
int32_t
i
=
0
;
i
<
tsSdbMgmt
.
cfg
.
replica
;
++
i
)
{
for
(
int32_t
i
=
0
;
i
<
tsSdbMgmt
.
cfg
.
replica
;
++
i
)
{
...
...
src/sync/inc/syncInt.h
浏览文件 @
3117ea56
...
@@ -159,14 +159,14 @@ typedef struct SSyncNode {
...
@@ -159,14 +159,14 @@ typedef struct SSyncNode {
SSyncFwds
*
pSyncFwds
;
// saved forward info if quorum >1
SSyncFwds
*
pSyncFwds
;
// saved forward info if quorum >1
void
*
pFwdTimer
;
void
*
pFwdTimer
;
void
*
pRoleTimer
;
void
*
pRoleTimer
;
FGetFileInfo
getFileInfo
;
FGetFileInfo
getFileInfo
;
FGetWalInfo
getWalInfo
;
FGetWalInfo
getWalInfo
;
FWriteToCache
writeToCache
;
FWriteToCache
writeToCache
;
FConfirmForward
confirmForward
;
FConfirmForward
confirmForward
;
FNotifyRole
notifyRole
;
FNotifyRole
notifyRole
;
FNotifyFlowCtrl
notifyFlowCtrl
;
FNotifyFlowCtrl
notifyFlowCtrl
;
FNotifyFileSynced
notifyFileSynced
;
FNotifyFileSynced
notifyFileSynced
;
pthread_mutex_t
mutex
;
pthread_mutex_t
mutex
;
}
SSyncNode
;
}
SSyncNode
;
// sync module global
// sync module global
...
...
src/sync/src/syncMain.c
浏览文件 @
3117ea56
...
@@ -552,18 +552,16 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
...
@@ -552,18 +552,16 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
}
}
void
syncBroadcastStatus
(
SSyncNode
*
pNode
)
{
void
syncBroadcastStatus
(
SSyncNode
*
pNode
)
{
SSyncPeer
*
pPeer
;
for
(
int32_t
index
=
0
;
index
<
pNode
->
replica
;
++
index
)
{
if
(
index
==
pNode
->
selfIndex
)
continue
;
for
(
int32_t
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
SSyncPeer
*
pPeer
=
pNode
->
peerInfo
[
index
];
if
(
i
==
pNode
->
selfIndex
)
continue
;
pPeer
=
pNode
->
peerInfo
[
i
];
syncSendPeersStatusMsgToPeer
(
pPeer
,
1
,
SYNC_STATUS_BROADCAST
,
syncGenTranId
());
syncSendPeersStatusMsgToPeer
(
pPeer
,
1
,
SYNC_STATUS_BROADCAST
,
syncGenTranId
());
}
}
}
}
static
void
syncResetFlowCtrl
(
SSyncNode
*
pNode
)
{
static
void
syncResetFlowCtrl
(
SSyncNode
*
pNode
)
{
for
(
int32_t
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
for
(
int32_t
i
ndex
=
0
;
index
<
pNode
->
replica
;
++
index
)
{
pNode
->
peerInfo
[
i
]
->
numOfRetrieves
=
0
;
pNode
->
peerInfo
[
i
ndex
]
->
numOfRetrieves
=
0
;
}
}
if
(
pNode
->
notifyFlowCtrl
)
{
if
(
pNode
->
notifyFlowCtrl
)
{
...
@@ -1171,7 +1169,7 @@ static void syncProcessBrokenLink(void *param) {
...
@@ -1171,7 +1169,7 @@ static void syncProcessBrokenLink(void *param) {
static
void
syncSaveFwdInfo
(
SSyncNode
*
pNode
,
uint64_t
version
,
void
*
mhandle
)
{
static
void
syncSaveFwdInfo
(
SSyncNode
*
pNode
,
uint64_t
version
,
void
*
mhandle
)
{
SSyncFwds
*
pSyncFwds
=
pNode
->
pSyncFwds
;
SSyncFwds
*
pSyncFwds
=
pNode
->
pSyncFwds
;
uint64_t
time
=
taosGetTimestampMs
();
int64_t
time
=
taosGetTimestampMs
();
if
(
pSyncFwds
->
fwds
>=
tsMaxFwdInfo
)
{
if
(
pSyncFwds
->
fwds
>=
tsMaxFwdInfo
)
{
pSyncFwds
->
first
=
(
pSyncFwds
->
first
+
1
)
%
tsMaxFwdInfo
;
pSyncFwds
->
first
=
(
pSyncFwds
->
first
+
1
)
%
tsMaxFwdInfo
;
...
@@ -1289,7 +1287,6 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
...
@@ -1289,7 +1287,6 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
int32_t
fwdLen
;
int32_t
fwdLen
;
int32_t
code
=
0
;
int32_t
code
=
0
;
if
(
pWalHead
->
version
>
nodeVersion
+
1
)
{
if
(
pWalHead
->
version
>
nodeVersion
+
1
)
{
sError
(
"vgId:%d, hver:%"
PRIu64
", inconsistent with sver:%"
PRIu64
,
pNode
->
vgId
,
pWalHead
->
version
,
nodeVersion
);
sError
(
"vgId:%d, hver:%"
PRIu64
", inconsistent with sver:%"
PRIu64
,
pNode
->
vgId
,
pWalHead
->
version
,
nodeVersion
);
if
(
nodeRole
==
TAOS_SYNC_ROLE_SLAVE
)
{
if
(
nodeRole
==
TAOS_SYNC_ROLE_SLAVE
)
{
...
@@ -1305,14 +1302,15 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
...
@@ -1305,14 +1302,15 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
// always update version
// always update version
nodeVersion
=
pWalHead
->
version
;
nodeVersion
=
pWalHead
->
version
;
sTrace
(
"vgId:%d, forward to peer, replica:%d role:%s qtype:%s hver:%"
PRIu64
,
pNode
->
vgId
,
pNode
->
replica
,
syncRole
[
nodeRole
],
qtypeStr
[
qtype
],
pWalHead
->
version
);
if
(
pNode
->
replica
==
1
||
nodeRole
!=
TAOS_SYNC_ROLE_MASTER
)
return
0
;
if
(
pNode
->
replica
==
1
||
nodeRole
!=
TAOS_SYNC_ROLE_MASTER
)
return
0
;
// only pkt from RPC or CQ can be forwarded
// only pkt from RPC or CQ can be forwarded
if
(
qtype
!=
TAOS_QTYPE_RPC
&&
qtype
!=
TAOS_QTYPE_CQ
)
return
0
;
if
(
qtype
!=
TAOS_QTYPE_RPC
&&
qtype
!=
TAOS_QTYPE_CQ
)
return
0
;
sTrace
(
"vgId:%d, forward to peer, replica:%d role:%s qtype:%s hver:%"
PRIu64
,
pNode
->
vgId
,
pNode
->
replica
,
syncRole
[
nodeRole
],
qtypeStr
[
qtype
],
pWalHead
->
version
);
// a hacker way to improve the performance
// a hacker way to improve the performance
pSyncHead
=
(
SSyncHead
*
)(((
char
*
)
pWalHead
)
-
sizeof
(
SSyncHead
));
pSyncHead
=
(
SSyncHead
*
)(((
char
*
)
pWalHead
)
-
sizeof
(
SSyncHead
));
pSyncHead
->
type
=
TAOS_SMSG_FORWARD
;
pSyncHead
->
type
=
TAOS_SMSG_FORWARD
;
...
@@ -1332,7 +1330,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
...
@@ -1332,7 +1330,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
code
=
1
;
code
=
1
;
}
}
int32_t
retLen
=
write
(
pPeer
->
peerFd
,
pSyncHead
,
fwdLen
);
int32_t
retLen
=
taosWriteMsg
(
pPeer
->
peerFd
,
pSyncHead
,
fwdLen
);
if
(
retLen
==
fwdLen
)
{
if
(
retLen
==
fwdLen
)
{
sTrace
(
"%s, forward is sent, hver:%"
PRIu64
" contLen:%d"
,
pPeer
->
id
,
pWalHead
->
version
,
pWalHead
->
len
);
sTrace
(
"%s, forward is sent, hver:%"
PRIu64
" contLen:%d"
,
pPeer
->
id
,
pWalHead
->
version
,
pWalHead
->
len
);
}
else
{
}
else
{
...
...
src/sync/src/syncRetrieve.c
浏览文件 @
3117ea56
...
@@ -438,7 +438,7 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
...
@@ -438,7 +438,7 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
tstrncpy
(
firstPkt
.
fqdn
,
tsNodeFqdn
,
sizeof
(
firstPkt
.
fqdn
));
tstrncpy
(
firstPkt
.
fqdn
,
tsNodeFqdn
,
sizeof
(
firstPkt
.
fqdn
));
firstPkt
.
port
=
tsSyncPort
;
firstPkt
.
port
=
tsSyncPort
;
if
(
write
(
pPeer
->
syncFd
,
(
char
*
)
&
firstPkt
,
sizeof
(
firstPkt
))
<
0
)
{
if
(
taosWriteMsg
(
pPeer
->
syncFd
,
(
char
*
)
&
firstPkt
,
sizeof
(
firstPkt
))
<
0
)
{
sError
(
"%s, failed to send syncCmd"
,
pPeer
->
id
);
sError
(
"%s, failed to send syncCmd"
,
pPeer
->
id
);
return
-
1
;
return
-
1
;
}
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录