Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
eee4c085
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
eee4c085
编写于
8月 17, 2022
作者:
M
Minghao Li
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor(sync): add syncNodeAppendEntriesOnePeer
上级
4164d146
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
123 addition
and
5 deletion
+123
-5
include/libs/sync/sync.h
include/libs/sync/sync.h
+6
-5
source/libs/sync/inc/syncReplication.h
source/libs/sync/inc/syncReplication.h
+2
-0
source/libs/sync/src/syncReplication.c
source/libs/sync/src/syncReplication.c
+115
-0
未找到文件。
include/libs/sync/sync.h
浏览文件 @
eee4c085
...
@@ -26,11 +26,12 @@ extern "C" {
...
@@ -26,11 +26,12 @@ extern "C" {
extern
bool
gRaftDetailLog
;
extern
bool
gRaftDetailLog
;
#define SYNC_RESP_TTL_MS 10000000
#define SYNC_RESP_TTL_MS 10000000
#define SYNC_SPEED_UP_HB_TIMER 400
#define SYNC_SPEED_UP_HB_TIMER 400
#define SYNC_SPEED_UP_AFTER_MS (1000 * 20)
#define SYNC_SPEED_UP_AFTER_MS (1000 * 20)
#define SYNC_SLOW_DOWN_RANGE 100
#define SYNC_SLOW_DOWN_RANGE 100
#define SYNC_MAX_READ_RANGE 10
#define SYNC_MAX_READ_RANGE 10
#define SYNC_MAX_PROGRESS_WAIT_MS 4000
#define SYNC_MAX_BATCH_SIZE 1
#define SYNC_MAX_BATCH_SIZE 1
#define SYNC_INDEX_BEGIN 0
#define SYNC_INDEX_BEGIN 0
...
...
source/libs/sync/inc/syncReplication.h
浏览文件 @
eee4c085
...
@@ -55,6 +55,8 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode);
...
@@ -55,6 +55,8 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode);
int32_t
syncNodeAppendEntriesPeersSnapshot
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodeAppendEntriesPeersSnapshot
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodeAppendEntriesPeersSnapshot2
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodeAppendEntriesPeersSnapshot2
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodeAppendEntriesOnePeer
(
SSyncNode
*
pSyncNode
,
SRaftId
*
pDestId
,
SyncIndex
nextIndex
);
int32_t
syncNodeReplicate
(
SSyncNode
*
pSyncNode
,
bool
isTimer
);
int32_t
syncNodeReplicate
(
SSyncNode
*
pSyncNode
,
bool
isTimer
);
int32_t
syncNodeAppendEntries
(
SSyncNode
*
pSyncNode
,
const
SRaftId
*
destRaftId
,
const
SyncAppendEntries
*
pMsg
);
int32_t
syncNodeAppendEntries
(
SSyncNode
*
pSyncNode
,
const
SRaftId
*
destRaftId
,
const
SyncAppendEntries
*
pMsg
);
int32_t
syncNodeAppendEntriesBatch
(
SSyncNode
*
pSyncNode
,
const
SRaftId
*
destRaftId
,
const
SyncAppendEntriesBatch
*
pMsg
);
int32_t
syncNodeAppendEntriesBatch
(
SSyncNode
*
pSyncNode
,
const
SRaftId
*
destRaftId
,
const
SyncAppendEntriesBatch
*
pMsg
);
...
...
source/libs/sync/src/syncReplication.c
浏览文件 @
eee4c085
...
@@ -116,6 +116,120 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {
...
@@ -116,6 +116,120 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {
return
ret
;
return
ret
;
}
}
int32_t
syncNodeAppendEntriesOnePeer
(
SSyncNode
*
pSyncNode
,
SRaftId
*
pDestId
,
SyncIndex
nextIndex
)
{
int32_t
ret
=
0
;
// pre index, pre term
SyncIndex
preLogIndex
=
syncNodeGetPreIndex
(
pSyncNode
,
nextIndex
);
SyncTerm
preLogTerm
=
syncNodeGetPreTerm
(
pSyncNode
,
nextIndex
);
if
(
preLogTerm
==
SYNC_TERM_INVALID
)
{
SyncIndex
newNextIndex
=
syncNodeGetLastIndex
(
pSyncNode
)
+
1
;
// SyncIndex newNextIndex = nextIndex + 1;
syncIndexMgrSetIndex
(
pSyncNode
->
pNextIndex
,
pDestId
,
newNextIndex
);
syncIndexMgrSetIndex
(
pSyncNode
->
pMatchIndex
,
pDestId
,
SYNC_INDEX_INVALID
);
sError
(
"vgId:%d, sync get pre term error, nextIndex:%"
PRId64
", update next-index:%"
PRId64
", match-index:%d, raftid:%"
PRId64
,
pSyncNode
->
vgId
,
nextIndex
,
newNextIndex
,
SYNC_INDEX_INVALID
,
pDestId
->
addr
);
return
-
1
;
}
// entry pointer array
SSyncRaftEntry
*
entryPArr
[
SYNC_MAX_BATCH_SIZE
];
memset
(
entryPArr
,
0
,
sizeof
(
entryPArr
));
// get entry batch
int32_t
getCount
=
0
;
SyncIndex
getEntryIndex
=
nextIndex
;
for
(
int32_t
i
=
0
;
i
<
pSyncNode
->
pRaftCfg
->
batchSize
;
++
i
)
{
SSyncRaftEntry
*
pEntry
=
NULL
;
int32_t
code
=
pSyncNode
->
pLogStore
->
syncLogGetEntry
(
pSyncNode
->
pLogStore
,
getEntryIndex
,
&
pEntry
);
if
(
code
==
0
)
{
ASSERT
(
pEntry
!=
NULL
);
entryPArr
[
i
]
=
pEntry
;
getCount
++
;
getEntryIndex
++
;
}
else
{
break
;
}
}
// event log
do
{
char
logBuf
[
128
];
char
host
[
64
];
uint16_t
port
;
syncUtilU642Addr
(
pDestId
->
addr
,
host
,
sizeof
(
host
),
&
port
);
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"build batch:%d for %s:%d"
,
getCount
,
host
,
port
);
syncNodeEventLog
(
pSyncNode
,
logBuf
);
}
while
(
0
);
// build msg
SyncAppendEntriesBatch
*
pMsg
=
syncAppendEntriesBatchBuild
(
entryPArr
,
getCount
,
pSyncNode
->
vgId
);
ASSERT
(
pMsg
!=
NULL
);
// free entries
for
(
int32_t
i
=
0
;
i
<
pSyncNode
->
pRaftCfg
->
batchSize
;
++
i
)
{
SSyncRaftEntry
*
pEntry
=
entryPArr
[
i
];
if
(
pEntry
!=
NULL
)
{
syncEntryDestory
(
pEntry
);
entryPArr
[
i
]
=
NULL
;
}
}
// prepare msg
pMsg
->
srcId
=
pSyncNode
->
myRaftId
;
pMsg
->
destId
=
*
pDestId
;
pMsg
->
term
=
pSyncNode
->
pRaftStore
->
currentTerm
;
pMsg
->
prevLogIndex
=
preLogIndex
;
pMsg
->
prevLogTerm
=
preLogTerm
;
pMsg
->
commitIndex
=
pSyncNode
->
commitIndex
;
pMsg
->
privateTerm
=
0
;
pMsg
->
dataCount
=
getCount
;
// send msg
syncNodeAppendEntriesBatch
(
pSyncNode
,
pDestId
,
pMsg
);
// speed up
if
(
pMsg
->
dataCount
>
0
&&
pSyncNode
->
commitIndex
-
pMsg
->
prevLogIndex
>
SYNC_SLOW_DOWN_RANGE
)
{
ret
=
1
;
#if 0
do {
char logBuf[128];
char host[64];
uint16_t port;
syncUtilU642Addr(pDestId->addr, host, sizeof(host), &port);
snprintf(logBuf, sizeof(logBuf), "maybe speed up for %s:%d, pre-index:%ld", host, port, pMsg->prevLogIndex);
syncNodeEventLog(pSyncNode, logBuf);
} while (0);
#endif
}
syncAppendEntriesBatchDestroy
(
pMsg
);
return
ret
;
}
int32_t
syncNodeAppendEntriesPeersSnapshot2
(
SSyncNode
*
pSyncNode
)
{
if
(
pSyncNode
->
state
!=
TAOS_SYNC_STATE_LEADER
)
{
return
-
1
;
}
int32_t
ret
=
0
;
for
(
int
i
=
0
;
i
<
pSyncNode
->
peersNum
;
++
i
)
{
SRaftId
*
pDestId
=
&
(
pSyncNode
->
peersId
[
i
]);
// next index
SyncIndex
nextIndex
=
syncIndexMgrGetIndex
(
pSyncNode
->
pNextIndex
,
pDestId
);
ret
=
syncNodeAppendEntriesOnePeer
(
pSyncNode
,
pDestId
,
nextIndex
);
}
return
ret
;
}
#if 0
int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) {
int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) {
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
return -1;
return -1;
...
@@ -221,6 +335,7 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) {
...
@@ -221,6 +335,7 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) {
return ret;
return ret;
}
}
#endif
int32_t
syncNodeAppendEntriesPeersSnapshot
(
SSyncNode
*
pSyncNode
)
{
int32_t
syncNodeAppendEntriesPeersSnapshot
(
SSyncNode
*
pSyncNode
)
{
ASSERT
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
);
ASSERT
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录