Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
a5fdb787
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
a5fdb787
编写于
11月 10, 2020
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
change int to int32_t
上级
42693517
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
208 addition
and
209 deletion
+208
-209
src/inc/tsync.h
src/inc/tsync.h
+28
-29
src/sync/inc/syncInt.h
src/sync/inc/syncInt.h
+23
-23
src/sync/src/syncMain.c
src/sync/src/syncMain.c
+103
-103
src/sync/src/syncRestore.c
src/sync/src/syncRestore.c
+13
-13
src/sync/src/syncRetrieve.c
src/sync/src/syncRetrieve.c
+30
-30
src/sync/src/tarbitrator.c
src/sync/src/tarbitrator.c
+11
-11
未找到文件。
src/inc/tsync.h
浏览文件 @
a5fdb787
...
@@ -51,9 +51,9 @@ typedef struct {
...
@@ -51,9 +51,9 @@ typedef struct {
}
SSyncCfg
;
}
SSyncCfg
;
typedef
struct
{
typedef
struct
{
int
selfIndex
;
int
32_t
selfIndex
;
uint32_t
nodeId
[
TAOS_SYNC_MAX_REPLICA
];
uint32_t
nodeId
[
TAOS_SYNC_MAX_REPLICA
];
int
role
[
TAOS_SYNC_MAX_REPLICA
];
int
32_t
role
[
TAOS_SYNC_MAX_REPLICA
];
}
SNodesRole
;
}
SNodesRole
;
/*
/*
...
@@ -83,25 +83,24 @@ typedef void (*FNotifyRole)(void *ahandle, int8_t role);
...
@@ -83,25 +83,24 @@ typedef void (*FNotifyRole)(void *ahandle, int8_t role);
typedef
void
(
*
FNotifyFlowCtrl
)(
void
*
ahandle
,
int32_t
mseconds
);
typedef
void
(
*
FNotifyFlowCtrl
)(
void
*
ahandle
,
int32_t
mseconds
);
// when data file is synced successfully, notity app
// when data file is synced successfully, notity app
typedef
int
(
*
FNotifyFileSynced
)(
void
*
ahandle
,
uint64_t
fversion
);
typedef
int
32_t
(
*
FNotifyFileSynced
)(
void
*
ahandle
,
uint64_t
fversion
);
typedef
struct
{
typedef
struct
{
int32_t
vgId
;
// vgroup ID
int32_t
vgId
;
// vgroup ID
uint64_t
version
;
// initial version
uint64_t
version
;
// initial version
SSyncCfg
syncCfg
;
// configuration from mgmt
SSyncCfg
syncCfg
;
// configuration from mgmt
char
path
[
128
];
// path to the file
char
path
[
128
];
// path to the file
void
*
ahandle
;
// handle provided by APP
void
*
ahandle
;
// handle provided by APP
FGetFileInfo
getFileInfo
;
FGetFileInfo
getFileInfo
;
FGetWalInfo
getWalInfo
;
FGetWalInfo
getWalInfo
;
FWriteToCache
writeToCache
;
FWriteToCache
writeToCache
;
FConfirmForward
confirmForward
;
FConfirmForward
confirmForward
;
FNotifyRole
notifyRole
;
FNotifyRole
notifyRole
;
FNotifyFlowCtrl
notifyFlowCtrl
;
FNotifyFlowCtrl
notifyFlowCtrl
;
FNotifyFileSynced
notifyFileSynced
;
FNotifyFileSynced
notifyFileSynced
;
}
SSyncInfo
;
}
SSyncInfo
;
typedef
void
*
tsync_h
;
typedef
void
*
tsync_h
;
int32_t
syncInit
();
int32_t
syncInit
();
void
syncCleanUp
();
void
syncCleanUp
();
...
@@ -109,22 +108,22 @@ void syncCleanUp();
...
@@ -109,22 +108,22 @@ void syncCleanUp();
int64_t
syncStart
(
const
SSyncInfo
*
);
int64_t
syncStart
(
const
SSyncInfo
*
);
void
syncStop
(
int64_t
rid
);
void
syncStop
(
int64_t
rid
);
int32_t
syncReconfig
(
int64_t
rid
,
const
SSyncCfg
*
);
int32_t
syncReconfig
(
int64_t
rid
,
const
SSyncCfg
*
);
int32_t
syncForwardToPeer
(
int64_t
rid
,
void
*
pHead
,
void
*
mhandle
,
int
qtype
);
int32_t
syncForwardToPeer
(
int64_t
rid
,
void
*
pHead
,
void
*
mhandle
,
int
32_t
qtype
);
void
syncConfirmForward
(
int64_t
rid
,
uint64_t
version
,
int32_t
code
);
void
syncConfirmForward
(
int64_t
rid
,
uint64_t
version
,
int32_t
code
);
void
syncRecover
(
int64_t
rid
);
// recover from other nodes:
void
syncRecover
(
int64_t
rid
);
// recover from other nodes:
int
syncGetNodesRole
(
int64_t
rid
,
SNodesRole
*
);
int
32_t
syncGetNodesRole
(
int64_t
rid
,
SNodesRole
*
);
extern
char
*
syncRole
[];
extern
char
*
syncRole
[];
//global configurable parameters
//global configurable parameters
extern
int
tsMaxSyncNum
;
extern
int32_t
tsMaxSyncNum
;
extern
int
tsSyncTcpThreads
;
extern
int32_t
tsSyncTcpThreads
;
extern
int
tsMaxWatchFiles
;
extern
int32_t
tsMaxWatchFiles
;
extern
int
tsSyncTimer
;
extern
int32_t
tsSyncTimer
;
extern
int
tsMaxFwdInfo
;
extern
int32_t
tsMaxFwdInfo
;
extern
int
sDebugFlag
;
extern
int32_t
sDebugFlag
;
extern
char
tsArbitrator
[];
extern
char
tsArbitrator
[];
extern
uint16_t
tsSyncPort
;
extern
uint16_t
tsSyncPort
;
#ifdef __cplusplus
#ifdef __cplusplus
}
}
...
...
src/sync/inc/syncInt.h
浏览文件 @
a5fdb787
...
@@ -89,11 +89,11 @@ typedef struct {
...
@@ -89,11 +89,11 @@ typedef struct {
#pragma pack(pop)
#pragma pack(pop)
typedef
struct
{
typedef
struct
{
char
*
buffer
;
char
*
buffer
;
int
bufferSize
;
int
32_t
bufferSize
;
char
*
offset
;
char
*
offset
;
int
forwards
;
int
32_t
forwards
;
int
code
;
int
32_t
code
;
}
SRecvBuffer
;
}
SRecvBuffer
;
typedef
struct
{
typedef
struct
{
...
@@ -107,10 +107,10 @@ typedef struct {
...
@@ -107,10 +107,10 @@ typedef struct {
}
SFwdInfo
;
}
SFwdInfo
;
typedef
struct
{
typedef
struct
{
int
first
;
int
32_t
first
;
int
last
;
int
32_t
last
;
int
fwds
;
// number of forwards
int
32_t
fwds
;
// number of forwards
SFwdInfo
fwdInfo
[];
SFwdInfo
fwdInfo
[];
}
SSyncFwds
;
}
SSyncFwds
;
typedef
struct
SsyncPeer
{
typedef
struct
SsyncPeer
{
...
@@ -123,15 +123,15 @@ typedef struct SsyncPeer {
...
@@ -123,15 +123,15 @@ typedef struct SsyncPeer {
int8_t
sstatus
;
// sync status
int8_t
sstatus
;
// sync status
uint64_t
version
;
uint64_t
version
;
uint64_t
sversion
;
// track the peer version in retrieve process
uint64_t
sversion
;
// track the peer version in retrieve process
int
syncFd
;
int
32_t
syncFd
;
int
peerFd
;
// forward FD
int
32_t
peerFd
;
// forward FD
int
numOfRetrieves
;
// number of retrieves tried
int
32_t
numOfRetrieves
;
// number of retrieves tried
int
fileChanged
;
// a flag to indicate file is changed during retrieving process
int
32_t
fileChanged
;
// a flag to indicate file is changed during retrieving process
void
*
timer
;
void
*
timer
;
void
*
pConn
;
void
*
pConn
;
int
notifyFd
;
int
32_t
notifyFd
;
int
watchNum
;
int
32_t
watchNum
;
int
*
watchFd
;
int
32_t
*
watchFd
;
int8_t
refCount
;
// reference count
int8_t
refCount
;
// reference count
struct
SSyncNode
*
pSyncNode
;
struct
SSyncNode
*
pSyncNode
;
}
SSyncPeer
;
}
SSyncPeer
;
...
@@ -161,16 +161,16 @@ typedef struct SSyncNode {
...
@@ -161,16 +161,16 @@ typedef struct SSyncNode {
}
SSyncNode
;
}
SSyncNode
;
// sync module global
// sync module global
extern
int
tsSyncNum
;
extern
int
32_t
tsSyncNum
;
extern
char
tsNodeFqdn
[
TSDB_FQDN_LEN
];
extern
char
tsNodeFqdn
[
TSDB_FQDN_LEN
];
void
*
syncRetrieveData
(
void
*
param
);
void
*
syncRetrieveData
(
void
*
param
);
void
*
syncRestoreData
(
void
*
param
);
void
*
syncRestoreData
(
void
*
param
);
int
syncSaveIntoBuffer
(
SSyncPeer
*
pPeer
,
SWalHead
*
pHead
);
int
32_t
syncSaveIntoBuffer
(
SSyncPeer
*
pPeer
,
SWalHead
*
pHead
);
void
syncRestartConnection
(
SSyncPeer
*
pPeer
);
void
syncRestartConnection
(
SSyncPeer
*
pPeer
);
void
syncBroadcastStatus
(
SSyncNode
*
pNode
);
void
syncBroadcastStatus
(
SSyncNode
*
pNode
);
void
syncAddPeerRef
(
SSyncPeer
*
pPeer
);
void
syncAddPeerRef
(
SSyncPeer
*
pPeer
);
int
syncDecPeerRef
(
SSyncPeer
*
pPeer
);
int
32_t
syncDecPeerRef
(
SSyncPeer
*
pPeer
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
...
...
src/sync/src/syncMain.c
浏览文件 @
a5fdb787
...
@@ -31,38 +31,38 @@
...
@@ -31,38 +31,38 @@
#include "syncInt.h"
#include "syncInt.h"
// global configurable
// global configurable
int
tsMaxSyncNum
=
2
;
int
32_t
tsMaxSyncNum
=
2
;
int
tsSyncTcpThreads
=
2
;
int
32_t
tsSyncTcpThreads
=
2
;
int
tsMaxWatchFiles
=
500
;
int
32_t
tsMaxWatchFiles
=
500
;
int
tsMaxFwdInfo
=
200
;
int
32_t
tsMaxFwdInfo
=
200
;
int
tsSyncTimer
=
1
;
int
32_t
tsSyncTimer
=
1
;
// module global, not configurable
// module global, not configurable
int
tsSyncNum
;
// number of sync in process in whole system
int
32_t
tsSyncNum
;
// number of sync in process in whole system
char
tsNodeFqdn
[
TSDB_FQDN_LEN
];
char
tsNodeFqdn
[
TSDB_FQDN_LEN
];
static
ttpool_h
tsTcpPool
;
static
ttpool_h
tsTcpPool
;
static
void
*
s
yncTmrCtrl
=
NULL
;
static
void
*
tsS
yncTmrCtrl
=
NULL
;
static
void
*
v
gIdHash
;
static
void
*
tsV
gIdHash
;
static
int
tsSyncRefId
=
-
1
;
static
int
32_t
tsSyncRefId
=
-
1
;
// local functions
// local functions
static
void
syncProcessSyncRequest
(
char
*
pMsg
,
SSyncPeer
*
pPeer
);
static
void
syncProcessSyncRequest
(
char
*
pMsg
,
SSyncPeer
*
pPeer
);
static
void
syncRecoverFromMaster
(
SSyncPeer
*
pPeer
);
static
void
syncRecoverFromMaster
(
SSyncPeer
*
pPeer
);
static
void
syncCheckPeerConnection
(
void
*
param
,
void
*
tmrId
);
static
void
syncCheckPeerConnection
(
void
*
param
,
void
*
tmrId
);
static
void
syncSendPeersStatusMsgToPeer
(
SSyncPeer
*
pPeer
,
char
ack
);
static
void
syncSendPeersStatusMsgToPeer
(
SSyncPeer
*
pPeer
,
char
ack
);
static
void
syncProcessBrokenLink
(
void
*
param
);
static
void
syncProcessBrokenLink
(
void
*
param
);
static
int
syncProcessPeerMsg
(
void
*
param
,
void
*
buffer
);
static
int
32_t
syncProcessPeerMsg
(
void
*
param
,
void
*
buffer
);
static
void
syncProcessIncommingConnection
(
int
connFd
,
uint32_t
sourceIp
);
static
void
syncProcessIncommingConnection
(
int32_t
connFd
,
uint32_t
sourceIp
);
static
void
syncRemovePeer
(
SSyncPeer
*
pPeer
);
static
void
syncRemovePeer
(
SSyncPeer
*
pPeer
);
static
void
syncAddArbitrator
(
SSyncNode
*
pNode
);
static
void
syncAddArbitrator
(
SSyncNode
*
pNode
);
static
void
syncFreeNode
(
void
*
);
static
void
syncFreeNode
(
void
*
);
static
void
syncRemoveConfirmedFwdInfo
(
SSyncNode
*
pNode
);
static
void
syncRemoveConfirmedFwdInfo
(
SSyncNode
*
pNode
);
static
void
syncMonitorFwdInfos
(
void
*
param
,
void
*
tmrId
);
static
void
syncMonitorFwdInfos
(
void
*
param
,
void
*
tmrId
);
static
void
syncProcessFwdAck
(
SSyncNode
*
pNode
,
SFwdInfo
*
pFwdInfo
,
int32_t
code
);
static
void
syncProcessFwdAck
(
SSyncNode
*
pNode
,
SFwdInfo
*
pFwdInfo
,
int32_t
code
);
static
void
syncSaveFwdInfo
(
SSyncNode
*
pNode
,
uint64_t
version
,
void
*
mhandle
);
static
void
syncSaveFwdInfo
(
SSyncNode
*
pNode
,
uint64_t
version
,
void
*
mhandle
);
static
void
syncRestartPeer
(
SSyncPeer
*
pPeer
);
static
void
syncRestartPeer
(
SSyncPeer
*
pPeer
);
static
int32_t
syncForwardToPeerImpl
(
SSyncNode
*
pNode
,
void
*
data
,
void
*
mhandle
,
int
qtyp
);
static
int32_t
syncForwardToPeerImpl
(
SSyncNode
*
pNode
,
void
*
data
,
void
*
mhandle
,
int
32_t
qtyp
);
static
SSyncPeer
*
syncAddPeer
(
SSyncNode
*
pNode
,
const
SNodeInfo
*
pInfo
);
static
SSyncPeer
*
syncAddPeer
(
SSyncNode
*
pNode
,
const
SNodeInfo
*
pInfo
);
char
*
syncRole
[]
=
{
char
*
syncRole
[]
=
{
...
@@ -90,21 +90,21 @@ int32_t syncInit() {
...
@@ -90,21 +90,21 @@ int32_t syncInit() {
return
-
1
;
return
-
1
;
}
}
s
yncTmrCtrl
=
taosTmrInit
(
1000
,
50
,
10000
,
"SYNC"
);
tsS
yncTmrCtrl
=
taosTmrInit
(
1000
,
50
,
10000
,
"SYNC"
);
if
(
s
yncTmrCtrl
==
NULL
)
{
if
(
tsS
yncTmrCtrl
==
NULL
)
{
sError
(
"failed to init tmrCtrl"
);
sError
(
"failed to init tmrCtrl"
);
taosCloseTcpThreadPool
(
tsTcpPool
);
taosCloseTcpThreadPool
(
tsTcpPool
);
tsTcpPool
=
NULL
;
tsTcpPool
=
NULL
;
return
-
1
;
return
-
1
;
}
}
v
gIdHash
=
taosHashInit
(
TSDB_MIN_VNODES
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
true
);
tsV
gIdHash
=
taosHashInit
(
TSDB_MIN_VNODES
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
true
);
if
(
v
gIdHash
==
NULL
)
{
if
(
tsV
gIdHash
==
NULL
)
{
sError
(
"failed to init
v
gIdHash"
);
sError
(
"failed to init
tsV
gIdHash"
);
taosTmrCleanUp
(
s
yncTmrCtrl
);
taosTmrCleanUp
(
tsS
yncTmrCtrl
);
taosCloseTcpThreadPool
(
tsTcpPool
);
taosCloseTcpThreadPool
(
tsTcpPool
);
tsTcpPool
=
NULL
;
tsTcpPool
=
NULL
;
s
yncTmrCtrl
=
NULL
;
tsS
yncTmrCtrl
=
NULL
;
return
-
1
;
return
-
1
;
}
}
...
@@ -126,14 +126,14 @@ void syncCleanUp() {
...
@@ -126,14 +126,14 @@ void syncCleanUp() {
tsTcpPool
=
NULL
;
tsTcpPool
=
NULL
;
}
}
if
(
s
yncTmrCtrl
)
{
if
(
tsS
yncTmrCtrl
)
{
taosTmrCleanUp
(
s
yncTmrCtrl
);
taosTmrCleanUp
(
tsS
yncTmrCtrl
);
s
yncTmrCtrl
=
NULL
;
tsS
yncTmrCtrl
=
NULL
;
}
}
if
(
v
gIdHash
)
{
if
(
tsV
gIdHash
)
{
taosHashCleanup
(
v
gIdHash
);
taosHashCleanup
(
tsV
gIdHash
);
v
gIdHash
=
NULL
;
tsV
gIdHash
=
NULL
;
}
}
taosCloseRef
(
tsSyncRefId
);
taosCloseRef
(
tsSyncRefId
);
...
@@ -176,7 +176,7 @@ int64_t syncStart(const SSyncInfo *pInfo) {
...
@@ -176,7 +176,7 @@ int64_t syncStart(const SSyncInfo *pInfo) {
return
-
1
;
return
-
1
;
}
}
for
(
int
i
=
0
;
i
<
pCfg
->
replica
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
pCfg
->
replica
;
++
i
)
{
const
SNodeInfo
*
pNodeInfo
=
pCfg
->
nodeInfo
+
i
;
const
SNodeInfo
*
pNodeInfo
=
pCfg
->
nodeInfo
+
i
;
pNode
->
peerInfo
[
i
]
=
syncAddPeer
(
pNode
,
pNodeInfo
);
pNode
->
peerInfo
[
i
]
=
syncAddPeer
(
pNode
,
pNodeInfo
);
if
((
strcmp
(
pNodeInfo
->
nodeFqdn
,
tsNodeFqdn
)
==
0
)
&&
(
pNodeInfo
->
nodePort
==
tsSyncPort
))
{
if
((
strcmp
(
pNodeInfo
->
nodeFqdn
,
tsNodeFqdn
)
==
0
)
&&
(
pNodeInfo
->
nodePort
==
tsSyncPort
))
{
...
@@ -204,7 +204,7 @@ int64_t syncStart(const SSyncInfo *pInfo) {
...
@@ -204,7 +204,7 @@ int64_t syncStart(const SSyncInfo *pInfo) {
return
-
1
;
return
-
1
;
}
}
pNode
->
pFwdTimer
=
taosTmrStart
(
syncMonitorFwdInfos
,
300
,
(
void
*
)
pNode
->
rid
,
s
yncTmrCtrl
);
pNode
->
pFwdTimer
=
taosTmrStart
(
syncMonitorFwdInfos
,
300
,
(
void
*
)
pNode
->
rid
,
tsS
yncTmrCtrl
);
if
(
pNode
->
pFwdTimer
==
NULL
)
{
if
(
pNode
->
pFwdTimer
==
NULL
)
{
sError
(
"vgId:%d, failed to allocate timer"
,
pNode
->
vgId
);
sError
(
"vgId:%d, failed to allocate timer"
,
pNode
->
vgId
);
syncStop
(
pNode
->
rid
);
syncStop
(
pNode
->
rid
);
...
@@ -212,7 +212,7 @@ int64_t syncStart(const SSyncInfo *pInfo) {
...
@@ -212,7 +212,7 @@ int64_t syncStart(const SSyncInfo *pInfo) {
}
}
syncAddArbitrator
(
pNode
);
syncAddArbitrator
(
pNode
);
taosHashPut
(
v
gIdHash
,
(
const
char
*
)
&
pNode
->
vgId
,
sizeof
(
int32_t
),
(
char
*
)(
&
pNode
),
sizeof
(
SSyncNode
*
));
taosHashPut
(
tsV
gIdHash
,
(
const
char
*
)
&
pNode
->
vgId
,
sizeof
(
int32_t
),
(
char
*
)(
&
pNode
),
sizeof
(
SSyncNode
*
));
if
(
pNode
->
notifyRole
)
{
if
(
pNode
->
notifyRole
)
{
(
*
pNode
->
notifyRole
)(
pNode
->
ahandle
,
nodeRole
);
(
*
pNode
->
notifyRole
)(
pNode
->
ahandle
,
nodeRole
);
...
@@ -231,10 +231,10 @@ void syncStop(int64_t rid) {
...
@@ -231,10 +231,10 @@ void syncStop(int64_t rid) {
pthread_mutex_lock
(
&
(
pNode
->
mutex
));
pthread_mutex_lock
(
&
(
pNode
->
mutex
));
if
(
vgIdHash
)
taosHashRemove
(
v
gIdHash
,
(
const
char
*
)
&
pNode
->
vgId
,
sizeof
(
int32_t
));
if
(
tsVgIdHash
)
taosHashRemove
(
tsV
gIdHash
,
(
const
char
*
)
&
pNode
->
vgId
,
sizeof
(
int32_t
));
if
(
pNode
->
pFwdTimer
)
taosTmrStop
(
pNode
->
pFwdTimer
);
if
(
pNode
->
pFwdTimer
)
taosTmrStop
(
pNode
->
pFwdTimer
);
for
(
int
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
pPeer
=
pNode
->
peerInfo
[
i
];
pPeer
=
pNode
->
peerInfo
[
i
];
if
(
pPeer
)
syncRemovePeer
(
pPeer
);
if
(
pPeer
)
syncRemovePeer
(
pPeer
);
}
}
...
@@ -249,7 +249,7 @@ void syncStop(int64_t rid) {
...
@@ -249,7 +249,7 @@ void syncStop(int64_t rid) {
}
}
int32_t
syncReconfig
(
int64_t
rid
,
const
SSyncCfg
*
pNewCfg
)
{
int32_t
syncReconfig
(
int64_t
rid
,
const
SSyncCfg
*
pNewCfg
)
{
int
i
,
j
;
int
32_t
i
,
j
;
SSyncNode
*
pNode
=
taosAcquireRef
(
tsSyncRefId
,
rid
);
SSyncNode
*
pNode
=
taosAcquireRef
(
tsSyncRefId
,
rid
);
if
(
pNode
==
NULL
)
return
TSDB_CODE_SYN_INVALID_CONFIG
;
if
(
pNode
==
NULL
)
return
TSDB_CODE_SYN_INVALID_CONFIG
;
...
@@ -321,7 +321,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) {
...
@@ -321,7 +321,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg *pNewCfg) {
return
0
;
return
0
;
}
}
int32_t
syncForwardToPeer
(
int64_t
rid
,
void
*
data
,
void
*
mhandle
,
int
qtype
)
{
int32_t
syncForwardToPeer
(
int64_t
rid
,
void
*
data
,
void
*
mhandle
,
int
32_t
qtype
)
{
SSyncNode
*
pNode
=
taosAcquireRef
(
tsSyncRefId
,
rid
);
SSyncNode
*
pNode
=
taosAcquireRef
(
tsSyncRefId
,
rid
);
if
(
pNode
==
NULL
)
return
0
;
if
(
pNode
==
NULL
)
return
0
;
...
@@ -348,8 +348,8 @@ void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) {
...
@@ -348,8 +348,8 @@ void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) {
pFwdRsp
->
version
=
version
;
pFwdRsp
->
version
=
version
;
pFwdRsp
->
code
=
code
;
pFwdRsp
->
code
=
code
;
int
msgLen
=
sizeof
(
SSyncHead
)
+
sizeof
(
SFwdRsp
);
int
32_t
msgLen
=
sizeof
(
SSyncHead
)
+
sizeof
(
SFwdRsp
);
int
retLen
=
write
(
pPeer
->
peerFd
,
msg
,
msgLen
);
int
32_t
retLen
=
write
(
pPeer
->
peerFd
,
msg
,
msgLen
);
if
(
retLen
==
msgLen
)
{
if
(
retLen
==
msgLen
)
{
sDebug
(
"%s, forward-rsp is sent, ver:%"
PRIu64
,
pPeer
->
id
,
version
);
sDebug
(
"%s, forward-rsp is sent, ver:%"
PRIu64
,
pPeer
->
id
,
version
);
...
@@ -377,7 +377,7 @@ void syncRecover(int64_t rid) {
...
@@ -377,7 +377,7 @@ void syncRecover(int64_t rid) {
pthread_mutex_lock
(
&
(
pNode
->
mutex
));
pthread_mutex_lock
(
&
(
pNode
->
mutex
));
for
(
int
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
pPeer
=
(
SSyncPeer
*
)
pNode
->
peerInfo
[
i
];
pPeer
=
(
SSyncPeer
*
)
pNode
->
peerInfo
[
i
];
if
(
pPeer
->
peerFd
>=
0
)
{
if
(
pPeer
->
peerFd
>=
0
)
{
syncRestartConnection
(
pPeer
);
syncRestartConnection
(
pPeer
);
...
@@ -389,12 +389,12 @@ void syncRecover(int64_t rid) {
...
@@ -389,12 +389,12 @@ void syncRecover(int64_t rid) {
taosReleaseRef
(
tsSyncRefId
,
rid
);
taosReleaseRef
(
tsSyncRefId
,
rid
);
}
}
int
syncGetNodesRole
(
int64_t
rid
,
SNodesRole
*
pNodesRole
)
{
int
32_t
syncGetNodesRole
(
int64_t
rid
,
SNodesRole
*
pNodesRole
)
{
SSyncNode
*
pNode
=
taosAcquireRef
(
tsSyncRefId
,
rid
);
SSyncNode
*
pNode
=
taosAcquireRef
(
tsSyncRefId
,
rid
);
if
(
pNode
==
NULL
)
return
-
1
;
if
(
pNode
==
NULL
)
return
-
1
;
pNodesRole
->
selfIndex
=
pNode
->
selfIndex
;
pNodesRole
->
selfIndex
=
pNode
->
selfIndex
;
for
(
int
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
pNodesRole
->
nodeId
[
i
]
=
pNode
->
peerInfo
[
i
]
->
nodeId
;
pNodesRole
->
nodeId
[
i
]
=
pNode
->
peerInfo
[
i
]
->
nodeId
;
pNodesRole
->
role
[
i
]
=
pNode
->
peerInfo
[
i
]
->
role
;
pNodesRole
->
role
[
i
]
=
pNode
->
peerInfo
[
i
]
->
role
;
}
}
...
@@ -416,7 +416,7 @@ static void syncAddArbitrator(SSyncNode *pNode) {
...
@@ -416,7 +416,7 @@ static void syncAddArbitrator(SSyncNode *pNode) {
SNodeInfo
nodeInfo
;
SNodeInfo
nodeInfo
;
nodeInfo
.
nodeId
=
0
;
nodeInfo
.
nodeId
=
0
;
int
ret
=
taosGetFqdnPortFromEp
(
tsArbitrator
,
nodeInfo
.
nodeFqdn
,
&
nodeInfo
.
nodePort
);
int
32_t
ret
=
taosGetFqdnPortFromEp
(
tsArbitrator
,
nodeInfo
.
nodeFqdn
,
&
nodeInfo
.
nodePort
);
if
(
-
1
==
ret
)
{
if
(
-
1
==
ret
)
{
nodeInfo
.
nodePort
=
tsArbitratorPort
;
nodeInfo
.
nodePort
=
tsArbitratorPort
;
}
}
...
@@ -444,7 +444,7 @@ static void syncFreeNode(void *param) {
...
@@ -444,7 +444,7 @@ static void syncFreeNode(void *param) {
void
syncAddPeerRef
(
SSyncPeer
*
pPeer
)
{
atomic_add_fetch_8
(
&
pPeer
->
refCount
,
1
);
}
void
syncAddPeerRef
(
SSyncPeer
*
pPeer
)
{
atomic_add_fetch_8
(
&
pPeer
->
refCount
,
1
);
}
int
syncDecPeerRef
(
SSyncPeer
*
pPeer
)
{
int
32_t
syncDecPeerRef
(
SSyncPeer
*
pPeer
)
{
if
(
atomic_sub_fetch_8
(
&
pPeer
->
refCount
,
1
)
==
0
)
{
if
(
atomic_sub_fetch_8
(
&
pPeer
->
refCount
,
1
)
==
0
)
{
taosReleaseRef
(
tsSyncRefId
,
pPeer
->
pSyncNode
->
rid
);
taosReleaseRef
(
tsSyncRefId
,
pPeer
->
pSyncNode
->
rid
);
...
@@ -495,12 +495,12 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
...
@@ -495,12 +495,12 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
pPeer
->
refCount
=
1
;
pPeer
->
refCount
=
1
;
sInfo
(
"%s, it is configured"
,
pPeer
->
id
);
sInfo
(
"%s, it is configured"
,
pPeer
->
id
);
int
ret
=
strcmp
(
pPeer
->
fqdn
,
tsNodeFqdn
);
int
32_t
ret
=
strcmp
(
pPeer
->
fqdn
,
tsNodeFqdn
);
if
(
pPeer
->
nodeId
==
0
||
(
ret
>
0
)
||
(
ret
==
0
&&
pPeer
->
port
>
tsSyncPort
))
{
if
(
pPeer
->
nodeId
==
0
||
(
ret
>
0
)
||
(
ret
==
0
&&
pPeer
->
port
>
tsSyncPort
))
{
int32_t
checkMs
=
100
+
(
pNode
->
vgId
*
10
)
%
100
;
int32_t
checkMs
=
100
+
(
pNode
->
vgId
*
10
)
%
100
;
if
(
pNode
->
vgId
>
1
)
checkMs
=
tsStatusInterval
*
2000
+
checkMs
;
if
(
pNode
->
vgId
>
1
)
checkMs
=
tsStatusInterval
*
2000
+
checkMs
;
sDebug
(
"%s, start to check peer connection after %d ms"
,
pPeer
->
id
,
checkMs
);
sDebug
(
"%s, start to check peer connection after %d ms"
,
pPeer
->
id
,
checkMs
);
taosTmrReset
(
syncCheckPeerConnection
,
checkMs
,
pPeer
,
s
yncTmrCtrl
,
&
pPeer
->
timer
);
taosTmrReset
(
syncCheckPeerConnection
,
checkMs
,
pPeer
,
tsS
yncTmrCtrl
,
&
pPeer
->
timer
);
}
}
taosAcquireRef
(
tsSyncRefId
,
pNode
->
rid
);
taosAcquireRef
(
tsSyncRefId
,
pNode
->
rid
);
...
@@ -510,7 +510,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
...
@@ -510,7 +510,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
void
syncBroadcastStatus
(
SSyncNode
*
pNode
)
{
void
syncBroadcastStatus
(
SSyncNode
*
pNode
)
{
SSyncPeer
*
pPeer
;
SSyncPeer
*
pPeer
;
for
(
int
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
if
(
i
==
pNode
->
selfIndex
)
continue
;
if
(
i
==
pNode
->
selfIndex
)
continue
;
pPeer
=
pNode
->
peerInfo
[
i
];
pPeer
=
pNode
->
peerInfo
[
i
];
syncSendPeersStatusMsgToPeer
(
pPeer
,
1
);
syncSendPeersStatusMsgToPeer
(
pPeer
,
1
);
...
@@ -518,7 +518,7 @@ void syncBroadcastStatus(SSyncNode *pNode) {
...
@@ -518,7 +518,7 @@ void syncBroadcastStatus(SSyncNode *pNode) {
}
}
static
void
syncResetFlowCtrl
(
SSyncNode
*
pNode
)
{
static
void
syncResetFlowCtrl
(
SSyncNode
*
pNode
)
{
for
(
int
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
pNode
->
peerInfo
[
i
]
->
numOfRetrieves
=
0
;
pNode
->
peerInfo
[
i
]
->
numOfRetrieves
=
0
;
}
}
...
@@ -529,13 +529,13 @@ static void syncResetFlowCtrl(SSyncNode *pNode) {
...
@@ -529,13 +529,13 @@ static void syncResetFlowCtrl(SSyncNode *pNode) {
static
void
syncChooseMaster
(
SSyncNode
*
pNode
)
{
static
void
syncChooseMaster
(
SSyncNode
*
pNode
)
{
SSyncPeer
*
pPeer
;
SSyncPeer
*
pPeer
;
int
onlineNum
=
0
;
int
32_t
onlineNum
=
0
;
int
index
=
-
1
;
int
32_t
index
=
-
1
;
int
replica
=
pNode
->
replica
;
int
32_t
replica
=
pNode
->
replica
;
sDebug
(
"vgId:%d, choose master"
,
pNode
->
vgId
);
sDebug
(
"vgId:%d, choose master"
,
pNode
->
vgId
);
for
(
int
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
if
(
pNode
->
peerInfo
[
i
]
->
role
!=
TAOS_SYNC_ROLE_OFFLINE
)
{
if
(
pNode
->
peerInfo
[
i
]
->
role
!=
TAOS_SYNC_ROLE_OFFLINE
)
{
onlineNum
++
;
onlineNum
++
;
}
}
...
@@ -544,7 +544,7 @@ static void syncChooseMaster(SSyncNode *pNode) {
...
@@ -544,7 +544,7 @@ static void syncChooseMaster(SSyncNode *pNode) {
if
(
onlineNum
==
pNode
->
replica
)
{
if
(
onlineNum
==
pNode
->
replica
)
{
// if all peers are online, peer with highest version shall be master
// if all peers are online, peer with highest version shall be master
index
=
0
;
index
=
0
;
for
(
int
i
=
1
;
i
<
pNode
->
replica
;
++
i
)
{
for
(
int
32_t
i
=
1
;
i
<
pNode
->
replica
;
++
i
)
{
if
(
pNode
->
peerInfo
[
i
]
->
version
>
pNode
->
peerInfo
[
index
]
->
version
)
{
if
(
pNode
->
peerInfo
[
i
]
->
version
>
pNode
->
peerInfo
[
index
]
->
version
)
{
index
=
i
;
index
=
i
;
}
}
...
@@ -560,7 +560,7 @@ static void syncChooseMaster(SSyncNode *pNode) {
...
@@ -560,7 +560,7 @@ static void syncChooseMaster(SSyncNode *pNode) {
if
(
index
<
0
&&
onlineNum
>
replica
/
2
.
0
)
{
if
(
index
<
0
&&
onlineNum
>
replica
/
2
.
0
)
{
// over half of nodes are online
// over half of nodes are online
for
(
int
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
// slave with highest version shall be master
// slave with highest version shall be master
pPeer
=
pNode
->
peerInfo
[
i
];
pPeer
=
pNode
->
peerInfo
[
i
];
if
(
pPeer
->
role
==
TAOS_SYNC_ROLE_SLAVE
||
pPeer
->
role
==
TAOS_SYNC_ROLE_MASTER
)
{
if
(
pPeer
->
role
==
TAOS_SYNC_ROLE_SLAVE
||
pPeer
->
role
==
TAOS_SYNC_ROLE_MASTER
)
{
...
@@ -587,11 +587,11 @@ static void syncChooseMaster(SSyncNode *pNode) {
...
@@ -587,11 +587,11 @@ static void syncChooseMaster(SSyncNode *pNode) {
}
}
static
SSyncPeer
*
syncCheckMaster
(
SSyncNode
*
pNode
)
{
static
SSyncPeer
*
syncCheckMaster
(
SSyncNode
*
pNode
)
{
int
onlineNum
=
0
;
int
32_t
onlineNum
=
0
;
int
index
=
-
1
;
int
32_t
index
=
-
1
;
int
replica
=
pNode
->
replica
;
int
32_t
replica
=
pNode
->
replica
;
for
(
int
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
if
(
pNode
->
peerInfo
[
i
]
->
role
!=
TAOS_SYNC_ROLE_OFFLINE
)
{
if
(
pNode
->
peerInfo
[
i
]
->
role
!=
TAOS_SYNC_ROLE_OFFLINE
)
{
onlineNum
++
;
onlineNum
++
;
}
}
...
@@ -612,7 +612,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) {
...
@@ -612,7 +612,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) {
sInfo
(
"vgId:%d, change to unsynced state, online:%d replica:%d"
,
pNode
->
vgId
,
onlineNum
,
replica
);
sInfo
(
"vgId:%d, change to unsynced state, online:%d replica:%d"
,
pNode
->
vgId
,
onlineNum
,
replica
);
}
}
}
else
{
}
else
{
for
(
int
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
SSyncPeer
*
pTemp
=
pNode
->
peerInfo
[
i
];
SSyncPeer
*
pTemp
=
pNode
->
peerInfo
[
i
];
if
(
pTemp
->
role
!=
TAOS_SYNC_ROLE_MASTER
)
continue
;
if
(
pTemp
->
role
!=
TAOS_SYNC_ROLE_MASTER
)
continue
;
if
(
index
<
0
)
{
if
(
index
<
0
)
{
...
@@ -631,9 +631,9 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) {
...
@@ -631,9 +631,9 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode) {
return
pMaster
;
return
pMaster
;
}
}
static
int
syncValidateMaster
(
SSyncPeer
*
pPeer
)
{
static
int
32_t
syncValidateMaster
(
SSyncPeer
*
pPeer
)
{
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
int
code
=
0
;
int
32_t
code
=
0
;
if
(
nodeRole
==
TAOS_SYNC_ROLE_MASTER
&&
nodeVersion
<
pPeer
->
version
)
{
if
(
nodeRole
==
TAOS_SYNC_ROLE_MASTER
&&
nodeVersion
<
pPeer
->
version
)
{
sDebug
(
"%s, slave has higher version, restart all connections!!!"
,
pPeer
->
id
);
sDebug
(
"%s, slave has higher version, restart all connections!!!"
,
pPeer
->
id
);
...
@@ -641,7 +641,7 @@ static int syncValidateMaster(SSyncPeer *pPeer) {
...
@@ -641,7 +641,7 @@ static int syncValidateMaster(SSyncPeer *pPeer) {
(
*
pNode
->
notifyRole
)(
pNode
->
ahandle
,
nodeRole
);
(
*
pNode
->
notifyRole
)(
pNode
->
ahandle
,
nodeRole
);
code
=
-
1
;
code
=
-
1
;
for
(
int
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
if
(
i
==
pNode
->
selfIndex
)
continue
;
if
(
i
==
pNode
->
selfIndex
)
continue
;
syncRestartPeer
(
pNode
->
peerInfo
[
i
]);
syncRestartPeer
(
pNode
->
peerInfo
[
i
]);
}
}
...
@@ -683,7 +683,7 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne
...
@@ -683,7 +683,7 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne
}
}
}
else
{
}
else
{
// master not there, if all peer's state and version are consistent, choose the master
// master not there, if all peer's state and version are consistent, choose the master
int
consistent
=
0
;
int
32_t
consistent
=
0
;
if
(
peersStatus
)
{
if
(
peersStatus
)
{
for
(
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
for
(
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
SSyncPeer
*
pTemp
=
pNode
->
peerInfo
[
i
];
SSyncPeer
*
pTemp
=
pNode
->
peerInfo
[
i
];
...
@@ -721,9 +721,9 @@ static void syncRestartPeer(SSyncPeer *pPeer) {
...
@@ -721,9 +721,9 @@ static void syncRestartPeer(SSyncPeer *pPeer) {
pPeer
->
sstatus
=
TAOS_SYNC_STATUS_INIT
;
pPeer
->
sstatus
=
TAOS_SYNC_STATUS_INIT
;
int
ret
=
strcmp
(
pPeer
->
fqdn
,
tsNodeFqdn
);
int
32_t
ret
=
strcmp
(
pPeer
->
fqdn
,
tsNodeFqdn
);
if
(
ret
>
0
||
(
ret
==
0
&&
pPeer
->
port
>
tsSyncPort
))
{
if
(
ret
>
0
||
(
ret
==
0
&&
pPeer
->
port
>
tsSyncPort
))
{
taosTmrReset
(
syncCheckPeerConnection
,
tsSyncTimer
*
1000
,
pPeer
,
s
yncTmrCtrl
,
&
pPeer
->
timer
);
taosTmrReset
(
syncCheckPeerConnection
,
tsSyncTimer
*
1000
,
pPeer
,
tsS
yncTmrCtrl
,
&
pPeer
->
timer
);
}
}
}
}
...
@@ -757,7 +757,7 @@ static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) {
...
@@ -757,7 +757,7 @@ static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) {
pthread_t
thread
;
pthread_t
thread
;
pthread_attr_init
(
&
thattr
);
pthread_attr_init
(
&
thattr
);
pthread_attr_setdetachstate
(
&
thattr
,
PTHREAD_CREATE_DETACHED
);
pthread_attr_setdetachstate
(
&
thattr
,
PTHREAD_CREATE_DETACHED
);
int
ret
=
pthread_create
(
&
thread
,
&
thattr
,
syncRetrieveData
,
pPeer
);
int
32_t
ret
=
pthread_create
(
&
thread
,
&
thattr
,
syncRetrieveData
,
pPeer
);
pthread_attr_destroy
(
&
thattr
);
pthread_attr_destroy
(
&
thattr
);
if
(
ret
!=
0
)
{
if
(
ret
!=
0
)
{
...
@@ -802,7 +802,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) {
...
@@ -802,7 +802,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) {
// Ensure the sync of mnode not interrupted
// Ensure the sync of mnode not interrupted
if
(
pNode
->
vgId
!=
1
&&
tsSyncNum
>=
tsMaxSyncNum
)
{
if
(
pNode
->
vgId
!=
1
&&
tsSyncNum
>=
tsMaxSyncNum
)
{
sInfo
(
"%s, %d syncs are in process, try later"
,
pPeer
->
id
,
tsSyncNum
);
sInfo
(
"%s, %d syncs are in process, try later"
,
pPeer
->
id
,
tsSyncNum
);
taosTmrReset
(
syncTryRecoverFromMaster
,
500
+
(
pNode
->
vgId
*
10
)
%
200
,
pPeer
,
s
yncTmrCtrl
,
&
pPeer
->
timer
);
taosTmrReset
(
syncTryRecoverFromMaster
,
500
+
(
pNode
->
vgId
*
10
)
%
200
,
pPeer
,
tsS
yncTmrCtrl
,
&
pPeer
->
timer
);
return
;
return
;
}
}
...
@@ -815,7 +815,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) {
...
@@ -815,7 +815,7 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer) {
firstPkt
.
syncHead
.
len
=
sizeof
(
firstPkt
)
-
sizeof
(
SSyncHead
);
firstPkt
.
syncHead
.
len
=
sizeof
(
firstPkt
)
-
sizeof
(
SSyncHead
);
tstrncpy
(
firstPkt
.
fqdn
,
tsNodeFqdn
,
sizeof
(
firstPkt
.
fqdn
));
tstrncpy
(
firstPkt
.
fqdn
,
tsNodeFqdn
,
sizeof
(
firstPkt
.
fqdn
));
firstPkt
.
port
=
tsSyncPort
;
firstPkt
.
port
=
tsSyncPort
;
taosTmrReset
(
syncNotStarted
,
tsSyncTimer
*
1000
,
pPeer
,
s
yncTmrCtrl
,
&
pPeer
->
timer
);
taosTmrReset
(
syncNotStarted
,
tsSyncTimer
*
1000
,
pPeer
,
tsS
yncTmrCtrl
,
&
pPeer
->
timer
);
if
(
write
(
pPeer
->
peerFd
,
&
firstPkt
,
sizeof
(
firstPkt
))
!=
sizeof
(
firstPkt
))
{
if
(
write
(
pPeer
->
peerFd
,
&
firstPkt
,
sizeof
(
firstPkt
))
!=
sizeof
(
firstPkt
))
{
sError
(
"%s, failed to send sync-req to peer"
,
pPeer
->
id
);
sError
(
"%s, failed to send sync-req to peer"
,
pPeer
->
id
);
...
@@ -836,7 +836,7 @@ static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) {
...
@@ -836,7 +836,7 @@ static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) {
if
(
pFirst
->
version
<=
pFwdRsp
->
version
&&
pSyncFwds
->
fwds
>
0
)
{
if
(
pFirst
->
version
<=
pFwdRsp
->
version
&&
pSyncFwds
->
fwds
>
0
)
{
// find the forwardInfo from first
// find the forwardInfo from first
for
(
int
i
=
0
;
i
<
pSyncFwds
->
fwds
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
pSyncFwds
->
fwds
;
++
i
)
{
pFwdInfo
=
pSyncFwds
->
fwdInfo
+
(
i
+
pSyncFwds
->
first
)
%
tsMaxFwdInfo
;
pFwdInfo
=
pSyncFwds
->
fwdInfo
+
(
i
+
pSyncFwds
->
first
)
%
tsMaxFwdInfo
;
if
(
pFwdRsp
->
version
==
pFwdInfo
->
version
)
break
;
if
(
pFwdRsp
->
version
==
pFwdInfo
->
version
)
break
;
}
}
...
@@ -879,10 +879,10 @@ static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) {
...
@@ -879,10 +879,10 @@ static void syncProcessPeersStatusMsg(char *cont, SSyncPeer *pPeer) {
}
}
}
}
static
int
syncReadPeerMsg
(
SSyncPeer
*
pPeer
,
SSyncHead
*
pHead
,
char
*
cont
)
{
static
int
32_t
syncReadPeerMsg
(
SSyncPeer
*
pPeer
,
SSyncHead
*
pHead
,
char
*
cont
)
{
if
(
pPeer
->
peerFd
<
0
)
return
-
1
;
if
(
pPeer
->
peerFd
<
0
)
return
-
1
;
int
hlen
=
taosReadMsg
(
pPeer
->
peerFd
,
pHead
,
sizeof
(
SSyncHead
));
int
32_t
hlen
=
taosReadMsg
(
pPeer
->
peerFd
,
pHead
,
sizeof
(
SSyncHead
));
if
(
hlen
!=
sizeof
(
SSyncHead
))
{
if
(
hlen
!=
sizeof
(
SSyncHead
))
{
sDebug
(
"%s, failed to read msg, hlen:%d"
,
pPeer
->
id
,
hlen
);
sDebug
(
"%s, failed to read msg, hlen:%d"
,
pPeer
->
id
,
hlen
);
return
-
1
;
return
-
1
;
...
@@ -894,7 +894,7 @@ static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) {
...
@@ -894,7 +894,7 @@ static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) {
return
-
1
;
return
-
1
;
}
}
int
bytes
=
taosReadMsg
(
pPeer
->
peerFd
,
cont
,
pHead
->
len
);
int
32_t
bytes
=
taosReadMsg
(
pPeer
->
peerFd
,
cont
,
pHead
->
len
);
if
(
bytes
!=
pHead
->
len
)
{
if
(
bytes
!=
pHead
->
len
)
{
sError
(
"%s, failed to read, bytes:%d len:%d"
,
pPeer
->
id
,
bytes
,
pHead
->
len
);
sError
(
"%s, failed to read, bytes:%d len:%d"
,
pPeer
->
id
,
bytes
,
pHead
->
len
);
return
-
1
;
return
-
1
;
...
@@ -903,7 +903,7 @@ static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) {
...
@@ -903,7 +903,7 @@ static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) {
return
0
;
return
0
;
}
}
static
int
syncProcessPeerMsg
(
void
*
param
,
void
*
buffer
)
{
static
int
32_t
syncProcessPeerMsg
(
void
*
param
,
void
*
buffer
)
{
SSyncPeer
*
pPeer
=
param
;
SSyncPeer
*
pPeer
=
param
;
SSyncHead
head
;
SSyncHead
head
;
char
*
cont
=
buffer
;
char
*
cont
=
buffer
;
...
@@ -911,7 +911,7 @@ static int syncProcessPeerMsg(void *param, void *buffer) {
...
@@ -911,7 +911,7 @@ static int syncProcessPeerMsg(void *param, void *buffer) {
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
pthread_mutex_lock
(
&
(
pNode
->
mutex
));
pthread_mutex_lock
(
&
(
pNode
->
mutex
));
int
code
=
syncReadPeerMsg
(
pPeer
,
&
head
,
cont
);
int
32_t
code
=
syncReadPeerMsg
(
pPeer
,
&
head
,
cont
);
if
(
code
==
0
)
{
if
(
code
==
0
)
{
if
(
head
.
type
==
TAOS_SMSG_FORWARD
)
{
if
(
head
.
type
==
TAOS_SMSG_FORWARD
)
{
...
@@ -948,12 +948,12 @@ static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) {
...
@@ -948,12 +948,12 @@ static void syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack) {
pPeersStatus
->
role
=
nodeRole
;
pPeersStatus
->
role
=
nodeRole
;
pPeersStatus
->
ack
=
ack
;
pPeersStatus
->
ack
=
ack
;
for
(
int
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
pPeersStatus
->
peersStatus
[
i
].
role
=
pNode
->
peerInfo
[
i
]
->
role
;
pPeersStatus
->
peersStatus
[
i
].
role
=
pNode
->
peerInfo
[
i
]
->
role
;
pPeersStatus
->
peersStatus
[
i
].
version
=
pNode
->
peerInfo
[
i
]
->
version
;
pPeersStatus
->
peersStatus
[
i
].
version
=
pNode
->
peerInfo
[
i
]
->
version
;
}
}
int
retLen
=
write
(
pPeer
->
peerFd
,
msg
,
statusMsgLen
);
int
32_t
retLen
=
write
(
pPeer
->
peerFd
,
msg
,
statusMsgLen
);
if
(
retLen
==
statusMsgLen
)
{
if
(
retLen
==
statusMsgLen
)
{
sDebug
(
"%s, status msg is sent, self:%s ver:%"
PRIu64
", ack:%d"
,
pPeer
->
id
,
syncRole
[
pPeersStatus
->
role
],
sDebug
(
"%s, status msg is sent, self:%s ver:%"
PRIu64
", ack:%d"
,
pPeer
->
id
,
syncRole
[
pPeersStatus
->
role
],
pPeersStatus
->
version
,
pPeersStatus
->
ack
);
pPeersStatus
->
version
,
pPeersStatus
->
ack
);
...
@@ -975,10 +975,10 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
...
@@ -975,10 +975,10 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
return
;
return
;
}
}
int
connFd
=
taosOpenTcpClientSocket
(
pPeer
->
ip
,
pPeer
->
port
,
0
);
int
32_t
connFd
=
taosOpenTcpClientSocket
(
pPeer
->
ip
,
pPeer
->
port
,
0
);
if
(
connFd
<
0
)
{
if
(
connFd
<
0
)
{
sDebug
(
"%s, failed to open tcp socket(%s)"
,
pPeer
->
id
,
strerror
(
errno
));
sDebug
(
"%s, failed to open tcp socket(%s)"
,
pPeer
->
id
,
strerror
(
errno
));
taosTmrReset
(
syncCheckPeerConnection
,
tsSyncTimer
*
1000
,
pPeer
,
s
yncTmrCtrl
,
&
pPeer
->
timer
);
taosTmrReset
(
syncCheckPeerConnection
,
tsSyncTimer
*
1000
,
pPeer
,
tsS
yncTmrCtrl
,
&
pPeer
->
timer
);
return
;
return
;
}
}
...
@@ -999,7 +999,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
...
@@ -999,7 +999,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
}
else
{
}
else
{
sDebug
(
"try later"
);
sDebug
(
"try later"
);
close
(
connFd
);
close
(
connFd
);
taosTmrReset
(
syncCheckPeerConnection
,
tsSyncTimer
*
1000
,
pPeer
,
s
yncTmrCtrl
,
&
pPeer
->
timer
);
taosTmrReset
(
syncCheckPeerConnection
,
tsSyncTimer
*
1000
,
pPeer
,
tsS
yncTmrCtrl
,
&
pPeer
->
timer
);
}
}
}
}
...
@@ -1024,7 +1024,7 @@ static void syncCreateRestoreDataThread(SSyncPeer *pPeer) {
...
@@ -1024,7 +1024,7 @@ static void syncCreateRestoreDataThread(SSyncPeer *pPeer) {
pthread_attr_setdetachstate
(
&
thattr
,
PTHREAD_CREATE_DETACHED
);
pthread_attr_setdetachstate
(
&
thattr
,
PTHREAD_CREATE_DETACHED
);
syncAddPeerRef
(
pPeer
);
syncAddPeerRef
(
pPeer
);
int
ret
=
pthread_create
(
&
(
thread
),
&
thattr
,
(
void
*
)
syncRestoreData
,
pPeer
);
int
32_t
ret
=
pthread_create
(
&
(
thread
),
&
thattr
,
(
void
*
)
syncRestoreData
,
pPeer
);
pthread_attr_destroy
(
&
thattr
);
pthread_attr_destroy
(
&
thattr
);
if
(
ret
<
0
)
{
if
(
ret
<
0
)
{
...
@@ -1036,9 +1036,9 @@ static void syncCreateRestoreDataThread(SSyncPeer *pPeer) {
...
@@ -1036,9 +1036,9 @@ static void syncCreateRestoreDataThread(SSyncPeer *pPeer) {
}
}
}
}
static
void
syncProcessIncommingConnection
(
int
connFd
,
uint32_t
sourceIp
)
{
static
void
syncProcessIncommingConnection
(
int
32_t
connFd
,
uint32_t
sourceIp
)
{
char
ipstr
[
24
];
char
ipstr
[
24
];
int
i
;
int
32_t
i
;
tinet_ntoa
(
ipstr
,
sourceIp
);
tinet_ntoa
(
ipstr
,
sourceIp
);
sDebug
(
"peer TCP connection from ip:%s"
,
ipstr
);
sDebug
(
"peer TCP connection from ip:%s"
,
ipstr
);
...
@@ -1051,7 +1051,7 @@ static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp) {
...
@@ -1051,7 +1051,7 @@ static void syncProcessIncommingConnection(int connFd, uint32_t sourceIp) {
}
}
int32_t
vgId
=
firstPkt
.
syncHead
.
vgId
;
int32_t
vgId
=
firstPkt
.
syncHead
.
vgId
;
SSyncNode
**
ppNode
=
(
SSyncNode
**
)
taosHashGet
(
v
gIdHash
,
(
const
char
*
)
&
vgId
,
sizeof
(
int32_t
));
SSyncNode
**
ppNode
=
(
SSyncNode
**
)
taosHashGet
(
tsV
gIdHash
,
(
const
char
*
)
&
vgId
,
sizeof
(
int32_t
));
if
(
ppNode
==
NULL
||
*
ppNode
==
NULL
)
{
if
(
ppNode
==
NULL
||
*
ppNode
==
NULL
)
{
sError
(
"vgId:%d, vgId could not be found"
,
vgId
);
sError
(
"vgId:%d, vgId could not be found"
,
vgId
);
taosCloseSocket
(
connFd
);
taosCloseSocket
(
connFd
);
...
@@ -1137,8 +1137,8 @@ static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) {
...
@@ -1137,8 +1137,8 @@ static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) {
static
void
syncRemoveConfirmedFwdInfo
(
SSyncNode
*
pNode
)
{
static
void
syncRemoveConfirmedFwdInfo
(
SSyncNode
*
pNode
)
{
SSyncFwds
*
pSyncFwds
=
pNode
->
pSyncFwds
;
SSyncFwds
*
pSyncFwds
=
pNode
->
pSyncFwds
;
int
fwds
=
pSyncFwds
->
fwds
;
int
32_t
fwds
=
pSyncFwds
->
fwds
;
for
(
int
i
=
0
;
i
<
fwds
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
fwds
;
++
i
)
{
SFwdInfo
*
pFwdInfo
=
pSyncFwds
->
fwdInfo
+
pSyncFwds
->
first
;
SFwdInfo
*
pFwdInfo
=
pSyncFwds
->
fwdInfo
+
pSyncFwds
->
first
;
if
(
pFwdInfo
->
confirmed
==
0
)
break
;
if
(
pFwdInfo
->
confirmed
==
0
)
break
;
...
@@ -1152,7 +1152,7 @@ static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) {
...
@@ -1152,7 +1152,7 @@ static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) {
}
}
static
void
syncProcessFwdAck
(
SSyncNode
*
pNode
,
SFwdInfo
*
pFwdInfo
,
int32_t
code
)
{
static
void
syncProcessFwdAck
(
SSyncNode
*
pNode
,
SFwdInfo
*
pFwdInfo
,
int32_t
code
)
{
int
confirm
=
0
;
int
32_t
confirm
=
0
;
if
(
pFwdInfo
->
code
==
0
)
pFwdInfo
->
code
=
code
;
if
(
pFwdInfo
->
code
==
0
)
pFwdInfo
->
code
=
code
;
if
(
code
==
0
)
{
if
(
code
==
0
)
{
...
@@ -1186,7 +1186,7 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) {
...
@@ -1186,7 +1186,7 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) {
if
(
pSyncFwds
->
fwds
>
0
)
{
if
(
pSyncFwds
->
fwds
>
0
)
{
pthread_mutex_lock
(
&
(
pNode
->
mutex
));
pthread_mutex_lock
(
&
(
pNode
->
mutex
));
for
(
int
i
=
0
;
i
<
pSyncFwds
->
fwds
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
pSyncFwds
->
fwds
;
++
i
)
{
SFwdInfo
*
pFwdInfo
=
pSyncFwds
->
fwdInfo
+
(
pSyncFwds
->
first
+
i
)
%
tsMaxFwdInfo
;
SFwdInfo
*
pFwdInfo
=
pSyncFwds
->
fwdInfo
+
(
pSyncFwds
->
first
+
i
)
%
tsMaxFwdInfo
;
if
(
time
-
pFwdInfo
->
time
<
2000
)
break
;
if
(
time
-
pFwdInfo
->
time
<
2000
)
break
;
syncProcessFwdAck
(
pNode
,
pFwdInfo
,
TSDB_CODE_RPC_NETWORK_UNAVAIL
);
syncProcessFwdAck
(
pNode
,
pFwdInfo
,
TSDB_CODE_RPC_NETWORK_UNAVAIL
);
...
@@ -1196,23 +1196,23 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) {
...
@@ -1196,23 +1196,23 @@ static void syncMonitorFwdInfos(void *param, void *tmrId) {
pthread_mutex_unlock
(
&
(
pNode
->
mutex
));
pthread_mutex_unlock
(
&
(
pNode
->
mutex
));
}
}
pNode
->
pFwdTimer
=
taosTmrStart
(
syncMonitorFwdInfos
,
300
,
(
void
*
)
pNode
->
rid
,
s
yncTmrCtrl
);
pNode
->
pFwdTimer
=
taosTmrStart
(
syncMonitorFwdInfos
,
300
,
(
void
*
)
pNode
->
rid
,
tsS
yncTmrCtrl
);
}
}
taosReleaseRef
(
tsSyncRefId
,
rid
);
taosReleaseRef
(
tsSyncRefId
,
rid
);
}
}
static
int32_t
syncForwardToPeerImpl
(
SSyncNode
*
pNode
,
void
*
data
,
void
*
mhandle
,
int
qtype
)
{
static
int32_t
syncForwardToPeerImpl
(
SSyncNode
*
pNode
,
void
*
data
,
void
*
mhandle
,
int
32_t
qtype
)
{
SSyncPeer
*
pPeer
;
SSyncPeer
*
pPeer
;
SSyncHead
*
pSyncHead
;
SSyncHead
*
pSyncHead
;
SWalHead
*
pWalHead
=
data
;
SWalHead
*
pWalHead
=
data
;
int
fwdLen
;
int
32_t
fwdLen
;
int32_t
code
=
0
;
int32_t
code
=
0
;
if
(
nodeRole
==
TAOS_SYNC_ROLE_SLAVE
&&
pWalHead
->
version
!=
nodeVersion
+
1
)
{
if
(
nodeRole
==
TAOS_SYNC_ROLE_SLAVE
&&
pWalHead
->
version
!=
nodeVersion
+
1
)
{
sError
(
"vgId:%d, received ver:%"
PRIu64
", inconsistent with last ver:%"
PRIu64
", restart connection"
,
pNode
->
vgId
,
sError
(
"vgId:%d, received ver:%"
PRIu64
", inconsistent with last ver:%"
PRIu64
", restart connection"
,
pNode
->
vgId
,
pWalHead
->
version
,
nodeVersion
);
pWalHead
->
version
,
nodeVersion
);
for
(
int
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
pPeer
=
pNode
->
peerInfo
[
i
];
pPeer
=
pNode
->
peerInfo
[
i
];
syncRestartConnection
(
pPeer
);
syncRestartConnection
(
pPeer
);
}
}
...
@@ -1238,7 +1238,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
...
@@ -1238,7 +1238,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
pthread_mutex_lock
(
&
(
pNode
->
mutex
));
pthread_mutex_lock
(
&
(
pNode
->
mutex
));
for
(
int
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
for
(
int
32_t
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
pPeer
=
pNode
->
peerInfo
[
i
];
pPeer
=
pNode
->
peerInfo
[
i
];
if
(
pPeer
==
NULL
||
pPeer
->
peerFd
<
0
)
continue
;
if
(
pPeer
==
NULL
||
pPeer
->
peerFd
<
0
)
continue
;
if
(
pPeer
->
role
!=
TAOS_SYNC_ROLE_SLAVE
&&
pPeer
->
sstatus
!=
TAOS_SYNC_STATUS_CACHE
)
continue
;
if
(
pPeer
->
role
!=
TAOS_SYNC_ROLE_SLAVE
&&
pPeer
->
sstatus
!=
TAOS_SYNC_STATUS_CACHE
)
continue
;
...
@@ -1248,7 +1248,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
...
@@ -1248,7 +1248,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
code
=
1
;
code
=
1
;
}
}
int
retLen
=
write
(
pPeer
->
peerFd
,
pSyncHead
,
fwdLen
);
int
32_t
retLen
=
write
(
pPeer
->
peerFd
,
pSyncHead
,
fwdLen
);
if
(
retLen
==
fwdLen
)
{
if
(
retLen
==
fwdLen
)
{
sDebug
(
"%s, forward is sent, ver:%"
PRIu64
" contLen:%d"
,
pPeer
->
id
,
pWalHead
->
version
,
pWalHead
->
len
);
sDebug
(
"%s, forward is sent, ver:%"
PRIu64
" contLen:%d"
,
pPeer
->
id
,
pWalHead
->
version
,
pWalHead
->
len
);
}
else
{
}
else
{
...
...
src/sync/src/syncRestore.c
浏览文件 @
a5fdb787
...
@@ -48,12 +48,12 @@ static void syncRemoveExtraFile(SSyncPeer *pPeer, int32_t sindex, int32_t eindex
...
@@ -48,12 +48,12 @@ static void syncRemoveExtraFile(SSyncPeer *pPeer, int32_t sindex, int32_t eindex
}
}
}
}
static
int
syncRestoreFile
(
SSyncPeer
*
pPeer
,
uint64_t
*
fversion
)
{
static
int
32_t
syncRestoreFile
(
SSyncPeer
*
pPeer
,
uint64_t
*
fversion
)
{
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SFileInfo
minfo
;
memset
(
&
minfo
,
0
,
sizeof
(
minfo
));
/* = {0}; */
// master file info
SFileInfo
minfo
;
memset
(
&
minfo
,
0
,
sizeof
(
minfo
));
/* = {0}; */
// master file info
SFileInfo
sinfo
;
memset
(
&
sinfo
,
0
,
sizeof
(
sinfo
));
/* = {0}; */
// slave file info
SFileInfo
sinfo
;
memset
(
&
sinfo
,
0
,
sizeof
(
sinfo
));
/* = {0}; */
// slave file info
SFileAck
fileAck
;
SFileAck
fileAck
;
int
code
=
-
1
;
int
32_t
code
=
-
1
;
char
name
[
TSDB_FILENAME_LEN
*
2
]
=
{
0
};
char
name
[
TSDB_FILENAME_LEN
*
2
]
=
{
0
};
uint32_t
pindex
=
0
;
// index in last restore
uint32_t
pindex
=
0
;
// index in last restore
bool
fileChanged
=
false
;
bool
fileChanged
=
false
;
...
@@ -62,7 +62,7 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
...
@@ -62,7 +62,7 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
sinfo
.
index
=
0
;
sinfo
.
index
=
0
;
while
(
1
)
{
while
(
1
)
{
// read file info
// read file info
int
ret
=
taosReadMsg
(
pPeer
->
syncFd
,
&
(
minfo
),
sizeof
(
minfo
));
int
32_t
ret
=
taosReadMsg
(
pPeer
->
syncFd
,
&
(
minfo
),
sizeof
(
minfo
));
if
(
ret
<
0
)
break
;
if
(
ret
<
0
)
break
;
// if no more file from master, break;
// if no more file from master, break;
...
@@ -104,7 +104,7 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
...
@@ -104,7 +104,7 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
minfo
.
name
[
sizeof
(
minfo
.
name
)
-
1
]
=
0
;
minfo
.
name
[
sizeof
(
minfo
.
name
)
-
1
]
=
0
;
snprintf
(
name
,
sizeof
(
name
),
"%s/%s"
,
pNode
->
path
,
minfo
.
name
);
snprintf
(
name
,
sizeof
(
name
),
"%s/%s"
,
pNode
->
path
,
minfo
.
name
);
int
dfd
=
open
(
name
,
O_WRONLY
|
O_CREAT
|
O_TRUNC
,
S_IRWXU
|
S_IRWXG
|
S_IRWXO
);
int
32_t
dfd
=
open
(
name
,
O_WRONLY
|
O_CREAT
|
O_TRUNC
,
S_IRWXU
|
S_IRWXG
|
S_IRWXO
);
if
(
dfd
<
0
)
{
if
(
dfd
<
0
)
{
sError
(
"%s, failed to open file:%s"
,
pPeer
->
id
,
name
);
sError
(
"%s, failed to open file:%s"
,
pPeer
->
id
,
name
);
break
;
break
;
...
@@ -132,9 +132,9 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
...
@@ -132,9 +132,9 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
return
code
;
return
code
;
}
}
static
int
syncRestoreWal
(
SSyncPeer
*
pPeer
)
{
static
int
32_t
syncRestoreWal
(
SSyncPeer
*
pPeer
)
{
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
int
ret
,
code
=
-
1
;
int
32_t
ret
,
code
=
-
1
;
void
*
buffer
=
calloc
(
1024000
,
1
);
// size for one record
void
*
buffer
=
calloc
(
1024000
,
1
);
// size for one record
if
(
buffer
==
NULL
)
return
-
1
;
if
(
buffer
==
NULL
)
return
-
1
;
...
@@ -175,10 +175,10 @@ static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset) {
...
@@ -175,10 +175,10 @@ static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset) {
return
offset
;
return
offset
;
}
}
static
int
syncProcessBufferedFwd
(
SSyncPeer
*
pPeer
)
{
static
int
32_t
syncProcessBufferedFwd
(
SSyncPeer
*
pPeer
)
{
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SRecvBuffer
*
pRecv
=
pNode
->
pRecv
;
SRecvBuffer
*
pRecv
=
pNode
->
pRecv
;
int
forwards
=
0
;
int
32_t
forwards
=
0
;
sDebug
(
"%s, number of buffered forwards:%d"
,
pPeer
->
id
,
pRecv
->
forwards
);
sDebug
(
"%s, number of buffered forwards:%d"
,
pPeer
->
id
,
pRecv
->
forwards
);
...
@@ -203,12 +203,12 @@ static int syncProcessBufferedFwd(SSyncPeer *pPeer) {
...
@@ -203,12 +203,12 @@ static int syncProcessBufferedFwd(SSyncPeer *pPeer) {
return
pRecv
->
code
;
return
pRecv
->
code
;
}
}
int
syncSaveIntoBuffer
(
SSyncPeer
*
pPeer
,
SWalHead
*
pHead
)
{
int
32_t
syncSaveIntoBuffer
(
SSyncPeer
*
pPeer
,
SWalHead
*
pHead
)
{
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SRecvBuffer
*
pRecv
=
pNode
->
pRecv
;
SRecvBuffer
*
pRecv
=
pNode
->
pRecv
;
if
(
pRecv
==
NULL
)
return
-
1
;
if
(
pRecv
==
NULL
)
return
-
1
;
int
len
=
pHead
->
len
+
sizeof
(
SWalHead
);
int
32_t
len
=
pHead
->
len
+
sizeof
(
SWalHead
);
if
(
pRecv
->
bufferSize
-
(
pRecv
->
offset
-
pRecv
->
buffer
)
>=
len
)
{
if
(
pRecv
->
bufferSize
-
(
pRecv
->
offset
-
pRecv
->
buffer
)
>=
len
)
{
memcpy
(
pRecv
->
offset
,
pHead
,
len
);
memcpy
(
pRecv
->
offset
,
pHead
,
len
);
...
@@ -231,7 +231,7 @@ static void syncCloseRecvBuffer(SSyncNode *pNode) {
...
@@ -231,7 +231,7 @@ static void syncCloseRecvBuffer(SSyncNode *pNode) {
tfree
(
pNode
->
pRecv
);
tfree
(
pNode
->
pRecv
);
}
}
static
int
syncOpenRecvBuffer
(
SSyncNode
*
pNode
)
{
static
int
32_t
syncOpenRecvBuffer
(
SSyncNode
*
pNode
)
{
syncCloseRecvBuffer
(
pNode
);
syncCloseRecvBuffer
(
pNode
);
SRecvBuffer
*
pRecv
=
calloc
(
sizeof
(
SRecvBuffer
),
1
);
SRecvBuffer
*
pRecv
=
calloc
(
sizeof
(
SRecvBuffer
),
1
);
...
@@ -252,13 +252,13 @@ static int syncOpenRecvBuffer(SSyncNode *pNode) {
...
@@ -252,13 +252,13 @@ static int syncOpenRecvBuffer(SSyncNode *pNode) {
return
0
;
return
0
;
}
}
static
int
syncRestoreDataStepByStep
(
SSyncPeer
*
pPeer
)
{
static
int
32_t
syncRestoreDataStepByStep
(
SSyncPeer
*
pPeer
)
{
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
nodeSStatus
=
TAOS_SYNC_STATUS_FILE
;
nodeSStatus
=
TAOS_SYNC_STATUS_FILE
;
uint64_t
fversion
=
0
;
uint64_t
fversion
=
0
;
sDebug
(
"%s, start to restore file"
,
pPeer
->
id
);
sDebug
(
"%s, start to restore file"
,
pPeer
->
id
);
int
code
=
syncRestoreFile
(
pPeer
,
&
fversion
);
int
32_t
code
=
syncRestoreFile
(
pPeer
,
&
fversion
);
if
(
code
<
0
)
{
if
(
code
<
0
)
{
sError
(
"%s, failed to restore file"
,
pPeer
->
id
);
sError
(
"%s, failed to restore file"
,
pPeer
->
id
);
return
-
1
;
return
-
1
;
...
...
src/sync/src/syncRetrieve.c
浏览文件 @
a5fdb787
...
@@ -27,7 +27,7 @@
...
@@ -27,7 +27,7 @@
#include "tsync.h"
#include "tsync.h"
#include "syncInt.h"
#include "syncInt.h"
static
int
syncAddIntoWatchList
(
SSyncPeer
*
pPeer
,
char
*
name
)
{
static
int
32_t
syncAddIntoWatchList
(
SSyncPeer
*
pPeer
,
char
*
name
)
{
sDebug
(
"%s, start to monitor:%s"
,
pPeer
->
id
,
name
);
sDebug
(
"%s, start to monitor:%s"
,
pPeer
->
id
,
name
);
if
(
pPeer
->
notifyFd
<=
0
)
{
if
(
pPeer
->
notifyFd
<=
0
)
{
...
@@ -38,16 +38,16 @@ static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name) {
...
@@ -38,16 +38,16 @@ static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name) {
return
-
1
;
return
-
1
;
}
}
if
(
pPeer
->
watchFd
==
NULL
)
pPeer
->
watchFd
=
malloc
(
sizeof
(
int
)
*
tsMaxWatchFiles
);
if
(
pPeer
->
watchFd
==
NULL
)
pPeer
->
watchFd
=
malloc
(
sizeof
(
int
32_t
)
*
tsMaxWatchFiles
);
if
(
pPeer
->
watchFd
==
NULL
)
{
if
(
pPeer
->
watchFd
==
NULL
)
{
sError
(
"%s, failed to allocate watchFd"
,
pPeer
->
id
);
sError
(
"%s, failed to allocate watchFd"
,
pPeer
->
id
);
return
-
1
;
return
-
1
;
}
}
memset
(
pPeer
->
watchFd
,
-
1
,
sizeof
(
int
)
*
tsMaxWatchFiles
);
memset
(
pPeer
->
watchFd
,
-
1
,
sizeof
(
int
32_t
)
*
tsMaxWatchFiles
);
}
}
int
*
wd
=
pPeer
->
watchFd
+
pPeer
->
watchNum
;
int
32_t
*
wd
=
pPeer
->
watchFd
+
pPeer
->
watchNum
;
if
(
*
wd
>=
0
)
{
if
(
*
wd
>=
0
)
{
if
(
inotify_rm_watch
(
pPeer
->
notifyFd
,
*
wd
)
<
0
)
{
if
(
inotify_rm_watch
(
pPeer
->
notifyFd
,
*
wd
)
<
0
)
{
...
@@ -69,17 +69,17 @@ static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name) {
...
@@ -69,17 +69,17 @@ static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name) {
return
0
;
return
0
;
}
}
static
int
syncAreFilesModified
(
SSyncPeer
*
pPeer
)
{
static
int
32_t
syncAreFilesModified
(
SSyncPeer
*
pPeer
)
{
if
(
pPeer
->
notifyFd
<=
0
)
return
0
;
if
(
pPeer
->
notifyFd
<=
0
)
return
0
;
char
buf
[
2048
];
char
buf
[
2048
];
int
len
=
read
(
pPeer
->
notifyFd
,
buf
,
sizeof
(
buf
));
int
32_t
len
=
read
(
pPeer
->
notifyFd
,
buf
,
sizeof
(
buf
));
if
(
len
<
0
&&
errno
!=
EAGAIN
)
{
if
(
len
<
0
&&
errno
!=
EAGAIN
)
{
sError
(
"%s, failed to read notify FD(%s)"
,
pPeer
->
id
,
strerror
(
errno
));
sError
(
"%s, failed to read notify FD(%s)"
,
pPeer
->
id
,
strerror
(
errno
));
return
-
1
;
return
-
1
;
}
}
int
code
=
0
;
int
32_t
code
=
0
;
if
(
len
>
0
)
{
if
(
len
>
0
)
{
const
struct
inotify_event
*
event
;
const
struct
inotify_event
*
event
;
char
*
ptr
;
char
*
ptr
;
...
@@ -97,11 +97,11 @@ static int syncAreFilesModified(SSyncPeer *pPeer) {
...
@@ -97,11 +97,11 @@ static int syncAreFilesModified(SSyncPeer *pPeer) {
return
code
;
return
code
;
}
}
static
int
syncRetrieveFile
(
SSyncPeer
*
pPeer
)
{
static
int
32_t
syncRetrieveFile
(
SSyncPeer
*
pPeer
)
{
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SFileInfo
fileInfo
;
SFileInfo
fileInfo
;
SFileAck
fileAck
;
SFileAck
fileAck
;
int
code
=
-
1
;
int
32_t
code
=
-
1
;
char
name
[
TSDB_FILENAME_LEN
*
2
]
=
{
0
};
char
name
[
TSDB_FILENAME_LEN
*
2
]
=
{
0
};
memset
(
&
fileInfo
,
0
,
sizeof
(
fileInfo
));
memset
(
&
fileInfo
,
0
,
sizeof
(
fileInfo
));
...
@@ -146,7 +146,7 @@ static int syncRetrieveFile(SSyncPeer *pPeer) {
...
@@ -146,7 +146,7 @@ static int syncRetrieveFile(SSyncPeer *pPeer) {
}
}
// send the file to peer
// send the file to peer
int
sfd
=
open
(
name
,
O_RDONLY
);
int
32_t
sfd
=
open
(
name
,
O_RDONLY
);
if
(
sfd
<
0
)
break
;
if
(
sfd
<
0
)
break
;
ret
=
taosSendFile
(
pPeer
->
syncFd
,
sfd
,
NULL
,
fileInfo
.
size
);
ret
=
taosSendFile
(
pPeer
->
syncFd
,
sfd
,
NULL
,
fileInfo
.
size
);
...
@@ -169,8 +169,8 @@ static int syncRetrieveFile(SSyncPeer *pPeer) {
...
@@ -169,8 +169,8 @@ static int syncRetrieveFile(SSyncPeer *pPeer) {
/* if only a partial record is read out, set the IN_MODIFY flag in event,
/* if only a partial record is read out, set the IN_MODIFY flag in event,
so upper layer will reload the file to get a complete record */
so upper layer will reload the file to get a complete record */
static
int
syncReadOneWalRecord
(
in
t
sfd
,
SWalHead
*
pHead
,
uint32_t
*
pEvent
)
{
static
int
32_t
syncReadOneWalRecord
(
int32_
t
sfd
,
SWalHead
*
pHead
,
uint32_t
*
pEvent
)
{
int
ret
;
int
32_t
ret
;
ret
=
read
(
sfd
,
pHead
,
sizeof
(
SWalHead
));
ret
=
read
(
sfd
,
pHead
,
sizeof
(
SWalHead
));
if
(
ret
<
0
)
return
-
1
;
if
(
ret
<
0
)
return
-
1
;
...
@@ -194,7 +194,7 @@ static int syncReadOneWalRecord(int sfd, SWalHead *pHead, uint32_t *pEvent) {
...
@@ -194,7 +194,7 @@ static int syncReadOneWalRecord(int sfd, SWalHead *pHead, uint32_t *pEvent) {
return
sizeof
(
SWalHead
)
+
pHead
->
len
;
return
sizeof
(
SWalHead
)
+
pHead
->
len
;
}
}
static
int
syncMonitorLastWal
(
SSyncPeer
*
pPeer
,
char
*
name
)
{
static
int
32_t
syncMonitorLastWal
(
SSyncPeer
*
pPeer
,
char
*
name
)
{
pPeer
->
watchNum
=
0
;
pPeer
->
watchNum
=
0
;
taosClose
(
pPeer
->
notifyFd
);
taosClose
(
pPeer
->
notifyFd
);
pPeer
->
notifyFd
=
inotify_init1
(
IN_NONBLOCK
);
pPeer
->
notifyFd
=
inotify_init1
(
IN_NONBLOCK
);
...
@@ -203,14 +203,14 @@ static int syncMonitorLastWal(SSyncPeer *pPeer, char *name) {
...
@@ -203,14 +203,14 @@ static int syncMonitorLastWal(SSyncPeer *pPeer, char *name) {
return
-
1
;
return
-
1
;
}
}
if
(
pPeer
->
watchFd
==
NULL
)
pPeer
->
watchFd
=
malloc
(
sizeof
(
int
)
*
tsMaxWatchFiles
);
if
(
pPeer
->
watchFd
==
NULL
)
pPeer
->
watchFd
=
malloc
(
sizeof
(
int
32_t
)
*
tsMaxWatchFiles
);
if
(
pPeer
->
watchFd
==
NULL
)
{
if
(
pPeer
->
watchFd
==
NULL
)
{
sError
(
"%s, failed to allocate watchFd"
,
pPeer
->
id
);
sError
(
"%s, failed to allocate watchFd"
,
pPeer
->
id
);
return
-
1
;
return
-
1
;
}
}
memset
(
pPeer
->
watchFd
,
-
1
,
sizeof
(
int
)
*
tsMaxWatchFiles
);
memset
(
pPeer
->
watchFd
,
-
1
,
sizeof
(
int
32_t
)
*
tsMaxWatchFiles
);
int
*
wd
=
pPeer
->
watchFd
;
int
32_t
*
wd
=
pPeer
->
watchFd
;
*
wd
=
inotify_add_watch
(
pPeer
->
notifyFd
,
name
,
IN_MODIFY
|
IN_CLOSE_WRITE
);
*
wd
=
inotify_add_watch
(
pPeer
->
notifyFd
,
name
,
IN_MODIFY
|
IN_CLOSE_WRITE
);
if
(
*
wd
==
-
1
)
{
if
(
*
wd
==
-
1
)
{
...
@@ -222,8 +222,8 @@ static int syncMonitorLastWal(SSyncPeer *pPeer, char *name) {
...
@@ -222,8 +222,8 @@ static int syncMonitorLastWal(SSyncPeer *pPeer, char *name) {
}
}
static
int32_t
syncCheckLastWalChanges
(
SSyncPeer
*
pPeer
,
uint32_t
*
pEvent
)
{
static
int32_t
syncCheckLastWalChanges
(
SSyncPeer
*
pPeer
,
uint32_t
*
pEvent
)
{
char
buf
[
2048
];
char
buf
[
2048
];
int
len
=
read
(
pPeer
->
notifyFd
,
buf
,
sizeof
(
buf
));
int
32_t
len
=
read
(
pPeer
->
notifyFd
,
buf
,
sizeof
(
buf
));
if
(
len
<
0
&&
errno
!=
EAGAIN
)
{
if
(
len
<
0
&&
errno
!=
EAGAIN
)
{
sError
(
"%s, failed to read notify FD(%s)"
,
pPeer
->
id
,
strerror
(
errno
));
sError
(
"%s, failed to read notify FD(%s)"
,
pPeer
->
id
,
strerror
(
errno
));
return
-
1
;
return
-
1
;
...
@@ -243,11 +243,11 @@ static int32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) {
...
@@ -243,11 +243,11 @@ static int32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) {
return
0
;
return
0
;
}
}
static
int
syncRetrieveLastWal
(
SSyncPeer
*
pPeer
,
char
*
name
,
uint64_t
fversion
,
int64_t
offset
,
uint32_t
*
pEvent
)
{
static
int
32_t
syncRetrieveLastWal
(
SSyncPeer
*
pPeer
,
char
*
name
,
uint64_t
fversion
,
int64_t
offset
,
uint32_t
*
pEvent
)
{
SWalHead
*
pHead
=
malloc
(
640000
);
SWalHead
*
pHead
=
malloc
(
640000
);
int
code
=
-
1
;
int
32_t
code
=
-
1
;
int32_t
bytes
=
0
;
int32_t
bytes
=
0
;
int
sfd
;
int
32_t
sfd
;
sfd
=
open
(
name
,
O_RDONLY
);
sfd
=
open
(
name
,
O_RDONLY
);
if
(
sfd
<
0
)
{
if
(
sfd
<
0
)
{
...
@@ -259,7 +259,7 @@ static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion,
...
@@ -259,7 +259,7 @@ static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion,
sDebug
(
"%s, retrieve last wal, offset:%"
PRId64
" fver:%"
PRIu64
,
pPeer
->
id
,
offset
,
fversion
);
sDebug
(
"%s, retrieve last wal, offset:%"
PRId64
" fver:%"
PRIu64
,
pPeer
->
id
,
offset
,
fversion
);
while
(
1
)
{
while
(
1
)
{
int
wsize
=
syncReadOneWalRecord
(
sfd
,
pHead
,
pEvent
);
int
32_t
wsize
=
syncReadOneWalRecord
(
sfd
,
pHead
,
pEvent
);
if
(
wsize
<
0
)
break
;
if
(
wsize
<
0
)
break
;
if
(
wsize
==
0
)
{
if
(
wsize
==
0
)
{
code
=
0
;
code
=
0
;
...
@@ -267,7 +267,7 @@ static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion,
...
@@ -267,7 +267,7 @@ static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion,
}
}
sDebug
(
"%s, last wal is forwarded, ver:%"
PRIu64
,
pPeer
->
id
,
pHead
->
version
);
sDebug
(
"%s, last wal is forwarded, ver:%"
PRIu64
,
pPeer
->
id
,
pHead
->
version
);
int
ret
=
taosWriteMsg
(
pPeer
->
syncFd
,
pHead
,
wsize
);
int
32_t
ret
=
taosWriteMsg
(
pPeer
->
syncFd
,
pHead
,
wsize
);
if
(
ret
!=
wsize
)
break
;
if
(
ret
!=
wsize
)
break
;
pPeer
->
sversion
=
pHead
->
version
;
pPeer
->
sversion
=
pHead
->
version
;
...
@@ -287,9 +287,9 @@ static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion,
...
@@ -287,9 +287,9 @@ static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion,
return
-
1
;
return
-
1
;
}
}
static
int
syncProcessLastWal
(
SSyncPeer
*
pPeer
,
char
*
wname
,
int64_t
index
)
{
static
int
32_t
syncProcessLastWal
(
SSyncPeer
*
pPeer
,
char
*
wname
,
int64_t
index
)
{
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
int
code
=
-
1
;
int
32_t
code
=
-
1
;
char
fname
[
TSDB_FILENAME_LEN
*
2
];
// full path to wal file
char
fname
[
TSDB_FILENAME_LEN
*
2
];
// full path to wal file
if
(
syncAreFilesModified
(
pPeer
)
!=
0
)
return
-
1
;
if
(
syncAreFilesModified
(
pPeer
)
!=
0
)
return
-
1
;
...
@@ -370,13 +370,13 @@ static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) {
...
@@ -370,13 +370,13 @@ static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, int64_t index) {
return
code
;
return
code
;
}
}
static
int
syncRetrieveWal
(
SSyncPeer
*
pPeer
)
{
static
int
32_t
syncRetrieveWal
(
SSyncPeer
*
pPeer
)
{
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
char
fname
[
TSDB_FILENAME_LEN
*
3
];
char
fname
[
TSDB_FILENAME_LEN
*
3
];
char
wname
[
TSDB_FILENAME_LEN
*
2
];
char
wname
[
TSDB_FILENAME_LEN
*
2
];
int32_t
size
;
int32_t
size
;
struct
stat
fstat
;
struct
stat
fstat
;
int
code
=
-
1
;
int
32_t
code
=
-
1
;
int64_t
index
=
0
;
int64_t
index
=
0
;
while
(
1
)
{
while
(
1
)
{
...
@@ -403,7 +403,7 @@ static int syncRetrieveWal(SSyncPeer *pPeer) {
...
@@ -403,7 +403,7 @@ static int syncRetrieveWal(SSyncPeer *pPeer) {
size
=
fstat
.
st_size
;
size
=
fstat
.
st_size
;
sDebug
(
"%s, retrieve wal:%s size:%d"
,
pPeer
->
id
,
fname
,
size
);
sDebug
(
"%s, retrieve wal:%s size:%d"
,
pPeer
->
id
,
fname
,
size
);
int
sfd
=
open
(
fname
,
O_RDONLY
);
int
32_t
sfd
=
open
(
fname
,
O_RDONLY
);
if
(
sfd
<
0
)
break
;
if
(
sfd
<
0
)
break
;
code
=
taosSendFile
(
pPeer
->
syncFd
,
sfd
,
NULL
,
size
);
code
=
taosSendFile
(
pPeer
->
syncFd
,
sfd
,
NULL
,
size
);
...
@@ -428,7 +428,7 @@ static int syncRetrieveWal(SSyncPeer *pPeer) {
...
@@ -428,7 +428,7 @@ static int syncRetrieveWal(SSyncPeer *pPeer) {
return
code
;
return
code
;
}
}
static
int
syncRetrieveDataStepByStep
(
SSyncPeer
*
pPeer
)
{
static
int
32_t
syncRetrieveDataStepByStep
(
SSyncPeer
*
pPeer
)
{
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SFirstPkt
firstPkt
;
SFirstPkt
firstPkt
;
...
...
src/sync/src/tarbitrator.c
浏览文件 @
a5fdb787
...
@@ -28,22 +28,22 @@
...
@@ -28,22 +28,22 @@
#include "syncInt.h"
#include "syncInt.h"
static
void
arbSignalHandler
(
int32_t
signum
,
siginfo_t
*
sigInfo
,
void
*
context
);
static
void
arbSignalHandler
(
int32_t
signum
,
siginfo_t
*
sigInfo
,
void
*
context
);
static
void
arbProcessIncommingConnection
(
int
connFd
,
uint32_t
sourceIp
);
static
void
arbProcessIncommingConnection
(
int
32_t
connFd
,
uint32_t
sourceIp
);
static
void
arbProcessBrokenLink
(
void
*
param
);
static
void
arbProcessBrokenLink
(
void
*
param
);
static
int
arbProcessPeerMsg
(
void
*
param
,
void
*
buffer
);
static
int
32_t
arbProcessPeerMsg
(
void
*
param
,
void
*
buffer
);
static
tsem_t
tsArbSem
;
static
tsem_t
tsArbSem
;
static
ttpool_h
tsArbTcpPool
;
static
ttpool_h
tsArbTcpPool
;
typedef
struct
{
typedef
struct
{
char
id
[
TSDB_EP_LEN
+
24
];
char
id
[
TSDB_EP_LEN
+
24
];
int
nodeFd
;
int
32_t
nodeFd
;
void
*
pConn
;
void
*
pConn
;
}
SNodeConn
;
}
SNodeConn
;
int
main
(
in
t
argc
,
char
*
argv
[])
{
int
32_t
main
(
int32_
t
argc
,
char
*
argv
[])
{
char
arbLogPath
[
TSDB_FILENAME_LEN
+
16
]
=
{
0
};
char
arbLogPath
[
TSDB_FILENAME_LEN
+
16
]
=
{
0
};
for
(
int
i
=
1
;
i
<
argc
;
++
i
)
{
for
(
int
32_t
i
=
1
;
i
<
argc
;
++
i
)
{
if
(
strcmp
(
argv
[
i
],
"-p"
)
==
0
&&
i
<
argc
-
1
)
{
if
(
strcmp
(
argv
[
i
],
"-p"
)
==
0
&&
i
<
argc
-
1
)
{
tsArbitratorPort
=
atoi
(
argv
[
++
i
]);
tsArbitratorPort
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-d"
)
==
0
&&
i
<
argc
-
1
)
{
}
else
if
(
strcmp
(
argv
[
i
],
"-d"
)
==
0
&&
i
<
argc
-
1
)
{
...
@@ -108,7 +108,7 @@ int main(int argc, char *argv[]) {
...
@@ -108,7 +108,7 @@ int main(int argc, char *argv[]) {
return
0
;
return
0
;
}
}
static
void
arbProcessIncommingConnection
(
int
connFd
,
uint32_t
sourceIp
)
{
static
void
arbProcessIncommingConnection
(
int
32_t
connFd
,
uint32_t
sourceIp
)
{
char
ipstr
[
24
];
char
ipstr
[
24
];
tinet_ntoa
(
ipstr
,
sourceIp
);
tinet_ntoa
(
ipstr
,
sourceIp
);
sDebug
(
"peer TCP connection from ip:%s"
,
ipstr
);
sDebug
(
"peer TCP connection from ip:%s"
,
ipstr
);
...
@@ -150,13 +150,13 @@ static void arbProcessBrokenLink(void *param) {
...
@@ -150,13 +150,13 @@ static void arbProcessBrokenLink(void *param) {
tfree
(
pNode
);
tfree
(
pNode
);
}
}
static
int
arbProcessPeerMsg
(
void
*
param
,
void
*
buffer
)
{
static
int
32_t
arbProcessPeerMsg
(
void
*
param
,
void
*
buffer
)
{
SNodeConn
*
pNode
=
param
;
SNodeConn
*
pNode
=
param
;
SSyncHead
head
;
SSyncHead
head
;
int
bytes
=
0
;
int
32_t
bytes
=
0
;
char
*
cont
=
(
char
*
)
buffer
;
char
*
cont
=
(
char
*
)
buffer
;
int
hlen
=
taosReadMsg
(
pNode
->
nodeFd
,
&
head
,
sizeof
(
head
));
int
32_t
hlen
=
taosReadMsg
(
pNode
->
nodeFd
,
&
head
,
sizeof
(
head
));
if
(
hlen
!=
sizeof
(
head
))
{
if
(
hlen
!=
sizeof
(
head
))
{
sDebug
(
"%s, failed to read msg, hlen:%d"
,
pNode
->
id
,
hlen
);
sDebug
(
"%s, failed to read msg, hlen:%d"
,
pNode
->
id
,
hlen
);
return
-
1
;
return
-
1
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录