提交 797d0891 编写于 作者: D dapan

feature/qnode

上级 85a16137
......@@ -1226,6 +1226,16 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S
tNameGetFullDbName(pTableName, db);
CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, db, false, &dbVgroup));
// REMOEV THIS ....
if (0 == tbMeta->vgId) {
SVgroupInfo vgroup = {0};
catalogGetTableHashVgroup(pCatalog, pRpc, pMgmtEps, pTableName, &vgroup);
tbMeta->vgId = vgroup.vgId;
}
// REMOVE THIS ....
if (tbMeta->tableType == TSDB_SUPER_TABLE) {
CTG_ERR_JRET(ctgGetVgInfoFromDB(pCatalog, pRpc, pMgmtEps, dbVgroup, pVgroupList));
} else {
......
......@@ -3628,6 +3628,33 @@ int32_t evaluateSqlNode(SSqlNode* pNode, int32_t tsPrecision, SMsgBuf* pMsgBuf)
return TSDB_CODE_SUCCESS;
}
int32_t setTableVgroupList(SParseBasicCtx *pCtx, SName* name, SVgroupsInfo **pVgList) {
SArray* vgroupList = NULL;
int32_t code = catalogGetTableDistVgroup(pCtx->pCatalog, pCtx->pTransporter, &pCtx->mgmtEpSet, name, &vgroupList);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
int32_t vgroupNum = taosArrayGetSize(vgroupList);
SVgroupsInfo *vgList = calloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupMsg) * vgroupNum);
vgList->numOfVgroups = vgroupNum;
for (int32_t i = 0; i < vgroupNum; ++i) {
SVgroupInfo *vg = taosArrayGet(vgroupList, i);
vgList->vgroups[i].vgId = vg->vgId;
vgList->vgroups[i].numOfEps = vg->numOfEps;
memcpy(vgList->vgroups[i].epAddr, vg->epAddr, sizeof(vgList->vgroups[i].epAddr));
}
*pVgList = vgList;
taosArrayDestroy(vgroupList);
return TSDB_CODE_SUCCESS;
}
int32_t qParserValidateSqlNode(SParseBasicCtx *pCtx, SSqlInfo* pInfo, SQueryStmtInfo* pQueryInfo, char* msgBuf, int32_t msgBufLen) {
assert(pCtx != NULL && pInfo != NULL);
int32_t code = 0;
......@@ -3916,7 +3943,7 @@ int32_t qParserValidateSqlNode(SParseBasicCtx *pCtx, SSqlInfo* pInfo, SQueryStmt
if (code != TSDB_CODE_SUCCESS) {
return code;
}
data.pTableMeta = taosArrayInit(1, POINTER_BYTES);
taosArrayPush(data.pTableMeta, &pmt);
......@@ -3926,6 +3953,12 @@ int32_t qParserValidateSqlNode(SParseBasicCtx *pCtx, SSqlInfo* pInfo, SQueryStmt
pQueryInfo->pTableMetaInfo[0]->name = *name;
pQueryInfo->numOfTables = 1;
code = setTableVgroupList(pCtx, name, &pQueryInfo->pTableMetaInfo[0]->vgroupList);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(data.pTableMeta);
return code;
}
// evaluate the sqlnode
STableMeta* pTableMeta = (STableMeta*) taosArrayGetP(data.pTableMeta, 0);
assert(pTableMeta != NULL);
......
......@@ -160,9 +160,6 @@ static SPhyNode* createUserTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableI
return (SPhyNode*)node;
}
static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable) {
return createUserTableScanNode(pPlanNode, pTable, OP_TableScan);
}
static bool isSystemTable(SQueryTableInfo* pTable) {
// todo
......@@ -259,12 +256,20 @@ static bool needMultiNodeScan(SQueryTableInfo* pTable) {
return (TSDB_SUPER_TABLE == pTable->pMeta->pTableMeta->tableType);
}
static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, SSubplan* subplan) {
vgroupMsgToEpSet(&(pTable->pMeta->vgroupList->vgroups[0]), &subplan->execNode);
return createUserTableScanNode(pPlanNode, pTable, OP_TableScan);
}
static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
SQueryTableInfo* pTable = (SQueryTableInfo*)pPlanNode->pExtInfo;
if (needMultiNodeScan(pTable)) {
return createExchangeNode(pCxt, pPlanNode, splitSubplanByTable(pCxt, pPlanNode, pTable));
}
return createSingleTableScanNode(pPlanNode, pTable);
return createSingleTableScanNode(pPlanNode, pTable, pCxt->pCurrentSubplan);
}
static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
......@@ -322,12 +327,12 @@ static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) {
if (QNODE_MODIFY == pRoot->info.type) {
splitModificationOpSubPlan(pCxt, pRoot);
} else {
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MERGE);
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_SCAN);
++(pCxt->nextId.templateId);
subplan->msgType = TDMT_VND_QUERY;
subplan->pNode = createPhyNode(pCxt, pRoot);
subplan->pDataSink = createDataDispatcher(pCxt, pRoot);
subplan->pDataSink = createDataDispatcher(pCxt, pRoot);
}
// todo deal subquery
}
......
......@@ -242,18 +242,19 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
}
int32_t addNum = 0;
int32_t nodeNum = 0;
if (pJob->nodeList) {
int32_t nodeNum = taosArrayGetSize(pJob->nodeList);
for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) {
SQueryNodeAddr *naddr = taosArrayGet(pJob->nodeList, i);
nodeNum = taosArrayGetSize(pJob->nodeList);
if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) {
SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) {
SQueryNodeAddr *naddr = taosArrayGet(pJob->nodeList, i);
if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) {
SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
}
}
}
if (addNum <= 0) {
SCH_TASK_ELOG("no available execNode as condidate addr, nodeNum:%d", nodeNum);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册