Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
00644eef
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
00644eef
编写于
6月 14, 2022
作者:
S
Shengliang Guan
提交者:
GitHub
6月 14, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #13791 from taosdata/fix/mnode
refactor: mnode worker
上级
f218e73f
fe6d46d2
变更
12
隐藏空白更改
内联
并排
Showing
12 changed file
with
267 addition
and
309 deletion
+267
-309
include/dnode/mnode/mnode.h
include/dnode/mnode/mnode.h
+2
-3
source/dnode/mgmt/mgmt_mnode/inc/mmInt.h
source/dnode/mgmt/mgmt_mnode/inc/mmInt.h
+8
-16
source/dnode/mgmt/mgmt_mnode/src/mmFile.c
source/dnode/mgmt/mgmt_mnode/src/mmFile.c
+32
-54
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
+115
-115
source/dnode/mgmt/mgmt_mnode/src/mmInt.c
source/dnode/mgmt/mgmt_mnode/src/mmInt.c
+14
-47
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
+70
-49
source/dnode/mgmt/node_mgmt/src/dmEnv.c
source/dnode/mgmt/node_mgmt/src/dmEnv.c
+5
-3
source/dnode/mnode/impl/inc/mndInt.h
source/dnode/mnode/impl/inc/mndInt.h
+6
-8
source/dnode/mnode/impl/src/mndDnode.c
source/dnode/mnode/impl/src/mndDnode.c
+2
-2
source/dnode/mnode/impl/src/mndMain.c
source/dnode/mnode/impl/src/mndMain.c
+1
-3
source/dnode/mnode/impl/src/mndSync.c
source/dnode/mnode/impl/src/mndSync.c
+9
-9
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+3
-0
未找到文件。
include/dnode/mnode/mnode.h
浏览文件 @
00644eef
...
@@ -32,9 +32,7 @@ typedef struct {
...
@@ -32,9 +32,7 @@ typedef struct {
int32_t
dnodeId
;
int32_t
dnodeId
;
bool
standby
;
bool
standby
;
bool
deploy
;
bool
deploy
;
int8_t
replica
;
SReplica
replica
;
int8_t
selfIndex
;
SReplica
replicas
[
TSDB_MAX_REPLICA
];
SMsgCb
msgCb
;
SMsgCb
msgCb
;
}
SMnodeOpt
;
}
SMnodeOpt
;
...
@@ -83,6 +81,7 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad);
...
@@ -83,6 +81,7 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad);
*/
*/
int32_t
mndProcessRpcMsg
(
SRpcMsg
*
pMsg
);
int32_t
mndProcessRpcMsg
(
SRpcMsg
*
pMsg
);
int32_t
mndProcessSyncMsg
(
SRpcMsg
*
pMsg
);
int32_t
mndProcessSyncMsg
(
SRpcMsg
*
pMsg
);
int32_t
mndPreprocessQueryMsg
(
SMnode
*
pMnode
,
SRpcMsg
*
pMsg
);
/**
/**
* @brief Generate machine code
* @brief Generate machine code
...
...
source/dnode/mgmt/mgmt_mnode/inc/mmInt.h
浏览文件 @
00644eef
...
@@ -34,39 +34,31 @@ typedef struct SMnodeMgmt {
...
@@ -34,39 +34,31 @@ typedef struct SMnodeMgmt {
SSingleWorker
writeWorker
;
SSingleWorker
writeWorker
;
SSingleWorker
syncWorker
;
SSingleWorker
syncWorker
;
SSingleWorker
monitorWorker
;
SSingleWorker
monitorWorker
;
SReplica
replicas
[
TSDB_MAX_REPLICA
];
int8_t
replica
;
bool
stopped
;
bool
stopped
;
int32_t
refCount
;
int32_t
refCount
;
TdThreadRwlock
lock
;
TdThreadRwlock
lock
;
}
SMnodeMgmt
;
}
SMnodeMgmt
;
// mmFile.c
// mmFile.c
int32_t
mmReadFile
(
SMnodeMgmt
*
pMgmt
,
bool
*
pDeployed
);
int32_t
mmReadFile
(
SMnodeMgmt
*
pMgmt
,
SReplica
*
pReplica
,
bool
*
pDeployed
);
int32_t
mmWriteFile
(
SMnodeMgmt
*
pMgmt
,
SDCreateMnodeReq
*
pMsg
,
bool
deployed
);
int32_t
mmWriteFile
(
SMnodeMgmt
*
pMgmt
,
const
SReplica
*
pReplica
,
bool
deployed
);
// mmInt.c
int32_t
mmAcquire
(
SMnodeMgmt
*
pMgmt
);
void
mmRelease
(
SMnodeMgmt
*
pMgmt
);
// mmHandle.c
// mmHandle.c
SArray
*
mmGetMsgHandles
();
SArray
*
mmGetMsgHandles
();
int32_t
mmProcessCreateReq
(
const
SMgmtInputOpt
*
pInput
,
SRpcMsg
*
pMsg
);
int32_t
mmProcessCreateReq
(
const
SMgmtInputOpt
*
pInput
,
SRpcMsg
*
pMsg
);
int32_t
mmProcessDropReq
(
const
SMgmtInputOpt
*
pInput
,
SRpcMsg
*
pMsg
);
int32_t
mmProcessDropReq
(
const
SMgmtInputOpt
*
pInput
,
SRpcMsg
*
pMsg
);
int32_t
mmProcessAlterReq
(
SMnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
);
int32_t
mmProcessGetMonitorInfoReq
(
SMnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
);
int32_t
mmProcessGetMonitorInfoReq
(
SMnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
);
int32_t
mmProcessGetLoadsReq
(
SMnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
);
int32_t
mmProcessGetLoadsReq
(
SMnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
);
int32_t
mndPreprocessQueryMsg
(
SMnode
*
pMnode
,
SRpcMsg
*
pMsg
);
// mmWorker.c
// mmWorker.c
int32_t
mmStartWorker
(
SMnodeMgmt
*
pMgmt
);
int32_t
mmStartWorker
(
SMnodeMgmt
*
pMgmt
);
void
mmStopWorker
(
SMnodeMgmt
*
pMgmt
);
void
mmStopWorker
(
SMnodeMgmt
*
pMgmt
);
int32_t
mmPut
Node
MsgToWriteQueue
(
SMnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
);
int32_t
mmPutMsgToWriteQueue
(
SMnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
);
int32_t
mmPut
Node
MsgToSyncQueue
(
SMnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
);
int32_t
mmPutMsgToSyncQueue
(
SMnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
);
int32_t
mmPut
Node
MsgToReadQueue
(
SMnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
);
int32_t
mmPutMsgToReadQueue
(
SMnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
);
int32_t
mmPut
Node
MsgToQueryQueue
(
SMnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
);
int32_t
mmPutMsgToQueryQueue
(
SMnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
);
int32_t
mmPut
Node
MsgToMonitorQueue
(
SMnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
);
int32_t
mmPutMsgToMonitorQueue
(
SMnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
);
int32_t
mmPut
Rpc
MsgToQueue
(
SMnodeMgmt
*
pMgmt
,
EQueueType
qtype
,
SRpcMsg
*
pRpc
);
int32_t
mmPutMsgToQueue
(
SMnodeMgmt
*
pMgmt
,
EQueueType
qtype
,
SRpcMsg
*
pRpc
);
#ifdef __cplusplus
#ifdef __cplusplus
}
}
...
...
source/dnode/mgmt/mgmt_mnode/src/mmFile.c
浏览文件 @
00644eef
...
@@ -16,7 +16,7 @@
...
@@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE
#define _DEFAULT_SOURCE
#include "mmInt.h"
#include "mmInt.h"
int32_t
mmReadFile
(
SMnodeMgmt
*
pMgmt
,
bool
*
pDeployed
)
{
int32_t
mmReadFile
(
SMnodeMgmt
*
pMgmt
,
SReplica
*
pReplica
,
bool
*
pDeployed
)
{
int32_t
code
=
TSDB_CODE_INVALID_JSON_FORMAT
;
int32_t
code
=
TSDB_CODE_INVALID_JSON_FORMAT
;
int32_t
len
=
0
;
int32_t
len
=
0
;
int32_t
maxLen
=
4096
;
int32_t
maxLen
=
4096
;
...
@@ -52,61 +52,54 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) {
...
@@ -52,61 +52,54 @@ int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed) {
}
}
*
pDeployed
=
deployed
->
valueint
;
*
pDeployed
=
deployed
->
valueint
;
cJSON
*
mnodes
=
cJSON_GetObjectItem
(
root
,
"mnodes
"
);
cJSON
*
id
=
cJSON_GetObjectItem
(
root
,
"id
"
);
if
(
mnodes
!=
NULL
)
{
if
(
id
)
{
if
(
!
mnodes
||
mnodes
->
type
!=
cJSON_Array
)
{
if
(
id
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read %s since
nodes
not found"
,
file
);
dError
(
"failed to read %s since
id
not found"
,
file
);
goto
_OVER
;
goto
_OVER
;
}
}
if
(
pReplica
)
{
pReplica
->
id
=
id
->
valueint
;
}
}
pMgmt
->
replica
=
cJSON_GetArraySize
(
mnodes
);
cJSON
*
fqdn
=
cJSON_GetObjectItem
(
root
,
"fqdn"
);
if
(
pMgmt
->
replica
<=
0
||
pMgmt
->
replica
>
TSDB_MAX_REPLICA
)
{
if
(
fqdn
)
{
dError
(
"failed to read %s since mnodes size %d invalid"
,
file
,
pMgmt
->
replica
);
if
(
fqdn
->
type
!=
cJSON_String
||
fqdn
->
valuestring
==
NULL
)
{
dError
(
"failed to read %s since fqdn not found"
,
file
);
goto
_OVER
;
goto
_OVER
;
}
}
if
(
pReplica
)
{
for
(
int32_t
i
=
0
;
i
<
pMgmt
->
replica
;
++
i
)
{
cJSON
*
node
=
cJSON_GetArrayItem
(
mnodes
,
i
);
if
(
node
==
NULL
)
break
;
SReplica
*
pReplica
=
&
pMgmt
->
replicas
[
i
];
cJSON
*
id
=
cJSON_GetObjectItem
(
node
,
"id"
);
if
(
!
id
||
id
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read %s since id not found"
,
file
);
goto
_OVER
;
}
pReplica
->
id
=
id
->
valueint
;
cJSON
*
fqdn
=
cJSON_GetObjectItem
(
node
,
"fqdn"
);
if
(
!
fqdn
||
fqdn
->
type
!=
cJSON_String
||
fqdn
->
valuestring
==
NULL
)
{
dError
(
"failed to read %s since fqdn not found"
,
file
);
goto
_OVER
;
}
tstrncpy
(
pReplica
->
fqdn
,
fqdn
->
valuestring
,
TSDB_FQDN_LEN
);
tstrncpy
(
pReplica
->
fqdn
,
fqdn
->
valuestring
,
TSDB_FQDN_LEN
);
}
}
cJSON
*
port
=
cJSON_GetObjectItem
(
node
,
"port"
);
cJSON
*
port
=
cJSON_GetObjectItem
(
root
,
"port"
);
if
(
!
port
||
port
->
type
!=
cJSON_Number
)
{
if
(
port
)
{
dError
(
"failed to read %s since port not found"
,
file
);
if
(
port
->
type
!=
cJSON_Number
)
{
goto
_OVER
;
dError
(
"failed to read %s since port not found"
,
file
);
}
goto
_OVER
;
pReplica
->
port
=
port
->
valueint
;
}
if
(
pReplica
)
{
pReplica
->
port
=
(
uint16_t
)
port
->
valueint
;
}
}
}
}
code
=
0
;
code
=
0
;
dDebug
(
"succcessed to read file %s, deployed:%d"
,
file
,
*
pDeployed
);
_OVER:
_OVER:
if
(
content
!=
NULL
)
taosMemoryFree
(
content
);
if
(
content
!=
NULL
)
taosMemoryFree
(
content
);
if
(
root
!=
NULL
)
cJSON_Delete
(
root
);
if
(
root
!=
NULL
)
cJSON_Delete
(
root
);
if
(
pFile
!=
NULL
)
taosCloseFile
(
&
pFile
);
if
(
pFile
!=
NULL
)
taosCloseFile
(
&
pFile
);
if
(
code
==
0
)
{
dDebug
(
"succcessed to read file %s, deployed:%d"
,
file
,
*
pDeployed
);
}
terrno
=
code
;
terrno
=
code
;
return
code
;
return
code
;
}
}
int32_t
mmWriteFile
(
SMnodeMgmt
*
pMgmt
,
SDCreateMnodeReq
*
pMsg
,
bool
deployed
)
{
int32_t
mmWriteFile
(
SMnodeMgmt
*
pMgmt
,
const
SReplica
*
pReplica
,
bool
deployed
)
{
char
file
[
PATH_MAX
]
=
{
0
};
char
file
[
PATH_MAX
]
=
{
0
};
char
realfile
[
PATH_MAX
]
=
{
0
};
char
realfile
[
PATH_MAX
]
=
{
0
};
snprintf
(
file
,
sizeof
(
file
),
"%s%smnode.json.bak"
,
pMgmt
->
path
,
TD_DIRSEP
);
snprintf
(
file
,
sizeof
(
file
),
"%s%smnode.json.bak"
,
pMgmt
->
path
,
TD_DIRSEP
);
...
@@ -124,26 +117,11 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed) {
...
@@ -124,26 +117,11 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed) {
char
*
content
=
taosMemoryCalloc
(
1
,
maxLen
+
1
);
char
*
content
=
taosMemoryCalloc
(
1
,
maxLen
+
1
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"{
\n
"
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"{
\n
"
);
if
(
pReplica
!=
NULL
&&
pReplica
->
id
>
0
)
{
int8_t
replica
=
(
pMsg
!=
NULL
?
pMsg
->
replica
:
pMgmt
->
replica
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
id
\"
: %d,
\n
"
,
pReplica
->
id
);
if
(
replica
>
0
)
{
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
fqdn
\"
:
\"
%s
\"
,
\n
"
,
pReplica
->
fqdn
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
mnodes
\"
: [{
\n
"
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
port
\"
: %u
\n
,"
,
pReplica
->
port
);
for
(
int32_t
i
=
0
;
i
<
replica
;
++
i
)
{
SReplica
*
pReplica
=
&
pMgmt
->
replicas
[
i
];
if
(
pMsg
!=
NULL
)
{
pReplica
=
&
pMsg
->
replicas
[
i
];
}
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
id
\"
: %d,
\n
"
,
pReplica
->
id
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
fqdn
\"
:
\"
%s
\"
,
\n
"
,
pReplica
->
fqdn
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
port
\"
: %u
\n
"
,
pReplica
->
port
);
if
(
i
<
replica
-
1
)
{
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
" },{
\n
"
);
}
else
{
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
" }],
\n
"
);
}
}
}
}
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
deployed
\"
: %d
\n
"
,
deployed
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
deployed
\"
: %d
\n
"
,
deployed
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"}
\n
"
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"}
\n
"
);
...
...
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
浏览文件 @
00644eef
...
@@ -20,11 +20,6 @@ void mmGetMonitorInfo(SMnodeMgmt *pMgmt, SMonMmInfo *pInfo) {
...
@@ -20,11 +20,6 @@ void mmGetMonitorInfo(SMnodeMgmt *pMgmt, SMonMmInfo *pInfo) {
mndGetMonitorInfo
(
pMgmt
->
pMnode
,
&
pInfo
->
cluster
,
&
pInfo
->
vgroup
,
&
pInfo
->
grant
);
mndGetMonitorInfo
(
pMgmt
->
pMnode
,
&
pInfo
->
cluster
,
&
pInfo
->
vgroup
,
&
pInfo
->
grant
);
}
}
void
mmGetMnodeLoads
(
SMnodeMgmt
*
pMgmt
,
SMonMloadInfo
*
pInfo
)
{
pInfo
->
isMnode
=
1
;
mndGetLoad
(
pMgmt
->
pMnode
,
&
pInfo
->
load
);
}
int32_t
mmProcessGetMonitorInfoReq
(
SMnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
)
{
int32_t
mmProcessGetMonitorInfoReq
(
SMnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
)
{
SMonMmInfo
mmInfo
=
{
0
};
SMonMmInfo
mmInfo
=
{
0
};
mmGetMonitorInfo
(
pMgmt
,
&
mmInfo
);
mmGetMonitorInfo
(
pMgmt
,
&
mmInfo
);
...
@@ -50,6 +45,11 @@ int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
...
@@ -50,6 +45,11 @@ int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return
0
;
return
0
;
}
}
void
mmGetMnodeLoads
(
SMnodeMgmt
*
pMgmt
,
SMonMloadInfo
*
pInfo
)
{
pInfo
->
isMnode
=
1
;
mndGetLoad
(
pMgmt
->
pMnode
,
&
pInfo
->
load
);
}
int32_t
mmProcessGetLoadsReq
(
SMnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
)
{
int32_t
mmProcessGetLoadsReq
(
SMnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
)
{
SMonMloadInfo
mloads
=
{
0
};
SMonMloadInfo
mloads
=
{
0
};
mmGetMnodeLoads
(
pMgmt
,
&
mloads
);
mmGetMnodeLoads
(
pMgmt
,
&
mloads
);
...
@@ -90,7 +90,7 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
...
@@ -90,7 +90,7 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SMnodeMgmt
mgmt
=
{
0
};
SMnodeMgmt
mgmt
=
{
0
};
mgmt
.
path
=
pInput
->
path
;
mgmt
.
path
=
pInput
->
path
;
mgmt
.
name
=
pInput
->
name
;
mgmt
.
name
=
pInput
->
name
;
if
(
mmWriteFile
(
&
mgmt
,
&
createReq
,
deployed
)
!=
0
)
{
if
(
mmWriteFile
(
&
mgmt
,
&
createReq
.
replicas
[
0
]
,
deployed
)
!=
0
)
{
dError
(
"failed to write mnode file since %s"
,
terrstr
());
dError
(
"failed to write mnode file since %s"
,
terrstr
());
return
-
1
;
return
-
1
;
}
}
...
@@ -126,117 +126,117 @@ int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
...
@@ -126,117 +126,117 @@ int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
SArray
*
mmGetMsgHandles
()
{
SArray
*
mmGetMsgHandles
()
{
int32_t
code
=
-
1
;
int32_t
code
=
-
1
;
SArray
*
pArray
=
taosArrayInit
(
64
,
sizeof
(
SMgmtHandle
));
SArray
*
pArray
=
taosArrayInit
(
128
,
sizeof
(
SMgmtHandle
));
if
(
pArray
==
NULL
)
goto
_OVER
;
if
(
pArray
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_DND_CREATE_MNODE_RSP
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_DND_CREATE_MNODE_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_DND_DROP_MNODE_RSP
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_DND_DROP_MNODE_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_DND_CREATE_QNODE_RSP
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_DND_CREATE_QNODE_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_DND_DROP_QNODE_RSP
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_DND_DROP_QNODE_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_DND_CREATE_SNODE_RSP
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_DND_CREATE_SNODE_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_DND_DROP_SNODE_RSP
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_DND_DROP_SNODE_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_DND_CREATE_BNODE_RSP
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_DND_CREATE_BNODE_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_DND_DROP_BNODE_RSP
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_DND_DROP_BNODE_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_DND_CREATE_VNODE_RSP
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_DND_CREATE_VNODE_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_DND_DROP_VNODE_RSP
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_DND_DROP_VNODE_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_DND_CONFIG_DNODE_RSP
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_DND_CONFIG_DNODE_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_CONNECT
,
mmPut
Node
MsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_CONNECT
,
mmPutMsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_CREATE_ACCT
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_CREATE_ACCT
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_ALTER_ACCT
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_ALTER_ACCT
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_DROP_ACCT
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_DROP_ACCT
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_CREATE_USER
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_CREATE_USER
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_ALTER_USER
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_ALTER_USER
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_DROP_USER
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_DROP_USER
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_GET_USER_AUTH
,
mmPut
Node
MsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_GET_USER_AUTH
,
mmPutMsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_CREATE_DNODE
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_CREATE_DNODE
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_CONFIG_DNODE
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_CONFIG_DNODE
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_DROP_DNODE
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_DROP_DNODE
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_CREATE_MNODE
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_CREATE_MNODE
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_ALTER_MNODE
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_ALTER_MNODE
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_ALTER_MNODE_RSP
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_ALTER_MNODE_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_SET_STANDBY_RSP
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_SET_STANDBY_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_DROP_MNODE
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_DROP_MNODE
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_CREATE_QNODE
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_CREATE_QNODE
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_DROP_QNODE
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_DROP_QNODE
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_QNODE_LIST
,
mmPut
Node
MsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_QNODE_LIST
,
mmPutMsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_CREATE_SNODE
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_CREATE_SNODE
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_DROP_SNODE
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_DROP_SNODE
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_CREATE_BNODE
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_CREATE_BNODE
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_DROP_BNODE
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_DROP_BNODE
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_CREATE_DB
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_CREATE_DB
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_DROP_DB
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_DROP_DB
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_USE_DB
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_USE_DB
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_ALTER_DB
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_ALTER_DB
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_COMPACT_DB
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_COMPACT_DB
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_GET_DB_CFG
,
mmPut
Node
MsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_GET_DB_CFG
,
mmPutMsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_VGROUP_LIST
,
mmPut
Node
MsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_VGROUP_LIST
,
mmPutMsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_REDISTRIBUTE_VGROUP
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_REDISTRIBUTE_VGROUP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_MERGE_VGROUP
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_MERGE_VGROUP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_BALANCE_VGROUP
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_BALANCE_VGROUP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_CREATE_FUNC
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_CREATE_FUNC
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_RETRIEVE_FUNC
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_RETRIEVE_FUNC
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_DROP_FUNC
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_DROP_FUNC
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_CREATE_STB
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_CREATE_STB
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_ALTER_STB
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_ALTER_STB
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_DROP_STB
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_DROP_STB
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_TABLE_META
,
mmPut
Node
MsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_TABLE_META
,
mmPutMsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_CREATE_SMA
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_CREATE_SMA
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_DROP_SMA
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_DROP_SMA
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_CREATE_STREAM
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_CREATE_STREAM
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_GET_INDEX
,
mmPut
Node
MsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_GET_INDEX
,
mmPutMsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_GET_TABLE_INDEX
,
mmPut
Node
MsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_GET_TABLE_INDEX
,
mmPutMsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_CREATE_TOPIC
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_CREATE_TOPIC
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_ALTER_TOPIC
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_ALTER_TOPIC
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_DROP_TOPIC
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_DROP_TOPIC
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_SUBSCRIBE
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_SUBSCRIBE
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_MQ_ASK_EP
,
mmPut
Node
MsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_MQ_ASK_EP
,
mmPutMsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_MQ_DROP_CGROUP
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_MQ_DROP_CGROUP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_MQ_DROP_CGROUP_RSP
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_MQ_DROP_CGROUP_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_MQ_COMMIT_OFFSET
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_MQ_COMMIT_OFFSET
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_KILL_TRANS
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_KILL_TRANS
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_KILL_QUERY
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_KILL_QUERY
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_KILL_CONN
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_KILL_CONN
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_HEARTBEAT
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_HEARTBEAT
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_STATUS
,
mmPut
Node
MsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_STATUS
,
mmPutMsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_SYSTABLE_RETRIEVE
,
mmPut
Node
MsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_SYSTABLE_RETRIEVE
,
mmPutMsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_GRANT
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_GRANT
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_AUTH
,
mmPut
Node
MsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_AUTH
,
mmPutMsgToReadQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_QUERY
,
mmPut
Node
MsgToQueryQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_QUERY
,
mmPutMsgToQueryQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_QUERY_CONTINUE
,
mmPut
Node
MsgToQueryQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_QUERY_CONTINUE
,
mmPutMsgToQueryQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_QUERY_HEARTBEAT
,
mmPut
Node
MsgToQueryQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_QUERY_HEARTBEAT
,
mmPutMsgToQueryQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_FETCH
,
mmPut
Node
MsgToQueryQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_FETCH
,
mmPutMsgToQueryQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_CREATE_STB_RSP
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_CREATE_STB_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_ALTER_STB_RSP
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_ALTER_STB_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_DROP_STB_RSP
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_DROP_STB_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_CREATE_SMA_RSP
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_CREATE_SMA_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_DROP_SMA_RSP
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_DROP_SMA_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_MQ_VG_CHANGE_RSP
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_MQ_VG_CHANGE_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_MQ_VG_DELETE_RSP
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_MQ_VG_DELETE_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_DROP_TASK
,
mmPut
Node
MsgToQueryQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_DROP_TASK
,
mmPutMsgToQueryQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_STREAM_TASK_DEPLOY_RSP
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_STREAM_TASK_DEPLOY_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_ALTER_CONFIG_RSP
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_ALTER_CONFIG_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_ALTER_REPLICA_RSP
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_ALTER_REPLICA_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_ALTER_CONFIRM_RSP
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_ALTER_CONFIRM_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_ALTER_HASHRANGE_RSP
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_ALTER_HASHRANGE_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_COMPACT_RSP
,
mmPut
Node
MsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_VND_COMPACT_RSP
,
mmPutMsgToWriteQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MON_MM_INFO
,
mmPut
Node
MsgToMonitorQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MON_MM_INFO
,
mmPutMsgToMonitorQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MON_MM_LOAD
,
mmPut
Node
MsgToMonitorQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MON_MM_LOAD
,
mmPutMsgToMonitorQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_TIMEOUT
,
mmPut
Node
MsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_TIMEOUT
,
mmPutMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_PING
,
mmPut
Node
MsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_PING
,
mmPutMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_PING_REPLY
,
mmPut
Node
MsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_PING_REPLY
,
mmPutMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_CLIENT_REQUEST
,
mmPut
Node
MsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_CLIENT_REQUEST
,
mmPutMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_CLIENT_REQUEST_REPLY
,
mmPut
Node
MsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_CLIENT_REQUEST_REPLY
,
mmPutMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_REQUEST_VOTE
,
mmPut
Node
MsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_REQUEST_VOTE
,
mmPutMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_REQUEST_VOTE_REPLY
,
mmPut
Node
MsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_REQUEST_VOTE_REPLY
,
mmPutMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_APPEND_ENTRIES
,
mmPut
Node
MsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_APPEND_ENTRIES
,
mmPutMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_APPEND_ENTRIES_REPLY
,
mmPut
Node
MsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_APPEND_ENTRIES_REPLY
,
mmPutMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_SNAPSHOT_SEND
,
mmPut
Node
MsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_SNAPSHOT_SEND
,
mmPutMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_SNAPSHOT_RSP
,
mmPut
Node
MsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_SYNC_SNAPSHOT_RSP
,
mmPutMsgToSyncQueue
,
1
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_SET_STANDBY
,
mmPut
Node
MsgToSyncQueue
,
0
)
==
NULL
)
goto
_OVER
;
if
(
dmSetMgmtHandle
(
pArray
,
TDMT_MND_SET_STANDBY
,
mmPutMsgToSyncQueue
,
0
)
==
NULL
)
goto
_OVER
;
code
=
0
;
code
=
0
;
...
...
source/dnode/mgmt/mgmt_mnode/src/mmInt.c
浏览文件 @
00644eef
...
@@ -27,7 +27,7 @@ static bool mmDeployRequired(const SMgmtInputOpt *pInput) {
...
@@ -27,7 +27,7 @@ static bool mmDeployRequired(const SMgmtInputOpt *pInput) {
static
int32_t
mmRequire
(
const
SMgmtInputOpt
*
pInput
,
bool
*
required
)
{
static
int32_t
mmRequire
(
const
SMgmtInputOpt
*
pInput
,
bool
*
required
)
{
SMnodeMgmt
mgmt
=
{
0
};
SMnodeMgmt
mgmt
=
{
0
};
mgmt
.
path
=
pInput
->
path
;
mgmt
.
path
=
pInput
->
path
;
if
(
mmReadFile
(
&
mgmt
,
required
)
!=
0
)
{
if
(
mmReadFile
(
&
mgmt
,
NULL
,
required
)
!=
0
)
{
return
-
1
;
return
-
1
;
}
}
...
@@ -43,33 +43,19 @@ static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, const SMgmtInputOpt *pInpu
...
@@ -43,33 +43,19 @@ static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, const SMgmtInputOpt *pInpu
pOption
->
deploy
=
true
;
pOption
->
deploy
=
true
;
pOption
->
msgCb
=
pMgmt
->
msgCb
;
pOption
->
msgCb
=
pMgmt
->
msgCb
;
pOption
->
dnodeId
=
pMgmt
->
pData
->
dnodeId
;
pOption
->
dnodeId
=
pMgmt
->
pData
->
dnodeId
;
pOption
->
replica
.
id
=
1
;
pOption
->
replica
=
1
;
pOption
->
replica
.
port
=
tsServerPort
;
pOption
->
selfIndex
=
0
;
tstrncpy
(
pOption
->
replica
.
fqdn
,
tsLocalFqdn
,
TSDB_FQDN_LEN
);
SReplica
*
pReplica
=
&
pOption
->
replicas
[
0
];
pReplica
->
id
=
1
;
pReplica
->
port
=
tsServerPort
;
tstrncpy
(
pReplica
->
fqdn
,
tsLocalFqdn
,
TSDB_FQDN_LEN
);
}
}
static
void
mmBuildOptionForOpen
(
SMnodeMgmt
*
pMgmt
,
SMnodeOpt
*
pOption
)
{
static
void
mmBuildOptionForOpen
(
SMnodeMgmt
*
pMgmt
,
const
SReplica
*
pReplica
,
SMnodeOpt
*
pOption
)
{
pOption
->
deploy
=
false
;
pOption
->
standby
=
false
;
pOption
->
standby
=
false
;
pOption
->
deploy
=
false
;
pOption
->
msgCb
=
pMgmt
->
msgCb
;
pOption
->
msgCb
=
pMgmt
->
msgCb
;
pOption
->
dnodeId
=
pMgmt
->
pData
->
dnodeId
;
pOption
->
dnodeId
=
pMgmt
->
pData
->
dnodeId
;
if
(
pReplica
->
id
>
0
)
{
if
(
pMgmt
->
replica
>
0
)
{
pOption
->
standby
=
true
;
pOption
->
standby
=
true
;
pOption
->
replica
=
1
;
pOption
->
replica
=
*
pReplica
;
pOption
->
selfIndex
=
0
;
SReplica
*
pReplica
=
&
pOption
->
replicas
[
0
];
for
(
int32_t
i
=
0
;
i
<
pMgmt
->
replica
;
++
i
)
{
if
(
pMgmt
->
replicas
[
i
].
id
!=
pMgmt
->
pData
->
dnodeId
)
continue
;
pReplica
->
id
=
pMgmt
->
replicas
[
i
].
id
;
pReplica
->
port
=
pMgmt
->
replicas
[
i
].
port
;
memcpy
(
pReplica
->
fqdn
,
pMgmt
->
replicas
[
i
].
fqdn
,
TSDB_FQDN_LEN
);
}
}
}
}
}
...
@@ -105,12 +91,13 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
...
@@ -105,12 +91,13 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt
->
path
=
pInput
->
path
;
pMgmt
->
path
=
pInput
->
path
;
pMgmt
->
name
=
pInput
->
name
;
pMgmt
->
name
=
pInput
->
name
;
pMgmt
->
msgCb
=
pInput
->
msgCb
;
pMgmt
->
msgCb
=
pInput
->
msgCb
;
pMgmt
->
msgCb
.
putToQueueFp
=
(
PutToQueueFp
)
mmPut
Rpc
MsgToQueue
;
pMgmt
->
msgCb
.
putToQueueFp
=
(
PutToQueueFp
)
mmPutMsgToQueue
;
pMgmt
->
msgCb
.
mgmt
=
pMgmt
;
pMgmt
->
msgCb
.
mgmt
=
pMgmt
;
taosThreadRwlockInit
(
&
pMgmt
->
lock
,
NULL
);
taosThreadRwlockInit
(
&
pMgmt
->
lock
,
NULL
);
bool
deployed
=
false
;
bool
deployed
=
false
;
if
(
mmReadFile
(
pMgmt
,
&
deployed
)
!=
0
)
{
SReplica
replica
=
{
0
};
if
(
mmReadFile
(
pMgmt
,
&
replica
,
&
deployed
)
!=
0
)
{
dError
(
"failed to read file since %s"
,
terrstr
());
dError
(
"failed to read file since %s"
,
terrstr
());
mmClose
(
pMgmt
);
mmClose
(
pMgmt
);
return
-
1
;
return
-
1
;
...
@@ -123,7 +110,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
...
@@ -123,7 +110,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
mmBuildOptionForDeploy
(
pMgmt
,
pInput
,
&
option
);
mmBuildOptionForDeploy
(
pMgmt
,
pInput
,
&
option
);
}
else
{
}
else
{
dInfo
(
"mnode start to open"
);
dInfo
(
"mnode start to open"
);
mmBuildOptionForOpen
(
pMgmt
,
&
option
);
mmBuildOptionForOpen
(
pMgmt
,
&
replica
,
&
option
);
}
}
pMgmt
->
pMnode
=
mndOpen
(
pMgmt
->
path
,
&
option
);
pMgmt
->
pMnode
=
mndOpen
(
pMgmt
->
path
,
&
option
);
...
@@ -141,8 +128,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
...
@@ -141,8 +128,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
}
}
tmsgReportStartup
(
"mnode-worker"
,
"initialized"
);
tmsgReportStartup
(
"mnode-worker"
,
"initialized"
);
if
(
!
deployed
||
pMgmt
->
replica
>
0
)
{
if
(
!
deployed
||
replica
.
id
>
0
)
{
pMgmt
->
replica
=
0
;
deployed
=
true
;
deployed
=
true
;
if
(
mmWriteFile
(
pMgmt
,
NULL
,
deployed
)
!=
0
)
{
if
(
mmWriteFile
(
pMgmt
,
NULL
,
deployed
)
!=
0
)
{
dError
(
"failed to write mnode file since %s"
,
terrstr
());
dError
(
"failed to write mnode file since %s"
,
terrstr
());
...
@@ -178,22 +164,3 @@ SMgmtFunc mmGetMgmtFunc() {
...
@@ -178,22 +164,3 @@ SMgmtFunc mmGetMgmtFunc() {
return
mgmtFunc
;
return
mgmtFunc
;
}
}
int32_t
mmAcquire
(
SMnodeMgmt
*
pMgmt
)
{
int32_t
code
=
0
;
taosThreadRwlockRdlock
(
&
pMgmt
->
lock
);
if
(
pMgmt
->
stopped
)
{
code
=
-
1
;
}
else
{
atomic_add_fetch_32
(
&
pMgmt
->
refCount
,
1
);
}
taosThreadRwlockUnlock
(
&
pMgmt
->
lock
);
return
code
;
}
void
mmRelease
(
SMnodeMgmt
*
pMgmt
)
{
taosThreadRwlockRdlock
(
&
pMgmt
->
lock
);
atomic_sub_fetch_32
(
&
pMgmt
->
refCount
,
1
);
taosThreadRwlockUnlock
(
&
pMgmt
->
lock
);
}
\ No newline at end of file
source/dnode/mgmt/mgmt_mnode/src/mmWorker.c
浏览文件 @
00644eef
...
@@ -16,6 +16,25 @@
...
@@ -16,6 +16,25 @@
#define _DEFAULT_SOURCE
#define _DEFAULT_SOURCE
#include "mmInt.h"
#include "mmInt.h"
static
inline
int32_t
mmAcquire
(
SMnodeMgmt
*
pMgmt
)
{
int32_t
code
=
0
;
taosThreadRwlockRdlock
(
&
pMgmt
->
lock
);
if
(
pMgmt
->
stopped
)
{
code
=
-
1
;
}
else
{
atomic_add_fetch_32
(
&
pMgmt
->
refCount
,
1
);
}
taosThreadRwlockUnlock
(
&
pMgmt
->
lock
);
return
code
;
}
static
inline
void
mmRelease
(
SMnodeMgmt
*
pMgmt
)
{
taosThreadRwlockRdlock
(
&
pMgmt
->
lock
);
atomic_sub_fetch_32
(
&
pMgmt
->
refCount
,
1
);
taosThreadRwlockUnlock
(
&
pMgmt
->
lock
);
}
static
inline
void
mmSendRsp
(
SRpcMsg
*
pMsg
,
int32_t
code
)
{
static
inline
void
mmSendRsp
(
SRpcMsg
*
pMsg
,
int32_t
code
)
{
SRpcMsg
rsp
=
{
SRpcMsg
rsp
=
{
.
code
=
code
,
.
code
=
code
,
...
@@ -26,7 +45,7 @@ static inline void mmSendRsp(SRpcMsg *pMsg, int32_t code) {
...
@@ -26,7 +45,7 @@ static inline void mmSendRsp(SRpcMsg *pMsg, int32_t code) {
tmsgSendRsp
(
&
rsp
);
tmsgSendRsp
(
&
rsp
);
}
}
static
void
mmProcess
Queue
(
SQueueInfo
*
pInfo
,
SRpcMsg
*
pMsg
)
{
static
void
mmProcess
RpcMsg
(
SQueueInfo
*
pInfo
,
SRpcMsg
*
pMsg
)
{
SMnodeMgmt
*
pMgmt
=
pInfo
->
ahandle
;
SMnodeMgmt
*
pMgmt
=
pInfo
->
ahandle
;
int32_t
code
=
-
1
;
int32_t
code
=
-
1
;
dTrace
(
"msg:%p, get from mnode queue"
,
pMsg
);
dTrace
(
"msg:%p, get from mnode queue"
,
pMsg
);
...
@@ -53,11 +72,10 @@ static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
...
@@ -53,11 +72,10 @@ static void mmProcessQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
taosFreeQitem
(
pMsg
);
taosFreeQitem
(
pMsg
);
}
}
static
void
mmProcessSync
Queue
(
SQueueInfo
*
pInfo
,
SRpcMsg
*
pMsg
)
{
static
void
mmProcessSync
Msg
(
SQueueInfo
*
pInfo
,
SRpcMsg
*
pMsg
)
{
SMnodeMgmt
*
pMgmt
=
pInfo
->
ahandle
;
SMnodeMgmt
*
pMgmt
=
pInfo
->
ahandle
;
dTrace
(
"msg:%p, get from mnode-sync queue"
,
pMsg
);
pMsg
->
info
.
node
=
pMgmt
->
pMnode
;
pMsg
->
info
.
node
=
pMgmt
->
pMnode
;
dTrace
(
"msg:%p, get from mnode-sync queue"
,
pMsg
);
SMsgHead
*
pHead
=
pMsg
->
pCont
;
SMsgHead
*
pHead
=
pMsg
->
pCont
;
pHead
->
contLen
=
ntohl
(
pHead
->
contLen
);
pHead
->
contLen
=
ntohl
(
pHead
->
contLen
);
...
@@ -70,66 +88,69 @@ static void mmProcessSyncQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
...
@@ -70,66 +88,69 @@ static void mmProcessSyncQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
taosFreeQitem
(
pMsg
);
taosFreeQitem
(
pMsg
);
}
}
static
int32_t
mmPutNodeMsgToWorker
(
SSingleWorker
*
pWorker
,
SRpcMsg
*
pMsg
)
{
static
inline
int32_t
mmPutMsgToWorker
(
SMnodeMgmt
*
pMgmt
,
SSingleWorker
*
pWorker
,
SRpcMsg
*
pMsg
)
{
dTrace
(
"msg:%p, put into %s queue, type:%s"
,
pMsg
,
pWorker
->
name
,
TMSG_INFO
(
pMsg
->
msgType
));
if
(
mmAcquire
(
pMgmt
)
==
0
)
{
taosWriteQitem
(
pWorker
->
queue
,
pMsg
);
dTrace
(
"msg:%p, put into %s queue, type:%s"
,
pMsg
,
pWorker
->
name
,
TMSG_INFO
(
pMsg
->
msgType
));
return
0
;
taosWriteQitem
(
pWorker
->
queue
,
pMsg
);
mmRelease
(
pMgmt
);
return
0
;
}
else
{
dTrace
(
"msg:%p, failed to put into %s queue since %s, type:%s"
,
pMsg
,
pWorker
->
name
,
terrstr
(),
TMSG_INFO
(
pMsg
->
msgType
));
return
-
1
;
}
}
}
int32_t
mmPut
Node
MsgToWriteQueue
(
SMnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
)
{
int32_t
mmPutMsgToWriteQueue
(
SMnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
)
{
return
mmPut
NodeMsgToWorker
(
&
pMgmt
->
writeWorker
,
pMsg
);
return
mmPut
MsgToWorker
(
pMgmt
,
&
pMgmt
->
writeWorker
,
pMsg
);
}
}
int32_t
mmPut
Node
MsgToSyncQueue
(
SMnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
)
{
int32_t
mmPutMsgToSyncQueue
(
SMnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
)
{
return
mmPut
NodeMsgToWorker
(
&
pMgmt
->
syncWorker
,
pMsg
);
return
mmPut
MsgToWorker
(
pMgmt
,
&
pMgmt
->
syncWorker
,
pMsg
);
}
}
int32_t
mmPut
Node
MsgToReadQueue
(
SMnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
)
{
int32_t
mmPutMsgToReadQueue
(
SMnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
)
{
return
mmPut
NodeMsgToWorker
(
&
pMgmt
->
readWorker
,
pMsg
);
return
mmPut
MsgToWorker
(
pMgmt
,
&
pMgmt
->
readWorker
,
pMsg
);
}
}
int32_t
mmPutNodeMsgToQueryQueue
(
SMnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
)
{
int32_t
mmPutMsgToQueryQueue
(
SMnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
)
{
mndPreprocessQueryMsg
(
pMgmt
->
pMnode
,
pMsg
);
if
(
mndPreprocessQueryMsg
(
pMgmt
->
pMnode
,
pMsg
)
!=
0
)
{
dError
(
"msg:%p, failed to pre-process in mnode since %s, type:%s"
,
pMsg
,
terrstr
(),
TMSG_INFO
(
pMsg
->
msgType
));
return
mmPutNodeMsgToWorker
(
&
pMgmt
->
queryWorker
,
pMsg
);
return
-
1
;
}
return
mmPutMsgToWorker
(
pMgmt
,
&
pMgmt
->
queryWorker
,
pMsg
);
}
}
int32_t
mmPut
Node
MsgToMonitorQueue
(
SMnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
)
{
int32_t
mmPutMsgToMonitorQueue
(
SMnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
)
{
return
mmPut
NodeMsgToWorker
(
&
pMgmt
->
monitorWorker
,
pMsg
);
return
mmPut
MsgToWorker
(
pMgmt
,
&
pMgmt
->
monitorWorker
,
pMsg
);
}
}
int32_t
mmPutRpcMsgToQueue
(
SMnodeMgmt
*
pMgmt
,
EQueueType
qtype
,
SRpcMsg
*
pRpc
)
{
int32_t
mmPutMsgToQueue
(
SMnodeMgmt
*
pMgmt
,
EQueueType
qtype
,
SRpcMsg
*
pRpc
)
{
SRpcMsg
*
pMsg
=
taosAllocateQitem
(
sizeof
(
SRpcMsg
),
RPC_QITEM
);
SSingleWorker
*
pWorker
=
NULL
;
if
(
pMsg
==
NULL
)
return
-
1
;
memcpy
(
pMsg
,
pRpc
,
sizeof
(
SRpcMsg
));
switch
(
qtype
)
{
switch
(
qtype
)
{
case
WRITE_QUEUE
:
case
WRITE_QUEUE
:
dTrace
(
"msg:%p, is created and will put into vnode-write queue"
,
pMsg
);
pWorker
=
&
pMgmt
->
writeWorker
;
taosWriteQitem
(
pMgmt
->
writeWorker
.
queue
,
pMsg
);
break
;
return
0
;
case
QUERY_QUEUE
:
case
QUERY_QUEUE
:
dTrace
(
"msg:%p, is created and will put into vnode-query queue"
,
pMsg
);
pWorker
=
&
pMgmt
->
queryWorker
;
taosWriteQitem
(
pMgmt
->
queryWorker
.
queue
,
pMsg
);
break
;
return
0
;
case
READ_QUEUE
:
case
READ_QUEUE
:
dTrace
(
"msg:%p, is created and will put into vnode-read queue"
,
pMsg
);
pWorker
=
&
pMgmt
->
readWorker
;
taosWriteQitem
(
pMgmt
->
readWorker
.
queue
,
pMsg
);
break
;
return
0
;
case
SYNC_QUEUE
:
case
SYNC_QUEUE
:
if
(
mmAcquire
(
pMgmt
)
==
0
)
{
pWorker
=
&
pMgmt
->
syncWorker
;
dTrace
(
"msg:%p, is created and will put into vnode-sync queue"
,
pMsg
);
break
;
taosWriteQitem
(
pMgmt
->
syncWorker
.
queue
,
pMsg
);
mmRelease
(
pMgmt
);
return
0
;
}
else
{
return
-
1
;
}
default:
default:
terrno
=
TSDB_CODE_INVALID_PARA
;
terrno
=
TSDB_CODE_INVALID_PARA
;
return
-
1
;
}
}
if
(
pWorker
==
NULL
)
return
-
1
;
SRpcMsg
*
pMsg
=
taosAllocateQitem
(
sizeof
(
SRpcMsg
),
RPC_QITEM
);
if
(
pMsg
==
NULL
)
return
-
1
;
memcpy
(
pMsg
,
pRpc
,
sizeof
(
SRpcMsg
));
dTrace
(
"msg:%p, is created and will put int %s queue"
,
pMsg
,
pWorker
->
name
);
return
mmPutMsgToWorker
(
pMgmt
,
pWorker
,
pMsg
);
}
}
int32_t
mmStartWorker
(
SMnodeMgmt
*
pMgmt
)
{
int32_t
mmStartWorker
(
SMnodeMgmt
*
pMgmt
)
{
...
@@ -137,7 +158,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
...
@@ -137,7 +158,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
.
min
=
tsNumOfMnodeQueryThreads
,
.
min
=
tsNumOfMnodeQueryThreads
,
.
max
=
tsNumOfMnodeQueryThreads
,
.
max
=
tsNumOfMnodeQueryThreads
,
.
name
=
"mnode-query"
,
.
name
=
"mnode-query"
,
.
fp
=
(
FItem
)
mmProcess
Queue
,
.
fp
=
(
FItem
)
mmProcess
RpcMsg
,
.
param
=
pMgmt
,
.
param
=
pMgmt
,
};
};
if
(
tSingleWorkerInit
(
&
pMgmt
->
queryWorker
,
&
qCfg
)
!=
0
)
{
if
(
tSingleWorkerInit
(
&
pMgmt
->
queryWorker
,
&
qCfg
)
!=
0
)
{
...
@@ -149,7 +170,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
...
@@ -149,7 +170,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
.
min
=
tsNumOfMnodeReadThreads
,
.
min
=
tsNumOfMnodeReadThreads
,
.
max
=
tsNumOfMnodeReadThreads
,
.
max
=
tsNumOfMnodeReadThreads
,
.
name
=
"mnode-read"
,
.
name
=
"mnode-read"
,
.
fp
=
(
FItem
)
mmProcess
Queue
,
.
fp
=
(
FItem
)
mmProcess
RpcMsg
,
.
param
=
pMgmt
,
.
param
=
pMgmt
,
};
};
if
(
tSingleWorkerInit
(
&
pMgmt
->
readWorker
,
&
rCfg
)
!=
0
)
{
if
(
tSingleWorkerInit
(
&
pMgmt
->
readWorker
,
&
rCfg
)
!=
0
)
{
...
@@ -161,7 +182,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
...
@@ -161,7 +182,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
.
min
=
1
,
.
min
=
1
,
.
max
=
1
,
.
max
=
1
,
.
name
=
"mnode-write"
,
.
name
=
"mnode-write"
,
.
fp
=
(
FItem
)
mmProcess
Queue
,
.
fp
=
(
FItem
)
mmProcess
RpcMsg
,
.
param
=
pMgmt
,
.
param
=
pMgmt
,
};
};
if
(
tSingleWorkerInit
(
&
pMgmt
->
writeWorker
,
&
wCfg
)
!=
0
)
{
if
(
tSingleWorkerInit
(
&
pMgmt
->
writeWorker
,
&
wCfg
)
!=
0
)
{
...
@@ -173,7 +194,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
...
@@ -173,7 +194,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
.
min
=
1
,
.
min
=
1
,
.
max
=
1
,
.
max
=
1
,
.
name
=
"mnode-sync"
,
.
name
=
"mnode-sync"
,
.
fp
=
(
FItem
)
mmProcessSync
Queue
,
.
fp
=
(
FItem
)
mmProcessSync
Msg
,
.
param
=
pMgmt
,
.
param
=
pMgmt
,
};
};
if
(
tSingleWorkerInit
(
&
pMgmt
->
syncWorker
,
&
sCfg
)
!=
0
)
{
if
(
tSingleWorkerInit
(
&
pMgmt
->
syncWorker
,
&
sCfg
)
!=
0
)
{
...
@@ -185,7 +206,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
...
@@ -185,7 +206,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
.
min
=
1
,
.
min
=
1
,
.
max
=
1
,
.
max
=
1
,
.
name
=
"mnode-monitor"
,
.
name
=
"mnode-monitor"
,
.
fp
=
(
FItem
)
mmProcess
Queue
,
.
fp
=
(
FItem
)
mmProcess
RpcMsg
,
.
param
=
pMgmt
,
.
param
=
pMgmt
,
};
};
if
(
tSingleWorkerInit
(
&
pMgmt
->
monitorWorker
,
&
mCfg
)
!=
0
)
{
if
(
tSingleWorkerInit
(
&
pMgmt
->
monitorWorker
,
&
mCfg
)
!=
0
)
{
...
...
source/dnode/mgmt/node_mgmt/src/dmEnv.c
浏览文件 @
00644eef
...
@@ -123,10 +123,12 @@ static int32_t dmProcessCreateNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
...
@@ -123,10 +123,12 @@ static int32_t dmProcessCreateNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
dError
(
"node:%s, failed to create since %s"
,
pWrapper
->
name
,
terrstr
());
dError
(
"node:%s, failed to create since %s"
,
pWrapper
->
name
,
terrstr
());
}
else
{
}
else
{
dInfo
(
"node:%s, has been created"
,
pWrapper
->
name
);
dInfo
(
"node:%s, has been created"
,
pWrapper
->
name
);
(
void
)
dmOpenNode
(
pWrapper
);
code
=
dmOpenNode
(
pWrapper
);
(
void
)
dmStartNode
(
pWrapper
);
if
(
code
!=
0
)
{
pWrapper
->
required
=
true
;
code
=
dmStartNode
(
pWrapper
);
}
pWrapper
->
deployed
=
true
;
pWrapper
->
deployed
=
true
;
pWrapper
->
required
=
true
;
pWrapper
->
proc
.
ptype
=
pDnode
->
ptype
;
pWrapper
->
proc
.
ptype
=
pDnode
->
ptype
;
}
}
...
...
source/dnode/mnode/impl/inc/mndInt.h
浏览文件 @
00644eef
...
@@ -76,11 +76,12 @@ typedef struct {
...
@@ -76,11 +76,12 @@ typedef struct {
}
STelemMgmt
;
}
STelemMgmt
;
typedef
struct
{
typedef
struct
{
sem_t
syncSem
;
sem_t
syncSem
;
int64_t
sync
;
int64_t
sync
;
bool
standby
;
bool
standby
;
int32_t
errCode
;
SReplica
replica
;
int32_t
transId
;
int32_t
errCode
;
int32_t
transId
;
}
SSyncMgmt
;
}
SSyncMgmt
;
typedef
struct
{
typedef
struct
{
...
@@ -98,9 +99,6 @@ typedef struct SMnode {
...
@@ -98,9 +99,6 @@ typedef struct SMnode {
bool
stopped
;
bool
stopped
;
bool
restored
;
bool
restored
;
bool
deploy
;
bool
deploy
;
int8_t
replica
;
int8_t
selfIndex
;
SReplica
replicas
[
TSDB_MAX_REPLICA
];
char
*
path
;
char
*
path
;
int64_t
checkTime
;
int64_t
checkTime
;
SSdb
*
pSdb
;
SSdb
*
pSdb
;
...
...
source/dnode/mnode/impl/src/mndDnode.c
浏览文件 @
00644eef
...
@@ -95,8 +95,8 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
...
@@ -95,8 +95,8 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
dnodeObj
.
id
=
1
;
dnodeObj
.
id
=
1
;
dnodeObj
.
createdTime
=
taosGetTimestampMs
();
dnodeObj
.
createdTime
=
taosGetTimestampMs
();
dnodeObj
.
updateTime
=
dnodeObj
.
createdTime
;
dnodeObj
.
updateTime
=
dnodeObj
.
createdTime
;
dnodeObj
.
port
=
pMnode
->
replicas
[
0
].
p
ort
;
dnodeObj
.
port
=
tsServerP
ort
;
memcpy
(
&
dnodeObj
.
fqdn
,
pMnode
->
replicas
[
0
].
f
qdn
,
TSDB_FQDN_LEN
);
memcpy
(
&
dnodeObj
.
fqdn
,
tsLocalF
qdn
,
TSDB_FQDN_LEN
);
snprintf
(
dnodeObj
.
ep
,
TSDB_EP_LEN
,
"%s:%u"
,
dnodeObj
.
fqdn
,
dnodeObj
.
port
);
snprintf
(
dnodeObj
.
ep
,
TSDB_EP_LEN
,
"%s:%u"
,
dnodeObj
.
fqdn
,
dnodeObj
.
port
);
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_CONFLICT_GLOBAL
,
NULL
);
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_CONFLICT_GLOBAL
,
NULL
);
...
...
source/dnode/mnode/impl/src/mndMain.c
浏览文件 @
00644eef
...
@@ -289,11 +289,9 @@ static int32_t mndExecSteps(SMnode *pMnode) {
...
@@ -289,11 +289,9 @@ static int32_t mndExecSteps(SMnode *pMnode) {
}
}
static
void
mndSetOptions
(
SMnode
*
pMnode
,
const
SMnodeOpt
*
pOption
)
{
static
void
mndSetOptions
(
SMnode
*
pMnode
,
const
SMnodeOpt
*
pOption
)
{
pMnode
->
replica
=
pOption
->
replica
;
pMnode
->
selfIndex
=
pOption
->
selfIndex
;
memcpy
(
&
pMnode
->
replicas
,
pOption
->
replicas
,
sizeof
(
SReplica
)
*
TSDB_MAX_REPLICA
);
pMnode
->
msgCb
=
pOption
->
msgCb
;
pMnode
->
msgCb
=
pOption
->
msgCb
;
pMnode
->
selfDnodeId
=
pOption
->
dnodeId
;
pMnode
->
selfDnodeId
=
pOption
->
dnodeId
;
pMnode
->
syncMgmt
.
replica
=
pOption
->
replica
;
pMnode
->
syncMgmt
.
standby
=
pOption
->
standby
;
pMnode
->
syncMgmt
.
standby
=
pOption
->
standby
;
}
}
...
...
source/dnode/mnode/impl/src/mndSync.c
浏览文件 @
00644eef
...
@@ -188,15 +188,15 @@ int32_t mndInitSync(SMnode *pMnode) {
...
@@ -188,15 +188,15 @@ int32_t mndInitSync(SMnode *pMnode) {
syncInfo
.
isStandBy
=
pMgmt
->
standby
;
syncInfo
.
isStandBy
=
pMgmt
->
standby
;
syncInfo
.
snapshotEnable
=
true
;
syncInfo
.
snapshotEnable
=
true
;
SSyncCfg
*
pCfg
=
&
syncInfo
.
syncCfg
;
mInfo
(
"start to open mnode sync, standby:%d"
,
pMgmt
->
standby
)
;
pCfg
->
replicaNum
=
pMnode
->
replica
;
if
(
pMgmt
->
standby
||
pMgmt
->
replica
.
id
>
0
)
{
pCfg
->
myIndex
=
pMnode
->
selfIndex
;
SSyncCfg
*
pCfg
=
&
syncInfo
.
syncCfg
;
mInfo
(
"start to open mnode sync, replica:%d myindex:%d standby:%d"
,
pCfg
->
replicaNum
,
pCfg
->
myIndex
,
pMgmt
->
standby
)
;
pCfg
->
replicaNum
=
1
;
for
(
int32_t
i
=
0
;
i
<
pMnode
->
replica
;
++
i
)
{
pCfg
->
myIndex
=
0
;
SNodeInfo
*
pNode
=
&
pCfg
->
nodeInfo
[
i
];
SNodeInfo
*
pNode
=
&
pCfg
->
nodeInfo
[
0
];
tstrncpy
(
pNode
->
nodeFqdn
,
pM
node
->
replicas
[
i
]
.
fqdn
,
sizeof
(
pNode
->
nodeFqdn
));
tstrncpy
(
pNode
->
nodeFqdn
,
pM
gmt
->
replica
.
fqdn
,
sizeof
(
pNode
->
nodeFqdn
));
pNode
->
nodePort
=
pM
node
->
replicas
[
i
]
.
port
;
pNode
->
nodePort
=
pM
gmt
->
replica
.
port
;
mInfo
(
"
index:%d, fqdn:%s port:%d"
,
i
,
pNode
->
nodeFqdn
,
pNode
->
nodePort
);
mInfo
(
"
fqdn:%s port:%u"
,
pNode
->
nodeFqdn
,
pNode
->
nodePort
);
}
}
tsem_init
(
&
pMgmt
->
syncSem
,
0
,
0
);
tsem_init
(
&
pMgmt
->
syncSem
,
0
,
0
);
...
...
source/libs/transport/src/transCli.c
浏览文件 @
00644eef
...
@@ -983,6 +983,9 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
...
@@ -983,6 +983,9 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
SEpSet
epSet
=
{
0
};
SEpSet
epSet
=
{
0
};
tDeserializeSEpSet
(
pResp
->
pCont
,
pResp
->
contLen
,
&
epSet
);
tDeserializeSEpSet
(
pResp
->
pCont
,
pResp
->
contLen
,
&
epSet
);
pCtx
->
epSet
=
epSet
;
pCtx
->
epSet
=
epSet
;
if
(
!
transEpSetIsEqual
(
&
epSet
,
&
pCtx
->
epSet
))
{
pCtx
->
retryCount
=
0
;
}
}
}
addConnToPool
(
pThrd
->
pool
,
pConn
);
addConnToPool
(
pThrd
->
pool
,
pConn
);
tTrace
(
"use remote epset, current in use: %d, retry count:%d, try limit: %d"
,
pEpSet
->
inUse
,
pCtx
->
retryCount
+
1
,
tTrace
(
"use remote epset, current in use: %d, retry count:%d, try limit: %d"
,
pEpSet
->
inUse
,
pCtx
->
retryCount
+
1
,
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录