提交 d2d76128 编写于 作者: D dapan1121

enh: support multiple exchange operators in explain

上级 a85a73e2
...@@ -99,8 +99,10 @@ extern "C" { ...@@ -99,8 +99,10 @@ extern "C" {
typedef struct SExplainGroup { typedef struct SExplainGroup {
int32_t nodeNum; int32_t nodeNum;
int32_t nodeIdx;
int32_t physiPlanExecNum; int32_t physiPlanExecNum;
int32_t physiPlanExecIdx; int32_t physiPlanExecIdx;
bool singleChannel;
SRWLatch lock; SRWLatch lock;
SSubplan *plan; SSubplan *plan;
SArray *nodeExecInfo; //Array<SExplainRsp> SArray *nodeExecInfo; //Array<SExplainRsp>
......
...@@ -21,7 +21,7 @@ ...@@ -21,7 +21,7 @@
#include "tdatablock.h" #include "tdatablock.h"
int32_t qExplainGenerateResNode(SPhysiNode *pNode, SExplainGroup *group, SExplainResNode **pRes); int32_t qExplainGenerateResNode(SPhysiNode *pNode, SExplainGroup *group, SExplainResNode **pRes);
int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level); int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level, bool singleChannel);
void qExplainFreeResNode(SExplainResNode *resNode) { void qExplainFreeResNode(SExplainResNode *resNode) {
if (NULL == resNode) { if (NULL == resNode) {
...@@ -250,7 +250,7 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo ...@@ -250,7 +250,7 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qExplainGenerateResNodeExecInfo(SArray **pExecInfo, SExplainGroup *group) { int32_t qExplainGenerateResNodeExecInfo(SPhysiNode *pNode, SArray **pExecInfo, SExplainGroup *group) {
*pExecInfo = taosArrayInit(group->nodeNum, sizeof(SExplainExecInfo)); *pExecInfo = taosArrayInit(group->nodeNum, sizeof(SExplainExecInfo));
if (NULL == (*pExecInfo)) { if (NULL == (*pExecInfo)) {
qError("taosArrayInit %d explainExecInfo failed", group->nodeNum); qError("taosArrayInit %d explainExecInfo failed", group->nodeNum);
...@@ -258,17 +258,28 @@ int32_t qExplainGenerateResNodeExecInfo(SArray **pExecInfo, SExplainGroup *group ...@@ -258,17 +258,28 @@ int32_t qExplainGenerateResNodeExecInfo(SArray **pExecInfo, SExplainGroup *group
} }
SExplainRsp *rsp = NULL; SExplainRsp *rsp = NULL;
if (group->singleChannel) {
if (0 == group->physiPlanExecIdx) {
group->nodeIdx = 0;
}
rsp = taosArrayGet(group->nodeExecInfo, group->nodeIdx++);
if (group->physiPlanExecIdx >= rsp->numOfPlans) {
qError("physiPlanIdx %d exceed plan num %d", group->physiPlanExecIdx, rsp->numOfPlans);
return TSDB_CODE_QRY_APP_ERROR;
}
taosArrayPush(*pExecInfo, rsp->subplanInfo + group->physiPlanExecIdx);
} else {
for (int32_t i = 0; i < group->nodeNum; ++i) { for (int32_t i = 0; i < group->nodeNum; ++i) {
rsp = taosArrayGet(group->nodeExecInfo, i); rsp = taosArrayGet(group->nodeExecInfo, i);
/*
if (group->physiPlanExecIdx >= rsp->numOfPlans) { if (group->physiPlanExecIdx >= rsp->numOfPlans) {
qError("physiPlanIdx %d exceed plan num %d", group->physiPlanExecIdx, rsp->numOfPlans); qError("physiPlanIdx %d exceed plan num %d", group->physiPlanExecIdx, rsp->numOfPlans);
return TSDB_CODE_QRY_APP_ERROR; return TSDB_CODE_QRY_APP_ERROR;
} }
taosArrayPush(*pExecInfo, rsp->subplanInfo + group->physiPlanExecIdx); taosArrayPush(*pExecInfo, rsp->subplanInfo + group->physiPlanExecIdx);
*/ }
taosArrayPush(*pExecInfo, rsp->subplanInfo);
} }
++group->physiPlanExecIdx; ++group->physiPlanExecIdx;
...@@ -293,7 +304,7 @@ int32_t qExplainGenerateResNode(SPhysiNode *pNode, SExplainGroup *group, SExplai ...@@ -293,7 +304,7 @@ int32_t qExplainGenerateResNode(SPhysiNode *pNode, SExplainGroup *group, SExplai
resNode->pNode = pNode; resNode->pNode = pNode;
if (group->nodeExecInfo) { if (group->nodeExecInfo) {
QRY_ERR_JRET(qExplainGenerateResNodeExecInfo(&resNode->pExecInfo, group)); QRY_ERR_JRET(qExplainGenerateResNodeExecInfo(pNode, &resNode->pExecInfo, group));
} }
QRY_ERR_JRET(qExplainGenerateResChildren(pNode, group, &resNode->pChildren)); QRY_ERR_JRET(qExplainGenerateResChildren(pNode, group, &resNode->pChildren));
...@@ -803,7 +814,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i ...@@ -803,7 +814,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
} }
} }
QRY_ERR_RET(qExplainAppendGroupResRows(ctx, pExchNode->srcGroupId, level + 1)); QRY_ERR_RET(qExplainAppendGroupResRows(ctx, pExchNode->srcGroupId, level + 1, pExchNode->singleChannel));
break; break;
} }
case QUERY_NODE_PHYSICAL_PLAN_SORT: { case QUERY_NODE_PHYSICAL_PLAN_SORT: {
...@@ -1535,7 +1546,7 @@ int32_t qExplainResNodeToRows(SExplainResNode *pResNode, SExplainCtx *ctx, int32 ...@@ -1535,7 +1546,7 @@ int32_t qExplainResNodeToRows(SExplainResNode *pResNode, SExplainCtx *ctx, int32
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level) { int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level, bool singleChannel) {
SExplainResNode *node = NULL; SExplainResNode *node = NULL;
int32_t code = 0; int32_t code = 0;
SExplainCtx *ctx = (SExplainCtx *)pCtx; SExplainCtx *ctx = (SExplainCtx *)pCtx;
...@@ -1546,6 +1557,9 @@ int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level) { ...@@ -1546,6 +1557,9 @@ int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level) {
QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR); QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
} }
group->singleChannel = singleChannel;
group->physiPlanExecIdx = 0;
QRY_ERR_RET(qExplainGenerateResNode(group->plan->pNode, group, &node)); QRY_ERR_RET(qExplainGenerateResNode(group->plan->pNode, group, &node));
QRY_ERR_JRET(qExplainResNodeToRows(node, ctx, level)); QRY_ERR_JRET(qExplainResNodeToRows(node, ctx, level));
...@@ -1709,7 +1723,7 @@ int32_t qExplainAppendPlanRows(SExplainCtx *pCtx) { ...@@ -1709,7 +1723,7 @@ int32_t qExplainAppendPlanRows(SExplainCtx *pCtx) {
} }
int32_t qExplainGenerateRsp(SExplainCtx *pCtx, SRetrieveTableRsp **pRsp) { int32_t qExplainGenerateRsp(SExplainCtx *pCtx, SRetrieveTableRsp **pRsp) {
QRY_ERR_RET(qExplainAppendGroupResRows(pCtx, pCtx->rootGroupId, 0)); QRY_ERR_RET(qExplainAppendGroupResRows(pCtx, pCtx->rootGroupId, 0, false));
QRY_ERR_RET(qExplainAppendPlanRows(pCtx)); QRY_ERR_RET(qExplainAppendPlanRows(pCtx));
QRY_ERR_RET(qExplainGetRspFromCtx(pCtx, pRsp)); QRY_ERR_RET(qExplainGetRspFromCtx(pCtx, pRsp));
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#include "tref.h" #include "tref.h"
#include "trpc.h" #include "trpc.h"
#include "qworker.h" #include "qworker.h"
#include "tglobal.h"
void schFreeTask(SSchJob *pJob, SSchTask *pTask) { void schFreeTask(SSchJob *pJob, SSchTask *pTask) {
schDeregisterTaskHb(pJob, pTask); schDeregisterTaskHb(pJob, pTask);
...@@ -897,9 +898,12 @@ int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) { ...@@ -897,9 +898,12 @@ int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) {
SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg, SCH_TASK_ELOG("failed to create physical plan, code:%s, msg:%p, len:%d", tstrerror(code), pTask->msg,
pTask->msgLen); pTask->msgLen);
SCH_ERR_RET(code); SCH_ERR_RET(code);
} else { } else if (tsQueryPlannerTrace) {
//binary msg char *msg = NULL;
//SCH_TASK_DLOGL("physical plan len:%d, %s", pTask->msgLen, pTask->msg); int32_t msgLen = 0;
qSubPlanToString(plan, &msg, &msgLen);
SCH_TASK_DLOGL("physical plan len:%d, %s", msgLen, msg);
taosMemoryFree(msg);
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册