Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
326459ff
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
326459ff
编写于
3月 18, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
shm
上级
41880bac
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
259 addition
and
425 deletion
+259
-425
include/dnode/qnode/qnode.h
include/dnode/qnode/qnode.h
+0
-1
source/dnode/mgmt/bnode/inc/bmInt.h
source/dnode/mgmt/bnode/inc/bmInt.h
+0
-1
source/dnode/mgmt/qnode/inc/qm.h
source/dnode/mgmt/qnode/inc/qm.h
+31
-0
source/dnode/mgmt/qnode/inc/qmInt.h
source/dnode/mgmt/qnode/inc/qmInt.h
+19
-39
source/dnode/mgmt/qnode/src/qmFile.c
source/dnode/mgmt/qnode/src/qmFile.c
+0
-0
source/dnode/mgmt/qnode/src/qmInt.c
source/dnode/mgmt/qnode/src/qmInt.c
+100
-3
source/dnode/mgmt/qnode/src/qmMgmt.c
source/dnode/mgmt/qnode/src/qmMgmt.c
+0
-378
source/dnode/mgmt/qnode/src/qmMsg.c
source/dnode/mgmt/qnode/src/qmMsg.c
+36
-3
source/dnode/mgmt/qnode/src/qmWorker.c
source/dnode/mgmt/qnode/src/qmWorker.c
+73
-0
未找到文件。
include/dnode/qnode/qnode.h
浏览文件 @
326459ff
...
...
@@ -36,7 +36,6 @@ typedef struct {
}
SQnodeLoad
;
typedef
struct
{
int32_t
sver
;
int32_t
dnodeId
;
int64_t
clusterId
;
SMgmtWrapper
*
pWrapper
;
...
...
source/dnode/mgmt/bnode/inc/bmInt.h
浏览文件 @
326459ff
...
...
@@ -17,7 +17,6 @@
#define _TD_DND_BNODE_INT_H_
#include "bm.h"
#include "dm.h"
#ifdef __cplusplus
extern
"C"
{
...
...
source/dnode/mgmt/qnode/inc/qm.h
0 → 100644
浏览文件 @
326459ff
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_DND_QNODE_H_
#define _TD_DND_QNODE_H_
#include "dnd.h"
#ifdef __cplusplus
extern
"C"
{
#endif
void
qmGetMgmtFp
(
SMgmtWrapper
*
pMgmt
);
#ifdef __cplusplus
}
#endif
#endif
/*_TD_DND_QNODE_H_*/
\ No newline at end of file
source/dnode/mgmt/qnode/inc/qmInt.h
浏览文件 @
326459ff
...
...
@@ -16,55 +16,35 @@
#ifndef _TD_DND_QNODE_INT_H_
#define _TD_DND_QNODE_INT_H_
#include "
dnd
.h"
#include "
qm
.h"
#ifdef __cplusplus
extern
"C"
{
#endif
typedef
struct
SQnodeMgmt
{
int32_t
refCount
;
int8_t
deployed
;
int8_t
dropped
;
SQnode
*
pQnode
;
SRWLatch
latch
;
SDnodeWorker
queryWorker
;
SDnodeWorker
fetchWorker
;
//
SProcObj
*
pProcess
;
bool
singleProc
;
SQnode
*
pQnode
;
SDnode
*
pDnode
;
SMgmtWrapper
*
pWrapper
;
const
char
*
path
;
SDnodeWorker
queryWorker
;
SDnodeWorker
fetchWorker
;
}
SQnodeMgmt
;
void
qmGetMgmtFp
(
SMgmtWrapper
*
pMgmt
);
int32_t
dndInitQnode
(
SDnode
*
pDnode
);
void
dndCleanupQnode
(
SDnode
*
pDnode
);
void
dndProcessQnodeQueryMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
void
dndProcessQnodeFetchMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
// qmHandle.h
int32_t
qmProcessCreateReq
(
SQnodeMgmt
*
pMgmt
,
SNodeMsg
*
pMsg
);
int32_t
qmProcessDropReq
(
SQnodeMgmt
*
pMgmt
,
SNodeMsg
*
pMsg
);
// qmInt.c
int32_t
qmOpen
(
SMgmtWrapper
*
pWrapper
);
int32_t
qmDrop
(
SMgmtWrapper
*
pWrapper
);
// qmMsg.c
void
qmInitMsgHandles
(
SMgmtWrapper
*
pWrapper
);
int32_t
qmProcessCreateReq
(
SQnodeMgmt
*
pMgmt
,
SNodeMsg
*
pMsg
);
int32_t
qmProcessDropReq
(
SQnodeMgmt
*
pMgmt
,
SNodeMsg
*
pMsg
);
int32_t
qmStartWorker
(
SDnode
*
pDnode
);
void
qmStopWorker
(
SDnode
*
pDnode
);
void
qmInitMsgFp
(
SMnodeMgmt
*
pMgmt
);
void
qmProcessRpcMsg
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
int32_t
qmPutMsgToWriteQueue
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
);
int32_t
qmPutMsgToReadQueue
(
SDnode
*
pDnode
,
SRpcMsg
*
pRpcMsg
);
void
qmConsumeChildQueue
(
SDnode
*
pDnode
,
SNodeMsg
*
pMsg
,
int32_t
msgLen
,
void
*
pCont
,
int32_t
contLen
);
void
qmConsumeParentQueue
(
SDnode
*
pDnode
,
SRpcMsg
*
pMsg
,
int32_t
msgLen
,
void
*
pCont
,
int32_t
contLen
);
void
qmProcessWriteMsg
(
SDnode
*
pDnode
,
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
);
void
qmProcessSyncMsg
(
SDnode
*
pDnode
,
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
);
void
qmProcessReadMsg
(
SDnode
*
pDnode
,
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
);
int32_t
qmProcessCreateReq
(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
);
int32_t
qmProcessDropReq
(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
);
// qmWorker.c
int32_t
qmStartWorker
(
SQnodeMgmt
*
pMgmt
);
void
qmStopWorker
(
SQnodeMgmt
*
pMgmt
);
int32_t
qmProcessQueryMsg
(
SQnodeMgmt
*
pMgmt
,
SNodeMsg
*
pMsg
);
int32_t
qmProcessFetchMsg
(
SQnodeMgmt
*
pMgmt
,
SNodeMsg
*
pMsg
);
#ifdef __cplusplus
}
...
...
source/dnode/mgmt/qnode/src/qmFile.c
已删除
100644 → 0
浏览文件 @
41880bac
source/dnode/mgmt/qnode/src/qmInt.c
浏览文件 @
326459ff
...
...
@@ -18,13 +18,110 @@
static
int32_t
qmRequire
(
SMgmtWrapper
*
pWrapper
,
bool
*
required
)
{
return
dndReadFile
(
pWrapper
,
required
);
}
static
void
qmInitOption
(
SQnodeMgmt
*
pMgmt
,
SQnodeOpt
*
pOption
)
{
SDnode
*
pDnode
=
pMgmt
->
pDnode
;
pOption
->
pWrapper
=
pMgmt
->
pWrapper
;
pOption
->
sendReqFp
=
dndSendReqToDnode
;
pOption
->
sendMnodeReqFp
=
dndSendReqToMnode
;
pOption
->
dnodeId
=
pDnode
->
dnodeId
;
pOption
->
clusterId
=
pDnode
->
clusterId
;
}
static
int32_t
qmOpenImp
(
SQnodeMgmt
*
pMgmt
)
{
SQnodeOpt
option
=
{
0
};
qmInitOption
(
pMgmt
,
&
option
);
pMgmt
->
pQnode
=
qndOpen
(
&
option
);
if
(
pMgmt
->
pQnode
==
NULL
)
{
dError
(
"failed to open qnode since %s"
,
terrstr
());
return
-
1
;
}
if
(
qmStartWorker
(
pMgmt
)
!=
0
)
{
dError
(
"failed to start qnode worker since %s"
,
terrstr
());
return
-
1
;
}
bool
deployed
=
true
;
if
(
dndWriteFile
(
pMgmt
->
pWrapper
,
deployed
)
!=
0
)
{
dError
(
"failed to write qnode file since %s"
,
terrstr
());
return
-
1
;
}
return
0
;
}
static
void
qmCloseImp
(
SQnodeMgmt
*
pMgmt
)
{
if
(
pMgmt
->
pQnode
!=
NULL
)
{
qmStopWorker
(
pMgmt
);
qndClose
(
pMgmt
->
pQnode
);
pMgmt
->
pQnode
=
NULL
;
}
}
int32_t
qmDrop
(
SMgmtWrapper
*
pWrapper
)
{
SQnodeMgmt
*
pMgmt
=
pWrapper
->
pMgmt
;
if
(
pMgmt
==
NULL
)
return
0
;
dInfo
(
"qnode-mgmt start to drop"
);
bool
deployed
=
false
;
if
(
dndWriteFile
(
pWrapper
,
deployed
)
!=
0
)
{
dError
(
"failed to drop qnode since %s"
,
terrstr
());
return
-
1
;
}
qmCloseImp
(
pMgmt
);
taosRemoveDir
(
pMgmt
->
path
);
pWrapper
->
pMgmt
=
NULL
;
free
(
pMgmt
);
dInfo
(
"qnode-mgmt is dropped"
);
return
0
;
}
static
void
qmClose
(
SMgmtWrapper
*
pWrapper
)
{
SQnodeMgmt
*
pMgmt
=
pWrapper
->
pMgmt
;
if
(
pMgmt
==
NULL
)
return
;
dInfo
(
"qnode-mgmt start to cleanup"
);
qmCloseImp
(
pMgmt
);
pWrapper
->
pMgmt
=
NULL
;
free
(
pMgmt
);
dInfo
(
"qnode-mgmt is cleaned up"
);
}
int32_t
qmOpen
(
SMgmtWrapper
*
pWrapper
)
{
dInfo
(
"qnode-mgmt start to init"
);
SQnodeMgmt
*
pMgmt
=
calloc
(
1
,
sizeof
(
SQnodeMgmt
));
if
(
pMgmt
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
pMgmt
->
path
=
pWrapper
->
path
;
pMgmt
->
pDnode
=
pWrapper
->
pDnode
;
pMgmt
->
pWrapper
=
pWrapper
;
pWrapper
->
pMgmt
=
pMgmt
;
int32_t
code
=
qmOpenImp
(
pMgmt
);
if
(
code
!=
0
)
{
dError
(
"failed to init qnode-mgmt since %s"
,
terrstr
());
qmClose
(
pWrapper
);
}
else
{
dInfo
(
"qnode-mgmt is initialized"
);
}
return
code
;
}
void
qmGetMgmtFp
(
SMgmtWrapper
*
pWrapper
)
{
SMgmtFp
mgmtFp
=
{
0
};
mgmtFp
.
openFp
=
NULL
;
mgmtFp
.
closeFp
=
NULL
;
mgmtFp
.
openFp
=
qmOpen
;
mgmtFp
.
closeFp
=
qmClose
;
mgmtFp
.
createMsgFp
=
qmProcessCreateReq
;
mgmtFp
.
dropMsgFp
=
qmProcessDropReq
;
mgmtFp
.
requiredFp
=
qmRequire
;
//
qmInitMsgHandles(pWrapper);
qmInitMsgHandles
(
pWrapper
);
pWrapper
->
name
=
"qnode"
;
pWrapper
->
fp
=
mgmtFp
;
}
source/dnode/mgmt/qnode/src/qmMgmt.c
已删除
100644 → 0
浏览文件 @
41880bac
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http:www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
// #include "dndQnode.h"
// #include "dm.h"
// #include "dndTransport.h"
// #include "dndWorker.h"
#if 0
static void dndProcessQnodeQueue(SDnode *pDnode, SRpcMsg *pMsg);
static SQnode *dndAcquireQnode(SDnode *pDnode) {
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
SQnode *pQnode = NULL;
int32_t refCount = 0;
taosRLockLatch(&pMgmt->latch);
if (pMgmt->deployed && !pMgmt->dropped && pMgmt->pQnode != NULL) {
refCount = atomic_add_fetch_32(&pMgmt->refCount, 1);
pQnode = pMgmt->pQnode;
} else {
terrno = TSDB_CODE_NODE_NOT_DEPLOYED;
}
taosRUnLockLatch(&pMgmt->latch);
if (pQnode != NULL) {
dTrace("acquire qnode, refCount:%d", refCount);
}
return pQnode;
}
static void dndReleaseQnode(SDnode *pDnode, SQnode *pQnode) {
if (pQnode == NULL) return;
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
taosRLockLatch(&pMgmt->latch);
int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1);
taosRUnLockLatch(&pMgmt->latch);
dTrace("release qnode, refCount:%d", refCount);
}
static int32_t dndReadQnodeFile(SDnode *pDnode) {
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
int32_t code = TSDB_CODE_NODE_PARSE_FILE_ERROR;
int32_t len = 0;
int32_t maxLen = 1024;
char *content = calloc(1, maxLen + 1);
cJSON *root = NULL;
char file[PATH_MAX + 20];
snprintf(file, PATH_MAX + 20, "%s/qnode.json", pDnode->dir.dnode);
// FILE *fp = fopen(file, "r");
TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ);
if (pFile == NULL) {
dDebug("file %s not exist", file);
code = 0;
goto PRASE_QNODE_OVER;
}
len = (int32_t)taosReadFile(pFile, content, maxLen);
if (len <= 0) {
dError("failed to read %s since content is null", file);
goto PRASE_QNODE_OVER;
}
content[len] = 0;
root = cJSON_Parse(content);
if (root == NULL) {
dError("failed to read %s since invalid json format", file);
goto PRASE_QNODE_OVER;
}
cJSON *deployed = cJSON_GetObjectItem(root, "deployed");
if (!deployed || deployed->type != cJSON_Number) {
dError("failed to read %s since deployed not found", file);
goto PRASE_QNODE_OVER;
}
pMgmt->deployed = deployed->valueint;
cJSON *dropped = cJSON_GetObjectItem(root, "dropped");
if (!dropped || dropped->type != cJSON_Number) {
dError("failed to read %s since dropped not found", file);
goto PRASE_QNODE_OVER;
}
pMgmt->dropped = dropped->valueint;
code = 0;
dDebug("succcessed to read file %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped);
PRASE_QNODE_OVER:
if (content != NULL) free(content);
if (root != NULL) cJSON_Delete(root);
if (pFile != NULL) taosCloseFile(&pFile);
terrno = code;
return code;
}
static int32_t dndWriteQnodeFile(SDnode *pDnode) {
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
char file[PATH_MAX + 20];
snprintf(file, PATH_MAX + 20, "%s/qnode.json", pDnode->dir.dnode);
// FILE *fp = fopen(file, "w");
TdFilePtr pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) {
terrno = TSDB_CODE_NODE_WRITE_FILE_ERROR;
dError("failed to write %s since %s", file, terrstr());
return -1;
}
int32_t len = 0;
int32_t maxLen = 1024;
char *content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"deployed\": %d,\n", pMgmt->deployed);
len += snprintf(content + len, maxLen - len, " \"dropped\": %d\n", pMgmt->dropped);
len += snprintf(content + len, maxLen - len, "}\n");
taosWriteFile(pFile, content, len);
taosFsyncFile(pFile);
taosCloseFile(&pFile);
free(content);
char realfile[PATH_MAX + 20];
snprintf(realfile, PATH_MAX + 20, "%s/qnode.json", pDnode->dir.dnode);
if (taosRenameFile(file, realfile) != 0) {
terrno = TSDB_CODE_NODE_WRITE_FILE_ERROR;
dError("failed to rename %s since %s", file, terrstr());
return -1;
}
dInfo("successed to write %s, deployed:%d dropped:%d", realfile, pMgmt->deployed, pMgmt->dropped);
return 0;
}
static int32_t dndStartQnodeWorker(SDnode *pDnode) {
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
if (dndInitWorker(pDnode, &pMgmt->queryWorker, DND_WORKER_SINGLE, "qnode-query", 0, 1, dndProcessQnodeQueue) != 0) {
dError("failed to start qnode query worker since %s", terrstr());
return -1;
}
if (dndInitWorker(pDnode, &pMgmt->fetchWorker, DND_WORKER_SINGLE, "qnode-fetch", 0, 1, dndProcessQnodeQueue) != 0) {
dError("failed to start qnode fetch worker since %s", terrstr());
return -1;
}
return 0;
}
static void dndStopQnodeWorker(SDnode *pDnode) {
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
taosWLockLatch(&pMgmt->latch);
pMgmt->deployed = 0;
taosWUnLockLatch(&pMgmt->latch);
while (pMgmt->refCount > 0) {
taosMsleep(10);
}
dndCleanupWorker(&pMgmt->queryWorker);
dndCleanupWorker(&pMgmt->fetchWorker);
}
static void dndBuildQnodeOption(SDnode *pDnode, SQnodeOpt *pOption) {
pOption->pDnode = pDnode;
pOption->sendReqFp = dndSendReqToDnode;
pOption->sendMnodeReqFp = dndSendReqToMnode;
pOption->sendRedirectRspFp = dndSendRedirectRsp;
pOption->dnodeId = pDnode->dnodeId;
pOption->clusterId = pDnode->clusterId;
pOption->sver = tsVersion;
}
static int32_t dndOpenQnode(SDnode *pDnode) {
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
SQnode *pQnode = dndAcquireQnode(pDnode);
if (pQnode != NULL) {
dndReleaseQnode(pDnode, pQnode);
terrno = TSDB_CODE_NODE_ALREADY_DEPLOYED;
dError("failed to create qnode since %s", terrstr());
return -1;
}
SQnodeOpt option = {0};
dndBuildQnodeOption(pDnode, &option);
pQnode = qndOpen(&option);
if (pQnode == NULL) {
dError("failed to open qnode since %s", terrstr());
return -1;
}
if (dndStartQnodeWorker(pDnode) != 0) {
dError("failed to start qnode worker since %s", terrstr());
qndClose(pQnode);
return -1;
}
pMgmt->deployed = 1;
if (dndWriteQnodeFile(pDnode) != 0) {
pMgmt->deployed = 0;
dError("failed to write qnode file since %s", terrstr());
dndStopQnodeWorker(pDnode);
qndClose(pQnode);
return -1;
}
taosWLockLatch(&pMgmt->latch);
pMgmt->pQnode = pQnode;
taosWUnLockLatch(&pMgmt->latch);
dInfo("qnode open successfully");
return 0;
}
static int32_t dndDropQnode(SDnode *pDnode) {
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
SQnode *pQnode = dndAcquireQnode(pDnode);
if (pQnode == NULL) {
dError("failed to drop qnode since %s", terrstr());
return -1;
}
taosRLockLatch(&pMgmt->latch);
pMgmt->dropped = 1;
taosRUnLockLatch(&pMgmt->latch);
if (dndWriteQnodeFile(pDnode) != 0) {
taosRLockLatch(&pMgmt->latch);
pMgmt->dropped = 0;
taosRUnLockLatch(&pMgmt->latch);
dndReleaseQnode(pDnode, pQnode);
dError("failed to drop qnode since %s", terrstr());
return -1;
}
dndReleaseQnode(pDnode, pQnode);
dndStopQnodeWorker(pDnode);
pMgmt->deployed = 0;
dndWriteQnodeFile(pDnode);
qndClose(pQnode);
pMgmt->pQnode = NULL;
return 0;
}
int32_t qmProcessCreateReq(SDnode *pDnode, SRpcMsg *pReq) {
SDCreateQnodeReq createReq = {0};
if (tDeserializeSMCreateDropQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
if (createReq.dnodeId != pDnode->dnodeId) {
terrno = TSDB_CODE_NODE_INVALID_OPTION;
dError("failed to create qnode since %s", terrstr());
return -1;
} else {
return dndOpenQnode(pDnode);
}
}
int32_t qmProcessDropReq(SDnode *pDnode, SRpcMsg *pReq) {
SDDropQnodeReq dropReq = {0};
if (tDeserializeSMCreateDropQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
if (dropReq.dnodeId != pDnode->dnodeId) {
terrno = TSDB_CODE_NODE_INVALID_OPTION;
dError("failed to drop qnode since %s", terrstr());
return -1;
} else {
return dndDropQnode(pDnode);
}
}
static void dndProcessQnodeQueue(SDnode *pDnode, SRpcMsg *pMsg) {
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
SRpcMsg *pRsp = NULL;
int32_t code = TSDB_CODE_NODE_NOT_DEPLOYED;
SQnode *pQnode = dndAcquireQnode(pDnode);
if (pQnode != NULL) {
code = qndProcessMsg(pQnode, pMsg, &pRsp);
}
dndReleaseQnode(pDnode, pQnode);
if (pMsg->msgType & 1u) {
if (pRsp != NULL) {
pRsp->ahandle = pMsg->ahandle;
rpcSendResponse(pRsp);
free(pRsp);
} else {
if (code != 0) code = terrno;
SRpcMsg rpcRsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code};
rpcSendResponse(&rpcRsp);
}
}
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
static void dndWriteQnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pMsg) {
int32_t code = TSDB_CODE_NODE_NOT_DEPLOYED;
SQnode *pQnode = dndAcquireQnode(pDnode);
if (pQnode != NULL) {
code = dndWriteMsgToWorker(pWorker, pMsg, sizeof(SRpcMsg));
}
dndReleaseQnode(pDnode, pQnode);
if (code != 0) {
if (pMsg->msgType & 1u) {
SRpcMsg rsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code};
rpcSendResponse(&rsp);
}
rpcFreeCont(pMsg->pCont);
}
}
void dndProcessQnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
dndWriteQnodeMsgToWorker(pDnode, &pDnode->qmgmt.queryWorker, pMsg);
}
void dndProcessQnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
dndWriteQnodeMsgToWorker(pDnode, &pDnode->qmgmt.queryWorker, pMsg);
}
int32_t dndInitQnode(SDnode *pDnode) {
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
taosInitRWLatch(&pMgmt->latch);
if (dndReadQnodeFile(pDnode) != 0) {
return -1;
}
if (pMgmt->dropped) return 0;
if (!pMgmt->deployed) return 0;
return dndOpenQnode(pDnode);
}
void dndCleanupQnode(SDnode *pDnode) {
SQnodeMgmt *pMgmt = &pDnode->qmgmt;
if (pMgmt->pQnode) {
dndStopQnodeWorker(pDnode);
qndClose(pMgmt->pQnode);
pMgmt->pQnode = NULL;
}
}
#endif
\ No newline at end of file
source/dnode/mgmt/qnode/src/qmMsg.c
浏览文件 @
326459ff
...
...
@@ -16,9 +16,42 @@
#define _DEFAULT_SOURCE
#include "qmInt.h"
int32_t
qmProcessCreateReq
(
SQnodeMgmt
*
pMgmt
,
SNodeMsg
*
pMsg
)
{
return
0
;}
int32_t
qmProcessDropReq
(
SQnodeMgmt
*
pMgmt
,
SNodeMsg
*
pMsg
){
return
0
;}
int32_t
qmProcessCreateReq
(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
)
{
SDnode
*
pDnode
=
pWrapper
->
pDnode
;
SRpcMsg
*
pReq
=
&
pMsg
->
rpcMsg
;
void
qmInitMsgHandles
(
SMgmtWrapper
*
pWrapper
)
{
SDCreateQnodeReq
createReq
=
{
0
};
if
(
tDeserializeSMCreateDropQSBNodeReq
(
pReq
->
pCont
,
pReq
->
contLen
,
&
createReq
)
!=
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
return
-
1
;
}
if
(
createReq
.
dnodeId
!=
pDnode
->
dnodeId
)
{
terrno
=
TSDB_CODE_NODE_INVALID_OPTION
;
dError
(
"failed to create qnode since %s, input:%d cur:%d"
,
terrstr
(),
createReq
.
dnodeId
,
pDnode
->
dnodeId
);
return
-
1
;
}
else
{
return
qmOpen
(
pWrapper
);
}
}
int32_t
qmProcessDropReq
(
SMgmtWrapper
*
pWrapper
,
SNodeMsg
*
pMsg
)
{
SDnode
*
pDnode
=
pWrapper
->
pDnode
;
SRpcMsg
*
pReq
=
&
pMsg
->
rpcMsg
;
SDDropQnodeReq
dropReq
=
{
0
};
if
(
tDeserializeSMCreateDropQSBNodeReq
(
pReq
->
pCont
,
pReq
->
contLen
,
&
dropReq
)
!=
0
)
{
terrno
=
TSDB_CODE_INVALID_MSG
;
return
-
1
;
}
if
(
dropReq
.
dnodeId
!=
pDnode
->
dnodeId
)
{
terrno
=
TSDB_CODE_NODE_INVALID_OPTION
;
dError
(
"failed to drop qnode since %s"
,
terrstr
());
return
-
1
;
}
else
{
return
qmDrop
(
pWrapper
);
}
}
void
qmInitMsgHandles
(
SMgmtWrapper
*
pWrapper
)
{}
source/dnode/mgmt/qnode/src/qmWorker.c
浏览文件 @
326459ff
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http:www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "qmInt.h"
static
void
qmProcessQueue
(
SQnodeMgmt
*
pMgmt
,
SNodeMsg
*
pMsg
)
{
dTrace
(
"msg:%p, will be processed in qnode queue"
,
pMsg
);
SRpcMsg
*
pRsp
=
NULL
;
SRpcMsg
*
pRpc
=
&
pMsg
->
rpcMsg
;
int32_t
code
=
qndProcessMsg
(
pMgmt
->
pQnode
,
pRpc
,
&
pRsp
);
if
(
pRpc
->
msgType
&
1u
)
{
if
(
pRsp
!=
NULL
)
{
pRsp
->
ahandle
=
pRpc
->
ahandle
;
dndSendRsp
(
pMgmt
->
pWrapper
,
pRsp
);
free
(
pRsp
);
}
else
{
if
(
code
!=
0
)
code
=
terrno
;
SRpcMsg
rpcRsp
=
{.
handle
=
pRpc
->
handle
,
.
ahandle
=
pRpc
->
ahandle
,
.
code
=
code
};
dndSendRsp
(
pMgmt
->
pWrapper
,
&
rpcRsp
);
}
}
dTrace
(
"msg:%p, is freed"
,
pMsg
);
rpcFreeCont
(
pRpc
->
pCont
);
taosFreeQitem
(
pMsg
);
}
int32_t
qmProcessQueryMsg
(
SQnodeMgmt
*
pMgmt
,
SNodeMsg
*
pMsg
)
{
SDnodeWorker
*
pWorker
=
&
pMgmt
->
queryWorker
;
dTrace
(
"msg:%p, put into worker:%s"
,
pMsg
,
pWorker
->
name
);
return
dndWriteMsgToWorker
(
pWorker
,
pMsg
,
0
);
}
int32_t
qmProcessFetchMsg
(
SQnodeMgmt
*
pMgmt
,
SNodeMsg
*
pMsg
)
{
SDnodeWorker
*
pWorker
=
&
pMgmt
->
fetchWorker
;
dTrace
(
"msg:%p, put into worker:%s"
,
pMsg
,
pWorker
->
name
);
return
dndWriteMsgToWorker
(
pWorker
,
pMsg
,
0
);
}
int32_t
qmStartWorker
(
SQnodeMgmt
*
pMgmt
)
{
if
(
dndInitWorker
(
pMgmt
,
&
pMgmt
->
queryWorker
,
DND_WORKER_SINGLE
,
"qnode-query"
,
0
,
1
,
qmProcessQueue
)
!=
0
)
{
dError
(
"failed to start qnode query worker since %s"
,
terrstr
());
return
-
1
;
}
if
(
dndInitWorker
(
pMgmt
,
&
pMgmt
->
fetchWorker
,
DND_WORKER_SINGLE
,
"qnode-fetch"
,
0
,
1
,
qmProcessQueue
)
!=
0
)
{
dError
(
"failed to start qnode fetch worker since %s"
,
terrstr
());
return
-
1
;
}
return
0
;
}
void
qmStopWorker
(
SQnodeMgmt
*
pMgmt
)
{
dndCleanupWorker
(
&
pMgmt
->
queryWorker
);
dndCleanupWorker
(
&
pMgmt
->
fetchWorker
);
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录