提交 fad5a80b 编写于 作者: D dapan1121

feature/scheduler

上级 cc5672a4
......@@ -28,6 +28,7 @@ enum {
NODE_TYPE_VNODE = 1,
NODE_TYPE_QNODE,
NODE_TYPE_SNODE,
NODE_TYPE_MNODE,
};
......
......@@ -159,6 +159,6 @@ void mmInitMsgHandles(SMgmtWrapper *pWrapper) {
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)mmProcessReadMsg, MND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, (NodeMsgFp)mmProcessReadMsg, MND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, (NodeMsgFp)mmProcessReadMsg, MND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, (NodeMsgFp)mmProcessReadMsg, MND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, (NodeMsgFp)mmProcessReadMsg, MND_VGID);
}
......@@ -6,7 +6,7 @@ target_include_directories(
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_link_libraries(
mnode scheduler sdb wal transport cjson sync monitor
mnode scheduler sdb wal transport cjson sync monitor executor qworker
)
if(${BUILD_TEST})
......
......@@ -59,6 +59,8 @@ typedef struct SMnodeLoad {
int64_t compStorage;
} SMnodeLoad;
typedef struct SQWorkerMgmt SQHandle;
typedef struct {
const char *name;
MndInitFp initFp;
......@@ -112,6 +114,7 @@ typedef struct SMnode {
SSdb *pSdb;
SMgmtWrapper *pWrapper;
SArray *pSteps;
SQHandle *pQuery;
SShowMgmt showMgmt;
SProfileMgmt profileMgmt;
STelemMgmt telemMgmt;
......@@ -119,7 +122,7 @@ typedef struct SMnode {
SHashObj *infosMeta;
SGrantInfo grant;
MndMsgFp msgFp[TDMT_MAX];
SMsgCb msgCb;
SMsgCb msgCb;
} SMnode;
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_MND_QUERY_H_
#define _TD_MND_QUERY_H_
#include "mndInt.h"
#ifdef __cplusplus
extern "C" {
#endif
int32_t mndInitQuery(SMnode *pMnode);
void mndCleanupQuery(SMnode *pMnode);
#ifdef __cplusplus
}
#endif
#endif /*_TD_MND_QUERY_H_*/
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "mndQuery.h"
#include "mndMnode.h"
#include "executor.h"
#include "qworker.h"
int32_t mndProcessQueryMsg(SNodeMsg *pReq) {
mTrace("message in query queue is processing");
SMnode *pMnode = pReq->pNode;
SReadHandle handle = {0};
switch (pReq->rpcMsg.msgType) {
case TDMT_VND_QUERY:
return qWorkerProcessQueryMsg(&handle, pMnode->pQuery, &pReq->rpcMsg);
case TDMT_VND_QUERY_CONTINUE:
return qWorkerProcessCQueryMsg(&handle, pMnode->pQuery, &pReq->rpcMsg);
default:
mError("unknown msg type:%d in query queue", pReq->rpcMsg.msgType);
return TSDB_CODE_VND_APP_ERROR;
}
}
int32_t mndProcessFetchMsg(SNodeMsg *pReq) {
mTrace("message in fetch queue is processing");
SMnode *pMnode = pReq->pNode;
switch (pReq->rpcMsg.msgType) {
case TDMT_VND_FETCH:
return qWorkerProcessFetchMsg(pMnode, pMnode->pQuery, &pReq->rpcMsg);
case TDMT_VND_DROP_TASK:
return qWorkerProcessDropMsg(pMnode, pMnode->pQuery, &pReq->rpcMsg);
case TDMT_VND_QUERY_HEARTBEAT:
return qWorkerProcessHbMsg(pMnode, pMnode->pQuery, &pReq->rpcMsg);
default:
mError("unknown msg type:%d in fetch queue", pReq->rpcMsg.msgType);
return TSDB_CODE_VND_APP_ERROR;
}
}
int32_t mndInitQuery(SMnode *pMnode) {
int32_t code = qWorkerInit(NODE_TYPE_MNODE, MND_VGID, NULL, (void **)&pMnode->pQuery, &pMnode->msgCb);
if (code) {
return code;
}
mndSetMsgHandle(pMnode, TDMT_VND_QUERY, mndProcessQueryMsg);
mndSetMsgHandle(pMnode, TDMT_VND_QUERY_CONTINUE, mndProcessQueryMsg);
mndSetMsgHandle(pMnode, TDMT_VND_FETCH, mndProcessFetchMsg);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TASK, mndProcessFetchMsg);
mndSetMsgHandle(pMnode, TDMT_VND_QUERY_HEARTBEAT, mndProcessFetchMsg);
return 0;
}
void mndCleanupQuery(SMnode *pMnode) { qWorkerDestroy((void **)&pMnode->pQuery); }
......@@ -39,6 +39,7 @@
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "mndQuery.h"
#define MQ_TIMER_MS 3000
#define TRNAS_TIMER_MS 6000
......@@ -217,6 +218,7 @@ static int32_t mndInitSteps(SMnode *pMnode) {
// if (mndAllocStep(pMnode, "mnode-timer", mndInitTimer, NULL) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-profile", mndInitProfile, mndCleanupProfile) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-show", mndInitShow, mndCleanupShow) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-query", mndInitQuery, mndCleanupQuery) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-sync", mndInitSync, mndCleanupSync) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-telem", mndInitTelem, mndCleanupTelem) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-timer", NULL, mndCleanupTimer) != 0) return -1;
......
......@@ -1480,13 +1480,14 @@ static int32_t jsonToDatum(const SJson* pJson, void* pObj) {
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY: {
pNode->datum.p = calloc(1, pNode->node.resType.bytes);
pNode->datum.p = calloc(1, pNode->node.resType.bytes + VARSTR_HEADER_SIZE + 1 + 100);
if (NULL == pNode->datum.p) {
code = TSDB_CODE_OUT_OF_MEMORY;
break;
}
varDataSetLen(pNode->datum.p, pNode->node.resType.bytes);
code = tjsonGetStringValue(pJson, jkValueDatum, varDataVal(pNode->datum.p));
nodesDebug("varchar len:%d,string:%s", pNode->node.resType.bytes, varDataVal(pNode->datum.p));
break;
}
case TSDB_DATA_TYPE_JSON:
......
......@@ -383,12 +383,13 @@ static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) {
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY: {
pVal->datum.p = calloc(1, pVal->node.resType.bytes + VARSTR_HEADER_SIZE);
pVal->datum.p = calloc(1, pVal->node.resType.bytes + VARSTR_HEADER_SIZE + 1);
if (NULL == pVal->datum.p) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_OUT_OF_MEMORY);
}
varDataSetLen(pVal->datum.p, pVal->node.resType.bytes);
strcpy(varDataVal(pVal->datum.p), pVal->literal);
strncpy(varDataVal(pVal->datum.p), pVal->literal, pVal->node.resType.bytes);
parserDebug("varchar value:%s,len:%d", pVal->literal, pVal->node.resType.bytes);
break;
}
case TSDB_DATA_TYPE_TIMESTAMP: {
......@@ -599,9 +600,9 @@ static int32_t toVgroupsInfo(SArray* pVgs, SVgroupsInfo** pVgsInfo) {
static int32_t setSysTableVgroupList(STranslateContext* pCxt, SName* pName, SRealTableNode* pRealTable) {
// todo release
// if (0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_USER_TABLES)) {
// return TSDB_CODE_SUCCESS;
// }
if (0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_USER_TABLES)) {
return TSDB_CODE_SUCCESS;
}
int32_t code = TSDB_CODE_SUCCESS;
SArray* vgroupList = NULL;
......@@ -613,9 +614,9 @@ static int32_t setSysTableVgroupList(STranslateContext* pCxt, SName* pName, SRea
if (TSDB_CODE_SUCCESS == code) {
// todo remove
if (NULL != vgroupList && taosArrayGetSize(vgroupList) > 0 && 0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_USER_TABLES)) {
taosArrayPopTailBatch(vgroupList, taosArrayGetSize(vgroupList) - 1);
}
//if (NULL != vgroupList && taosArrayGetSize(vgroupList) > 0 && 0 != strcmp(pRealTable->table.tableName, TSDB_INS_TABLE_USER_TABLES)) {
// taosArrayPopTailBatch(vgroupList, taosArrayGetSize(vgroupList) - 1);
//}
code = toVgroupsInfo(vgroupList, &pRealTable->pVgroupList);
}
......
......@@ -307,11 +307,14 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
taosArrayPush(pCxt->pExecNodeList, &pSubplan->execNode);
} else {
for (int32_t i = 0; i < pScanLogicNode->pVgroupList->numOfVgroups; ++i) {
SQueryNodeAddr addr;
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups + i, &addr);
taosArrayPush(pCxt->pExecNodeList, &addr);
}
SQueryNodeAddr addr = { .nodeId = MND_VGID, .epSet = pCxt->pPlanCxt->mgmtEpSet };
taosArrayPush(pCxt->pExecNodeList, &addr);
//for (int32_t i = 0; i < pScanLogicNode->pVgroupList->numOfVgroups; ++i) {
// SQueryNodeAddr addr;
// vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups + i, &addr);
// taosArrayPush(pCxt->pExecNodeList, &addr);
//}
}
pScan->mgmtEpSet = pCxt->pPlanCxt->mgmtEpSet;
tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册