提交 01447153 编写于 作者: X Xiaoyu Wang

TD-12678 qSetSubplanExecutionNode interface implement

上级 3077a244
......@@ -155,7 +155,7 @@ int32_t qCreateQueryDag(const struct SQueryNode* pQueryInfo, struct SQueryDag**
// @subplan subplan to be schedule
// @templateId templateId of a group of datasource subplans of this @subplan
// @ep one execution location of this group of datasource subplans
int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep);
void qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep);
int32_t qExplainQuery(const struct SQueryNode* pQueryInfo, struct SEpSet* pQnode, char** str);
......
......@@ -106,7 +106,7 @@ int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str);
int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql);
int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag, uint64_t requestId);
int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep);
void setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep);
int32_t subPlanToString(const SSubplan *pPhyNode, char** str, int32_t* len);
int32_t stringToSubplan(const char* str, SSubplan** subplan);
......
......@@ -75,24 +75,16 @@ int32_t dsinkNameToDsinkType(const char* name) {
return DSINK_Unknown;
}
static SDataSink* initDataSink(int32_t type, int32_t size) {
SDataSink* sink = (SDataSink*)validPointer(calloc(1, size));
sink->info.type = type;
sink->info.name = dsinkTypeToDsinkName(type);
return sink;
}
static SDataSink* createDataDispatcher(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
SDataDispatcher* dispatcher = (SDataDispatcher*)initDataSink(DSINK_Dispatch, sizeof(SDataDispatcher));
return (SDataSink*)dispatcher;
}
static SDataSink* createDataInserter(SPlanContext* pCxt, SVgDataBlocks* pBlocks) {
SDataInserter* inserter = (SDataInserter*)initDataSink(DSINK_Insert, sizeof(SDataInserter));
inserter->numOfTables = pBlocks->numOfTables;
inserter->size = pBlocks->size;
SWAP(inserter->pData, pBlocks->pData, char*);
return (SDataSink*)inserter;
static bool copySchema(SDataBlockSchema* dst, const SDataBlockSchema* src) {
dst->pSchema = malloc(sizeof(SSlotSchema) * src->numOfCols);
if (NULL == dst->pSchema) {
return false;
}
memcpy(dst->pSchema, src->pSchema, sizeof(SSlotSchema) * src->numOfCols);
dst->numOfCols = src->numOfCols;
dst->resultRowSize = src->resultRowSize;
dst->precision = src->precision;
return true;
}
static bool toDataBlockSchema(SQueryPlanNode* pPlanNode, SDataBlockSchema* dataBlockSchema) {
......@@ -102,6 +94,10 @@ static bool toDataBlockSchema(SQueryPlanNode* pPlanNode, SDataBlockSchema* dataB
return false;
}
memcpy(dataBlockSchema->pSchema, pPlanNode->pSchema, sizeof(SSlotSchema) * pPlanNode->numOfCols);
dataBlockSchema->resultRowSize = 0;
for (int32_t i = 0; i < dataBlockSchema->numOfCols; ++i) {
dataBlockSchema->resultRowSize += dataBlockSchema->pSchema[i].bytes;
}
return true;
}
......@@ -120,13 +116,37 @@ static bool cloneExprArray(SArray** dst, SArray* src) {
return (TSDB_CODE_SUCCESS == copyAllExprInfo(*dst, src, true) ? true : false);
}
static SDataSink* initDataSink(int32_t type, int32_t size, const SPhyNode* pRoot) {
SDataSink* sink = (SDataSink*)validPointer(calloc(1, size));
sink->info.type = type;
sink->info.name = dsinkTypeToDsinkName(type);
if (NULL !=pRoot && !copySchema(&sink->schema, &pRoot->targetSchema)) {
tfree(sink);
THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
}
return sink;
}
static SDataSink* createDataInserter(SPlanContext* pCxt, SVgDataBlocks* pBlocks, const SPhyNode* pRoot) {
SDataInserter* inserter = (SDataInserter*)initDataSink(DSINK_Insert, sizeof(SDataInserter), pRoot);
inserter->numOfTables = pBlocks->numOfTables;
inserter->size = pBlocks->size;
SWAP(inserter->pData, pBlocks->pData, char*);
return (SDataSink*)inserter;
}
static SDataSink* createDataDispatcher(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, const SPhyNode* pRoot) {
SDataDispatcher* dispatcher = (SDataDispatcher*)initDataSink(DSINK_Dispatch, sizeof(SDataDispatcher), pRoot);
return (SDataSink*)dispatcher;
}
static SPhyNode* initPhyNode(SQueryPlanNode* pPlanNode, int32_t type, int32_t size) {
SPhyNode* node = (SPhyNode*)validPointer(calloc(1, size));
node->info.type = type;
node->info.name = opTypeToOpName(type);
if (!cloneExprArray(&node->pTargets, pPlanNode->pExpr) || !toDataBlockSchema(pPlanNode, &(node->targetSchema))) {
free(node);
return NULL;
THROW(TSDB_CODE_TSC_OUT_OF_MEMORY);
}
return node;
}
......@@ -239,7 +259,7 @@ static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNod
SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_SCAN);
vgroupMsgToEpSet(&(pTable->pMeta->vgroupList->vgroups[i]), &subplan->execNode);
subplan->pNode = createMultiTableScanNode(pPlanNode, pTable);
subplan->pDataSink = createDataDispatcher(pCxt, pPlanNode);
subplan->pDataSink = createDataDispatcher(pCxt, pPlanNode, subplan->pNode);
RECOVERY_CURRENT_SUBPLAN(pCxt);
}
return pCxt->nextId.templateId++;
......@@ -248,6 +268,7 @@ static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNod
static SPhyNode* createExchangeNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, uint64_t srcTemplateId) {
SExchangePhyNode* node = (SExchangePhyNode*)initPhyNode(pPlanNode, OP_Exchange, sizeof(SExchangePhyNode));
node->srcTemplateId = srcTemplateId;
node->pSrcEndPoints = validPointer(taosArrayInit(TARRAY_MIN_SIZE, sizeof(SQueryNodeAddr)));
return (SPhyNode*)node;
}
......@@ -313,7 +334,7 @@ static void splitModificationOpSubPlan(SPlanContext* pCxt, SQueryPlanNode* pPlan
SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(pPayload->payload, i);
vgroupInfoToEpSet(&blocks->vg, &subplan->execNode);
subplan->pDataSink = createDataInserter(pCxt, blocks);
subplan->pDataSink = createDataInserter(pCxt, blocks, NULL);
subplan->pNode = NULL;
subplan->type = QUERY_TYPE_MODIFY;
subplan->msgType = pPayload->msgType;
......@@ -332,7 +353,7 @@ static void createSubplanByLevel(SPlanContext* pCxt, SQueryPlanNode* pRoot) {
subplan->msgType = TDMT_VND_QUERY;
subplan->pNode = createPhyNode(pCxt, pRoot);
subplan->pDataSink = createDataDispatcher(pCxt, pRoot);
subplan->pDataSink = createDataDispatcher(pCxt, pRoot, subplan->pNode);
}
// todo deal subquery
}
......@@ -359,6 +380,24 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD
return TSDB_CODE_SUCCESS;
}
int32_t setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) {
//todo
void setExchangSourceNode(uint64_t templateId, SQueryNodeAddr* pEp, SPhyNode* pNode) {
if (NULL == pNode) {
return;
}
if (OP_Exchange == pNode->info.type) {
SExchangePhyNode* pExchange = (SExchangePhyNode*)pNode;
if (templateId == pExchange->srcTemplateId) {
taosArrayPush(pExchange->pSrcEndPoints, pEp);
}
}
if (pNode->pChildren != NULL) {
size_t size = taosArrayGetSize(pNode->pChildren);
for(int32_t i = 0; i < size; ++i) {
setExchangSourceNode(templateId, pEp, taosArrayGetP(pNode->pChildren, i));
}
}
}
void setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* pEp) {
setExchangSourceNode(templateId, pEp, subplan->pNode);
}
......@@ -88,8 +88,8 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag,
return TSDB_CODE_SUCCESS;
}
int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) {
return setSubplanExecutionNode(subplan, templateId, ep);
void qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) {
setSubplanExecutionNode(subplan, templateId, ep);
}
int32_t qSubPlanToString(const SSubplan *subplan, char** str, int32_t* len) {
......
......@@ -522,7 +522,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
atomic_add_fetch_32(&par->childReady, 1);
SCH_ERR_RET(qSetSubplanExecutionNode(par->plan, pTask->plan->id.templateId, &pTask->execAddr));
qSetSubplanExecutionNode(par->plan, pTask->plan->id.templateId, &pTask->execAddr);
if (SCH_TASK_READY_TO_LUNCH(par)) {
SCH_ERR_RET(schLaunchTask(pJob, par));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册