Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
20d788d7
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
20d788d7
编写于
5月 24, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
5月 24, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #12863 from taosdata/fix/mnode
refactor: sync integrate into mnode
上级
e7a182fc
9f38ff67
变更
31
隐藏空白更改
内联
并排
Showing
31 changed file
with
533 addition
and
246 deletion
+533
-246
include/common/tmsgdef.h
include/common/tmsgdef.h
+1
-0
include/dnode/mnode/mnode.h
include/dnode/mnode/mnode.h
+1
-0
include/dnode/mnode/sdb/sdb.h
include/dnode/mnode/sdb/sdb.h
+8
-4
include/libs/sync/sync.h
include/libs/sync/sync.h
+3
-1
include/libs/transport/trpc.h
include/libs/transport/trpc.h
+10
-10
source/dnode/mgmt/mgmt_mnode/inc/mmInt.h
source/dnode/mgmt/mgmt_mnode/inc/mmInt.h
+18
-13
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
+10
-0
source/dnode/mgmt/mgmt_mnode/src/mmInt.c
source/dnode/mgmt/mgmt_mnode/src/mmInt.c
+26
-0
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
+16
-2
source/dnode/mgmt/test/mnode/CMakeLists.txt
source/dnode/mgmt/test/mnode/CMakeLists.txt
+4
-4
source/dnode/mnode/impl/inc/mndInt.h
source/dnode/mnode/impl/inc/mndInt.h
+6
-2
source/dnode/mnode/impl/inc/mndSync.h
source/dnode/mnode/impl/inc/mndSync.h
+2
-0
source/dnode/mnode/impl/src/mndSync.c
source/dnode/mnode/impl/src/mndSync.c
+107
-133
source/dnode/mnode/impl/src/mndTrans.c
source/dnode/mnode/impl/src/mndTrans.c
+20
-11
source/dnode/mnode/impl/src/mnode.c
source/dnode/mnode/impl/src/mnode.c
+100
-2
source/dnode/mnode/impl/test/sdb/sdbTest.cpp
source/dnode/mnode/impl/test/sdb/sdbTest.cpp
+6
-9
source/dnode/mnode/impl/test/trans/CMakeLists.txt
source/dnode/mnode/impl/test/trans/CMakeLists.txt
+4
-4
source/dnode/mnode/impl/test/trans/trans2.cpp
source/dnode/mnode/impl/test/trans/trans2.cpp
+9
-0
source/dnode/mnode/sdb/src/sdb.c
source/dnode/mnode/sdb/src/sdb.c
+9
-10
source/dnode/mnode/sdb/src/sdbFile.c
source/dnode/mnode/sdb/src/sdbFile.c
+19
-3
source/dnode/vnode/src/vnd/vnodeSync.c
source/dnode/vnode/src/vnd/vnodeSync.c
+8
-1
source/libs/sync/inc/syncIO.h
source/libs/sync/inc/syncIO.h
+2
-2
source/libs/sync/inc/syncInt.h
source/libs/sync/inc/syncInt.h
+5
-0
source/libs/sync/src/syncAppendEntries.c
source/libs/sync/src/syncAppendEntries.c
+27
-1
source/libs/sync/src/syncCommit.c
source/libs/sync/src/syncCommit.c
+28
-3
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+54
-10
source/libs/sync/test/syncConfigChangeTest.cpp
source/libs/sync/test/syncConfigChangeTest.cpp
+5
-0
source/libs/sync/test/syncSnapshotTest.cpp
source/libs/sync/test/syncSnapshotTest.cpp
+2
-0
source/libs/transport/inc/transComm.h
source/libs/transport/inc/transComm.h
+1
-1
source/libs/transport/src/trans.c
source/libs/transport/src/trans.c
+20
-19
tests/test/c/sdbDump.c
tests/test/c/sdbDump.c
+2
-1
未找到文件。
include/common/tmsgdef.h
浏览文件 @
20d788d7
...
...
@@ -158,6 +158,7 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_MND_DROP_INDEX
,
"mnode-drop-index"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_GET_DB_CFG
,
"mnode-get-db-cfg"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_GET_INDEX
,
"mnode-get-index"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_MND_APPLY_MSG
,
"mnode-apply-msg"
,
NULL
,
NULL
)
// Requests handled by VNODE
TD_NEW_MSG_SEG
(
TDMT_VND_MSG
)
...
...
include/dnode/mnode/mnode.h
浏览文件 @
20d788d7
...
...
@@ -89,6 +89,7 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad);
* @return int32_t 0 for success, -1 for failure.
*/
int32_t
mndProcessMsg
(
SRpcMsg
*
pMsg
);
int32_t
mndProcessSyncMsg
(
SRpcMsg
*
pMsg
);
/**
* @brief Generate machine code
...
...
include/dnode/mnode/sdb/sdb.h
浏览文件 @
20d788d7
...
...
@@ -304,13 +304,16 @@ int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type);
int64_t
sdbGetTableVer
(
SSdb
*
pSdb
,
ESdbType
type
);
/**
* @brief Update the
version
of sdb
* @brief Update the
index
of sdb
*
* @param pSdb The sdb object.
* @param
val The update value of the version
.
* @return int32_t The current
version
of sdb
* @param
index The update value of the apply index
.
* @return int32_t The current
index
of sdb
*/
int64_t
sdbUpdateVer
(
SSdb
*
pSdb
,
int32_t
val
);
void
sdbSetApplyIndex
(
SSdb
*
pSdb
,
int64_t
index
);
int64_t
sdbGetApplyIndex
(
SSdb
*
pSdb
);
void
sdbSetApplyTerm
(
SSdb
*
pSdb
,
int64_t
term
);
int64_t
sdbGetApplyTerm
(
SSdb
*
pSdb
);
SSdbRaw
*
sdbAllocRaw
(
ESdbType
type
,
int8_t
sver
,
int32_t
dataLen
);
void
sdbFreeRaw
(
SSdbRaw
*
pRaw
);
...
...
@@ -339,6 +342,7 @@ typedef struct SSdb {
char
*
tmpDir
;
int64_t
lastCommitVer
;
int64_t
curVer
;
int64_t
curTerm
;
int64_t
tableVer
[
SDB_MAX
];
int64_t
maxId
[
SDB_MAX
];
EKeyType
keyTypes
[
SDB_MAX
];
...
...
include/libs/sync/sync.h
浏览文件 @
20d788d7
...
...
@@ -78,6 +78,8 @@ typedef struct SFsmCbMeta {
int32_t
code
;
ESyncState
state
;
uint64_t
seqNum
;
SyncTerm
term
;
SyncTerm
currentTerm
;
}
SFsmCbMeta
;
typedef
struct
SSyncFSM
{
...
...
@@ -85,6 +87,7 @@ typedef struct SSyncFSM {
void
(
*
FpCommitCb
)(
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SFsmCbMeta
cbMeta
);
void
(
*
FpPreCommitCb
)(
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SFsmCbMeta
cbMeta
);
void
(
*
FpRollBackCb
)(
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SFsmCbMeta
cbMeta
);
void
(
*
FpRestoreFinish
)(
struct
SSyncFSM
*
pFsm
);
int32_t
(
*
FpGetSnapshot
)(
struct
SSyncFSM
*
pFsm
,
SSnapshot
*
pSnapshot
);
int32_t
(
*
FpRestoreSnapshot
)(
struct
SSyncFSM
*
pFsm
,
const
SSnapshot
*
snapshot
);
}
SSyncFSM
;
...
...
@@ -117,7 +120,6 @@ typedef struct SSyncLogStore {
}
SSyncLogStore
;
typedef
struct
SSyncInfo
{
SyncGroupId
vgId
;
SSyncCfg
syncCfg
;
...
...
include/libs/transport/trpc.h
浏览文件 @
20d788d7
...
...
@@ -28,7 +28,7 @@ extern "C" {
#define TAOS_CONN_CLIENT 1
#define IsReq(pMsg) (pMsg->msgType & 1U)
extern
int
tsRpcHeadSize
;
extern
int
32_t
tsRpcHeadSize
;
typedef
struct
{
uint32_t
clientIp
;
...
...
@@ -69,10 +69,10 @@ typedef struct SRpcInit {
char
localFqdn
[
TSDB_FQDN_LEN
];
uint16_t
localPort
;
// local port
char
*
label
;
// for debug purpose
int
numOfThreads
;
// number of threads to handle connections
int
sessions
;
// number of sessions allowed
int
32_t
numOfThreads
;
// number of threads to handle connections
int
32_t
sessions
;
// number of sessions allowed
int8_t
connType
;
// TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS
int
idleTime
;
// milliseconds, 0 means idle timer is disabled
int
32_t
idleTime
;
// milliseconds, 0 means idle timer is disabled
// the following is for client app ecurity only
char
*
user
;
// user name
...
...
@@ -108,9 +108,9 @@ int32_t rpcInit();
void
rpcCleanup
();
void
*
rpcOpen
(
const
SRpcInit
*
pRpc
);
void
rpcClose
(
void
*
);
void
*
rpcMallocCont
(
int
contLen
);
void
*
rpcMallocCont
(
int
32_t
contLen
);
void
rpcFreeCont
(
void
*
pCont
);
void
*
rpcReallocCont
(
void
*
ptr
,
int
contLen
);
void
*
rpcReallocCont
(
void
*
ptr
,
int
32_t
contLen
);
// Because taosd supports multi-process mode
// These functions should not be used on the server side
...
...
@@ -121,10 +121,10 @@ void rpcRegisterBrokenLinkArg(SRpcMsg *msg);
void
rpcReleaseHandle
(
void
*
handle
,
int8_t
type
);
// just release client conn to rpc instance, no close sock
// These functions will not be called in the child process
void
rpcSendRedirectRsp
(
void
*
pConn
,
const
SEpSet
*
pEpSet
);
void
rpcSendRequestWithCtx
(
void
*
thandle
,
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
,
int64_t
*
rid
,
SRpcCtx
*
ctx
);
int
rpcGetConnInfo
(
void
*
thandle
,
SRpcConnInfo
*
pInfo
);
void
rpcSendRecv
(
void
*
shandle
,
SEpSet
*
pEpSet
,
SRpcMsg
*
pReq
,
SRpcMsg
*
pRsp
);
void
rpcSendRedirectRsp
(
void
*
pConn
,
const
SEpSet
*
pEpSet
);
void
rpcSendRequestWithCtx
(
void
*
thandle
,
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
,
int64_t
*
rid
,
SRpcCtx
*
ctx
);
int
32_t
rpcGetConnInfo
(
void
*
thandle
,
SRpcConnInfo
*
pInfo
);
void
rpcSendRecv
(
void
*
shandle
,
SEpSet
*
pEpSet
,
SRpcMsg
*
pReq
,
SRpcMsg
*
pRsp
);
#ifdef __cplusplus
}
...
...
source/dnode/mgmt/mgmt_mnode/inc/mmInt.h
浏览文件 @
20d788d7
...
...
@@ -24,19 +24,22 @@ extern "C" {
#endif
typedef
struct
SMnodeMgmt
{
SDnodeData
*
pData
;
SMnode
*
pMnode
;
SMsgCb
msgCb
;
const
char
*
path
;
const
char
*
name
;
SSingleWorker
queryWorker
;
SSingleWorker
readWorker
;
SSingleWorker
writeWorker
;
SSingleWorker
syncWorker
;
SSingleWorker
monitorWorker
;
SReplica
replicas
[
TSDB_MAX_REPLICA
];
int8_t
replica
;
int8_t
selfIndex
;
SDnodeData
*
pData
;
SMnode
*
pMnode
;
SMsgCb
msgCb
;
const
char
*
path
;
const
char
*
name
;
SSingleWorker
queryWorker
;
SSingleWorker
readWorker
;
SSingleWorker
writeWorker
;
SSingleWorker
syncWorker
;
SSingleWorker
monitorWorker
;
SReplica
replicas
[
TSDB_MAX_REPLICA
];
int8_t
replica
;
int8_t
selfIndex
;
bool
stopped
;
int32_t
refCount
;
TdThreadRwlock
lock
;
}
SMnodeMgmt
;
// mmFile.c
...
...
@@ -45,6 +48,8 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed);
// mmInt.c
int32_t
mmAlter
(
SMnodeMgmt
*
pMgmt
,
SDAlterMnodeReq
*
pMsg
);
int32_t
mmAcquire
(
SMnodeMgmt
*
pMgmt
);
void
mmRelease
(
SMnodeMgmt
*
pMgmt
);
// mmHandle.c
SArray
*
mmGetMsgHandles
();
...
...
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
浏览文件 @
20d788d7
...
...
@@ -237,6 +237,16 @@ SArray *mmGetMsgHandles() {
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_SYNC_VNODE_RSP
,
mmPutNodeMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_COMPACT_VNODE_RSP
,
mmPutNodeMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_SYNC_TIMEOUT
,
mmPutNodeMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_SYNC_PING
,
mmPutNodeMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_SYNC_PING_REPLY
,
mmPutNodeMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_SYNC_CLIENT_REQUEST
,
mmPutNodeMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_SYNC_CLIENT_REQUEST_REPLY
,
mmPutNodeMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_SYNC_REQUEST_VOTE
,
mmPutNodeMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_SYNC_REQUEST_VOTE_REPLY
,
mmPutNodeMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_SYNC_APPEND_ENTRIES
,
mmPutNodeMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_SYNC_APPEND_ENTRIES_REPLY
,
mmPutNodeMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
code
=
0
;
_OVER:
...
...
source/dnode/mgmt/mgmt_mnode/src/mmInt.c
浏览文件 @
20d788d7
...
...
@@ -110,6 +110,7 @@ static void mmClose(SMnodeMgmt *pMgmt) {
if
(
pMgmt
->
pMnode
!=
NULL
)
{
mmStopWorker
(
pMgmt
);
mndClose
(
pMgmt
->
pMnode
);
taosThreadRwlockDestroy
(
&
pMgmt
->
lock
);
pMgmt
->
pMnode
=
NULL
;
}
...
...
@@ -122,6 +123,11 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
return
-
1
;
}
if
(
syncInit
()
!=
0
)
{
dError
(
"failed to init sync since %s"
,
terrstr
());
return
-
1
;
}
SMnodeMgmt
*
pMgmt
=
taosMemoryCalloc
(
1
,
sizeof
(
SMnodeMgmt
));
if
(
pMgmt
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -137,6 +143,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt
->
msgCb
.
queueFps
[
WRITE_QUEUE
]
=
(
PutToQueueFp
)
mmPutRpcMsgToWriteQueue
;
pMgmt
->
msgCb
.
queueFps
[
SYNC_QUEUE
]
=
(
PutToQueueFp
)
mmPutRpcMsgToSyncQueue
;
pMgmt
->
msgCb
.
mgmt
=
pMgmt
;
taosThreadRwlockInit
(
&
pMgmt
->
lock
,
NULL
);
bool
deployed
=
false
;
if
(
mmReadFile
(
pMgmt
,
&
deployed
)
!=
0
)
{
...
...
@@ -206,3 +213,22 @@ SMgmtFunc mmGetMgmtFunc() {
return
mgmtFunc
;
}
int32_t
mmAcquire
(
SMnodeMgmt
*
pMgmt
)
{
int32_t
code
=
0
;
taosThreadRwlockRdlock
(
&
pMgmt
->
lock
);
if
(
pMgmt
->
stopped
)
{
code
=
-
1
;
}
else
{
atomic_add_fetch_32
(
&
pMgmt
->
refCount
,
1
);
}
taosThreadRwlockUnlock
(
&
pMgmt
->
lock
);
return
code
;
}
void
mmRelease
(
SMnodeMgmt
*
pMgmt
)
{
taosThreadRwlockRdlock
(
&
pMgmt
->
lock
);
atomic_sub_fetch_32
(
&
pMgmt
->
refCount
,
1
);
taosThreadRwlockUnlock
(
&
pMgmt
->
lock
);
}
\ No newline at end of file
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
浏览文件 @
20d788d7
...
...
@@ -56,6 +56,12 @@ static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
taosFreeQitem
(
pMsg
);
}
static
void
mmProcessSyncQueue
(
SQueueInfo
*
pInfo
,
SRpcMsg
*
pMsg
)
{
SMnodeMgmt
*
pMgmt
=
pInfo
->
ahandle
;
pMsg
->
info
.
node
=
pMgmt
->
pMnode
;
mndProcessSyncMsg
(
pMsg
);
}
static
int32_t
mmPutNodeMsgToWorker
(
SSingleWorker
*
pWorker
,
SRpcMsg
*
pMsg
)
{
dTrace
(
"msg:%p, put into worker %s, type:%s"
,
pMsg
,
pWorker
->
name
,
TMSG_INFO
(
pMsg
->
msgType
));
taosWriteQitem
(
pWorker
->
queue
,
pMsg
);
...
...
@@ -105,7 +111,10 @@ int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
}
int32_t
mmPutRpcMsgToSyncQueue
(
SMnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
)
{
return
mmPutRpcMsgToWorker
(
&
pMgmt
->
syncWorker
,
pMsg
);
if
(
mmAcquire
(
pMgmt
)
!=
0
)
return
-
1
;
int32_t
code
=
mmPutRpcMsgToWorker
(
&
pMgmt
->
syncWorker
,
pMsg
);
mmRelease
(
pMgmt
);
return
code
;
}
int32_t
mmStartWorker
(
SMnodeMgmt
*
pMgmt
)
{
...
...
@@ -149,7 +158,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
.
min
=
1
,
.
max
=
1
,
.
name
=
"mnode-sync"
,
.
fp
=
(
FItem
)
mmProcessQueue
,
.
fp
=
(
FItem
)
mmProcess
Sync
Queue
,
.
param
=
pMgmt
,
};
if
(
tSingleWorkerInit
(
&
pMgmt
->
syncWorker
,
&
sCfg
)
!=
0
)
{
...
...
@@ -174,6 +183,11 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
}
void
mmStopWorker
(
SMnodeMgmt
*
pMgmt
)
{
taosThreadRwlockWrlock
(
&
pMgmt
->
lock
);
pMgmt
->
stopped
=
1
;
taosThreadRwlockUnlock
(
&
pMgmt
->
lock
);
while
(
pMgmt
->
refCount
>
0
)
taosMsleep
(
10
);
tSingleWorkerCleanup
(
&
pMgmt
->
monitorWorker
);
tSingleWorkerCleanup
(
&
pMgmt
->
queryWorker
);
tSingleWorkerCleanup
(
&
pMgmt
->
readWorker
);
...
...
source/dnode/mgmt/test/mnode/CMakeLists.txt
浏览文件 @
20d788d7
...
...
@@ -4,7 +4,7 @@ target_link_libraries(
dmnodeTest sut
)
add_test
(
NAME dmnodeTest
COMMAND dmnodeTest
)
#
add_test(
#
NAME dmnodeTest
#
COMMAND dmnodeTest
#
)
source/dnode/mnode/impl/inc/mndInt.h
浏览文件 @
20d788d7
...
...
@@ -19,6 +19,7 @@
#include "mndDef.h"
#include "sdb.h"
#include "syncTools.h"
#include "tcache.h"
#include "tdatablock.h"
#include "tglobal.h"
...
...
@@ -31,12 +32,14 @@
extern
"C"
{
#endif
// clang-format off
#define mFatal(...) { if (mDebugFlag & DEBUG_FATAL) { taosPrintLog("MND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }}
#define mError(...) { if (mDebugFlag & DEBUG_ERROR) { taosPrintLog("MND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }}
#define mWarn(...) { if (mDebugFlag & DEBUG_WARN) { taosPrintLog("MND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }}
#define mInfo(...) { if (mDebugFlag & DEBUG_INFO) { taosPrintLog("MND ", DEBUG_INFO, 255, __VA_ARGS__); }}
#define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND ", DEBUG_DEBUG, mDebugFlag, __VA_ARGS__); }}
#define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }}
// clang-format on
#define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
#define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
...
...
@@ -72,10 +75,11 @@ typedef struct {
}
STelemMgmt
;
typedef
struct
{
SWal
*
pWal
;
int32_t
errCode
;
bool
restored
;
sem_t
syncSem
;
SWal
*
pWal
;
SSyncNode
*
pSyncNode
;
int64_t
sync
;
ESyncState
state
;
}
SSyncMgmt
;
...
...
source/dnode/mnode/impl/inc/mndSync.h
浏览文件 @
20d788d7
...
...
@@ -26,6 +26,8 @@ int32_t mndInitSync(SMnode *pMnode);
void
mndCleanupSync
(
SMnode
*
pMnode
);
bool
mndIsMaster
(
SMnode
*
pMnode
);
int32_t
mndSyncPropose
(
SMnode
*
pMnode
,
SSdbRaw
*
pRaw
);
void
mndSyncStart
(
SMnode
*
pMnode
);
void
mndSyncStop
(
SMnode
*
pMnode
);
#ifdef __cplusplus
}
...
...
source/dnode/mnode/impl/src/mndSync.c
浏览文件 @
20d788d7
...
...
@@ -17,178 +17,152 @@
#include "mndSync.h"
#include "mndTrans.h"
static
int32_t
mndInitWal
(
SMnode
*
pMnode
)
{
SSyncMgmt
*
pMgmt
=
&
pMnode
->
syncMgmt
;
char
path
[
PATH_MAX
]
=
{
0
};
snprintf
(
path
,
sizeof
(
path
),
"%s%swal"
,
pMnode
->
path
,
TD_DIRSEP
);
SWalCfg
cfg
=
{
.
vgId
=
1
,
.
fsyncPeriod
=
0
,
.
rollPeriod
=
-
1
,
.
segSize
=
-
1
,
.
retentionPeriod
=
-
1
,
.
retentionSize
=
-
1
,
.
level
=
TAOS_WAL_FSYNC
,
};
pMgmt
->
pWal
=
walOpen
(
path
,
&
cfg
);
if
(
pMgmt
->
pWal
==
NULL
)
return
-
1
;
return
0
;
int32_t
mndSyncEqMsg
(
const
SMsgCb
*
msgcb
,
SRpcMsg
*
pMsg
)
{
int32_t
code
=
tmsgPutToQueue
(
msgcb
,
SYNC_QUEUE
,
pMsg
);
if
(
code
!=
0
)
{
rpcFreeCont
(
pMsg
->
pCont
);
}
return
code
;
}
static
void
mndCloseWal
(
SMnode
*
pMnode
)
{
int32_t
mndSyncSendMsg
(
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
)
{
return
tmsgSendReq
(
pEpSet
,
pMsg
);
}
void
mndSyncCommitMsg
(
struct
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SFsmCbMeta
cbMeta
)
{
SMnode
*
pMnode
=
pFsm
->
data
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SSyncMgmt
*
pMgmt
=
&
pMnode
->
syncMgmt
;
if
(
pMgmt
->
pWal
!=
NULL
)
{
walClose
(
pMgmt
->
pWal
);
pMgmt
->
pWal
=
NULL
;
SSdbRaw
*
pRaw
=
pMsg
->
pCont
;
mTrace
(
"ver:%"
PRId64
", apply raw:%p to sdb, role:%s"
,
cbMeta
.
index
,
pRaw
,
syncStr
(
cbMeta
.
state
));
sdbWriteWithoutFree
(
pSdb
,
pRaw
);
sdbSetApplyIndex
(
pSdb
,
cbMeta
.
index
);
sdbSetApplyTerm
(
pSdb
,
cbMeta
.
term
);
if
(
cbMeta
.
state
==
TAOS_SYNC_STATE_LEADER
)
{
tsem_post
(
&
pMgmt
->
syncSem
);
}
}
static
int32_t
mndRestoreWal
(
SMnode
*
pMnode
)
{
SWal
*
pWal
=
pMnode
->
syncMgmt
.
pWal
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
int64_t
lastSdbVer
=
sdbUpdateVer
(
pSdb
,
0
);
int32_t
code
=
-
1
;
SWalReadHandle
*
pHandle
=
walOpenReadHandle
(
pWal
);
if
(
pHandle
==
NULL
)
return
-
1
;
int64_t
first
=
walGetFirstVer
(
pWal
);
int64_t
last
=
walGetLastVer
(
pWal
);
mDebug
(
"start to restore wal, sdbver:%"
PRId64
", first:%"
PRId64
" last:%"
PRId64
,
lastSdbVer
,
first
,
last
);
first
=
TMAX
(
lastSdbVer
+
1
,
first
);
for
(
int64_t
ver
=
first
;
ver
>=
0
&&
ver
<=
last
;
++
ver
)
{
if
(
walReadWithHandle
(
pHandle
,
ver
)
<
0
)
{
mError
(
"ver:%"
PRId64
", failed to read from wal since %s"
,
ver
,
terrstr
());
goto
_OVER
;
}
SWalHead
*
pHead
=
pHandle
->
pHead
;
int64_t
sdbVer
=
sdbUpdateVer
(
pSdb
,
0
);
if
(
sdbVer
+
1
!=
ver
)
{
terrno
=
TSDB_CODE_SDB_INVALID_WAl_VER
;
mError
(
"ver:%"
PRId64
", failed to write to sdb, since inconsistent with sdbver:%"
PRId64
,
ver
,
sdbVer
);
goto
_OVER
;
}
mTrace
(
"ver:%"
PRId64
", will be restored, content:%p"
,
ver
,
pHead
->
head
.
body
);
if
(
sdbWriteWithoutFree
(
pSdb
,
(
void
*
)
pHead
->
head
.
body
)
<
0
)
{
mError
(
"ver:%"
PRId64
", failed to write to sdb since %s"
,
ver
,
terrstr
());
goto
_OVER
;
}
sdbUpdateVer
(
pSdb
,
1
);
mDebug
(
"ver:%"
PRId64
", is restored"
,
ver
);
}
int64_t
sdbVer
=
sdbUpdateVer
(
pSdb
,
0
);
mDebug
(
"restore wal finished, sdbver:%"
PRId64
,
sdbVer
);
int32_t
mndSyncGetSnapshot
(
struct
SSyncFSM
*
pFsm
,
SSnapshot
*
pSnapshot
)
{
SMnode
*
pMnode
=
pFsm
->
data
;
pSnapshot
->
lastApplyIndex
=
sdbGetApplyIndex
(
pMnode
->
pSdb
);
pSnapshot
->
lastApplyTerm
=
sdbGetApplyTerm
(
pMnode
->
pSdb
);
return
0
;
}
void
mndRestoreFinish
(
struct
SSyncFSM
*
pFsm
)
{
SMnode
*
pMnode
=
pFsm
->
data
;
mndTransPullup
(
pMnode
);
sdbVer
=
sdbUpdateVer
(
pSdb
,
0
);
mDebug
(
"pullup trans finished, sdbver:%"
PRId64
,
sdbVer
);
if
(
sdbVer
!=
lastSdbVer
)
{
mInfo
(
"sdb restored from %"
PRId64
" to %"
PRId64
", write file"
,
lastSdbVer
,
sdbVer
);
if
(
sdbWriteFile
(
pSdb
)
!=
0
)
{
goto
_OVER
;
}
if
(
walCommit
(
pWal
,
sdbVer
)
!=
0
)
{
goto
_OVER
;
}
if
(
walBeginSnapshot
(
pWal
,
sdbVer
)
<
0
)
{
goto
_OVER
;
}
if
(
walEndSnapshot
(
pWal
)
<
0
)
{
goto
_OVER
;
}
}
code
=
0
;
pMnode
->
syncMgmt
.
restored
=
true
;
}
_OVER:
walCloseReadHandle
(
pHandle
);
return
code
;
SSyncFSM
*
mndSyncMakeFsm
(
SMnode
*
pMnode
)
{
SSyncFSM
*
pFsm
=
taosMemoryCalloc
(
1
,
sizeof
(
SSyncFSM
));
pFsm
->
data
=
pMnode
;
pFsm
->
FpCommitCb
=
mndSyncCommitMsg
;
pFsm
->
FpPreCommitCb
=
NULL
;
pFsm
->
FpRollBackCb
=
NULL
;
pFsm
->
FpGetSnapshot
=
mndSyncGetSnapshot
;
pFsm
->
FpRestoreFinish
=
mndRestoreFinish
;
pFsm
->
FpRestoreSnapshot
=
NULL
;
return
pFsm
;
}
int32_t
mndInitSync
(
SMnode
*
pMnode
)
{
SSyncMgmt
*
pMgmt
=
&
pMnode
->
syncMgmt
;
tsem_init
(
&
pMgmt
->
syncSem
,
0
,
0
);
if
(
mndInitWal
(
pMnode
)
<
0
)
{
char
path
[
PATH_MAX
+
20
]
=
{
0
};
snprintf
(
path
,
sizeof
(
path
),
"%s%swal"
,
pMnode
->
path
,
TD_DIRSEP
);
SWalCfg
cfg
=
{
.
vgId
=
1
,
.
fsyncPeriod
=
0
,
.
rollPeriod
=
-
1
,
.
segSize
=
-
1
,
.
retentionPeriod
=
-
1
,
.
retentionSize
=
-
1
,
.
level
=
TAOS_WAL_FSYNC
,
};
pMgmt
->
pWal
=
walOpen
(
path
,
&
cfg
);
if
(
pMgmt
->
pWal
==
NULL
)
{
mError
(
"failed to open wal since %s"
,
terrstr
());
return
-
1
;
}
if
(
mndRestoreWal
(
pMnode
)
<
0
)
{
mError
(
"failed to restore wal since %s"
,
terrstr
());
return
-
1
;
SSyncInfo
syncInfo
=
{.
vgId
=
1
,
.
FpSendMsg
=
mndSyncSendMsg
,
.
FpEqMsg
=
mndSyncEqMsg
};
snprintf
(
syncInfo
.
path
,
sizeof
(
syncInfo
.
path
),
"%s%ssync"
,
pMnode
->
path
,
TD_DIRSEP
);
syncInfo
.
pWal
=
pMgmt
->
pWal
;
syncInfo
.
pFsm
=
mndSyncMakeFsm
(
pMnode
);
SSyncCfg
*
pCfg
=
&
syncInfo
.
syncCfg
;
pCfg
->
replicaNum
=
pMnode
->
replica
;
pCfg
->
myIndex
=
pMnode
->
selfIndex
;
for
(
int32_t
i
=
0
;
i
<
pMnode
->
replica
;
++
i
)
{
SNodeInfo
*
pNode
=
&
pCfg
->
nodeInfo
[
i
];
tstrncpy
(
pNode
->
nodeFqdn
,
pMnode
->
replicas
[
i
].
fqdn
,
sizeof
(
pNode
->
nodeFqdn
));
pNode
->
nodePort
=
pMnode
->
replicas
[
i
].
port
;
}
if
(
pMnode
->
selfId
==
1
)
{
pMgmt
->
state
=
TAOS_SYNC_STATE_LEADER
;
tsem_init
(
&
pMgmt
->
syncSem
,
0
,
0
);
pMgmt
->
sync
=
syncOpen
(
&
syncInfo
);
if
(
pMgmt
->
sync
<=
0
)
{
mError
(
"failed to open sync since %s"
,
terrstr
());
return
-
1
;
}
pMgmt
->
pSyncNode
=
NULL
;
mDebug
(
"mnode sync is opened, id:%"
PRId64
,
pMgmt
->
sync
);
return
0
;
}
void
mndCleanupSync
(
SMnode
*
pMnode
)
{
SSyncMgmt
*
pMgmt
=
&
pMnode
->
syncMgmt
;
tsem_destroy
(
&
pMgmt
->
syncSem
);
mndCloseWal
(
pMnode
);
}
static
int32_t
mndSyncApplyCb
(
struct
SSyncFSM
*
fsm
,
SyncIndex
index
,
const
SSyncBuffer
*
buf
,
void
*
pData
)
{
SMnode
*
pMnode
=
pData
;
SSyncMgmt
*
pMgmt
=
&
pMnode
->
syncMgmt
;
syncStop
(
pMgmt
->
sync
);
mDebug
(
"sync:%"
PRId64
" is stopped"
,
pMgmt
->
sync
);
pMgmt
->
errCode
=
0
;
tsem_post
(
&
pMgmt
->
syncSem
);
tsem_destroy
(
&
pMgmt
->
syncSem
);
if
(
pMgmt
->
pWal
!=
NULL
)
{
walClose
(
pMgmt
->
pWal
);
}
return
0
;
memset
(
pMgmt
,
0
,
sizeof
(
SSyncMgmt
))
;
}
int32_t
mndSyncPropose
(
SMnode
*
pMnode
,
SSdbRaw
*
pRaw
)
{
SWal
*
pWal
=
pMnode
->
syncMgmt
.
pWal
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
int64_t
ver
=
sdbUpdateVer
(
pSdb
,
1
);
if
(
walWrite
(
pWal
,
ver
,
1
,
pRaw
,
sdbGetRawTotalSize
(
pRaw
))
<
0
)
{
sdbUpdateVer
(
pSdb
,
-
1
);
mError
(
"ver:%"
PRId64
", failed to write raw:%p to wal since %s"
,
ver
,
pRaw
,
terrstr
());
return
-
1
;
}
mTrace
(
"ver:%"
PRId64
", write to wal, raw:%p"
,
ver
,
pRaw
);
walCommit
(
pWal
,
ver
);
walFsync
(
pWal
,
true
);
#if 1
return
0
;
#else
if
(
pMnode
->
replica
==
1
)
return
0
;
SSyncMgmt
*
pMgmt
=
&
pMnode
->
syncMgmt
;
pMgmt
->
errCode
=
0
;
SSyncBuffer
buf
=
{.
data
=
pRaw
,
.
len
=
sdbGetRawTotalSize
(
pRaw
)};
bool
isWeak
=
false
;
int32_t
code
=
syncPropose
(
pMgmt
->
pSyncNode
,
&
buf
,
pMnode
,
isWeak
);
SRpcMsg
rsp
=
{.
code
=
TDMT_MND_APPLY_MSG
,
.
contLen
=
sdbGetRawTotalSize
(
pRaw
)};
rsp
.
pCont
=
rpcMallocCont
(
rsp
.
contLen
);
if
(
rsp
.
pCont
==
NULL
)
return
-
1
;
memcpy
(
rsp
.
pCont
,
pRaw
,
rsp
.
contLen
);
const
bool
isWeak
=
false
;
int32_t
code
=
syncPropose
(
pMgmt
->
sync
,
&
rsp
,
isWeak
);
if
(
code
==
0
)
{
tsem_wait
(
&
pMgmt
->
syncSem
);
}
else
if
(
code
==
TAOS_SYNC_PROPOSE_NOT_LEADER
)
{
terrno
=
TSDB_CODE_APP_NOT_READY
;
}
else
if
(
code
==
TAOS_SYNC_PROPOSE_OTHER_ERROR
)
{
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
}
else
{
terrno
=
TSDB_CODE_APP_ERROR
;
}
if
(
code
!=
0
)
return
code
;
tsem_wait
(
&
pMgmt
->
syncSem
);
return
pMgmt
->
errCode
;
#endif
}
void
mndSyncStart
(
SMnode
*
pMnode
)
{
SSyncMgmt
*
pMgmt
=
&
pMnode
->
syncMgmt
;
syncSetMsgCb
(
pMgmt
->
sync
,
&
pMnode
->
msgCb
);
syncStart
(
pMgmt
->
sync
);
mDebug
(
"sync:%"
PRId64
" is started"
,
pMgmt
->
sync
);
}
void
mndSyncStop
(
SMnode
*
pMnode
)
{}
bool
mndIsMaster
(
SMnode
*
pMnode
)
{
SSyncMgmt
*
pMgmt
=
&
pMnode
->
syncMgmt
;
return
pMgmt
->
state
==
TAOS_SYNC_STATE_LEADER
;
pMgmt
->
state
=
syncGetMyRole
(
pMgmt
->
sync
);
return
(
pMgmt
->
state
==
TAOS_SYNC_STATE_LEADER
)
&&
(
pMnode
->
syncMgmt
.
restored
);
}
source/dnode/mnode/impl/src/mndTrans.c
浏览文件 @
20d788d7
...
...
@@ -682,13 +682,6 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
}
mDebug
(
"trans:%d, sync finished"
,
pTrans
->
id
);
code
=
sdbWrite
(
pMnode
->
pSdb
,
pRaw
);
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to write sdb since %s"
,
pTrans
->
id
,
terrstr
());
return
-
1
;
}
return
0
;
}
...
...
@@ -1360,19 +1353,35 @@ _OVER:
return
code
;
}
static
int32_t
mndCompareTransId
(
int32_t
*
pTransId1
,
int32_t
*
pTransId2
)
{
return
*
pTransId1
>=
*
pTransId2
?
1
:
0
;
}
void
mndTransPullup
(
SMnode
*
pMnode
)
{
STrans
*
pTrans
=
NULL
;
void
*
pIter
=
NULL
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SArray
*
pArray
=
taosArrayInit
(
sdbGetSize
(
pSdb
,
SDB_TRANS
),
sizeof
(
int32_t
));
if
(
pArray
==
NULL
)
return
;
void
*
pIter
=
NULL
;
while
(
1
)
{
STrans
*
pTrans
=
NULL
;
pIter
=
sdbFetch
(
pMnode
->
pSdb
,
SDB_TRANS
,
pIter
,
(
void
**
)
&
pTrans
);
if
(
pIter
==
NULL
)
break
;
taosArrayPush
(
pArray
,
&
pTrans
->
id
);
sdbRelease
(
pSdb
,
pTrans
);
}
mndTransExecute
(
pMnode
,
pTrans
);
sdbRelease
(
pMnode
->
pSdb
,
pTrans
);
taosArraySort
(
pArray
,
(
__compar_fn_t
)
mndCompareTransId
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pArray
);
++
i
)
{
int32_t
*
pTransId
=
taosArrayGet
(
pArray
,
i
);
STrans
*
pTrans
=
mndAcquireTrans
(
pMnode
,
*
pTransId
);
if
(
pTrans
!=
NULL
)
{
mndTransExecute
(
pMnode
,
pTrans
);
}
mndReleaseTrans
(
pMnode
,
pTrans
);
}
sdbWriteFile
(
pMnode
->
pSdb
);
taosArrayDestroy
(
pArray
);
}
static
int32_t
mndRetrieveTrans
(
SRpcMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
)
{
...
...
source/dnode/mnode/impl/src/mnode.c
浏览文件 @
20d788d7
...
...
@@ -335,9 +335,107 @@ int32_t mndAlter(SMnode *pMnode, const SMnodeOpt *pOption) {
return
0
;
}
int32_t
mndStart
(
SMnode
*
pMnode
)
{
return
mndInitTimer
(
pMnode
);
}
int32_t
mndStart
(
SMnode
*
pMnode
)
{
mndSyncStart
(
pMnode
);
return
mndInitTimer
(
pMnode
);
}
void
mndStop
(
SMnode
*
pMnode
)
{
mndSyncStop
(
pMnode
);
return
mndCleanupTimer
(
pMnode
);
}
int32_t
mndProcessSyncMsg
(
SRpcMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
info
.
node
;
void
*
ahandle
=
pMsg
->
info
.
ahandle
;
int32_t
ret
=
TAOS_SYNC_PROPOSE_OTHER_ERROR
;
if
(
syncEnvIsStart
())
{
SSyncNode
*
pSyncNode
=
syncNodeAcquire
(
pMnode
->
syncMgmt
.
sync
);
assert
(
pSyncNode
!=
NULL
);
ESyncState
state
=
syncGetMyRole
(
pMnode
->
syncMgmt
.
sync
);
SyncTerm
currentTerm
=
syncGetMyTerm
(
pMnode
->
syncMgmt
.
sync
);
SMsgHead
*
pHead
=
pMsg
->
pCont
;
char
logBuf
[
512
];
char
*
syncNodeStr
=
sync2SimpleStr
(
pMnode
->
syncMgmt
.
sync
);
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"==vnodeProcessSyncReq== msgType:%d, syncNode: %s"
,
pMsg
->
msgType
,
syncNodeStr
);
syncRpcMsgLog2
(
logBuf
,
pMsg
);
taosMemoryFree
(
syncNodeStr
);
SRpcMsg
*
pRpcMsg
=
pMsg
;
if
(
pRpcMsg
->
msgType
==
TDMT_VND_SYNC_TIMEOUT
)
{
SyncTimeout
*
pSyncMsg
=
syncTimeoutFromRpcMsg2
(
pRpcMsg
);
assert
(
pSyncMsg
!=
NULL
);
ret
=
syncNodeOnTimeoutCb
(
pSyncNode
,
pSyncMsg
);
syncTimeoutDestroy
(
pSyncMsg
);
}
else
if
(
pRpcMsg
->
msgType
==
TDMT_VND_SYNC_PING
)
{
SyncPing
*
pSyncMsg
=
syncPingFromRpcMsg2
(
pRpcMsg
);
assert
(
pSyncMsg
!=
NULL
);
ret
=
syncNodeOnPingCb
(
pSyncNode
,
pSyncMsg
);
syncPingDestroy
(
pSyncMsg
);
}
else
if
(
pRpcMsg
->
msgType
==
TDMT_VND_SYNC_PING_REPLY
)
{
SyncPingReply
*
pSyncMsg
=
syncPingReplyFromRpcMsg2
(
pRpcMsg
);
assert
(
pSyncMsg
!=
NULL
);
ret
=
syncNodeOnPingReplyCb
(
pSyncNode
,
pSyncMsg
);
syncPingReplyDestroy
(
pSyncMsg
);
void
mndStop
(
SMnode
*
pMnode
)
{
return
mndCleanupTimer
(
pMnode
);
}
}
else
if
(
pRpcMsg
->
msgType
==
TDMT_VND_SYNC_CLIENT_REQUEST
)
{
SyncClientRequest
*
pSyncMsg
=
syncClientRequestFromRpcMsg2
(
pRpcMsg
);
assert
(
pSyncMsg
!=
NULL
);
ret
=
syncNodeOnClientRequestCb
(
pSyncNode
,
pSyncMsg
);
syncClientRequestDestroy
(
pSyncMsg
);
}
else
if
(
pRpcMsg
->
msgType
==
TDMT_VND_SYNC_REQUEST_VOTE
)
{
SyncRequestVote
*
pSyncMsg
=
syncRequestVoteFromRpcMsg2
(
pRpcMsg
);
assert
(
pSyncMsg
!=
NULL
);
ret
=
syncNodeOnRequestVoteCb
(
pSyncNode
,
pSyncMsg
);
syncRequestVoteDestroy
(
pSyncMsg
);
}
else
if
(
pRpcMsg
->
msgType
==
TDMT_VND_SYNC_REQUEST_VOTE_REPLY
)
{
SyncRequestVoteReply
*
pSyncMsg
=
syncRequestVoteReplyFromRpcMsg2
(
pRpcMsg
);
assert
(
pSyncMsg
!=
NULL
);
ret
=
syncNodeOnRequestVoteReplyCb
(
pSyncNode
,
pSyncMsg
);
syncRequestVoteReplyDestroy
(
pSyncMsg
);
}
else
if
(
pRpcMsg
->
msgType
==
TDMT_VND_SYNC_APPEND_ENTRIES
)
{
SyncAppendEntries
*
pSyncMsg
=
syncAppendEntriesFromRpcMsg2
(
pRpcMsg
);
assert
(
pSyncMsg
!=
NULL
);
ret
=
syncNodeOnAppendEntriesCb
(
pSyncNode
,
pSyncMsg
);
syncAppendEntriesDestroy
(
pSyncMsg
);
}
else
if
(
pRpcMsg
->
msgType
==
TDMT_VND_SYNC_APPEND_ENTRIES_REPLY
)
{
SyncAppendEntriesReply
*
pSyncMsg
=
syncAppendEntriesReplyFromRpcMsg2
(
pRpcMsg
);
assert
(
pSyncMsg
!=
NULL
);
ret
=
syncNodeOnAppendEntriesReplyCb
(
pSyncNode
,
pSyncMsg
);
syncAppendEntriesReplyDestroy
(
pSyncMsg
);
}
else
{
mError
(
"==mndProcessSyncMsg== error msg type:%d"
,
pRpcMsg
->
msgType
);
ret
=
TAOS_SYNC_PROPOSE_OTHER_ERROR
;
}
syncNodeRelease
(
pSyncNode
);
}
else
{
mError
(
"==mndProcessSyncMsg== error syncEnv stop"
);
ret
=
TAOS_SYNC_PROPOSE_OTHER_ERROR
;
}
return
ret
;
}
int32_t
mndProcessMsg
(
SRpcMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
info
.
node
;
...
...
source/dnode/mnode/impl/test/sdb/sdbTest.cpp
浏览文件 @
20d788d7
...
...
@@ -493,9 +493,8 @@ TEST_F(MndTestSdb, 01_Write_Str) {
ASSERT_EQ
(
sdbGetSize
(
pSdb
,
SDB_USER
),
2
);
ASSERT_EQ
(
sdbGetMaxId
(
pSdb
,
SDB_USER
),
-
1
);
ASSERT_EQ
(
sdbGetTableVer
(
pSdb
,
SDB_USER
),
2
);
ASSERT_EQ
(
sdbUpdateVer
(
pSdb
,
0
),
-
1
);
ASSERT_EQ
(
sdbUpdateVer
(
pSdb
,
1
),
0
);
ASSERT_EQ
(
sdbUpdateVer
(
pSdb
,
-
1
),
-
1
);
sdbSetApplyIndex
(
pSdb
,
-
1
);
ASSERT_EQ
(
sdbGetApplyIndex
(
pSdb
),
-
1
);
ASSERT_EQ
(
mnode
.
insertTimes
,
2
);
ASSERT_EQ
(
mnode
.
deleteTimes
,
0
);
...
...
@@ -537,9 +536,6 @@ TEST_F(MndTestSdb, 01_Write_Str) {
ASSERT_EQ
(
sdbGetSize
(
pSdb
,
SDB_USER
),
3
);
ASSERT_EQ
(
sdbGetTableVer
(
pSdb
,
SDB_USER
),
4
);
ASSERT_EQ
(
sdbUpdateVer
(
pSdb
,
0
),
-
1
);
ASSERT_EQ
(
sdbUpdateVer
(
pSdb
,
1
),
0
);
ASSERT_EQ
(
sdbUpdateVer
(
pSdb
,
-
1
),
-
1
);
ASSERT_EQ
(
mnode
.
insertTimes
,
3
);
ASSERT_EQ
(
mnode
.
deleteTimes
,
0
);
...
...
@@ -704,8 +700,9 @@ TEST_F(MndTestSdb, 01_Write_Str) {
}
// write version
ASSERT_EQ
(
sdbUpdateVer
(
pSdb
,
1
),
0
);
ASSERT_EQ
(
sdbUpdateVer
(
pSdb
,
1
),
1
);
sdbSetApplyIndex
(
pSdb
,
0
);
sdbSetApplyIndex
(
pSdb
,
1
);
ASSERT_EQ
(
sdbGetApplyIndex
(
pSdb
),
1
);
ASSERT_EQ
(
sdbWriteFile
(
pSdb
),
0
);
ASSERT_EQ
(
sdbWriteFile
(
pSdb
),
0
);
...
...
@@ -775,7 +772,7 @@ TEST_F(MndTestSdb, 01_Read_Str) {
ASSERT_EQ
(
sdbGetSize
(
pSdb
,
SDB_USER
),
2
);
ASSERT_EQ
(
sdbGetMaxId
(
pSdb
,
SDB_USER
),
-
1
);
ASSERT_EQ
(
sdbGetTableVer
(
pSdb
,
SDB_USER
),
5
);
ASSERT_EQ
(
sdb
UpdateVer
(
pSdb
,
0
),
1
);
ASSERT_EQ
(
sdb
GetApplyIndex
(
pSdb
),
1
);
ASSERT_EQ
(
mnode
.
insertTimes
,
4
);
ASSERT_EQ
(
mnode
.
deleteTimes
,
0
);
...
...
source/dnode/mnode/impl/test/trans/CMakeLists.txt
浏览文件 @
20d788d7
...
...
@@ -31,7 +31,7 @@ target_include_directories(
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/dnode/mnode"
PRIVATE
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../../inc"
)
#
add_test(
#
NAME transTest2
#
COMMAND transTest2
#
)
#add_test(
# NAME transTest2
# COMMAND transTest2
#)
source/dnode/mnode/impl/test/trans/trans2.cpp
浏览文件 @
20d788d7
...
...
@@ -23,6 +23,11 @@ int32_t sendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
return
-
1
;
}
int32_t
putToQueue
(
void
*
pMgmt
,
SRpcMsg
*
pMsg
)
{
terrno
=
TSDB_CODE_INVALID_PTR
;
return
-
1
;
}
class
MndTestTrans2
:
public
::
testing
::
Test
{
protected:
static
void
InitLog
()
{
...
...
@@ -55,6 +60,9 @@ class MndTestTrans2 : public ::testing::Test {
msgCb
.
reportStartupFp
=
reportStartup
;
msgCb
.
sendReqFp
=
sendReq
;
msgCb
.
sendRspFp
=
sendRsp
;
msgCb
.
queueFps
[
SYNC_QUEUE
]
=
putToQueue
;
msgCb
.
queueFps
[
WRITE_QUEUE
]
=
putToQueue
;
msgCb
.
queueFps
[
READ_QUEUE
]
=
putToQueue
;
msgCb
.
mgmt
=
(
SMgmtWrapper
*
)(
&
msgCb
);
// hack
tmsgSetDefault
(
&
msgCb
);
...
...
@@ -77,6 +85,7 @@ class MndTestTrans2 : public ::testing::Test {
static
void
SetUpTestSuite
()
{
InitLog
();
walInit
();
syncInit
();
InitMnode
();
}
...
...
source/dnode/mnode/sdb/src/sdb.c
浏览文件 @
20d788d7
...
...
@@ -31,11 +31,9 @@ SSdb *sdbInit(SSdbOpt *pOption) {
char
path
[
PATH_MAX
+
100
]
=
{
0
};
snprintf
(
path
,
sizeof
(
path
),
"%s%sdata"
,
pOption
->
path
,
TD_DIRSEP
);
pSdb
->
currDir
=
strdup
(
path
);
snprintf
(
path
,
sizeof
(
path
),
"%s%ssync"
,
pOption
->
path
,
TD_DIRSEP
);
pSdb
->
syncDir
=
strdup
(
path
);
snprintf
(
path
,
sizeof
(
path
),
"%s%stmp"
,
pOption
->
path
,
TD_DIRSEP
);
pSdb
->
tmpDir
=
strdup
(
path
);
if
(
pSdb
->
currDir
==
NULL
||
pSdb
->
currDir
==
NULL
||
pSdb
->
curr
Dir
==
NULL
)
{
if
(
pSdb
->
currDir
==
NULL
||
pSdb
->
tmp
Dir
==
NULL
)
{
sdbCleanup
(
pSdb
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
mError
(
"failed to init sdb since %s"
,
terrstr
());
...
...
@@ -55,6 +53,7 @@ SSdb *sdbInit(SSdbOpt *pOption) {
}
pSdb
->
curVer
=
-
1
;
pSdb
->
curTerm
=
-
1
;
pSdb
->
lastCommitVer
=
-
1
;
pSdb
->
pMnode
=
pOption
->
pMnode
;
mDebug
(
"sdb init successfully"
);
...
...
@@ -149,12 +148,6 @@ static int32_t sdbCreateDir(SSdb *pSdb) {
return
-
1
;
}
if
(
taosMkDir
(
pSdb
->
syncDir
)
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
mError
(
"failed to create dir:%s since %s"
,
pSdb
->
syncDir
,
terrstr
());
return
-
1
;
}
if
(
taosMkDir
(
pSdb
->
tmpDir
)
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
mError
(
"failed to create dir:%s since %s"
,
pSdb
->
tmpDir
,
terrstr
());
...
...
@@ -164,4 +157,10 @@ static int32_t sdbCreateDir(SSdb *pSdb) {
return
0
;
}
int64_t
sdbUpdateVer
(
SSdb
*
pSdb
,
int32_t
val
)
{
return
atomic_add_fetch_64
(
&
pSdb
->
curVer
,
val
);
}
\ No newline at end of file
void
sdbSetApplyIndex
(
SSdb
*
pSdb
,
int64_t
index
)
{
pSdb
->
curVer
=
index
;
}
int64_t
sdbGetApplyIndex
(
SSdb
*
pSdb
)
{
return
pSdb
->
curVer
;
}
void
sdbSetApplyTerm
(
SSdb
*
pSdb
,
int64_t
term
)
{
pSdb
->
curTerm
=
term
;
}
int64_t
sdbGetApplyTerm
(
SSdb
*
pSdb
)
{
return
pSdb
->
curTerm
;
}
source/dnode/mnode/sdb/src/sdbFile.c
浏览文件 @
20d788d7
...
...
@@ -65,6 +65,16 @@ static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) {
return
-
1
;
}
ret
=
taosReadFile
(
pFile
,
&
pSdb
->
curTerm
,
sizeof
(
int64_t
));
if
(
ret
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
if
(
ret
!=
sizeof
(
int64_t
))
{
terrno
=
TSDB_CODE_FILE_CORRUPTED
;
return
-
1
;
}
for
(
int32_t
i
=
0
;
i
<
SDB_TABLE_SIZE
;
++
i
)
{
int64_t
maxId
=
0
;
ret
=
taosReadFile
(
pFile
,
&
maxId
,
sizeof
(
int64_t
));
...
...
@@ -123,6 +133,11 @@ static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) {
return
-
1
;
}
if
(
taosWriteFile
(
pFile
,
&
pSdb
->
curTerm
,
sizeof
(
int64_t
))
!=
sizeof
(
int64_t
))
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
for
(
int32_t
i
=
0
;
i
<
SDB_TABLE_SIZE
;
++
i
)
{
int64_t
maxId
=
0
;
if
(
i
<
SDB_MAX
)
{
...
...
@@ -182,6 +197,7 @@ int32_t sdbReadFile(SSdb *pSdb) {
if
(
sdbReadFileHead
(
pSdb
,
pFile
)
!=
0
)
{
mError
(
"failed to read file:%s head since %s"
,
file
,
terrstr
());
pSdb
->
curVer
=
-
1
;
pSdb
->
curTerm
=
-
1
;
taosMemoryFree
(
pRaw
);
taosCloseFile
(
&
pFile
);
return
-
1
;
...
...
@@ -256,8 +272,8 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
char
curfile
[
PATH_MAX
]
=
{
0
};
snprintf
(
curfile
,
sizeof
(
curfile
),
"%s%ssdb.data"
,
pSdb
->
currDir
,
TD_DIRSEP
);
mDebug
(
"start to write file:%s, current ver:%"
PRId64
", commit ver:%"
PRId64
,
curfile
,
pSdb
->
curVer
,
pSdb
->
lastCommitVer
);
mDebug
(
"start to write file:%s, current ver:%"
PRId64
"
term:%"
PRId64
"
, commit ver:%"
PRId64
,
curfile
,
pSdb
->
curVer
,
pSdb
->
curTerm
,
pSdb
->
lastCommitVer
);
TdFilePtr
pFile
=
taosOpenFile
(
tmpfile
,
TD_FILE_CREATE
|
TD_FILE_WRITE
|
TD_FILE_TRUNC
);
if
(
pFile
==
NULL
)
{
...
...
@@ -350,7 +366,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
mError
(
"failed to write file:%s since %s"
,
curfile
,
tstrerror
(
code
));
}
else
{
pSdb
->
lastCommitVer
=
pSdb
->
curVer
;
mDebug
(
"write file:%s successfully, ver:%"
PRId64
,
curfile
,
pSdb
->
lastCommitVer
);
mDebug
(
"write file:%s successfully, ver:%"
PRId64
" term:%"
PRId64
,
curfile
,
pSdb
->
lastCommitVer
,
pSdb
->
curTerm
);
}
terrno
=
code
;
...
...
source/dnode/vnode/src/vnd/vnodeSync.c
浏览文件 @
20d788d7
...
...
@@ -56,7 +56,13 @@ void vnodeSyncStart(SVnode *pVnode) {
void
vnodeSyncClose
(
SVnode
*
pVnode
)
{
syncStop
(
pVnode
->
sync
);
}
int32_t
vnodeSyncEqMsg
(
const
SMsgCb
*
msgcb
,
SRpcMsg
*
pMsg
)
{
return
tmsgPutToQueue
(
msgcb
,
SYNC_QUEUE
,
pMsg
);
}
int32_t
vnodeSyncEqMsg
(
const
SMsgCb
*
msgcb
,
SRpcMsg
*
pMsg
)
{
int32_t
code
=
tmsgPutToQueue
(
msgcb
,
SYNC_QUEUE
,
pMsg
);
if
(
code
!=
0
)
{
rpcFreeCont
(
pMsg
->
pCont
);
}
return
code
;
}
int32_t
vnodeSyncSendMsg
(
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
)
{
return
tmsgSendReq
(
pEpSet
,
pMsg
);
}
...
...
@@ -141,5 +147,6 @@ SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
pFsm
->
FpPreCommitCb
=
vnodeSyncPreCommitMsg
;
pFsm
->
FpRollBackCb
=
vnodeSyncRollBackMsg
;
pFsm
->
FpGetSnapshot
=
vnodeSyncGetSnapshot
;
pFsm
->
FpRestoreFinish
=
NULL
;
return
pFsm
;
}
\ No newline at end of file
source/libs/sync/inc/syncIO.h
浏览文件 @
20d788d7
...
...
@@ -36,8 +36,8 @@ typedef struct SSyncIO {
STaosQueue
*
pMsgQ
;
STaosQset
*
pQset
;
TdThread
consumerTid
;
void
*
serverRpc
;
void
*
clientRpc
;
void
*
serverRpc
;
void
*
clientRpc
;
SEpSet
myAddr
;
SMsgCb
msgcb
;
...
...
source/libs/sync/inc/syncInt.h
浏览文件 @
20d788d7
...
...
@@ -147,6 +147,11 @@ typedef struct SSyncNode {
// tools
SSyncRespMgr
*
pSyncRespMgr
;
// restore state
bool
restoreFinish
;
//sem_t restoreSem;
SSnapshot
*
pSnapshot
;
}
SSyncNode
;
// open/close --------------
...
...
source/libs/sync/src/syncAppendEntries.c
浏览文件 @
20d788d7
...
...
@@ -324,7 +324,6 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
SRpcMsg
rpcMsg
;
syncEntry2OriginalRpc
(
pEntry
,
&
rpcMsg
);
// if (ths->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
if
(
ths
->
pFsm
->
FpCommitCb
!=
NULL
&&
syncUtilUserCommit
(
pEntry
->
originalRpcType
))
{
SFsmCbMeta
cbMeta
;
cbMeta
.
index
=
pEntry
->
index
;
...
...
@@ -332,7 +331,18 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
cbMeta
.
code
=
0
;
cbMeta
.
state
=
ths
->
state
;
cbMeta
.
seqNum
=
pEntry
->
seqNum
;
cbMeta
.
term
=
pEntry
->
term
;
cbMeta
.
currentTerm
=
ths
->
pRaftStore
->
currentTerm
;
ths
->
pFsm
->
FpCommitCb
(
ths
->
pFsm
,
&
rpcMsg
,
cbMeta
);
bool
needExecute
=
true
;
if
(
ths
->
pSnapshot
!=
NULL
&&
cbMeta
.
index
<=
ths
->
pSnapshot
->
lastApplyIndex
)
{
needExecute
=
false
;
}
if
(
needExecute
)
{
ths
->
pFsm
->
FpCommitCb
(
ths
->
pFsm
,
&
rpcMsg
,
cbMeta
);
}
}
// config change
...
...
@@ -349,6 +359,22 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
}
}
// restore finish
if
(
pEntry
->
index
==
ths
->
pLogStore
->
getLastIndex
(
ths
->
pLogStore
))
{
if
(
ths
->
restoreFinish
==
false
)
{
if
(
ths
->
pFsm
->
FpRestoreFinish
!=
NULL
)
{
ths
->
pFsm
->
FpRestoreFinish
(
ths
->
pFsm
);
}
ths
->
restoreFinish
=
true
;
sInfo
(
"==syncNodeOnAppendEntriesCb== restoreFinish set true %p vgId:%d"
,
ths
,
ths
->
vgId
);
/*
tsem_post(&ths->restoreSem);
sInfo("==syncNodeOnAppendEntriesCb== RestoreFinish tsem_post %p", ths);
*/
}
}
rpcFreeCont
(
rpcMsg
.
pCont
);
syncEntryDestory
(
pEntry
);
}
...
...
source/libs/sync/src/syncCommit.c
浏览文件 @
20d788d7
...
...
@@ -102,7 +102,6 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
SRpcMsg
rpcMsg
;
syncEntry2OriginalRpc
(
pEntry
,
&
rpcMsg
);
// if (pSyncNode->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) {
if
(
pSyncNode
->
pFsm
->
FpCommitCb
!=
NULL
&&
syncUtilUserCommit
(
pEntry
->
originalRpcType
))
{
SFsmCbMeta
cbMeta
;
cbMeta
.
index
=
pEntry
->
index
;
...
...
@@ -110,7 +109,17 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
cbMeta
.
code
=
0
;
cbMeta
.
state
=
pSyncNode
->
state
;
cbMeta
.
seqNum
=
pEntry
->
seqNum
;
pSyncNode
->
pFsm
->
FpCommitCb
(
pSyncNode
->
pFsm
,
&
rpcMsg
,
cbMeta
);
cbMeta
.
term
=
pEntry
->
term
;
cbMeta
.
currentTerm
=
pSyncNode
->
pRaftStore
->
currentTerm
;
bool
needExecute
=
true
;
if
(
pSyncNode
->
pSnapshot
!=
NULL
&&
cbMeta
.
index
<=
pSyncNode
->
pSnapshot
->
lastApplyIndex
)
{
needExecute
=
false
;
}
if
(
needExecute
)
{
pSyncNode
->
pFsm
->
FpCommitCb
(
pSyncNode
->
pFsm
,
&
rpcMsg
,
cbMeta
);
}
}
// config change
...
...
@@ -127,6 +136,22 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
}
}
// restore finish
if
(
pEntry
->
index
==
pSyncNode
->
pLogStore
->
getLastIndex
(
pSyncNode
->
pLogStore
))
{
if
(
pSyncNode
->
restoreFinish
==
false
)
{
if
(
pSyncNode
->
pFsm
->
FpRestoreFinish
!=
NULL
)
{
pSyncNode
->
pFsm
->
FpRestoreFinish
(
pSyncNode
->
pFsm
);
}
pSyncNode
->
restoreFinish
=
true
;
sInfo
(
"==syncMaybeAdvanceCommitIndex== restoreFinish set true %p vgId:%d"
,
pSyncNode
,
pSyncNode
->
vgId
);
/*
tsem_post(&pSyncNode->restoreSem);
sInfo("==syncMaybeAdvanceCommitIndex== RestoreFinish tsem_post %p", pSyncNode);
*/
}
}
rpcFreeCont
(
rpcMsg
.
pCont
);
syncEntryDestory
(
pEntry
);
}
...
...
@@ -162,4 +187,4 @@ bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) {
}
}
return
false
;
}
\ No newline at end of file
}
source/libs/sync/src/syncMain.c
浏览文件 @
20d788d7
...
...
@@ -13,7 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdint.h>
#include "sync.h"
#include "syncAppendEntries.h"
#include "syncAppendEntriesReply.h"
...
...
@@ -55,14 +54,17 @@ static void syncFreeNode(void* param);
// ---------------------------------
int32_t
syncInit
()
{
int32_t
ret
;
tsNodeRefId
=
taosOpenRef
(
200
,
syncFreeNode
);
if
(
tsNodeRefId
<
0
)
{
sError
(
"failed to init node ref"
);
syncCleanUp
();
ret
=
-
1
;
}
else
{
ret
=
syncEnvStart
();
int32_t
ret
=
0
;
if
(
!
syncEnvIsStart
())
{
tsNodeRefId
=
taosOpenRef
(
200
,
syncFreeNode
);
if
(
tsNodeRefId
<
0
)
{
sError
(
"failed to init node ref"
);
syncCleanUp
();
ret
=
-
1
;
}
else
{
ret
=
syncEnvStart
();
}
}
return
ret
;
...
...
@@ -240,7 +242,7 @@ int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) {
return
ret
;
}
void
syncSetMsgCb
(
int64_t
rid
,
const
SMsgCb
*
msgcb
)
{
void
syncSetMsgCb
(
int64_t
rid
,
const
SMsgCb
*
msgcb
)
{
SSyncNode
*
pSyncNode
=
(
SSyncNode
*
)
taosAcquireRef
(
tsNodeRefId
,
rid
);
if
(
pSyncNode
==
NULL
)
{
sTrace
(
"syncSetQ get pSyncNode is NULL, rid:%ld"
,
rid
);
...
...
@@ -490,6 +492,15 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
pSyncNode
->
pSyncRespMgr
=
syncRespMgrCreate
(
NULL
,
0
);
assert
(
pSyncNode
->
pSyncRespMgr
!=
NULL
);
// restore state
pSyncNode
->
restoreFinish
=
false
;
pSyncNode
->
pSnapshot
=
NULL
;
if
(
pSyncNode
->
pFsm
->
FpGetSnapshot
!=
NULL
)
{
pSyncNode
->
pSnapshot
=
taosMemoryMalloc
(
sizeof
(
SSnapshot
));
pSyncNode
->
pFsm
->
FpGetSnapshot
(
pSyncNode
->
pFsm
,
pSyncNode
->
pSnapshot
);
}
//tsem_init(&(pSyncNode->restoreSem), 0, 0);
// start in syncNodeStart
// start raft
// syncNodeBecomeFollower(pSyncNode);
...
...
@@ -509,6 +520,20 @@ void syncNodeStart(SSyncNode* pSyncNode) {
// use this now
syncNodeAppendNoop
(
pSyncNode
);
syncMaybeAdvanceCommitIndex
(
pSyncNode
);
// maybe only one replica
/*
sInfo("==syncNodeStart== RestoreFinish begin 1 replica tsem_wait %p", pSyncNode);
tsem_wait(&pSyncNode->restoreSem);
sInfo("==syncNodeStart== RestoreFinish end 1 replica tsem_wait %p", pSyncNode);
*/
/*
while (pSyncNode->restoreFinish != true) {
taosMsleep(10);
}
*/
sInfo
(
"==syncNodeStart== restoreFinish ok 1 replica %p vgId:%d"
,
pSyncNode
,
pSyncNode
->
vgId
);
return
;
}
...
...
@@ -518,6 +543,19 @@ void syncNodeStart(SSyncNode* pSyncNode) {
int32_t
ret
=
0
;
// ret = syncNodeStartPingTimer(pSyncNode);
assert
(
ret
==
0
);
/*
sInfo("==syncNodeStart== RestoreFinish begin multi replica tsem_wait %p", pSyncNode);
tsem_wait(&pSyncNode->restoreSem);
sInfo("==syncNodeStart== RestoreFinish end multi replica tsem_wait %p", pSyncNode);
*/
/*
while (pSyncNode->restoreFinish != true) {
taosMsleep(10);
}
*/
sInfo
(
"==syncNodeStart== restoreFinish ok multi replica %p vgId:%d"
,
pSyncNode
,
pSyncNode
->
vgId
);
}
void
syncNodeStartStandBy
(
SSyncNode
*
pSyncNode
)
{
...
...
@@ -554,6 +592,12 @@ void syncNodeClose(SSyncNode* pSyncNode) {
taosMemoryFree
(
pSyncNode
->
pFsm
);
}
if
(
pSyncNode
->
pSnapshot
!=
NULL
)
{
taosMemoryFree
(
pSyncNode
->
pSnapshot
);
}
//tsem_destroy(&pSyncNode->restoreSem);
// free memory in syncFreeNode
// taosMemoryFree(pSyncNode);
}
...
...
source/libs/sync/test/syncConfigChangeTest.cpp
浏览文件 @
20d788d7
...
...
@@ -73,12 +73,17 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) {
return
0
;
}
void
FpRestoreFinishCb
(
struct
SSyncFSM
*
pFsm
)
{
sTrace
(
"==callback== ==FpRestoreFinishCb=="
);
}
SSyncFSM
*
createFsm
()
{
SSyncFSM
*
pFsm
=
(
SSyncFSM
*
)
taosMemoryMalloc
(
sizeof
(
SSyncFSM
));
pFsm
->
FpCommitCb
=
CommitCb
;
pFsm
->
FpPreCommitCb
=
PreCommitCb
;
pFsm
->
FpRollBackCb
=
RollBackCb
;
pFsm
->
FpGetSnapshot
=
GetSnapshotCb
;
pFsm
->
FpRestoreFinish
=
FpRestoreFinishCb
;
return
pFsm
;
}
...
...
source/libs/sync/test/syncSnapshotTest.cpp
浏览文件 @
20d788d7
...
...
@@ -160,6 +160,8 @@ SyncClientRequest *step1(const SRpcMsg *pMsg) {
}
int
main
(
int
argc
,
char
**
argv
)
{
sprintf
(
tsTempDir
,
"%s"
,
"."
);
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog
=
0
;
sDebugFlag
=
143
+
64
;
...
...
source/libs/transport/inc/transComm.h
浏览文件 @
20d788d7
...
...
@@ -94,7 +94,7 @@ typedef void* queue[2];
/* Return the structure holding the given element. */
#define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field))))
#define TRANS_RETRY_COUNT_LIMIT
2
0 // retry count limit
#define TRANS_RETRY_COUNT_LIMIT
10
0 // retry count limit
#define TRANS_RETRY_INTERVAL 15 // ms retry interval
#define TRANS_CONN_TIMEOUT 3 // connect timeout
...
...
source/libs/transport/src/trans.c
浏览文件 @
20d788d7
...
...
@@ -17,7 +17,7 @@
#include "transComm.h"
void
*
(
*
taosInitHandle
[])(
uint32_t
ip
,
uint32_t
port
,
char
*
label
,
int
numOfThreads
,
void
*
fp
,
void
*
shandle
)
=
{
void
*
(
*
taosInitHandle
[])(
uint32_t
ip
,
uint32_t
port
,
char
*
label
,
int
32_t
numOfThreads
,
void
*
fp
,
void
*
shandle
)
=
{
transInitServer
,
transInitClient
};
void
(
*
taosCloseHandle
[])(
void
*
arg
)
=
{
transCloseServer
,
transCloseClient
};
...
...
@@ -77,37 +77,38 @@ void rpcClose(void* arg) {
taosMemoryFree
(
pRpc
);
return
;
}
void
*
rpcMallocCont
(
int
contLen
)
{
int
size
=
contLen
+
TRANS_MSG_OVERHEAD
;
char
*
start
=
(
char
*
)
taosMemoryCalloc
(
1
,
(
size_t
)
size
);
void
*
rpcMallocCont
(
int32_t
contLen
)
{
int32_t
size
=
contLen
+
TRANS_MSG_OVERHEAD
;
char
*
start
=
taosMemoryCalloc
(
1
,
size
);
if
(
start
==
NULL
)
{
tError
(
"failed to malloc msg, size:%d"
,
size
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
else
{
tTrace
(
"malloc mem:%p size:%d"
,
start
,
size
);
}
return
start
+
sizeof
(
STransMsgHead
);
}
void
rpcFreeCont
(
void
*
cont
)
{
// impl
if
(
cont
==
NULL
)
{
return
;
}
void
rpcFreeCont
(
void
*
cont
)
{
if
(
cont
==
NULL
)
return
;
taosMemoryFree
((
char
*
)
cont
-
TRANS_MSG_OVERHEAD
);
tTrace
(
"free mem: %p"
,
(
char
*
)
cont
-
TRANS_MSG_OVERHEAD
);
}
void
*
rpcReallocCont
(
void
*
ptr
,
int
contLen
)
{
if
(
ptr
==
NULL
)
{
return
rpcMallocCont
(
contLen
);
}
char
*
st
=
(
char
*
)
ptr
-
TRANS_MSG_OVERHEAD
;
int
sz
=
contLen
+
TRANS_MSG_OVERHEAD
;
void
*
rpcReallocCont
(
void
*
ptr
,
int32_t
contLen
)
{
if
(
ptr
==
NULL
)
return
rpcMallocCont
(
contLen
);
char
*
st
=
(
char
*
)
ptr
-
TRANS_MSG_OVERHEAD
;
int
32_t
sz
=
contLen
+
TRANS_MSG_OVERHEAD
;
st
=
taosMemoryRealloc
(
st
,
sz
);
if
(
st
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
return
st
+
TRANS_MSG_OVERHEAD
;
}
...
...
@@ -116,8 +117,8 @@ void rpcSendRedirectRsp(void* thandle, const SEpSet* pEpSet) {
assert
(
0
);
}
int
rpcReportProgress
(
void
*
pConn
,
char
*
pCont
,
in
t
contLen
)
{
return
-
1
;
}
void
rpcCancelRequest
(
int64_t
rid
)
{
return
;
}
int
32_t
rpcReportProgress
(
void
*
pConn
,
char
*
pCont
,
int32_
t
contLen
)
{
return
-
1
;
}
void
rpcCancelRequest
(
int64_t
rid
)
{
return
;
}
void
rpcSendRequest
(
void
*
shandle
,
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
,
int64_t
*
pRid
)
{
transSendRequest
(
shandle
,
pEpSet
,
pMsg
,
NULL
);
...
...
@@ -129,8 +130,8 @@ void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) {
transSendRecv
(
shandle
,
pEpSet
,
pMsg
,
pRsp
);
}
void
rpcSendResponse
(
const
SRpcMsg
*
pMsg
)
{
transSendResponse
(
pMsg
);
}
int
rpcGetConnInfo
(
void
*
thandle
,
SRpcConnInfo
*
pInfo
)
{
return
transGetConnInfo
((
void
*
)
thandle
,
pInfo
);
}
void
rpcSendResponse
(
const
SRpcMsg
*
pMsg
)
{
transSendResponse
(
pMsg
);
}
int
32_t
rpcGetConnInfo
(
void
*
thandle
,
SRpcConnInfo
*
pInfo
)
{
return
transGetConnInfo
((
void
*
)
thandle
,
pInfo
);
}
void
rpcRefHandle
(
void
*
handle
,
int8_t
type
)
{
assert
(
type
==
TAOS_CONN_SERVER
||
type
==
TAOS_CONN_CLIENT
);
...
...
tests/test/c/sdbDump.c
浏览文件 @
20d788d7
...
...
@@ -262,7 +262,7 @@ void dumpCluster(SSdb *pSdb, SJson *json) {
}
void
dumpTrans
(
SSdb
*
pSdb
,
SJson
*
json
)
{
void
*
pIter
=
NULL
;
void
*
pIter
=
NULL
;
SJson
*
items
=
tjsonCreateObject
();
tjsonAddItemToObject
(
json
,
"transactions"
,
items
);
...
...
@@ -294,6 +294,7 @@ void dumpTrans(SSdb *pSdb, SJson *json) {
void
dumpHeader
(
SSdb
*
pSdb
,
SJson
*
json
)
{
tjsonAddIntegerToObject
(
json
,
"sver"
,
1
);
tjsonAddStringToObject
(
json
,
"curVer"
,
i642str
(
pSdb
->
curVer
));
tjsonAddStringToObject
(
json
,
"curTerm"
,
i642str
(
pSdb
->
curTerm
));
SJson
*
maxIdsJson
=
tjsonCreateObject
();
tjsonAddItemToObject
(
json
,
"maxIds"
,
maxIdsJson
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录