From 797d08915f1e1daf0054e2bb47ec589da16ef68d Mon Sep 17 00:00:00 2001 From: dapan Date: Sat, 8 Jan 2022 15:14:33 +0800 Subject: [PATCH] feature/qnode --- source/libs/catalog/src/catalog.c | 10 ++++++++ source/libs/parser/src/astValidate.c | 35 +++++++++++++++++++++++++- source/libs/planner/src/physicalPlan.c | 17 ++++++++----- source/libs/scheduler/src/scheduler.c | 17 +++++++------ 4 files changed, 64 insertions(+), 15 deletions(-) diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index da0951fe1f..94f34b8e17 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -1226,6 +1226,16 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S tNameGetFullDbName(pTableName, db); CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, db, false, &dbVgroup)); + // REMOEV THIS .... + if (0 == tbMeta->vgId) { + SVgroupInfo vgroup = {0}; + + catalogGetTableHashVgroup(pCatalog, pRpc, pMgmtEps, pTableName, &vgroup); + + tbMeta->vgId = vgroup.vgId; + } + // REMOVE THIS .... + if (tbMeta->tableType == TSDB_SUPER_TABLE) { CTG_ERR_JRET(ctgGetVgInfoFromDB(pCatalog, pRpc, pMgmtEps, dbVgroup, pVgroupList)); } else { diff --git a/source/libs/parser/src/astValidate.c b/source/libs/parser/src/astValidate.c index 3ca3d87a79..5cabbb5e3b 100644 --- a/source/libs/parser/src/astValidate.c +++ b/source/libs/parser/src/astValidate.c @@ -3628,6 +3628,33 @@ int32_t evaluateSqlNode(SSqlNode* pNode, int32_t tsPrecision, SMsgBuf* pMsgBuf) return TSDB_CODE_SUCCESS; } +int32_t setTableVgroupList(SParseBasicCtx *pCtx, SName* name, SVgroupsInfo **pVgList) { + SArray* vgroupList = NULL; + int32_t code = catalogGetTableDistVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, name, &vgroupList); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + int32_t vgroupNum = taosArrayGetSize(vgroupList); + + SVgroupsInfo *vgList = calloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupMsg) * vgroupNum); + + vgList->numOfVgroups = vgroupNum; + + for (int32_t i = 0; i < vgroupNum; ++i) { + SVgroupInfo *vg = taosArrayGet(vgroupList, i); + vgList->vgroups[i].vgId = vg->vgId; + vgList->vgroups[i].numOfEps = vg->numOfEps; + memcpy(vgList->vgroups[i].epAddr, vg->epAddr, sizeof(vgList->vgroups[i].epAddr)); + } + + *pVgList = vgList; + + taosArrayDestroy(vgroupList); + + return TSDB_CODE_SUCCESS; +} + int32_t qParserValidateSqlNode(SParseBasicCtx *pCtx, SSqlInfo* pInfo, SQueryStmtInfo* pQueryInfo, char* msgBuf, int32_t msgBufLen) { assert(pCtx != NULL && pInfo != NULL); int32_t code = 0; @@ -3916,7 +3943,7 @@ int32_t qParserValidateSqlNode(SParseBasicCtx *pCtx, SSqlInfo* pInfo, SQueryStmt if (code != TSDB_CODE_SUCCESS) { return code; } - + data.pTableMeta = taosArrayInit(1, POINTER_BYTES); taosArrayPush(data.pTableMeta, &pmt); @@ -3926,6 +3953,12 @@ int32_t qParserValidateSqlNode(SParseBasicCtx *pCtx, SSqlInfo* pInfo, SQueryStmt pQueryInfo->pTableMetaInfo[0]->name = *name; pQueryInfo->numOfTables = 1; + code = setTableVgroupList(pCtx, name, &pQueryInfo->pTableMetaInfo[0]->vgroupList); + if (code != TSDB_CODE_SUCCESS) { + taosArrayDestroy(data.pTableMeta); + return code; + } + // evaluate the sqlnode STableMeta* pTableMeta = (STableMeta*) taosArrayGetP(data.pTableMeta, 0); assert(pTableMeta != NULL); diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index 461f16cdf0..bbb84223ac 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -160,9 +160,6 @@ static SPhyNode* createUserTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableI return (SPhyNode*)node; } -static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) { - return createUserTableScanNode(pPlanNode, pTable, OP_TableScan); -} static bool isSystemTable(SQueryTableInfo* pTable) { // todo @@ -259,12 +256,20 @@ static bool needMultiNodeScan(SQueryTableInfo* pTable) { return (TSDB_SUPER_TABLE == pTable->pMeta->pTableMeta->tableType); } +static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, SSubplan* subplan) { + vgroupMsgToEpSet(&(pTable->pMeta->vgroupList->vgroups[0]), &subplan->execNode); + + return createUserTableScanNode(pPlanNode, pTable, OP_TableScan); +} + + static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { SQueryTableInfo* pTable = (SQueryTableInfo*)pPlanNode->pExtInfo; + if (needMultiNodeScan(pTable)) { return createExchangeNode(pCxt, pPlanNode, splitSubplanByTable(pCxt, pPlanNode, pTable)); } - return createSingleTableScanNode(pPlanNode, pTable); + return createSingleTableScanNode(pPlanNode, pTable, pCxt->pCurrentSubplan); } static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { @@ -322,12 +327,12 @@ static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) { if (QNODE_MODIFY == pRoot->info.type) { splitModificationOpSubPlan(pCxt, pRoot); } else { - SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MERGE); + SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_SCAN); ++(pCxt->nextId.templateId); subplan->msgType = TDMT_VND_QUERY; subplan->pNode = createPhyNode(pCxt, pRoot); - subplan->pDataSink = createDataDispatcher(pCxt, pRoot); + subplan->pDataSink = createDataDispatcher(pCxt, pRoot); } // todo deal subquery } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 5190546cef..3f6d0f1702 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -242,18 +242,19 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { } int32_t addNum = 0; + int32_t nodeNum = 0; if (pJob->nodeList) { - int32_t nodeNum = taosArrayGetSize(pJob->nodeList); - - for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) { - SQueryNodeAddr *naddr = taosArrayGet(pJob->nodeList, i); + nodeNum = taosArrayGetSize(pJob->nodeList); - if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) { - SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno); - SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) { + SQueryNodeAddr *naddr = taosArrayGet(pJob->nodeList, i); + + if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) { + SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } } } - } if (addNum <= 0) { SCH_TASK_ELOG("no available execNode as condidate addr, nodeNum:%d", nodeNum); -- GitLab