Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
01647deb
T
TDengine
项目概览
taosdata
/
TDengine
11 个月 前同步成功
通知
1178
Star
22014
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
前往新版Gitcode,体验更适合开发者的 AI 搜索 >>
未验证
提交
01647deb
编写于
1月 17, 2023
作者:
S
Shengliang Guan
提交者:
GitHub
1月 17, 2023
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #19581 from taosdata/enh/TD-20047
fix: update epset on dnode info changed
上级
f7ef9fdc
478eec76
变更
13
隐藏空白更改
内联
并排
Showing
13 changed file
with
285 addition
and
38 deletion
+285
-38
include/common/tmsgcb.h
include/common/tmsgcb.h
+3
-2
source/dnode/mgmt/node_mgmt/src/dmMgmt.c
source/dnode/mgmt/node_mgmt/src/dmMgmt.c
+7
-0
source/dnode/mgmt/node_util/inc/dmUtil.h
source/dnode/mgmt/node_util/inc/dmUtil.h
+3
-1
source/dnode/mgmt/node_util/src/dmEps.c
source/dnode/mgmt/node_util/src/dmEps.c
+207
-28
source/dnode/mnode/impl/src/mndConsumer.c
source/dnode/mnode/impl/src/mndConsumer.c
+1
-0
source/dnode/mnode/impl/src/mndDnode.c
source/dnode/mnode/impl/src/mndDnode.c
+4
-1
source/dnode/mnode/impl/src/mndMnode.c
source/dnode/mnode/impl/src/mndMnode.c
+1
-1
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+21
-0
source/dnode/mnode/impl/src/mndSync.c
source/dnode/mnode/impl/src/mndSync.c
+1
-1
source/dnode/mnode/impl/src/mndTrans.c
source/dnode/mnode/impl/src/mndTrans.c
+1
-0
source/dnode/vnode/src/vnd/vnodeOpen.c
source/dnode/vnode/src/vnd/vnodeOpen.c
+16
-1
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+12
-1
source/libs/transport/src/tmsgcb.c
source/libs/transport/src/tmsgcb.c
+8
-2
未找到文件。
include/common/tmsgcb.h
浏览文件 @
01647deb
...
...
@@ -39,7 +39,7 @@ typedef enum {
QUEUE_MAX
,
}
EQueueType
;
typedef
void
(
*
UpdateDnodeInfoFp
)(
void
*
pData
,
int32_t
*
dnodeId
,
int64_t
*
clusterId
,
char
*
fqdn
,
uint16_t
*
port
);
typedef
bool
(
*
UpdateDnodeInfoFp
)(
void
*
pData
,
int32_t
*
dnodeId
,
int64_t
*
clusterId
,
char
*
fqdn
,
uint16_t
*
port
);
typedef
int32_t
(
*
PutToQueueFp
)(
void
*
pMgmt
,
EQueueType
qtype
,
SRpcMsg
*
pMsg
);
typedef
int32_t
(
*
GetQueueSizeFp
)(
void
*
pMgmt
,
int32_t
vgId
,
EQueueType
qtype
);
typedef
int32_t
(
*
SendReqFp
)(
const
SEpSet
*
pEpSet
,
SRpcMsg
*
pMsg
);
...
...
@@ -70,7 +70,8 @@ void tmsgSendRsp(SRpcMsg* pMsg);
void
tmsgRegisterBrokenLinkArg
(
SRpcMsg
*
pMsg
);
void
tmsgReleaseHandle
(
SRpcHandleInfo
*
pHandle
,
int8_t
type
);
void
tmsgReportStartup
(
const
char
*
name
,
const
char
*
desc
);
void
tmsgUpdateDnodeInfo
(
int32_t
*
dnodeId
,
int64_t
*
clusterId
,
char
*
fqdn
,
uint16_t
*
port
);
bool
tmsgUpdateDnodeInfo
(
int32_t
*
dnodeId
,
int64_t
*
clusterId
,
char
*
fqdn
,
uint16_t
*
port
);
void
tmsgUpdateDnodeEpSet
(
SEpSet
*
epset
);
#ifdef __cplusplus
}
...
...
source/dnode/mgmt/node_mgmt/src/dmMgmt.c
浏览文件 @
01647deb
...
...
@@ -79,6 +79,13 @@ static void dmClearVars(SDnode *pDnode) {
SDnodeData
*
pData
=
&
pDnode
->
data
;
taosThreadRwlockWrlock
(
&
pData
->
lock
);
if
(
pData
->
oldDnodeEps
!=
NULL
)
{
if
(
dmWriteEps
(
pData
)
==
0
)
{
dmRemoveDnodePairs
(
pData
);
}
taosArrayDestroy
(
pData
->
oldDnodeEps
);
pData
->
oldDnodeEps
=
NULL
;
}
if
(
pData
->
dnodeEps
!=
NULL
)
{
taosArrayDestroy
(
pData
->
dnodeEps
);
pData
->
dnodeEps
=
NULL
;
...
...
source/dnode/mgmt/node_util/inc/dmUtil.h
浏览文件 @
01647deb
...
...
@@ -100,6 +100,7 @@ typedef struct {
bool
stopped
;
SEpSet
mnodeEps
;
SArray
*
dnodeEps
;
SArray
*
oldDnodeEps
;
SHashObj
*
dnodeHash
;
TdThreadRwlock
lock
;
SMsgCb
msgCb
;
...
...
@@ -167,7 +168,8 @@ void dmUpdateEps(SDnodeData *pData, SArray *pDnodeEps);
void
dmGetMnodeEpSet
(
SDnodeData
*
pData
,
SEpSet
*
pEpSet
);
void
dmGetMnodeEpSetForRedirect
(
SDnodeData
*
pData
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
void
dmSetMnodeEpSet
(
SDnodeData
*
pData
,
SEpSet
*
pEpSet
);
void
dmUpdateDnodeInfo
(
void
*
pData
,
int32_t
*
dnodeId
,
int64_t
*
clusterId
,
char
*
fqdn
,
uint16_t
*
port
);
bool
dmUpdateDnodeInfo
(
void
*
pData
,
int32_t
*
dnodeId
,
int64_t
*
clusterId
,
char
*
fqdn
,
uint16_t
*
port
);
void
dmRemoveDnodePairs
(
SDnodeData
*
pData
);
#ifdef __cplusplus
}
...
...
source/dnode/mgmt/node_util/src/dmEps.c
浏览文件 @
01647deb
...
...
@@ -18,9 +18,18 @@
#include "tjson.h"
#include "tmisce.h"
static
void
dmPrintEps
(
SDnodeData
*
pData
);
static
bool
dmIsEpChanged
(
SDnodeData
*
pData
,
int32_t
dnodeId
,
const
char
*
ep
);
static
void
dmResetEps
(
SDnodeData
*
pData
,
SArray
*
dnodeEps
);
typedef
struct
{
int32_t
id
;
uint16_t
oldPort
;
uint16_t
newPort
;
char
oldFqdn
[
TSDB_FQDN_LEN
];
char
newFqdn
[
TSDB_FQDN_LEN
];
}
SDnodeEpPair
;
static
void
dmPrintEps
(
SDnodeData
*
pData
);
static
bool
dmIsEpChanged
(
SDnodeData
*
pData
,
int32_t
dnodeId
,
const
char
*
ep
);
static
void
dmResetEps
(
SDnodeData
*
pData
,
SArray
*
dnodeEps
);
static
int32_t
dmReadDnodePairs
(
SDnodeData
*
pData
);
static
void
dmGetDnodeEp
(
SDnodeData
*
pData
,
int32_t
dnodeId
,
char
*
pEp
,
char
*
pFqdn
,
uint16_t
*
pPort
)
{
taosThreadRwlockRdlock
(
&
pData
->
lock
);
...
...
@@ -137,7 +146,7 @@ int32_t dmReadEps(SDnodeData *pData) {
}
code
=
0
;
dInfo
(
"succceed to read
m
node file %s"
,
file
);
dInfo
(
"succceed to read
d
node file %s"
,
file
);
_OVER:
if
(
content
!=
NULL
)
taosMemoryFree
(
content
);
...
...
@@ -146,6 +155,7 @@ _OVER:
if
(
code
!=
0
)
{
dError
(
"failed to read dnode file:%s since %s"
,
file
,
terrstr
());
return
code
;
}
if
(
taosArrayGetSize
(
pData
->
dnodeEps
)
==
0
)
{
...
...
@@ -155,10 +165,14 @@ _OVER:
taosArrayPush
(
pData
->
dnodeEps
,
&
dnodeEp
);
}
if
(
dmReadDnodePairs
(
pData
)
!=
0
)
{
return
-
1
;
}
dDebug
(
"reset dnode list on startup"
);
dmResetEps
(
pData
,
pData
->
dnodeEps
);
if
(
dmIsEpChanged
(
pData
,
pData
->
dnodeId
,
tsLocalEp
))
{
if
(
pData
->
dnodeEps
==
NULL
&&
dmIsEpChanged
(
pData
,
pData
->
dnodeId
,
tsLocalEp
))
{
dError
(
"localEp %s different with %s and need reconfigured"
,
tsLocalEp
,
file
);
return
-
1
;
}
...
...
@@ -222,7 +236,8 @@ int32_t dmWriteEps(SDnodeData *pData) {
code
=
0
;
pData
->
updateTime
=
taosGetTimestampMs
();
dInfo
(
"succeed to write dnode file:%s, dnodeVer:%"
PRId64
,
realfile
,
pData
->
dnodeVer
);
dInfo
(
"succeed to write dnode file:%s, num:%d ver:%"
PRId64
,
realfile
,
(
int32_t
)
taosArrayGetSize
(
pData
->
dnodeEps
),
pData
->
dnodeVer
);
_OVER:
if
(
pJson
!=
NULL
)
tjsonDelete
(
pJson
);
...
...
@@ -332,40 +347,204 @@ void dmSetMnodeEpSet(SDnodeData *pData, SEpSet *pEpSet) {
}
}
void
dmUpdateDnodeInfo
(
void
*
data
,
int32_t
*
dnodeId
,
int64_t
*
clusterId
,
char
*
fqdn
,
uint16_t
*
port
)
{
bool
dmUpdateDnodeInfo
(
void
*
data
,
int32_t
*
did
,
int64_t
*
clusterId
,
char
*
fqdn
,
uint16_t
*
port
)
{
bool
updated
=
false
;
SDnodeData
*
pData
=
data
;
int32_t
ret
=
-
1
;
int32_t
dnodeId
=
-
1
;
if
(
did
!=
NULL
)
dnodeId
=
*
did
;
taosThreadRwlockRdlock
(
&
pData
->
lock
);
if
(
*
dnodeId
<=
0
)
{
for
(
int32_t
i
=
0
;
i
<
(
int32_t
)
taosArrayGetSize
(
pData
->
dnodeEps
);
++
i
)
{
if
(
pData
->
oldDnodeEps
!=
NULL
)
{
int32_t
size
=
(
int32_t
)
taosArrayGetSize
(
pData
->
oldDnodeEps
);
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
SDnodeEpPair
*
pair
=
taosArrayGet
(
pData
->
oldDnodeEps
,
i
);
if
(
strcmp
(
pair
->
oldFqdn
,
fqdn
)
==
0
&&
pair
->
oldPort
==
*
port
)
{
dInfo
(
"dnode:%d, update ep:%s:%u to %s:%u"
,
dnodeId
,
fqdn
,
*
port
,
pair
->
newFqdn
,
pair
->
newPort
);
tstrncpy
(
fqdn
,
pair
->
newFqdn
,
TSDB_FQDN_LEN
);
*
port
=
pair
->
newPort
;
updated
=
true
;
}
}
}
if
(
did
!=
NULL
&&
dnodeId
<=
0
)
{
int32_t
size
=
(
int32_t
)
taosArrayGetSize
(
pData
->
dnodeEps
);
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
SDnodeEp
*
pDnodeEp
=
taosArrayGet
(
pData
->
dnodeEps
,
i
);
if
(
strcmp
(
pDnodeEp
->
ep
.
fqdn
,
fqdn
)
==
0
&&
pDnodeEp
->
ep
.
port
==
*
port
)
{
dInfo
(
"dnode:%s:%u, update dnodeId
from %d to %d"
,
fqdn
,
*
port
,
*
dnodeId
,
pDnodeEp
->
id
);
*
d
nodeI
d
=
pDnodeEp
->
id
;
dInfo
(
"dnode:%s:%u, update dnodeId
to dnode:%d"
,
fqdn
,
*
port
,
pDnodeEp
->
id
);
*
d
i
d
=
pDnodeEp
->
id
;
if
(
clusterId
!=
NULL
)
*
clusterId
=
pData
->
clusterId
;
ret
=
0
;
}
}
if
(
ret
!=
0
)
{
dInfo
(
"dnode:%s:%u, failed to update dnodeId:%d"
,
fqdn
,
*
port
,
*
dnodeId
);
}
}
else
{
SDnodeEp
*
pDnodeEp
=
taosHashGet
(
pData
->
dnodeHash
,
dnodeId
,
sizeof
(
int32_t
));
}
if
(
dnodeId
>
0
)
{
SDnodeEp
*
pDnodeEp
=
taosHashGet
(
pData
->
dnodeHash
,
&
dnodeId
,
sizeof
(
int32_t
));
if
(
pDnodeEp
)
{
if
(
strcmp
(
pDnodeEp
->
ep
.
fqdn
,
fqdn
)
!=
0
)
{
dInfo
(
"dnode:%d, update
port from %s to %s"
,
*
dnodeId
,
fqdn
,
pDnodeEp
->
ep
.
fqdn
);
if
(
strcmp
(
pDnodeEp
->
ep
.
fqdn
,
fqdn
)
!=
0
||
pDnodeEp
->
ep
.
port
!=
*
port
)
{
dInfo
(
"dnode:%d, update
ep:%s:%u to %s:%u"
,
dnodeId
,
fqdn
,
*
port
,
pDnodeEp
->
ep
.
fqdn
,
pDnodeEp
->
ep
.
port
);
tstrncpy
(
fqdn
,
pDnodeEp
->
ep
.
fqdn
,
TSDB_FQDN_LEN
);
}
if
(
pDnodeEp
->
ep
.
port
!=
*
port
)
{
dInfo
(
"dnode:%d, update port from %u to %u"
,
*
dnodeId
,
*
port
,
pDnodeEp
->
ep
.
port
);
*
port
=
pDnodeEp
->
ep
.
port
;
updated
=
true
;
}
if
(
clusterId
!=
NULL
)
*
clusterId
=
pData
->
clusterId
;
ret
=
0
;
}
else
{
dInfo
(
"dnode:%d, failed to update dnode info"
,
*
dnodeId
);
}
}
taosThreadRwlockUnlock
(
&
pData
->
lock
);
// return ret;
}
\ No newline at end of file
return
updated
;
}
static
int32_t
dmDecodeEpPairs
(
SJson
*
pJson
,
SDnodeData
*
pData
)
{
int32_t
code
=
0
;
SJson
*
dnodes
=
tjsonGetObjectItem
(
pJson
,
"dnodes"
);
if
(
dnodes
==
NULL
)
return
0
;
int32_t
numOfDnodes
=
tjsonGetArraySize
(
dnodes
);
for
(
int32_t
i
=
0
;
i
<
numOfDnodes
;
++
i
)
{
SJson
*
dnode
=
tjsonGetArrayItem
(
dnodes
,
i
);
if
(
dnode
==
NULL
)
return
-
1
;
SDnodeEpPair
pair
=
{
0
};
tjsonGetInt32ValueFromDouble
(
dnode
,
"id"
,
pair
.
id
,
code
);
if
(
code
<
0
)
return
-
1
;
code
=
tjsonGetStringValue
(
dnode
,
"fqdn"
,
pair
.
oldFqdn
);
if
(
code
<
0
)
return
-
1
;
tjsonGetUInt16ValueFromDouble
(
dnode
,
"port"
,
pair
.
oldPort
,
code
);
if
(
code
<
0
)
return
-
1
;
code
=
tjsonGetStringValue
(
dnode
,
"new_fqdn"
,
pair
.
newFqdn
);
if
(
code
<
0
)
return
-
1
;
tjsonGetUInt16ValueFromDouble
(
dnode
,
"new_port"
,
pair
.
newPort
,
code
);
if
(
code
<
0
)
return
-
1
;
if
(
taosArrayPush
(
pData
->
oldDnodeEps
,
&
pair
)
==
NULL
)
return
-
1
;
}
return
code
;
}
void
dmRemoveDnodePairs
(
SDnodeData
*
pData
)
{
char
file
[
PATH_MAX
]
=
{
0
};
char
bak
[
PATH_MAX
]
=
{
0
};
snprintf
(
file
,
sizeof
(
file
),
"%s%sdnode%sep.json"
,
tsDataDir
,
TD_DIRSEP
,
TD_DIRSEP
);
snprintf
(
bak
,
sizeof
(
bak
),
"%s%sdnode%sep.json.bak"
,
tsDataDir
,
TD_DIRSEP
,
TD_DIRSEP
);
dInfo
(
"dnode file:%s is rename to bak file"
,
file
);
(
void
)
taosRenameFile
(
file
,
bak
);
}
static
int32_t
dmReadDnodePairs
(
SDnodeData
*
pData
)
{
int32_t
code
=
-
1
;
TdFilePtr
pFile
=
NULL
;
char
*
content
=
NULL
;
SJson
*
pJson
=
NULL
;
char
file
[
PATH_MAX
]
=
{
0
};
snprintf
(
file
,
sizeof
(
file
),
"%s%sdnode%sep.json"
,
tsDataDir
,
TD_DIRSEP
,
TD_DIRSEP
);
if
(
taosStatFile
(
file
,
NULL
,
NULL
)
<
0
)
{
dDebug
(
"dnode file:%s not exist"
,
file
);
code
=
0
;
goto
_OVER
;
}
pFile
=
taosOpenFile
(
file
,
TD_FILE_READ
);
if
(
pFile
==
NULL
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
dError
(
"failed to open dnode file:%s since %s"
,
file
,
terrstr
());
goto
_OVER
;
}
int64_t
size
=
0
;
if
(
taosFStatFile
(
pFile
,
&
size
,
NULL
)
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
dError
(
"failed to fstat dnode file:%s since %s"
,
file
,
terrstr
());
goto
_OVER
;
}
content
=
taosMemoryMalloc
(
size
+
1
);
if
(
content
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
_OVER
;
}
if
(
taosReadFile
(
pFile
,
content
,
size
)
!=
size
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
dError
(
"failed to read dnode file:%s since %s"
,
file
,
terrstr
());
goto
_OVER
;
}
content
[
size
]
=
'\0'
;
pJson
=
tjsonParse
(
content
);
if
(
pJson
==
NULL
)
{
terrno
=
TSDB_CODE_INVALID_JSON_FORMAT
;
goto
_OVER
;
}
pData
->
oldDnodeEps
=
taosArrayInit
(
1
,
sizeof
(
SDnodeEpPair
));
if
(
pData
->
oldDnodeEps
==
NULL
)
{
dError
(
"failed to calloc dnodeEp array since %s"
,
strerror
(
errno
));
goto
_OVER
;
}
if
(
dmDecodeEpPairs
(
pJson
,
pData
)
<
0
)
{
taosArrayDestroy
(
pData
->
oldDnodeEps
);
pData
->
oldDnodeEps
=
NULL
;
terrno
=
TSDB_CODE_INVALID_JSON_FORMAT
;
goto
_OVER
;
}
code
=
0
;
dInfo
(
"succceed to read dnode file %s"
,
file
);
_OVER:
if
(
content
!=
NULL
)
taosMemoryFree
(
content
);
if
(
pJson
!=
NULL
)
cJSON_Delete
(
pJson
);
if
(
pFile
!=
NULL
)
taosCloseFile
(
&
pFile
);
if
(
code
!=
0
)
{
dError
(
"failed to read dnode file:%s since %s"
,
file
,
terrstr
());
}
for
(
int32_t
i
=
0
;
i
<
(
int32_t
)
taosArrayGetSize
(
pData
->
oldDnodeEps
);
++
i
)
{
SDnodeEpPair
*
pair
=
taosArrayGet
(
pData
->
oldDnodeEps
,
i
);
for
(
int32_t
j
=
0
;
j
<
(
int32_t
)
taosArrayGetSize
(
pData
->
dnodeEps
);
++
j
)
{
SDnodeEp
*
pDnodeEp
=
taosArrayGet
(
pData
->
dnodeEps
,
j
);
if
(
pDnodeEp
->
id
!=
pair
->
id
&&
(
strcmp
(
pDnodeEp
->
ep
.
fqdn
,
pair
->
newFqdn
)
==
0
&&
pDnodeEp
->
ep
.
port
==
pair
->
newPort
))
{
dError
(
"dnode:%d, can't update ep:%s:%u to %s:%u since already exists as dnode:%d"
,
pair
->
id
,
pair
->
oldFqdn
,
pair
->
oldPort
,
pair
->
newFqdn
,
pair
->
newPort
,
pDnodeEp
->
id
);
tstrncpy
(
pDnodeEp
->
ep
.
fqdn
,
pair
->
newFqdn
,
TSDB_FQDN_LEN
);
pDnodeEp
->
ep
.
port
=
pair
->
newPort
;
}
#if 0
if (pDnodeEp->id == pair->id &&
(strcmp(pDnodeEp->ep.fqdn, pair->oldFqdn) == 0 && pDnodeEp->ep.port == pair->oldPort)) {
dError("dnode:%d, can't update ep:%s:%u to %s:%u since endpoint not matched", pair->id, pair->oldFqdn,
pair->oldPort, pair->newFqdn, pair->newPort, pDnodeEp->id);
tstrncpy(pDnodeEp->ep.fqdn, pair->newFqdn, TSDB_FQDN_LEN);
pDnodeEp->ep.port = pair->newPort;
}
#endif
}
}
for
(
int32_t
i
=
0
;
i
<
(
int32_t
)
taosArrayGetSize
(
pData
->
oldDnodeEps
);
++
i
)
{
SDnodeEpPair
*
pair
=
taosArrayGet
(
pData
->
oldDnodeEps
,
i
);
for
(
int32_t
j
=
0
;
j
<
(
int32_t
)
taosArrayGetSize
(
pData
->
dnodeEps
);
++
j
)
{
SDnodeEp
*
pDnodeEp
=
taosArrayGet
(
pData
->
dnodeEps
,
j
);
if
(
strcmp
(
pDnodeEp
->
ep
.
fqdn
,
pair
->
oldFqdn
)
==
0
&&
pDnodeEp
->
ep
.
port
==
pair
->
oldPort
)
{
dInfo
(
"dnode:%d, will update ep:%s:%u to %s:%u"
,
pDnodeEp
->
id
,
pDnodeEp
->
ep
.
fqdn
,
pDnodeEp
->
ep
.
port
,
pair
->
newFqdn
,
pair
->
newPort
);
tstrncpy
(
pDnodeEp
->
ep
.
fqdn
,
pair
->
newFqdn
,
TSDB_FQDN_LEN
);
pDnodeEp
->
ep
.
port
=
pair
->
newPort
;
}
}
}
pData
->
dnodeVer
=
0
;
return
code
;
}
source/dnode/mnode/impl/src/mndConsumer.c
浏览文件 @
01647deb
...
...
@@ -745,6 +745,7 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
if
(
tDecodeSMqConsumerObj
(
buf
,
pConsumer
)
==
NULL
)
{
goto
CM_DECODE_OVER
;
}
tmsgUpdateDnodeEpSet
(
&
pConsumer
->
ep
);
terrno
=
TSDB_CODE_SUCCESS
;
...
...
source/dnode/mnode/impl/src/mndDnode.c
浏览文件 @
01647deb
...
...
@@ -180,6 +180,9 @@ static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) {
SDB_GET_RESERVE
(
pRaw
,
dataPos
,
TSDB_DNODE_RESERVE_SIZE
,
_OVER
)
terrno
=
0
;
if
(
tmsgUpdateDnodeInfo
(
&
pDnode
->
id
,
NULL
,
pDnode
->
fqdn
,
&
pDnode
->
port
))
{
mInfo
(
"dnode:%d, endpoint changed"
,
pDnode
->
id
);
}
_OVER:
if
(
terrno
!=
0
)
{
...
...
@@ -188,7 +191,7 @@ _OVER:
return
NULL
;
}
mTrace
(
"dnode:%d, decode from raw:%p, row:%p
"
,
pDnode
->
id
,
pRaw
,
pDnode
);
mTrace
(
"dnode:%d, decode from raw:%p, row:%p
ep:%s:%u"
,
pDnode
->
id
,
pRaw
,
pDnode
,
pDnode
->
fqdn
,
pDnode
->
port
);
return
pRow
;
}
...
...
source/dnode/mnode/impl/src/mndMnode.c
浏览文件 @
01647deb
...
...
@@ -747,7 +747,7 @@ static void mndReloadSyncConfig(SMnode *pMnode) {
pNode
->
clusterId
=
mndGetClusterId
(
pMnode
);
pNode
->
nodePort
=
pObj
->
pDnode
->
port
;
tstrncpy
(
pNode
->
nodeFqdn
,
pObj
->
pDnode
->
fqdn
,
TSDB_FQDN_LEN
);
tmsgUpdateDnodeInfo
(
&
pNode
->
nodeId
,
&
pNode
->
clusterId
,
pNode
->
nodeFqdn
,
&
pNode
->
nodePort
);
(
void
)
tmsgUpdateDnodeInfo
(
&
pNode
->
nodeId
,
&
pNode
->
clusterId
,
pNode
->
nodeFqdn
,
&
pNode
->
nodePort
);
mInfo
(
"vgId:1, ep:%s:%u dnode:%d"
,
pNode
->
nodeFqdn
,
pNode
->
nodePort
,
pNode
->
nodeId
);
if
(
pObj
->
pDnode
->
id
==
pMnode
->
selfDnodeId
)
{
cfg
.
myIndex
=
cfg
.
replicaNum
;
...
...
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
01647deb
...
...
@@ -760,6 +760,27 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
goto
SUB_DECODE_OVER
;
}
// update epset saved in mnode
if
(
pSub
->
unassignedVgs
!=
NULL
)
{
int32_t
size
=
(
int32_t
)
taosArrayGetSize
(
pSub
->
unassignedVgs
);
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
SMqVgEp
*
pMqVgEp
=
taosArrayGet
(
pSub
->
unassignedVgs
,
i
);
tmsgUpdateDnodeEpSet
(
&
pMqVgEp
->
epSet
);
}
}
if
(
pSub
->
consumerHash
!=
NULL
)
{
void
*
pIter
=
taosHashIterate
(
pSub
->
consumerHash
,
NULL
);
while
(
pIter
)
{
SMqConsumerEp
*
pConsumerEp
=
pIter
;
int32_t
size
=
(
int32_t
)
taosArrayGetSize
(
pConsumerEp
->
vgs
);
for
(
int32_t
i
=
0
;
i
<
size
;
++
i
)
{
SMqVgEp
*
pMqVgEp
=
taosArrayGet
(
pConsumerEp
->
vgs
,
i
);
tmsgUpdateDnodeEpSet
(
&
pMqVgEp
->
epSet
);
}
pIter
=
taosHashIterate
(
pSub
->
consumerHash
,
pIter
);
}
}
terrno
=
TSDB_CODE_SUCCESS
;
SUB_DECODE_OVER:
...
...
source/dnode/mnode/impl/src/mndSync.c
浏览文件 @
01647deb
...
...
@@ -303,7 +303,7 @@ int32_t mndInitSync(SMnode *pMnode) {
pNode
->
nodeId
=
pMgmt
->
replicas
[
i
].
id
;
pNode
->
nodePort
=
pMgmt
->
replicas
[
i
].
port
;
tstrncpy
(
pNode
->
nodeFqdn
,
pMgmt
->
replicas
[
i
].
fqdn
,
sizeof
(
pNode
->
nodeFqdn
));
tmsgUpdateDnodeInfo
(
&
pNode
->
nodeId
,
&
pNode
->
clusterId
,
pNode
->
nodeFqdn
,
&
pNode
->
nodePort
);
(
void
)
tmsgUpdateDnodeInfo
(
&
pNode
->
nodeId
,
&
pNode
->
clusterId
,
pNode
->
nodeFqdn
,
&
pNode
->
nodePort
);
mInfo
(
"vgId:1, index:%d ep:%s:%u dnode:%d cluster:%"
PRId64
,
i
,
pNode
->
nodeFqdn
,
pNode
->
nodePort
,
pNode
->
nodeId
,
pNode
->
clusterId
);
}
...
...
source/dnode/mnode/impl/src/mndTrans.c
浏览文件 @
01647deb
...
...
@@ -329,6 +329,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
action
.
pRaw
=
NULL
;
}
else
if
(
action
.
actionType
==
TRANS_ACTION_MSG
)
{
SDB_GET_BINARY
(
pRaw
,
dataPos
,
(
void
*
)
&
action
.
epSet
,
sizeof
(
SEpSet
),
_OVER
);
tmsgUpdateDnodeEpSet
(
&
action
.
epSet
);
SDB_GET_INT16
(
pRaw
,
dataPos
,
&
action
.
msgType
,
_OVER
)
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
unused
/*&action.msgSent*/
,
_OVER
)
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
unused
/*&action.msgReceived*/
,
_OVER
)
...
...
source/dnode/vnode/src/vnd/vnodeOpen.c
浏览文件 @
01647deb
...
...
@@ -86,7 +86,7 @@ int32_t vnodeAlter(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs) {
pNode
->
nodeId
=
pReq
->
replicas
[
i
].
id
;
pNode
->
nodePort
=
pReq
->
replicas
[
i
].
port
;
tstrncpy
(
pNode
->
nodeFqdn
,
pReq
->
replicas
[
i
].
fqdn
,
sizeof
(
pNode
->
nodeFqdn
));
tmsgUpdateDnodeInfo
(
&
pNode
->
nodeId
,
&
pNode
->
clusterId
,
pNode
->
nodeFqdn
,
&
pNode
->
nodePort
);
(
void
)
tmsgUpdateDnodeInfo
(
&
pNode
->
nodeId
,
&
pNode
->
clusterId
,
pNode
->
nodeFqdn
,
&
pNode
->
nodePort
);
vInfo
(
"vgId:%d, replica:%d ep:%s:%u dnode:%d"
,
pReq
->
vgId
,
i
,
pNode
->
nodeFqdn
,
pNode
->
nodePort
,
pNode
->
nodeId
);
}
...
...
@@ -134,6 +134,21 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
return
NULL
;
}
// save vnode info on dnode ep changed
bool
updated
=
false
;
SSyncCfg
*
pCfg
=
&
info
.
config
.
syncCfg
;
for
(
int32_t
i
=
0
;
i
<
pCfg
->
replicaNum
;
++
i
)
{
SNodeInfo
*
pNode
=
&
pCfg
->
nodeInfo
[
i
];
if
(
tmsgUpdateDnodeInfo
(
&
pNode
->
nodeId
,
&
pNode
->
clusterId
,
pNode
->
nodeFqdn
,
&
pNode
->
nodePort
))
{
updated
=
true
;
}
}
if
(
updated
)
{
vInfo
(
"vgId:%d, save vnode info since dnode info changed"
,
info
.
config
.
vgId
);
(
void
)
vnodeSaveInfo
(
dir
,
&
info
);
(
void
)
vnodeCommitInfo
(
dir
,
&
info
);
}
// create handle
pVnode
=
taosMemoryCalloc
(
1
,
sizeof
(
*
pVnode
)
+
strlen
(
path
)
+
1
);
if
(
pVnode
==
NULL
)
{
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
01647deb
...
...
@@ -895,14 +895,25 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
// init by SSyncInfo
pSyncNode
->
vgId
=
pSyncInfo
->
vgId
;
SSyncCfg
*
pCfg
=
&
pSyncNode
->
raftCfg
.
cfg
;
bool
updated
=
false
;
sInfo
(
"vgId:%d, start to open sync node, replica:%d selfIndex:%d"
,
pSyncNode
->
vgId
,
pCfg
->
replicaNum
,
pCfg
->
myIndex
);
for
(
int32_t
i
=
0
;
i
<
pCfg
->
replicaNum
;
++
i
)
{
SNodeInfo
*
pNode
=
&
pCfg
->
nodeInfo
[
i
];
tmsgUpdateDnodeInfo
(
&
pNode
->
nodeId
,
&
pNode
->
clusterId
,
pNode
->
nodeFqdn
,
&
pNode
->
nodePort
);
if
(
tmsgUpdateDnodeInfo
(
&
pNode
->
nodeId
,
&
pNode
->
clusterId
,
pNode
->
nodeFqdn
,
&
pNode
->
nodePort
))
{
updated
=
true
;
}
sInfo
(
"vgId:%d, index:%d ep:%s:%u dnode:%d cluster:%"
PRId64
,
pSyncNode
->
vgId
,
i
,
pNode
->
nodeFqdn
,
pNode
->
nodePort
,
pNode
->
nodeId
,
pNode
->
clusterId
);
}
if
(
updated
)
{
sInfo
(
"vgId:%d, save config info since dnode info changed"
,
pSyncNode
->
vgId
);
if
(
syncWriteCfgFile
(
pSyncNode
)
!=
0
)
{
sError
(
"vgId:%d, failed to write sync cfg file on dnode info updated"
,
pSyncNode
->
vgId
);
goto
_error
;
}
}
pSyncNode
->
pWal
=
pSyncInfo
->
pWal
;
pSyncNode
->
msgcb
=
pSyncInfo
->
msgcb
;
pSyncNode
->
syncSendMSg
=
pSyncInfo
->
syncSendMSg
;
...
...
source/libs/transport/src/tmsgcb.c
浏览文件 @
01647deb
...
...
@@ -59,6 +59,12 @@ void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type) { (*defaultMsgCb.re
void
tmsgReportStartup
(
const
char
*
name
,
const
char
*
desc
)
{
(
*
defaultMsgCb
.
reportStartupFp
)(
name
,
desc
);
}
void
tmsgUpdateDnodeInfo
(
int32_t
*
dnodeId
,
int64_t
*
clusterId
,
char
*
fqdn
,
uint16_t
*
port
)
{
(
*
defaultMsgCb
.
updateDnodeInfoFp
)(
defaultMsgCb
.
data
,
dnodeId
,
clusterId
,
fqdn
,
port
);
bool
tmsgUpdateDnodeInfo
(
int32_t
*
dnodeId
,
int64_t
*
clusterId
,
char
*
fqdn
,
uint16_t
*
port
)
{
return
(
*
defaultMsgCb
.
updateDnodeInfoFp
)(
defaultMsgCb
.
data
,
dnodeId
,
clusterId
,
fqdn
,
port
);
}
void
tmsgUpdateDnodeEpSet
(
SEpSet
*
epset
)
{
for
(
int32_t
i
=
0
;
i
<
epset
->
numOfEps
;
++
i
)
{
tmsgUpdateDnodeInfo
(
NULL
,
NULL
,
epset
->
eps
[
i
].
fqdn
,
&
epset
->
eps
[
i
].
port
);
}
}
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录