Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
991cbc7d
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
991cbc7d
编写于
12月 04, 2020
作者:
H
huili
提交者:
GitHub
12月 04, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #4439 from taosdata/feature/wal
TD-2331
上级
46250366
f538fe33
变更
11
隐藏空白更改
内联
并排
Showing
11 changed file
with
158 addition
and
119 deletion
+158
-119
src/dnode/inc/dnodeMInfos.h
src/dnode/inc/dnodeMInfos.h
+3
-3
src/dnode/src/dnodeMInfos.c
src/dnode/src/dnodeMInfos.c
+29
-29
src/dnode/src/dnodeMgmt.c
src/dnode/src/dnodeMgmt.c
+2
-2
src/dnode/src/dnodeModule.c
src/dnode/src/dnodeModule.c
+3
-3
src/inc/dnode.h
src/inc/dnode.h
+1
-1
src/inc/taosmsg.h
src/inc/taosmsg.h
+7
-7
src/mnode/inc/mnodeMnode.h
src/mnode/inc/mnodeMnode.h
+1
-1
src/mnode/inc/mnodeSdb.h
src/mnode/inc/mnodeSdb.h
+1
-0
src/mnode/src/mnodeMnode.c
src/mnode/src/mnodeMnode.c
+92
-63
src/mnode/src/mnodeSdb.c
src/mnode/src/mnodeSdb.c
+17
-8
src/sync/src/syncMain.c
src/sync/src/syncMain.c
+2
-2
未找到文件。
src/dnode/inc/dnodeMInfos.h
浏览文件 @
991cbc7d
...
...
@@ -24,9 +24,9 @@ extern "C" {
int32_t
dnodeInitMInfos
();
void
dnodeCleanupMInfos
();
void
dnodeUpdateMInfos
(
SM
nodeInfos
*
m
infos
);
void
dnodeUpdateEpSetForPeer
(
SRpcEpSet
*
e
pSet
);
void
dnodeGetMInfos
(
SM
nodeInfos
*
m
infos
);
void
dnodeUpdateMInfos
(
SM
Infos
*
pM
infos
);
void
dnodeUpdateEpSetForPeer
(
SRpcEpSet
*
pE
pSet
);
void
dnodeGetMInfos
(
SM
Infos
*
pM
infos
);
bool
dnodeIsMasterEp
(
char
*
ep
);
#ifdef __cplusplus
...
...
src/dnode/src/dnodeMInfos.c
浏览文件 @
991cbc7d
...
...
@@ -22,12 +22,12 @@
#include "dnodeInt.h"
#include "dnodeMInfos.h"
static
SM
nodeInfos
tsMInfos
;
static
SRpcEpSet
tsMEpSet
;
static
SM
Infos
tsMInfos
;
static
SRpcEpSet
tsMEpSet
;
static
pthread_mutex_t
tsMInfosMutex
;
static
void
dnodeResetMInfos
(
SM
node
Infos
*
minfos
);
static
void
dnodePrintMInfos
(
SM
node
Infos
*
minfos
);
static
void
dnodeResetMInfos
(
SMInfos
*
minfos
);
static
void
dnodePrintMInfos
(
SMInfos
*
minfos
);
static
int32_t
dnodeReadMInfos
();
static
int32_t
dnodeWriteMInfos
();
...
...
@@ -44,14 +44,14 @@ int32_t dnodeInitMInfos() {
void
dnodeCleanupMInfos
()
{
pthread_mutex_destroy
(
&
tsMInfosMutex
);
}
void
dnodeUpdateMInfos
(
SM
nodeInfos
*
m
infos
)
{
if
(
minfos
->
mnodeNum
<=
0
||
m
infos
->
mnodeNum
>
3
)
{
dError
(
"invalid mnode infos, mnodeNum:%d"
,
m
infos
->
mnodeNum
);
void
dnodeUpdateMInfos
(
SM
Infos
*
pM
infos
)
{
if
(
pMinfos
->
mnodeNum
<=
0
||
pM
infos
->
mnodeNum
>
3
)
{
dError
(
"invalid mnode infos, mnodeNum:%d"
,
pM
infos
->
mnodeNum
);
return
;
}
for
(
int32_t
i
=
0
;
i
<
m
infos
->
mnodeNum
;
++
i
)
{
SM
nodeInfo
*
minfo
=
&
m
infos
->
mnodeInfos
[
i
];
for
(
int32_t
i
=
0
;
i
<
pM
infos
->
mnodeNum
;
++
i
)
{
SM
Info
*
minfo
=
&
pM
infos
->
mnodeInfos
[
i
];
minfo
->
mnodeId
=
htonl
(
minfo
->
mnodeId
);
if
(
minfo
->
mnodeId
<=
0
||
strlen
(
minfo
->
mnodeEp
)
<=
5
)
{
dError
(
"invalid mnode info:%d, mnodeId:%d mnodeEp:%s"
,
i
,
minfo
->
mnodeId
,
minfo
->
mnodeEp
);
...
...
@@ -60,14 +60,14 @@ void dnodeUpdateMInfos(SMnodeInfos *minfos) {
}
pthread_mutex_lock
(
&
tsMInfosMutex
);
if
(
m
infos
->
mnodeNum
!=
tsMInfos
.
mnodeNum
)
{
dnodeResetMInfos
(
m
infos
);
if
(
pM
infos
->
mnodeNum
!=
tsMInfos
.
mnodeNum
)
{
dnodeResetMInfos
(
pM
infos
);
dnodeWriteMInfos
();
sdbUpdateAsync
();
}
else
{
int32_t
size
=
sizeof
(
SM
node
Infos
);
if
(
memcmp
(
m
infos
,
&
tsMInfos
,
size
)
!=
0
)
{
dnodeResetMInfos
(
m
infos
);
int32_t
size
=
sizeof
(
SMInfos
);
if
(
memcmp
(
pM
infos
,
&
tsMInfos
,
size
)
!=
0
)
{
dnodeResetMInfos
(
pM
infos
);
dnodeWriteMInfos
();
sdbUpdateAsync
();
}
...
...
@@ -99,11 +99,11 @@ bool dnodeIsMasterEp(char *ep) {
return
isMaster
;
}
void
dnodeGetMInfos
(
SM
nodeInfos
*
m
infos
)
{
void
dnodeGetMInfos
(
SM
Infos
*
pM
infos
)
{
pthread_mutex_lock
(
&
tsMInfosMutex
);
memcpy
(
minfos
,
&
tsMInfos
,
sizeof
(
SMnode
Infos
));
memcpy
(
pMinfos
,
&
tsMInfos
,
sizeof
(
SM
Infos
));
for
(
int32_t
i
=
0
;
i
<
tsMInfos
.
mnodeNum
;
++
i
)
{
m
infos
->
mnodeInfos
[
i
].
mnodeId
=
htonl
(
tsMInfos
.
mnodeInfos
[
i
].
mnodeId
);
pM
infos
->
mnodeInfos
[
i
].
mnodeId
=
htonl
(
tsMInfos
.
mnodeInfos
[
i
].
mnodeId
);
}
pthread_mutex_unlock
(
&
tsMInfosMutex
);
}
...
...
@@ -123,15 +123,15 @@ void dnodeGetEpSetForShell(SRpcEpSet *epSet) {
pthread_mutex_unlock
(
&
tsMInfosMutex
);
}
static
void
dnodePrintMInfos
(
SM
nodeInfos
*
m
infos
)
{
dInfo
(
"print m
node infos, mnodeNum:%d inUse:%d"
,
minfos
->
mnodeNum
,
m
infos
->
inUse
);
for
(
int32_t
i
=
0
;
i
<
m
infos
->
mnodeNum
;
i
++
)
{
dInfo
(
"mnode index:%d, %s"
,
minfos
->
mnodeInfos
[
i
].
mnodeId
,
m
infos
->
mnodeInfos
[
i
].
mnodeEp
);
static
void
dnodePrintMInfos
(
SM
Infos
*
pM
infos
)
{
dInfo
(
"print m
infos, mnodeNum:%d inUse:%d"
,
pMinfos
->
mnodeNum
,
pM
infos
->
inUse
);
for
(
int32_t
i
=
0
;
i
<
pM
infos
->
mnodeNum
;
i
++
)
{
dInfo
(
"mnode index:%d, %s"
,
pMinfos
->
mnodeInfos
[
i
].
mnodeId
,
pM
infos
->
mnodeInfos
[
i
].
mnodeEp
);
}
}
static
void
dnodeResetMInfos
(
SM
nodeInfos
*
m
infos
)
{
if
(
m
infos
==
NULL
)
{
static
void
dnodeResetMInfos
(
SM
Infos
*
pM
infos
)
{
if
(
pM
infos
==
NULL
)
{
tsMEpSet
.
numOfEps
=
1
;
taosGetFqdnPortFromEp
(
tsFirst
,
tsMEpSet
.
fqdn
[
0
],
&
tsMEpSet
.
port
[
0
]);
...
...
@@ -142,10 +142,10 @@ static void dnodeResetMInfos(SMnodeInfos *minfos) {
return
;
}
if
(
m
infos
->
mnodeNum
==
0
)
return
;
if
(
pM
infos
->
mnodeNum
==
0
)
return
;
int32_t
size
=
sizeof
(
SM
node
Infos
);
memcpy
(
&
tsMInfos
,
m
infos
,
size
);
int32_t
size
=
sizeof
(
SMInfos
);
memcpy
(
&
tsMInfos
,
pM
infos
,
size
);
tsMEpSet
.
inUse
=
tsMInfos
.
inUse
;
tsMEpSet
.
numOfEps
=
tsMInfos
.
mnodeNum
;
...
...
@@ -153,7 +153,7 @@ static void dnodeResetMInfos(SMnodeInfos *minfos) {
taosGetFqdnPortFromEp
(
tsMInfos
.
mnodeInfos
[
i
].
mnodeEp
,
tsMEpSet
.
fqdn
[
i
],
&
tsMEpSet
.
port
[
i
]);
}
dnodePrintMInfos
(
m
infos
);
dnodePrintMInfos
(
pM
infos
);
}
static
int32_t
dnodeReadMInfos
()
{
...
...
@@ -162,7 +162,7 @@ static int32_t dnodeReadMInfos() {
char
*
content
=
calloc
(
1
,
maxLen
+
1
);
cJSON
*
root
=
NULL
;
FILE
*
fp
=
NULL
;
SM
nodeInfos
minfos
=
{
0
};
SM
Infos
minfos
=
{
0
};
char
file
[
TSDB_FILENAME_LEN
+
20
]
=
{
0
};
sprintf
(
file
,
"%s/mnodeEpSet.json"
,
tsDnodeDir
);
...
...
@@ -241,7 +241,7 @@ PARSE_MINFOS_OVER:
terrno
=
0
;
for
(
int32_t
i
=
0
;
i
<
minfos
.
mnodeNum
;
++
i
)
{
SM
node
Info
*
mInfo
=
&
minfos
.
mnodeInfos
[
i
];
SMInfo
*
mInfo
=
&
minfos
.
mnodeInfos
[
i
];
dnodeUpdateEp
(
mInfo
->
mnodeId
,
mInfo
->
mnodeEp
,
NULL
,
NULL
);
}
dnodeResetMInfos
(
&
minfos
);
...
...
src/dnode/src/dnodeMgmt.c
浏览文件 @
991cbc7d
...
...
@@ -472,8 +472,8 @@ static void dnodeProcessStatusRsp(SRpcMsg *pMsg) {
}
SStatusRsp
*
pStatusRsp
=
pMsg
->
pCont
;
SM
nodeInfos
*
m
infos
=
&
pStatusRsp
->
mnodes
;
dnodeUpdateMInfos
(
m
infos
);
SM
Infos
*
pM
infos
=
&
pStatusRsp
->
mnodes
;
dnodeUpdateMInfos
(
pM
infos
);
SDnodeCfg
*
pCfg
=
&
pStatusRsp
->
dnodeCfg
;
pCfg
->
numOfVnodes
=
htonl
(
pCfg
->
numOfVnodes
);
...
...
src/dnode/src/dnodeModule.c
浏览文件 @
991cbc7d
...
...
@@ -147,8 +147,8 @@ void dnodeProcessModuleStatus(uint32_t moduleStatus) {
}
}
bool
dnodeStartMnode
(
SM
nodeInfos
*
m
infos
)
{
SM
nodeInfos
*
mnodes
=
m
infos
;
bool
dnodeStartMnode
(
SM
Infos
*
pM
infos
)
{
SM
Infos
*
pMnodes
=
pM
infos
;
if
(
tsModuleStatus
&
(
1
<<
TSDB_MOD_MNODE
))
{
dDebug
(
"mnode module is already started, module status:%d"
,
tsModuleStatus
);
...
...
@@ -159,7 +159,7 @@ bool dnodeStartMnode(SMnodeInfos *minfos) {
dInfo
(
"start mnode module, module status:%d, new status:%d"
,
tsModuleStatus
,
moduleStatus
);
dnodeProcessModuleStatus
(
moduleStatus
);
sdbUpdateSync
(
m
nodes
);
sdbUpdateSync
(
pM
nodes
);
return
true
;
}
src/inc/dnode.h
浏览文件 @
991cbc7d
...
...
@@ -45,7 +45,7 @@ void dnodeGetEpSetForShell(SRpcEpSet *epSet);
int32_t
dnodeGetDnodeId
();
void
dnodeUpdateEp
(
int32_t
dnodeId
,
char
*
ep
,
char
*
fqdn
,
uint16_t
*
port
);
bool
dnodeCheckEpChanged
(
int32_t
dnodeId
,
char
*
epstr
);
bool
dnodeStartMnode
(
SM
nodeInfos
*
m
infos
);
bool
dnodeStartMnode
(
SM
Infos
*
pM
infos
);
void
dnodeAddClientRspHandle
(
uint8_t
msgType
,
void
(
*
fp
)(
SRpcMsg
*
rpcMsg
));
void
dnodeSendMsgToDnode
(
SRpcEpSet
*
epSet
,
SRpcMsg
*
rpcMsg
);
...
...
src/inc/taosmsg.h
浏览文件 @
991cbc7d
...
...
@@ -591,13 +591,13 @@ typedef struct {
typedef
struct
{
int32_t
mnodeId
;
char
mnodeEp
[
TSDB_EP_LEN
];
}
SM
node
Info
;
}
SMInfo
;
typedef
struct
{
int8_t
inUse
;
int8_t
mnodeNum
;
SM
node
Info
mnodeInfos
[
TSDB_MAX_REPLICA
];
}
SM
node
Infos
;
int8_t
inUse
;
int8_t
mnodeNum
;
SMInfo
mnodeInfos
[
TSDB_MAX_REPLICA
];
}
SMInfos
;
typedef
struct
{
int32_t
numOfMnodes
;
// tsNumOfMnodes
...
...
@@ -632,7 +632,7 @@ typedef struct {
}
SStatusMsg
;
typedef
struct
{
SM
nodeInfos
mnodes
;
SM
Infos
mnodes
;
SDnodeCfg
dnodeCfg
;
SVgroupAccess
vgAccess
[];
}
SStatusRsp
;
...
...
@@ -761,7 +761,7 @@ typedef struct {
typedef
struct
{
int32_t
dnodeId
;
char
dnodeEp
[
TSDB_EP_LEN
];
// end point, hostname:port
SM
node
Infos
mnodes
;
SMInfos
mnodes
;
}
SCreateMnodeMsg
;
typedef
struct
{
...
...
src/mnode/inc/mnodeMnode.h
浏览文件 @
991cbc7d
...
...
@@ -48,7 +48,7 @@ void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet);
char
*
mnodeGetMnodeMasterEp
();
void
mnodeGetMnodeInfos
(
void
*
mnodes
);
void
mnodeUpdateMnodeEpSet
();
void
mnodeUpdateMnodeEpSet
(
SMInfos
*
pMnodes
);
#ifdef __cplusplus
}
...
...
src/mnode/inc/mnodeSdb.h
浏览文件 @
991cbc7d
...
...
@@ -89,6 +89,7 @@ void* sdbGetTableByRid(int64_t rid);
bool
sdbIsMaster
();
bool
sdbIsServing
();
void
sdbUpdateMnodeRoles
();
int32_t
sdbGetReplicaNum
();
int32_t
sdbInsertRow
(
SSdbRow
*
pRow
);
int32_t
sdbDeleteRow
(
SSdbRow
*
pRow
);
...
...
src/mnode/src/mnodeMnode.c
浏览文件 @
991cbc7d
...
...
@@ -34,14 +34,14 @@
#include "mnodeUser.h"
#include "mnodeVgroup.h"
int64_t
tsMnodeRid
=
-
1
;
static
void
*
tsMnodeSdb
=
NULL
;
static
int32_t
tsMnodeUpdateSize
=
0
;
static
SRpcEpSet
tsMnodeEpSet
ForShell
;
static
SRpcEpSet
tsMnodeEpSet
ForPeer
;
static
SM
nodeInfos
tsMnode
Infos
;
static
int32_t
mnodeGetMnodeMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
static
int32_t
mnodeRetrieveMnodes
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
int64_t
tsMnodeRid
=
-
1
;
static
void
*
tsMnodeSdb
=
NULL
;
static
int32_t
tsMnodeUpdateSize
=
0
;
static
SRpcEpSet
tsMEp
ForShell
;
static
SRpcEpSet
tsMEp
ForPeer
;
static
SM
Infos
tsM
Infos
;
static
int32_t
mnodeGetMnodeMeta
(
STableMetaMsg
*
pMeta
,
SShowObj
*
pShow
,
void
*
pConn
);
static
int32_t
mnodeRetrieveMnodes
(
SShowObj
*
pShow
,
char
*
data
,
int32_t
rows
,
void
*
pConn
);
#if defined(LINUX)
static
pthread_rwlock_t
tsMnodeLock
;
...
...
@@ -127,7 +127,7 @@ static int32_t mnodeMnodeActionRestored() {
mnodeCancelGetNextMnode
(
pIter
);
}
mnodeUpdateMnodeEpSet
();
mnodeUpdateMnodeEpSet
(
NULL
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -199,106 +199,130 @@ void mnodeCancelGetNextMnode(void *pIter) {
sdbFreeIter
(
tsMnodeSdb
,
pIter
);
}
void
mnodeUpdateMnodeEpSet
()
{
mInfo
(
"update mnodes epSet, numOfEps:%d "
,
mnodeGetMnodesNum
());
void
mnodeUpdateMnodeEpSet
(
SMInfos
*
pMinfos
)
{
bool
set
=
false
;
SMInfos
mInfos
=
{
0
};
mInfo
(
"vgId:1, update mnodes epSet, numOfMnodes:%d pMinfos:%p"
,
mnodeGetMnodesNum
(),
pMinfos
);
mnodeMnodeWrLock
();
memset
(
&
tsMnodeEpSetForShell
,
0
,
sizeof
(
SRpcEpSet
));
memset
(
&
tsMnodeEpSetForPeer
,
0
,
sizeof
(
SRpcEpSet
));
memset
(
&
tsMnodeInfos
,
0
,
sizeof
(
SMnodeInfos
));
int32_t
index
=
0
;
void
*
pIter
=
NULL
;
while
(
1
)
{
SMnodeObj
*
pMnode
=
NULL
;
pIter
=
mnodeGetNextMnode
(
pIter
,
&
pMnode
);
if
(
pMnode
==
NULL
)
break
;
SDnodeObj
*
pDnode
=
mnodeGetDnode
(
pMnode
->
mnodeId
);
if
(
pDnode
!=
NULL
)
{
strcpy
(
tsMnodeEpSetForShell
.
fqdn
[
index
],
pDnode
->
dnodeFqdn
);
tsMnodeEpSetForShell
.
port
[
index
]
=
htons
(
pDnode
->
dnodePort
);
mDebug
(
"mnode:%d, for shell fqdn:%s %d"
,
pDnode
->
dnodeId
,
tsMnodeEpSetForShell
.
fqdn
[
index
],
htons
(
tsMnodeEpSetForShell
.
port
[
index
]));
strcpy
(
tsMnodeEpSetForPeer
.
fqdn
[
index
],
pDnode
->
dnodeFqdn
);
tsMnodeEpSetForPeer
.
port
[
index
]
=
htons
(
pDnode
->
dnodePort
+
TSDB_PORT_DNODEDNODE
);
mDebug
(
"mnode:%d, for peer fqdn:%s %d"
,
pDnode
->
dnodeId
,
tsMnodeEpSetForPeer
.
fqdn
[
index
],
htons
(
tsMnodeEpSetForPeer
.
port
[
index
]));
tsMnodeInfos
.
mnodeInfos
[
index
].
mnodeId
=
htonl
(
pMnode
->
mnodeId
);
strcpy
(
tsMnodeInfos
.
mnodeInfos
[
index
].
mnodeEp
,
pDnode
->
dnodeEp
);
if
(
pMnode
->
role
==
TAOS_SYNC_ROLE_MASTER
)
{
tsMnodeEpSetForShell
.
inUse
=
index
;
tsMnodeEpSetForPeer
.
inUse
=
index
;
tsMnodeInfos
.
inUse
=
index
;
if
(
pMinfos
!=
NULL
)
{
set
=
true
;
mInfos
=
*
pMinfos
;
}
else
{
int32_t
index
=
0
;
void
*
pIter
=
NULL
;
while
(
1
)
{
SMnodeObj
*
pMnode
=
NULL
;
pIter
=
mnodeGetNextMnode
(
pIter
,
&
pMnode
);
if
(
pMnode
==
NULL
)
break
;
SDnodeObj
*
pDnode
=
mnodeGetDnode
(
pMnode
->
mnodeId
);
if
(
pDnode
!=
NULL
)
{
set
=
true
;
mInfos
.
mnodeInfos
[
index
].
mnodeId
=
pMnode
->
mnodeId
;
strcpy
(
mInfos
.
mnodeInfos
[
index
].
mnodeEp
,
pDnode
->
dnodeEp
);
if
(
pMnode
->
role
==
TAOS_SYNC_ROLE_MASTER
)
mInfos
.
inUse
=
index
;
index
++
;
}
else
{
set
=
false
;
}
m
Info
(
"mnode:%d, ep:%s %s"
,
pDnode
->
dnodeId
,
pDnode
->
dnodeEp
,
pMnode
->
role
==
TAOS_SYNC_ROLE_MASTER
?
"master"
:
""
);
index
++
;
m
nodeDecDnodeRef
(
pDnode
);
mnodeDecMnodeRef
(
pMnode
)
;
}
mnodeDecDnodeRef
(
pDnode
);
mnodeDecMnodeRef
(
pMnode
);
mInfos
.
mnodeNum
=
index
;
if
(
mInfos
.
mnodeNum
<
sdbGetReplicaNum
())
{
set
=
false
;
mDebug
(
"vgId:1, mnodes info not synced, current:%d syncCfgNum:%d"
,
mInfos
.
mnodeNum
,
sdbGetReplicaNum
());
}
}
tsMnodeInfos
.
mnodeNum
=
index
;
tsMnodeEpSetForShell
.
numOfEps
=
index
;
tsMnodeEpSetForPeer
.
numOfEps
=
index
;
mnodeMnodeWrLock
();
if
(
set
)
{
memset
(
&
tsMEpForShell
,
0
,
sizeof
(
SRpcEpSet
));
memset
(
&
tsMEpForPeer
,
0
,
sizeof
(
SRpcEpSet
));
memcpy
(
&
tsMInfos
,
&
mInfos
,
sizeof
(
SMInfos
));
tsMEpForShell
.
inUse
=
tsMInfos
.
inUse
;
tsMEpForPeer
.
inUse
=
tsMInfos
.
inUse
;
tsMEpForShell
.
numOfEps
=
tsMInfos
.
mnodeNum
;
tsMEpForPeer
.
numOfEps
=
tsMInfos
.
mnodeNum
;
mInfo
(
"vgId:1, mnodes epSet is set, num:%d inUse:%d"
,
tsMInfos
.
mnodeNum
,
tsMInfos
.
inUse
);
for
(
int
index
=
0
;
index
<
mInfos
.
mnodeNum
;
++
index
)
{
SMInfo
*
pInfo
=
&
tsMInfos
.
mnodeInfos
[
index
];
taosGetFqdnPortFromEp
(
pInfo
->
mnodeEp
,
tsMEpForShell
.
fqdn
[
index
],
&
tsMEpForShell
.
port
[
index
]);
taosGetFqdnPortFromEp
(
pInfo
->
mnodeEp
,
tsMEpForPeer
.
fqdn
[
index
],
&
tsMEpForPeer
.
port
[
index
]);
tsMEpForPeer
.
port
[
index
]
=
tsMEpForPeer
.
port
[
index
]
+
TSDB_PORT_DNODEDNODE
;
mInfo
(
"vgId:1, mnode:%d, fqdn:%s shell:%u peer:%u"
,
pInfo
->
mnodeId
,
tsMEpForShell
.
fqdn
[
index
],
tsMEpForShell
.
port
[
index
],
tsMEpForPeer
.
port
[
index
]);
tsMEpForShell
.
port
[
index
]
=
htons
(
tsMEpForShell
.
port
[
index
]);
tsMEpForPeer
.
port
[
index
]
=
htons
(
tsMEpForPeer
.
port
[
index
]);
pInfo
->
mnodeId
=
htonl
(
pInfo
->
mnodeId
);
}
}
else
{
mInfo
(
"vgId:1, mnodes epSet not set, num:%d inUse:%d"
,
tsMInfos
.
mnodeNum
,
tsMInfos
.
inUse
);
for
(
int
index
=
0
;
index
<
tsMInfos
.
mnodeNum
;
++
index
)
{
mInfo
(
"vgId:1, index:%d, ep:%s:%u"
,
index
,
tsMEpForShell
.
fqdn
[
index
],
htons
(
tsMEpForShell
.
port
[
index
]));
}
}
mnodeMnodeUnLock
();
}
void
mnodeGetMnodeEpSetForPeer
(
SRpcEpSet
*
epSet
)
{
mnodeMnodeRdLock
();
*
epSet
=
tsM
nodeEpSet
ForPeer
;
*
epSet
=
tsM
Ep
ForPeer
;
mnodeMnodeUnLock
();
mTrace
(
"vgId:1, mnodes epSet for peer is returned, num:%d inUse:%d"
,
tsMEpForPeer
.
numOfEps
,
tsMEpForPeer
.
inUse
);
for
(
int32_t
i
=
0
;
i
<
epSet
->
numOfEps
;
++
i
)
{
if
(
strcmp
(
epSet
->
fqdn
[
i
],
tsLocalFqdn
)
==
0
&&
htons
(
epSet
->
port
[
i
])
==
tsServerPort
+
TSDB_PORT_DNODEDNODE
)
{
epSet
->
inUse
=
(
i
+
1
)
%
epSet
->
numOfEps
;
mTrace
(
"mnode:%d, for peer ep:%s:%u, set inUse to %d"
,
i
,
epSet
->
fqdn
[
i
],
htons
(
epSet
->
port
[
i
]),
epSet
->
inUse
);
mTrace
(
"
vgId:1,
mnode:%d, for peer ep:%s:%u, set inUse to %d"
,
i
,
epSet
->
fqdn
[
i
],
htons
(
epSet
->
port
[
i
]),
epSet
->
inUse
);
}
else
{
mTrace
(
"mpeer:%d, for peer ep:%s:%u"
,
i
,
epSet
->
fqdn
[
i
],
htons
(
epSet
->
port
[
i
]));
mTrace
(
"
vgId:1,
mpeer:%d, for peer ep:%s:%u"
,
i
,
epSet
->
fqdn
[
i
],
htons
(
epSet
->
port
[
i
]));
}
}
}
void
mnodeGetMnodeEpSetForShell
(
SRpcEpSet
*
epSet
)
{
mnodeMnodeRdLock
();
*
epSet
=
tsM
nodeEpSet
ForShell
;
*
epSet
=
tsM
Ep
ForShell
;
mnodeMnodeUnLock
();
mTrace
(
"vgId:1, mnodes epSet for shell is returned, num:%d inUse:%d"
,
tsMEpForShell
.
numOfEps
,
tsMEpForShell
.
inUse
);
for
(
int32_t
i
=
0
;
i
<
epSet
->
numOfEps
;
++
i
)
{
if
(
strcmp
(
epSet
->
fqdn
[
i
],
tsLocalFqdn
)
==
0
&&
htons
(
epSet
->
port
[
i
])
==
tsServerPort
)
{
epSet
->
inUse
=
(
i
+
1
)
%
epSet
->
numOfEps
;
mTrace
(
"mnode:%d, for shell ep:%s:%u, set inUse to %d"
,
i
,
epSet
->
fqdn
[
i
],
htons
(
epSet
->
port
[
i
]),
epSet
->
inUse
);
mTrace
(
"
vgId:1,
mnode:%d, for shell ep:%s:%u, set inUse to %d"
,
i
,
epSet
->
fqdn
[
i
],
htons
(
epSet
->
port
[
i
]),
epSet
->
inUse
);
}
else
{
mTrace
(
"mnode:%d, for shell ep:%s:%u"
,
i
,
epSet
->
fqdn
[
i
],
htons
(
epSet
->
port
[
i
]));
mTrace
(
"
vgId:1,
mnode:%d, for shell ep:%s:%u"
,
i
,
epSet
->
fqdn
[
i
],
htons
(
epSet
->
port
[
i
]));
}
}
}
char
*
mnodeGetMnodeMasterEp
()
{
return
tsM
nodeInfos
.
mnodeInfos
[
tsMnode
Infos
.
inUse
].
mnodeEp
;
return
tsM
Infos
.
mnodeInfos
[
tsM
Infos
.
inUse
].
mnodeEp
;
}
void
mnodeGetMnodeInfos
(
void
*
mnodeI
nfos
)
{
void
mnodeGetMnodeInfos
(
void
*
pMi
nfos
)
{
mnodeMnodeRdLock
();
*
(
SM
nodeInfos
*
)
mnodeInfos
=
tsMnode
Infos
;
*
(
SM
Infos
*
)
pMinfos
=
tsM
Infos
;
mnodeMnodeUnLock
();
}
static
int32_t
mnodeSendCreateMnodeMsg
(
int32_t
dnodeId
,
char
*
dnodeEp
)
{
mDebug
(
"dnode:%d, send create mnode msg to dnode %s"
,
dnodeId
,
dnodeEp
);
SCreateMnodeMsg
*
pCreate
=
rpcMallocCont
(
sizeof
(
SCreateMnodeMsg
));
if
(
pCreate
==
NULL
)
{
return
TSDB_CODE_MND_OUT_OF_MEMORY
;
}
else
{
pCreate
->
dnodeId
=
htonl
(
dnodeId
);
tstrncpy
(
pCreate
->
dnodeEp
,
dnodeEp
,
sizeof
(
pCreate
->
dnodeEp
));
pCreate
->
mnodes
=
tsMnodeInfos
;
mnodeGetMnodeInfos
(
&
pCreate
->
mnodes
)
;
bool
found
=
false
;
for
(
int
i
=
0
;
i
<
pCreate
->
mnodes
.
mnodeNum
;
++
i
)
{
if
(
pCreate
->
mnodes
.
mnodeInfos
[
i
].
mnodeId
==
htonl
(
dnodeId
))
{
...
...
@@ -312,6 +336,11 @@ static int32_t mnodeSendCreateMnodeMsg(int32_t dnodeId, char *dnodeEp) {
}
}
mDebug
(
"dnode:%d, send create mnode msg to dnode %s, numOfMnodes:%d"
,
dnodeId
,
dnodeEp
,
pCreate
->
mnodes
.
mnodeNum
);
for
(
int32_t
i
=
0
;
i
<
pCreate
->
mnodes
.
mnodeNum
;
++
i
)
{
mDebug
(
"index:%d, mnodeId:%d ep:%s"
,
i
,
pCreate
->
mnodes
.
mnodeInfos
[
i
].
mnodeId
,
pCreate
->
mnodes
.
mnodeInfos
[
i
].
mnodeEp
);
}
SRpcMsg
rpcMsg
=
{
0
};
rpcMsg
.
pCont
=
pCreate
;
rpcMsg
.
contLen
=
sizeof
(
SCreateMnodeMsg
);
...
...
@@ -336,7 +365,7 @@ static int32_t mnodeCreateMnodeCb(SMnodeMsg *pMsg, int32_t code) {
mError
(
"failed to create mnode, reason:%s"
,
tstrerror
(
code
));
}
else
{
mDebug
(
"mnode is created successfully"
);
mnodeUpdateMnodeEpSet
();
mnodeUpdateMnodeEpSet
(
NULL
);
sdbUpdateAsync
();
}
...
...
@@ -380,7 +409,7 @@ void mnodeDropMnodeLocal(int32_t dnodeId) {
mnodeDecMnodeRef
(
pMnode
);
}
mnodeUpdateMnodeEpSet
();
mnodeUpdateMnodeEpSet
(
NULL
);
sdbUpdateAsync
();
}
...
...
@@ -400,7 +429,7 @@ int32_t mnodeDropMnode(int32_t dnodeId) {
sdbDecRef
(
tsMnodeSdb
,
pMnode
);
mnodeUpdateMnodeEpSet
();
mnodeUpdateMnodeEpSet
(
NULL
);
sdbUpdateAsync
();
return
code
;
...
...
src/mnode/src/mnodeSdb.c
浏览文件 @
991cbc7d
...
...
@@ -224,11 +224,13 @@ void sdbUpdateMnodeRoles() {
sdbInfo
(
"vgId:1, mnode:%d, role:%s"
,
pMnode
->
mnodeId
,
syncRole
[
pMnode
->
role
]);
if
(
pMnode
->
mnodeId
==
dnodeGetDnodeId
())
tsSdbMgmt
.
role
=
pMnode
->
role
;
mnodeDecMnodeRef
(
pMnode
);
}
else
{
sdbDebug
(
"vgId:1, mnode:%d not found"
,
roles
.
nodeId
[
i
]);
}
}
mnodeUpdateClusterId
();
mnodeUpdateMnodeEpSet
();
mnodeUpdateMnodeEpSet
(
NULL
);
}
static
uint32_t
sdbGetFileInfo
(
int32_t
vgId
,
char
*
name
,
uint32_t
*
index
,
uint32_t
eindex
,
int64_t
*
size
,
uint64_t
*
fversion
)
{
...
...
@@ -308,18 +310,20 @@ void sdbUpdateAsync() {
}
void
sdbUpdateSync
(
void
*
pMnodes
)
{
SM
nodeInfos
*
mnode
s
=
pMnodes
;
SM
Infos
*
pMinfo
s
=
pMnodes
;
if
(
!
mnodeIsRunning
())
{
mDebug
(
"vgId:1, mnode not start yet, update sync config later"
);
return
;
}
mDebug
(
"vgId:1, update sync config
in sync module, m
nodes:%p"
,
pMnodes
);
mDebug
(
"vgId:1, update sync config
, pM
nodes:%p"
,
pMnodes
);
SSyncCfg
syncCfg
=
{
0
};
int32_t
index
=
0
;
if
(
mnodes
==
NULL
)
{
if
(
pMinfos
==
NULL
)
{
mDebug
(
"vgId:1, mInfos not input, use mInfos in sdb, numOfMnodes:%d"
,
syncCfg
.
replica
);
void
*
pIter
=
NULL
;
while
(
1
)
{
SMnodeObj
*
pMnode
=
NULL
;
...
...
@@ -339,16 +343,17 @@ void sdbUpdateSync(void *pMnodes) {
mnodeDecMnodeRef
(
pMnode
);
}
syncCfg
.
replica
=
index
;
mDebug
(
"vgId:1, mnodes info not input, use infos in sdb, numOfMnodes:%d"
,
syncCfg
.
replica
);
}
else
{
for
(
index
=
0
;
index
<
mnodes
->
mnodeNum
;
++
index
)
{
SMnodeInfo
*
node
=
&
mnodes
->
mnodeInfos
[
index
];
mDebug
(
"vgId:1, mInfos input, numOfMnodes:%d"
,
pMinfos
->
mnodeNum
);
for
(
index
=
0
;
index
<
pMinfos
->
mnodeNum
;
++
index
)
{
SMInfo
*
node
=
&
pMinfos
->
mnodeInfos
[
index
];
syncCfg
.
nodeInfo
[
index
].
nodeId
=
node
->
mnodeId
;
taosGetFqdnPortFromEp
(
node
->
mnodeEp
,
syncCfg
.
nodeInfo
[
index
].
nodeFqdn
,
&
syncCfg
.
nodeInfo
[
index
].
nodePort
);
syncCfg
.
nodeInfo
[
index
].
nodePort
+=
TSDB_PORT_SYNC
;
}
syncCfg
.
replica
=
index
;
m
Debug
(
"vgId:1, mnodes info input, numOfMnodes:%d"
,
syncCfg
.
replica
);
m
nodeUpdateMnodeEpSet
(
pMnodes
);
}
syncCfg
.
quorum
=
(
syncCfg
.
replica
==
1
)
?
1
:
2
;
...
...
@@ -1103,3 +1108,7 @@ static void *sdbWorkerFp(void *pWorker) {
return
NULL
;
}
int32_t
sdbGetReplicaNum
()
{
return
tsSdbMgmt
.
cfg
.
replica
;
}
\ No newline at end of file
src/sync/src/syncMain.c
浏览文件 @
991cbc7d
...
...
@@ -548,7 +548,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
pPeer
->
pSyncNode
=
pNode
;
pPeer
->
refCount
=
1
;
sInfo
(
"%s, it is configured
"
,
pPeer
->
id
);
sInfo
(
"%s, it is configured
, ep:%s:%u"
,
pPeer
->
id
,
pPeer
->
fqdn
,
pPeer
->
port
);
int32_t
ret
=
strcmp
(
pPeer
->
fqdn
,
tsNodeFqdn
);
if
(
pPeer
->
nodeId
==
0
||
(
ret
>
0
)
||
(
ret
==
0
&&
pPeer
->
port
>
tsSyncPort
))
{
int32_t
checkMs
=
100
+
(
pNode
->
vgId
*
10
)
%
100
;
...
...
@@ -1134,7 +1134,7 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
pPeer
=
(
i
<
pNode
->
replica
)
?
pNode
->
peerInfo
[
i
]
:
NULL
;
if
(
pPeer
==
NULL
)
{
sError
(
"vgId:%d, peer:%s
not configured"
,
pNode
->
vgId
,
firstPkt
.
fqdn
);
sError
(
"vgId:%d, peer:%s
:%u not configured"
,
pNode
->
vgId
,
firstPkt
.
fqdn
,
firstPkt
.
port
);
taosCloseSocket
(
connFd
);
// syncSendVpeerCfgMsg(sync);
}
else
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录