Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
11a5fe27
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
11a5fe27
编写于
11月 23, 2020
作者:
S
Shengliang Guan
提交者:
GitHub
11月 23, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #4321 from taosdata/feature/wal
TD-2157
上级
3a94d126
aae38f8b
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
110 addition
and
50 deletion
+110
-50
src/sync/inc/syncInt.h
src/sync/inc/syncInt.h
+1
-0
src/sync/src/syncMain.c
src/sync/src/syncMain.c
+48
-28
src/sync/src/syncRestore.c
src/sync/src/syncRestore.c
+29
-10
src/sync/src/syncRetrieve.c
src/sync/src/syncRetrieve.c
+30
-10
src/util/src/tsocket.c
src/util/src/tsocket.c
+2
-2
未找到文件。
src/sync/inc/syncInt.h
浏览文件 @
11a5fe27
...
...
@@ -172,6 +172,7 @@ typedef struct SSyncNode {
// sync module global
extern
int32_t
tsSyncNum
;
extern
char
tsNodeFqdn
[
TSDB_FQDN_LEN
];
extern
char
*
syncStatus
[];
void
*
syncRetrieveData
(
void
*
param
);
void
*
syncRestoreData
(
void
*
param
);
...
...
src/sync/src/syncMain.c
浏览文件 @
11a5fe27
...
...
@@ -73,6 +73,14 @@ char* syncRole[] = {
"master"
};
char
*
syncStatus
[]
=
{
"init"
,
"start"
,
"file"
,
"cache"
,
"invalid"
};
typedef
enum
{
SYNC_STATUS_BROADCAST
,
SYNC_STATUS_BROADCAST_RSP
,
...
...
@@ -282,7 +290,7 @@ void syncStop(int64_t rid) {
pPeer
=
pNode
->
peerInfo
[
TAOS_SYNC_MAX_REPLICA
];
if
(
pPeer
)
syncRemovePeer
(
pPeer
);
pthread_mutex_unlock
(
&
(
pNode
->
mutex
)
);
pthread_mutex_unlock
(
&
pNode
->
mutex
);
taosReleaseRef
(
tsSyncRefId
,
rid
);
taosRemoveRef
(
tsSyncRefId
,
rid
);
...
...
@@ -350,7 +358,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) {
(
*
pNode
->
notifyRole
)(
pNode
->
vgId
,
nodeRole
);
}
pthread_mutex_unlock
(
&
(
pNode
->
mutex
)
);
pthread_mutex_unlock
(
&
pNode
->
mutex
);
sInfo
(
"vgId:%d, %d replicas are configured, quorum:%d"
,
pNode
->
vgId
,
pNode
->
replica
,
pNode
->
quorum
);
syncBroadcastStatus
(
pNode
);
...
...
@@ -423,7 +431,7 @@ void syncRecover(int64_t rid) {
}
}
pthread_mutex_unlock(&
(pNode->mutex)
);
pthread_mutex_unlock(&
pNode->mutex
);
taosReleaseRef(tsSyncRefId, rid);
}
...
...
@@ -498,6 +506,8 @@ int32_t syncDecPeerRef(SSyncPeer *pPeer) {
}
static
void
syncClosePeerConn
(
SSyncPeer
*
pPeer
)
{
sDebug
(
"%s, pfd:%d sfd:%d will be closed"
,
pPeer
->
id
,
pPeer
->
peerFd
,
pPeer
->
syncFd
);
taosTmrStopA
(
&
pPeer
->
timer
);
taosClose
(
pPeer
->
syncFd
);
if
(
pPeer
->
peerFd
>=
0
)
{
...
...
@@ -751,7 +761,7 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus* peersStatus, int8_t new
sDebug
(
"vgId:%d, choose master"
,
pNode
->
vgId
);
syncChooseMaster
(
pNode
);
}
else
{
sDebug
(
"vgId:%d,
version inconsistent, cannot choose master
"
,
pNode
->
vgId
);
sDebug
(
"vgId:%d,
cannot choose master since roles inconsistent
"
,
pNode
->
vgId
);
}
}
...
...
@@ -770,11 +780,12 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus* peersStatus, int8_t new
}
static
void
syncRestartPeer
(
SSyncPeer
*
pPeer
)
{
sDebug
(
"%s, restart peer connection
"
,
pPeer
->
id
);
sDebug
(
"%s, restart peer connection
, last sstatus:%s"
,
pPeer
->
id
,
syncStatus
[
pPeer
->
sstatus
]
);
syncClosePeerConn
(
pPeer
);
pPeer
->
sstatus
=
TAOS_SYNC_STATUS_INIT
;
sDebug
(
"%s, peer conn is restart and set sstatus:%s"
,
pPeer
->
id
,
syncStatus
[
pPeer
->
sstatus
]);
int32_t
ret
=
strcmp
(
pPeer
->
fqdn
,
tsNodeFqdn
);
if
(
ret
>
0
||
(
ret
==
0
&&
pPeer
->
port
>
tsSyncPort
))
{
...
...
@@ -803,7 +814,7 @@ static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) {
}
if
(
pPeer
->
sstatus
!=
TAOS_SYNC_STATUS_INIT
)
{
sDebug
(
"%s, sync is already started
"
,
pPeer
->
id
);
sDebug
(
"%s, sync is already started
for sstatus:%s"
,
pPeer
->
id
,
syncStatus
[
pPeer
->
sstatus
]
);
return
;
// already started
}
...
...
@@ -821,7 +832,7 @@ static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) {
syncDecPeerRef
(
pPeer
);
}
else
{
pPeer
->
sstatus
=
TAOS_SYNC_STATUS_START
;
sDebug
(
"%s, thread is created to retrieve data
"
,
pPeer
->
id
);
sDebug
(
"%s, thread is created to retrieve data
, set sstatus:%s"
,
pPeer
->
id
,
syncStatus
[
pPeer
->
sstatus
]
);
}
}
...
...
@@ -831,9 +842,10 @@ static void syncNotStarted(void *param, void *tmrId) {
pthread_mutex_lock
(
&
pNode
->
mutex
);
pPeer
->
timer
=
NULL
;
sInfo
(
"%s, sync connection is still not up, restart"
,
pPeer
->
id
);
pPeer
->
sstatus
=
TAOS_SYNC_STATUS_INIT
;
sInfo
(
"%s, sync conn is still not up, restart and set sstatus:%s"
,
pPeer
->
id
,
syncStatus
[
pPeer
->
sstatus
]);
syncRestartConnection
(
pPeer
);
pthread_mutex_unlock
(
&
(
pNode
->
mutex
)
);
pthread_mutex_unlock
(
&
pNode
->
mutex
);
}
static
void
syncTryRecoverFromMaster
(
void
*
param
,
void
*
tmrId
)
{
...
...
@@ -842,14 +854,14 @@ static void syncTryRecoverFromMaster(void *param, void *tmrId) {
pthread_mutex_lock
(
&
pNode
->
mutex
);
syncRecoverFromMaster
(
pPeer
);
pthread_mutex_unlock
(
&
(
pNode
->
mutex
)
);
pthread_mutex_unlock
(
&
pNode
->
mutex
);
}
static
void
syncRecoverFromMaster
(
SSyncPeer
*
pPeer
)
{
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
if
(
nodeSStatus
!=
TAOS_SYNC_STATUS_INIT
)
{
sDebug
(
"%s, sync is already started
, status:%d"
,
pPeer
->
id
,
nodeSStatus
);
sDebug
(
"%s, sync is already started
since sstatus:%s"
,
pPeer
->
id
,
syncStatus
[
nodeSStatus
]
);
return
;
}
...
...
@@ -877,7 +889,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) {
sError
(
"%s, failed to send sync-req to peer"
,
pPeer
->
id
);
}
else
{
nodeSStatus
=
TAOS_SYNC_STATUS_START
;
sInfo
(
"%s, sync-req is sent
"
,
pPeer
->
id
);
sInfo
(
"%s, sync-req is sent
to peer, set sstatus:%s"
,
pPeer
->
id
,
syncStatus
[
nodeSStatus
]
);
}
}
...
...
@@ -915,7 +927,8 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
if
(
nodeSStatus
!=
TAOS_SYNC_STATUS_INIT
)
{
syncSaveIntoBuffer
(
pPeer
,
pHead
);
}
else
{
sError
(
"%s, forward discarded, hver:%"
PRIu64
,
pPeer
->
id
,
pHead
->
version
);
sError
(
"%s, forward discarded since sstatus:%s, hver:%"
PRIu64
,
pPeer
->
id
,
syncStatus
[
nodeSStatus
],
pHead
->
version
);
}
}
}
...
...
@@ -924,8 +937,9 @@ static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) {
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SPeersStatus
*
pPeersStatus
=
(
SPeersStatus
*
)
cont
;
sDebug
(
"%s, status msg is received, self:%s sver:%"
PRIu64
" peer:%s sver:%"
PRIu64
", ack:%d tranId:%u type:%s"
,
pPeer
->
id
,
syncRole
[
nodeRole
],
nodeVersion
,
syncRole
[
pPeersStatus
->
role
],
pPeersStatus
->
version
,
pPeersStatus
->
ack
,
pPeersStatus
->
tranId
,
statusType
[
pPeersStatus
->
type
]);
sDebug
(
"%s, status is received, self:%s:%s:%"
PRIu64
", peer:%s:%"
PRIu64
", ack:%d tranId:%u type:%s pfd:%d"
,
pPeer
->
id
,
syncRole
[
nodeRole
],
syncStatus
[
nodeSStatus
],
nodeVersion
,
syncRole
[
pPeersStatus
->
role
],
pPeersStatus
->
version
,
pPeersStatus
->
ack
,
pPeersStatus
->
tranId
,
statusType
[
pPeersStatus
->
type
],
pPeer
->
peerFd
);
pPeer
->
version
=
pPeersStatus
->
version
;
syncCheckRole
(
pPeer
,
pPeersStatus
->
peersStatus
,
pPeersStatus
->
role
);
...
...
@@ -982,7 +996,7 @@ static int32_t syncProcessPeerMsg(void *param, void *buffer) {
}
}
pthread_mutex_unlock
(
&
(
pNode
->
mutex
)
);
pthread_mutex_unlock
(
&
pNode
->
mutex
);
return
code
;
}
...
...
@@ -1014,8 +1028,10 @@ static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type
int32_t
retLen
=
taosWriteMsg
(
pPeer
->
peerFd
,
msg
,
statusMsgLen
);
if
(
retLen
==
statusMsgLen
)
{
sDebug
(
"%s, status msg is sent, self:%s sver:%"
PRIu64
", ack:%d tranId:%u type:%s"
,
pPeer
->
id
,
syncRole
[
pPeersStatus
->
role
],
pPeersStatus
->
version
,
pPeersStatus
->
ack
,
pPeersStatus
->
tranId
,
statusType
[
pPeersStatus
->
type
]);
sDebug
(
"%s, status is sent, self:%s:%s:%"
PRIu64
", peer:%s:%s:%"
PRIu64
", ack:%d tranId:%u type:%s pfd:%d"
,
pPeer
->
id
,
syncRole
[
nodeRole
],
syncStatus
[
nodeSStatus
],
nodeVersion
,
syncRole
[
pPeer
->
role
],
syncStatus
[
pPeer
->
sstatus
],
pPeer
->
version
,
pPeersStatus
->
ack
,
pPeersStatus
->
tranId
,
statusType
[
pPeersStatus
->
type
],
pPeer
->
peerFd
);
}
else
{
sDebug
(
"%s, failed to send status msg, restart"
,
pPeer
->
id
);
syncRestartConnection
(
pPeer
);
...
...
@@ -1048,7 +1064,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
firstPkt
.
sourceId
=
pNode
->
vgId
;
// tell arbitrator its vgId
if
(
taosWriteMsg
(
connFd
,
&
firstPkt
,
sizeof
(
firstPkt
))
==
sizeof
(
firstPkt
))
{
sDebug
(
"%s, connection to peer server is setup
"
,
pPeer
->
i
d
);
sDebug
(
"%s, connection to peer server is setup
, pfd:%d sfd:%d"
,
pPeer
->
id
,
connFd
,
pPeer
->
syncF
d
);
pPeer
->
peerFd
=
connFd
;
pPeer
->
role
=
TAOS_SYNC_ROLE_UNSYNCED
;
pPeer
->
pConn
=
taosAllocateTcpConn
(
tsTcpPool
,
pPeer
,
connFd
);
...
...
@@ -1069,7 +1085,7 @@ static void syncCheckPeerConnection(void *param, void *tmrId) {
sDebug
(
"%s, check peer connection"
,
pPeer
->
id
);
syncSetupPeerConnection
(
pPeer
);
pthread_mutex_unlock
(
&
(
pNode
->
mutex
)
);
pthread_mutex_unlock
(
&
pNode
->
mutex
);
}
static
void
syncCreateRestoreDataThread
(
SSyncPeer
*
pPeer
)
{
...
...
@@ -1135,7 +1151,8 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
pPeer
->
syncFd
=
connFd
;
syncCreateRestoreDataThread
(
pPeer
);
}
else
{
sDebug
(
"%s, TCP connection is already up, close one"
,
pPeer
->
id
);
sDebug
(
"%s, TCP connection is already up(pfd:%d), close one, new pfd:%d sfd:%d"
,
pPeer
->
id
,
pPeer
->
peerFd
,
connFd
,
pPeer
->
syncFd
);
syncClosePeerConn
(
pPeer
);
pPeer
->
peerFd
=
connFd
;
pPeer
->
pConn
=
taosAllocateTcpConn
(
tsTcpPool
,
pPeer
,
connFd
);
...
...
@@ -1145,7 +1162,7 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
}
}
pthread_mutex_unlock
(
&
(
pNode
->
mutex
)
);
pthread_mutex_unlock
(
&
pNode
->
mutex
);
}
static
void
syncProcessBrokenLink
(
void
*
param
)
{
...
...
@@ -1156,14 +1173,14 @@ static void syncProcessBrokenLink(void *param) {
if
(
taosAcquireRef
(
tsSyncRefId
,
pNode
->
rid
)
==
NULL
)
return
;
pthread_mutex_lock
(
&
pNode
->
mutex
);
sDebug
(
"%s, TCP link is broken since %s
"
,
pPeer
->
id
,
strerror
(
errno
)
);
sDebug
(
"%s, TCP link is broken since %s
, pfd:%d sfd:%d"
,
pPeer
->
id
,
strerror
(
errno
),
pPeer
->
peerFd
,
pPeer
->
syncFd
);
pPeer
->
peerFd
=
-
1
;
if
(
syncDecPeerRef
(
pPeer
)
!=
0
)
{
syncRestartConnection
(
pPeer
);
}
pthread_mutex_unlock
(
&
(
pNode
->
mutex
)
);
pthread_mutex_unlock
(
&
pNode
->
mutex
);
taosReleaseRef
(
tsSyncRefId
,
pNode
->
rid
);
}
...
...
@@ -1239,10 +1256,13 @@ static void syncMonitorNodeRole(void *param, void *tmrId) {
if
(
index
==
pNode
->
selfIndex
)
continue
;
SSyncPeer
*
pPeer
=
pNode
->
peerInfo
[
index
];
if
(
pPeer
->
role
>
TAOS_SYNC_ROLE_UNSYNCED
&&
nodeRole
>
TAOS_SYNC_ROLE_UNSYNCED
)
continue
;
if
(
pPeer
->
sstatus
>
TAOS_SYNC_STATUS_INIT
||
nodeSStatus
>
TAOS_SYNC_STATUS_INIT
)
continue
;
if
(
/*pPeer->role > TAOS_SYNC_ROLE_UNSYNCED && */
nodeRole
>
TAOS_SYNC_ROLE_UNSYNCED
)
continue
;
if
(
/*pPeer->sstatus > TAOS_SYNC_STATUS_INIT || */
nodeSStatus
>
TAOS_SYNC_STATUS_INIT
)
continue
;
sDebug
(
"%s, check roles since self:%s sstatus:%s, peer:%s sstatus:%s"
,
pPeer
->
id
,
syncRole
[
pPeer
->
role
],
syncStatus
[
pPeer
->
sstatus
],
syncRole
[
nodeRole
],
syncStatus
[
nodeSStatus
]);
syncSendPeersStatusMsgToPeer
(
pPeer
,
1
,
SYNC_STATUS_CHECK_ROLE
,
syncGenTranId
());
break
;
}
pNode
->
pRoleTimer
=
taosTmrStart
(
syncMonitorNodeRole
,
SYNC_ROLE_TIMER
,
(
void
*
)
pNode
->
rid
,
tsSyncTmrCtrl
);
...
...
@@ -1271,7 +1291,7 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) {
}
syncRemoveConfirmedFwdInfo
(
pNode
);
pthread_mutex_unlock
(
&
(
pNode
->
mutex
)
);
pthread_mutex_unlock
(
&
pNode
->
mutex
);
}
pNode
->
pFwdTimer
=
taosTmrStart
(
syncMonitorFwdInfos
,
SYNC_FWD_TIMER
,
(
void
*
)
pNode
->
rid
,
tsSyncTmrCtrl
);
...
...
@@ -1339,7 +1359,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
}
}
pthread_mutex_unlock
(
&
(
pNode
->
mutex
)
);
pthread_mutex_unlock
(
&
pNode
->
mutex
);
return
code
;
}
...
...
src/sync/src/syncRestore.c
浏览文件 @
11a5fe27
...
...
@@ -65,7 +65,10 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
while
(
1
)
{
// read file info
int32_t
ret
=
taosReadMsg
(
pPeer
->
syncFd
,
&
(
minfo
),
sizeof
(
minfo
));
if
(
ret
<
0
)
break
;
if
(
ret
<
0
)
{
sError
(
"%s, failed to read file info while restore file since %s"
,
pPeer
->
id
,
strerror
(
errno
));
break
;
}
// if no more file from master, break;
if
(
minfo
.
name
[
0
]
==
0
||
minfo
.
magic
==
0
)
{
...
...
@@ -83,7 +86,7 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
// check the file info
sinfo
=
minfo
;
sDebug
(
"%s, get file
info:%s"
,
pPeer
->
id
,
minfo
.
nam
e
);
sDebug
(
"%s, get file
:%s info size:%"
PRId64
,
pPeer
->
id
,
minfo
.
name
,
minfo
.
siz
e
);
sinfo
.
magic
=
(
*
pNode
->
getFileInfo
)(
pNode
->
vgId
,
sinfo
.
name
,
&
sinfo
.
index
,
TAOS_SYNC_MAX_INDEX
,
&
sinfo
.
size
,
&
sinfo
.
fversion
);
...
...
@@ -92,8 +95,11 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
fileAck
.
sync
=
(
sinfo
.
magic
!=
minfo
.
magic
||
sinfo
.
name
[
0
]
==
0
)
?
1
:
0
;
// send file ack
ret
=
taosWriteMsg
(
pPeer
->
syncFd
,
&
(
fileAck
),
sizeof
(
fileAck
));
if
(
ret
<
0
)
break
;
ret
=
taosWriteMsg
(
pPeer
->
syncFd
,
&
fileAck
,
sizeof
(
fileAck
));
if
(
ret
<
0
)
{
sError
(
"%s, failed to write file:%s ack while restore file since %s"
,
pPeer
->
id
,
minfo
.
name
,
strerror
(
errno
));
break
;
}
// if sync is not required, continue
if
(
fileAck
.
sync
==
0
)
{
...
...
@@ -108,14 +114,17 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
int32_t
dfd
=
open
(
name
,
O_WRONLY
|
O_CREAT
|
O_TRUNC
,
S_IRWXU
|
S_IRWXG
|
S_IRWXO
);
if
(
dfd
<
0
)
{
sError
(
"%s, failed to open file:%s
"
,
pPeer
->
id
,
name
);
sError
(
"%s, failed to open file:%s
while restore file since %s"
,
pPeer
->
id
,
minfo
.
name
,
strerror
(
errno
)
);
break
;
}
ret
=
taosCopyFds
(
pPeer
->
syncFd
,
dfd
,
minfo
.
size
);
fsync
(
dfd
);
close
(
dfd
);
if
(
ret
<
0
)
break
;
if
(
ret
<
0
)
{
sError
(
"%s, failed to copy file:%s while restore file since %s"
,
pPeer
->
id
,
minfo
.
name
,
strerror
(
errno
));
break
;
}
fileChanged
=
true
;
sDebug
(
"%s, %s is received, size:%"
PRId64
,
pPeer
->
id
,
minfo
.
name
,
minfo
.
size
);
...
...
@@ -125,6 +134,7 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
// data file is changed, code shall be set to 1
*
fversion
=
minfo
.
fversion
;
code
=
1
;
sDebug
(
"%s, file changed while restore file"
,
pPeer
->
id
);
}
if
(
code
<
0
)
{
...
...
@@ -146,15 +156,22 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) {
while
(
1
)
{
ret
=
taosReadMsg
(
pPeer
->
syncFd
,
pHead
,
sizeof
(
SWalHead
));
if
(
ret
<
0
)
break
;
if
(
ret
<
0
)
{
sError
(
"%s, failed to read walhead while restore wal since %s"
,
pPeer
->
id
,
strerror
(
errno
));
break
;
}
if
(
pHead
->
len
==
0
)
{
sDebug
(
"%s, wal is synced over"
,
pPeer
->
id
);
code
=
0
;
break
;
}
// wal sync over
ret
=
taosReadMsg
(
pPeer
->
syncFd
,
pHead
->
cont
,
pHead
->
len
);
if
(
ret
<
0
)
break
;
if
(
ret
<
0
)
{
sError
(
"%s, failed to read walcont, len:%d while restore wal since %s"
,
pPeer
->
id
,
pHead
->
len
,
strerror
(
errno
));
break
;
}
sTrace
(
"%s, restore a record, qtype:wal len:%d hver:%"
PRIu64
,
pPeer
->
id
,
pHead
->
len
,
pHead
->
version
);
...
...
@@ -267,7 +284,7 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) {
nodeSStatus
=
TAOS_SYNC_STATUS_FILE
;
uint64_t
fversion
=
0
;
sDebug
(
"%s, start to restore file
"
,
pPeer
->
id
);
sDebug
(
"%s, start to restore file
, set sstatus:%s"
,
pPeer
->
id
,
syncStatus
[
nodeSStatus
]
);
int32_t
code
=
syncRestoreFile
(
pPeer
,
&
fversion
);
if
(
code
<
0
)
{
sError
(
"%s, failed to restore file"
,
pPeer
->
id
);
...
...
@@ -291,7 +308,7 @@ static int32_t syncRestoreDataStepByStep(SSyncPeer *pPeer) {
}
nodeSStatus
=
TAOS_SYNC_STATUS_CACHE
;
sDebug
(
"%s, start to insert buffered points
"
,
pPeer
->
id
);
sDebug
(
"%s, start to insert buffered points
, set sstatus:%s"
,
pPeer
->
id
,
syncStatus
[
nodeSStatus
]
);
if
(
syncProcessBufferedFwd
(
pPeer
)
<
0
)
{
sError
(
"%s, failed to insert buffered points"
,
pPeer
->
id
);
return
-
1
;
...
...
@@ -327,6 +344,8 @@ void *syncRestoreData(void *param) {
(
*
pNode
->
notifyRole
)(
pNode
->
vgId
,
nodeRole
);
nodeSStatus
=
TAOS_SYNC_STATUS_INIT
;
sInfo
(
"%s, sync over, set sstatus:%s"
,
pPeer
->
id
,
syncStatus
[
nodeSStatus
]);
taosClose
(
pPeer
->
syncFd
);
syncCloseRecvBuffer
(
pNode
);
__sync_fetch_and_sub
(
&
tsSyncNum
,
1
);
...
...
src/sync/src/syncRetrieve.c
浏览文件 @
11a5fe27
...
...
@@ -114,7 +114,10 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
// send the file info
int32_t
ret
=
taosWriteMsg
(
pPeer
->
syncFd
,
&
(
fileInfo
),
sizeof
(
fileInfo
));
if
(
ret
<
0
)
break
;
if
(
ret
<
0
)
{
sError
(
"%s, failed to write file:%s info while retrieve file since %s"
,
pPeer
->
id
,
fileInfo
.
name
,
strerror
(
errno
));
break
;
}
// if no file anymore, break
if
(
fileInfo
.
magic
==
0
||
fileInfo
.
name
[
0
]
==
0
)
{
...
...
@@ -124,8 +127,11 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
}
// wait for the ack from peer
ret
=
taosReadMsg
(
pPeer
->
syncFd
,
&
(
fileAck
),
sizeof
(
fileAck
));
if
(
ret
<
0
)
break
;
ret
=
taosReadMsg
(
pPeer
->
syncFd
,
&
fileAck
,
sizeof
(
fileAck
));
if
(
ret
<
0
)
{
sError
(
"%s, failed to read file:%s ack while retrieve file since %s"
,
pPeer
->
id
,
fileInfo
.
name
,
strerror
(
errno
));
break
;
}
// set the peer sync version
pPeer
->
sversion
=
fileInfo
.
fversion
;
...
...
@@ -134,7 +140,10 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
snprintf
(
name
,
sizeof
(
name
),
"%s/%s"
,
pNode
->
path
,
fileInfo
.
name
);
// add the file into watch list
if
(
syncAddIntoWatchList
(
pPeer
,
name
)
<
0
)
break
;
if
(
syncAddIntoWatchList
(
pPeer
,
name
)
<
0
)
{
sError
(
"%s, failed to watch file:%s while retrieve file since %s"
,
pPeer
->
id
,
fileInfo
.
name
,
strerror
(
errno
));
break
;
}
// if sync is not required, continue
if
(
fileAck
.
sync
==
0
)
{
...
...
@@ -145,21 +154,30 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
// send the file to peer
int32_t
sfd
=
open
(
name
,
O_RDONLY
);
if
(
sfd
<
0
)
break
;
if
(
sfd
<
0
)
{
sError
(
"%s, failed to open file:%s while retrieve file since %s"
,
pPeer
->
id
,
fileInfo
.
name
,
strerror
(
errno
));
break
;
}
ret
=
taosSendFile
(
pPeer
->
syncFd
,
sfd
,
NULL
,
fileInfo
.
size
);
close
(
sfd
);
if
(
ret
<
0
)
break
;
if
(
ret
<
0
)
{
sError
(
"%s, failed to send file:%s while retrieve file since %s"
,
pPeer
->
id
,
fileInfo
.
name
,
strerror
(
errno
));
break
;
}
sDebug
(
"%s, %s is sent, size:%"
PRId64
,
pPeer
->
id
,
name
,
fileInfo
.
size
);
fileInfo
.
index
++
;
// check if processed files are modified
if
(
syncAreFilesModified
(
pPeer
)
!=
0
)
break
;
if
(
syncAreFilesModified
(
pPeer
)
!=
0
)
{
sInfo
(
"%s, file:%s are modified while retrieve file since %s"
,
pPeer
->
id
,
fileInfo
.
name
,
strerror
(
errno
));
break
;
}
}
if
(
code
<
0
)
{
sError
(
"%s, failed to retrieve file
since %s"
,
pPeer
->
id
,
strerror
(
errno
)
);
sError
(
"%s, failed to retrieve file
"
,
pPeer
->
id
);
}
return
code
;
...
...
@@ -318,6 +336,7 @@ static int32_t syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index)
if
(((
event
&
IN_MODIFY
)
==
0
)
||
once
)
{
if
(
fversion
==
0
)
{
pPeer
->
sstatus
=
TAOS_SYNC_STATUS_CACHE
;
// start to forward pkt
sDebug
(
"%s, fversion is 0 then set sstatus:%s"
,
pPeer
->
id
,
syncStatus
[
pPeer
->
sstatus
]);
fversion
=
nodeVersion
;
// must read data to fversion
}
}
...
...
@@ -416,8 +435,9 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) {
}
if
(
code
==
0
)
{
sInfo
(
"%s, wal retrieve is finished"
,
pPeer
->
id
);
pPeer
->
sstatus
=
TAOS_SYNC_STATUS_CACHE
;
sInfo
(
"%s, wal retrieve is finished, set sstatus:%s"
,
pPeer
->
id
,
syncStatus
[
pPeer
->
sstatus
]);
SWalHead
walHead
;
memset
(
&
walHead
,
0
,
sizeof
(
walHead
));
code
=
taosWriteMsg
(
pPeer
->
syncFd
,
&
walHead
,
sizeof
(
walHead
));
...
...
@@ -445,7 +465,7 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
pPeer
->
sversion
=
0
;
pPeer
->
sstatus
=
TAOS_SYNC_STATUS_FILE
;
sInfo
(
"%s, start to retrieve file
"
,
pPeer
->
id
);
sInfo
(
"%s, start to retrieve file
, set sstatus:%s"
,
pPeer
->
id
,
syncStatus
[
pPeer
->
sstatus
]
);
if
(
syncRetrieveFile
(
pPeer
)
<
0
)
{
sError
(
"%s, failed to retrieve file"
,
pPeer
->
id
);
return
-
1
;
...
...
src/util/src/tsocket.c
浏览文件 @
11a5fe27
...
...
@@ -107,7 +107,7 @@ int32_t taosWriteMsg(SOCKET fd, void *buf, int32_t nbytes) {
while
(
nleft
>
0
)
{
nwritten
=
(
int32_t
)
taosWriteSocket
(
fd
,
(
char
*
)
ptr
,
(
size_t
)
nleft
);
if
(
nwritten
<=
0
)
{
if
(
errno
==
EINTR
||
errno
==
EAGAIN
||
errno
==
EWOULDBLOCK
)
if
(
errno
==
EINTR
/* || errno == EAGAIN || errno == EWOULDBLOCK */
)
continue
;
else
return
-
1
;
...
...
@@ -133,7 +133,7 @@ int32_t taosReadMsg(SOCKET fd, void *buf, int32_t nbytes) {
if
(
nread
==
0
)
{
break
;
}
else
if
(
nread
<
0
)
{
if
(
errno
==
EINTR
||
errno
==
EAGAIN
||
errno
==
EWOULDBLOCK
)
{
if
(
errno
==
EINTR
/* || errno == EAGAIN || errno == EWOULDBLOCK*/
)
{
continue
;
}
else
{
return
-
1
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录