Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
24c5f02c
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
24c5f02c
编写于
9月 27, 2020
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
TD-1617
上级
251fdf2c
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
47 addition
and
48 deletion
+47
-48
src/sync/inc/syncInt.h
src/sync/inc/syncInt.h
+20
-20
src/sync/src/syncMain.c
src/sync/src/syncMain.c
+1
-2
src/sync/src/tarbitrator.c
src/sync/src/tarbitrator.c
+26
-26
未找到文件。
src/sync/inc/syncInt.h
浏览文件 @
24c5f02c
...
@@ -114,26 +114,26 @@ typedef struct {
...
@@ -114,26 +114,26 @@ typedef struct {
}
SSyncFwds
;
}
SSyncFwds
;
typedef
struct
SsyncPeer
{
typedef
struct
SsyncPeer
{
int32_t
nodeId
;
int32_t
nodeId
;
uint32_t
ip
;
uint32_t
ip
;
uint16_t
port
;
uint16_t
port
;
char
fqdn
[
TSDB_FQDN_LEN
];
// peer ip string
char
fqdn
[
TSDB_FQDN_LEN
];
// peer ip string
char
id
[
TSDB_EP_LEN
+
16
];
// peer vgId + end point
char
id
[
TSDB_EP_LEN
+
32
];
// peer vgId + end point
int8_t
role
;
int8_t
role
;
int8_t
sstatus
;
// sync status
int8_t
sstatus
;
// sync status
uint64_t
version
;
uint64_t
version
;
uint64_t
sversion
;
// track the peer version in retrieve process
uint64_t
sversion
;
// track the peer version in retrieve process
int
syncFd
;
int
syncFd
;
int
peerFd
;
// forward FD
int
peerFd
;
// forward FD
int
numOfRetrieves
;
// number of retrieves tried
int
numOfRetrieves
;
// number of retrieves tried
int
fileChanged
;
// a flag to indicate file is changed during retrieving process
int
fileChanged
;
// a flag to indicate file is changed during retrieving process
void
*
timer
;
void
*
timer
;
void
*
pConn
;
void
*
pConn
;
int
notifyFd
;
int
notifyFd
;
int
watchNum
;
int
watchNum
;
int
*
watchFd
;
int
*
watchFd
;
int8_t
refCount
;
// reference count
int8_t
refCount
;
// reference count
struct
SSyncNode
*
pSyncNode
;
struct
SSyncNode
*
pSyncNode
;
}
SSyncPeer
;
}
SSyncPeer
;
typedef
struct
SSyncNode
{
typedef
struct
SSyncNode
{
...
...
src/sync/src/syncMain.c
浏览文件 @
24c5f02c
...
@@ -671,7 +671,6 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne
...
@@ -671,7 +671,6 @@ static void syncCheckRole(SSyncPeer *pPeer, SPeerStatus peersStatus[], int8_t ne
int8_t
selfOldRole
=
nodeRole
;
int8_t
selfOldRole
=
nodeRole
;
int8_t
i
,
syncRequired
=
0
;
int8_t
i
,
syncRequired
=
0
;
pNode
->
peerInfo
[
pNode
->
selfIndex
]
->
version
=
nodeVersion
;
pPeer
->
role
=
newRole
;
pPeer
->
role
=
newRole
;
sDebug
(
"%s, own role:%s, new peer role:%s"
,
pPeer
->
id
,
syncRole
[
nodeRole
],
syncRole
[
pPeer
->
role
]);
sDebug
(
"%s, own role:%s, new peer role:%s"
,
pPeer
->
id
,
syncRole
[
nodeRole
],
syncRole
[
pPeer
->
role
]);
...
@@ -923,7 +922,7 @@ static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) {
...
@@ -923,7 +922,7 @@ static int syncReadPeerMsg(SSyncPeer *pPeer, SSyncHead *pHead, char *cont) {
static
int
syncProcessPeerMsg
(
void
*
param
,
void
*
buffer
)
{
static
int
syncProcessPeerMsg
(
void
*
param
,
void
*
buffer
)
{
SSyncPeer
*
pPeer
=
param
;
SSyncPeer
*
pPeer
=
param
;
SSyncHead
head
;
SSyncHead
head
;
char
*
cont
=
(
char
*
)
buffer
;
char
*
cont
=
buffer
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
pthread_mutex_lock
(
&
(
pNode
->
mutex
));
pthread_mutex_lock
(
&
(
pNode
->
mutex
));
...
...
src/sync/src/tarbitrator.c
浏览文件 @
24c5f02c
...
@@ -27,29 +27,29 @@
...
@@ -27,29 +27,29 @@
#include "tsync.h"
#include "tsync.h"
#include "syncInt.h"
#include "syncInt.h"
static
void
arbSignalHandler
(
int32_t
signum
,
siginfo_t
*
sigInfo
,
void
*
context
);
static
void
arbSignalHandler
(
int32_t
signum
,
siginfo_t
*
sigInfo
,
void
*
context
);
static
void
arbProcessIncommingConnection
(
int
connFd
,
uint32_t
sourceIp
);
static
void
arbProcessIncommingConnection
(
int
connFd
,
uint32_t
sourceIp
);
static
void
arbProcessBrokenLink
(
void
*
param
);
static
void
arbProcessBrokenLink
(
void
*
param
);
static
int
arbProcessPeerMsg
(
void
*
param
,
void
*
buffer
);
static
int
arbProcessPeerMsg
(
void
*
param
,
void
*
buffer
);
static
tsem_t
tsArbSem
;
static
tsem_t
tsArbSem
;
static
ttpool_h
tsArbTcpPool
;
static
ttpool_h
tsArbTcpPool
;
typedef
struct
{
typedef
struct
{
char
id
[
TSDB_EP_LEN
+
24
];
char
id
[
TSDB_EP_LEN
+
24
];
int
nodeFd
;
int
nodeFd
;
void
*
pConn
;
void
*
pConn
;
}
SNodeConn
;
}
SNodeConn
;
int
main
(
int
argc
,
char
*
argv
[])
{
int
main
(
int
argc
,
char
*
argv
[])
{
char
arbLogPath
[
TSDB_FILENAME_LEN
+
16
]
=
{
0
};
char
arbLogPath
[
TSDB_FILENAME_LEN
+
16
]
=
{
0
};
for
(
int
i
=
1
;
i
<
argc
;
++
i
)
{
for
(
int
i
=
1
;
i
<
argc
;
++
i
)
{
if
(
strcmp
(
argv
[
i
],
"-p"
)
==
0
&&
i
<
argc
-
1
)
{
if
(
strcmp
(
argv
[
i
],
"-p"
)
==
0
&&
i
<
argc
-
1
)
{
tsArbitratorPort
=
atoi
(
argv
[
++
i
]);
tsArbitratorPort
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-d"
)
==
0
&&
i
<
argc
-
1
)
{
}
else
if
(
strcmp
(
argv
[
i
],
"-d"
)
==
0
&&
i
<
argc
-
1
)
{
debugFlag
=
atoi
(
argv
[
++
i
]);
debugFlag
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-g"
)
==
0
&&
i
<
argc
-
1
)
{
}
else
if
(
strcmp
(
argv
[
i
],
"-g"
)
==
0
&&
i
<
argc
-
1
)
{
if
(
strlen
(
argv
[
++
i
])
>
TSDB_FILENAME_LEN
)
continue
;
if
(
strlen
(
argv
[
++
i
])
>
TSDB_FILENAME_LEN
)
continue
;
tstrncpy
(
arbLogPath
,
argv
[
i
],
sizeof
(
arbLogPath
));
tstrncpy
(
arbLogPath
,
argv
[
i
],
sizeof
(
arbLogPath
));
}
else
{
}
else
{
printf
(
"
\n
usage: %s [options]
\n
"
,
argv
[
0
]);
printf
(
"
\n
usage: %s [options]
\n
"
,
argv
[
0
]);
...
@@ -62,8 +62,8 @@ int main(int argc, char *argv[]) {
...
@@ -62,8 +62,8 @@ int main(int argc, char *argv[]) {
}
}
sDebugFlag
=
debugFlag
;
sDebugFlag
=
debugFlag
;
if
(
tsem_init
(
&
tsArbSem
,
0
,
0
)
!=
0
)
{
if
(
tsem_init
(
&
tsArbSem
,
0
,
0
)
!=
0
)
{
printf
(
"failed to create exit semphore
\n
"
);
printf
(
"failed to create exit semphore
\n
"
);
exit
(
EXIT_FAILURE
);
exit
(
EXIT_FAILURE
);
}
}
...
@@ -91,10 +91,10 @@ int main(int argc, char *argv[]) {
...
@@ -91,10 +91,10 @@ int main(int argc, char *argv[]) {
info
.
processIncomingMsg
=
arbProcessPeerMsg
;
info
.
processIncomingMsg
=
arbProcessPeerMsg
;
info
.
processIncomingConn
=
arbProcessIncommingConnection
;
info
.
processIncomingConn
=
arbProcessIncommingConnection
;
tsArbTcpPool
=
taosOpenTcpThreadPool
(
&
info
);
tsArbTcpPool
=
taosOpenTcpThreadPool
(
&
info
);
if
(
tsArbTcpPool
==
NULL
)
{
if
(
tsArbTcpPool
==
NULL
)
{
sDebug
(
"failed to open TCP thread pool, exit..."
);
sDebug
(
"failed to open TCP thread pool, exit..."
);
return
-
1
;
return
-
1
;
}
}
sInfo
(
"TAOS arbitrator: %s:%d is running"
,
tsNodeFqdn
,
tsArbitratorPort
);
sInfo
(
"TAOS arbitrator: %s:%d is running"
,
tsNodeFqdn
,
tsArbitratorPort
);
...
@@ -108,9 +108,8 @@ int main(int argc, char *argv[]) {
...
@@ -108,9 +108,8 @@ int main(int argc, char *argv[]) {
return
0
;
return
0
;
}
}
static
void
arbProcessIncommingConnection
(
int
connFd
,
uint32_t
sourceIp
)
static
void
arbProcessIncommingConnection
(
int
connFd
,
uint32_t
sourceIp
)
{
{
char
ipstr
[
24
];
char
ipstr
[
24
];
tinet_ntoa
(
ipstr
,
sourceIp
);
tinet_ntoa
(
ipstr
,
sourceIp
);
sDebug
(
"peer TCP connection from ip:%s"
,
ipstr
);
sDebug
(
"peer TCP connection from ip:%s"
,
ipstr
);
...
@@ -121,15 +120,16 @@ static void arbProcessIncommingConnection(int connFd, uint32_t sourceIp)
...
@@ -121,15 +120,16 @@ static void arbProcessIncommingConnection(int connFd, uint32_t sourceIp)
return
;
return
;
}
}
SNodeConn
*
pNode
=
(
SNodeConn
*
)
calloc
(
sizeof
(
SNodeConn
),
1
);
SNodeConn
*
pNode
=
(
SNodeConn
*
)
calloc
(
sizeof
(
SNodeConn
),
1
);
if
(
pNode
==
NULL
)
{
if
(
pNode
==
NULL
)
{
sError
(
"failed to allocate memory(%s)"
,
strerror
(
errno
));
sError
(
"failed to allocate memory(%s)"
,
strerror
(
errno
));
taosCloseSocket
(
connFd
);
taosCloseSocket
(
connFd
);
return
;
return
;
}
}
snprintf
(
pNode
->
id
,
sizeof
(
pNode
->
id
),
"vgId:%d peer:%s:%d"
,
firstPkt
.
sourceId
,
firstPkt
.
fqdn
,
firstPkt
.
port
);
firstPkt
.
fqdn
[
sizeof
(
firstPkt
.
fqdn
)
-
1
]
=
0
;
if
(
firstPkt
.
syncHead
.
vgId
)
{
snprintf
(
pNode
->
id
,
sizeof
(
pNode
->
id
),
"vgId:%d peer:%s:%d"
,
firstPkt
.
sourceId
,
firstPkt
.
fqdn
,
firstPkt
.
port
);
if
(
firstPkt
.
syncHead
.
vgId
)
{
sDebug
(
"%s, vgId in head is not zero, close the connection"
,
pNode
->
id
);
sDebug
(
"%s, vgId in head is not zero, close the connection"
,
pNode
->
id
);
taosTFree
(
pNode
);
taosTFree
(
pNode
);
taosCloseSocket
(
connFd
);
taosCloseSocket
(
connFd
);
...
@@ -151,10 +151,10 @@ static void arbProcessBrokenLink(void *param) {
...
@@ -151,10 +151,10 @@ static void arbProcessBrokenLink(void *param) {
}
}
static
int
arbProcessPeerMsg
(
void
*
param
,
void
*
buffer
)
{
static
int
arbProcessPeerMsg
(
void
*
param
,
void
*
buffer
)
{
SNodeConn
*
pNode
=
param
;
SNodeConn
*
pNode
=
param
;
SSyncHead
head
;
SSyncHead
head
;
int
bytes
=
0
;
int
bytes
=
0
;
char
*
cont
=
(
char
*
)
buffer
;
char
*
cont
=
(
char
*
)
buffer
;
int
hlen
=
taosReadMsg
(
pNode
->
nodeFd
,
&
head
,
sizeof
(
head
));
int
hlen
=
taosReadMsg
(
pNode
->
nodeFd
,
&
head
,
sizeof
(
head
));
if
(
hlen
!=
sizeof
(
head
))
{
if
(
hlen
!=
sizeof
(
head
))
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录