From 6982f2b7bda6660bdfc1d04b8ebd688b211a741f Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 30 Mar 2022 17:09:30 +0800 Subject: [PATCH] feature/qnode --- include/libs/nodes/nodes.h | 2 + include/libs/nodes/plannodes.h | 12 + include/libs/nodes/querynodes.h | 1 + include/libs/qcom/query.h | 9 +- include/util/tdef.h | 4 + source/libs/nodes/inc/nodesUtil.h | 16 +- source/libs/nodes/src/nodesToSQLFuncs.c | 30 +- source/libs/nodes/src/nodesUtilFuncs.c | 3 +- source/libs/qcom/CMakeLists.txt | 2 +- source/libs/qcom/inc/queryInt.h | 33 ++- source/libs/qcom/src/queryExplain.c | 333 +++++++++++++++-------- source/libs/scalar/inc/filterInt.h | 2 - source/libs/scheduler/inc/schedulerInt.h | 7 +- source/libs/scheduler/src/scheduler.c | 136 ++++++++- 14 files changed, 446 insertions(+), 144 deletions(-) diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 9b8739a4f3..031445ac79 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -210,6 +210,8 @@ int32_t nodesStringToNode(const char* pStr, SNode** pNode); int32_t nodesListToString(const SNodeList* pList, bool format, char** pStr, int32_t* pLen); int32_t nodesStringToList(const char* pStr, SNodeList** pList); +int32_t nodesNodeToSQL(SNode *pNode, char *buf, int32_t bufSize, int32_t *len); + #ifdef __cplusplus } #endif diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 08792b6f8f..fb4def4440 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -283,11 +283,23 @@ typedef struct SSubplan { SDataSinkNode* pDataSink; // data of the subplan flow into the datasink } SSubplan; +typedef enum EExplainMode { + EXPLAIN_MODE_DISABLE = 1, + EXPLAIN_MODE_STATIC, + EXPLAIN_MODE_ANALYZE +} EExplainMode; + +typedef struct SExplainInfo { + EExplainMode mode; + bool verbose; +} SExplainInfo; + typedef struct SQueryPlan { ENodeType type; uint64_t queryId; int32_t numOfSubplans; SNodeList* pSubplans; // Element is SNodeListNode. The execution level of subplan, starting from 0. + SExplainInfo explainInfo; } SQueryPlan; #ifdef __cplusplus diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index 66f60bde98..2f4f938652 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -300,6 +300,7 @@ bool nodesIsTimeorderQuery(const SNode* pQuery); bool nodesIsTimelineQuery(const SNode* pQuery); void* nodesGetValueFromNode(SValueNode *pNode); +char* nodesGetStrValueFromNode(SValueNode *pNode); #ifdef __cplusplus } diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index c0fc8c630e..2a9dd28819 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -53,11 +53,6 @@ typedef struct SIndexMeta { } SIndexMeta; -typedef struct SExplainResNode { - SNodeList* pChildren; - SPhysiNode* pNode; - void* pExecInfo; -} SExplainResNode; /* * ASSERT(sizeof(SCTableMeta) == 24) @@ -179,6 +174,10 @@ bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_ int32_t queryCreateTableMetaFromMsg(STableMetaRsp* msg, bool isSuperTable, STableMeta** pMeta); char *jobTaskStatusStr(int32_t status); +int32_t qAppendTaskExplainResRows(void **pRowCtx, struct SSubplan *plan, void *pExecTree, int32_t level); +int32_t qGetExplainRspFromRowCtx(void *ctx, SRetrieveTableRsp **pRsp); +void qFreeExplainRowCtx(void *ctx); + SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* name); extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char** msg, int32_t msgSize, int32_t* msgLen); diff --git a/include/util/tdef.h b/include/util/tdef.h index 193be4a3e6..7db06bbafb 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -224,6 +224,7 @@ typedef enum ELogicConditionType { #define TSDB_APP_NAME_LEN TSDB_UNI_LEN #define TSDB_STB_COMMENT_LEN 1024 + /** * In some scenarios uint16_t (0~65535) is used to store the row len. * - Firstly, we use 65531(65535 - 4), as the SDataRow/SKVRow contains 4 bits header. @@ -471,6 +472,9 @@ enum { #define QND_VGID 1 #define VND_VGID 0 +#define MAX_NUM_STR_SIZE 40 + + #ifdef __cplusplus } #endif diff --git a/source/libs/nodes/inc/nodesUtil.h b/source/libs/nodes/inc/nodesUtil.h index de00b6bca4..976044c16f 100644 --- a/source/libs/nodes/inc/nodesUtil.h +++ b/source/libs/nodes/inc/nodesUtil.h @@ -20,12 +20,16 @@ extern "C" { #endif -#define nodesFatal(param, ...) qFatal("NODES: " param, __VA_ARGS__) -#define nodesError(param, ...) qError("NODES: " param, __VA_ARGS__) -#define nodesWarn(param, ...) qWarn("NODES: " param, __VA_ARGS__) -#define nodesInfo(param, ...) qInfo("NODES: " param, __VA_ARGS__) -#define nodesDebug(param, ...) qDebug("NODES: " param, __VA_ARGS__) -#define nodesTrace(param, ...) qTrace("NODES: " param, __VA_ARGS__) +#define nodesFatal(...) qFatal("NODES: " __VA_ARGS__) +#define nodesError(...) qError("NODES: " __VA_ARGS__) +#define nodesWarn(...) qWarn("NODES: " __VA_ARGS__) +#define nodesInfo(...) qInfo("NODES: " __VA_ARGS__) +#define nodesDebug(...) qDebug("NODES: " __VA_ARGS__) +#define nodesTrace(...) qTrace("NODES: " __VA_ARGS__) + +#define NODES_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) +#define NODES_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) +#define NODES_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) #ifdef __cplusplus } diff --git a/source/libs/nodes/src/nodesToSQLFuncs.c b/source/libs/nodes/src/nodesToSQLFuncs.c index 6e4cd0f04e..fa941cfe9c 100644 --- a/source/libs/nodes/src/nodesToSQLFuncs.c +++ b/source/libs/nodes/src/nodesToSQLFuncs.c @@ -50,7 +50,7 @@ int32_t nodesNodeToSQL(SNode *pNode, char *buf, int32_t bufSize, int32_t *len) { char *t = nodesGetStrValueFromNode(colNode); if (NULL == t) { nodesError("fail to get str value from valueNode"); - return TSDB_CODE_QRY_APP_ERROR; + NODES_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } *len += snprintf(buf, bufSize - *len, "%s", t); @@ -62,18 +62,18 @@ int32_t nodesNodeToSQL(SNode *pNode, char *buf, int32_t bufSize, int32_t *len) { SOperatorNode* pOpNode = (SOperatorNode*)pNode; *len += snprintf(buf, bufSize - *len, "("); if (pOpNode->pLeft) { - QRY_ERR_RET(nodesNodeToSQL(pOpNode->pLeft, buf, bufSize, len)); + NODES_ERR_RET(nodesNodeToSQL(pOpNode->pLeft, buf, bufSize, len)); } if (pOpNode->opType >= (sizeof(gOperatorStr) / sizeof(gOperatorStr[0]))) { nodesError("unknown operation type:%d", pOpNode->opType); - return TSDB_CODE_QRY_APP_ERROR; + NODES_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } *len += snprintf(buf, bufSize - *len, " %s ", gOperatorStr[pOpNode->opType]); if (pOpNode->pRight) { - QRY_ERR_RET(nodesNodeToSQL(pOpNode->pRight, buf, bufSize, len)); + NODES_ERR_RET(nodesNodeToSQL(pOpNode->pRight, buf, bufSize, len)); } *len += snprintf(buf, bufSize - *len, ")"); @@ -91,7 +91,7 @@ int32_t nodesNodeToSQL(SNode *pNode, char *buf, int32_t bufSize, int32_t *len) { if (!first) { *len += snprintf(buf, bufSize - *len, " %s ", gLogicConditionStr[pLogicNode->condType]); } - QRY_ERR_RET(nodesNodeToSQL(node, buf, bufSize, len)); + NODES_ERR_RET(nodesNodeToSQL(node, buf, bufSize, len)); first = false; } @@ -110,7 +110,7 @@ int32_t nodesNodeToSQL(SNode *pNode, char *buf, int32_t bufSize, int32_t *len) { if (!first) { *len += snprintf(buf, bufSize - *len, ", "); } - QRY_ERR_RET(nodesNodeToSQL(node, buf, bufSize, len)); + NODES_ERR_RET(nodesNodeToSQL(node, buf, bufSize, len)); first = false; } @@ -120,8 +120,20 @@ int32_t nodesNodeToSQL(SNode *pNode, char *buf, int32_t bufSize, int32_t *len) { } case QUERY_NODE_NODE_LIST:{ SNodeListNode* pListNode = (SNodeListNode *)pNode; - - //TODO + SNode* node = NULL; + bool first = true; + + *len += snprintf(buf, bufSize - *len, "("); + + FOREACH(node, pListNode->pNodeList) { + if (!first) { + *len += snprintf(buf, bufSize - *len, ", "); + } + NODES_ERR_RET(nodesNodeToSQL(node, buf, bufSize, len)); + first = false; + } + + *len += snprintf(buf, bufSize - *len, ")"); return TSDB_CODE_SUCCESS; } @@ -130,5 +142,5 @@ int32_t nodesNodeToSQL(SNode *pNode, char *buf, int32_t bufSize, int32_t *len) { } nodesError("nodesNodeToSQL unknown node = %s", nodesNodeName(pNode->type)); - return TSDB_CODE_QRY_APP_ERROR; + NODES_RET(TSDB_CODE_QRY_APP_ERROR); } diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 7b2be5e355..2725ad85dd 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -469,7 +469,7 @@ void* nodesGetValueFromNode(SValueNode *pNode) { char* nodesGetStrValueFromNode(SValueNode *pNode) { switch (pNode->node.resType.type) { - case TSDB_DATA_TYPE_BOOL: + case TSDB_DATA_TYPE_BOOL: { void *buf = taosMemoryMalloc(MAX_NUM_STR_SIZE); if (NULL == buf) { return NULL; @@ -477,6 +477,7 @@ char* nodesGetStrValueFromNode(SValueNode *pNode) { sprintf(buf, "%s", pNode->datum.b ? "true" : "false"); return buf; + } case TSDB_DATA_TYPE_TINYINT: case TSDB_DATA_TYPE_SMALLINT: case TSDB_DATA_TYPE_INT: diff --git a/source/libs/qcom/CMakeLists.txt b/source/libs/qcom/CMakeLists.txt index a9bf0f5594..d50047e592 100644 --- a/source/libs/qcom/CMakeLists.txt +++ b/source/libs/qcom/CMakeLists.txt @@ -8,7 +8,7 @@ target_include_directories( target_link_libraries( qcom - PRIVATE os util transport + PRIVATE os util transport nodes ) if(${BUILD_TEST}) diff --git a/source/libs/qcom/inc/queryInt.h b/source/libs/qcom/inc/queryInt.h index 7b6b7d44ef..6fc15aacfc 100644 --- a/source/libs/qcom/inc/queryInt.h +++ b/source/libs/qcom/inc/queryInt.h @@ -19,9 +19,12 @@ #ifdef __cplusplus extern "C" { #endif +#include "nodes.h" +#include "plannodes.h" #define QUERY_EXPLAIN_MAX_RES_LEN 1024 +//newline area #define EXPLAIN_TAG_SCAN_FORMAT "Tag scan on %s columns=%d" #define EXPLAIN_TBL_SCAN_FORMAT "Table scan on %s columns=%d" #define EXPLAIN_SYSTBL_SCAN_FORMAT "System table scan on %s columns=%d" @@ -30,29 +33,47 @@ extern "C" { #define EXPLAIN_AGG_FORMAT "Aggragate functions=%d groups=%d width=%d" #define EXPLAIN_EXCHANGE_FORMAT "Exchange %d:1 width=%d" #define EXPLAIN_SORT_FORMAT "Sort on %d columns width=%d" -#define EXPLAIN_INTERVAL_FORMAT "Interval on column %s functions=%d interval=%d%c offset=%d%c sliding=%d%c width=%d" +#define EXPLAIN_INTERVAL_FORMAT "Interval on column %s functions=%d interval=%" PRId64 "%c offset=%" PRId64 "%c sliding=%" PRId64 "%c width=%d" #define EXPLAIN_SESSION_FORMAT "Session gap=%" PRId64 " functions=%d width=%d" - #define EXPLAIN_ORDER_FORMAT "Order: %s" #define EXPLAIN_FILTER_FORMAT "Filter: " #define EXPLAIN_FILL_FORMAT "Fill: %s" -#define EXPLAIN_ON_CONDITIONS_FORMAT "ON Conditions: " +#define EXPLAIN_ON_CONDITIONS_FORMAT "Join Cond: " #define EXPLAIN_TIMERANGE_FORMAT "Time range: [%" PRId64 ", %" PRId64 "]" + +//append area #define EXPLAIN_LOOPS_FORMAT "loops %d" #define EXPLAIN_REVERSE_FORMAT "reverse %d" + +typedef struct SExplainResNode { + SNodeList* pChildren; + SPhysiNode* pNode; + void* pExecInfo; +} SExplainResNode; + typedef struct SQueryExplainRowInfo { int32_t level; int32_t len; char *buf; } SQueryExplainRowInfo; +typedef struct SExplainRowCtx { + int32_t totalSize; + SArray *rows; +} SExplainRowCtx; + #define EXPLAIN_ORDER_STRING(_order) ((TSDB_ORDER_ASC == _order) ? "Ascending" : "Descending") #define EXPLAIN_JOIN_STRING(_type) ((JOIN_TYPE_INNER == _type) ? "Inner join" : "Join") -#define QUERY_EXPLAIN_NEWLINE(...) tlen = snprintf(tbuf, QUERY_EXPLAIN_MAX_RES_LEN, __VA_ARGS__) -#define QUERY_EXPLAIN_APPEND(...) tlen += snprintf(tbuf + tlen, QUERY_EXPLAIN_MAX_RES_LEN - tlen, __VA_ARGS__) - +#define EXPLAIN_ROW_NEW(level, ...) \ + do { \ + tlen = snprintf(tbuf + VARSTR_HEADER_SIZE, QUERY_EXPLAIN_MAX_RES_LEN, "%*s", level, ""); \ + tlen += snprintf(tbuf + VARSTR_HEADER_SIZE + tlen, QUERY_EXPLAIN_MAX_RES_LEN - tlen, __VA_ARGS__); \ + } while (0) + +#define EXPLAIN_ROW_APPEND(...) tlen += snprintf(tbuf + VARSTR_HEADER_SIZE + tlen, QUERY_EXPLAIN_MAX_RES_LEN - tlen, __VA_ARGS__) +#define EXPLAIN_ROW_END() do { varDataSetLen(tbuf, tlen); tlen += VARSTR_HEADER_SIZE; } while (0) #define QRY_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) #define QRY_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) diff --git a/source/libs/qcom/src/queryExplain.c b/source/libs/qcom/src/queryExplain.c index d6da9ab0e2..d352a29e3c 100644 --- a/source/libs/qcom/src/queryExplain.c +++ b/source/libs/qcom/src/queryExplain.c @@ -15,12 +15,41 @@ #include "queryInt.h" #include "query.h" +#include "plannodes.h" -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Wformat-truncation" +int32_t qGenerateExplainResNode(SPhysiNode *pNode, void *pExecInfo, SExplainResNode **pRes); -void qFreeExplainRes(SExplainResNode *res) { +void qFreeExplainResTree(SExplainResNode *res) { + if (NULL == res) { + return; + } + + taosMemoryFreeClear(res->pExecInfo); + + SNode* node = NULL; + FOREACH(node, res->pChildren) { + qFreeExplainResTree((SExplainResNode *)node); + } + nodesDestroyList(res->pChildren); + + taosMemoryFreeClear(res); +} + +void qFreeExplainRowCtx(void *ctx) { + if (NULL == ctx) { + return; + } + + SExplainRowCtx *pCtx = (SExplainRowCtx *)ctx; + int32_t rowSize = taosArrayGetSize(pCtx->rows); + for (int32_t i = 0; i < rowSize; ++i) { + SQueryExplainRowInfo *row = taosArrayGet(pCtx->rows, i); + taosMemoryFreeClear(row->buf); + } + + taosArrayDestroy(pCtx->rows); + taosMemoryFree(pCtx); } char *qFillModeString(EFillMode mode) { @@ -50,7 +79,7 @@ char *qGetNameFromColumnNode(SNode *pNode) { return ((SColumnNode *)pNode)->colName; } -int32_t qMakeExplainResChildrenInfo(SPhysiNode *pNode, void *pExecInfo, SNodeList **pChildren) { +int32_t qGenerateExplainResChildren(SPhysiNode *pNode, void *pExecInfo, SNodeList **pChildren) { int32_t tlen = 0; SNodeList *pPhysiChildren = NULL; @@ -122,14 +151,14 @@ int32_t qMakeExplainResChildrenInfo(SPhysiNode *pNode, void *pExecInfo, SNodeLis SNode* node = NULL; SExplainResNode *pResNode = NULL; FOREACH(node, pPhysiChildren) { - QRY_ERR_RET(qMakeExplainResNode((SPhysiNode *)node, pExecInfo, &pResNode)); + QRY_ERR_RET(qGenerateExplainResNode((SPhysiNode *)node, pExecInfo, &pResNode)); QRY_ERR_RET(nodesListAppend(*pChildren, pResNode)); } return TSDB_CODE_SUCCESS; } -int32_t qMakeExplainResNode(SPhysiNode *pNode, void *pExecInfo, SExplainResNode **pRes) { +int32_t qGenerateExplainResNode(SPhysiNode *pNode, void *pExecInfo, SExplainResNode **pRes) { if (NULL == pNode) { *pRes = NULL; qError("physical node is NULL"); @@ -145,7 +174,7 @@ int32_t qMakeExplainResNode(SPhysiNode *pNode, void *pExecInfo, SExplainResNode int32_t code = 0; res->pNode = pNode; res->pExecInfo = pExecInfo; - QRY_ERR_JRET(qMakeExplainResChildrenInfo(pNode, pExecInfo, &res->pChildren)); + QRY_ERR_JRET(qGenerateExplainResChildren(pNode, pExecInfo, &res->pChildren)); *pRes = res; @@ -153,31 +182,27 @@ int32_t qMakeExplainResNode(SPhysiNode *pNode, void *pExecInfo, SExplainResNode _return: - qFreeExplainRes(res); + qFreeExplainResTree(res); QRY_RET(code); } -int32_t qMakeTaskExplainResTree(struct SSubplan *plan, void *pExecTree, SExplainResNode **pRes) { - char *tbuf = taosMemoryMalloc(QUERY_EXPLAIN_MAX_RES_LEN); - if (NULL == tbuf) { - qError("malloc size %d failed", QUERY_EXPLAIN_MAX_RES_LEN); - return TSDB_CODE_QRY_OUT_OF_MEMORY; - } - +int32_t qGenerateExplainResNodeTree(struct SSubplan *plan, void *pExecTree, SExplainResNode **pRes) { void *pExecInfo = NULL; // TODO - int32_t code = qMakeExplainResNode(plan->pNode, pExecInfo, pRes); - - taosMemoryFree(tbuf); - - QRY_RET(code); + QRY_RET(qGenerateExplainResNode(plan->pNode, pExecInfo, pRes)); } -int32_t qExplainBufAppendExecInfo(void *pExecInfo, char *tbuf, int32_t tlen) { - +int32_t qExplainBufAppendExecInfo(void *pExecInfo, char *tbuf, int32_t *len) { + int32_t tlen = *len; + + EXPLAIN_ROW_APPEND("(exec info here)"); + + *len = tlen; + + return TSDB_CODE_SUCCESS; } -int32_t qExplainResAppendRow(SArray *pRows, char *tbuf, int32_t len, int32_t level) { +int32_t qExplainResAppendRow(SExplainRowCtx *ctx, char *tbuf, int32_t len, int32_t level) { SQueryExplainRowInfo row = {0}; row.buf = strdup(tbuf); if (NULL == row.buf) { @@ -187,8 +212,9 @@ int32_t qExplainResAppendRow(SArray *pRows, char *tbuf, int32_t len, int32_t lev row.level = level; row.len = len; + ctx->totalSize += len; - if (taosArrayPush(pRows, &row)) { + if (taosArrayPush(ctx->rows, &row)) { qError("taosArrayPush row to explain res rows failed"); taosMemoryFree(row.buf); QRY_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -198,7 +224,7 @@ int32_t qExplainResAppendRow(SArray *pRows, char *tbuf, int32_t len, int32_t lev } -int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SArray *pRows, char *tbuf, int32_t level) { +int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainRowCtx *ctx, char *tbuf, int32_t level) { int32_t tlen = 0; SPhysiNode* pNode = pResNode->pNode; if (NULL == pNode) { @@ -209,174 +235,198 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SArray *pRows, char switch (pNode->type) { case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: { STagScanPhysiNode *pTagScanNode = (STagScanPhysiNode *)pNode; - QUERY_EXPLAIN_NEWLINE(EXPLAIN_TAG_SCAN_FORMAT, pTagScanNode->tableName.tname, pTagScanNode->pScanCols->length); + EXPLAIN_ROW_NEW(level, EXPLAIN_TAG_SCAN_FORMAT, pTagScanNode->tableName.tname, pTagScanNode->pScanCols->length); if (pResNode->pExecInfo) { - QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf)); + QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); } - QUERY_EXPLAIN_APPEND(EXPLAIN_LOOPS_FORMAT, pTagScanNode->count); + EXPLAIN_ROW_APPEND(EXPLAIN_LOOPS_FORMAT, pTagScanNode->count); if (pTagScanNode->reverse) { - QUERY_EXPLAIN_APPEND(EXPLAIN_REVERSE_FORMAT, pTagScanNode->reverse); + EXPLAIN_ROW_APPEND(EXPLAIN_REVERSE_FORMAT, pTagScanNode->reverse); } - QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level)); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); - QUERY_EXPLAIN_NEWLINE(EXPLAIN_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pTagScanNode->order)); - QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level + 1)); + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pTagScanNode->order)); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); break; } case QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN: case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:{ STableScanPhysiNode *pTblScanNode = (STableScanPhysiNode *)pNode; - QUERY_EXPLAIN_NEWLINE(EXPLAIN_TBL_SCAN_FORMAT, pTblScanNode->scan.tableName.tname, pTblScanNode->scan.pScanCols->length); + EXPLAIN_ROW_NEW(level, EXPLAIN_TBL_SCAN_FORMAT, pTblScanNode->scan.tableName.tname, pTblScanNode->scan.pScanCols->length); if (pResNode->pExecInfo) { - QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, tlen)); + QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); } - QUERY_EXPLAIN_APPEND(EXPLAIN_LOOPS_FORMAT, pTblScanNode->scan.count); + EXPLAIN_ROW_APPEND(EXPLAIN_LOOPS_FORMAT, pTblScanNode->scan.count); if (pTblScanNode->scan.reverse) { - QUERY_EXPLAIN_APPEND(EXPLAIN_REVERSE_FORMAT, pTblScanNode->scan.reverse); + EXPLAIN_ROW_APPEND(EXPLAIN_REVERSE_FORMAT, pTblScanNode->scan.reverse); } - QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level)); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); - QUERY_EXPLAIN_NEWLINE(EXPLAIN_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pTblScanNode->scan.order)); - QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level + 1)); + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pTblScanNode->scan.order)); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); - QUERY_EXPLAIN_NEWLINE(EXPLAIN_TIMERANGE_FORMAT, pTblScanNode->scanRange.skey, pTblScanNode->scanRange.ekey); - QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level + 1)); + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_TIMERANGE_FORMAT, pTblScanNode->scanRange.skey, pTblScanNode->scanRange.ekey); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); if (pTblScanNode->pScanConditions) { - QUERY_EXPLAIN_NEWLINE(EXPLAIN_FILTER_FORMAT); + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); QRY_ERR_RET(nodesNodeToSQL(pTblScanNode->pScanConditions, tbuf, QUERY_EXPLAIN_MAX_RES_LEN, &tlen)); - QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level + 1)); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); } break; } case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:{ SSystemTableScanPhysiNode *pSTblScanNode = (SSystemTableScanPhysiNode *)pNode; - QUERY_EXPLAIN_NEWLINE(EXPLAIN_SYSTBL_SCAN_FORMAT, pSTblScanNode->scan.tableName.tname, pSTblScanNode->scan.pScanCols->length); + EXPLAIN_ROW_NEW(level, EXPLAIN_SYSTBL_SCAN_FORMAT, pSTblScanNode->scan.tableName.tname, pSTblScanNode->scan.pScanCols->length); if (pResNode->pExecInfo) { - QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, tlen)); + QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); } - QUERY_EXPLAIN_APPEND(EXPLAIN_LOOPS_FORMAT, pSTblScanNode->scan.count); + EXPLAIN_ROW_APPEND(EXPLAIN_LOOPS_FORMAT, pSTblScanNode->scan.count); if (pSTblScanNode->scan.reverse) { - QUERY_EXPLAIN_APPEND(EXPLAIN_REVERSE_FORMAT, pSTblScanNode->scan.reverse); + EXPLAIN_ROW_APPEND(EXPLAIN_REVERSE_FORMAT, pSTblScanNode->scan.reverse); } - QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level)); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); - QUERY_EXPLAIN_NEWLINE(EXPLAIN_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pSTblScanNode->scan.order)); - QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level + 1)); + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pSTblScanNode->scan.order)); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); break; } case QUERY_NODE_PHYSICAL_PLAN_PROJECT:{ SProjectPhysiNode *pPrjNode = (SProjectPhysiNode *)pNode; - QUERY_EXPLAIN_NEWLINE(EXPLAIN_PROJECTION_FORMAT, pPrjNode->pProjections->length, pPrjNode->node.pOutputDataBlockDesc->resultRowSize); + EXPLAIN_ROW_NEW(level, EXPLAIN_PROJECTION_FORMAT, pPrjNode->pProjections->length, pPrjNode->node.pOutputDataBlockDesc->resultRowSize); if (pResNode->pExecInfo) { - QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, tlen)); + QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); } - QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level)); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); if (pPrjNode->node.pConditions) { - QUERY_EXPLAIN_NEWLINE(EXPLAIN_FILTER_FORMAT); + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); QRY_ERR_RET(nodesNodeToSQL(pPrjNode->node.pConditions, tbuf, QUERY_EXPLAIN_MAX_RES_LEN, &tlen)); - QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level + 1)); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); } break; } case QUERY_NODE_PHYSICAL_PLAN_JOIN:{ SJoinPhysiNode *pJoinNode = (SJoinPhysiNode *)pNode; - QUERY_EXPLAIN_NEWLINE(EXPLAIN_JOIN_FORMAT, EXPLAIN_JOIN_STRING(pJoinNode->joinType), pJoinNode->pTargets->length, pJoinNode->node.pOutputDataBlockDesc->resultRowSize); + EXPLAIN_ROW_NEW(level, EXPLAIN_JOIN_FORMAT, EXPLAIN_JOIN_STRING(pJoinNode->joinType), pJoinNode->pTargets->length, pJoinNode->node.pOutputDataBlockDesc->resultRowSize); if (pResNode->pExecInfo) { - QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, tlen)); + QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); } - QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level)); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); if (pJoinNode->node.pConditions) { - QUERY_EXPLAIN_NEWLINE(EXPLAIN_FILTER_FORMAT); + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); QRY_ERR_RET(nodesNodeToSQL(pJoinNode->node.pConditions, tbuf, QUERY_EXPLAIN_MAX_RES_LEN, &tlen)); - QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level + 1)); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); } - QUERY_EXPLAIN_NEWLINE(EXPLAIN_ON_CONDITIONS_FORMAT); + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_ON_CONDITIONS_FORMAT); QRY_ERR_RET(nodesNodeToSQL(pJoinNode->pOnConditions, tbuf, QUERY_EXPLAIN_MAX_RES_LEN, &tlen)); - QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level + 1)); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); break; } case QUERY_NODE_PHYSICAL_PLAN_AGG:{ SAggPhysiNode *pAggNode = (SAggPhysiNode *)pNode; - QUERY_EXPLAIN_NEWLINE(EXPLAIN_AGG_FORMAT, pAggNode->pAggFuncs->length, pAggNode->pGroupKeys->length, pAggNode->node.pOutputDataBlockDesc->resultRowSize); + EXPLAIN_ROW_NEW(level, EXPLAIN_AGG_FORMAT, pAggNode->pAggFuncs->length, pAggNode->pGroupKeys->length, pAggNode->node.pOutputDataBlockDesc->resultRowSize); if (pResNode->pExecInfo) { - QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, tlen)); + QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); } - QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level)); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); if (pAggNode->node.pConditions) { - QUERY_EXPLAIN_NEWLINE(EXPLAIN_FILTER_FORMAT); + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); QRY_ERR_RET(nodesNodeToSQL(pAggNode->node.pConditions, tbuf, QUERY_EXPLAIN_MAX_RES_LEN, &tlen)); - QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level + 1)); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); } break; } case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:{ SExchangePhysiNode *pExchNode = (SExchangePhysiNode *)pNode; - QUERY_EXPLAIN_NEWLINE(EXPLAIN_EXCHANGE_FORMAT, pExchNode->pSrcEndPoints->length, pExchNode->node.pOutputDataBlockDesc->resultRowSize); + EXPLAIN_ROW_NEW(level, EXPLAIN_EXCHANGE_FORMAT, pExchNode->pSrcEndPoints->length, pExchNode->node.pOutputDataBlockDesc->resultRowSize); if (pResNode->pExecInfo) { - QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, tlen)); + QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); } - QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level)); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); if (pExchNode->node.pConditions) { - QUERY_EXPLAIN_NEWLINE(EXPLAIN_FILTER_FORMAT); + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); QRY_ERR_RET(nodesNodeToSQL(pExchNode->node.pConditions, tbuf, QUERY_EXPLAIN_MAX_RES_LEN, &tlen)); - QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level + 1)); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); } break; } case QUERY_NODE_PHYSICAL_PLAN_SORT:{ SSortPhysiNode *pSortNode = (SSortPhysiNode *)pNode; - QUERY_EXPLAIN_NEWLINE(EXPLAIN_SORT_FORMAT, pSortNode->pSortKeys->length, pSortNode->node.pOutputDataBlockDesc->resultRowSize); + EXPLAIN_ROW_NEW(level, EXPLAIN_SORT_FORMAT, pSortNode->pSortKeys->length, pSortNode->node.pOutputDataBlockDesc->resultRowSize); if (pResNode->pExecInfo) { - QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, tlen)); + QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); } - QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level)); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); if (pSortNode->node.pConditions) { - QUERY_EXPLAIN_NEWLINE(EXPLAIN_FILTER_FORMAT); + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); QRY_ERR_RET(nodesNodeToSQL(pSortNode->node.pConditions, tbuf, QUERY_EXPLAIN_MAX_RES_LEN, &tlen)); - QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level + 1)); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); } break; } case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:{ SIntervalPhysiNode *pIntNode = (SIntervalPhysiNode *)pNode; - QUERY_EXPLAIN_NEWLINE(EXPLAIN_INTERVAL_FORMAT, qGetNameFromColumnNode(pIntNode->pTspk), pIntNode->window.pFuncs->length, + EXPLAIN_ROW_NEW(level, EXPLAIN_INTERVAL_FORMAT, qGetNameFromColumnNode(pIntNode->pTspk), pIntNode->window.pFuncs->length, pIntNode->interval, pIntNode->intervalUnit, pIntNode->offset, pIntNode->intervalUnit, pIntNode->sliding, pIntNode->slidingUnit, pIntNode->window.node.pOutputDataBlockDesc->resultRowSize); if (pResNode->pExecInfo) { - QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, tlen)); + QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); } - QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level)); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); if (pIntNode->pFill) { - QUERY_EXPLAIN_NEWLINE(EXPLAIN_FILL_FORMAT, qFillModeString(pIntNode->pFill->mode)); - QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level + 1)); + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILL_FORMAT, qFillModeString(pIntNode->pFill->mode)); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); } if (pIntNode->window.node.pConditions) { - QUERY_EXPLAIN_NEWLINE(EXPLAIN_FILTER_FORMAT); + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); QRY_ERR_RET(nodesNodeToSQL(pIntNode->window.node.pConditions, tbuf, QUERY_EXPLAIN_MAX_RES_LEN, &tlen)); - QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level + 1)); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); } break; } case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW:{ SSessionWinodwPhysiNode *pIntNode = (SSessionWinodwPhysiNode *)pNode; - QUERY_EXPLAIN_NEWLINE(EXPLAIN_SESSION_FORMAT, pIntNode->gap, pIntNode->window.pFuncs->length, pIntNode->window.node.pOutputDataBlockDesc->resultRowSize); + EXPLAIN_ROW_NEW(level, EXPLAIN_SESSION_FORMAT, pIntNode->gap, pIntNode->window.pFuncs->length, pIntNode->window.node.pOutputDataBlockDesc->resultRowSize); if (pResNode->pExecInfo) { - QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, tlen)); + QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); } - QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level)); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); if (pIntNode->window.node.pConditions) { - QUERY_EXPLAIN_NEWLINE(EXPLAIN_FILTER_FORMAT); + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); QRY_ERR_RET(nodesNodeToSQL(pIntNode->window.node.pConditions, tbuf, QUERY_EXPLAIN_MAX_RES_LEN, &tlen)); - QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level + 1)); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); } break; } @@ -389,33 +439,24 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SArray *pRows, char } -int32_t qExplainResNodeToRows(SExplainResNode *pResNode, SArray *pRsp, char *tbuf, int32_t level) { +int32_t qExplainResNodeToRows(SExplainResNode *pResNode, SExplainRowCtx *ctx, char *tbuf, int32_t level) { if (NULL == pResNode) { qError("explain res node is NULL"); QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } int32_t code = 0; - QRY_ERR_RET(qExplainResNodeToRowsImpl(pResNode, pRsp, tbuf, level)); + QRY_ERR_RET(qExplainResNodeToRowsImpl(pResNode, ctx, tbuf, level)); SNode* pNode = NULL; FOREACH(pNode, pResNode->pChildren) { - QRY_ERR_RET(qExplainResNodeToRows((SExplainResNode *)pNode, pRsp, tbuf, level + 1)); + QRY_ERR_RET(qExplainResNodeToRows((SExplainResNode *)pNode, ctx, tbuf, level + 1)); } return TSDB_CODE_SUCCESS; } -int32_t qExplainRowsToRsp(SArray *rows, SRetrieveTableRsp **pRsp) { - int32_t rspSize = sizeof(SRetrieveTableRsp) + ; - SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, rspSize); - if (NULL == rsp) { - qError("malloc SRetrieveTableRsp failed"); - QRY_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } -} - -int32_t qMakeTaskExplainResRows(SExplainResNode *pResNode, SRetrieveTableRsp **pRsp) { +int32_t qGenerateExplainResRowsFromNode(SExplainResNode *pResNode, SExplainRowCtx *ctx, int32_t level) { if (NULL == pResNode) { qError("explain res node is NULL"); QRY_RET(TSDB_CODE_QRY_APP_ERROR); @@ -427,24 +468,92 @@ int32_t qMakeTaskExplainResRows(SExplainResNode *pResNode, SRetrieveTableRsp **p qError("malloc size %d failed", QUERY_EXPLAIN_MAX_RES_LEN); QRY_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + + QRY_ERR_JRET(qExplainResNodeToRows(pResNode, ctx, tbuf, level)); - SArray *rows = taosArrayInit(10, sizeof(SQueryExplainRowInfo)); - if (NULL == rows) { - qError("taosArrayInit SQueryExplainRowInfo failed"); - QRY_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } +_return: + + taosMemoryFree(tbuf); + + QRY_RET(code); +} + +int32_t qAppendTaskExplainResRows(void **pRowCtx, struct SSubplan *plan, void *pExecTree, int32_t level) { + SExplainResNode *node = NULL; + int32_t code = 0; - QRY_ERR_JRET(qExplainResNodeToRows(pResNode, rows, tbuf, 0)); + QRY_ERR_RET(qGenerateExplainResNodeTree(plan, pExecTree, &node)); - QRY_ERR_JRET(qExplainRowsToRsp(rows, pRsp)); + if (NULL == *pRowCtx) { + *pRowCtx = taosMemoryCalloc(1, sizeof(SExplainRowCtx)); + if (NULL == *pRowCtx) { + qError("calloc SExplainRowCtx failed"); + QRY_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + SArray *rows = taosArrayInit(10, sizeof(SQueryExplainRowInfo)); + if (NULL == rows) { + qError("taosArrayInit SQueryExplainRowInfo failed"); + QRY_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + (*pRowCtx)->rows = rows; + } + + QRY_ERR_JRET(qGenerateExplainResRowsFromNode(node, *pRowCtx, level)); _return: - taosMemoryFree(tbuf); - taosArrayDestroy(rows); + qFreeExplainResTree(node); + if (code) { + taosArrayDestroy((*pRowCtx)->rows); + taosMemoryFree(*pRowCtx); + *pRowCtx = NULL; + } + QRY_RET(code); } -#pragma GCC diagnostic pop +int32_t qGetExplainRspFromRowCtx(void *ctx, SRetrieveTableRsp **pRsp) { + SExplainRowCtx *pCtx = (SExplainRowCtx *)ctx; + int32_t rowNum = taosArrayGetSize(pCtx->rows); + if (rowNum <= 0) { + qError("empty explain res rows"); + QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + } + + int32_t colNum = 1; + int32_t rspSize = sizeof(SRetrieveTableRsp) + sizeof(int32_t) * colNum + sizeof(int32_t) * rowNum + pCtx->totalSize; + SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, rspSize); + if (NULL == rsp) { + qError("malloc SRetrieveTableRsp failed, size:%d", rspSize); + QRY_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + rsp->completed = 1; + rsp->numOfRows = rowNum; + + *(int32_t *)rsp->data = pCtx->totalSize; + + int32_t *offset = (int32_t *)((char *)rsp->data + sizeof(int32_t)); + char *data = (char *)(offset + rowNum); + int32_t tOffset = 0; + + for (int32_t i = 0; i < rowNum; ++i) { + SQueryExplainRowInfo *row = taosArrayGet(pCtx->rows, i); + *offset = tOffset; + tOffset += row->len; + + memcpy(data, row->buf, row->len); + + ++offset; + data += row->len; + } + + return TSDB_CODE_SUCCESS; +} + + + diff --git a/source/libs/scalar/inc/filterInt.h b/source/libs/scalar/inc/filterInt.h index 834a722bd8..3e04e7b30a 100644 --- a/source/libs/scalar/inc/filterInt.h +++ b/source/libs/scalar/inc/filterInt.h @@ -36,8 +36,6 @@ extern "C" { #define FILTER_DUMMY_EMPTY_OPTR 127 -#define MAX_NUM_STR_SIZE 40 - #define FILTER_RM_UNIT_MIN_ROWS 100 enum { diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 518da6e2b8..579a995056 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -38,6 +38,11 @@ enum { SCH_WRITE, }; +typedef struct SSchExplainGroup { + int32_t nodeNum; + SSubplan *plan; +} SSchExplainGroup; + typedef struct SSchTrans { void *transInst; void *transHandle; @@ -142,7 +147,7 @@ typedef struct SSchTask { } SSchTask; typedef struct SSchJobAttr { - bool needFetch; + bool analyzeExplain; bool syncSchedule; bool queryJob; bool needFlowCtrl; diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index fdb0b6498c..376febcdf3 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -476,6 +476,90 @@ _return: SCH_RET(code); } +int32_t schValidateAndBuildJobExplain(SQueryPlan *pDag, SSchJob *pJob) { + int32_t code = 0; + pJob->queryId = pDag->queryId; + + if (pDag->numOfSubplans <= 0) { + SCH_JOB_ELOG("invalid subplan num:%d", pDag->numOfSubplans); + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + int32_t levelNum = (int32_t)LIST_LENGTH(pDag->pSubplans); + if (levelNum <= 0) { + SCH_JOB_ELOG("invalid level num:%d", levelNum); + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + SHashObj *groupHash = taosHashInit(SCHEDULE_DEFAULT_MAX_TASK_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); + if (NULL == groupHash) { + SCH_JOB_ELOG("groupHash %d failed", SCHEDULE_DEFAULT_MAX_TASK_NUM); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + pJob->subPlans = pDag->pSubplans; + + SNodeListNode *plans = NULL; + int32_t taskNum = 0; + SSchExplainGroup *pGroup = NULL; + void *rowCtx = NULL; + + for (int32_t i = 0; i < levelNum; ++i) { + plans = (SNodeListNode *)nodesListGetNode(pDag->pSubplans, i); + if (NULL == plans) { + SCH_JOB_ELOG("empty level plan, level:%d", i); + SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); + } + + taskNum = (int32_t)LIST_LENGTH(plans->pNodeList); + if (taskNum <= 0) { + SCH_JOB_ELOG("invalid level plan number:%d, level:%d", taskNum, i); + SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); + } + + for (int32_t n = 0; n < taskNum; ++n) { + SSubplan *plan = (SSubplan *)nodesListGetNode(plans->pNodeList, n); + pGroup = taosHashGet(groupHash, &plan->id.groupId, sizeof(plan->id.groupId)); + if (pGroup) { + ++pGroup->nodeNum; + continue; + } + + SSchExplainGroup group = {.nodeNum = 1, .plan = plan}; + if (0 != taosHashPut(groupHash, &plan->id.groupId, sizeof(plan->id.groupId), &group, sizeof(group))) { + SCH_TASK_ELOG("taosHashPut to explainGroupHash failed, taskIdx:%d", n); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + } + + void *pIter = taosHashIterate(groupHash, NULL); + while (pIter) { + pGroup = (SSchExplainGroup *)pIter; + + SCH_ERR_JRET(qAppendTaskExplainResRows(&rowCtx, pGroup->plan, NULL, i)); + + pIter = taosHashIterate(groupHash, pIter); + } + + taosHashClear(groupHash); + + SCH_JOB_DLOG("level initialized, taskNum:%d", taskNum); + } + + SRetrieveTableRsp *pRsp = NULL; + SCH_ERR_JRET(qGetExplainRspFromRowCtx(rowCtx, &pRsp)); + + pJob->resData = pRsp; + +_return: + + taosHashCleanup(groupHash); + qFreeExplainRowCtx(rowCtx); + + SCH_RET(code); +} + + int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { if (NULL != pTask->candidateAddrs) { return TSDB_CODE_SUCCESS; @@ -2093,6 +2177,7 @@ static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pD SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + pJob->attr.analyzeExplain = (EXPLAIN_MODE_ANALYZE == pDag->explainInfo.mode); pJob->attr.syncSchedule = syncSchedule; pJob->transport = transport; pJob->sql = sql; @@ -2163,6 +2248,51 @@ _return: SCH_RET(code); } +int32_t schStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql, + bool syncSchedule) { + qDebug("QID:0x%" PRIx64 " job started", pDag->queryId); + + int32_t code = 0; + SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob)); + if (NULL == pJob) { + qError("QID:%" PRIx64 " calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob)); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + pJob->sql = sql; + pJob->attr.queryJob = true; + + SCH_ERR_JRET(schValidateAndBuildJobExplain(pDag, pJob)); + + int64_t refId = taosAddRef(schMgmt.jobRef, pJob); + if (refId < 0) { + SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno)); + SCH_ERR_JRET(terrno); + } + + if (NULL == schAcquireJob(refId)) { + SCH_JOB_ELOG("schAcquireJob job failed, refId:%" PRIx64, refId); + SCH_RET(TSDB_CODE_SCH_STATUS_ERROR); + } + + pJob->refId = refId; + + SCH_JOB_DLOG("job refId:%" PRIx64, pJob->refId); + + pJob->status = JOB_TASK_STATUS_PARTIAL_SUCCEED; + *job = pJob->refId; + SCH_JOB_DLOG("job exec done, job status:%s", SCH_GET_JOB_STATUS_STR(pJob)); + + schReleaseJob(pJob->refId); + + return TSDB_CODE_SUCCESS; + +_return: + + schFreeJobImpl(pJob); + SCH_RET(code); +} + int32_t schedulerInit(SSchedulerCfg *cfg) { if (schMgmt.jobRef) { qError("scheduler already initialized"); @@ -2211,7 +2341,11 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, in SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, true)); + if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) { + SCH_ERR_RET(schStaticExplain(transport, nodeList, pDag, pJob, sql, true)); + } else { + SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, true)); + } SSchJob *job = schAcquireJob(*pJob); -- GitLab