Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
c6e6ac21
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
c6e6ac21
编写于
3月 10, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
shm
上级
c673ad93
变更
11
隐藏空白更改
内联
并排
Showing
11 changed file
with
500 addition
and
507 deletion
+500
-507
include/dnode/mnode/mnode.h
include/dnode/mnode/mnode.h
+13
-8
include/util/tprocess.h
include/util/tprocess.h
+1
-0
source/dnode/mgmt/impl/inc/dndEnv.h
source/dnode/mgmt/impl/inc/dndEnv.h
+5
-1
source/dnode/mgmt/impl/inc/dndInt.h
source/dnode/mgmt/impl/inc/dndInt.h
+2
-0
source/dnode/mgmt/impl/mnodeMgmt/inc/mm.h
source/dnode/mgmt/impl/mnodeMgmt/inc/mm.h
+14
-4
source/dnode/mgmt/impl/mnodeMgmt/src/mmFile.c
source/dnode/mgmt/impl/mnodeMgmt/src/mmFile.c
+151
-150
source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c
source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c
+64
-243
source/dnode/mgmt/impl/mnodeMgmt/src/mmMsg.c
source/dnode/mgmt/impl/mnodeMgmt/src/mmMsg.c
+174
-0
source/dnode/mgmt/impl/src/dndTransport.c
source/dnode/mgmt/impl/src/dndTransport.c
+68
-68
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+0
-10
source/dnode/mnode/impl/src/mnode.c
source/dnode/mnode/impl/src/mnode.c
+8
-23
未找到文件。
include/dnode/mnode/mnode.h
浏览文件 @
c6e6ac21
...
...
@@ -32,6 +32,18 @@ typedef int32_t (*PutReqToMWriteQFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg);
typedef
int32_t
(
*
PutReqToMReadQFp
)(
SDnode
*
pDnode
,
struct
SRpcMsg
*
rpcMsg
);
typedef
void
(
*
SendRedirectRspFp
)(
SDnode
*
pDnode
,
struct
SRpcMsg
*
rpcMsg
);
typedef
struct
SMnodeMsg
{
char
user
[
TSDB_USER_LEN
];
char
db
[
TSDB_DB_FNAME_LEN
];
int32_t
acctId
;
SMnode
*
pMnode
;
int64_t
createdTime
;
SRpcMsg
rpcMsg
;
int32_t
contLen
;
void
*
pCont
;
}
SMnodeMsg
;
typedef
struct
{
int32_t
dnodeId
;
int64_t
clusterId
;
...
...
@@ -111,14 +123,7 @@ int32_t mndRetriveAuth(SMnode *pMnode, char *user, char *spi, char *encrypt, cha
* @param pMsg The request rpc msg.
* @return int32_t The created mnode msg.
*/
SMnodeMsg
*
mndInitMsg
(
SMnode
*
pMnode
,
SRpcMsg
*
pRpcMsg
);
/**
* @brief Cleanup mnode msg.
*
* @param pMsg The request msg.
*/
void
mndCleanupMsg
(
SMnodeMsg
*
pMsg
);
int32_t
mndBuildMsg
(
SMnodeMsg
*
pMnodeMsg
,
SRpcMsg
*
pRpcMsg
);
/**
* @brief Cleanup mnode msg.
...
...
include/util/tprocess.h
浏览文件 @
c6e6ac21
...
...
@@ -48,6 +48,7 @@ typedef struct {
void
*
pParent
;
bool
stopFlag
;
bool
testFlag
;
bool
isChild
;
}
SProcObj
;
SProcObj
*
taosProcInit
(
const
SProcCfg
*
pCfg
);
...
...
source/dnode/mgmt/impl/inc/dndEnv.h
浏览文件 @
c6e6ac21
...
...
@@ -62,6 +62,8 @@ typedef struct {
SDnodeWorker
statusWorker
;
}
SDnodeMgmt
;
typedef
enum
{
SINGLE_PROC
,
MULTI_PROC_PARENT
,
MULTI_PROC_CHILD
}
EProcType
;
typedef
struct
{
int32_t
refCount
;
int8_t
deployed
;
...
...
@@ -76,8 +78,10 @@ typedef struct {
SReplica
replicas
[
TSDB_MAX_REPLICA
];
//
bool
multiProcess
;
MndMsgFp
msgFp
[
TDMT_MAX
]
;
SProcObj
*
pProcess
;
bool
singleProc
;
bool
isChild
;
}
SMnodeMgmt
;
typedef
struct
{
...
...
source/dnode/mgmt/impl/inc/dndInt.h
浏览文件 @
c6e6ac21
...
...
@@ -61,6 +61,8 @@ typedef enum { DND_WORKER_SINGLE, DND_WORKER_MULTI } EWorkerType;
typedef
enum
{
DND_ENV_INIT
=
0
,
DND_ENV_READY
=
1
,
DND_ENV_CLEANUP
=
2
}
EEnvStat
;
typedef
void
(
*
DndMsgFp
)(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEps
);
typedef
int32_t
(
*
MndMsgFp
)(
SDnode
*
pDnode
,
SMnodeMsg
*
pMnodeMsg
);
EStat
dndGetStat
(
SDnode
*
pDnode
);
void
dndSetStat
(
SDnode
*
pDnode
,
EStat
stat
);
const
char
*
dndStatStr
(
EStat
stat
);
...
...
source/dnode/mgmt/impl/mnodeMgmt/inc/mm.h
浏览文件 @
c6e6ac21
...
...
@@ -21,24 +21,34 @@ extern "C" {
#endif
#include "dndEnv.h"
// interface
int32_t
mmInit
(
SDnode
*
pDnode
);
void
mmCleanup
(
SDnode
*
pDnode
);
// internal
void
mmInitMsgFp
(
SMnodeMgmt
*
pMgmt
);
SMnode
*
mmAcquire
(
SDnode
*
pDnode
);
void
mmRelease
(
SDnode
*
pDnode
,
SMnode
*
pMnode
);
// mmFile
int32_t
mmReadFile
(
SDnode
*
pDnode
);
int32_t
mmWriteFile
(
SDnode
*
pDnode
);
// mmMsg
void
mmProcessRpcMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
// mmQueue
int32_t
mmWriteToWorker
(
SDnode
*
pDnode
,
SDnodeWorker
*
pWorker
,
SMnodeMsg
*
pMnodeMsg
);
////////////
int32_t
dndGetUserAuthFromMnode
(
SDnode
*
pDnode
,
char
*
user
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
);
void
dndProcessMnodeReadMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
void
dndProcessMnodeWriteMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
void
dndProcessMnodeSyncMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
int32_t
dndProcessCreateMnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
);
int32_t
dndProcessAlterMnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
);
int32_t
dndProcessDropMnodeReq
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
);
int32_t
mmGetMonitorInfo
(
SDnode
*
pDnode
,
SMonClusterInfo
*
pClusterInfo
,
SMonVgroupInfo
*
pVgroupInfo
,
SMonGrantInfo
*
pGrantInfo
);
SMonGrantInfo
*
pGrantInfo
);
#ifdef __cplusplus
}
...
...
source/dnode/mgmt/impl/mnodeMgmt/src/mmFile.c
浏览文件 @
c6e6ac21
...
...
@@ -15,153 +15,154 @@
#define _DEFAULT_SOURCE
#include "mm.h"
#include "dndMgmt.h"
#include "dndTransport.h"
#include "dndWorker.h"
// int32_t mmReadFile(SMnodeMgmt *pMgmt) {
// int32_t code = TSDB_CODE_DND_MNODE_READ_FILE_ERROR;
// int32_t len = 0;
// int32_t maxLen = 4096;
// char *content = calloc(1, maxLen + 1);
// cJSON *root = NULL;
// char file[PATH_MAX] = {0};
// snprintf(file, sizeof(file), "%s%smnode.json", pMgmt->path, TD_DIRSEP);
// TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ);
// if (pFile == NULL) {
// dDebug("file %s not exist", file);
// code = 0;
// goto PRASE_MNODE_OVER;
// }
// len = (int32_t)taosReadFile(pFile, content, maxLen);
// if (len <= 0) {
// dError("failed to read %s since content is null", file);
// goto PRASE_MNODE_OVER;
// }
// content[len] = 0;
// root = cJSON_Parse(content);
// if (root == NULL) {
// dError("failed to read %s since invalid json format", file);
// goto PRASE_MNODE_OVER;
// }
// cJSON *deployed = cJSON_GetObjectItem(root, "deployed");
// if (!deployed || deployed->type != cJSON_Number) {
// dError("failed to read %s since deployed not found", file);
// goto PRASE_MNODE_OVER;
// }
// pMgmt->deployed = deployed->valueint;
// cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
// if (!dropped || dropped->type != cJSON_Number) {
// dError("failed to read %s since dropped not found", file);
// goto PRASE_MNODE_OVER;
// }
// pMgmt->dropped = dropped->valueint;
// cJSON *mnodes = cJSON_GetObjectItem(root, "mnodes");
// if (!mnodes || mnodes->type != cJSON_Array) {
// dError("failed to read %s since nodes not found", file);
// goto PRASE_MNODE_OVER;
// }
// pMgmt->replica = cJSON_GetArraySize(mnodes);
// if (pMgmt->replica <= 0 || pMgmt->replica > TSDB_MAX_REPLICA) {
// dError("failed to read %s since mnodes size %d invalid", file, pMgmt->replica);
// goto PRASE_MNODE_OVER;
// }
// for (int32_t i = 0; i < pMgmt->replica; ++i) {
// cJSON *node = cJSON_GetArrayItem(mnodes, i);
// if (node == NULL) break;
// SReplica *pReplica = &pMgmt->replicas[i];
// cJSON *id = cJSON_GetObjectItem(node, "id");
// if (!id || id->type != cJSON_Number) {
// dError("failed to read %s since id not found", file);
// goto PRASE_MNODE_OVER;
// }
// pReplica->id = id->valueint;
// cJSON *fqdn = cJSON_GetObjectItem(node, "fqdn");
// if (!fqdn || fqdn->type != cJSON_String || fqdn->valuestring == NULL) {
// dError("failed to read %s since fqdn not found", file);
// goto PRASE_MNODE_OVER;
// }
// tstrncpy(pReplica->fqdn, fqdn->valuestring, TSDB_FQDN_LEN);
// cJSON *port = cJSON_GetObjectItem(node, "port");
// if (!port || port->type != cJSON_Number) {
// dError("failed to read %s since port not found", file);
// goto PRASE_MNODE_OVER;
// }
// pReplica->port = port->valueint;
// }
// code = 0;
// dDebug("succcessed to read file %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped);
// PRASE_MNODE_OVER:
// if (content != NULL) free(content);
// if (root != NULL) cJSON_Delete(root);
// if (pFile != NULL) taosCloseFile(&pFile);
// terrno = code;
// return code;
// }
// int32_t mmWriteFile(SMnodeMgmt *pMgmt) {
// char file[PATH_MAX] = {0};
// snprintf(file, sizeof(file), "%s%smnode.json.bak", pMgmt->path, TD_DIRSEP);
// TdFilePtr pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
// if (pFile == NULL) {
// terrno = TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR;
// dError("failed to write %s since %s", file, terrstr());
// return -1;
// }
// int32_t len = 0;
// int32_t maxLen = 4096;
// char *content = calloc(1, maxLen + 1);
// len += snprintf(content + len, maxLen - len, "{\n");
// len += snprintf(content + len, maxLen - len, " \"deployed\": %d,\n", pMgmt->deployed);
// len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pMgmt->dropped);
// len += snprintf(content + len, maxLen - len, " \"mnodes\": [{\n");
// for (int32_t i = 0; i < pMgmt->replica; ++i) {
// SReplica *pReplica = &pMgmt->replicas[i];
// len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id);
// len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn);
// len += snprintf(content + len, maxLen - len, " \"port\": %u\n", pReplica->port);
// if (i < pMgmt->replica - 1) {
// len += snprintf(content + len, maxLen - len, " },{\n");
// } else {
// len += snprintf(content + len, maxLen - len, " }]\n");
// }
// }
// len += snprintf(content + len, maxLen - len, "}\n");
// taosWriteFile(pFile, content, len);
// taosFsyncFile(pFile);
// taosCloseFile(&pFile);
// free(content);
// char realfile[PATH_MAX];
// snprintf(realfile, sizeof(realfile), "%s%smnode.json", pMgmt->path, TD_DIRSEP);
// if (taosRenameFile(file, realfile) != 0) {
// terrno = TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR;
// dError("failed to rename %s since %s", file, terrstr());
// return -1;
// }
// dInfo("successed to write %s, deployed:%d dropped:%d", realfile, pMgmt->deployed, pMgmt->dropped);
// return 0;
// }
\ No newline at end of file
int32_t
mmReadFile
(
SDnode
*
pDnode
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
int32_t
code
=
TSDB_CODE_DND_MNODE_READ_FILE_ERROR
;
int32_t
len
=
0
;
int32_t
maxLen
=
4096
;
char
*
content
=
calloc
(
1
,
maxLen
+
1
);
cJSON
*
root
=
NULL
;
char
file
[
PATH_MAX
+
20
];
snprintf
(
file
,
sizeof
(
file
),
"%s%smnode.json"
,
pDnode
->
dir
.
dnode
,
TD_DIRSEP
);
TdFilePtr
pFile
=
taosOpenFile
(
file
,
TD_FILE_READ
);
if
(
pFile
==
NULL
)
{
dDebug
(
"file %s not exist"
,
file
);
code
=
0
;
goto
PRASE_MNODE_OVER
;
}
len
=
(
int32_t
)
taosReadFile
(
pFile
,
content
,
maxLen
);
if
(
len
<=
0
)
{
dError
(
"failed to read %s since content is null"
,
file
);
goto
PRASE_MNODE_OVER
;
}
content
[
len
]
=
0
;
root
=
cJSON_Parse
(
content
);
if
(
root
==
NULL
)
{
dError
(
"failed to read %s since invalid json format"
,
file
);
goto
PRASE_MNODE_OVER
;
}
cJSON
*
deployed
=
cJSON_GetObjectItem
(
root
,
"deployed"
);
if
(
!
deployed
||
deployed
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read %s since deployed not found"
,
file
);
goto
PRASE_MNODE_OVER
;
}
pMgmt
->
deployed
=
deployed
->
valueint
;
cJSON
*
dropped
=
cJSON_GetObjectItem
(
root
,
"dropped"
);
if
(
!
dropped
||
dropped
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read %s since dropped not found"
,
file
);
goto
PRASE_MNODE_OVER
;
}
pMgmt
->
dropped
=
dropped
->
valueint
;
cJSON
*
mnodes
=
cJSON_GetObjectItem
(
root
,
"mnodes"
);
if
(
!
mnodes
||
mnodes
->
type
!=
cJSON_Array
)
{
dError
(
"failed to read %s since nodes not found"
,
file
);
goto
PRASE_MNODE_OVER
;
}
pMgmt
->
replica
=
cJSON_GetArraySize
(
mnodes
);
if
(
pMgmt
->
replica
<=
0
||
pMgmt
->
replica
>
TSDB_MAX_REPLICA
)
{
dError
(
"failed to read %s since mnodes size %d invalid"
,
file
,
pMgmt
->
replica
);
goto
PRASE_MNODE_OVER
;
}
for
(
int32_t
i
=
0
;
i
<
pMgmt
->
replica
;
++
i
)
{
cJSON
*
node
=
cJSON_GetArrayItem
(
mnodes
,
i
);
if
(
node
==
NULL
)
break
;
SReplica
*
pReplica
=
&
pMgmt
->
replicas
[
i
];
cJSON
*
id
=
cJSON_GetObjectItem
(
node
,
"id"
);
if
(
!
id
||
id
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read %s since id not found"
,
file
);
goto
PRASE_MNODE_OVER
;
}
pReplica
->
id
=
id
->
valueint
;
cJSON
*
fqdn
=
cJSON_GetObjectItem
(
node
,
"fqdn"
);
if
(
!
fqdn
||
fqdn
->
type
!=
cJSON_String
||
fqdn
->
valuestring
==
NULL
)
{
dError
(
"failed to read %s since fqdn not found"
,
file
);
goto
PRASE_MNODE_OVER
;
}
tstrncpy
(
pReplica
->
fqdn
,
fqdn
->
valuestring
,
TSDB_FQDN_LEN
);
cJSON
*
port
=
cJSON_GetObjectItem
(
node
,
"port"
);
if
(
!
port
||
port
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read %s since port not found"
,
file
);
goto
PRASE_MNODE_OVER
;
}
pReplica
->
port
=
port
->
valueint
;
}
code
=
0
;
dDebug
(
"succcessed to read file %s, deployed:%d dropped:%d"
,
file
,
pMgmt
->
deployed
,
pMgmt
->
dropped
);
PRASE_MNODE_OVER:
if
(
content
!=
NULL
)
free
(
content
);
if
(
root
!=
NULL
)
cJSON_Delete
(
root
);
if
(
pFile
!=
NULL
)
taosCloseFile
(
&
pFile
);
terrno
=
code
;
return
code
;
}
int32_t
mmWriteFile
(
SDnode
*
pDnode
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
char
file
[
PATH_MAX
];
snprintf
(
file
,
sizeof
(
file
),
"%s%smnode.json.bak"
,
pDnode
->
dir
.
dnode
,
TD_DIRSEP
);
TdFilePtr
pFile
=
taosOpenFile
(
file
,
TD_FILE_CTEATE
|
TD_FILE_WRITE
|
TD_FILE_TRUNC
);
if
(
pFile
==
NULL
)
{
terrno
=
TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR
;
dError
(
"failed to write %s since %s"
,
file
,
terrstr
());
return
-
1
;
}
int32_t
len
=
0
;
int32_t
maxLen
=
4096
;
char
*
content
=
calloc
(
1
,
maxLen
+
1
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"{
\n
"
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
deployed
\"
: %d,
\n
"
,
pMgmt
->
deployed
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dropped
\"
: %d,
\n
"
,
pMgmt
->
dropped
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
mnodes
\"
: [{
\n
"
);
for
(
int32_t
i
=
0
;
i
<
pMgmt
->
replica
;
++
i
)
{
SReplica
*
pReplica
=
&
pMgmt
->
replicas
[
i
];
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
id
\"
: %d,
\n
"
,
pReplica
->
id
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
fqdn
\"
:
\"
%s
\"
,
\n
"
,
pReplica
->
fqdn
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
port
\"
: %u
\n
"
,
pReplica
->
port
);
if
(
i
<
pMgmt
->
replica
-
1
)
{
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
" },{
\n
"
);
}
else
{
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
" }]
\n
"
);
}
}
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"}
\n
"
);
taosWriteFile
(
pFile
,
content
,
len
);
taosFsyncFile
(
pFile
);
taosCloseFile
(
&
pFile
);
free
(
content
);
char
realfile
[
PATH_MAX
+
20
];
snprintf
(
realfile
,
sizeof
(
realfile
),
"%s%smnode.json"
,
pDnode
->
dir
.
dnode
,
TD_DIRSEP
);
if
(
taosRenameFile
(
file
,
realfile
)
!=
0
)
{
terrno
=
TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR
;
dError
(
"failed to rename %s since %s"
,
file
,
terrstr
());
return
-
1
;
}
dInfo
(
"successed to write %s, deployed:%d dropped:%d"
,
realfile
,
pMgmt
->
deployed
,
pMgmt
->
dropped
);
return
0
;
}
source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c
浏览文件 @
c6e6ac21
...
...
@@ -14,15 +14,17 @@
*/
#define _DEFAULT_SOURCE
#include "mm.h"
#include "dndMgmt.h"
#include "dndTransport.h"
#include "dndWorker.h"
#include "mm.h"
static
void
dndWriteMnodeMsgToWorker
(
SDnode
*
pDnode
,
SDnodeWorker
*
pWorker
,
SRpcMsg
*
pRpcMsg
);
static
void
dndProcessMnodeQueue
(
SDnode
*
pDnode
,
SMnodeMsg
*
pMsg
);
static
SMnode
*
dndAcquireMnod
e
(
SDnode
*
pDnode
)
{
SMnode
*
mmAcquir
e
(
SDnode
*
pDnode
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
SMnode
*
pMnode
=
NULL
;
int32_t
refCount
=
0
;
...
...
@@ -42,7 +44,7 @@ static SMnode *dndAcquireMnode(SDnode *pDnode) {
return
pMnode
;
}
static
void
dndReleaseMnod
e
(
SDnode
*
pDnode
,
SMnode
*
pMnode
)
{
void
mmReleas
e
(
SDnode
*
pDnode
,
SMnode
*
pMnode
)
{
if
(
pMnode
==
NULL
)
return
;
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
...
...
@@ -52,158 +54,6 @@ static void dndReleaseMnode(SDnode *pDnode, SMnode *pMnode) {
dTrace
(
"release mnode, refCount:%d"
,
refCount
);
}
int32_t
mmReadFile
(
SDnode
*
pDnode
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
int32_t
code
=
TSDB_CODE_DND_MNODE_READ_FILE_ERROR
;
int32_t
len
=
0
;
int32_t
maxLen
=
4096
;
char
*
content
=
calloc
(
1
,
maxLen
+
1
);
cJSON
*
root
=
NULL
;
char
file
[
PATH_MAX
+
20
];
snprintf
(
file
,
PATH_MAX
+
20
,
"%s/mnode.json"
,
pDnode
->
dir
.
dnode
);
// FILE *fp = fopen(file, "r");
TdFilePtr
pFile
=
taosOpenFile
(
file
,
TD_FILE_READ
);
if
(
pFile
==
NULL
)
{
dDebug
(
"file %s not exist"
,
file
);
code
=
0
;
goto
PRASE_MNODE_OVER
;
}
len
=
(
int32_t
)
taosReadFile
(
pFile
,
content
,
maxLen
);
if
(
len
<=
0
)
{
dError
(
"failed to read %s since content is null"
,
file
);
goto
PRASE_MNODE_OVER
;
}
content
[
len
]
=
0
;
root
=
cJSON_Parse
(
content
);
if
(
root
==
NULL
)
{
dError
(
"failed to read %s since invalid json format"
,
file
);
goto
PRASE_MNODE_OVER
;
}
cJSON
*
deployed
=
cJSON_GetObjectItem
(
root
,
"deployed"
);
if
(
!
deployed
||
deployed
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read %s since deployed not found"
,
file
);
goto
PRASE_MNODE_OVER
;
}
pMgmt
->
deployed
=
deployed
->
valueint
;
cJSON
*
dropped
=
cJSON_GetObjectItem
(
root
,
"dropped"
);
if
(
!
dropped
||
dropped
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read %s since dropped not found"
,
file
);
goto
PRASE_MNODE_OVER
;
}
pMgmt
->
dropped
=
dropped
->
valueint
;
cJSON
*
mnodes
=
cJSON_GetObjectItem
(
root
,
"mnodes"
);
if
(
!
mnodes
||
mnodes
->
type
!=
cJSON_Array
)
{
dError
(
"failed to read %s since nodes not found"
,
file
);
goto
PRASE_MNODE_OVER
;
}
pMgmt
->
replica
=
cJSON_GetArraySize
(
mnodes
);
if
(
pMgmt
->
replica
<=
0
||
pMgmt
->
replica
>
TSDB_MAX_REPLICA
)
{
dError
(
"failed to read %s since mnodes size %d invalid"
,
file
,
pMgmt
->
replica
);
goto
PRASE_MNODE_OVER
;
}
for
(
int32_t
i
=
0
;
i
<
pMgmt
->
replica
;
++
i
)
{
cJSON
*
node
=
cJSON_GetArrayItem
(
mnodes
,
i
);
if
(
node
==
NULL
)
break
;
SReplica
*
pReplica
=
&
pMgmt
->
replicas
[
i
];
cJSON
*
id
=
cJSON_GetObjectItem
(
node
,
"id"
);
if
(
!
id
||
id
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read %s since id not found"
,
file
);
goto
PRASE_MNODE_OVER
;
}
pReplica
->
id
=
id
->
valueint
;
cJSON
*
fqdn
=
cJSON_GetObjectItem
(
node
,
"fqdn"
);
if
(
!
fqdn
||
fqdn
->
type
!=
cJSON_String
||
fqdn
->
valuestring
==
NULL
)
{
dError
(
"failed to read %s since fqdn not found"
,
file
);
goto
PRASE_MNODE_OVER
;
}
tstrncpy
(
pReplica
->
fqdn
,
fqdn
->
valuestring
,
TSDB_FQDN_LEN
);
cJSON
*
port
=
cJSON_GetObjectItem
(
node
,
"port"
);
if
(
!
port
||
port
->
type
!=
cJSON_Number
)
{
dError
(
"failed to read %s since port not found"
,
file
);
goto
PRASE_MNODE_OVER
;
}
pReplica
->
port
=
port
->
valueint
;
}
code
=
0
;
dDebug
(
"succcessed to read file %s, deployed:%d dropped:%d"
,
file
,
pMgmt
->
deployed
,
pMgmt
->
dropped
);
PRASE_MNODE_OVER:
if
(
content
!=
NULL
)
free
(
content
);
if
(
root
!=
NULL
)
cJSON_Delete
(
root
);
if
(
pFile
!=
NULL
)
taosCloseFile
(
&
pFile
);
terrno
=
code
;
return
code
;
}
int32_t
mmWriteFile
(
SDnode
*
pDnode
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
char
file
[
PATH_MAX
+
20
];
snprintf
(
file
,
PATH_MAX
+
20
,
"%s/mnode.json.bak"
,
pDnode
->
dir
.
dnode
);
// FILE *fp = fopen(file, "w");
TdFilePtr
pFile
=
taosOpenFile
(
file
,
TD_FILE_CTEATE
|
TD_FILE_WRITE
|
TD_FILE_TRUNC
);
if
(
pFile
==
NULL
)
{
terrno
=
TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR
;
dError
(
"failed to write %s since %s"
,
file
,
terrstr
());
return
-
1
;
}
int32_t
len
=
0
;
int32_t
maxLen
=
4096
;
char
*
content
=
calloc
(
1
,
maxLen
+
1
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"{
\n
"
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
deployed
\"
: %d,
\n
"
,
pMgmt
->
deployed
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
dropped
\"
: %d,
\n
"
,
pMgmt
->
dropped
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
mnodes
\"
: [{
\n
"
);
for
(
int32_t
i
=
0
;
i
<
pMgmt
->
replica
;
++
i
)
{
SReplica
*
pReplica
=
&
pMgmt
->
replicas
[
i
];
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
id
\"
: %d,
\n
"
,
pReplica
->
id
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
fqdn
\"
:
\"
%s
\"
,
\n
"
,
pReplica
->
fqdn
);
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"
\"
port
\"
: %u
\n
"
,
pReplica
->
port
);
if
(
i
<
pMgmt
->
replica
-
1
)
{
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
" },{
\n
"
);
}
else
{
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
" }]
\n
"
);
}
}
len
+=
snprintf
(
content
+
len
,
maxLen
-
len
,
"}
\n
"
);
taosWriteFile
(
pFile
,
content
,
len
);
taosFsyncFile
(
pFile
);
taosCloseFile
(
&
pFile
);
free
(
content
);
char
realfile
[
PATH_MAX
+
20
];
snprintf
(
realfile
,
PATH_MAX
+
20
,
"%s/mnode.json"
,
pDnode
->
dir
.
dnode
);
if
(
taosRenameFile
(
file
,
realfile
)
!=
0
)
{
terrno
=
TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR
;
dError
(
"failed to rename %s since %s"
,
file
,
terrstr
());
return
-
1
;
}
dInfo
(
"successed to write %s, deployed:%d dropped:%d"
,
realfile
,
pMgmt
->
deployed
,
pMgmt
->
dropped
);
return
0
;
}
static
int32_t
mmStartWorker
(
SDnode
*
pDnode
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
if
(
dndInitWorker
(
pDnode
,
&
pMgmt
->
readWorker
,
DND_WORKER_SINGLE
,
"mnode-read"
,
0
,
1
,
dndProcessMnodeQueue
)
!=
0
)
{
...
...
@@ -256,14 +106,40 @@ static bool mmDeployRequired(SDnode *pDnode) {
return
true
;
}
static
int32_t
dndPutMsgToMWriteQ
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
)
{
dndWriteMnodeMsgToWorker
(
pDnode
,
&
pDnode
->
mmgmt
.
writeWorker
,
pRpcMsg
);
return
0
;
static
int32_t
mmPutMsgToQueue
(
SDnode
*
pDnode
,
SDnodeWorker
*
pWorker
,
SRpcMsg
*
pRpcMsg
)
{
int32_t
contLen
=
sizeof
(
SMnodeMsg
)
+
pRpcMsg
->
contLen
;
SMnodeMsg
*
pMnodeMsg
=
taosAllocateQitem
(
contLen
);
if
(
pMnodeMsg
==
NULL
)
{
return
-
1
;
}
pMnodeMsg
->
contLen
=
pRpcMsg
->
contLen
;
pMnodeMsg
->
pCont
=
(
char
*
)
pMnodeMsg
+
sizeof
(
SMnodeMsg
);
memcpy
(
pMnodeMsg
->
pCont
,
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
);
rpcFreeCont
(
pRpcMsg
->
pCont
);
int32_t
code
=
mmWriteToWorker
(
pDnode
,
pWorker
,
pMnodeMsg
);
if
(
code
!=
0
)
{
taosFreeQitem
(
pMnodeMsg
);
}
return
code
;
}
static
int32_t
dndPutMsgToMReadQ
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
)
{
dndWriteMnodeMsgToWorker
(
pDnode
,
&
pDnode
->
mmgmt
.
readWorker
,
pRpcMsg
);
return
0
;
static
int32_t
mmPutMsgToWriteQueue
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
)
{
return
mmPutMsgToQueue
(
pDnode
,
&
pDnode
->
mmgmt
.
writeWorker
,
pRpcMsg
);
}
static
int32_t
mmPutMsgToReadQueue
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
)
{
return
mmPutMsgToQueue
(
pDnode
,
&
pDnode
->
mmgmt
.
readWorker
,
pRpcMsg
);
}
static
void
mmProcessChildQueue
(
SDnode
*
pDnode
,
SBlockItem
*
pBlock
)
{
SMnodeMsg
*
pMsg
=
(
SMnodeMsg
*
)
pBlock
->
pCont
;
if
(
mmWriteToWorker
(
pDnode
,
&
pDnode
->
mmgmt
.
writeWorker
,
pMsg
)
!=
0
)
{
//todo
}
}
static
void
mmInitOptionImp
(
SDnode
*
pDnode
,
SMnodeOpt
*
pOption
)
{
...
...
@@ -271,8 +147,8 @@ static void mmInitOptionImp(SDnode *pDnode, SMnodeOpt *pOption) {
pOption
->
sendReqToDnodeFp
=
dndSendReqToDnode
;
pOption
->
sendReqToMnodeFp
=
dndSendReqToMnode
;
pOption
->
sendRedirectRspFp
=
dndSendRedirectRsp
;
pOption
->
putReqToMWriteQFp
=
dndPutMsgToMWriteQ
;
pOption
->
putReqToMReadQFp
=
dndPutMsgToMReadQ
;
pOption
->
putReqToMWriteQFp
=
mmPutMsgToWriteQueue
;
pOption
->
putReqToMReadQFp
=
mmPutMsgToReadQueue
;
pOption
->
dnodeId
=
dndGetDnodeId
(
pDnode
);
pOption
->
clusterId
=
dndGetClusterId
(
pDnode
);
}
...
...
@@ -364,23 +240,18 @@ static int32_t mmOpenImp(SDnode *pDnode, SMnodeOpt *pOption) {
return
0
;
}
static
void
dndMnodeProcessChildQueue
(
SDnode
*
pDnode
,
SBlockItem
*
pBlock
)
{
SRpcMsg
*
pMsg
=
(
SRpcMsg
*
)
pBlock
->
pCont
;
dndWriteMnodeMsgToWorker
(
pDnode
,
&
pDnode
->
mmgmt
.
writeWorker
,
pMsg
);
free
(
pBlock
);
}
static
void
dndMnodeProcessParentQueue
(
SMnodeMgmt
*
pMgmt
,
SBlockItem
*
pItem
)
{}
static
int32_t
mmOpen
(
SDnode
*
pDnode
,
SMnodeOpt
*
pOption
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
pMgmt
->
multiProcess
=
tru
e
;
pMgmt
->
singleProc
=
fals
e
;
int32_t
code
=
mmOpenImp
(
pDnode
,
pOption
);
if
(
code
==
0
&&
pMgmt
->
multiProcess
)
{
if
(
code
==
0
&&
!
pMgmt
->
singleProc
)
{
SProcCfg
cfg
=
{
0
};
cfg
.
childFp
=
(
ProcFp
)
dndMnode
ProcessChildQueue
;
cfg
.
childFp
=
(
ProcFp
)
mm
ProcessChildQueue
;
cfg
.
parentFp
=
(
ProcFp
)
dndMnodeProcessParentQueue
;
cfg
.
childQueueSize
=
1024
*
1024
;
cfg
.
parentQueueSize
=
1024
*
1024
;
...
...
@@ -400,7 +271,7 @@ static int32_t mmOpen(SDnode *pDnode, SMnodeOpt *pOption) {
static
int32_t
dndAlterMnode
(
SDnode
*
pDnode
,
SMnodeOpt
*
pOption
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
SMnode
*
pMnode
=
dndAcquireMnod
e
(
pDnode
);
SMnode
*
pMnode
=
mmAcquir
e
(
pDnode
);
if
(
pMnode
==
NULL
)
{
dError
(
"failed to alter mnode since %s"
,
terrstr
());
return
-
1
;
...
...
@@ -408,18 +279,18 @@ static int32_t dndAlterMnode(SDnode *pDnode, SMnodeOpt *pOption) {
if
(
mndAlter
(
pMnode
,
pOption
)
!=
0
)
{
dError
(
"failed to alter mnode since %s"
,
terrstr
());
dndReleaseMnod
e
(
pDnode
,
pMnode
);
mmReleas
e
(
pDnode
,
pMnode
);
return
-
1
;
}
dndReleaseMnod
e
(
pDnode
,
pMnode
);
mmReleas
e
(
pDnode
,
pMnode
);
return
0
;
}
static
int32_t
dndDropMnode
(
SDnode
*
pDnode
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
SMnode
*
pMnode
=
dndAcquireMnod
e
(
pDnode
);
SMnode
*
pMnode
=
mmAcquir
e
(
pDnode
);
if
(
pMnode
==
NULL
)
{
dError
(
"failed to drop mnode since %s"
,
terrstr
());
return
-
1
;
...
...
@@ -434,12 +305,12 @@ static int32_t dndDropMnode(SDnode *pDnode) {
pMgmt
->
dropped
=
0
;
taosRUnLockLatch
(
&
pMgmt
->
latch
);
dndReleaseMnod
e
(
pDnode
,
pMnode
);
mmReleas
e
(
pDnode
,
pMnode
);
dError
(
"failed to drop mnode since %s"
,
terrstr
());
return
-
1
;
}
dndReleaseMnod
e
(
pDnode
,
pMnode
);
mmReleas
e
(
pDnode
,
pMnode
);
mmStopWorker
(
pDnode
);
pMgmt
->
deployed
=
0
;
mmWriteFile
(
pDnode
);
...
...
@@ -470,9 +341,9 @@ int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
return
-
1
;
}
SMnode
*
pMnode
=
dndAcquireMnod
e
(
pDnode
);
SMnode
*
pMnode
=
mmAcquir
e
(
pDnode
);
if
(
pMnode
!=
NULL
)
{
dndReleaseMnod
e
(
pDnode
,
pMnode
);
mmReleas
e
(
pDnode
,
pMnode
);
terrno
=
TSDB_CODE_DND_MNODE_ALREADY_DEPLOYED
;
dError
(
"failed to create mnode since %s"
,
terrstr
());
return
-
1
;
...
...
@@ -502,7 +373,7 @@ int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
return
-
1
;
}
SMnode
*
pMnode
=
dndAcquireMnod
e
(
pDnode
);
SMnode
*
pMnode
=
mmAcquir
e
(
pDnode
);
if
(
pMnode
==
NULL
)
{
terrno
=
TSDB_CODE_DND_MNODE_NOT_DEPLOYED
;
dError
(
"failed to alter mnode since %s"
,
terrstr
());
...
...
@@ -511,7 +382,7 @@ int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
dDebug
(
"start to alter mnode"
);
int32_t
code
=
dndAlterMnode
(
pDnode
,
&
option
);
dndReleaseMnod
e
(
pDnode
,
pMnode
);
mmReleas
e
(
pDnode
,
pMnode
);
return
code
;
}
...
...
@@ -529,7 +400,7 @@ int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
return
-
1
;
}
SMnode
*
pMnode
=
dndAcquireMnod
e
(
pDnode
);
SMnode
*
pMnode
=
mmAcquir
e
(
pDnode
);
if
(
pMnode
==
NULL
)
{
terrno
=
TSDB_CODE_DND_MNODE_NOT_DEPLOYED
;
dError
(
"failed to drop mnode since %s"
,
terrstr
());
...
...
@@ -538,7 +409,7 @@ int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
dDebug
(
"start to drop mnode"
);
int32_t
code
=
dndDropMnode
(
pDnode
);
dndReleaseMnod
e
(
pDnode
,
pMnode
);
mmReleas
e
(
pDnode
,
pMnode
);
return
code
;
}
...
...
@@ -546,66 +417,15 @@ int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
static
void
dndProcessMnodeQueue
(
SDnode
*
pDnode
,
SMnodeMsg
*
pMsg
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
SMnode
*
pMnode
=
dndAcquireMnod
e
(
pDnode
);
SMnode
*
pMnode
=
mmAcquir
e
(
pDnode
);
if
(
pMnode
!=
NULL
)
{
mndProcessMsg
(
pMsg
);
dndReleaseMnod
e
(
pDnode
,
pMnode
);
mmReleas
e
(
pDnode
,
pMnode
);
}
else
{
mndSendRsp
(
pMsg
,
terrno
);
}
mndCleanupMsg
(
pMsg
);
}
static
void
dndWriteMnodeMsgToWorker
(
SDnode
*
pDnode
,
SDnodeWorker
*
pWorker
,
SRpcMsg
*
pRpcMsg
)
{
int32_t
code
=
TSDB_CODE_DND_MNODE_NOT_DEPLOYED
;
SMnode
*
pMnode
=
dndAcquireMnode
(
pDnode
);
if
(
pMnode
!=
NULL
)
{
SMnodeMsg
*
pMsg
=
mndInitMsg
(
pMnode
,
pRpcMsg
);
if
(
pMsg
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
}
else
{
code
=
dndWriteMsgToWorker
(
pWorker
,
pMsg
,
0
);
if
(
code
!=
0
)
code
=
terrno
;
}
if
(
code
!=
0
)
{
mndCleanupMsg
(
pMsg
);
}
}
dndReleaseMnode
(
pDnode
,
pMnode
);
if
(
code
!=
0
)
{
if
(
pRpcMsg
->
msgType
&
1u
)
{
if
(
code
==
TSDB_CODE_DND_MNODE_NOT_DEPLOYED
||
code
==
TSDB_CODE_APP_NOT_READY
)
{
dndSendRedirectRsp
(
pDnode
,
pRpcMsg
);
}
else
{
SRpcMsg
rsp
=
{.
handle
=
pRpcMsg
->
handle
,
.
ahandle
=
pRpcMsg
->
ahandle
,
.
code
=
code
};
rpcSendResponse
(
&
rsp
);
}
}
rpcFreeCont
(
pRpcMsg
->
pCont
);
}
}
static
void
dndMnodeWriteToChildQueue
(
SMnodeMgmt
*
pMgmt
,
SRpcMsg
*
pMsg
)
{
taosProcPushChild
(
pMgmt
->
pProcess
,
pMsg
,
sizeof
(
SRpcMsg
));
}
void
dndProcessMnodeWriteMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
dndMnodeWriteToChildQueue
(
&
pDnode
->
mmgmt
,
pMsg
);
// dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.writeWorker, pMsg);
}
void
dndProcessMnodeSyncMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
dndMnodeWriteToChildQueue
(
&
pDnode
->
mmgmt
,
pMsg
);
// dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.syncWorker, pMsg);
}
void
dndProcessMnodeReadMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
)
{
dndMnodeWriteToChildQueue
(
&
pDnode
->
mmgmt
,
pMsg
);
// dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.readWorker, pMsg);
// mndCleanupMsg(pMsg);
}
int32_t
mmInit
(
SDnode
*
pDnode
)
{
...
...
@@ -614,6 +434,7 @@ int32_t mmInit(SDnode *pDnode) {
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
taosInitRWLatch
(
&
pMgmt
->
latch
);
mmInitMsgFp
(
pMgmt
);
if
(
mmReadFile
(
pDnode
)
!=
0
)
{
goto
_OVER
;
...
...
@@ -670,7 +491,7 @@ void mmCleanup(SDnode *pDnode) {
int32_t
dndGetUserAuthFromMnode
(
SDnode
*
pDnode
,
char
*
user
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
SMnode
*
pMnode
=
dndAcquireMnod
e
(
pDnode
);
SMnode
*
pMnode
=
mmAcquir
e
(
pDnode
);
if
(
pMnode
==
NULL
)
{
terrno
=
TSDB_CODE_APP_NOT_READY
;
dTrace
(
"failed to get user auth since %s"
,
terrstr
());
...
...
@@ -678,18 +499,18 @@ int32_t dndGetUserAuthFromMnode(SDnode *pDnode, char *user, char *spi, char *enc
}
int32_t
code
=
mndRetriveAuth
(
pMnode
,
user
,
spi
,
encrypt
,
secret
,
ckey
);
dndReleaseMnod
e
(
pDnode
,
pMnode
);
mmReleas
e
(
pDnode
,
pMnode
);
dTrace
(
"user:%s, retrieve auth spi:%d encrypt:%d"
,
user
,
*
spi
,
*
encrypt
);
return
code
;
}
int32_t
mmGetMonitorInfo
(
SDnode
*
pDnode
,
SMonClusterInfo
*
pClusterInfo
,
SMonVgroupInfo
*
pVgroupInfo
,
SMonGrantInfo
*
pGrantInfo
)
{
SMnode
*
pMnode
=
dndAcquireMnod
e
(
pDnode
);
SMonGrantInfo
*
pGrantInfo
)
{
SMnode
*
pMnode
=
mmAcquir
e
(
pDnode
);
if
(
pMnode
==
NULL
)
return
-
1
;
int32_t
code
=
mndGetMonitorInfo
(
pMnode
,
pClusterInfo
,
pVgroupInfo
,
pGrantInfo
);
dndReleaseMnod
e
(
pDnode
,
pMnode
);
mmReleas
e
(
pDnode
,
pMnode
);
return
code
;
}
source/dnode/mgmt/impl/mnodeMgmt/src/mmMsg.c
浏览文件 @
c6e6ac21
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http:www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "mm.h"
#include "dndMgmt.h"
#include "dndTransport.h"
#include "dndWorker.h"
static
int32_t
mmProcessWriteMsg
(
SDnode
*
pDnode
,
SMnodeMsg
*
pMnodeMsg
);
static
int32_t
mmProcessSyncMsg
(
SDnode
*
pDnode
,
SMnodeMsg
*
pMnodeMsg
);
static
int32_t
mmProcessReadMsg
(
SDnode
*
pDnode
,
SMnodeMsg
*
pMnodeMsg
);
void
mmInitMsgFp
(
SMnodeMgmt
*
pMgmt
)
{
// Requests handled by DNODE
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_CREATE_MNODE_RSP
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_ALTER_MNODE_RSP
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_DROP_MNODE_RSP
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_CREATE_QNODE_RSP
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_DROP_QNODE_RSP
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_CREATE_SNODE_RSP
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_DROP_SNODE_RSP
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_CREATE_BNODE_RSP
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_DROP_BNODE_RSP
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_CREATE_VNODE_RSP
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_ALTER_VNODE_RSP
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_DROP_VNODE_RSP
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_SYNC_VNODE_RSP
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_COMPACT_VNODE_RSP
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_CONFIG_DNODE_RSP
)]
=
mmProcessWriteMsg
;
// Requests handled by MNODE
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_CONNECT
)]
=
mmProcessReadMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_CREATE_ACCT
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_ALTER_ACCT
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_DROP_ACCT
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_CREATE_USER
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_ALTER_USER
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_DROP_USER
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_GET_USER_AUTH
)]
=
mmProcessReadMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_CREATE_DNODE
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_CONFIG_DNODE
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_DROP_DNODE
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_CREATE_MNODE
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_DROP_MNODE
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_CREATE_QNODE
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_DROP_QNODE
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_CREATE_SNODE
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_DROP_SNODE
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_CREATE_BNODE
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_DROP_BNODE
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_CREATE_DB
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_DROP_DB
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_USE_DB
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_ALTER_DB
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_SYNC_DB
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_COMPACT_DB
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_CREATE_FUNC
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_RETRIEVE_FUNC
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_DROP_FUNC
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_CREATE_STB
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_ALTER_STB
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_DROP_STB
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_TABLE_META
)]
=
mmProcessReadMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_VGROUP_LIST
)]
=
mmProcessReadMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_KILL_QUERY
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_KILL_CONN
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_HEARTBEAT
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_SHOW
)]
=
mmProcessReadMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_SHOW_RETRIEVE
)]
=
mmProcessReadMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_STATUS
)]
=
mmProcessReadMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_KILL_TRANS
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_GRANT
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_AUTH
)]
=
mmProcessReadMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_CREATE_TOPIC
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_ALTER_TOPIC
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_DROP_TOPIC
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_SUBSCRIBE
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_MQ_COMMIT_OFFSET
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_GET_SUB_EP
)]
=
mmProcessReadMsg
;
// Requests handled by VNODE
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_VND_MQ_SET_CONN_RSP
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_VND_MQ_REB_RSP
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_VND_CREATE_STB_RSP
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_VND_ALTER_STB_RSP
)]
=
mmProcessWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_VND_DROP_STB_RSP
)]
=
mmProcessWriteMsg
;
}
void
mmProcessRpcMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
,
SEpSet
*
pEpSet
)
{
SMnodeMgmt
*
pMgmt
=
&
pDnode
->
mmgmt
;
int32_t
code
=
-
1
;
SMnodeMsg
*
pMnodeMsg
=
NULL
;
MndMsgFp
msgFp
=
pMgmt
->
msgFp
[
TMSG_INDEX
(
pRpcMsg
->
msgType
)];
if
(
msgFp
==
NULL
)
{
terrno
=
TSDB_CODE_MSG_NOT_PROCESSED
;
goto
_OVER
;
}
int32_t
contLen
=
sizeof
(
SMnodeMsg
)
+
pRpcMsg
->
contLen
;
pMnodeMsg
=
taosAllocateQitem
(
contLen
);
if
(
pMnodeMsg
==
NULL
)
{
goto
_OVER
;
}
if
(
mndBuildMsg
(
pMnodeMsg
,
pRpcMsg
)
!=
0
)
{
goto
_OVER
;
}
if
(
pMgmt
->
singleProc
)
{
code
=
(
*
msgFp
)(
pDnode
,
pMnodeMsg
);
}
else
{
code
=
taosProcPushChild
(
pMgmt
->
pProcess
,
pMnodeMsg
,
contLen
);
}
_OVER:
if
(
code
==
0
)
{
if
(
!
pMgmt
->
singleProc
)
{
taosFreeQitem
(
pMnodeMsg
);
}
}
else
{
bool
isReq
=
(
pRpcMsg
->
msgType
&
1U
);
if
(
isReq
)
{
if
(
terrno
==
TSDB_CODE_DND_MNODE_NOT_DEPLOYED
||
terrno
==
TSDB_CODE_APP_NOT_READY
)
{
dndSendRedirectRsp
(
pDnode
,
pRpcMsg
);
}
else
{
SRpcMsg
rsp
=
{.
handle
=
pRpcMsg
->
handle
,
.
ahandle
=
pRpcMsg
->
ahandle
,
.
code
=
terrno
};
rpcSendResponse
(
&
rsp
);
}
}
taosFreeQitem
(
pMnodeMsg
);
}
rpcFreeCont
(
pRpcMsg
->
pCont
);
}
int32_t
mmProcessWriteMsg
(
SDnode
*
pDnode
,
SMnodeMsg
*
pMnodeMsg
)
{
return
mmWriteToWorker
(
pDnode
,
&
pDnode
->
mmgmt
.
writeWorker
,
pMnodeMsg
);
}
int32_t
mmProcessSyncMsg
(
SDnode
*
pDnode
,
SMnodeMsg
*
pMnodeMsg
)
{
return
mmWriteToWorker
(
pDnode
,
&
pDnode
->
mmgmt
.
syncWorker
,
pMnodeMsg
);
}
int32_t
mmProcessReadMsg
(
SDnode
*
pDnode
,
SMnodeMsg
*
pMnodeMsg
)
{
return
mmWriteToWorker
(
pDnode
,
&
pDnode
->
mmgmt
.
readWorker
,
pMnodeMsg
);
}
int32_t
mmWriteToWorker
(
SDnode
*
pDnode
,
SDnodeWorker
*
pWorker
,
SMnodeMsg
*
pMnodeMsg
)
{
SMnode
*
pMnode
=
mmAcquire
(
pDnode
);
if
(
pMnode
==
NULL
)
return
-
1
;
pMnodeMsg
->
pMnode
=
pMnode
;
int32_t
code
=
dndWriteMsgToWorker
(
pWorker
,
pMnodeMsg
,
0
);
mmRelease
(
pDnode
,
pMnode
);
return
code
;
}
source/dnode/mgmt/impl/src/dndTransport.c
浏览文件 @
c6e6ac21
...
...
@@ -32,91 +32,91 @@
static
void
dndInitMsgFp
(
STransMgmt
*
pMgmt
)
{
// Requests handled by DNODE
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_CREATE_MNODE
)]
=
dndProcessMgmtMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_CREATE_MNODE_RSP
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_CREATE_MNODE_RSP
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_ALTER_MNODE
)]
=
dndProcessMgmtMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_ALTER_MNODE_RSP
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_ALTER_MNODE_RSP
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_DROP_MNODE
)]
=
dndProcessMgmtMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_DROP_MNODE_RSP
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_DROP_MNODE_RSP
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_CREATE_QNODE
)]
=
dndProcessMgmtMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_CREATE_QNODE_RSP
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_CREATE_QNODE_RSP
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_DROP_QNODE
)]
=
dndProcessMgmtMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_DROP_QNODE_RSP
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_DROP_QNODE_RSP
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_CREATE_SNODE
)]
=
dndProcessMgmtMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_CREATE_SNODE_RSP
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_CREATE_SNODE_RSP
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_DROP_SNODE
)]
=
dndProcessMgmtMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_DROP_SNODE_RSP
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_DROP_SNODE_RSP
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_CREATE_BNODE
)]
=
dndProcessMgmtMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_CREATE_BNODE_RSP
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_CREATE_BNODE_RSP
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_DROP_BNODE
)]
=
dndProcessMgmtMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_DROP_BNODE_RSP
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_DROP_BNODE_RSP
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_CREATE_VNODE
)]
=
dndProcessMgmtMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_CREATE_VNODE_RSP
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_CREATE_VNODE_RSP
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_ALTER_VNODE
)]
=
dndProcessMgmtMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_ALTER_VNODE_RSP
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_ALTER_VNODE_RSP
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_DROP_VNODE
)]
=
dndProcessMgmtMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_DROP_VNODE_RSP
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_DROP_VNODE_RSP
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_SYNC_VNODE
)]
=
dndProcessMgmtMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_SYNC_VNODE_RSP
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_SYNC_VNODE_RSP
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_COMPACT_VNODE
)]
=
dndProcessMgmtMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_COMPACT_VNODE_RSP
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_COMPACT_VNODE_RSP
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_CONFIG_DNODE
)]
=
dndProcessMgmtMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_CONFIG_DNODE_RSP
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_CONFIG_DNODE_RSP
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_DND_NETWORK_TEST
)]
=
dndProcessMgmtMsg
;
// Requests handled by MNODE
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_CONNECT
)]
=
dndProcessMnodeRead
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_CREATE_ACCT
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_ALTER_ACCT
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_DROP_ACCT
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_CREATE_USER
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_ALTER_USER
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_DROP_USER
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_GET_USER_AUTH
)]
=
dndProcessMnodeRead
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_CREATE_DNODE
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_CONFIG_DNODE
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_DROP_DNODE
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_CREATE_MNODE
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_DROP_MNODE
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_CREATE_QNODE
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_DROP_QNODE
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_CREATE_SNODE
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_DROP_SNODE
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_CREATE_BNODE
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_DROP_BNODE
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_CREATE_DB
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_DROP_DB
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_USE_DB
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_ALTER_DB
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_SYNC_DB
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_COMPACT_DB
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_CREATE_FUNC
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_RETRIEVE_FUNC
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_DROP_FUNC
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_CREATE_STB
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_ALTER_STB
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_DROP_STB
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_TABLE_META
)]
=
dndProcessMnodeRead
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_VGROUP_LIST
)]
=
dndProcessMnodeRead
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_KILL_QUERY
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_KILL_CONN
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_HEARTBEAT
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_SHOW
)]
=
dndProcessMnodeRead
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_SHOW_RETRIEVE
)]
=
dndProcessMnodeRead
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_STATUS
)]
=
dndProcessMnodeRead
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_CONNECT
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_CREATE_ACCT
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_ALTER_ACCT
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_DROP_ACCT
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_CREATE_USER
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_ALTER_USER
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_DROP_USER
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_GET_USER_AUTH
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_CREATE_DNODE
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_CONFIG_DNODE
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_DROP_DNODE
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_CREATE_MNODE
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_DROP_MNODE
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_CREATE_QNODE
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_DROP_QNODE
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_CREATE_SNODE
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_DROP_SNODE
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_CREATE_BNODE
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_DROP_BNODE
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_CREATE_DB
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_DROP_DB
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_USE_DB
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_ALTER_DB
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_SYNC_DB
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_COMPACT_DB
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_CREATE_FUNC
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_RETRIEVE_FUNC
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_DROP_FUNC
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_CREATE_STB
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_ALTER_STB
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_DROP_STB
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_TABLE_META
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_VGROUP_LIST
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_KILL_QUERY
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_KILL_CONN
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_HEARTBEAT
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_SHOW
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_SHOW_RETRIEVE
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_STATUS
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_STATUS_RSP
)]
=
dndProcessMgmtMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_KILL_TRANS
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_GRANT
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_KILL_TRANS
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_GRANT
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_GRANT_RSP
)]
=
dndProcessMgmtMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_AUTH
)]
=
dndProcessMnodeRead
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_AUTH
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_AUTH_RSP
)]
=
dndProcessMgmtMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_CREATE_TOPIC
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_ALTER_TOPIC
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_DROP_TOPIC
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_SUBSCRIBE
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_MQ_COMMIT_OFFSET
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_VND_MQ_SET_CONN_RSP
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_VND_MQ_REB_RSP
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_GET_SUB_EP
)]
=
dndProcessMnodeRead
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_CREATE_TOPIC
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_ALTER_TOPIC
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_DROP_TOPIC
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_SUBSCRIBE
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_MQ_COMMIT_OFFSET
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_VND_MQ_SET_CONN_RSP
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_VND_MQ_REB_RSP
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_MND_GET_SUB_EP
)]
=
mmProcessRpc
Msg
;
// Requests handled by VNODE
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_VND_SUBMIT
)]
=
dndProcessVnodeWriteMsg
;
...
...
@@ -138,11 +138,11 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_VND_CANCEL_TASK
)]
=
dndProcessVnodeFetchMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_VND_DROP_TASK
)]
=
dndProcessVnodeFetchMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_VND_CREATE_STB
)]
=
dndProcessVnodeWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_VND_CREATE_STB_RSP
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_VND_CREATE_STB_RSP
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_VND_ALTER_STB
)]
=
dndProcessVnodeWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_VND_ALTER_STB_RSP
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_VND_ALTER_STB_RSP
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_VND_DROP_STB
)]
=
dndProcessVnodeWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_VND_DROP_STB_RSP
)]
=
dndProcessMnodeWrite
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_VND_DROP_STB_RSP
)]
=
mmProcessRpc
Msg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_VND_CREATE_TABLE
)]
=
dndProcessVnodeWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_VND_ALTER_TABLE
)]
=
dndProcessVnodeWriteMsg
;
pMgmt
->
msgFp
[
TMSG_INDEX
(
TDMT_VND_DROP_TABLE
)]
=
dndProcessVnodeWriteMsg
;
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
c6e6ac21
...
...
@@ -664,16 +664,6 @@ static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pCons
typedef
struct
{
}
SStreamScheduler
;
typedef
struct
SMnodeMsg
{
char
user
[
TSDB_USER_LEN
];
char
db
[
TSDB_DB_FNAME_LEN
];
int32_t
acctId
;
SMnode
*
pMnode
;
int64_t
createdTime
;
SRpcMsg
rpcMsg
;
int32_t
contLen
;
void
*
pCont
;
}
SMnodeMsg
;
#ifdef __cplusplus
}
...
...
source/dnode/mnode/impl/src/mnode.c
浏览文件 @
c6e6ac21
...
...
@@ -390,41 +390,26 @@ void mndDestroy(const char *path) {
mDebug
(
"mnode is destroyed"
);
}
SMnodeMsg
*
mndInitMsg
(
SMnode
*
pMnode
,
SRpcMsg
*
pRpcMsg
)
{
SMnodeMsg
*
pMsg
=
taosAllocateQitem
(
sizeof
(
SMnodeMsg
));
if
(
pMsg
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
mError
(
"failed to create msg since %s, app:%p RPC:%p"
,
terrstr
(),
pRpcMsg
->
ahandle
,
pRpcMsg
->
handle
);
return
NULL
;
}
int32_t
mndBuildMsg
(
SMnodeMsg
*
pMnodeMsg
,
SRpcMsg
*
pRpcMsg
)
{
if
(
pRpcMsg
->
msgType
!=
TDMT_MND_TRANS_TIMER
&&
pRpcMsg
->
msgType
!=
TDMT_MND_MQ_TIMER
&&
pRpcMsg
->
msgType
!=
TDMT_MND_MQ_DO_REBALANCE
&&
pRpcMsg
->
msgType
!=
TDMT_MND_TELEM_TIMER
)
{
SRpcConnInfo
connInfo
=
{
0
};
if
((
pRpcMsg
->
msgType
&
1U
)
&&
rpcGetConnInfo
(
pRpcMsg
->
handle
,
&
connInfo
)
!=
0
)
{
taosFreeQitem
(
pMsg
);
terrno
=
TSDB_CODE_MND_NO_USER_FROM_CONN
;
mError
(
"failed to create msg since %s, app:%p RPC:%p"
,
terrstr
(),
pRpcMsg
->
ahandle
,
pRpcMsg
->
handle
);
return
NULL
;
return
-
1
;
}
memcpy
(
pMsg
->
user
,
connInfo
.
user
,
TSDB_USER_LEN
);
memcpy
(
pM
nodeM
sg
->
user
,
connInfo
.
user
,
TSDB_USER_LEN
);
}
pM
sg
->
pMnode
=
pMnode
;
pM
sg
->
rpcMsg
=
*
pRpcMsg
;
pM
sg
->
createdTime
=
taosGetTimestampSec
(
);
pM
nodeMsg
->
rpcMsg
=
*
pRpcMsg
;
pM
nodeMsg
->
createdTime
=
taosGetTimestampSec
()
;
pM
nodeMsg
->
pCont
=
(
char
*
)
pMnodeMsg
+
sizeof
(
pMnodeMsg
);
if
(
pRpcMsg
!=
NULL
)
{
mTrace
(
"msg:%p, is created, app:%p RPC:%p user:%s"
,
pM
sg
,
pRpcMsg
->
ahandle
,
pRpcMsg
->
handle
,
p
Msg
->
user
);
mTrace
(
"msg:%p, is created, app:%p RPC:%p user:%s"
,
pM
nodeMsg
,
pRpcMsg
->
ahandle
,
pRpcMsg
->
handle
,
pMnode
Msg
->
user
);
}
return
pMsg
;
}
void
mndCleanupMsg
(
SMnodeMsg
*
pMsg
)
{
mTrace
(
"msg:%p, is destroyed"
,
pMsg
);
rpcFreeCont
(
pMsg
->
rpcMsg
.
pCont
);
pMsg
->
rpcMsg
.
pCont
=
NULL
;
taosFreeQitem
(
pMsg
);
return
0
;
}
void
mndSendRsp
(
SMnodeMsg
*
pMsg
,
int32_t
code
)
{
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录