Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
3cece8da
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
3cece8da
编写于
12月 07, 2021
作者:
C
Cary Xu
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'hotfix/TS-787' of github.com:taosdata/TDengine into hotfix/TS-787
上级
e4c47677
df17d3c6
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
142 addition
and
38 deletion
+142
-38
src/os/inc/osSocket.h
src/os/inc/osSocket.h
+11
-8
src/sync/src/syncMain.c
src/sync/src/syncMain.c
+116
-26
src/sync/src/syncRestore.c
src/sync/src/syncRestore.c
+11
-3
src/sync/src/syncTcp.c
src/sync/src/syncTcp.c
+3
-1
src/util/src/ttimer.c
src/util/src/ttimer.c
+1
-0
未找到文件。
src/os/inc/osSocket.h
浏览文件 @
3cece8da
...
...
@@ -15,7 +15,6 @@
#ifndef TDENGINE_OS_SOCKET_H
#define TDENGINE_OS_SOCKET_H
#ifdef __cplusplus
extern
"C"
{
#endif
...
...
@@ -33,13 +32,17 @@ extern "C" {
#define taosReadSocket(fd, buf, len) read(fd, buf, len)
#define taosWriteSocket(fd, buf, len) write(fd, buf, len)
#define taosCloseSocketNoCheck(x) close(x)
#define taosCloseSocket(x) \
{ \
if (FD_VALID(x)) { \
close(x); \
x = FD_INITIALIZER; \
} \
}
static
FORCE_INLINE
int32_t
taosCloseSocket
(
int
x
)
{
int32_t
ret
=
0
;
if
(
x
>
STDERR_FILENO
)
{
ret
=
close
(
x
);
x
=
((
int32_t
)
-
1
);
}
if
(
ret
!=
0
)
{
assert
(
false
);
}
return
ret
;
}
#endif
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
...
...
src/sync/src/syncMain.c
浏览文件 @
3cece8da
...
...
@@ -271,7 +271,11 @@ void syncStop(int64_t rid) {
sInfo
(
"vgId:%d, cleanup sync"
,
pNode
->
vgId
);
pthread_mutex_lock
(
&
pNode
->
mutex
);
int
ret
=
0
;
if
((
ret
=
pthread_mutex_lock
(
&
pNode
->
mutex
))
!=
0
)
{
sFatal
(
"%d:: vgId:%d, failed to lock pNode->mutex"
,
__LINE__
,
pNode
->
vgId
);
exit
(
ret
);
};
if
(
tsVgIdHash
)
taosHashRemove
(
tsVgIdHash
,
&
pNode
->
vgId
,
sizeof
(
int32_t
));
if
(
pNode
->
pFwdTimer
)
taosTmrStop
(
pNode
->
pFwdTimer
);
...
...
@@ -285,7 +289,10 @@ void syncStop(int64_t rid) {
pPeer
=
pNode
->
peerInfo
[
TAOS_SYNC_MAX_REPLICA
];
if
(
pPeer
)
syncRemovePeer
(
pPeer
);
pthread_mutex_unlock
(
&
pNode
->
mutex
);
if
((
ret
=
pthread_mutex_unlock
(
&
pNode
->
mutex
))
!=
0
)
{
sFatal
(
"%d:: vgId:%d, failed to lock pNode->mutex"
,
__LINE__
,
pNode
->
vgId
);
exit
(
ret
);
};
syncReleaseNode
(
pNode
);
taosRemoveRef
(
tsNodeRefId
,
rid
);
...
...
@@ -300,7 +307,11 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) {
sInfo
(
"vgId:%d, reconfig, role:%s replica:%d old:%d"
,
pNode
->
vgId
,
syncRole
[
nodeRole
],
pNewCfg
->
replica
,
pNode
->
replica
);
pthread_mutex_lock
(
&
pNode
->
mutex
);
int
ret
=
0
;
if
((
ret
=
pthread_mutex_lock
(
&
pNode
->
mutex
))
!=
0
)
{
sFatal
(
"%d:: vgId:%d, failed to lock pNode->mutex"
,
__LINE__
,
pNode
->
vgId
);
exit
(
ret
);
};
syncStopCheckPeerConn
(
pNode
->
peerInfo
[
TAOS_SYNC_MAX_REPLICA
]);
// arb
for
(
int32_t
syn_index
=
0
;
syn_index
<
pNode
->
replica
;
++
syn_index
)
{
...
...
@@ -368,7 +379,10 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) {
syncStartCheckPeerConn
(
pNode
->
peerInfo
[
syn_index
]);
}
pthread_mutex_unlock
(
&
pNode
->
mutex
);
if
((
ret
=
pthread_mutex_unlock
(
&
pNode
->
mutex
))
!=
0
)
{
sFatal
(
"%d:: vgId:%d, failed to lock pNode->mutex"
,
__LINE__
,
pNode
->
vgId
);
exit
(
ret
);
};
sInfo
(
"vgId:%d, %d replicas are configured, quorum:%d"
,
pNode
->
vgId
,
pNode
->
replica
,
pNode
->
quorum
);
syncBroadcastStatus
(
pNode
);
...
...
@@ -418,7 +432,11 @@ void syncRecover(int64_t rid) {
nodeRole
=
TAOS_SYNC_ROLE_UNSYNCED
;
(
*
pNode
->
notifyRoleFp
)(
pNode
->
vgId
,
nodeRole
);
pthread_mutex_lock
(
&
pNode
->
mutex
);
int
ret
=
0
;
if
((
ret
=
pthread_mutex_lock
(
&
pNode
->
mutex
))
!=
0
)
{
sFatal
(
"%d:: vgId:%d, failed to lock pNode->mutex"
,
__LINE__
,
pNode
->
vgId
);
exit
(
ret
);
};
nodeVersion
=
0
;
...
...
@@ -431,7 +449,10 @@ void syncRecover(int64_t rid) {
}
}
pthread_mutex_unlock
(
&
pNode
->
mutex
);
if
((
ret
=
pthread_mutex_unlock
(
&
pNode
->
mutex
))
!=
0
)
{
sFatal
(
"%d:: vgId:%d, failed to lock pNode->mutex"
,
__LINE__
,
pNode
->
vgId
);
exit
(
ret
);
};
syncReleaseNode
(
pNode
);
}
...
...
@@ -545,7 +566,10 @@ static void syncClosePeerConn(SSyncPeer *pPeer) {
sDebug
(
"%s, pfd:%d sfd:%d will be closed"
,
pPeer
->
id
,
pPeer
->
peerFd
,
pPeer
->
syncFd
);
taosTmrStopA
(
&
pPeer
->
timer
);
taosCloseSocket
(
pPeer
->
syncFd
);
if
(
taosCloseSocket
(
pPeer
->
syncFd
)
!=
0
)
{
exit
(
false
);
}
if
(
pPeer
->
peerFd
>=
0
)
{
pPeer
->
peerFd
=
-
1
;
void
*
pConn
=
pPeer
->
pConn
;
...
...
@@ -871,7 +895,9 @@ static void syncRestartPeer(SSyncPeer *pPeer) {
int32_t
ret
=
strcmp
(
pPeer
->
fqdn
,
tsNodeFqdn
);
if
(
pPeer
->
nodeId
==
0
||
ret
>
0
||
(
ret
==
0
&&
pPeer
->
port
>
tsSyncPort
))
{
sDebug
(
"%s, check peer connection in 1000 ms"
,
pPeer
->
id
);
taosTmrReset
(
syncCheckPeerConnection
,
SYNC_CHECK_INTERVAL
,
(
void
*
)
pPeer
->
rid
,
tsSyncTmrCtrl
,
&
pPeer
->
timer
);
if
(
!
taosTmrReset
(
syncCheckPeerConnection
,
SYNC_CHECK_INTERVAL
,
(
void
*
)
pPeer
->
rid
,
tsSyncTmrCtrl
,
&
pPeer
->
timer
))
{
assert
(
false
);
}
}
}
...
...
@@ -930,12 +956,20 @@ static void syncNotStarted(void *param, void *tmrId) {
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
pthread_mutex_lock
(
&
pNode
->
mutex
);
int
ret
=
0
;
if
((
ret
=
pthread_mutex_lock
(
&
pNode
->
mutex
))
!=
0
)
{
sFatal
(
"%d:: vgId:%d, failed to lock pNode->mutex"
,
__LINE__
,
pNode
->
vgId
);
exit
(
ret
);
};
pPeer
->
timer
=
NULL
;
pPeer
->
sstatus
=
TAOS_SYNC_STATUS_INIT
;
sInfo
(
"%s, sync conn is still not up, restart and set sstatus:%s"
,
pPeer
->
id
,
syncStatus
[
pPeer
->
sstatus
]);
syncRestartConnection
(
pPeer
);
pthread_mutex_unlock
(
&
pNode
->
mutex
);
if
((
ret
=
pthread_mutex_unlock
(
&
pNode
->
mutex
))
!=
0
)
{
sFatal
(
"%d:: vgId:%d, failed to lock pNode->mutex"
,
__LINE__
,
pNode
->
vgId
);
exit
(
ret
);
};
syncReleasePeer
(
pPeer
);
}
...
...
@@ -947,9 +981,17 @@ static void syncTryRecoverFromMaster(void *param, void *tmrId) {
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
pthread_mutex_lock
(
&
pNode
->
mutex
);
int
ret
=
0
;
if
((
ret
=
pthread_mutex_lock
(
&
pNode
->
mutex
))
!=
0
)
{
sFatal
(
"%d:: vgId:%d, failed to lock pNode->mutex"
,
__LINE__
,
pNode
->
vgId
);
exit
(
ret
);
};
syncRecoverFromMaster
(
pPeer
);
pthread_mutex_unlock
(
&
pNode
->
mutex
);
if
((
ret
=
pthread_mutex_unlock
(
&
pNode
->
mutex
))
!=
0
)
{
sFatal
(
"%d:: vgId:%d, failed to lock pNode->mutex"
,
__LINE__
,
pNode
->
vgId
);
exit
(
ret
);
};
syncReleasePeer
(
pPeer
);
}
...
...
@@ -1078,8 +1120,12 @@ static int32_t syncProcessPeerMsg(int64_t rid, void *buffer) {
SSyncHead
*
pHead
=
buffer
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
pthread_mutex_lock
(
&
pNode
->
mutex
);
int
ret
=
0
;
if
((
ret
=
pthread_mutex_lock
(
&
pNode
->
mutex
))
!=
0
)
{
sFatal
(
"%d:: vgId:%d, failed to lock pNode->mutex"
,
__LINE__
,
pNode
->
vgId
);
exit
(
ret
);
};
int32_t
code
=
syncReadPeerMsg
(
pPeer
,
pHead
);
...
...
@@ -1095,7 +1141,10 @@ static int32_t syncProcessPeerMsg(int64_t rid, void *buffer) {
}
}
pthread_mutex_unlock
(
&
pNode
->
mutex
);
if
((
ret
=
pthread_mutex_unlock
(
&
pNode
->
mutex
))
!=
0
)
{
sFatal
(
"%d:: vgId:%d, failed to lock pNode->mutex"
,
__LINE__
,
pNode
->
vgId
);
exit
(
ret
);
};
syncReleasePeer
(
pPeer
);
return
code
;
...
...
@@ -1188,12 +1237,19 @@ static void syncCheckPeerConnection(void *param, void *tmrId) {
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
pthread_mutex_lock
(
&
pNode
->
mutex
);
int
ret
=
0
;
if
((
ret
=
pthread_mutex_lock
(
&
pNode
->
mutex
))
!=
0
)
{
sFatal
(
"%d:: vgId:%d, failed to lock pNode->mutex"
,
__LINE__
,
pNode
->
vgId
);
exit
(
ret
);
};
sDebug
(
"%s, check peer connection"
,
pPeer
->
id
);
syncSetupPeerConnection
(
pPeer
);
pthread_mutex_unlock
(
&
pNode
->
mutex
);
if
((
ret
=
pthread_mutex_unlock
(
&
pNode
->
mutex
))
!=
0
)
{
sFatal
(
"%d:: vgId:%d, failed to lock pNode->mutex"
,
__LINE__
,
pNode
->
vgId
);
exit
(
ret
);
};
syncReleasePeer
(
pPeer
);
}
...
...
@@ -1274,7 +1330,12 @@ static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) {
sDebug
(
"vgId:%d, sync connection is incoming, tranId:%u"
,
vgId
,
msg
.
tranId
);
SSyncNode
*
pNode
=
*
ppNode
;
pthread_mutex_lock
(
&
pNode
->
mutex
);
int
ret
=
0
;
if
((
ret
=
pthread_mutex_lock
(
&
pNode
->
mutex
))
!=
0
)
{
sFatal
(
"%d:: vgId:%d, failed to lock pNode->mutex"
,
__LINE__
,
pNode
->
vgId
);
exit
(
ret
);
};
SSyncPeer
*
pPeer
;
for
(
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
...
...
@@ -1305,7 +1366,10 @@ static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) {
}
}
pthread_mutex_unlock
(
&
pNode
->
mutex
);
if
((
ret
=
pthread_mutex_unlock
(
&
pNode
->
mutex
))
!=
0
)
{
sFatal
(
"%d:: vgId:%d, failed to lock pNode->mutex"
,
__LINE__
,
pNode
->
vgId
);
exit
(
ret
);
};
}
static
void
syncProcessBrokenLink
(
int64_t
rid
,
int32_t
closedByApp
)
{
...
...
@@ -1314,7 +1378,11 @@ static void syncProcessBrokenLink(int64_t rid, int32_t closedByApp) {
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
pthread_mutex_lock
(
&
pNode
->
mutex
);
int
ret
=
0
;
if
((
ret
=
pthread_mutex_lock
(
&
pNode
->
mutex
))
!=
0
)
{
sFatal
(
"%d:: vgId:%d, failed to lock pNode->mutex"
,
__LINE__
,
pNode
->
vgId
);
exit
(
ret
);
};
sDebug
(
"%s, TCP link is broken since %s, pfd:%d sfd:%d closedByApp:%d"
,
pPeer
->
id
,
strerror
(
errno
),
pPeer
->
peerFd
,
pPeer
->
syncFd
,
closedByApp
);
...
...
@@ -1324,7 +1392,11 @@ static void syncProcessBrokenLink(int64_t rid, int32_t closedByApp) {
}
syncRestartConnection
(
pPeer
);
pthread_mutex_unlock
(
&
pNode
->
mutex
);
if
((
ret
=
pthread_mutex_unlock
(
&
pNode
->
mutex
))
!=
0
)
{
sFatal
(
"%d:: vgId:%d, failed to lock pNode->mutex"
,
__LINE__
,
pNode
->
vgId
);
exit
(
ret
);
};
syncReleasePeer
(
pPeer
);
}
...
...
@@ -1428,7 +1500,11 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) {
int64_t
syn_time
=
taosGetTimestampMs
();
if
(
pSyncFwds
->
fwds
>
0
)
{
pthread_mutex_lock
(
&
pNode
->
mutex
);
int
ret
=
0
;
if
((
ret
=
pthread_mutex_lock
(
&
pNode
->
mutex
))
!=
0
)
{
sFatal
(
"%d:: vgId:%d, failed to lock pNode->mutex"
,
__LINE__
,
pNode
->
vgId
);
exit
(
ret
);
};
for
(
int32_t
i
=
0
;
i
<
pSyncFwds
->
fwds
;
++
i
)
{
SFwdInfo
*
pFwdInfo
=
pSyncFwds
->
fwdInfo
+
(
pSyncFwds
->
first
+
i
)
%
SYNC_MAX_FWDS
;
if
(
ABS
(
syn_time
-
pFwdInfo
->
time
)
<
10000
)
break
;
...
...
@@ -1439,7 +1515,11 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) {
}
syncRemoveConfirmedFwdInfo
(
pNode
);
pthread_mutex_unlock
(
&
pNode
->
mutex
);
if
((
ret
=
pthread_mutex_unlock
(
&
pNode
->
mutex
))
!=
0
)
{
sFatal
(
"%d:: vgId:%d, failed to lock pNode->mutex"
,
__LINE__
,
pNode
->
vgId
);
exit
(
ret
);
};
}
pNode
->
pFwdTimer
=
taosTmrStart
(
syncMonitorFwdInfos
,
SYNC_FWD_TIMER
,
(
void
*
)
pNode
->
rid
,
tsSyncTmrCtrl
);
...
...
@@ -1485,7 +1565,11 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
syncBuildSyncFwdMsg
(
pSyncHead
,
pNode
->
vgId
,
sizeof
(
SWalHead
)
+
pWalHead
->
len
);
fwdLen
=
pSyncHead
->
len
+
sizeof
(
SSyncHead
);
// include the WAL and SYNC head
pthread_mutex_lock
(
&
pNode
->
mutex
);
int
ret
=
0
;
if
((
ret
=
pthread_mutex_lock
(
&
pNode
->
mutex
))
!=
0
)
{
sFatal
(
"%d:: vgId:%d, failed to lock pNode->mutex"
,
__LINE__
,
pNode
->
vgId
);
exit
(
ret
);
};
for
(
int32_t
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
pPeer
=
pNode
->
peerInfo
[
i
];
...
...
@@ -1497,7 +1581,10 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
if
(
code
>=
0
)
{
code
=
1
;
}
else
{
pthread_mutex_unlock
(
&
pNode
->
mutex
);
if
((
ret
=
pthread_mutex_unlock
(
&
pNode
->
mutex
))
!=
0
)
{
sFatal
(
"%d:: vgId:%d, failed to lock pNode->mutex"
,
__LINE__
,
pNode
->
vgId
);
exit
(
ret
);
};
return
code
;
}
}
...
...
@@ -1513,7 +1600,10 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
}
}
pthread_mutex_unlock
(
&
pNode
->
mutex
);
if
((
ret
=
pthread_mutex_unlock
(
&
pNode
->
mutex
))
!=
0
)
{
sFatal
(
"%d:: vgId:%d, failed to lock pNode->mutex"
,
__LINE__
,
pNode
->
vgId
);
exit
(
ret
);
};
return
code
;
}
src/sync/src/syncRestore.c
浏览文件 @
3cece8da
...
...
@@ -144,7 +144,11 @@ static int32_t syncProcessBufferedFwd(SSyncPeer *pPeer) {
forwards
++
;
}
pthread_mutex_lock
(
&
pNode
->
mutex
);
int
ret
=
0
;
if
((
ret
=
pthread_mutex_lock
(
&
pNode
->
mutex
))
!=
0
)
{
sFatal
(
"%d:: vgId:%d, failed to lock pNode->mutex"
,
__LINE__
,
pNode
->
vgId
);
exit
(
ret
);
};
while
(
forwards
<
pRecv
->
forwards
&&
pRecv
->
code
==
0
)
{
offset
=
syncProcessOneBufferedFwd
(
pPeer
,
offset
);
...
...
@@ -154,7 +158,10 @@ static int32_t syncProcessBufferedFwd(SSyncPeer *pPeer) {
nodeRole
=
TAOS_SYNC_ROLE_SLAVE
;
sDebug
(
"%s, finish processing buffered fwds:%d"
,
pPeer
->
id
,
forwards
);
pthread_mutex_unlock
(
&
pNode
->
mutex
);
if
((
ret
=
pthread_mutex_unlock
(
&
pNode
->
mutex
))
!=
0
)
{
sFatal
(
"%d:: vgId:%d, failed to lock pNode->mutex"
,
__LINE__
,
pNode
->
vgId
);
exit
(
ret
);
};
return
pRecv
->
code
;
}
...
...
@@ -286,7 +293,7 @@ void *syncRestoreData(void *param) {
}
else
{
if
(
syncRestoreDataStepByStep
(
pPeer
)
==
0
)
{
sInfo
(
"%s, it is synced successfully"
,
pPeer
->
id
);
nodeRole
=
TAOS_SYNC_ROLE_SLAVE
;
nodeRole
=
TAOS_SYNC_ROLE_SLAVE
;
syncBroadcastStatus
(
pNode
);
}
else
{
sError
(
"%s, failed to restore data, restart connection"
,
pPeer
->
id
);
...
...
@@ -310,3 +317,4 @@ void *syncRestoreData(void *param) {
return
NULL
;
}
\ No newline at end of file
src/sync/src/syncTcp.c
浏览文件 @
3cece8da
...
...
@@ -168,7 +168,9 @@ void syncFreeTcpConn(void *param) {
sDebug
(
"%p TCP connection will be closed, fd:%d"
,
pThread
,
pConn
->
fd
);
pConn
->
closedByApp
=
1
;
shutdown
(
pConn
->
fd
,
SHUT_WR
);
if
(
shutdown
(
pConn
->
fd
,
SHUT_WR
)
!=
0
)
{
ASSERT
(
false
);
}
}
static
void
taosProcessBrokenLink
(
SConnObj
*
pConn
)
{
...
...
src/util/src/ttimer.c
浏览文件 @
3cece8da
...
...
@@ -445,6 +445,7 @@ bool taosTmrStop(tmr_h timerId) {
bool
taosTmrStopA
(
tmr_h
*
timerId
)
{
bool
ret
=
taosTmrStop
(
*
timerId
);
*
timerId
=
NULL
;
return
ret
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录