Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
dbf80b56
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
dbf80b56
编写于
5月 26, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
5月 26, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #12987 from taosdata/fix/mnode
fix(sync): persist standby option
上级
0f27d391
cfa89bd7
变更
12
隐藏空白更改
内联
并排
Showing
12 changed file
with
241 addition
and
141 deletion
+241
-141
include/common/tmsg.h
include/common/tmsg.h
+0
-1
include/libs/sync/sync.h
include/libs/sync/sync.h
+5
-1
source/common/src/tmsg.c
source/common/src/tmsg.c
+0
-2
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
+1
-1
source/dnode/mgmt/test/CMakeLists.txt
source/dnode/mgmt/test/CMakeLists.txt
+1
-1
source/dnode/mnode/impl/src/mndMnode.c
source/dnode/mnode/impl/src/mndMnode.c
+100
-94
source/dnode/mnode/impl/src/mndSync.c
source/dnode/mnode/impl/src/mndSync.c
+9
-1
source/libs/sync/inc/syncRaftCfg.h
source/libs/sync/inc/syncRaftCfg.h
+6
-3
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+19
-1
source/libs/sync/src/syncRaftCfg.c
source/libs/sync/src/syncRaftCfg.c
+47
-8
source/libs/sync/test/syncRaftCfgTest.cpp
source/libs/sync/test/syncRaftCfgTest.cpp
+17
-1
tests/test/c/sdbDump.c
tests/test/c/sdbDump.c
+36
-27
未找到文件。
include/common/tmsg.h
浏览文件 @
dbf80b56
...
...
@@ -1274,7 +1274,6 @@ int32_t tSerializeSCreateDropMQSBNodeReq(void* buf, int32_t bufLen, SMCreateQnod
int32_t
tDeserializeSCreateDropMQSBNodeReq
(
void
*
buf
,
int32_t
bufLen
,
SMCreateQnodeReq
*
pReq
);
typedef
struct
{
int32_t
dnodeId
;
int8_t
replica
;
SReplica
replicas
[
TSDB_MAX_REPLICA
];
}
SDCreateMnodeReq
,
SDAlterMnodeReq
;
...
...
include/libs/sync/sync.h
浏览文件 @
dbf80b56
...
...
@@ -146,6 +146,7 @@ typedef struct SSyncLogStore {
}
SSyncLogStore
;
typedef
struct
SSyncInfo
{
bool
isStandBy
;
SyncGroupId
vgId
;
SSyncCfg
syncCfg
;
char
path
[
TSDB_FILENAME_LEN
];
...
...
@@ -160,7 +161,6 @@ int32_t syncInit();
void
syncCleanUp
();
int64_t
syncOpen
(
const
SSyncInfo
*
pSyncInfo
);
void
syncStart
(
int64_t
rid
);
void
syncStartStandBy
(
int64_t
rid
);
void
syncStop
(
int64_t
rid
);
int32_t
syncReconfig
(
int64_t
rid
,
const
SSyncCfg
*
pSyncCfg
);
ESyncState
syncGetMyRole
(
int64_t
rid
);
...
...
@@ -173,6 +173,10 @@ bool syncEnvIsStart();
const
char
*
syncStr
(
ESyncState
state
);
bool
syncIsRestoreFinish
(
int64_t
rid
);
// to be moved to static
void
syncStartNormal
(
int64_t
rid
);
void
syncStartStandBy
(
int64_t
rid
);
#ifdef __cplusplus
}
#endif
...
...
source/common/src/tmsg.c
浏览文件 @
dbf80b56
...
...
@@ -3188,7 +3188,6 @@ int32_t tSerializeSDCreateMnodeReq(void *buf, int32_t bufLen, SDCreateMnodeReq *
tEncoderInit
(
&
encoder
,
buf
,
bufLen
);
if
(
tStartEncode
(
&
encoder
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
pReq
->
dnodeId
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
replica
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
SReplica
*
pReplica
=
&
pReq
->
replicas
[
i
];
...
...
@@ -3206,7 +3205,6 @@ int32_t tDeserializeSDCreateMnodeReq(void *buf, int32_t bufLen, SDCreateMnodeReq
tDecoderInit
(
&
decoder
,
buf
,
bufLen
);
if
(
tStartDecode
(
&
decoder
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
&
decoder
,
&
pReq
->
dnodeId
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
&
decoder
,
&
pReq
->
replica
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
SReplica
*
pReplica
=
&
pReq
->
replicas
[
i
];
...
...
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
浏览文件 @
dbf80b56
...
...
@@ -79,7 +79,7 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
return
-
1
;
}
if
(
createReq
.
replica
<=
1
||
(
createReq
.
dnodeId
!=
pInput
->
pData
->
dnodeId
&&
pInput
->
pData
->
dnodeId
!=
0
)
)
{
if
(
createReq
.
replica
!=
1
)
{
terrno
=
TSDB_CODE_INVALID_OPTION
;
dError
(
"failed to create mnode since %s"
,
terrstr
());
return
-
1
;
...
...
source/dnode/mgmt/test/CMakeLists.txt
浏览文件 @
dbf80b56
...
...
@@ -3,7 +3,7 @@ if(${BUILD_TEST})
add_subdirectory
(
qnode
)
add_subdirectory
(
bnode
)
add_subdirectory
(
snode
)
add_subdirectory
(
mnode
)
#
add_subdirectory(mnode)
add_subdirectory
(
vnode
)
add_subdirectory
(
sut
)
endif
(
${
BUILD_TEST
}
)
source/dnode/mnode/impl/src/mndMnode.c
浏览文件 @
dbf80b56
...
...
@@ -267,75 +267,83 @@ static int32_t mndSetCreateMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnod
}
static
int32_t
mndSetCreateMnodeRedoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDnodeObj
*
pDnode
,
SMnodeObj
*
pObj
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
void
*
pIter
=
NULL
;
int32_t
numOfReplicas
=
0
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
void
*
pIter
=
NULL
;
int32_t
numOfReplicas
=
0
;
SDAlterMnodeReq
alterReq
=
{
0
};
SDCreateMnodeReq
createReq
=
{
0
};
SEpSet
alterEpset
=
{
0
};
SEpSet
createEpset
=
{
0
};
while
(
1
)
{
SMnodeObj
*
pMObj
=
NULL
;
pIter
=
sdbFetch
(
pSdb
,
SDB_MNODE
,
pIter
,
(
void
**
)
&
pMObj
);
if
(
pIter
==
NULL
)
break
;
SReplica
*
pReplica
=
&
createReq
.
replicas
[
numOfReplicas
];
pReplica
->
id
=
pMObj
->
id
;
pReplica
->
port
=
pMObj
->
pDnode
->
port
;
memcpy
(
pReplica
->
fqdn
,
pMObj
->
pDnode
->
fqdn
,
TSDB_FQDN_LEN
);
numOfReplicas
++
;
alterReq
.
replicas
[
numOfReplicas
].
id
=
pMObj
->
id
;
alterReq
.
replicas
[
numOfReplicas
].
port
=
pMObj
->
pDnode
->
port
;
memcpy
(
alterReq
.
replicas
[
numOfReplicas
].
fqdn
,
pMObj
->
pDnode
->
fqdn
,
TSDB_FQDN_LEN
);
alterEpset
.
eps
[
numOfReplicas
].
port
=
pMObj
->
pDnode
->
port
;
memcpy
(
alterEpset
.
eps
[
numOfReplicas
].
fqdn
,
pMObj
->
pDnode
->
fqdn
,
TSDB_FQDN_LEN
);
if
(
pMObj
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
alterEpset
.
inUse
=
numOfReplicas
;
}
numOfReplicas
++
;
sdbRelease
(
pSdb
,
pMObj
);
}
SReplica
*
pReplica
=
&
createReq
.
replicas
[
numOfReplicas
];
pReplica
->
id
=
pDnode
->
id
;
pReplica
->
port
=
pDnode
->
port
;
memcpy
(
pReplica
->
fqdn
,
pDnode
->
fqdn
,
TSDB_FQDN_LEN
);
numOfReplicas
++
;
alterReq
.
replica
=
numOfReplicas
+
1
;
alterReq
.
replicas
[
numOfReplicas
].
id
=
pDnode
->
id
;
alterReq
.
replicas
[
numOfReplicas
].
port
=
pDnode
->
port
;
memcpy
(
alterReq
.
replicas
[
numOfReplicas
].
fqdn
,
pDnode
->
fqdn
,
TSDB_FQDN_LEN
);
createReq
.
replica
=
numOfReplicas
;
alterEpset
.
numOfEps
=
numOfReplicas
+
1
;
alterEpset
.
eps
[
numOfReplicas
].
port
=
pDnode
->
port
;
memcpy
(
alterEpset
.
eps
[
numOfReplicas
].
fqdn
,
pDnode
->
fqdn
,
TSDB_FQDN_LEN
);
while
(
1
)
{
SMnodeObj
*
pMObj
=
NULL
;
pIter
=
sdbFetch
(
pSdb
,
SDB_MNODE
,
pIter
,
(
void
**
)
&
pMObj
)
;
if
(
pIter
==
NULL
)
break
;
createReq
.
replica
=
1
;
createReq
.
replicas
[
0
].
id
=
pDnode
->
id
;
createReq
.
replicas
[
0
].
port
=
pDnode
->
port
;
memcpy
(
createReq
.
replicas
[
0
].
fqdn
,
pDnode
->
fqdn
,
TSDB_FQDN_LEN
)
;
STransAction
action
=
{
0
};
createEpset
.
numOfEps
=
1
;
createEpset
.
eps
[
0
].
port
=
pDnode
->
port
;
memcpy
(
createEpset
.
eps
[
0
].
fqdn
,
pDnode
->
fqdn
,
TSDB_FQDN_LEN
);
createReq
.
dnodeId
=
pMObj
->
id
;
int32_t
contLen
=
tSerializeSDCreateMnodeReq
(
NULL
,
0
,
&
create
Req
);
{
int32_t
contLen
=
tSerializeSDCreateMnodeReq
(
NULL
,
0
,
&
alter
Req
);
void
*
pReq
=
taosMemoryMalloc
(
contLen
);
tSerializeSDCreateMnodeReq
(
pReq
,
contLen
,
&
create
Req
);
tSerializeSDCreateMnodeReq
(
pReq
,
contLen
,
&
alter
Req
);
action
.
epSet
=
mndGetDnodeEpset
(
pMObj
->
pDnode
);
action
.
pCont
=
pReq
;
action
.
contLen
=
contLen
;
action
.
msgType
=
TDMT_DND_ALTER_MNODE
;
action
.
acceptableCode
=
TSDB_CODE_NODE_ALREADY_DEPLOYED
;
STransAction
action
=
{
.
epSet
=
alterEpset
,
.
pCont
=
pReq
,
.
contLen
=
contLen
,
.
msgType
=
TDMT_DND_ALTER_MNODE
,
.
acceptableCode
=
0
,
};
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
taosMemoryFree
(
pReq
);
sdbCancelFetch
(
pSdb
,
pIter
);
sdbRelease
(
pSdb
,
pMObj
);
return
-
1
;
}
sdbRelease
(
pSdb
,
pMObj
);
}
{
STransAction
action
=
{
0
};
action
.
epSet
=
mndGetDnodeEpset
(
pDnode
);
createReq
.
dnodeId
=
pObj
->
id
;
int32_t
contLen
=
tSerializeSDCreateMnodeReq
(
NULL
,
0
,
&
createReq
);
void
*
pReq
=
taosMemoryMalloc
(
contLen
);
tSerializeSDCreateMnodeReq
(
pReq
,
contLen
,
&
createReq
);
action
.
epSet
=
mndGetDnodeEpset
(
pDnode
);
action
.
pCont
=
pReq
;
action
.
contLen
=
contLen
;
action
.
msgType
=
TDMT_DND_CREATE_MNODE
;
action
.
acceptableCode
=
TSDB_CODE_NODE_ALREADY_DEPLOYED
;
STransAction
action
=
{
.
epSet
=
createEpset
,
.
pCont
=
pReq
,
.
contLen
=
contLen
,
.
msgType
=
TDMT_DND_CREATE_MNODE
,
.
acceptableCode
=
TSDB_CODE_NODE_ALREADY_DEPLOYED
,
};
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
taosMemoryFree
(
pReq
);
return
-
1
;
...
...
@@ -441,73 +449,77 @@ static int32_t mndSetDropMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeO
}
static
int32_t
mndSetDropMnodeRedoActions
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SDnodeObj
*
pDnode
,
SMnodeObj
*
pObj
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
void
*
pIter
=
NULL
;
int32_t
numOfReplicas
=
0
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
void
*
pIter
=
NULL
;
int32_t
numOfReplicas
=
0
;
SDAlterMnodeReq
alterReq
=
{
0
};
SDDropMnodeReq
dropReq
=
{
0
};
SEpSet
alterEpset
=
{
0
};
SEpSet
dropEpSet
=
{
0
};
while
(
1
)
{
SMnodeObj
*
pMObj
=
NULL
;
pIter
=
sdbFetch
(
pSdb
,
SDB_MNODE
,
pIter
,
(
void
**
)
&
pMObj
);
if
(
pIter
==
NULL
)
break
;
if
(
pMObj
->
id
==
pObj
->
id
)
{
sdbRelease
(
pSdb
,
pMObj
);
continue
;
}
alterReq
.
replicas
[
numOfReplicas
].
id
=
pMObj
->
id
;
alterReq
.
replicas
[
numOfReplicas
].
port
=
pMObj
->
pDnode
->
port
;
memcpy
(
alterReq
.
replicas
[
numOfReplicas
].
fqdn
,
pMObj
->
pDnode
->
fqdn
,
TSDB_FQDN_LEN
);
if
(
pMObj
->
id
!=
pObj
->
id
)
{
SReplica
*
pReplica
=
&
alterReq
.
replicas
[
numOfReplicas
];
pReplica
->
id
=
pMObj
->
id
;
pReplica
->
port
=
pMObj
->
pDnode
->
port
;
memcpy
(
pReplica
->
fqdn
,
pMObj
->
pDnode
->
fqdn
,
TSDB_FQDN_LEN
);
numOfReplicas
++
;
alterEpset
.
eps
[
numOfReplicas
].
port
=
pMObj
->
pDnode
->
port
;
memcpy
(
alterEpset
.
eps
[
numOfReplicas
].
fqdn
,
pMObj
->
pDnode
->
fqdn
,
TSDB_FQDN_LEN
);
if
(
pMObj
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
alterEpset
.
inUse
=
numOfReplicas
;
}
numOfReplicas
++
;
sdbRelease
(
pSdb
,
pMObj
);
}
alterReq
.
replica
=
numOfReplicas
;
alterEpset
.
numOfEps
=
numOfReplicas
;
while
(
1
)
{
SMnodeObj
*
pMObj
=
NULL
;
pIter
=
sdbFetch
(
pSdb
,
SDB_MNODE
,
pIter
,
(
void
**
)
&
pMObj
);
if
(
pIter
==
NULL
)
break
;
if
(
pMObj
->
id
!=
pObj
->
id
)
{
STransAction
action
=
{
0
};
alterReq
.
dnodeId
=
pMObj
->
id
;
int32_t
contLen
=
tSerializeSDCreateMnodeReq
(
NULL
,
0
,
&
alterReq
);
void
*
pReq
=
taosMemoryMalloc
(
contLen
);
tSerializeSDCreateMnodeReq
(
pReq
,
contLen
,
&
alterReq
);
action
.
epSet
=
mndGetDnodeEpset
(
pMObj
->
pDnode
);
action
.
pCont
=
pReq
;
action
.
contLen
=
contLen
;
action
.
msgType
=
TDMT_DND_ALTER_MNODE
;
action
.
acceptableCode
=
TSDB_CODE_NODE_ALREADY_DEPLOYED
;
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
taosMemoryFree
(
pReq
);
sdbCancelFetch
(
pSdb
,
pIter
);
sdbRelease
(
pSdb
,
pMObj
);
return
-
1
;
}
}
dropReq
.
dnodeId
=
pDnode
->
id
;
dropEpSet
.
numOfEps
=
1
;
dropEpSet
.
eps
[
0
].
port
=
pDnode
->
port
;
memcpy
(
dropEpSet
.
eps
[
0
].
fqdn
,
pDnode
->
fqdn
,
TSDB_FQDN_LEN
);
sdbRelease
(
pSdb
,
pMObj
);
{
int32_t
contLen
=
tSerializeSDCreateMnodeReq
(
NULL
,
0
,
&
alterReq
);
void
*
pReq
=
taosMemoryMalloc
(
contLen
);
tSerializeSDCreateMnodeReq
(
pReq
,
contLen
,
&
alterReq
);
STransAction
action
=
{
.
epSet
=
alterEpset
,
.
pCont
=
pReq
,
.
contLen
=
contLen
,
.
msgType
=
TDMT_DND_ALTER_MNODE
,
.
acceptableCode
=
0
,
};
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
taosMemoryFree
(
pReq
);
return
-
1
;
}
}
{
STransAction
action
=
{
0
};
action
.
epSet
=
mndGetDnodeEpset
(
pDnode
);
SDDropMnodeReq
dropReq
=
{
0
};
dropReq
.
dnodeId
=
pObj
->
id
;
int32_t
contLen
=
tSerializeSCreateDropMQSBNodeReq
(
NULL
,
0
,
&
dropReq
);
void
*
pReq
=
taosMemoryMalloc
(
contLen
);
tSerializeSCreateDropMQSBNodeReq
(
pReq
,
contLen
,
&
dropReq
);
action
.
epSet
=
mndGetDnodeEpset
(
pDnode
);
action
.
pCont
=
pReq
;
action
.
contLen
=
contLen
;
action
.
msgType
=
TDMT_DND_DROP_MNODE
;
action
.
acceptableCode
=
TSDB_CODE_NODE_NOT_DEPLOYED
;
STransAction
action
=
{
.
epSet
=
dropEpSet
,
.
pCont
=
pReq
,
.
contLen
=
contLen
,
.
msgType
=
TDMT_DND_DROP_MNODE
,
.
acceptableCode
=
TSDB_CODE_NODE_NOT_DEPLOYED
,
};
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
taosMemoryFree
(
pReq
);
return
-
1
;
...
...
@@ -662,7 +674,7 @@ static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter) {
}
static
int32_t
mndProcessAlterMnodeReq
(
SRpcMsg
*
pReq
)
{
SMnode
*
pMnode
=
pReq
->
info
.
node
;
SMnode
*
pMnode
=
pReq
->
info
.
node
;
SDAlterMnodeReq
alterReq
=
{
0
};
if
(
tDeserializeSDCreateMnodeReq
(
pReq
->
pCont
,
pReq
->
contLen
,
&
alterReq
)
!=
0
)
{
...
...
@@ -670,12 +682,6 @@ static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq) {
return
-
1
;
}
if
(
alterReq
.
dnodeId
!=
pMnode
->
selfDnodeId
)
{
terrno
=
TSDB_CODE_INVALID_OPTION
;
mError
(
"failed to alter mnode since %s, input:%d cur:%d"
,
terrstr
(),
alterReq
.
dnodeId
,
pMnode
->
selfDnodeId
);
return
-
1
;
}
SSyncCfg
cfg
=
{.
replicaNum
=
alterReq
.
replica
,
.
myIndex
=
-
1
};
for
(
int32_t
i
=
0
;
i
<
alterReq
.
replica
;
++
i
)
{
SNodeInfo
*
pNode
=
&
cfg
.
nodeInfo
[
i
];
...
...
source/dnode/mnode/impl/src/mndSync.c
浏览文件 @
dbf80b56
...
...
@@ -31,7 +31,8 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
SMnode
*
pMnode
=
pFsm
->
data
;
SSdbRaw
*
pRaw
=
pMsg
->
pCont
;
mTrace
(
"raw:%p, apply to sdb, ver:%"
PRId64
" role:%s"
,
pRaw
,
cbMeta
.
index
,
syncStr
(
cbMeta
.
state
));
mTrace
(
"raw:%p, apply to sdb, ver:%"
PRId64
" term:%"
PRId64
" role:%s"
,
pRaw
,
cbMeta
.
index
,
cbMeta
.
term
,
syncStr
(
cbMeta
.
state
));
sdbWriteWithoutFree
(
pMnode
->
pSdb
,
pRaw
);
sdbSetApplyIndex
(
pMnode
->
pSdb
,
cbMeta
.
index
);
sdbSetApplyTerm
(
pMnode
->
pSdb
,
cbMeta
.
term
);
...
...
@@ -125,6 +126,7 @@ int32_t mndInitSync(SMnode *pMnode) {
snprintf
(
syncInfo
.
path
,
sizeof
(
syncInfo
.
path
),
"%s%ssync"
,
pMnode
->
path
,
TD_DIRSEP
);
syncInfo
.
pWal
=
pMgmt
->
pWal
;
syncInfo
.
pFsm
=
mndSyncMakeFsm
(
pMnode
);
syncInfo
.
isStandBy
=
pMgmt
->
standby
;
SSyncCfg
*
pCfg
=
&
syncInfo
.
syncCfg
;
pCfg
->
replicaNum
=
pMnode
->
replica
;
...
...
@@ -191,11 +193,17 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) {
void
mndSyncStart
(
SMnode
*
pMnode
)
{
SSyncMgmt
*
pMgmt
=
&
pMnode
->
syncMgmt
;
syncSetMsgCb
(
pMgmt
->
sync
,
&
pMnode
->
msgCb
);
syncStart
(
pMgmt
->
sync
);
#if 0
if (pMgmt->standby) {
syncStartStandBy(pMgmt->sync);
} else {
syncStart(pMgmt->sync);
}
#endif
mDebug
(
"sync:%"
PRId64
" is started"
,
pMgmt
->
sync
);
}
...
...
source/libs/sync/inc/syncRaftCfg.h
浏览文件 @
dbf80b56
...
...
@@ -31,6 +31,7 @@ typedef struct SRaftCfg {
SSyncCfg
cfg
;
TdFilePtr
pFile
;
char
path
[
TSDB_FILENAME_LEN
*
2
];
int8_t
isStandBy
;
}
SRaftCfg
;
SRaftCfg
*
raftCfgOpen
(
const
char
*
path
);
...
...
@@ -42,10 +43,12 @@ char * syncCfg2Str(SSyncCfg *pSyncCfg);
int32_t
syncCfgFromJson
(
const
cJSON
*
pRoot
,
SSyncCfg
*
pSyncCfg
);
int32_t
syncCfgFromStr
(
const
char
*
s
,
SSyncCfg
*
pSyncCfg
);
cJSON
*
raftCfg2Json
(
SRaftCfg
*
pRaftCfg
);
char
*
raftCfg2Str
(
SRaftCfg
*
pRaftCfg
);
cJSON
*
raftCfg2Json
(
SRaftCfg
*
pRaftCfg
);
char
*
raftCfg2Str
(
SRaftCfg
*
pRaftCfg
);
int32_t
raftCfgFromJson
(
const
cJSON
*
pRoot
,
SRaftCfg
*
pRaftCfg
);
int32_t
raftCfgFromStr
(
const
char
*
s
,
SRaftCfg
*
pRaftCfg
);
int32_t
syncCfgCreateFile
(
SSyncCfg
*
pCfg
,
const
char
*
path
);
int32_t
raftCfgCreateFile
(
SSyncCfg
*
pCfg
,
int8_t
isStandBy
,
const
char
*
path
);
// for debug ----------------------
void
syncCfgPrint
(
SSyncCfg
*
pCfg
);
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
dbf80b56
...
...
@@ -100,6 +100,21 @@ void syncStart(int64_t rid) {
if
(
pSyncNode
==
NULL
)
{
return
;
}
if
(
pSyncNode
->
pRaftCfg
->
isStandBy
)
{
syncNodeStartStandBy
(
pSyncNode
);
}
else
{
syncNodeStart
(
pSyncNode
);
}
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
}
void
syncStartNormal
(
int64_t
rid
)
{
SSyncNode
*
pSyncNode
=
(
SSyncNode
*
)
taosAcquireRef
(
tsNodeRefId
,
rid
);
if
(
pSyncNode
==
NULL
)
{
return
;
}
syncNodeStart
(
pSyncNode
);
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
...
...
@@ -368,7 +383,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
snprintf
(
pSyncNode
->
configPath
,
sizeof
(
pSyncNode
->
configPath
),
"%s/raft_config.json"
,
pSyncInfo
->
path
);
if
(
!
taosCheckExistFile
(
pSyncNode
->
configPath
))
{
// create raft config file
ret
=
syncCfgCreateFile
((
SSyncCfg
*
)
&
(
pSyncInfo
->
syncCfg
)
,
pSyncNode
->
configPath
);
ret
=
raftCfgCreateFile
((
SSyncCfg
*
)
&
(
pSyncInfo
->
syncCfg
),
pSyncInfo
->
isStandBy
,
pSyncNode
->
configPath
);
assert
(
ret
==
0
);
}
else
{
...
...
@@ -979,6 +994,9 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig) {
voteGrantedUpdate
(
pSyncNode
->
pVotesGranted
,
pSyncNode
);
votesRespondUpdate
(
pSyncNode
->
pVotesRespond
,
pSyncNode
);
pSyncNode
->
pRaftCfg
->
isStandBy
=
0
;
raftCfgPersist
(
pSyncNode
->
pRaftCfg
);
syncNodeLog2
(
"==syncNodeUpdateConfig=="
,
pSyncNode
);
}
...
...
source/libs/sync/src/syncRaftCfg.c
浏览文件 @
dbf80b56
...
...
@@ -32,7 +32,7 @@ SRaftCfg *raftCfgOpen(const char *path) {
int
len
=
taosReadFile
(
pCfg
->
pFile
,
buf
,
sizeof
(
buf
));
assert
(
len
>
0
);
int32_t
ret
=
syncCfgFromStr
(
buf
,
&
(
pCfg
->
cfg
)
);
int32_t
ret
=
raftCfgFromStr
(
buf
,
pCfg
);
assert
(
ret
==
0
);
return
pCfg
;
...
...
@@ -48,7 +48,7 @@ int32_t raftCfgClose(SRaftCfg *pRaftCfg) {
int32_t
raftCfgPersist
(
SRaftCfg
*
pRaftCfg
)
{
assert
(
pRaftCfg
!=
NULL
);
char
*
s
=
syncCfg2Str
(
&
(
pRaftCfg
->
cfg
)
);
char
*
s
=
raftCfg2Str
(
pRaftCfg
);
taosLSeekFile
(
pRaftCfg
->
pFile
,
0
,
SEEK_SET
);
int64_t
ret
=
taosWriteFile
(
pRaftCfg
->
pFile
,
s
,
strlen
(
s
)
+
1
);
assert
(
ret
==
strlen
(
s
)
+
1
);
...
...
@@ -76,9 +76,12 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) {
}
}
return
pRoot
;
/*
cJSON *pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SSyncCfg", pRoot);
return pJson;
*/
}
char
*
syncCfg2Str
(
SSyncCfg
*
pSyncCfg
)
{
...
...
@@ -90,7 +93,8 @@ char *syncCfg2Str(SSyncCfg *pSyncCfg) {
int32_t
syncCfgFromJson
(
const
cJSON
*
pRoot
,
SSyncCfg
*
pSyncCfg
)
{
memset
(
pSyncCfg
,
0
,
sizeof
(
SSyncCfg
));
cJSON
*
pJson
=
cJSON_GetObjectItem
(
pRoot
,
"SSyncCfg"
);
// cJSON *pJson = cJSON_GetObjectItem(pRoot, "SSyncCfg");
const
cJSON
*
pJson
=
pRoot
;
cJSON
*
pReplicaNum
=
cJSON_GetObjectItem
(
pJson
,
"replicaNum"
);
assert
(
cJSON_IsNumber
(
pReplicaNum
));
...
...
@@ -133,22 +137,32 @@ int32_t syncCfgFromStr(const char *s, SSyncCfg *pSyncCfg) {
}
cJSON
*
raftCfg2Json
(
SRaftCfg
*
pRaftCfg
)
{
cJSON
*
pJson
=
syncCfg2Json
(
&
(
pRaftCfg
->
cfg
));
cJSON
*
pRoot
=
cJSON_CreateObject
();
cJSON_AddItemToObject
(
pRoot
,
"SSyncCfg"
,
syncCfg2Json
(
&
(
pRaftCfg
->
cfg
)));
cJSON_AddNumberToObject
(
pRoot
,
"isStandBy"
,
pRaftCfg
->
isStandBy
);
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON_AddItemToObject
(
pJson
,
"RaftCfg"
,
pRoot
);
return
pJson
;
}
char
*
raftCfg2Str
(
SRaftCfg
*
pRaftCfg
)
{
char
*
s
=
syncCfg2Str
(
&
(
pRaftCfg
->
cfg
));
return
s
;
cJSON
*
pJson
=
raftCfg2Json
(
pRaftCfg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
cJSON_Delete
(
pJson
);
return
serialized
;
}
int32_t
syncCfgCreateFile
(
SSyncCfg
*
pCfg
,
const
char
*
path
)
{
int32_t
raftCfgCreateFile
(
SSyncCfg
*
pCfg
,
int8_t
isStandBy
,
const
char
*
path
)
{
assert
(
pCfg
!=
NULL
);
TdFilePtr
pFile
=
taosOpenFile
(
path
,
TD_FILE_CREATE
|
TD_FILE_WRITE
);
assert
(
pFile
!=
NULL
);
char
*
s
=
syncCfg2Str
(
pCfg
);
SRaftCfg
raftCfg
;
raftCfg
.
cfg
=
*
pCfg
;
raftCfg
.
isStandBy
=
isStandBy
;
char
*
s
=
raftCfg2Str
(
&
raftCfg
);
int64_t
ret
=
taosWriteFile
(
pFile
,
s
,
strlen
(
s
)
+
1
);
assert
(
ret
==
strlen
(
s
)
+
1
);
...
...
@@ -157,6 +171,31 @@ int32_t syncCfgCreateFile(SSyncCfg *pCfg, const char *path) {
return
0
;
}
int32_t
raftCfgFromJson
(
const
cJSON
*
pRoot
,
SRaftCfg
*
pRaftCfg
)
{
// memset(pRaftCfg, 0, sizeof(SRaftCfg));
cJSON
*
pJson
=
cJSON_GetObjectItem
(
pRoot
,
"RaftCfg"
);
cJSON
*
pJsonIsStandBy
=
cJSON_GetObjectItem
(
pJson
,
"isStandBy"
);
pRaftCfg
->
isStandBy
=
cJSON_GetNumberValue
(
pJsonIsStandBy
);
cJSON
*
pJsonSyncCfg
=
cJSON_GetObjectItem
(
pJson
,
"SSyncCfg"
);
int32_t
code
=
syncCfgFromJson
(
pJsonSyncCfg
,
&
(
pRaftCfg
->
cfg
));
ASSERT
(
code
==
0
);
return
code
;
}
int32_t
raftCfgFromStr
(
const
char
*
s
,
SRaftCfg
*
pRaftCfg
)
{
cJSON
*
pRoot
=
cJSON_Parse
(
s
);
assert
(
pRoot
!=
NULL
);
int32_t
ret
=
raftCfgFromJson
(
pRoot
,
pRaftCfg
);
assert
(
ret
==
0
);
cJSON_Delete
(
pRoot
);
return
0
;
}
// for debug ----------------------
void
syncCfgPrint
(
SSyncCfg
*
pCfg
)
{
char
*
serialized
=
syncCfg2Str
(
pCfg
);
...
...
source/libs/sync/test/syncRaftCfgTest.cpp
浏览文件 @
dbf80b56
...
...
@@ -15,6 +15,21 @@ void logTest() {
sFatal
(
"--- sync log test: fatal"
);
}
SRaftCfg
*
createRaftCfg
()
{
SRaftCfg
*
pCfg
=
(
SRaftCfg
*
)
taosMemoryMalloc
(
sizeof
(
SRaftCfg
));
memset
(
pCfg
,
0
,
sizeof
(
SRaftCfg
));
pCfg
->
cfg
.
replicaNum
=
3
;
pCfg
->
cfg
.
myIndex
=
1
;
for
(
int
i
=
0
;
i
<
pCfg
->
cfg
.
replicaNum
;
++
i
)
{
((
pCfg
->
cfg
.
nodeInfo
)[
i
]).
nodePort
=
i
*
100
;
snprintf
(((
pCfg
->
cfg
.
nodeInfo
)[
i
]).
nodeFqdn
,
sizeof
(((
pCfg
->
cfg
.
nodeInfo
)[
i
]).
nodeFqdn
),
"100.200.300.%d"
,
i
);
}
pCfg
->
isStandBy
=
taosGetTimestampSec
()
%
100
;
return
pCfg
;
}
SSyncCfg
*
createSyncCfg
()
{
SSyncCfg
*
pCfg
=
(
SSyncCfg
*
)
taosMemoryMalloc
(
sizeof
(
SSyncCfg
));
memset
(
pCfg
,
0
,
sizeof
(
SSyncCfg
));
...
...
@@ -56,7 +71,7 @@ void test3() {
if
(
taosCheckExistFile
(
s
))
{
printf
(
"%s file: %s already exist!
\n
"
,
(
char
*
)
__FUNCTION__
,
s
);
}
else
{
syncCfgCreateFile
(
pCfg
,
s
);
raftCfgCreateFile
(
pCfg
,
7
,
s
);
printf
(
"%s create json file: %s
\n
"
,
(
char
*
)
__FUNCTION__
,
s
);
}
...
...
@@ -78,6 +93,7 @@ void test5() {
assert
(
pCfg
!=
NULL
);
pCfg
->
cfg
.
myIndex
=
taosGetTimestampSec
();
pCfg
->
isStandBy
+=
2
;
raftCfgPersist
(
pCfg
);
printf
(
"%s update json file: %s myIndex->%d
\n
"
,
(
char
*
)
__FUNCTION__
,
"./test3_raft_cfg.json"
,
pCfg
->
cfg
.
myIndex
);
...
...
tests/test/c/sdbDump.c
浏览文件 @
dbf80b56
...
...
@@ -20,10 +20,13 @@
#include "tconfig.h"
#include "tjson.h"
#define TMP_SDB_DATA_DIR "/tmp/dumpsdb"
#define TMP_SDB_MNODE_DIR "/tmp/dumpsdb/mnode"
#define TMP_SDB_FILE "/tmp/dumpsdb/mnode/data/sdb.data"
#define TMP_SDB_PATH "/tmp/dumpsdb/mnode/data"
#define TMP_DNODE_DIR "/tmp/dumpsdb"
#define TMP_MNODE_DIR "/tmp/dumpsdb/mnode"
#define TMP_SDB_DATA_DIR "/tmp/dumpsdb/mnode/data"
#define TMP_SDB_SYNC_DIR "/tmp/dumpsdb/mnode/sync"
#define TMP_SDB_DATA_FILE "/tmp/dumpsdb/mnode/data/sdb.data"
#define TMP_SDB_RAFT_CFG_FILE "/tmp/dumpsdb/mnode/sync/raft_config.json"
#define TMP_SDB_RAFT_STORE_FILE "/tmp/dumpsdb/mnode/sync/raft_store.json"
void
reportStartup
(
const
char
*
name
,
const
char
*
desc
)
{}
...
...
@@ -318,6 +321,10 @@ void dumpHeader(SSdb *pSdb, SJson *json) {
}
int32_t
dumpSdb
()
{
wDebugFlag
=
0
;
mDebugFlag
=
0
;
sDebugFlag
=
0
;
SMsgCb
msgCb
=
{
0
};
msgCb
.
reportStartupFp
=
reportStartup
;
msgCb
.
sendReqFp
=
sendReq
;
...
...
@@ -325,9 +332,10 @@ int32_t dumpSdb() {
msgCb
.
mgmt
=
(
SMgmtWrapper
*
)(
&
msgCb
);
// hack
tmsgSetDefault
(
&
msgCb
);
walInit
();
syncInit
();
SMnodeOpt
opt
=
{.
msgCb
=
msgCb
};
SMnode
*
pMnode
=
mndOpen
(
TMP_
SDB_
MNODE_DIR
,
&
opt
);
SMnode
*
pMnode
=
mndOpen
(
TMP_MNODE_DIR
,
&
opt
);
if
(
pMnode
==
NULL
)
return
-
1
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
...
...
@@ -369,13 +377,11 @@ int32_t dumpSdb() {
taosCloseFile
(
&
pFile
);
tjsonDelete
(
json
);
taosMemoryFree
(
pCont
);
taosRemoveDir
(
TMP_
SDB_DATA
_DIR
);
taosRemoveDir
(
TMP_
DNODE
_DIR
);
return
0
;
}
int32_t
parseArgs
(
int32_t
argc
,
char
*
argv
[])
{
char
file
[
PATH_MAX
]
=
{
0
};
for
(
int32_t
i
=
1
;
i
<
argc
;
++
i
)
{
if
(
strcmp
(
argv
[
i
],
"-c"
)
==
0
)
{
if
(
i
<
argc
-
1
)
{
...
...
@@ -388,20 +394,8 @@ int32_t parseArgs(int32_t argc, char *argv[]) {
printf
(
"'-c' requires a parameter, default is %s
\n
"
,
configDir
);
return
-
1
;
}
}
else
if
(
strcmp
(
argv
[
i
],
"-f"
)
==
0
)
{
if
(
i
<
argc
-
1
)
{
if
(
strlen
(
argv
[
++
i
])
>=
PATH_MAX
)
{
printf
(
"file path overflow"
);
return
-
1
;
}
tstrncpy
(
file
,
argv
[
i
],
PATH_MAX
);
}
else
{
printf
(
"'-f' requires a parameter, default is %s
\n
"
,
configDir
);
return
-
1
;
}
}
else
{
printf
(
"-c Configuration directory.
\n
"
);
printf
(
"-f Input sdb.data file.
\n
"
);
return
-
1
;
}
}
...
...
@@ -416,13 +410,28 @@ int32_t parseArgs(int32_t argc, char *argv[]) {
return
-
1
;
}
if
(
file
[
0
]
==
0
)
{
snprintf
(
file
,
PATH_MAX
,
"%s/mnode/data/sdb.data"
,
tsDataDir
);
}
strcpy
(
tsDataDir
,
TMP_SDB_DATA_DIR
);
taosMulMkDir
(
TMP_SDB_PATH
);
taosCopyFile
(
file
,
TMP_SDB_FILE
);
char
dataFile
[
PATH_MAX
]
=
{
0
};
char
raftCfgFile
[
PATH_MAX
]
=
{
0
};
char
raftStoreFile
[
PATH_MAX
]
=
{
0
};
snprintf
(
dataFile
,
PATH_MAX
,
"%s/mnode/data/sdb.data"
,
tsDataDir
);
snprintf
(
raftCfgFile
,
PATH_MAX
,
"%s/mnode/sync/raft_config.json"
,
tsDataDir
);
snprintf
(
raftStoreFile
,
PATH_MAX
,
"%s/mnode/sync/raft_store.json"
,
tsDataDir
);
char
cmd
[
PATH_MAX
*
2
]
=
{
0
};
snprintf
(
cmd
,
sizeof
(
cmd
),
"rm -rf %s"
,
TMP_DNODE_DIR
);
system
(
cmd
);
snprintf
(
cmd
,
sizeof
(
cmd
),
"mkdir -p %s"
,
TMP_SDB_DATA_DIR
);
system
(
cmd
);
snprintf
(
cmd
,
sizeof
(
cmd
),
"mkdir -p %s"
,
TMP_SDB_SYNC_DIR
);
system
(
cmd
);
snprintf
(
cmd
,
sizeof
(
cmd
),
"cp %s %s 2>/dev/null"
,
dataFile
,
TMP_SDB_DATA_FILE
);
system
(
cmd
);
snprintf
(
cmd
,
sizeof
(
cmd
),
"cp %s %s 2>/dev/null"
,
raftCfgFile
,
TMP_SDB_RAFT_CFG_FILE
);
system
(
cmd
);
snprintf
(
cmd
,
sizeof
(
cmd
),
"cp %s %s 2>/dev/null"
,
raftStoreFile
,
TMP_SDB_RAFT_STORE_FILE
);
system
(
cmd
);
strcpy
(
tsDataDir
,
TMP_DNODE_DIR
);
return
0
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录