Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
d9faca1c
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
d9faca1c
编写于
8月 27, 2020
作者:
S
Shengliang Guan
提交者:
GitHub
8月 27, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #3255 from taosdata/hotfix/sync
Hotfix/sync
上级
9028c1de
37c2d3eb
变更
16
隐藏空白更改
内联
并排
Showing
16 changed file
with
147 addition
and
199 deletion
+147
-199
src/inc/tsdb.h
src/inc/tsdb.h
+1
-1
src/inc/tsync.h
src/inc/tsync.h
+1
-1
src/mnode/src/mnodeSdb.c
src/mnode/src/mnodeSdb.c
+1
-1
src/sync/inc/syncInt.h
src/sync/inc/syncInt.h
+1
-1
src/sync/src/syncMain.c
src/sync/src/syncMain.c
+68
-102
src/sync/src/syncRestore.c
src/sync/src/syncRestore.c
+12
-21
src/sync/src/syncRetrieve.c
src/sync/src/syncRetrieve.c
+44
-48
src/sync/src/taosTcpPool.c
src/sync/src/taosTcpPool.c
+5
-9
src/sync/src/tarbitrator.c
src/sync/src/tarbitrator.c
+2
-5
src/sync/test/syncServer.c
src/sync/test/syncServer.c
+1
-1
src/tsdb/inc/tsdbMain.h
src/tsdb/inc/tsdbMain.h
+1
-1
src/tsdb/src/tsdbFile.c
src/tsdb/src/tsdbFile.c
+2
-2
src/tsdb/src/tsdbMain.c
src/tsdb/src/tsdbMain.c
+1
-1
src/util/inc/tkvstore.h
src/util/inc/tkvstore.h
+1
-1
src/util/src/tkvstore.c
src/util/src/tkvstore.c
+2
-2
src/vnode/src/vnodeMain.c
src/vnode/src/vnodeMain.c
+4
-2
未找到文件。
src/inc/tsdb.h
浏览文件 @
d9faca1c
...
@@ -115,7 +115,7 @@ int tsdbDropTable(TSDB_REPO_T *pRepo, STableId tableId);
...
@@ -115,7 +115,7 @@ int tsdbDropTable(TSDB_REPO_T *pRepo, STableId tableId);
int
tsdbUpdateTableTagValue
(
TSDB_REPO_T
*
repo
,
SUpdateTableTagValMsg
*
pMsg
);
int
tsdbUpdateTableTagValue
(
TSDB_REPO_T
*
repo
,
SUpdateTableTagValMsg
*
pMsg
);
TSKEY
tsdbGetTableLastKey
(
TSDB_REPO_T
*
repo
,
uint64_t
uid
);
TSKEY
tsdbGetTableLastKey
(
TSDB_REPO_T
*
repo
,
uint64_t
uid
);
uint32_t
tsdbGetFileInfo
(
TSDB_REPO_T
*
repo
,
char
*
name
,
uint32_t
*
index
,
uint32_t
eindex
,
int
32
_t
*
size
);
uint32_t
tsdbGetFileInfo
(
TSDB_REPO_T
*
repo
,
char
*
name
,
uint32_t
*
index
,
uint32_t
eindex
,
int
64
_t
*
size
);
// the TSDB repository info
// the TSDB repository info
typedef
struct
STsdbRepoInfo
{
typedef
struct
STsdbRepoInfo
{
...
...
src/inc/tsync.h
浏览文件 @
d9faca1c
...
@@ -64,7 +64,7 @@ typedef struct {
...
@@ -64,7 +64,7 @@ typedef struct {
if name is provided(name[0] is not zero), get the named file at the specified index. If not there, return
if name is provided(name[0] is not zero), get the named file at the specified index. If not there, return
zero. If it is there, set the size to file size, and return file magic number. Index shall not be updated.
zero. If it is there, set the size to file size, and return file magic number. Index shall not be updated.
*/
*/
typedef
uint32_t
(
*
FGetFileInfo
)(
void
*
ahandle
,
char
*
name
,
uint32_t
*
index
,
uint32_t
eindex
,
int
32
_t
*
size
,
uint64_t
*
fversion
);
typedef
uint32_t
(
*
FGetFileInfo
)(
void
*
ahandle
,
char
*
name
,
uint32_t
*
index
,
uint32_t
eindex
,
int
64
_t
*
size
,
uint64_t
*
fversion
);
// get the wal file from index or after
// get the wal file from index or after
// return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file
// return value, -1: error, 1:more wal files, 0:last WAL. if name[0]==0, no WAL file
...
...
src/mnode/src/mnodeSdb.c
浏览文件 @
d9faca1c
...
@@ -224,7 +224,7 @@ void sdbUpdateMnodeRoles() {
...
@@ -224,7 +224,7 @@ void sdbUpdateMnodeRoles() {
mnodeUpdateMnodeEpSet
();
mnodeUpdateMnodeEpSet
();
}
}
static
uint32_t
sdbGetFileInfo
(
void
*
ahandle
,
char
*
name
,
uint32_t
*
index
,
uint32_t
eindex
,
int
32
_t
*
size
,
uint64_t
*
fversion
)
{
static
uint32_t
sdbGetFileInfo
(
void
*
ahandle
,
char
*
name
,
uint32_t
*
index
,
uint32_t
eindex
,
int
64
_t
*
size
,
uint64_t
*
fversion
)
{
sdbUpdateMnodeRoles
();
sdbUpdateMnodeRoles
();
return
0
;
return
0
;
}
}
...
...
src/sync/inc/syncInt.h
浏览文件 @
d9faca1c
...
@@ -74,7 +74,7 @@ typedef struct {
...
@@ -74,7 +74,7 @@ typedef struct {
uint32_t
magic
;
uint32_t
magic
;
uint32_t
index
;
uint32_t
index
;
uint64_t
fversion
;
uint64_t
fversion
;
int
32
_t
size
;
int
64
_t
size
;
}
SFileInfo
;
}
SFileInfo
;
typedef
struct
{
typedef
struct
{
...
...
src/sync/src/syncMain.c
浏览文件 @
d9faca1c
...
@@ -108,8 +108,7 @@ static void syncModuleInitFunc() {
...
@@ -108,8 +108,7 @@ static void syncModuleInitFunc() {
tstrncpy
(
tsNodeFqdn
,
tsLocalFqdn
,
sizeof
(
tsNodeFqdn
));
tstrncpy
(
tsNodeFqdn
,
tsLocalFqdn
,
sizeof
(
tsNodeFqdn
));
}
}
void
*
syncStart
(
const
SSyncInfo
*
pInfo
)
void
*
syncStart
(
const
SSyncInfo
*
pInfo
)
{
{
const
SSyncCfg
*
pCfg
=
&
pInfo
->
syncCfg
;
const
SSyncCfg
*
pCfg
=
&
pInfo
->
syncCfg
;
SSyncNode
*
pNode
=
(
SSyncNode
*
)
calloc
(
sizeof
(
SSyncNode
),
1
);
SSyncNode
*
pNode
=
(
SSyncNode
*
)
calloc
(
sizeof
(
SSyncNode
),
1
);
...
@@ -189,9 +188,8 @@ void *syncStart(const SSyncInfo *pInfo)
...
@@ -189,9 +188,8 @@ void *syncStart(const SSyncInfo *pInfo)
return
pNode
;
return
pNode
;
}
}
void
syncStop
(
void
*
param
)
void
syncStop
(
void
*
param
)
{
{
SSyncNode
*
pNode
=
param
;
SSyncNode
*
pNode
=
param
;
SSyncPeer
*
pPeer
;
SSyncPeer
*
pPeer
;
if
(
pNode
==
NULL
)
return
;
if
(
pNode
==
NULL
)
return
;
...
@@ -215,9 +213,8 @@ void syncStop(void *param)
...
@@ -215,9 +213,8 @@ void syncStop(void *param)
syncDecNodeRef
(
pNode
);
syncDecNodeRef
(
pNode
);
}
}
int32_t
syncReconfig
(
void
*
param
,
const
SSyncCfg
*
pNewCfg
)
int32_t
syncReconfig
(
void
*
param
,
const
SSyncCfg
*
pNewCfg
)
{
{
SSyncNode
*
pNode
=
param
;
SSyncNode
*
pNode
=
param
;
int
i
,
j
;
int
i
,
j
;
if
(
pNode
==
NULL
)
return
TSDB_CODE_SYN_INVALID_CONFIG
;
if
(
pNode
==
NULL
)
return
TSDB_CODE_SYN_INVALID_CONFIG
;
...
@@ -283,10 +280,9 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg)
...
@@ -283,10 +280,9 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg)
return
0
;
return
0
;
}
}
int32_t
syncForwardToPeer
(
void
*
param
,
void
*
data
,
void
*
mhandle
,
int
qtype
)
int32_t
syncForwardToPeer
(
void
*
param
,
void
*
data
,
void
*
mhandle
,
int
qtype
)
{
{
SSyncNode
*
pNode
=
param
;
SSyncNode
*
pNode
=
param
;
SSyncPeer
*
pPeer
;
SSyncPeer
*
pPeer
;
SSyncHead
*
pSyncHead
;
SSyncHead
*
pSyncHead
;
SWalHead
*
pWalHead
=
data
;
SWalHead
*
pWalHead
=
data
;
int
fwdLen
;
int
fwdLen
;
...
@@ -334,9 +330,8 @@ int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype)
...
@@ -334,9 +330,8 @@ int32_t syncForwardToPeer(void *param, void *data, void *mhandle, int qtype)
return
code
;
return
code
;
}
}
void
syncConfirmForward
(
void
*
param
,
uint64_t
version
,
int32_t
code
)
void
syncConfirmForward
(
void
*
param
,
uint64_t
version
,
int32_t
code
)
{
{
SSyncNode
*
pNode
=
param
;
SSyncNode
*
pNode
=
param
;
if
(
pNode
==
NULL
)
return
;
if
(
pNode
==
NULL
)
return
;
if
(
pNode
->
quorum
<=
1
)
return
;
if
(
pNode
->
quorum
<=
1
)
return
;
...
@@ -387,10 +382,9 @@ void syncRecover(void *param) {
...
@@ -387,10 +382,9 @@ void syncRecover(void *param) {
pthread_mutex_unlock
(
&
(
pNode
->
mutex
));
pthread_mutex_unlock
(
&
(
pNode
->
mutex
));
}
}
int
syncGetNodesRole
(
void
*
param
,
SNodesRole
*
pNodesRole
)
int
syncGetNodesRole
(
void
*
param
,
SNodesRole
*
pNodesRole
)
{
{
SSyncNode
*
pNode
=
param
;
SSyncNode
*
pNode
=
param
;
pNodesRole
->
selfIndex
=
pNode
->
selfIndex
;
pNodesRole
->
selfIndex
=
pNode
->
selfIndex
;
for
(
int
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
for
(
int
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
pNodesRole
->
nodeId
[
i
]
=
pNode
->
peerInfo
[
i
]
->
nodeId
;
pNodesRole
->
nodeId
[
i
]
=
pNode
->
peerInfo
[
i
]
->
nodeId
;
...
@@ -400,8 +394,7 @@ int syncGetNodesRole(void *param, SNodesRole *pNodesRole)
...
@@ -400,8 +394,7 @@ int syncGetNodesRole(void *param, SNodesRole *pNodesRole)
return
0
;
return
0
;
}
}
static
void
syncAddArbitrator
(
SSyncNode
*
pNode
)
static
void
syncAddArbitrator
(
SSyncNode
*
pNode
)
{
{
SSyncPeer
*
pPeer
=
pNode
->
peerInfo
[
TAOS_SYNC_MAX_REPLICA
];
SSyncPeer
*
pPeer
=
pNode
->
peerInfo
[
TAOS_SYNC_MAX_REPLICA
];
// if not configured, return right away
// if not configured, return right away
...
@@ -456,13 +449,11 @@ static void syncDecNodeRef(SSyncNode *pNode)
...
@@ -456,13 +449,11 @@ static void syncDecNodeRef(SSyncNode *pNode)
}
}
}
}
void
syncAddPeerRef
(
SSyncPeer
*
pPeer
)
void
syncAddPeerRef
(
SSyncPeer
*
pPeer
)
{
{
atomic_add_fetch_8
(
&
pPeer
->
refCount
,
1
);
atomic_add_fetch_8
(
&
pPeer
->
refCount
,
1
);
}
}
int
syncDecPeerRef
(
SSyncPeer
*
pPeer
)
int
syncDecPeerRef
(
SSyncPeer
*
pPeer
)
{
{
if
(
atomic_sub_fetch_8
(
&
pPeer
->
refCount
,
1
)
==
0
)
{
if
(
atomic_sub_fetch_8
(
&
pPeer
->
refCount
,
1
)
==
0
)
{
syncDecNodeRef
(
pPeer
->
pSyncNode
);
syncDecNodeRef
(
pPeer
->
pSyncNode
);
...
@@ -475,18 +466,16 @@ int syncDecPeerRef(SSyncPeer *pPeer)
...
@@ -475,18 +466,16 @@ int syncDecPeerRef(SSyncPeer *pPeer)
return
1
;
return
1
;
}
}
static
void
syncClosePeerConn
(
SSyncPeer
*
pPeer
)
static
void
syncClosePeerConn
(
SSyncPeer
*
pPeer
)
{
{
taosTmrStopA
(
&
pPeer
->
timer
);
taosTmrStopA
(
&
pPeer
->
timer
);
taosClose
(
pPeer
->
syncFd
);
taosClose
(
pPeer
->
syncFd
);
if
(
pPeer
->
peerFd
>=
0
)
{
if
(
pPeer
->
peerFd
>=
0
)
{
pPeer
->
peerFd
=
-
1
;
pPeer
->
peerFd
=
-
1
;
taosFreeTcpConn
(
pPeer
->
pConn
);
taosFreeTcpConn
(
pPeer
->
pConn
);
}
}
}
}
static
void
syncRemovePeer
(
SSyncPeer
*
pPeer
)
static
void
syncRemovePeer
(
SSyncPeer
*
pPeer
)
{
{
sInfo
(
"%s, it is removed"
,
pPeer
->
id
);
sInfo
(
"%s, it is removed"
,
pPeer
->
id
);
pPeer
->
ip
=
0
;
pPeer
->
ip
=
0
;
...
@@ -494,8 +483,7 @@ static void syncRemovePeer(SSyncPeer *pPeer)
...
@@ -494,8 +483,7 @@ static void syncRemovePeer(SSyncPeer *pPeer)
syncDecPeerRef
(
pPeer
);
syncDecPeerRef
(
pPeer
);
}
}
static
SSyncPeer
*
syncAddPeer
(
SSyncNode
*
pNode
,
const
SNodeInfo
*
pInfo
)
static
SSyncPeer
*
syncAddPeer
(
SSyncNode
*
pNode
,
const
SNodeInfo
*
pInfo
)
{
{
uint32_t
ip
=
taosGetIpFromFqdn
(
pInfo
->
nodeFqdn
);
uint32_t
ip
=
taosGetIpFromFqdn
(
pInfo
->
nodeFqdn
);
if
(
ip
==
-
1
)
return
NULL
;
if
(
ip
==
-
1
)
return
NULL
;
...
@@ -525,25 +513,24 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo)
...
@@ -525,25 +513,24 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo)
return
pPeer
;
return
pPeer
;
}
}
void
syncBroadcastStatus
(
SSyncNode
*
pNode
)
void
syncBroadcastStatus
(
SSyncNode
*
pNode
)
{
{
SSyncPeer
*
pPeer
;
SSyncPeer
*
pPeer
;
for
(
int
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
for
(
int
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
if
(
i
==
pNode
->
selfIndex
)
continue
;
if
(
i
==
pNode
->
selfIndex
)
continue
;
pPeer
=
pNode
->
peerInfo
[
i
];
pPeer
=
pNode
->
peerInfo
[
i
];
syncSendPeersStatusMsgToPeer
(
pPeer
,
1
);
syncSendPeersStatusMsgToPeer
(
pPeer
,
1
);
}
}
}
}
static
void
syncResetFlowCtrl
(
SSyncNode
*
pNode
)
{
static
void
syncResetFlowCtrl
(
SSyncNode
*
pNode
)
{
for
(
int
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
for
(
int
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
pNode
->
peerInfo
[
i
]
->
numOfRetrieves
=
0
;
pNode
->
peerInfo
[
i
]
->
numOfRetrieves
=
0
;
}
}
if
(
pNode
->
notifyFlowCtrl
)
if
(
pNode
->
notifyFlowCtrl
)
{
(
*
pNode
->
notifyFlowCtrl
)(
pNode
->
ahandle
,
0
);
(
*
pNode
->
notifyFlowCtrl
)(
pNode
->
ahandle
,
0
);
}
}
}
static
void
syncChooseMaster
(
SSyncNode
*
pNode
)
{
static
void
syncChooseMaster
(
SSyncNode
*
pNode
)
{
...
@@ -600,9 +587,9 @@ static void syncChooseMaster(SSyncNode *pNode) {
...
@@ -600,9 +587,9 @@ static void syncChooseMaster(SSyncNode *pNode) {
}
else
{
}
else
{
sDebug
(
"vgId:%d, failed to choose master"
,
pNode
->
vgId
);
sDebug
(
"vgId:%d, failed to choose master"
,
pNode
->
vgId
);
}
}
}
}
static
SSyncPeer
*
syncCheckMaster
(
SSyncNode
*
pNode
)
{
static
SSyncPeer
*
syncCheckMaster
(
SSyncNode
*
pNode
)
{
int
onlineNum
=
0
;
int
onlineNum
=
0
;
int
index
=
-
1
;
int
index
=
-
1
;
int
replica
=
pNode
->
replica
;
int
replica
=
pNode
->
replica
;
...
@@ -619,7 +606,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode ) {
...
@@ -619,7 +606,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode ) {
replica
=
pNode
->
replica
+
1
;
replica
=
pNode
->
replica
+
1
;
}
}
if
(
onlineNum
<=
replica
*
0
.
5
)
{
if
(
onlineNum
<=
replica
*
0
.
5
)
{
if
(
nodeRole
!=
TAOS_SYNC_ROLE_UNSYNCED
)
{
if
(
nodeRole
!=
TAOS_SYNC_ROLE_UNSYNCED
)
{
nodeRole
=
TAOS_SYNC_ROLE_UNSYNCED
;
nodeRole
=
TAOS_SYNC_ROLE_UNSYNCED
;
pNode
->
peerInfo
[
pNode
->
selfIndex
]
->
role
=
nodeRole
;
pNode
->
peerInfo
[
pNode
->
selfIndex
]
->
role
=
nodeRole
;
...
@@ -627,13 +614,13 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode ) {
...
@@ -627,13 +614,13 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode ) {
sInfo
(
"vgId:%d, change to unsynced state, online:%d replica:%d"
,
pNode
->
vgId
,
onlineNum
,
replica
);
sInfo
(
"vgId:%d, change to unsynced state, online:%d replica:%d"
,
pNode
->
vgId
,
onlineNum
,
replica
);
}
}
}
else
{
}
else
{
for
(
int
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
for
(
int
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
SSyncPeer
*
pTemp
=
pNode
->
peerInfo
[
i
];
SSyncPeer
*
pTemp
=
pNode
->
peerInfo
[
i
];
if
(
pTemp
->
role
!=
TAOS_SYNC_ROLE_MASTER
)
continue
;
if
(
pTemp
->
role
!=
TAOS_SYNC_ROLE_MASTER
)
continue
;
if
(
index
<
0
)
{
if
(
index
<
0
)
{
index
=
i
;
index
=
i
;
}
else
{
// multiple masters, it shall not happen
}
else
{
// multiple masters, it shall not happen
if
(
i
==
pNode
->
selfIndex
)
{
if
(
i
==
pNode
->
selfIndex
)
{
sError
(
"%s, peer is master, work as slave instead"
,
pTemp
->
id
);
sError
(
"%s, peer is master, work as slave instead"
,
pTemp
->
id
);
nodeRole
=
TAOS_SYNC_ROLE_SLAVE
;
nodeRole
=
TAOS_SYNC_ROLE_SLAVE
;
(
*
pNode
->
notifyRole
)(
pNode
->
ahandle
,
nodeRole
);
(
*
pNode
->
notifyRole
)(
pNode
->
ahandle
,
nodeRole
);
...
@@ -642,7 +629,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode ) {
...
@@ -642,7 +629,7 @@ static SSyncPeer *syncCheckMaster(SSyncNode *pNode ) {
}
}
}
}
SSyncPeer
*
pMaster
=
(
index
>=
0
)
?
pNode
->
peerInfo
[
index
]
:
NULL
;
SSyncPeer
*
pMaster
=
(
index
>=
0
)
?
pNode
->
peerInfo
[
index
]
:
NULL
;
return
pMaster
;
return
pMaster
;
}
}
...
@@ -651,7 +638,7 @@ static int syncValidateMaster(SSyncPeer *pPeer) {
...
@@ -651,7 +638,7 @@ static int syncValidateMaster(SSyncPeer *pPeer) {
int
code
=
0
;
int
code
=
0
;
if
(
nodeRole
==
TAOS_SYNC_ROLE_MASTER
&&
nodeVersion
<
pPeer
->
version
)
{
if
(
nodeRole
==
TAOS_SYNC_ROLE_MASTER
&&
nodeVersion
<
pPeer
->
version
)
{
sDebug
(
"%s, slave has higher version, restart all connections!!!"
,
pPeer
->
id
);
sDebug
(
"%s, slave has higher version, restart all connections!!!"
,
pPeer
->
id
);
nodeRole
=
TAOS_SYNC_ROLE_UNSYNCED
;
nodeRole
=
TAOS_SYNC_ROLE_UNSYNCED
;
(
*
pNode
->
notifyRole
)(
pNode
->
ahandle
,
nodeRole
);
(
*
pNode
->
notifyRole
)(
pNode
->
ahandle
,
nodeRole
);
code
=
-
1
;
code
=
-
1
;
...
@@ -660,13 +647,12 @@ static int syncValidateMaster(SSyncPeer *pPeer) {
...
@@ -660,13 +647,12 @@ static int syncValidateMaster(SSyncPeer *pPeer) {
if
(
i
==
pNode
->
selfIndex
)
continue
;
if
(
i
==
pNode
->
selfIndex
)
continue
;
syncRestartPeer
(
pNode
->
peerInfo
[
i
]);
syncRestartPeer
(
pNode
->
peerInfo
[
i
]);
}
}
}
}
return
code
;
return
code
;
}
}
static
void
syncCheckRole
(
SSyncPeer
*
pPeer
,
SPeerStatus
peersStatus
[],
int8_t
newRole
)
static
void
syncCheckRole
(
SSyncPeer
*
pPeer
,
SPeerStatus
peersStatus
[],
int8_t
newRole
)
{
{
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
int8_t
peerOldRole
=
pPeer
->
role
;
int8_t
peerOldRole
=
pPeer
->
role
;
int8_t
selfOldRole
=
nodeRole
;
int8_t
selfOldRole
=
nodeRole
;
...
@@ -688,14 +674,14 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne
...
@@ -688,14 +674,14 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne
if
(
syncValidateMaster
(
pPeer
)
<
0
)
return
;
if
(
syncValidateMaster
(
pPeer
)
<
0
)
return
;
if
(
nodeRole
==
TAOS_SYNC_ROLE_UNSYNCED
)
{
if
(
nodeRole
==
TAOS_SYNC_ROLE_UNSYNCED
)
{
if
(
nodeVersion
<
pMaster
->
version
)
{
if
(
nodeVersion
<
pMaster
->
version
)
{
syncRequired
=
1
;
syncRequired
=
1
;
}
else
{
}
else
{
sInfo
(
"%s is master, work as slave, ver:%"
PRIu64
,
pMaster
->
id
,
pMaster
->
version
);
sInfo
(
"%s is master, work as slave, ver:%"
PRIu64
,
pMaster
->
id
,
pMaster
->
version
);
nodeRole
=
TAOS_SYNC_ROLE_SLAVE
;
nodeRole
=
TAOS_SYNC_ROLE_SLAVE
;
(
*
pNode
->
notifyRole
)(
pNode
->
ahandle
,
nodeRole
);
(
*
pNode
->
notifyRole
)(
pNode
->
ahandle
,
nodeRole
);
}
}
}
else
if
(
nodeRole
==
TAOS_SYNC_ROLE_SLAVE
&&
pMaster
==
pPeer
)
{
}
else
if
(
nodeRole
==
TAOS_SYNC_ROLE_SLAVE
&&
pMaster
==
pPeer
)
{
// nodeVersion = pMaster->version;
// nodeVersion = pMaster->version;
}
}
}
else
{
}
else
{
...
@@ -736,20 +722,18 @@ static void syncRestartPeer(SSyncPeer *pPeer) {
...
@@ -736,20 +722,18 @@ static void syncRestartPeer(SSyncPeer *pPeer) {
pPeer
->
sstatus
=
TAOS_SYNC_STATUS_INIT
;
pPeer
->
sstatus
=
TAOS_SYNC_STATUS_INIT
;
int
ret
=
strcmp
(
pPeer
->
fqdn
,
tsNodeFqdn
);
int
ret
=
strcmp
(
pPeer
->
fqdn
,
tsNodeFqdn
);
if
(
ret
>
0
||
(
ret
==
0
&&
pPeer
->
port
>
tsSyncPort
)
)
if
(
ret
>
0
||
(
ret
==
0
&&
pPeer
->
port
>
tsSyncPort
))
taosTmrReset
(
syncCheckPeerConnection
,
tsSyncTimer
*
1000
,
pPeer
,
syncTmrCtrl
,
&
pPeer
->
timer
);
taosTmrReset
(
syncCheckPeerConnection
,
tsSyncTimer
*
1000
,
pPeer
,
syncTmrCtrl
,
&
pPeer
->
timer
);
}
}
void
syncRestartConnection
(
SSyncPeer
*
pPeer
)
void
syncRestartConnection
(
SSyncPeer
*
pPeer
)
{
{
if
(
pPeer
->
ip
==
0
)
return
;
if
(
pPeer
->
ip
==
0
)
return
;
syncRestartPeer
(
pPeer
);
syncRestartPeer
(
pPeer
);
syncCheckRole
(
pPeer
,
NULL
,
TAOS_SYNC_ROLE_OFFLINE
);
syncCheckRole
(
pPeer
,
NULL
,
TAOS_SYNC_ROLE_OFFLINE
);
}
}
static
void
syncProcessSyncRequest
(
char
*
msg
,
SSyncPeer
*
pPeer
)
static
void
syncProcessSyncRequest
(
char
*
msg
,
SSyncPeer
*
pPeer
)
{
{
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
sDebug
(
"%s, sync-req is received"
,
pPeer
->
id
);
sDebug
(
"%s, sync-req is received"
,
pPeer
->
id
);
...
@@ -784,8 +768,7 @@ static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer)
...
@@ -784,8 +768,7 @@ static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer)
}
}
}
}
static
void
syncNotStarted
(
void
*
param
,
void
*
tmrId
)
static
void
syncNotStarted
(
void
*
param
,
void
*
tmrId
)
{
{
SSyncPeer
*
pPeer
=
param
;
SSyncPeer
*
pPeer
=
param
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
...
@@ -805,14 +788,13 @@ static void syncTryRecoverFromMaster(void *param, void *tmrId) {
...
@@ -805,14 +788,13 @@ static void syncTryRecoverFromMaster(void *param, void *tmrId) {
pthread_mutex_unlock
(
&
(
pNode
->
mutex
));
pthread_mutex_unlock
(
&
(
pNode
->
mutex
));
}
}
static
void
syncRecoverFromMaster
(
SSyncPeer
*
pPeer
)
static
void
syncRecoverFromMaster
(
SSyncPeer
*
pPeer
)
{
{
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
if
(
nodeSStatus
!=
TAOS_SYNC_STATUS_INIT
)
{
if
(
nodeSStatus
!=
TAOS_SYNC_STATUS_INIT
)
{
sDebug
(
"%s, sync is already started, status:%d"
,
pPeer
->
id
,
nodeSStatus
);
sDebug
(
"%s, sync is already started, status:%d"
,
pPeer
->
id
,
nodeSStatus
);
return
;
return
;
}
}
taosTmrStopA
(
&
pPeer
->
timer
);
taosTmrStopA
(
&
pPeer
->
timer
);
if
(
tsSyncNum
>=
tsMaxSyncNum
)
{
if
(
tsSyncNum
>=
tsMaxSyncNum
)
{
...
@@ -842,9 +824,8 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer)
...
@@ -842,9 +824,8 @@ static void syncRecoverFromMaster(SSyncPeer *pPeer)
return
;
return
;
}
}
static
void
syncProcessFwdResponse
(
char
*
cont
,
SSyncPeer
*
pPeer
)
static
void
syncProcessFwdResponse
(
char
*
cont
,
SSyncPeer
*
pPeer
)
{
{
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SFwdRsp
*
pFwdRsp
=
(
SFwdRsp
*
)
cont
;
SFwdRsp
*
pFwdRsp
=
(
SFwdRsp
*
)
cont
;
SSyncFwds
*
pSyncFwds
=
pNode
->
pSyncFwds
;
SSyncFwds
*
pSyncFwds
=
pNode
->
pSyncFwds
;
SFwdInfo
*
pFwdInfo
;
SFwdInfo
*
pFwdInfo
;
...
@@ -864,10 +845,8 @@ static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer)
...
@@ -864,10 +845,8 @@ static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer)
}
}
}
}
static
void
syncProcessForwardFromPeer
(
char
*
cont
,
SSyncPeer
*
pPeer
)
{
static
void
syncProcessForwardFromPeer
(
char
*
cont
,
SSyncPeer
*
pPeer
)
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
{
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SWalHead
*
pHead
=
(
SWalHead
*
)
cont
;
SWalHead
*
pHead
=
(
SWalHead
*
)
cont
;
sDebug
(
"%s, forward is received, ver:%"
PRIu64
,
pPeer
->
id
,
pHead
->
version
);
sDebug
(
"%s, forward is received, ver:%"
PRIu64
,
pPeer
->
id
,
pHead
->
version
);
...
@@ -886,9 +865,8 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer)
...
@@ -886,9 +865,8 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer)
return
;
return
;
}
}
static
void
syncProcessPeersStatusMsg
(
char
*
cont
,
SSyncPeer
*
pPeer
)
static
void
syncProcessPeersStatusMsg
(
char
*
cont
,
SSyncPeer
*
pPeer
)
{
{
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SPeersStatus
*
pPeersStatus
=
(
SPeersStatus
*
)
cont
;
SPeersStatus
*
pPeersStatus
=
(
SPeersStatus
*
)
cont
;
sDebug
(
"%s, status msg received, self:%s ver:%"
PRIu64
" peer:%s ver:%"
PRIu64
", ack:%d"
,
pPeer
->
id
,
sDebug
(
"%s, status msg received, self:%s ver:%"
PRIu64
" peer:%s ver:%"
PRIu64
", ack:%d"
,
pPeer
->
id
,
...
@@ -911,10 +889,10 @@ static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) {
...
@@ -911,10 +889,10 @@ static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) {
}
}
// head.len = htonl(head.len);
// head.len = htonl(head.len);
if
(
pHead
->
len
<
0
)
{
if
(
pHead
->
len
<
0
)
{
sError
(
"%s, invalid pkt length, len:%d"
,
pPeer
->
id
,
pHead
->
len
);
sError
(
"%s, invalid pkt length, len:%d"
,
pPeer
->
id
,
pHead
->
len
);
return
-
1
;
return
-
1
;
}
}
int
bytes
=
taosReadMsg
(
pPeer
->
peerFd
,
cont
,
pHead
->
len
);
int
bytes
=
taosReadMsg
(
pPeer
->
peerFd
,
cont
,
pHead
->
len
);
if
(
bytes
!=
pHead
->
len
)
{
if
(
bytes
!=
pHead
->
len
)
{
...
@@ -925,9 +903,8 @@ static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) {
...
@@ -925,9 +903,8 @@ static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) {
return
0
;
return
0
;
}
}
static
int
syncProcessPeerMsg
(
void
*
param
,
void
*
buffer
)
static
int
syncProcessPeerMsg
(
void
*
param
,
void
*
buffer
)
{
{
SSyncPeer
*
pPeer
=
param
;
SSyncPeer
*
pPeer
=
param
;
SSyncHead
head
;
SSyncHead
head
;
char
*
cont
=
(
char
*
)
buffer
;
char
*
cont
=
(
char
*
)
buffer
;
...
@@ -955,8 +932,7 @@ static int syncProcessPeerMsg(void *param, void *buffer)
...
@@ -955,8 +932,7 @@ static int syncProcessPeerMsg(void *param, void *buffer)
#define statusMsgLen sizeof(SSyncHead)+sizeof(SPeersStatus)+sizeof(SPeerStatus)*TAOS_SYNC_MAX_REPLICA
#define statusMsgLen sizeof(SSyncHead)+sizeof(SPeersStatus)+sizeof(SPeerStatus)*TAOS_SYNC_MAX_REPLICA
static
void
syncSendPeersStatusMsgToPeer
(
SSyncPeer
*
pPeer
,
char
ack
)
static
void
syncSendPeersStatusMsgToPeer
(
SSyncPeer
*
pPeer
,
char
ack
)
{
{
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
char
msg
[
statusMsgLen
]
=
{
0
};
char
msg
[
statusMsgLen
]
=
{
0
};
...
@@ -1013,7 +989,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
...
@@ -1013,7 +989,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
firstPkt
.
port
=
tsSyncPort
;
firstPkt
.
port
=
tsSyncPort
;
firstPkt
.
sourceId
=
pNode
->
vgId
;
// tell arbitrator its vgId
firstPkt
.
sourceId
=
pNode
->
vgId
;
// tell arbitrator its vgId
if
(
write
(
connFd
,
&
firstPkt
,
sizeof
(
firstPkt
))
==
sizeof
(
firstPkt
))
{
if
(
write
(
connFd
,
&
firstPkt
,
sizeof
(
firstPkt
))
==
sizeof
(
firstPkt
))
{
sDebug
(
"%s, connection to peer server is setup"
,
pPeer
->
id
);
sDebug
(
"%s, connection to peer server is setup"
,
pPeer
->
id
);
pPeer
->
peerFd
=
connFd
;
pPeer
->
peerFd
=
connFd
;
pPeer
->
role
=
TAOS_SYNC_ROLE_UNSYNCED
;
pPeer
->
role
=
TAOS_SYNC_ROLE_UNSYNCED
;
...
@@ -1026,8 +1002,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
...
@@ -1026,8 +1002,7 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
}
}
}
}
static
void
syncCheckPeerConnection
(
void
*
param
,
void
*
tmrId
)
static
void
syncCheckPeerConnection
(
void
*
param
,
void
*
tmrId
)
{
{
SSyncPeer
*
pPeer
=
param
;
SSyncPeer
*
pPeer
=
param
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
...
@@ -1039,8 +1014,7 @@ static void syncCheckPeerConnection(void *param, void *tmrId)
...
@@ -1039,8 +1014,7 @@ static void syncCheckPeerConnection(void *param, void *tmrId)
pthread_mutex_unlock
(
&
(
pNode
->
mutex
));
pthread_mutex_unlock
(
&
(
pNode
->
mutex
));
}
}
static
void
syncCreateRestoreDataThread
(
SSyncPeer
*
pPeer
)
static
void
syncCreateRestoreDataThread
(
SSyncPeer
*
pPeer
)
{
{
taosTmrStopA
(
&
pPeer
->
timer
);
taosTmrStopA
(
&
pPeer
->
timer
);
pthread_attr_t
thattr
;
pthread_attr_t
thattr
;
...
@@ -1061,8 +1035,7 @@ static void syncCreateRestoreDataThread(SSyncPeer *pPeer)
...
@@ -1061,8 +1035,7 @@ static void syncCreateRestoreDataThread(SSyncPeer *pPeer)
}
}
}
}
static
void
syncProcessIncommingConnection
(
int
connFd
,
uint32_t
sourceIp
)
static
void
syncProcessIncommingConnection
(
int
connFd
,
uint32_t
sourceIp
)
{
{
char
ipstr
[
24
];
char
ipstr
[
24
];
int
i
;
int
i
;
...
@@ -1139,8 +1112,7 @@ static void syncProcessBrokenLink(void *param) {
...
@@ -1139,8 +1112,7 @@ static void syncProcessBrokenLink(void *param) {
syncDecNodeRef
(
pNode
);
syncDecNodeRef
(
pNode
);
}
}
static
void
syncSaveFwdInfo
(
SSyncNode
*
pNode
,
uint64_t
version
,
void
*
mhandle
)
static
void
syncSaveFwdInfo
(
SSyncNode
*
pNode
,
uint64_t
version
,
void
*
mhandle
)
{
{
SSyncFwds
*
pSyncFwds
=
pNode
->
pSyncFwds
;
SSyncFwds
*
pSyncFwds
=
pNode
->
pSyncFwds
;
uint64_t
time
=
taosGetTimestampMs
();
uint64_t
time
=
taosGetTimestampMs
();
...
@@ -1162,8 +1134,7 @@ static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle)
...
@@ -1162,8 +1134,7 @@ static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle)
sDebug
(
"vgId:%d, fwd info is saved, ver:%"
PRIu64
" fwds:%d "
,
pNode
->
vgId
,
version
,
pSyncFwds
->
fwds
);
sDebug
(
"vgId:%d, fwd info is saved, ver:%"
PRIu64
" fwds:%d "
,
pNode
->
vgId
,
version
,
pSyncFwds
->
fwds
);
}
}
static
void
syncRemoveConfirmedFwdInfo
(
SSyncNode
*
pNode
)
static
void
syncRemoveConfirmedFwdInfo
(
SSyncNode
*
pNode
)
{
{
SSyncFwds
*
pSyncFwds
=
pNode
->
pSyncFwds
;
SSyncFwds
*
pSyncFwds
=
pNode
->
pSyncFwds
;
int
fwds
=
pSyncFwds
->
fwds
;
int
fwds
=
pSyncFwds
->
fwds
;
...
@@ -1180,8 +1151,7 @@ static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode)
...
@@ -1180,8 +1151,7 @@ static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode)
}
}
}
}
static
void
syncProcessFwdAck
(
SSyncNode
*
pNode
,
SFwdInfo
*
pFwdInfo
,
int32_t
code
)
static
void
syncProcessFwdAck
(
SSyncNode
*
pNode
,
SFwdInfo
*
pFwdInfo
,
int32_t
code
)
{
{
int
confirm
=
0
;
int
confirm
=
0
;
if
(
pFwdInfo
->
code
==
0
)
pFwdInfo
->
code
=
code
;
if
(
pFwdInfo
->
code
==
0
)
pFwdInfo
->
code
=
code
;
...
@@ -1202,8 +1172,7 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code
...
@@ -1202,8 +1172,7 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code
}
}
}
}
static
void
syncMonitorFwdInfos
(
void
*
param
,
void
*
tmrId
)
static
void
syncMonitorFwdInfos
(
void
*
param
,
void
*
tmrId
)
{
{
SSyncNode
*
pNode
=
param
;
SSyncNode
*
pNode
=
param
;
SSyncFwds
*
pSyncFwds
=
pNode
->
pSyncFwds
;
SSyncFwds
*
pSyncFwds
=
pNode
->
pSyncFwds
;
uint64_t
time
=
taosGetTimestampMs
();
uint64_t
time
=
taosGetTimestampMs
();
...
@@ -1222,6 +1191,3 @@ static void syncMonitorFwdInfos(void *param, void *tmrId)
...
@@ -1222,6 +1191,3 @@ static void syncMonitorFwdInfos(void *param, void *tmrId)
pNode
->
pFwdTimer
=
taosTmrStart
(
syncMonitorFwdInfos
,
300
,
pNode
,
syncTmrCtrl
);
pNode
->
pFwdTimer
=
taosTmrStart
(
syncMonitorFwdInfos
,
300
,
pNode
,
syncTmrCtrl
);
}
}
src/sync/src/syncRestore.c
浏览文件 @
d9faca1c
...
@@ -28,7 +28,7 @@ static void syncRemoveExtraFile(SSyncPeer *pPeer, uint32_t sindex, uint32_t eind
...
@@ -28,7 +28,7 @@ static void syncRemoveExtraFile(SSyncPeer *pPeer, uint32_t sindex, uint32_t eind
char
fname
[
TSDB_FILENAME_LEN
*
3
]
=
{
0
};
char
fname
[
TSDB_FILENAME_LEN
*
3
]
=
{
0
};
uint32_t
magic
;
uint32_t
magic
;
uint64_t
fversion
;
uint64_t
fversion
;
int
32
_t
size
;
int
64
_t
size
;
uint32_t
index
=
sindex
;
uint32_t
index
=
sindex
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
...
@@ -48,8 +48,7 @@ static void syncRemoveExtraFile(SSyncPeer *pPeer, uint32_t sindex, uint32_t eind
...
@@ -48,8 +48,7 @@ static void syncRemoveExtraFile(SSyncPeer *pPeer, uint32_t sindex, uint32_t eind
}
}
}
}
static
int
syncRestoreFile
(
SSyncPeer
*
pPeer
,
uint64_t
*
fversion
)
static
int
syncRestoreFile
(
SSyncPeer
*
pPeer
,
uint64_t
*
fversion
)
{
{
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SFileInfo
minfo
;
memset
(
&
minfo
,
0
,
sizeof
(
minfo
));
/* = {0}; */
// master file info
SFileInfo
minfo
;
memset
(
&
minfo
,
0
,
sizeof
(
minfo
));
/* = {0}; */
// master file info
SFileInfo
sinfo
;
memset
(
&
sinfo
,
0
,
sizeof
(
sinfo
));
/* = {0}; */
// slave file info
SFileInfo
sinfo
;
memset
(
&
sinfo
,
0
,
sizeof
(
sinfo
));
/* = {0}; */
// slave file info
...
@@ -113,7 +112,7 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion)
...
@@ -113,7 +112,7 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion)
close
(
dfd
);
close
(
dfd
);
if
(
ret
<
0
)
break
;
if
(
ret
<
0
)
break
;
sDebug
(
"%s, %s is received, size:%
d"
,
pPeer
->
id
,
minfo
.
name
,
minfo
.
size
);
sDebug
(
"%s, %s is received, size:%
"
PRId64
,
pPeer
->
id
,
minfo
.
name
,
minfo
.
size
);
}
}
...
@@ -130,8 +129,7 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion)
...
@@ -130,8 +129,7 @@ static int syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion)
return
code
;
return
code
;
}
}
static
int
syncRestoreWal
(
SSyncPeer
*
pPeer
)
static
int
syncRestoreWal
(
SSyncPeer
*
pPeer
)
{
{
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
int
ret
,
code
=
-
1
;
int
ret
,
code
=
-
1
;
...
@@ -172,8 +170,7 @@ static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset)
...
@@ -172,8 +170,7 @@ static char *syncProcessOneBufferedFwd(SSyncPeer *pPeer, char *offset)
return
offset
;
return
offset
;
}
}
static
int
syncProcessBufferedFwd
(
SSyncPeer
*
pPeer
)
static
int
syncProcessBufferedFwd
(
SSyncPeer
*
pPeer
)
{
{
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SRecvBuffer
*
pRecv
=
pNode
->
pRecv
;
SRecvBuffer
*
pRecv
=
pNode
->
pRecv
;
int
forwards
=
0
;
int
forwards
=
0
;
...
@@ -201,8 +198,7 @@ static int syncProcessBufferedFwd(SSyncPeer *pPeer)
...
@@ -201,8 +198,7 @@ static int syncProcessBufferedFwd(SSyncPeer *pPeer)
return
pRecv
->
code
;
return
pRecv
->
code
;
}
}
int
syncSaveIntoBuffer
(
SSyncPeer
*
pPeer
,
SWalHead
*
pHead
)
int
syncSaveIntoBuffer
(
SSyncPeer
*
pPeer
,
SWalHead
*
pHead
)
{
{
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SRecvBuffer
*
pRecv
=
pNode
->
pRecv
;
SRecvBuffer
*
pRecv
=
pNode
->
pRecv
;
...
@@ -222,8 +218,7 @@ int syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead)
...
@@ -222,8 +218,7 @@ int syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead)
return
pRecv
->
code
;
return
pRecv
->
code
;
}
}
static
void
syncCloseRecvBuffer
(
SSyncNode
*
pNode
)
static
void
syncCloseRecvBuffer
(
SSyncNode
*
pNode
)
{
{
if
(
pNode
->
pRecv
)
{
if
(
pNode
->
pRecv
)
{
taosTFree
(
pNode
->
pRecv
->
buffer
);
taosTFree
(
pNode
->
pRecv
->
buffer
);
}
}
...
@@ -231,8 +226,7 @@ static void syncCloseRecvBuffer(SSyncNode *pNode)
...
@@ -231,8 +226,7 @@ static void syncCloseRecvBuffer(SSyncNode *pNode)
taosTFree
(
pNode
->
pRecv
);
taosTFree
(
pNode
->
pRecv
);
}
}
static
int
syncOpenRecvBuffer
(
SSyncNode
*
pNode
)
static
int
syncOpenRecvBuffer
(
SSyncNode
*
pNode
)
{
{
syncCloseRecvBuffer
(
pNode
);
syncCloseRecvBuffer
(
pNode
);
SRecvBuffer
*
pRecv
=
calloc
(
sizeof
(
SRecvBuffer
),
1
);
SRecvBuffer
*
pRecv
=
calloc
(
sizeof
(
SRecvBuffer
),
1
);
...
@@ -253,8 +247,7 @@ static int syncOpenRecvBuffer(SSyncNode *pNode)
...
@@ -253,8 +247,7 @@ static int syncOpenRecvBuffer(SSyncNode *pNode)
return
0
;
return
0
;
}
}
static
int
syncRestoreDataStepByStep
(
SSyncPeer
*
pPeer
)
static
int
syncRestoreDataStepByStep
(
SSyncPeer
*
pPeer
)
{
{
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
nodeSStatus
=
TAOS_SYNC_STATUS_FILE
;
nodeSStatus
=
TAOS_SYNC_STATUS_FILE
;
uint64_t
fversion
=
0
;
uint64_t
fversion
=
0
;
...
@@ -292,10 +285,9 @@ static int syncRestoreDataStepByStep(SSyncPeer *pPeer)
...
@@ -292,10 +285,9 @@ static int syncRestoreDataStepByStep(SSyncPeer *pPeer)
return
0
;
return
0
;
}
}
void
*
syncRestoreData
(
void
*
param
)
void
*
syncRestoreData
(
void
*
param
)
{
{
SSyncPeer
*
pPeer
=
(
SSyncPeer
*
)
param
;
SSyncPeer
*
pPeer
=
(
SSyncPeer
*
)
param
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
taosBlockSIGPIPE
();
taosBlockSIGPIPE
();
__sync_fetch_and_add
(
&
tsSyncNum
,
1
);
__sync_fetch_and_add
(
&
tsSyncNum
,
1
);
...
@@ -326,4 +318,3 @@ void *syncRestoreData(void *param)
...
@@ -326,4 +318,3 @@ void *syncRestoreData(void *param)
return
NULL
;
return
NULL
;
}
}
src/sync/src/syncRetrieve.c
浏览文件 @
d9faca1c
...
@@ -27,11 +27,10 @@
...
@@ -27,11 +27,10 @@
#include "tsync.h"
#include "tsync.h"
#include "syncInt.h"
#include "syncInt.h"
static
int
syncAddIntoWatchList
(
SSyncPeer
*
pPeer
,
char
*
name
)
static
int
syncAddIntoWatchList
(
SSyncPeer
*
pPeer
,
char
*
name
)
{
{
sDebug
(
"%s, start to monitor:%s"
,
pPeer
->
id
,
name
);
sDebug
(
"%s, start to monitor:%s"
,
pPeer
->
id
,
name
);
if
(
pPeer
->
notifyFd
<=
0
)
{
if
(
pPeer
->
notifyFd
<=
0
)
{
pPeer
->
watchNum
=
0
;
pPeer
->
watchNum
=
0
;
pPeer
->
notifyFd
=
inotify_init1
(
IN_NONBLOCK
);
pPeer
->
notifyFd
=
inotify_init1
(
IN_NONBLOCK
);
if
(
pPeer
->
notifyFd
<
0
)
{
if
(
pPeer
->
notifyFd
<
0
)
{
...
@@ -70,9 +69,8 @@ static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name)
...
@@ -70,9 +69,8 @@ static int syncAddIntoWatchList(SSyncPeer *pPeer, char *name)
return
0
;
return
0
;
}
}
static
int
syncAreFilesModified
(
SSyncPeer
*
pPeer
)
static
int
syncAreFilesModified
(
SSyncPeer
*
pPeer
)
{
{
if
(
pPeer
->
notifyFd
<=
0
)
return
0
;
if
(
pPeer
->
notifyFd
<=
0
)
return
0
;
char
buf
[
2048
];
char
buf
[
2048
];
int
len
=
read
(
pPeer
->
notifyFd
,
buf
,
sizeof
(
buf
));
int
len
=
read
(
pPeer
->
notifyFd
,
buf
,
sizeof
(
buf
));
...
@@ -96,12 +94,11 @@ static int syncAreFilesModified(SSyncPeer *pPeer)
...
@@ -96,12 +94,11 @@ static int syncAreFilesModified(SSyncPeer *pPeer)
}
}
}
}
return
code
;
return
code
;
}
}
static
int
syncRetrieveFile
(
SSyncPeer
*
pPeer
)
static
int
syncRetrieveFile
(
SSyncPeer
*
pPeer
)
{
{
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SFileInfo
fileInfo
;
SFileInfo
fileInfo
;
SFileAck
fileAck
;
SFileAck
fileAck
;
int
code
=
-
1
;
int
code
=
-
1
;
...
@@ -128,7 +125,7 @@ static int syncRetrieveFile(SSyncPeer *pPeer)
...
@@ -128,7 +125,7 @@ static int syncRetrieveFile(SSyncPeer *pPeer)
// wait for the ack from peer
// wait for the ack from peer
ret
=
taosReadMsg
(
pPeer
->
syncFd
,
&
(
fileAck
),
sizeof
(
fileAck
));
ret
=
taosReadMsg
(
pPeer
->
syncFd
,
&
(
fileAck
),
sizeof
(
fileAck
));
if
(
ret
<
0
)
break
;
if
(
ret
<
0
)
break
;
// set the peer sync version
// set the peer sync version
pPeer
->
sversion
=
fileInfo
.
fversion
;
pPeer
->
sversion
=
fileInfo
.
fversion
;
...
@@ -148,13 +145,13 @@ static int syncRetrieveFile(SSyncPeer *pPeer)
...
@@ -148,13 +145,13 @@ static int syncRetrieveFile(SSyncPeer *pPeer)
// send the file to peer
// send the file to peer
int
sfd
=
open
(
name
,
O_RDONLY
);
int
sfd
=
open
(
name
,
O_RDONLY
);
if
(
sfd
<
0
)
break
;
if
(
sfd
<
0
)
break
;
ret
=
taosTSendFile
(
pPeer
->
syncFd
,
sfd
,
NULL
,
fileInfo
.
size
);
ret
=
taosTSendFile
(
pPeer
->
syncFd
,
sfd
,
NULL
,
fileInfo
.
size
);
close
(
sfd
);
close
(
sfd
);
if
(
ret
<
0
)
break
;
if
(
ret
<
0
)
break
;
sDebug
(
"%s, %s is sent, size:%
d"
,
pPeer
->
id
,
name
,
fileInfo
.
size
);
sDebug
(
"%s, %s is sent, size:%
"
PRId64
,
pPeer
->
id
,
name
,
fileInfo
.
size
);
fileInfo
.
index
++
;
fileInfo
.
index
++
;
// check if processed files are modified
// check if processed files are modified
...
@@ -170,8 +167,7 @@ static int syncRetrieveFile(SSyncPeer *pPeer)
...
@@ -170,8 +167,7 @@ static int syncRetrieveFile(SSyncPeer *pPeer)
/* if only a partial record is read out, set the IN_MODIFY flag in event,
/* if only a partial record is read out, set the IN_MODIFY flag in event,
so upper layer will reload the file to get a complete record */
so upper layer will reload the file to get a complete record */
static
int
syncReadOneWalRecord
(
int
sfd
,
SWalHead
*
pHead
,
uint32_t
*
pEvent
)
static
int
syncReadOneWalRecord
(
int
sfd
,
SWalHead
*
pHead
,
uint32_t
*
pEvent
)
{
{
int
ret
;
int
ret
;
ret
=
read
(
sfd
,
pHead
,
sizeof
(
SWalHead
));
ret
=
read
(
sfd
,
pHead
,
sizeof
(
SWalHead
));
...
@@ -185,7 +181,7 @@ static int syncReadOneWalRecord(int sfd, SWalHead *pHead, uint32_t *pEvent)
...
@@ -185,7 +181,7 @@ static int syncReadOneWalRecord(int sfd, SWalHead *pHead, uint32_t *pEvent)
}
}
ret
=
read
(
sfd
,
pHead
->
cont
,
pHead
->
len
);
ret
=
read
(
sfd
,
pHead
->
cont
,
pHead
->
len
);
if
(
ret
<
0
)
return
-
1
;
if
(
ret
<
0
)
return
-
1
;
if
(
ret
!=
pHead
->
len
)
{
if
(
ret
!=
pHead
->
len
)
{
// file is not at end yet, it shall be reloaded
// file is not at end yet, it shall be reloaded
...
@@ -194,10 +190,9 @@ static int syncReadOneWalRecord(int sfd, SWalHead *pHead, uint32_t *pEvent)
...
@@ -194,10 +190,9 @@ static int syncReadOneWalRecord(int sfd, SWalHead *pHead, uint32_t *pEvent)
}
}
return
sizeof
(
SWalHead
)
+
pHead
->
len
;
return
sizeof
(
SWalHead
)
+
pHead
->
len
;
}
}
static
int
syncMonitorLastWal
(
SSyncPeer
*
pPeer
,
char
*
name
)
static
int
syncMonitorLastWal
(
SSyncPeer
*
pPeer
,
char
*
name
)
{
{
pPeer
->
watchNum
=
0
;
pPeer
->
watchNum
=
0
;
taosClose
(
pPeer
->
notifyFd
);
taosClose
(
pPeer
->
notifyFd
);
pPeer
->
notifyFd
=
inotify_init1
(
IN_NONBLOCK
);
pPeer
->
notifyFd
=
inotify_init1
(
IN_NONBLOCK
);
...
@@ -221,18 +216,17 @@ static int syncMonitorLastWal(SSyncPeer *pPeer, char *name)
...
@@ -221,18 +216,17 @@ static int syncMonitorLastWal(SSyncPeer *pPeer, char *name)
return
-
1
;
return
-
1
;
}
}
return
0
;
return
0
;
}
}
static
uint32_t
syncCheckLastWalChanges
(
SSyncPeer
*
pPeer
,
uint32_t
*
pEvent
)
static
uint32_t
syncCheckLastWalChanges
(
SSyncPeer
*
pPeer
,
uint32_t
*
pEvent
)
{
{
char
buf
[
2048
];
char
buf
[
2048
];
int
len
=
read
(
pPeer
->
notifyFd
,
buf
,
sizeof
(
buf
));
int
len
=
read
(
pPeer
->
notifyFd
,
buf
,
sizeof
(
buf
));
if
(
len
<
0
&&
errno
!=
EAGAIN
)
{
if
(
len
<
0
&&
errno
!=
EAGAIN
)
{
sError
(
"%s, failed to read notify FD(%s)"
,
pPeer
->
id
,
strerror
(
errno
));
sError
(
"%s, failed to read notify FD(%s)"
,
pPeer
->
id
,
strerror
(
errno
));
return
-
1
;
return
-
1
;
}
}
if
(
len
==
0
)
return
0
;
if
(
len
==
0
)
return
0
;
struct
inotify_event
*
event
;
struct
inotify_event
*
event
;
...
@@ -248,8 +242,7 @@ static uint32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent)
...
@@ -248,8 +242,7 @@ static uint32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent)
return
0
;
return
0
;
}
}
static
int
syncRetrieveLastWal
(
SSyncPeer
*
pPeer
,
char
*
name
,
uint64_t
fversion
,
int64_t
offset
,
uint32_t
*
pEvent
)
static
int
syncRetrieveLastWal
(
SSyncPeer
*
pPeer
,
char
*
name
,
uint64_t
fversion
,
int64_t
offset
,
uint32_t
*
pEvent
)
{
{
SWalHead
*
pHead
=
(
SWalHead
*
)
malloc
(
640000
);
SWalHead
*
pHead
=
(
SWalHead
*
)
malloc
(
640000
);
int
code
=
-
1
;
int
code
=
-
1
;
int32_t
bytes
=
0
;
int32_t
bytes
=
0
;
...
@@ -261,9 +254,12 @@ static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion,
...
@@ -261,9 +254,12 @@ static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion,
sDebug
(
"%s, retrieve last wal, offset:%"
PRId64
" fversion:%"
PRIu64
,
pPeer
->
id
,
offset
,
fversion
);
sDebug
(
"%s, retrieve last wal, offset:%"
PRId64
" fversion:%"
PRIu64
,
pPeer
->
id
,
offset
,
fversion
);
while
(
1
)
{
while
(
1
)
{
int
wsize
=
syncReadOneWalRecord
(
sfd
,
pHead
,
pEvent
);
int
wsize
=
syncReadOneWalRecord
(
sfd
,
pHead
,
pEvent
);
if
(
wsize
<
0
)
break
;
if
(
wsize
<
0
)
break
;
if
(
wsize
==
0
)
{
code
=
0
;
break
;
}
if
(
wsize
==
0
)
{
code
=
0
;
break
;
}
sDebug
(
"%s, last wal is forwarded, ver:%"
PRIu64
,
pPeer
->
id
,
pHead
->
version
);
sDebug
(
"%s, last wal is forwarded, ver:%"
PRIu64
,
pPeer
->
id
,
pHead
->
version
);
int
ret
=
taosWriteMsg
(
pPeer
->
syncFd
,
pHead
,
wsize
);
int
ret
=
taosWriteMsg
(
pPeer
->
syncFd
,
pHead
,
wsize
);
...
@@ -286,8 +282,7 @@ static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion,
...
@@ -286,8 +282,7 @@ static int syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversion,
return
-
1
;
return
-
1
;
}
}
static
int
syncProcessLastWal
(
SSyncPeer
*
pPeer
,
char
*
wname
,
uint32_t
index
)
static
int
syncProcessLastWal
(
SSyncPeer
*
pPeer
,
char
*
wname
,
uint32_t
index
)
{
{
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
int
code
=
-
1
;
int
code
=
-
1
;
char
fname
[
TSDB_FILENAME_LEN
*
2
];
// full path to wal file
char
fname
[
TSDB_FILENAME_LEN
*
2
];
// full path to wal file
...
@@ -350,12 +345,16 @@ static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index)
...
@@ -350,12 +345,16 @@ static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index)
}
}
if
(
code
<
0
)
break
;
if
(
code
<
0
)
break
;
if
(
pPeer
->
sversion
>=
fversion
&&
fversion
>
0
)
break
;
if
(
pPeer
->
sversion
>=
fversion
&&
fversion
>
0
)
break
;
index
++
;
wname
[
0
]
=
0
;
index
++
;
wname
[
0
]
=
0
;
code
=
(
*
pNode
->
getWalInfo
)(
pNode
->
ahandle
,
wname
,
&
index
);
code
=
(
*
pNode
->
getWalInfo
)(
pNode
->
ahandle
,
wname
,
&
index
);
if
(
code
<
0
)
break
;
if
(
code
<
0
)
break
;
if
(
wname
[
0
]
==
0
)
{
code
=
0
;
break
;}
if
(
wname
[
0
]
==
0
)
{
code
=
0
;
break
;
}
// current last wal is closed, there is a new one
// current last wal is closed, there is a new one
sDebug
(
"%s, last wal is closed, try new one"
,
pPeer
->
id
);
sDebug
(
"%s, last wal is closed, try new one"
,
pPeer
->
id
);
...
@@ -366,9 +365,8 @@ static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index)
...
@@ -366,9 +365,8 @@ static int syncProcessLastWal(SSyncPeer *pPeer, char *wname, uint32_t index)
return
code
;
return
code
;
}
}
static
int
syncRetrieveWal
(
SSyncPeer
*
pPeer
)
static
int
syncRetrieveWal
(
SSyncPeer
*
pPeer
)
{
{
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
char
fname
[
TSDB_FILENAME_LEN
*
3
];
char
fname
[
TSDB_FILENAME_LEN
*
3
];
char
wname
[
TSDB_FILENAME_LEN
*
2
];
char
wname
[
TSDB_FILENAME_LEN
*
2
];
int32_t
size
;
int32_t
size
;
...
@@ -396,7 +394,7 @@ static int syncRetrieveWal(SSyncPeer *pPeer)
...
@@ -396,7 +394,7 @@ static int syncRetrieveWal(SSyncPeer *pPeer)
// send wal file,
// send wal file,
// inotify is not required, old wal file won't be modified, even remove is ok
// inotify is not required, old wal file won't be modified, even remove is ok
if
(
stat
(
fname
,
&
fstat
)
<
0
)
break
;
if
(
stat
(
fname
,
&
fstat
)
<
0
)
break
;
size
=
fstat
.
st_size
;
size
=
fstat
.
st_size
;
sDebug
(
"%s, retrieve wal:%s size:%d"
,
pPeer
->
id
,
fname
,
size
);
sDebug
(
"%s, retrieve wal:%s size:%d"
,
pPeer
->
id
,
fname
,
size
);
...
@@ -425,9 +423,8 @@ static int syncRetrieveWal(SSyncPeer *pPeer)
...
@@ -425,9 +423,8 @@ static int syncRetrieveWal(SSyncPeer *pPeer)
return
code
;
return
code
;
}
}
static
int
syncRetrieveDataStepByStep
(
SSyncPeer
*
pPeer
)
static
int
syncRetrieveDataStepByStep
(
SSyncPeer
*
pPeer
)
{
{
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SFirstPkt
firstPkt
;
SFirstPkt
firstPkt
;
memset
(
&
firstPkt
,
0
,
sizeof
(
firstPkt
));
memset
(
&
firstPkt
,
0
,
sizeof
(
firstPkt
));
...
@@ -462,9 +459,8 @@ static int syncRetrieveDataStepByStep(SSyncPeer *pPeer)
...
@@ -462,9 +459,8 @@ static int syncRetrieveDataStepByStep(SSyncPeer *pPeer)
return
0
;
return
0
;
}
}
void
*
syncRetrieveData
(
void
*
param
)
void
*
syncRetrieveData
(
void
*
param
)
{
{
SSyncPeer
*
pPeer
=
(
SSyncPeer
*
)
param
;
SSyncPeer
*
pPeer
=
(
SSyncPeer
*
)
param
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
taosBlockSIGPIPE
();
taosBlockSIGPIPE
();
...
...
src/sync/src/taosTcpPool.c
浏览文件 @
d9faca1c
...
@@ -48,8 +48,7 @@ static void *taosProcessTcpData(void *param);
...
@@ -48,8 +48,7 @@ static void *taosProcessTcpData(void *param);
static
SThreadObj
*
taosGetTcpThread
(
SPoolObj
*
pPool
);
static
SThreadObj
*
taosGetTcpThread
(
SPoolObj
*
pPool
);
static
void
taosStopPoolThread
(
SThreadObj
*
pThread
);
static
void
taosStopPoolThread
(
SThreadObj
*
pThread
);
void
*
taosOpenTcpThreadPool
(
SPoolInfo
*
pInfo
)
void
*
taosOpenTcpThreadPool
(
SPoolInfo
*
pInfo
)
{
{
pthread_attr_t
thattr
;
pthread_attr_t
thattr
;
SPoolObj
*
pPool
=
calloc
(
sizeof
(
SPoolObj
),
1
);
SPoolObj
*
pPool
=
calloc
(
sizeof
(
SPoolObj
),
1
);
...
@@ -89,8 +88,7 @@ void *taosOpenTcpThreadPool(SPoolInfo *pInfo)
...
@@ -89,8 +88,7 @@ void *taosOpenTcpThreadPool(SPoolInfo *pInfo)
return
pPool
;
return
pPool
;
}
}
void
taosCloseTcpThreadPool
(
void
*
param
)
void
taosCloseTcpThreadPool
(
void
*
param
)
{
{
SPoolObj
*
pPool
=
(
SPoolObj
*
)
param
;
SPoolObj
*
pPool
=
(
SPoolObj
*
)
param
;
SThreadObj
*
pThread
;
SThreadObj
*
pThread
;
...
@@ -107,8 +105,7 @@ void taosCloseTcpThreadPool(void *param)
...
@@ -107,8 +105,7 @@ void taosCloseTcpThreadPool(void *param)
uDebug
(
"%p TCP pool is closed"
,
pPool
);
uDebug
(
"%p TCP pool is closed"
,
pPool
);
}
}
void
*
taosAllocateTcpConn
(
void
*
param
,
void
*
pPeer
,
int
connFd
)
void
*
taosAllocateTcpConn
(
void
*
param
,
void
*
pPeer
,
int
connFd
)
{
{
struct
epoll_event
event
;
struct
epoll_event
event
;
SPoolObj
*
pPool
=
(
SPoolObj
*
)
param
;
SPoolObj
*
pPool
=
(
SPoolObj
*
)
param
;
...
@@ -145,9 +142,8 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int connFd)
...
@@ -145,9 +142,8 @@ void *taosAllocateTcpConn(void *param, void *pPeer, int connFd)
return
pConn
;
return
pConn
;
}
}
void
taosFreeTcpConn
(
void
*
param
)
void
taosFreeTcpConn
(
void
*
param
)
{
{
SConnObj
*
pConn
=
(
SConnObj
*
)
param
;
SConnObj
*
pConn
=
(
SConnObj
*
)
param
;
SThreadObj
*
pThread
=
pConn
->
pThread
;
SThreadObj
*
pThread
=
pConn
->
pThread
;
uDebug
(
"%p TCP connection will be closed, fd:%d"
,
pThread
,
pConn
->
fd
);
uDebug
(
"%p TCP connection will be closed, fd:%d"
,
pThread
,
pConn
->
fd
);
...
...
src/sync/src/tarbitrator.c
浏览文件 @
d9faca1c
...
@@ -152,9 +152,8 @@ static void arbProcessBrokenLink(void *param) {
...
@@ -152,9 +152,8 @@ static void arbProcessBrokenLink(void *param) {
taosTFree
(
pNode
);
taosTFree
(
pNode
);
}
}
static
int
arbProcessPeerMsg
(
void
*
param
,
void
*
buffer
)
static
int
arbProcessPeerMsg
(
void
*
param
,
void
*
buffer
)
{
{
SNodeConn
*
pNode
=
param
;
SNodeConn
*
pNode
=
param
;
SSyncHead
head
;
SSyncHead
head
;
int
bytes
=
0
;
int
bytes
=
0
;
char
*
cont
=
(
char
*
)
buffer
;
char
*
cont
=
(
char
*
)
buffer
;
...
@@ -176,7 +175,6 @@ static int arbProcessPeerMsg(void *param, void *buffer)
...
@@ -176,7 +175,6 @@ static int arbProcessPeerMsg(void *param, void *buffer)
}
}
static
void
arbSignalHandler
(
int32_t
signum
,
siginfo_t
*
sigInfo
,
void
*
context
)
{
static
void
arbSignalHandler
(
int32_t
signum
,
siginfo_t
*
sigInfo
,
void
*
context
)
{
struct
sigaction
act
=
{{
0
}};
struct
sigaction
act
=
{{
0
}};
act
.
sa_handler
=
SIG_IGN
;
act
.
sa_handler
=
SIG_IGN
;
sigaction
(
SIGTERM
,
&
act
,
NULL
);
sigaction
(
SIGTERM
,
&
act
,
NULL
);
...
@@ -188,4 +186,3 @@ static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context)
...
@@ -188,4 +186,3 @@ static void arbSignalHandler(int32_t signum, siginfo_t *sigInfo, void *context)
// inform main thread to exit
// inform main thread to exit
tsem_post
(
&
tsArbSem
);
tsem_post
(
&
tsArbSem
);
}
}
src/sync/test/syncServer.c
浏览文件 @
d9faca1c
...
@@ -234,7 +234,7 @@ void processRequestMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
...
@@ -234,7 +234,7 @@ void processRequestMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
taosWriteQitem
(
qhandle
,
TAOS_QTYPE_RPC
,
pTemp
);
taosWriteQitem
(
qhandle
,
TAOS_QTYPE_RPC
,
pTemp
);
}
}
uint32_t
getFileInfo
(
void
*
ahandle
,
char
*
name
,
uint32_t
*
index
,
uint32_t
eindex
,
int
32
_t
*
size
,
uint64_t
*
fversion
)
uint32_t
getFileInfo
(
void
*
ahandle
,
char
*
name
,
uint32_t
*
index
,
uint32_t
eindex
,
int
64
_t
*
size
,
uint64_t
*
fversion
)
{
{
uint32_t
magic
;
uint32_t
magic
;
struct
stat
fstat
;
struct
stat
fstat
;
...
...
src/tsdb/inc/tsdbMain.h
浏览文件 @
d9faca1c
...
@@ -475,7 +475,7 @@ int tsdbUpdateFileHeader(SFile* pFile);
...
@@ -475,7 +475,7 @@ int tsdbUpdateFileHeader(SFile* pFile);
int
tsdbEncodeSFileInfo
(
void
**
buf
,
const
STsdbFileInfo
*
pInfo
);
int
tsdbEncodeSFileInfo
(
void
**
buf
,
const
STsdbFileInfo
*
pInfo
);
void
*
tsdbDecodeSFileInfo
(
void
*
buf
,
STsdbFileInfo
*
pInfo
);
void
*
tsdbDecodeSFileInfo
(
void
*
buf
,
STsdbFileInfo
*
pInfo
);
void
tsdbRemoveFileGroup
(
STsdbRepo
*
pRepo
,
SFileGroup
*
pFGroup
);
void
tsdbRemoveFileGroup
(
STsdbRepo
*
pRepo
,
SFileGroup
*
pFGroup
);
void
tsdbGetFileInfoImpl
(
char
*
fname
,
uint32_t
*
magic
,
int
32
_t
*
size
);
void
tsdbGetFileInfoImpl
(
char
*
fname
,
uint32_t
*
magic
,
int
64
_t
*
size
);
void
tsdbGetFidKeyRange
(
int
daysPerFile
,
int8_t
precision
,
int
fileId
,
TSKEY
*
minKey
,
TSKEY
*
maxKey
);
void
tsdbGetFidKeyRange
(
int
daysPerFile
,
int8_t
precision
,
int
fileId
,
TSKEY
*
minKey
,
TSKEY
*
maxKey
);
// ------------------ tsdbRWHelper.c
// ------------------ tsdbRWHelper.c
...
...
src/tsdb/src/tsdbFile.c
浏览文件 @
d9faca1c
...
@@ -424,7 +424,7 @@ void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) {
...
@@ -424,7 +424,7 @@ void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) {
}
}
}
}
void
tsdbGetFileInfoImpl
(
char
*
fname
,
uint32_t
*
magic
,
int
32
_t
*
size
)
{
void
tsdbGetFileInfoImpl
(
char
*
fname
,
uint32_t
*
magic
,
int
64
_t
*
size
)
{
char
buf
[
TSDB_FILE_HEAD_SIZE
]
=
"
\0
"
;
char
buf
[
TSDB_FILE_HEAD_SIZE
]
=
"
\0
"
;
uint32_t
version
=
0
;
uint32_t
version
=
0
;
STsdbFileInfo
info
=
{
0
};
STsdbFileInfo
info
=
{
0
};
...
@@ -445,7 +445,7 @@ void tsdbGetFileInfoImpl(char *fname, uint32_t *magic, int32_t *size) {
...
@@ -445,7 +445,7 @@ void tsdbGetFileInfoImpl(char *fname, uint32_t *magic, int32_t *size) {
close
(
fd
);
close
(
fd
);
*
magic
=
info
.
magic
;
*
magic
=
info
.
magic
;
*
size
=
(
int32_t
)
offset
;
*
size
=
offset
;
return
;
return
;
...
...
src/tsdb/src/tsdbMain.c
浏览文件 @
d9faca1c
...
@@ -212,7 +212,7 @@ int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *
...
@@ -212,7 +212,7 @@ int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *
return
0
;
return
0
;
}
}
uint32_t
tsdbGetFileInfo
(
TSDB_REPO_T
*
repo
,
char
*
name
,
uint32_t
*
index
,
uint32_t
eindex
,
int
32
_t
*
size
)
{
uint32_t
tsdbGetFileInfo
(
TSDB_REPO_T
*
repo
,
char
*
name
,
uint32_t
*
index
,
uint32_t
eindex
,
int
64
_t
*
size
)
{
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
repo
;
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
repo
;
// STsdbMeta *pMeta = pRepo->tsdbMeta;
// STsdbMeta *pMeta = pRepo->tsdbMeta;
STsdbFileH
*
pFileH
=
pRepo
->
tsdbFileH
;
STsdbFileH
*
pFileH
=
pRepo
->
tsdbFileH
;
...
...
src/util/inc/tkvstore.h
浏览文件 @
d9faca1c
...
@@ -58,7 +58,7 @@ int tdKVStoreStartCommit(SKVStore *pStore);
...
@@ -58,7 +58,7 @@ int tdKVStoreStartCommit(SKVStore *pStore);
int
tdUpdateKVStoreRecord
(
SKVStore
*
pStore
,
uint64_t
uid
,
void
*
cont
,
int
contLen
);
int
tdUpdateKVStoreRecord
(
SKVStore
*
pStore
,
uint64_t
uid
,
void
*
cont
,
int
contLen
);
int
tdDropKVStoreRecord
(
SKVStore
*
pStore
,
uint64_t
uid
);
int
tdDropKVStoreRecord
(
SKVStore
*
pStore
,
uint64_t
uid
);
int
tdKVStoreEndCommit
(
SKVStore
*
pStore
);
int
tdKVStoreEndCommit
(
SKVStore
*
pStore
);
void
tsdbGetStoreInfo
(
char
*
fname
,
uint32_t
*
magic
,
int
32
_t
*
size
);
void
tsdbGetStoreInfo
(
char
*
fname
,
uint32_t
*
magic
,
int
64
_t
*
size
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
...
...
src/util/src/tkvstore.c
浏览文件 @
d9faca1c
...
@@ -332,7 +332,7 @@ int tdKVStoreEndCommit(SKVStore *pStore) {
...
@@ -332,7 +332,7 @@ int tdKVStoreEndCommit(SKVStore *pStore) {
return
0
;
return
0
;
}
}
void
tsdbGetStoreInfo
(
char
*
fname
,
uint32_t
*
magic
,
int
32
_t
*
size
)
{
void
tsdbGetStoreInfo
(
char
*
fname
,
uint32_t
*
magic
,
int
64
_t
*
size
)
{
char
buf
[
TD_KVSTORE_HEADER_SIZE
]
=
"
\0
"
;
char
buf
[
TD_KVSTORE_HEADER_SIZE
]
=
"
\0
"
;
SStoreInfo
info
=
{
0
};
SStoreInfo
info
=
{
0
};
...
@@ -349,7 +349,7 @@ void tsdbGetStoreInfo(char *fname, uint32_t *magic, int32_t *size) {
...
@@ -349,7 +349,7 @@ void tsdbGetStoreInfo(char *fname, uint32_t *magic, int32_t *size) {
close
(
fd
);
close
(
fd
);
*
magic
=
info
.
magic
;
*
magic
=
info
.
magic
;
*
size
=
(
int32_t
)
offset
;
*
size
=
offset
;
return
;
return
;
...
...
src/vnode/src/vnodeMain.c
浏览文件 @
d9faca1c
...
@@ -41,7 +41,7 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode);
...
@@ -41,7 +41,7 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode);
static
int32_t
vnodeSaveVersion
(
SVnodeObj
*
pVnode
);
static
int32_t
vnodeSaveVersion
(
SVnodeObj
*
pVnode
);
static
int32_t
vnodeReadVersion
(
SVnodeObj
*
pVnode
);
static
int32_t
vnodeReadVersion
(
SVnodeObj
*
pVnode
);
static
int
vnodeProcessTsdbStatus
(
void
*
arg
,
int
status
);
static
int
vnodeProcessTsdbStatus
(
void
*
arg
,
int
status
);
static
uint32_t
vnodeGetFileInfo
(
void
*
ahandle
,
char
*
name
,
uint32_t
*
index
,
uint32_t
eindex
,
int
32
_t
*
size
,
uint64_t
*
fversion
);
static
uint32_t
vnodeGetFileInfo
(
void
*
ahandle
,
char
*
name
,
uint32_t
*
index
,
uint32_t
eindex
,
int
64
_t
*
size
,
uint64_t
*
fversion
);
static
int
vnodeGetWalInfo
(
void
*
ahandle
,
char
*
name
,
uint32_t
*
index
);
static
int
vnodeGetWalInfo
(
void
*
ahandle
,
char
*
name
,
uint32_t
*
index
);
static
void
vnodeNotifyRole
(
void
*
ahandle
,
int8_t
role
);
static
void
vnodeNotifyRole
(
void
*
ahandle
,
int8_t
role
);
static
void
vnodeCtrlFlow
(
void
*
handle
,
int32_t
mseconds
);
static
void
vnodeCtrlFlow
(
void
*
handle
,
int32_t
mseconds
);
...
@@ -290,6 +290,8 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
...
@@ -290,6 +290,8 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
pVnode
->
sync
=
syncStart
(
&
syncInfo
);
pVnode
->
sync
=
syncStart
(
&
syncInfo
);
if
(
pVnode
->
sync
==
NULL
)
{
if
(
pVnode
->
sync
==
NULL
)
{
vError
(
"vgId:%d, failed to open sync module, replica:%d reason:%s"
,
pVnode
->
vgId
,
pVnode
->
syncCfg
.
replica
,
tstrerror
(
terrno
));
vnodeCleanUp
(
pVnode
);
vnodeCleanUp
(
pVnode
);
return
terrno
;
return
terrno
;
}
}
...
@@ -536,7 +538,7 @@ static int vnodeProcessTsdbStatus(void *arg, int status) {
...
@@ -536,7 +538,7 @@ static int vnodeProcessTsdbStatus(void *arg, int status) {
return
0
;
return
0
;
}
}
static
uint32_t
vnodeGetFileInfo
(
void
*
ahandle
,
char
*
name
,
uint32_t
*
index
,
uint32_t
eindex
,
int
32
_t
*
size
,
uint64_t
*
fversion
)
{
static
uint32_t
vnodeGetFileInfo
(
void
*
ahandle
,
char
*
name
,
uint32_t
*
index
,
uint32_t
eindex
,
int
64
_t
*
size
,
uint64_t
*
fversion
)
{
SVnodeObj
*
pVnode
=
ahandle
;
SVnodeObj
*
pVnode
=
ahandle
;
*
fversion
=
pVnode
->
fversion
;
*
fversion
=
pVnode
->
fversion
;
return
tsdbGetFileInfo
(
pVnode
->
tsdb
,
name
,
index
,
eindex
,
size
);
return
tsdbGetFileInfo
(
pVnode
->
tsdb
,
name
,
index
,
eindex
,
size
);
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录