Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
8318c2ac
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
8318c2ac
编写于
11月 12, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
11月 12, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #18078 from taosdata/fix/TD-20052
refact: adjust timeout msg build
上级
03a66e64
60fd3c4e
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
254 addition
and
242 deletion
+254
-242
source/libs/sync/inc/syncMessage.h
source/libs/sync/inc/syncMessage.h
+6
-23
source/libs/sync/inc/syncTimeout.h
source/libs/sync/inc/syncTimeout.h
+1
-1
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+69
-81
source/libs/sync/src/syncMessage.c
source/libs/sync/src/syncMessage.c
+24
-134
source/libs/sync/src/syncTimeout.c
source/libs/sync/src/syncTimeout.c
+4
-2
source/libs/sync/test/syncTimeoutTest.cpp
source/libs/sync/test/syncTimeoutTest.cpp
+1
-1
source/libs/sync/test/sync_test_lib/inc/syncTest.h
source/libs/sync/test/sync_test_lib/inc/syncTest.h
+18
-0
source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c
source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c
+131
-0
未找到文件。
source/libs/sync/inc/syncMessage.h
浏览文件 @
8318c2ac
...
...
@@ -40,27 +40,9 @@ typedef struct SyncTimeout {
void
*
data
;
// need optimized
}
SyncTimeout
;
SyncTimeout
*
syncTimeoutBuild
();
SyncTimeout
*
syncTimeoutBuild2
(
ESyncTimeoutType
timeoutType
,
uint64_t
logicClock
,
int32_t
timerMS
,
int32_t
vgId
,
void
*
data
);
void
syncTimeoutDestroy
(
SyncTimeout
*
pMsg
);
void
syncTimeoutSerialize
(
const
SyncTimeout
*
pMsg
,
char
*
buf
,
uint32_t
bufLen
);
void
syncTimeoutDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncTimeout
*
pMsg
);
char
*
syncTimeoutSerialize2
(
const
SyncTimeout
*
pMsg
,
uint32_t
*
len
);
SyncTimeout
*
syncTimeoutDeserialize2
(
const
char
*
buf
,
uint32_t
len
);
void
syncTimeout2RpcMsg
(
const
SyncTimeout
*
pMsg
,
SRpcMsg
*
pRpcMsg
);
void
syncTimeoutFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncTimeout
*
pMsg
);
SyncTimeout
*
syncTimeoutFromRpcMsg2
(
const
SRpcMsg
*
pRpcMsg
);
cJSON
*
syncTimeout2Json
(
const
SyncTimeout
*
pMsg
);
char
*
syncTimeout2Str
(
const
SyncTimeout
*
pMsg
);
int32_t
syncTimeoutBuild
(
SRpcMsg
*
pTimeoutRpcMsg
,
ESyncTimeoutType
timeoutType
,
uint64_t
logicClock
,
int32_t
timerMS
,
SSyncNode
*
pNode
);
// for debug ----------------------
void
syncTimeoutPrint
(
const
SyncTimeout
*
pMsg
);
void
syncTimeoutPrint2
(
char
*
s
,
const
SyncTimeout
*
pMsg
);
void
syncTimeoutLog
(
const
SyncTimeout
*
pMsg
);
void
syncTimeoutLog2
(
char
*
s
,
const
SyncTimeout
*
pMsg
);
// ---------------------------------------------
typedef
struct
SyncClientRequest
{
uint32_t
bytes
;
int32_t
vgId
;
...
...
@@ -577,9 +559,9 @@ typedef struct SyncLocalCmd {
SRaftId
srcId
;
SRaftId
destId
;
int32_t
cmd
;
SyncTerm
sdNewTerm
;
// step down new term
SyncIndex
fcIndex
;
// follower commit index
int32_t
cmd
;
SyncTerm
sdNewTerm
;
// step down new term
SyncIndex
fcIndex
;
// follower commit index
}
SyncLocalCmd
;
...
...
@@ -628,6 +610,7 @@ bool syncNodeSnapshotEnable(SSyncNode* pSyncNode);
ESyncStrategy
syncNodeStrategy
(
SSyncNode
*
pSyncNode
);
// ---------------------------------------------
SyncTimeout
*
syncTimeoutBuildX
();
#ifdef __cplusplus
}
...
...
source/libs/sync/inc/syncTimeout.h
浏览文件 @
8318c2ac
...
...
@@ -34,7 +34,7 @@ extern "C" {
// /\ voterLog' = [voterLog EXCEPT ![i] = [j \in {} |-> <<>>]]
// /\ UNCHANGED <<messages, leaderVars, logVars>>
//
int32_t
syncNodeOnTimer
(
SSyncNode
*
ths
,
S
yncTimeout
*
pMsg
);
int32_t
syncNodeOnTimer
(
SSyncNode
*
ths
,
S
RpcMsg
*
pMsg
);
#ifdef __cplusplus
}
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
8318c2ac
...
...
@@ -142,9 +142,7 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) {
code
=
syncNodeOnHeartbeatReply
(
pSyncNode
,
pSyncMsg
);
syncHeartbeatReplyDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_TIMEOUT
)
{
SyncTimeout
*
pSyncMsg
=
syncTimeoutFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnTimer
(
pSyncNode
,
pSyncMsg
);
syncTimeoutDestroy
(
pSyncMsg
);
code
=
syncNodeOnTimer
(
pSyncNode
,
pMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_CLIENT_REQUEST
)
{
code
=
syncNodeOnClientRequest
(
pSyncNode
,
pMsg
,
NULL
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_REQUEST_VOTE
)
{
...
...
@@ -1797,70 +1795,67 @@ int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex
}
static
void
syncNodeEqPingTimer
(
void
*
param
,
void
*
tmrId
)
{
SSyncNode
*
pSyncNode
=
(
SSyncNode
*
)
param
;
if
(
atomic_load_64
(
&
pSyncNode
->
pingTimerLogicClockUser
)
<=
atomic_load_64
(
&
pSyncNode
->
pingTimerLogicClock
))
{
SyncTimeout
*
pSyncMsg
=
syncTimeoutBuild2
(
SYNC_TIMEOUT_PING
,
atomic_load_64
(
&
pSyncNode
->
pingTimerLogicClock
),
pSyncNode
->
pingTimerMS
,
pSyncNode
->
vgId
,
pSyncNode
);
SRpcMsg
rpcMsg
;
syncTimeout2RpcMsg
(
pSyncMsg
,
&
rpcMsg
);
sNTrace
(
pSyncNode
,
"enqueue ping timer"
);
if
(
pSyncNode
->
syncEqMsg
!=
NULL
)
{
int32_t
code
=
pSyncNode
->
syncEqMsg
(
pSyncNode
->
msgcb
,
&
rpcMsg
);
if
(
code
!=
0
)
{
sError
(
"vgId:%d, sync enqueue ping msg error, code:%d"
,
pSyncNode
->
vgId
,
code
);
rpcFreeCont
(
rpcMsg
.
pCont
);
syncTimeoutDestroy
(
pSyncMsg
);
return
;
}
}
else
{
sTrace
(
"syncNodeEqPingTimer pSyncNode->syncEqMsg is NULL"
);
if
(
!
syncIsInit
())
return
;
SSyncNode
*
pNode
=
param
;
if
(
atomic_load_64
(
&
pNode
->
pingTimerLogicClockUser
)
<=
atomic_load_64
(
&
pNode
->
pingTimerLogicClock
))
{
SRpcMsg
rpcMsg
=
{
0
};
int32_t
code
=
syncTimeoutBuild
(
&
rpcMsg
,
SYNC_TIMEOUT_PING
,
atomic_load_64
(
&
pNode
->
pingTimerLogicClock
),
pNode
->
pingTimerMS
,
pNode
);
if
(
code
!=
0
)
{
sNError
(
pNode
,
"failed to build ping msg"
);
rpcFreeCont
(
rpcMsg
.
pCont
);
return
;
}
syncTimeoutDestroy
(
pSyncMsg
);
if
(
syncIsInit
())
{
taosTmrReset
(
syncNodeEqPingTimer
,
pSyncNode
->
pingTimerMS
,
pSyncNode
,
syncEnv
()
->
pTimerManager
,
&
pSyncNode
->
pPingTimer
);
}
else
{
sError
(
"sync env is stop, syncNodeEqPingTimer"
);
sNTrace
(
pNode
,
"enqueue ping msg"
);
code
=
pNode
->
syncEqMsg
(
pNode
->
msgcb
,
&
rpcMsg
);
if
(
code
!=
0
)
{
sNError
(
pNode
,
"failed to sync enqueue ping msg since %s"
,
terrstr
());
rpcFreeCont
(
rpcMsg
.
pCont
);
return
;
}
taosTmrReset
(
syncNodeEqPingTimer
,
pNode
->
pingTimerMS
,
pNode
,
syncEnv
()
->
pTimerManager
,
&
pNode
->
pPingTimer
);
}
else
{
sTrace
(
"==syncNodeEqPingTimer== pingTimerLogicClock:%"
PRId64
", pingTimerLogicClockUser:%"
PRId64
,
p
SyncNode
->
pingTimerLogicClock
,
pSync
Node
->
pingTimerLogicClockUser
);
p
Node
->
pingTimerLogicClock
,
p
Node
->
pingTimerLogicClockUser
);
}
}
static
void
syncNodeEqElectTimer
(
void
*
param
,
void
*
tmrId
)
{
SElectTimer
*
pElectTimer
=
(
SElectTimer
*
)
param
;
SSyncNode
*
pSyncNode
=
pElectTimer
->
pSyncNode
;
SyncTimeout
*
pSyncMsg
=
syncTimeoutBuild2
(
SYNC_TIMEOUT_ELECTION
,
pElectTimer
->
logicClock
,
pSyncNode
->
electTimerMS
,
pSyncNode
->
vgId
,
pSyncNode
);
SRpcMsg
rpcMsg
;
syncTimeout2RpcMsg
(
pSyncMsg
,
&
rpcMsg
);
if
(
pSyncNode
->
syncEqMsg
!=
NULL
&&
pSyncNode
->
msgcb
!=
NULL
&&
pSyncNode
->
msgcb
->
putToQueueFp
!=
NULL
)
{
int32_t
code
=
pSyncNode
->
syncEqMsg
(
pSyncNode
->
msgcb
,
&
rpcMsg
);
if
(
code
!=
0
)
{
sError
(
"vgId:%d, sync enqueue elect msg error, code:%d"
,
pSyncNode
->
vgId
,
code
);
rpcFreeCont
(
rpcMsg
.
pCont
);
syncTimeoutDestroy
(
pSyncMsg
);
taosMemoryFree
(
pElectTimer
);
return
;
}
sNTrace
(
pSyncNode
,
"eq elect timer lc:%"
PRId64
,
pSyncMsg
->
logicClock
);
}
else
{
sTrace
(
"syncNodeEqElectTimer syncEqMsg is NULL"
);
if
(
!
syncIsInit
())
return
;
SElectTimer
*
pElectTimer
=
param
;
SSyncNode
*
pNode
=
pElectTimer
->
pSyncNode
;
SRpcMsg
rpcMsg
=
{
0
};
int32_t
code
=
syncTimeoutBuild
(
&
rpcMsg
,
SYNC_TIMEOUT_ELECTION
,
pElectTimer
->
logicClock
,
pNode
->
electTimerMS
,
pNode
);
if
(
code
!=
0
)
{
sNError
(
pNode
,
"failed to build elect msg"
);
taosMemoryFree
(
pElectTimer
);
return
;
}
SyncTimeout
*
pTimeout
=
rpcMsg
.
pCont
;
sNTrace
(
pNode
,
"enqueue elect msg lc:%"
PRId64
,
pTimeout
->
logicClock
);
code
=
pNode
->
syncEqMsg
(
pNode
->
msgcb
,
&
rpcMsg
);
if
(
code
!=
0
)
{
sNError
(
pNode
,
"failed to sync enqueue elect msg since %s"
,
terrstr
());
rpcFreeCont
(
rpcMsg
.
pCont
);
taosMemoryFree
(
pElectTimer
);
return
;
}
syncTimeoutDestroy
(
pSyncMsg
);
taosMemoryFree
(
pElectTimer
);
#if 0
// reset timer ms
if (syncIsInit() && pSyncNode->electBaseLine > 0) {
pSyncNode->electTimerMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
taosTmrReset(syncNodeEqElectTimer, pSyncNode->electTimerMS, pSyncNode, syncEnv()->pTimerManager,
&pSyncNode->pElectTimer);
if (syncIsInit() && pNode->electBaseLine > 0) {
pNode->electTimerMS = syncUtilElectRandomMS(pNode->electBaseLine, 2 * pNode->electBaseLine);
taosTmrReset(syncNodeEqElectTimer, pNode->electTimerMS, pNode, syncEnv()->pTimerManager, &pNode->pElectTimer);
} else {
sError("sync env is stop, syncNodeEqElectTimer");
}
...
...
@@ -1868,41 +1863,34 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
}
static
void
syncNodeEqHeartbeatTimer
(
void
*
param
,
void
*
tmrId
)
{
SSyncNode
*
pSyncNode
=
(
SSyncNode
*
)
param
;
sNTrace
(
pSyncNode
,
"eq hb timer"
);
if
(
!
syncIsInit
())
return
;
if
(
pSyncNode
->
replicaNum
>
1
)
{
if
(
atomic_load_64
(
&
pSyncNode
->
heartbeatTimerLogicClockUser
)
<=
atomic_load_64
(
&
pSyncNode
->
heartbeatTimerLogicClock
))
{
SyncTimeout
*
pSyncMsg
=
syncTimeoutBuild2
(
SYNC_TIMEOUT_HEARTBEAT
,
atomic_load_64
(
&
pSyncNode
->
heartbeatTimerLogicClock
),
pSyncNode
->
heartbeatTimerMS
,
pSyncNode
->
vgId
,
pSyncNode
);
SRpcMsg
rpcMsg
;
syncTimeout2RpcMsg
(
pSyncMsg
,
&
rpcMsg
);
sNTrace
(
pSyncNode
,
"enqueue heartbeat timer"
);
if
(
pSyncNode
->
syncEqMsg
!=
NULL
)
{
int32_t
code
=
pSyncNode
->
syncEqMsg
(
pSyncNode
->
msgcb
,
&
rpcMsg
);
if
(
code
!=
0
)
{
sError
(
"vgId:%d, sync enqueue timer msg error, code:%d"
,
pSyncNode
->
vgId
,
code
);
rpcFreeCont
(
rpcMsg
.
pCont
);
syncTimeoutDestroy
(
pSyncMsg
);
return
;
}
}
else
{
sError
(
"vgId:%d, enqueue msg cb ptr (i.e. syncEqMsg) not set."
,
pSyncNode
->
vgId
);
SSyncNode
*
pNode
=
param
;
if
(
pNode
->
replicaNum
>
1
)
{
if
(
atomic_load_64
(
&
pNode
->
heartbeatTimerLogicClockUser
)
<=
atomic_load_64
(
&
pNode
->
heartbeatTimerLogicClock
))
{
SRpcMsg
rpcMsg
=
{
0
};
int32_t
code
=
syncTimeoutBuild
(
&
rpcMsg
,
SYNC_TIMEOUT_HEARTBEAT
,
atomic_load_64
(
&
pNode
->
heartbeatTimerLogicClock
),
pNode
->
heartbeatTimerMS
,
pNode
);
if
(
code
!=
0
)
{
sNError
(
pNode
,
"failed to build heartbeat msg"
);
return
;
}
syncTimeoutDestroy
(
pSyncMsg
);
if
(
syncIsInit
())
{
taosTmrReset
(
syncNodeEqHeartbeatTimer
,
pSyncNode
->
heartbeatTimerMS
,
pSyncNode
,
syncEnv
()
->
pTimerManager
,
&
pSyncNode
->
pHeartbeatTimer
);
}
else
{
sError
(
"sync env is stop, syncNodeEqHeartbeatTimer"
);
sNTrace
(
pNode
,
"enqueue heartbeat timer"
);
code
=
pNode
->
syncEqMsg
(
pNode
->
msgcb
,
&
rpcMsg
);
if
(
code
!=
0
)
{
sNError
(
pNode
,
"failed to enqueue heartbeat msg since %s"
,
terrstr
());
rpcFreeCont
(
rpcMsg
.
pCont
);
return
;
}
taosTmrReset
(
syncNodeEqHeartbeatTimer
,
pNode
->
heartbeatTimerMS
,
pNode
,
syncEnv
()
->
pTimerManager
,
&
pNode
->
pHeartbeatTimer
);
}
else
{
sTrace
(
"==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%"
PRId64
", heartbeatTimerLogicClockUser:%"
PRId64
""
,
pSyncNode
->
heartbeatTimerLogicClock
,
pSyncNode
->
heartbeatTimerLogicClockUser
);
sTrace
(
"==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%"
PRId64
", heartbeatTimerLogicClockUser:%"
PRId64
,
pNode
->
heartbeatTimerLogicClock
,
pNode
->
heartbeatTimerLogicClockUser
);
}
}
}
...
...
source/libs/sync/src/syncMessage.c
浏览文件 @
8318c2ac
...
...
@@ -20,139 +20,28 @@
#include "syncUtil.h"
#include "tcoding.h"
// ---- message process SyncTimeout----
SyncTimeout
*
syncTimeoutBuild
()
{
uint32_t
bytes
=
sizeof
(
SyncTimeout
);
SyncTimeout
*
pMsg
=
taosMemoryMalloc
(
bytes
);
memset
(
pMsg
,
0
,
bytes
);
pMsg
->
bytes
=
bytes
;
pMsg
->
msgType
=
TDMT_SYNC_TIMEOUT
;
return
pMsg
;
}
SyncTimeout
*
syncTimeoutBuild2
(
ESyncTimeoutType
timeoutType
,
uint64_t
logicClock
,
int32_t
timerMS
,
int32_t
vgId
,
void
*
data
)
{
SyncTimeout
*
pMsg
=
syncTimeoutBuild
();
pMsg
->
vgId
=
vgId
;
pMsg
->
timeoutType
=
timeoutType
;
pMsg
->
logicClock
=
logicClock
;
pMsg
->
timerMS
=
timerMS
;
pMsg
->
data
=
data
;
return
pMsg
;
}
void
syncTimeoutDestroy
(
SyncTimeout
*
pMsg
)
{
if
(
pMsg
!=
NULL
)
{
taosMemoryFree
(
pMsg
);
}
}
void
syncTimeoutSerialize
(
const
SyncTimeout
*
pMsg
,
char
*
buf
,
uint32_t
bufLen
)
{
ASSERT
(
pMsg
->
bytes
<=
bufLen
);
memcpy
(
buf
,
pMsg
,
pMsg
->
bytes
);
}
void
syncTimeoutDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncTimeout
*
pMsg
)
{
memcpy
(
pMsg
,
buf
,
len
);
ASSERT
(
len
==
pMsg
->
bytes
);
}
char
*
syncTimeoutSerialize2
(
const
SyncTimeout
*
pMsg
,
uint32_t
*
len
)
{
char
*
buf
=
taosMemoryMalloc
(
pMsg
->
bytes
);
ASSERT
(
buf
!=
NULL
);
syncTimeoutSerialize
(
pMsg
,
buf
,
pMsg
->
bytes
);
if
(
len
!=
NULL
)
{
*
len
=
pMsg
->
bytes
;
}
return
buf
;
}
SyncTimeout
*
syncTimeoutDeserialize2
(
const
char
*
buf
,
uint32_t
len
)
{
uint32_t
bytes
=
*
((
uint32_t
*
)
buf
);
SyncTimeout
*
pMsg
=
taosMemoryMalloc
(
bytes
);
ASSERT
(
pMsg
!=
NULL
);
syncTimeoutDeserialize
(
buf
,
len
,
pMsg
);
ASSERT
(
len
==
pMsg
->
bytes
);
return
pMsg
;
}
void
syncTimeout2RpcMsg
(
const
SyncTimeout
*
pMsg
,
SRpcMsg
*
pRpcMsg
)
{
memset
(
pRpcMsg
,
0
,
sizeof
(
*
pRpcMsg
));
pRpcMsg
->
msgType
=
pMsg
->
msgType
;
pRpcMsg
->
contLen
=
pMsg
->
bytes
;
pRpcMsg
->
pCont
=
rpcMallocCont
(
pRpcMsg
->
contLen
);
syncTimeoutSerialize
(
pMsg
,
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
);
}
void
syncTimeoutFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncTimeout
*
pMsg
)
{
syncTimeoutDeserialize
(
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
,
pMsg
);
}
SyncTimeout
*
syncTimeoutFromRpcMsg2
(
const
SRpcMsg
*
pRpcMsg
)
{
SyncTimeout
*
pMsg
=
syncTimeoutDeserialize2
(
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
);
ASSERT
(
pMsg
!=
NULL
);
return
pMsg
;
}
cJSON
*
syncTimeout2Json
(
const
SyncTimeout
*
pMsg
)
{
char
u64buf
[
128
]
=
{
0
};
cJSON
*
pRoot
=
cJSON_CreateObject
();
if
(
pMsg
!=
NULL
)
{
cJSON_AddNumberToObject
(
pRoot
,
"bytes"
,
pMsg
->
bytes
);
cJSON_AddNumberToObject
(
pRoot
,
"vgId"
,
pMsg
->
vgId
);
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
pMsg
->
msgType
);
cJSON_AddNumberToObject
(
pRoot
,
"timeoutType"
,
pMsg
->
timeoutType
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%"
PRIu64
,
pMsg
->
logicClock
);
cJSON_AddStringToObject
(
pRoot
,
"logicClock"
,
u64buf
);
cJSON_AddNumberToObject
(
pRoot
,
"timerMS"
,
pMsg
->
timerMS
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%p"
,
pMsg
->
data
);
cJSON_AddStringToObject
(
pRoot
,
"data"
,
u64buf
);
int32_t
syncTimeoutBuild
(
SRpcMsg
*
pTimeoutRpcMsg
,
ESyncTimeoutType
timeoutType
,
uint64_t
logicClock
,
int32_t
timerMS
,
SSyncNode
*
pNode
)
{
int32_t
bytes
=
sizeof
(
SyncTimeout
);
pTimeoutRpcMsg
->
pCont
=
rpcMallocCont
(
bytes
);
pTimeoutRpcMsg
->
msgType
=
TDMT_SYNC_TIMEOUT
;
pTimeoutRpcMsg
->
contLen
=
bytes
;
if
(
pTimeoutRpcMsg
->
pCont
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON_AddItemToObject
(
pJson
,
"SyncTimeout"
,
pRoot
);
return
pJson
;
}
char
*
syncTimeout2Str
(
const
SyncTimeout
*
pMsg
)
{
cJSON
*
pJson
=
syncTimeout2Json
(
pMsg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
cJSON_Delete
(
pJson
);
return
serialized
;
}
// for debug ----------------------
void
syncTimeoutPrint
(
const
SyncTimeout
*
pMsg
)
{
char
*
serialized
=
syncTimeout2Str
(
pMsg
);
printf
(
"syncTimeoutPrint | len:%zu | %s
\n
"
,
strlen
(
serialized
),
serialized
);
fflush
(
NULL
);
taosMemoryFree
(
serialized
);
}
void
syncTimeoutPrint2
(
char
*
s
,
const
SyncTimeout
*
pMsg
)
{
char
*
serialized
=
syncTimeout2Str
(
pMsg
);
printf
(
"syncTimeoutPrint2 | len:%d | %s | %s
\n
"
,
(
int32_t
)
strlen
(
serialized
),
s
,
serialized
);
fflush
(
NULL
);
taosMemoryFree
(
serialized
);
}
void
syncTimeoutLog
(
const
SyncTimeout
*
pMsg
)
{
char
*
serialized
=
syncTimeout2Str
(
pMsg
);
sTrace
(
"syncTimeoutLog | len:%d | %s"
,
(
int32_t
)
strlen
(
serialized
),
serialized
);
taosMemoryFree
(
serialized
);
}
void
syncTimeoutLog2
(
char
*
s
,
const
SyncTimeout
*
pMsg
)
{
if
(
gRaftDetailLog
)
{
char
*
serialized
=
syncTimeout2Str
(
pMsg
);
sTrace
(
"syncTimeoutLog2 | len:%d | %s | %s"
,
(
int32_t
)
strlen
(
serialized
),
s
,
serialized
);
taosMemoryFree
(
serialized
);
}
SyncTimeout
*
pTimeout
=
pTimeoutRpcMsg
->
pCont
;
pTimeout
->
bytes
=
bytes
;
pTimeout
->
msgType
=
TDMT_SYNC_TIMEOUT
;
pTimeout
->
vgId
=
pNode
->
vgId
;
pTimeout
->
timeoutType
=
timeoutType
;
pTimeout
->
logicClock
=
logicClock
;
pTimeout
->
timerMS
=
timerMS
;
pTimeout
->
data
=
pNode
;
return
0
;
}
// ---- message process SyncClientRequest----
SyncClientRequest
*
syncClientRequestAlloc
(
uint32_t
dataLen
)
{
uint32_t
bytes
=
sizeof
(
SyncClientRequest
)
+
dataLen
;
SyncClientRequest
*
pMsg
=
taosMemoryCalloc
(
1
,
bytes
);
...
...
@@ -166,6 +55,8 @@ int32_t syncClientRequestBuildFromRpcMsg(SRpcMsg* pClientRequestRpcMsg, const SR
bool
isWeak
,
int32_t
vgId
)
{
int32_t
bytes
=
sizeof
(
SyncClientRequest
)
+
pOriginalRpcMsg
->
contLen
;
pClientRequestRpcMsg
->
pCont
=
rpcMallocCont
(
bytes
);
pClientRequestRpcMsg
->
msgType
=
TDMT_SYNC_CLIENT_REQUEST
;
pClientRequestRpcMsg
->
contLen
=
bytes
;
if
(
pClientRequestRpcMsg
->
pCont
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
...
...
@@ -181,14 +72,15 @@ int32_t syncClientRequestBuildFromRpcMsg(SRpcMsg* pClientRequestRpcMsg, const SR
pClientRequest
->
dataLen
=
pOriginalRpcMsg
->
contLen
;
memcpy
(
pClientRequest
->
data
,
(
char
*
)
pOriginalRpcMsg
->
pCont
,
pOriginalRpcMsg
->
contLen
);
pClientRequestRpcMsg
->
msgType
=
TDMT_SYNC_CLIENT_REQUEST
;
pClientRequestRpcMsg
->
contLen
=
bytes
;
return
0
;
}
int32_t
syncClientRequestBuildFromNoopEntry
(
SRpcMsg
*
pClientRequestRpcMsg
,
const
SSyncRaftEntry
*
pEntry
,
int32_t
vgId
)
{
int32_t
bytes
=
sizeof
(
SyncClientRequest
)
+
pEntry
->
bytes
;
pClientRequestRpcMsg
->
pCont
=
rpcMallocCont
(
bytes
);
pClientRequestRpcMsg
->
msgType
=
TDMT_SYNC_CLIENT_REQUEST
;
pClientRequestRpcMsg
->
contLen
=
bytes
;
if
(
pClientRequestRpcMsg
->
pCont
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
...
...
@@ -202,8 +94,6 @@ int32_t syncClientRequestBuildFromNoopEntry(SRpcMsg* pClientRequestRpcMsg, const
pClientRequest
->
dataLen
=
pEntry
->
bytes
;
memcpy
(
pClientRequest
->
data
,
(
char
*
)
pEntry
,
pEntry
->
bytes
);
pClientRequestRpcMsg
->
msgType
=
TDMT_SYNC_CLIENT_REQUEST
;
pClientRequestRpcMsg
->
contLen
=
bytes
;
return
0
;
}
...
...
@@ -2343,4 +2233,4 @@ void syncLocalCmdLog2(char* s, const SyncLocalCmd* pMsg) {
sTrace
(
"syncLocalCmdLog2 | len:%d | %s | %s"
,
(
int32_t
)
strlen
(
serialized
),
s
,
serialized
);
taosMemoryFree
(
serialized
);
}
}
\ No newline at end of file
}
source/libs/sync/src/syncTimeout.c
浏览文件 @
8318c2ac
...
...
@@ -85,8 +85,10 @@ static int32_t syncNodeTimerRoutine(SSyncNode* ths) {
return
0
;
}
int32_t
syncNodeOnTimer
(
SSyncNode
*
ths
,
SyncTimeout
*
pMsg
)
{
int32_t
ret
=
0
;
int32_t
syncNodeOnTimer
(
SSyncNode
*
ths
,
SRpcMsg
*
pRpc
)
{
int32_t
ret
=
0
;
SyncTimeout
*
pMsg
=
pRpc
->
pCont
;
syncLogRecvTimer
(
ths
,
pMsg
,
""
);
if
(
pMsg
->
timeoutType
==
SYNC_TIMEOUT_PING
)
{
...
...
source/libs/sync/test/syncTimeoutTest.cpp
浏览文件 @
8318c2ac
...
...
@@ -28,7 +28,7 @@ void test2() {
uint32_t
len
=
pMsg
->
bytes
;
char
*
serialized
=
(
char
*
)
taosMemoryMalloc
(
len
);
syncTimeoutSerialize
(
pMsg
,
serialized
,
len
);
SyncTimeout
*
pMsg2
=
syncTimeoutBuild
();
SyncTimeout
*
pMsg2
=
syncTimeoutBuild
X
();
syncTimeoutDeserialize
(
serialized
,
len
,
pMsg2
);
syncTimeoutLog2
((
char
*
)
"test2: syncTimeoutSerialize -> syncTimeoutDeserialize "
,
pMsg2
);
...
...
source/libs/sync/test/sync_test_lib/inc/syncTest.h
浏览文件 @
8318c2ac
...
...
@@ -213,6 +213,24 @@ int32_t syncNodePingSelf(SSyncNode* pSyncNode);
int32_t
syncNodePingPeers
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodePingAll
(
SSyncNode
*
pSyncNode
);
SyncTimeout
*
syncTimeoutBuildX
();
SyncTimeout
*
syncTimeoutBuild2
(
ESyncTimeoutType
timeoutType
,
uint64_t
logicClock
,
int32_t
timerMS
,
int32_t
vgId
,
void
*
data
);
void
syncTimeoutDestroy
(
SyncTimeout
*
pMsg
);
void
syncTimeoutSerialize
(
const
SyncTimeout
*
pMsg
,
char
*
buf
,
uint32_t
bufLen
);
void
syncTimeoutDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncTimeout
*
pMsg
);
char
*
syncTimeoutSerialize2
(
const
SyncTimeout
*
pMsg
,
uint32_t
*
len
);
SyncTimeout
*
syncTimeoutDeserialize2
(
const
char
*
buf
,
uint32_t
len
);
void
syncTimeout2RpcMsg
(
const
SyncTimeout
*
pMsg
,
SRpcMsg
*
pRpcMsg
);
void
syncTimeoutFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncTimeout
*
pMsg
);
SyncTimeout
*
syncTimeoutFromRpcMsg2
(
const
SRpcMsg
*
pRpcMsg
);
cJSON
*
syncTimeout2Json
(
const
SyncTimeout
*
pMsg
);
char
*
syncTimeout2Str
(
const
SyncTimeout
*
pMsg
);
void
syncTimeoutPrint
(
const
SyncTimeout
*
pMsg
);
void
syncTimeoutPrint2
(
char
*
s
,
const
SyncTimeout
*
pMsg
);
void
syncTimeoutLog
(
const
SyncTimeout
*
pMsg
);
void
syncTimeoutLog2
(
char
*
s
,
const
SyncTimeout
*
pMsg
);
#ifdef __cplusplus
}
#endif
...
...
source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c
浏览文件 @
8318c2ac
...
...
@@ -1046,3 +1046,134 @@ void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg) {
taosMemoryFree
(
serialized
);
}
}
// ---- message process SyncTimeout----
SyncTimeout
*
syncTimeoutBuildX
()
{
uint32_t
bytes
=
sizeof
(
SyncTimeout
);
SyncTimeout
*
pMsg
=
taosMemoryMalloc
(
bytes
);
memset
(
pMsg
,
0
,
bytes
);
pMsg
->
bytes
=
bytes
;
pMsg
->
msgType
=
TDMT_SYNC_TIMEOUT
;
return
pMsg
;
}
SyncTimeout
*
syncTimeoutBuild2
(
ESyncTimeoutType
timeoutType
,
uint64_t
logicClock
,
int32_t
timerMS
,
int32_t
vgId
,
void
*
data
)
{
SyncTimeout
*
pMsg
=
syncTimeoutBuildX
();
pMsg
->
vgId
=
vgId
;
pMsg
->
timeoutType
=
timeoutType
;
pMsg
->
logicClock
=
logicClock
;
pMsg
->
timerMS
=
timerMS
;
pMsg
->
data
=
data
;
return
pMsg
;
}
char
*
syncTimeoutSerialize2
(
const
SyncTimeout
*
pMsg
,
uint32_t
*
len
)
{
char
*
buf
=
taosMemoryMalloc
(
pMsg
->
bytes
);
ASSERT
(
buf
!=
NULL
);
syncTimeoutSerialize
(
pMsg
,
buf
,
pMsg
->
bytes
);
if
(
len
!=
NULL
)
{
*
len
=
pMsg
->
bytes
;
}
return
buf
;
}
void
syncTimeoutDestroy
(
SyncTimeout
*
pMsg
)
{
if
(
pMsg
!=
NULL
)
{
taosMemoryFree
(
pMsg
);
}
}
void
syncTimeoutSerialize
(
const
SyncTimeout
*
pMsg
,
char
*
buf
,
uint32_t
bufLen
)
{
ASSERT
(
pMsg
->
bytes
<=
bufLen
);
memcpy
(
buf
,
pMsg
,
pMsg
->
bytes
);
}
void
syncTimeoutDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncTimeout
*
pMsg
)
{
memcpy
(
pMsg
,
buf
,
len
);
ASSERT
(
len
==
pMsg
->
bytes
);
}
SyncTimeout
*
syncTimeoutDeserialize2
(
const
char
*
buf
,
uint32_t
len
)
{
uint32_t
bytes
=
*
((
uint32_t
*
)
buf
);
SyncTimeout
*
pMsg
=
taosMemoryMalloc
(
bytes
);
ASSERT
(
pMsg
!=
NULL
);
syncTimeoutDeserialize
(
buf
,
len
,
pMsg
);
ASSERT
(
len
==
pMsg
->
bytes
);
return
pMsg
;
}
void
syncTimeout2RpcMsg
(
const
SyncTimeout
*
pMsg
,
SRpcMsg
*
pRpcMsg
)
{
memset
(
pRpcMsg
,
0
,
sizeof
(
*
pRpcMsg
));
pRpcMsg
->
msgType
=
pMsg
->
msgType
;
pRpcMsg
->
contLen
=
pMsg
->
bytes
;
pRpcMsg
->
pCont
=
rpcMallocCont
(
pRpcMsg
->
contLen
);
syncTimeoutSerialize
(
pMsg
,
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
);
}
void
syncTimeoutFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncTimeout
*
pMsg
)
{
syncTimeoutDeserialize
(
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
,
pMsg
);
}
SyncTimeout
*
syncTimeoutFromRpcMsg2
(
const
SRpcMsg
*
pRpcMsg
)
{
SyncTimeout
*
pMsg
=
syncTimeoutDeserialize2
(
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
);
ASSERT
(
pMsg
!=
NULL
);
return
pMsg
;
}
cJSON
*
syncTimeout2Json
(
const
SyncTimeout
*
pMsg
)
{
char
u64buf
[
128
]
=
{
0
};
cJSON
*
pRoot
=
cJSON_CreateObject
();
if
(
pMsg
!=
NULL
)
{
cJSON_AddNumberToObject
(
pRoot
,
"bytes"
,
pMsg
->
bytes
);
cJSON_AddNumberToObject
(
pRoot
,
"vgId"
,
pMsg
->
vgId
);
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
pMsg
->
msgType
);
cJSON_AddNumberToObject
(
pRoot
,
"timeoutType"
,
pMsg
->
timeoutType
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%"
PRIu64
,
pMsg
->
logicClock
);
cJSON_AddStringToObject
(
pRoot
,
"logicClock"
,
u64buf
);
cJSON_AddNumberToObject
(
pRoot
,
"timerMS"
,
pMsg
->
timerMS
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%p"
,
pMsg
->
data
);
cJSON_AddStringToObject
(
pRoot
,
"data"
,
u64buf
);
}
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON_AddItemToObject
(
pJson
,
"SyncTimeout"
,
pRoot
);
return
pJson
;
}
char
*
syncTimeout2Str
(
const
SyncTimeout
*
pMsg
)
{
cJSON
*
pJson
=
syncTimeout2Json
(
pMsg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
cJSON_Delete
(
pJson
);
return
serialized
;
}
// for debug ----------------------
void
syncTimeoutPrint
(
const
SyncTimeout
*
pMsg
)
{
char
*
serialized
=
syncTimeout2Str
(
pMsg
);
printf
(
"syncTimeoutPrint | len:%zu | %s
\n
"
,
strlen
(
serialized
),
serialized
);
fflush
(
NULL
);
taosMemoryFree
(
serialized
);
}
void
syncTimeoutPrint2
(
char
*
s
,
const
SyncTimeout
*
pMsg
)
{
char
*
serialized
=
syncTimeout2Str
(
pMsg
);
printf
(
"syncTimeoutPrint2 | len:%d | %s | %s
\n
"
,
(
int32_t
)
strlen
(
serialized
),
s
,
serialized
);
fflush
(
NULL
);
taosMemoryFree
(
serialized
);
}
void
syncTimeoutLog
(
const
SyncTimeout
*
pMsg
)
{
char
*
serialized
=
syncTimeout2Str
(
pMsg
);
sTrace
(
"syncTimeoutLog | len:%d | %s"
,
(
int32_t
)
strlen
(
serialized
),
serialized
);
taosMemoryFree
(
serialized
);
}
void
syncTimeoutLog2
(
char
*
s
,
const
SyncTimeout
*
pMsg
)
{
if
(
gRaftDetailLog
)
{
char
*
serialized
=
syncTimeout2Str
(
pMsg
);
sTrace
(
"syncTimeoutLog2 | len:%d | %s | %s"
,
(
int32_t
)
strlen
(
serialized
),
s
,
serialized
);
taosMemoryFree
(
serialized
);
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录