Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
4bd65060
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
4bd65060
编写于
6月 29, 2022
作者:
M
Minghao Li
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor(sync): delete old functions
上级
aded474d
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
17 addition
and
171 deletion
+17
-171
source/libs/sync/src/syncRaftLog.c
source/libs/sync/src/syncRaftLog.c
+16
-171
source/libs/sync/src/syncSnapshot.c
source/libs/sync/src/syncSnapshot.c
+1
-0
未找到文件。
source/libs/sync/src/syncRaftLog.c
浏览文件 @
4bd65060
...
...
@@ -18,8 +18,7 @@
#include "syncRaftStore.h"
// refactor, log[0 .. n] ==> log[m .. n]
static
int32_t
raftLogRestoreFromSnapshot
(
struct
SSyncLogStore
*
pLogStore
,
SyncIndex
snapshotIndex
);
// static int32_t raftLogSetBeginIndex(struct SSyncLogStore* pLogStore, SyncIndex beginIndex);
static
int32_t
raftLogRestoreFromSnapshot
(
struct
SSyncLogStore
*
pLogStore
,
SyncIndex
snapshotIndex
);
static
SyncIndex
raftLogBeginIndex
(
struct
SSyncLogStore
*
pLogStore
);
static
SyncIndex
raftLogEndIndex
(
struct
SSyncLogStore
*
pLogStore
);
static
SyncIndex
raftLogWriteIndex
(
struct
SSyncLogStore
*
pLogStore
);
...
...
@@ -44,28 +43,27 @@ static int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex from
static
int32_t
logStoreUpdateCommitIndex
(
SSyncLogStore
*
pLogStore
,
SyncIndex
index
);
static
SyncIndex
logStoreGetCommitIndex
(
SSyncLogStore
*
pLogStore
);
// refactor, log[0 .. n] ==> log[m .. n]
/*
static int32_t raftLogSetBeginIndex(struct SSyncLogStore* pLogStore, SyncIndex beginIndex) {
// if beginIndex == 0, donot need call this funciton
ASSERT(beginIndex > 0);
static
int32_t
raftLogRestoreFromSnapshot
(
struct
SSyncLogStore
*
pLogStore
,
SyncIndex
snapshotIndex
)
{
ASSERT
(
snapshotIndex
>=
0
);
SSyncLogStoreData
*
pData
=
pLogStore
->
data
;
SWal
*
pWal
=
pData
->
pWal
;
sTrace("vgId:%d, reset wal begin index:%ld", pData->pSyncNode->vgId, beginIndex);
int32_t
code
=
walRestoreFromSnapshot
(
pWal
,
snapshotIndex
);
if
(
code
!=
0
)
{
int32_t
err
=
terrno
;
const
char
*
errStr
=
tstrerror
(
err
);
int32_t
sysErr
=
errno
;
const
char
*
sysErrStr
=
strerror
(
errno
);
pData->beginIndex = beginIndex
;
walRestoreFromSnapshot(pWal, beginIndex - 1);
return 0;
}
*/
char
logBuf
[
128
]
;
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"wal restore from snapshot error, index:%ld, err:%d %X, msg:%s, syserr:%d, sysmsg:%s"
,
snapshotIndex
,
err
,
err
,
errStr
,
sysErr
,
sysErrStr
);
syncNodeErrorLog
(
pData
->
pSyncNode
,
logBuf
);
static
int32_t
raftLogRestoreFromSnapshot
(
struct
SSyncLogStore
*
pLogStore
,
SyncIndex
snapshotIndex
)
{
ASSERT
(
snapshotIndex
>=
0
);
ASSERT
(
0
);
}
SSyncLogStoreData
*
pData
=
pLogStore
->
data
;
SWal
*
pWal
=
pData
->
pWal
;
walRestoreFromSnapshot
(
pWal
,
snapshotIndex
);
return
0
;
}
...
...
@@ -91,18 +89,6 @@ static int32_t raftLogEntryCount(struct SSyncLogStore* pLogStore) {
return
count
>
0
?
count
:
0
;
}
#if 0
static bool raftLogInRange(struct SSyncLogStore* pLogStore, SyncIndex index) {
SyncIndex beginIndex = raftLogBeginIndex(pLogStore);
SyncIndex endIndex = raftLogEndIndex(pLogStore);
if (index >= beginIndex && index <= endIndex) {
return true;
} else {
return false;
}
}
#endif
static
SyncIndex
raftLogLastIndex
(
struct
SSyncLogStore
*
pLogStore
)
{
SyncIndex
lastIndex
;
SSyncLogStoreData
*
pData
=
pLogStore
->
data
;
...
...
@@ -138,24 +124,6 @@ static SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore) {
return
0
;
}
/*
static SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore) {
SyncTerm lastTerm = 0;
if (raftLogEntryCount(pLogStore) == 0) {
lastTerm = 0;
} else {
SSyncRaftEntry* pLastEntry;
int32_t code = raftLogGetLastEntry(pLogStore, &pLastEntry);
ASSERT(code == 0);
if (pLastEntry != NULL) {
lastTerm = pLastEntry->term;
taosMemoryFree(pLastEntry);
}
}
return lastTerm;
}
*/
static
int32_t
raftLogAppendEntry
(
struct
SSyncLogStore
*
pLogStore
,
SSyncRaftEntry
*
pEntry
)
{
SSyncLogStoreData
*
pData
=
pLogStore
->
data
;
SWal
*
pWal
=
pData
->
pWal
;
...
...
@@ -197,53 +165,6 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
return
code
;
}
#if 0
static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
int32_t code;
*ppEntry = NULL;
if (raftLogInRange(pLogStore, index)) {
SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
ASSERT(pWalHandle != NULL);
code = walReadWithHandle(pWalHandle, index);
if (code != 0) {
int32_t err = terrno;
const char* errStr = tstrerror(err);
int32_t linuxErr = errno;
const char* linuxErrMsg = strerror(errno);
sError("raftLogGetEntry error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr,
linuxErrMsg);
ASSERT(0);
walCloseReadHandle(pWalHandle);
return code;
}
*ppEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen);
ASSERT(*ppEntry != NULL);
(*ppEntry)->msgType = TDMT_SYNC_CLIENT_REQUEST;
(*ppEntry)->originalRpcType = pWalHandle->pHead->head.msgType;
(*ppEntry)->seqNum = pWalHandle->pHead->head.syncMeta.seqNum;
(*ppEntry)->isWeak = pWalHandle->pHead->head.syncMeta.isWeek;
(*ppEntry)->term = pWalHandle->pHead->head.syncMeta.term;
(*ppEntry)->index = index;
ASSERT((*ppEntry)->dataLen == pWalHandle->pHead->head.bodyLen);
memcpy((*ppEntry)->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen);
// need to hold, do not new every time!!
walCloseReadHandle(pWalHandle);
} else {
// index not in range
code = 0;
}
return code;
}
#endif
static
int32_t
raftLogGetEntry
(
struct
SSyncLogStore
*
pLogStore
,
SyncIndex
index
,
SSyncRaftEntry
**
ppEntry
)
{
SSyncLogStoreData
*
pData
=
pLogStore
->
data
;
SWal
*
pWal
=
pData
->
pWal
;
...
...
@@ -343,18 +264,6 @@ static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** pp
return
-
1
;
}
/*
static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry) {
*ppLastEntry = NULL;
if (raftLogEntryCount(pLogStore) == 0) {
return 0;
}
SyncIndex lastIndex = raftLogLastIndex(pLogStore);
int32_t code = raftLogGetEntry(pLogStore, lastIndex, ppLastEntry);
return code;
}
*/
//-------------------------------
SSyncLogStore
*
logStoreCreate
(
SSyncNode
*
pSyncNode
)
{
SSyncLogStore
*
pLogStore
=
taosMemoryMalloc
(
sizeof
(
SSyncLogStore
));
...
...
@@ -372,18 +281,6 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
pData
->
pWalHandle
=
walOpenReadHandle
(
pData
->
pWal
);
ASSERT
(
pData
->
pWalHandle
!=
NULL
);
/*
SyncIndex firstVer = walGetFirstVer(pData->pWal);
SyncIndex lastVer = walGetLastVer(pData->pWal);
if (firstVer >= 0) {
pData->beginIndex = firstVer;
} else if (firstVer == -1) {
pData->beginIndex = lastVer + 1;
} else {
ASSERT(0);
}
*/
pLogStore
->
appendEntry
=
logStoreAppendEntry
;
pLogStore
->
getEntry
=
logStoreGetEntry
;
pLogStore
->
truncate
=
logStoreTruncate
;
...
...
@@ -392,7 +289,6 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
pLogStore
->
updateCommitIndex
=
logStoreUpdateCommitIndex
;
pLogStore
->
getCommitIndex
=
logStoreGetCommitIndex
;
// pLogStore->syncLogSetBeginIndex = raftLogSetBeginIndex;
pLogStore
->
syncLogRestoreFromSnapshot
=
raftLogRestoreFromSnapshot
;
pLogStore
->
syncLogBeginIndex
=
raftLogBeginIndex
;
pLogStore
->
syncLogEndIndex
=
raftLogEndIndex
;
...
...
@@ -405,8 +301,6 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
pLogStore
->
syncLogTruncate
=
raftLogTruncate
;
pLogStore
->
syncLogWriteIndex
=
raftLogWriteIndex
;
// pLogStore->syncLogInRange = raftLogInRange;
return
pLogStore
;
}
...
...
@@ -593,55 +487,6 @@ SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore) {
return
pEntry
;
}
/*
cJSON* logStore2Json(SSyncLogStore* pLogStore) {
char u64buf[128] = {0};
SSyncLogStoreData* pData = (SSyncLogStoreData*)pLogStore->data;
cJSON* pRoot = cJSON_CreateObject();
if (pData != NULL && pData->pWal != NULL) {
snprintf(u64buf, sizeof(u64buf), "%p", pData->pSyncNode);
cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
snprintf(u64buf, sizeof(u64buf), "%p", pData->pWal);
cJSON_AddStringToObject(pRoot, "pWal", u64buf);
snprintf(u64buf, sizeof(u64buf), "%ld", pData->beginIndex);
cJSON_AddStringToObject(pRoot, "beginIndex", u64buf);
SyncIndex endIndex = raftLogEndIndex(pLogStore);
snprintf(u64buf, sizeof(u64buf), "%ld", endIndex);
cJSON_AddStringToObject(pRoot, "endIndex", u64buf);
int32_t count = raftLogEntryCount(pLogStore);
cJSON_AddNumberToObject(pRoot, "entryCount", count);
snprintf(u64buf, sizeof(u64buf), "%ld", raftLogWriteIndex(pLogStore));
cJSON_AddStringToObject(pRoot, "WriteIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%d", raftLogIsEmpty(pLogStore));
cJSON_AddStringToObject(pRoot, "IsEmpty", u64buf);
snprintf(u64buf, sizeof(u64buf), "%ld", raftLogLastIndex(pLogStore));
cJSON_AddStringToObject(pRoot, "LastIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", raftLogLastTerm(pLogStore));
cJSON_AddStringToObject(pRoot, "LastTerm", u64buf);
cJSON* pEntries = cJSON_CreateArray();
cJSON_AddItemToObject(pRoot, "pEntries", pEntries);
for (SyncIndex i = pData->beginIndex; i <= endIndex; ++i) {
SSyncRaftEntry* pEntry = logStoreGetEntry(pLogStore, i);
cJSON_AddItemToArray(pEntries, syncEntry2Json(pEntry));
syncEntryDestory(pEntry);
}
}
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SSyncLogStore", pRoot);
return pJson;
}
*/
cJSON
*
logStore2Json
(
SSyncLogStore
*
pLogStore
)
{
char
u64buf
[
128
]
=
{
0
};
SSyncLogStoreData
*
pData
=
(
SSyncLogStoreData
*
)
pLogStore
->
data
;
...
...
source/libs/sync/src/syncSnapshot.c
浏览文件 @
4bd65060
...
...
@@ -772,6 +772,7 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
snapshotReSend
(
pSender
);
}
else
{
// error log
do
{
char
logBuf
[
96
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"snapshot sender recv error ack:%d, my seq:%d"
,
pMsg
->
ack
,
pSender
->
seq
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录