From c8e132052a02fc787205a76f67273b469cea7d3d Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 29 Mar 2022 19:51:11 +0800 Subject: [PATCH] feature/qnode --- include/libs/qcom/query.h | 4 +- source/libs/nodes/src/nodesToSQLFuncs.c | 134 +++++++++ source/libs/nodes/src/nodesTraverseFuncs.c | 3 +- source/libs/nodes/src/nodesUtilFuncs.c | 63 ++++ source/libs/qcom/inc/queryInt.h | 26 +- source/libs/qcom/src/queryExplain.c | 321 ++++++++++++++++++--- source/libs/scheduler/src/scheduler.c | 15 +- 7 files changed, 519 insertions(+), 47 deletions(-) create mode 100644 source/libs/nodes/src/nodesToSQLFuncs.c diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index c3870665a6..c0fc8c630e 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -53,11 +53,11 @@ typedef struct SIndexMeta { } SIndexMeta; -typedef struct SPhysiNodeExplainResNode { +typedef struct SExplainResNode { SNodeList* pChildren; SPhysiNode* pNode; void* pExecInfo; -} SPhysiNodeExplainResNode; +} SExplainResNode; /* * ASSERT(sizeof(SCTableMeta) == 24) diff --git a/source/libs/nodes/src/nodesToSQLFuncs.c b/source/libs/nodes/src/nodesToSQLFuncs.c new file mode 100644 index 0000000000..6e4cd0f04e --- /dev/null +++ b/source/libs/nodes/src/nodesToSQLFuncs.c @@ -0,0 +1,134 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "cmdnodes.h" +#include "nodesUtil.h" +#include "plannodes.h" +#include "querynodes.h" +#include "taos.h" +#include "taoserror.h" +#include "thash.h" + +char *gOperatorStr[] = {NULL, "+", "-", "*", "/", "%", "&", "|", ">", ">=", "<", "<=", "=", "<>", + "IN", "NOT IN", "LIKE", "NOT LIKE", "MATCH", "NMATCH", "IS NULL", "IS NOT NULL", + "IS TRUE", "IS FALSE", "IS UNKNOWN", "IS NOT TRUE", "IS NOT FALSE", "IS NOT UNKNOWN"}; +char *gLogicConditionStr[] = {"AND", "OR", "NOT"}; + +int32_t nodesNodeToSQL(SNode *pNode, char *buf, int32_t bufSize, int32_t *len) { + switch (pNode->type) { + case QUERY_NODE_COLUMN: { + SColumnNode *colNode = (SColumnNode *)pNode; + *len = 0; + if (colNode->dbName[0]) { + *len += snprintf(buf, bufSize - *len, "`%s`.", colNode->dbName); + } + + if (colNode->tableAlias[0]) { + *len += snprintf(buf, bufSize - *len, "`%s`.", colNode->tableAlias); + } else if (colNode->tableName[0]) { + *len += snprintf(buf, bufSize - *len, "`%s`.", colNode->tableName); + } + + *len += snprintf(buf, bufSize - *len, "`%s`", colNode->colName); + + return TSDB_CODE_SUCCESS; + } + case QUERY_NODE_VALUE:{ + SValueNode *colNode = (SValueNode *)pNode; + char *t = nodesGetStrValueFromNode(colNode); + if (NULL == t) { + nodesError("fail to get str value from valueNode"); + return TSDB_CODE_QRY_APP_ERROR; + } + + *len += snprintf(buf, bufSize - *len, "%s", t); + taosMemoryFree(t); + + return TSDB_CODE_SUCCESS; + } + case QUERY_NODE_OPERATOR: { + SOperatorNode* pOpNode = (SOperatorNode*)pNode; + *len += snprintf(buf, bufSize - *len, "("); + if (pOpNode->pLeft) { + QRY_ERR_RET(nodesNodeToSQL(pOpNode->pLeft, buf, bufSize, len)); + } + + if (pOpNode->opType >= (sizeof(gOperatorStr) / sizeof(gOperatorStr[0]))) { + nodesError("unknown operation type:%d", pOpNode->opType); + return TSDB_CODE_QRY_APP_ERROR; + } + + *len += snprintf(buf, bufSize - *len, " %s ", gOperatorStr[pOpNode->opType]); + + if (pOpNode->pRight) { + QRY_ERR_RET(nodesNodeToSQL(pOpNode->pRight, buf, bufSize, len)); + } + + *len += snprintf(buf, bufSize - *len, ")"); + + return TSDB_CODE_SUCCESS; + } + case QUERY_NODE_LOGIC_CONDITION:{ + SLogicConditionNode* pLogicNode = (SLogicConditionNode*)pNode; + SNode* node = NULL; + bool first = true; + + *len += snprintf(buf, bufSize - *len, "("); + + FOREACH(node, pLogicNode->pParameterList) { + if (!first) { + *len += snprintf(buf, bufSize - *len, " %s ", gLogicConditionStr[pLogicNode->condType]); + } + QRY_ERR_RET(nodesNodeToSQL(node, buf, bufSize, len)); + first = false; + } + + *len += snprintf(buf, bufSize - *len, ")"); + + return TSDB_CODE_SUCCESS; + } + case QUERY_NODE_FUNCTION:{ + SFunctionNode* pFuncNode = (SFunctionNode*)pNode; + SNode* node = NULL; + bool first = true; + + *len += snprintf(buf, bufSize - *len, "%s(", pFuncNode->functionName); + + FOREACH(node, pFuncNode->pParameterList) { + if (!first) { + *len += snprintf(buf, bufSize - *len, ", "); + } + QRY_ERR_RET(nodesNodeToSQL(node, buf, bufSize, len)); + first = false; + } + + *len += snprintf(buf, bufSize - *len, ")"); + + return TSDB_CODE_SUCCESS; + } + case QUERY_NODE_NODE_LIST:{ + SNodeListNode* pListNode = (SNodeListNode *)pNode; + + //TODO + + return TSDB_CODE_SUCCESS; + } + default: + break; + } + + nodesError("nodesNodeToSQL unknown node = %s", nodesNodeName(pNode->type)); + return TSDB_CODE_QRY_APP_ERROR; +} diff --git a/source/libs/nodes/src/nodesTraverseFuncs.c b/source/libs/nodes/src/nodesTraverseFuncs.c index 7eaa049946..bbd0473edd 100644 --- a/source/libs/nodes/src/nodesTraverseFuncs.c +++ b/source/libs/nodes/src/nodesTraverseFuncs.c @@ -17,7 +17,8 @@ typedef enum ETraversalOrder { TRAVERSAL_PREORDER = 1, - TRAVERSAL_POSTORDER + TRAVERSAL_INORDER, + TRAVERSAL_POSTORDER, } ETraversalOrder; static EDealRes walkList(SNodeList* pNodeList, ETraversalOrder order, FNodeWalker walker, void* pContext); diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 035a2f1caa..7b2be5e355 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -467,6 +467,69 @@ void* nodesGetValueFromNode(SValueNode *pNode) { return NULL; } +char* nodesGetStrValueFromNode(SValueNode *pNode) { + switch (pNode->node.resType.type) { + case TSDB_DATA_TYPE_BOOL: + void *buf = taosMemoryMalloc(MAX_NUM_STR_SIZE); + if (NULL == buf) { + return NULL; + } + + sprintf(buf, "%s", pNode->datum.b ? "true" : "false"); + return buf; + case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_INT: + case TSDB_DATA_TYPE_BIGINT: + case TSDB_DATA_TYPE_TIMESTAMP: { + void *buf = taosMemoryMalloc(MAX_NUM_STR_SIZE); + if (NULL == buf) { + return NULL; + } + + sprintf(buf, "%" PRId64, pNode->datum.i); + return buf; + } + case TSDB_DATA_TYPE_UTINYINT: + case TSDB_DATA_TYPE_USMALLINT: + case TSDB_DATA_TYPE_UINT: + case TSDB_DATA_TYPE_UBIGINT: { + void *buf = taosMemoryMalloc(MAX_NUM_STR_SIZE); + if (NULL == buf) { + return NULL; + } + + sprintf(buf, "%" PRIu64, pNode->datum.u); + return buf; + } + case TSDB_DATA_TYPE_FLOAT: + case TSDB_DATA_TYPE_DOUBLE: { + void *buf = taosMemoryMalloc(MAX_NUM_STR_SIZE); + if (NULL == buf) { + return NULL; + } + + sprintf(buf, "%e", pNode->datum.d); + return buf; + } + case TSDB_DATA_TYPE_NCHAR: + case TSDB_DATA_TYPE_VARCHAR: + case TSDB_DATA_TYPE_VARBINARY: { + void *buf = taosMemoryMalloc(varDataLen(pNode->datum.p) + 1); + if (NULL == buf) { + return NULL; + } + + strncpy(buf, varDataVal(pNode->datum.p), varDataLen(pNode->datum.p) + 1); + return buf; + } + default: + break; + } + + return NULL; +} + bool nodesIsExprNode(const SNode* pNode) { ENodeType type = nodeType(pNode); return (QUERY_NODE_COLUMN == type || QUERY_NODE_VALUE == type || QUERY_NODE_OPERATOR == type || QUERY_NODE_FUNCTION == type); diff --git a/source/libs/qcom/inc/queryInt.h b/source/libs/qcom/inc/queryInt.h index 2de0c89a13..7b6b7d44ef 100644 --- a/source/libs/qcom/inc/queryInt.h +++ b/source/libs/qcom/inc/queryInt.h @@ -22,8 +22,24 @@ extern "C" { #define QUERY_EXPLAIN_MAX_RES_LEN 1024 -#define EXPLAIN_TAG_SCAN_FORMAT "Tag scan on %s" -#define EXPLAIN_ORDER_FORMAT "Order: " +#define EXPLAIN_TAG_SCAN_FORMAT "Tag scan on %s columns=%d" +#define EXPLAIN_TBL_SCAN_FORMAT "Table scan on %s columns=%d" +#define EXPLAIN_SYSTBL_SCAN_FORMAT "System table scan on %s columns=%d" +#define EXPLAIN_PROJECTION_FORMAT "Projection columns=%d width=%d" +#define EXPLAIN_JOIN_FORMAT "%s between %d tables width=%d" +#define EXPLAIN_AGG_FORMAT "Aggragate functions=%d groups=%d width=%d" +#define EXPLAIN_EXCHANGE_FORMAT "Exchange %d:1 width=%d" +#define EXPLAIN_SORT_FORMAT "Sort on %d columns width=%d" +#define EXPLAIN_INTERVAL_FORMAT "Interval on column %s functions=%d interval=%d%c offset=%d%c sliding=%d%c width=%d" +#define EXPLAIN_SESSION_FORMAT "Session gap=%" PRId64 " functions=%d width=%d" + +#define EXPLAIN_ORDER_FORMAT "Order: %s" +#define EXPLAIN_FILTER_FORMAT "Filter: " +#define EXPLAIN_FILL_FORMAT "Fill: %s" +#define EXPLAIN_ON_CONDITIONS_FORMAT "ON Conditions: " +#define EXPLAIN_TIMERANGE_FORMAT "Time range: [%" PRId64 ", %" PRId64 "]" +#define EXPLAIN_LOOPS_FORMAT "loops %d" +#define EXPLAIN_REVERSE_FORMAT "reverse %d" typedef struct SQueryExplainRowInfo { int32_t level; @@ -31,9 +47,11 @@ typedef struct SQueryExplainRowInfo { char *buf; } SQueryExplainRowInfo; +#define EXPLAIN_ORDER_STRING(_order) ((TSDB_ORDER_ASC == _order) ? "Ascending" : "Descending") +#define EXPLAIN_JOIN_STRING(_type) ((JOIN_TYPE_INNER == _type) ? "Inner join" : "Join") -#define QUERY_EXPLAIN_NEWLINE(_format, ...) tlen = snprintf(tbuf, QUERY_EXPLAIN_MAX_RES_LEN, _format, __VA_ARGS__) -#define QUERY_EXPLAIN_APPEND(_format, ...) tlen += snprintf(tbuf + tlen, QUERY_EXPLAIN_MAX_RES_LEN - tlen, _format, __VA_ARGS__) +#define QUERY_EXPLAIN_NEWLINE(...) tlen = snprintf(tbuf, QUERY_EXPLAIN_MAX_RES_LEN, __VA_ARGS__) +#define QUERY_EXPLAIN_APPEND(...) tlen += snprintf(tbuf + tlen, QUERY_EXPLAIN_MAX_RES_LEN - tlen, __VA_ARGS__) #define QRY_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) diff --git a/source/libs/qcom/src/queryExplain.c b/source/libs/qcom/src/queryExplain.c index d547e67170..d6da9ab0e2 100644 --- a/source/libs/qcom/src/queryExplain.c +++ b/source/libs/qcom/src/queryExplain.c @@ -19,22 +19,124 @@ #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wformat-truncation" -void qFreeExplainRes(SPhysiNodeExplainRes *res) { +void qFreeExplainRes(SExplainResNode *res) { } +char *qFillModeString(EFillMode mode) { + switch (mode) { + case FILL_MODE_NONE: + return "none"; + case FILL_MODE_VALUE: + return "value"; + case FILL_MODE_PREV: + return "prev"; + case FILL_MODE_NULL: + return "null"; + case FILL_MODE_LINEAR: + return "linear"; + case FILL_MODE_NEXT: + return "next"; + default: + return "unknown"; + } +} + +char *qGetNameFromColumnNode(SNode *pNode) { + if (NULL == pNode || QUERY_NODE_COLUMN != pNode->type) { + return "NULL"; + } + + return ((SColumnNode *)pNode)->colName; +} + int32_t qMakeExplainResChildrenInfo(SPhysiNode *pNode, void *pExecInfo, SNodeList **pChildren) { + int32_t tlen = 0; + SNodeList *pPhysiChildren = NULL; + + switch (pNode->type) { + case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: { + STagScanPhysiNode *pTagScanNode = (STagScanPhysiNode *)pNode; + pPhysiChildren = pTagScanNode->node.pChildren; + break; + } + case QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN: + case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:{ + STableScanPhysiNode *pTblScanNode = (STableScanPhysiNode *)pNode; + pPhysiChildren = pTblScanNode->scan.node.pChildren; + break; + } + case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:{ + SSystemTableScanPhysiNode *pSTblScanNode = (SSystemTableScanPhysiNode *)pNode; + pPhysiChildren = pSTblScanNode->scan.node.pChildren; + break; + } + case QUERY_NODE_PHYSICAL_PLAN_PROJECT:{ + SProjectPhysiNode *pPrjNode = (SProjectPhysiNode *)pNode; + pPhysiChildren = pPrjNode->node.pChildren; + break; + } + case QUERY_NODE_PHYSICAL_PLAN_JOIN:{ + SJoinPhysiNode *pJoinNode = (SJoinPhysiNode *)pNode; + pPhysiChildren = pJoinNode->node.pChildren; + break; + } + case QUERY_NODE_PHYSICAL_PLAN_AGG:{ + SAggPhysiNode *pAggNode = (SAggPhysiNode *)pNode; + pPhysiChildren = pAggNode->node.pChildren; + break; + } + case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:{ + SExchangePhysiNode *pExchNode = (SExchangePhysiNode *)pNode; + pPhysiChildren = pExchNode->node.pChildren; + break; + } + case QUERY_NODE_PHYSICAL_PLAN_SORT:{ + SSortPhysiNode *pSortNode = (SSortPhysiNode *)pNode; + pPhysiChildren = pSortNode->node.pChildren; + break; + } + case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:{ + SIntervalPhysiNode *pIntNode = (SIntervalPhysiNode *)pNode; + pPhysiChildren = pIntNode->window.node.pChildren; + break; + } + case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW:{ + SSessionWinodwPhysiNode *pSessNode = (SSessionWinodwPhysiNode *)pNode; + pPhysiChildren = pSessNode->window.node.pChildren; + break; + } + default: + qError("not supported physical node type %d", pNode->type); + QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + } + + if (pPhysiChildren) { + *pChildren = nodesMakeList(); + if (NULL == *pChildren) { + qError("nodesMakeList failed"); + QRY_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + } + SNode* node = NULL; + SExplainResNode *pResNode = NULL; + FOREACH(node, pPhysiChildren) { + QRY_ERR_RET(qMakeExplainResNode((SPhysiNode *)node, pExecInfo, &pResNode)); + QRY_ERR_RET(nodesListAppend(*pChildren, pResNode)); + } + + return TSDB_CODE_SUCCESS; } -int32_t qMakeExplainResNode(SPhysiNode *pNode, void *pExecInfo, SPhysiNodeExplainResNode **pRes) { +int32_t qMakeExplainResNode(SPhysiNode *pNode, void *pExecInfo, SExplainResNode **pRes) { if (NULL == pNode) { *pRes = NULL; qError("physical node is NULL"); return TSDB_CODE_QRY_APP_ERROR; } - SPhysiNodeExplainResNode *res = calloc(1, sizeof(SPhysiNodeExplainResNode)); + SExplainResNode *res = taosMemoryCalloc(1, sizeof(SExplainResNode)); if (NULL == res) { qError("calloc SPhysiNodeExplainRes failed"); return TSDB_CODE_QRY_OUT_OF_MEMORY; @@ -56,7 +158,7 @@ _return: QRY_RET(code); } -int32_t qMakeTaskExplainResTree(struct SSubplan *plan, void *pExecTree, SPhysiNodeExplainResNode **pRes) { +int32_t qMakeTaskExplainResTree(struct SSubplan *plan, void *pExecTree, SExplainResNode **pRes) { char *tbuf = taosMemoryMalloc(QUERY_EXPLAIN_MAX_RES_LEN); if (NULL == tbuf) { qError("malloc size %d failed", QUERY_EXPLAIN_MAX_RES_LEN); @@ -71,7 +173,7 @@ int32_t qMakeTaskExplainResTree(struct SSubplan *plan, void *pExecTree, SPhysiNo QRY_RET(code); } -int32_t qExplainResNodeAppendExecInfo(void *pExecInfo, char *tbuf) { +int32_t qExplainBufAppendExecInfo(void *pExecInfo, char *tbuf, int32_t tlen) { } @@ -96,7 +198,7 @@ int32_t qExplainResAppendRow(SArray *pRows, char *tbuf, int32_t len, int32_t lev } -int32_t qExplainResNodeToRowsImpl(SPhysiNodeExplainResNode *pResNode, SArray *pRows, char *tbuf, int32_t level) { +int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SArray *pRows, char *tbuf, int32_t level) { int32_t tlen = 0; SPhysiNode* pNode = pResNode->pNode; if (NULL == pNode) { @@ -107,25 +209,177 @@ int32_t qExplainResNodeToRowsImpl(SPhysiNodeExplainResNode *pResNode, SArray *pR switch (pNode->type) { case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: { STagScanPhysiNode *pTagScanNode = (STagScanPhysiNode *)pNode; - QUERY_EXPLAIN_NEWLINE(EXPLAIN_TAG_SCAN_FORMAT, pTagScanNode->tableName.tname); + QUERY_EXPLAIN_NEWLINE(EXPLAIN_TAG_SCAN_FORMAT, pTagScanNode->tableName.tname, pTagScanNode->pScanCols->length); if (pResNode->pExecInfo) { - QRY_ERR_RET(qExplainResNodeAppendExecInfo(pResNode->pExecInfo, tbuf)); + QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf)); + } + QUERY_EXPLAIN_APPEND(EXPLAIN_LOOPS_FORMAT, pTagScanNode->count); + if (pTagScanNode->reverse) { + QUERY_EXPLAIN_APPEND(EXPLAIN_REVERSE_FORMAT, pTagScanNode->reverse); } QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level)); + + QUERY_EXPLAIN_NEWLINE(EXPLAIN_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pTagScanNode->order)); + QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level + 1)); + break; } - case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: case QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN: - case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN: - case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN: - case QUERY_NODE_PHYSICAL_PLAN_PROJECT: - case QUERY_NODE_PHYSICAL_PLAN_JOIN: - case QUERY_NODE_PHYSICAL_PLAN_AGG: - case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: - case QUERY_NODE_PHYSICAL_PLAN_SORT: - case QUERY_NODE_PHYSICAL_PLAN_INTERVAL: - case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW: - case QUERY_NODE_PHYSICAL_PLAN_DISPATCH: - case QUERY_NODE_PHYSICAL_PLAN_INSERT: + case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:{ + STableScanPhysiNode *pTblScanNode = (STableScanPhysiNode *)pNode; + QUERY_EXPLAIN_NEWLINE(EXPLAIN_TBL_SCAN_FORMAT, pTblScanNode->scan.tableName.tname, pTblScanNode->scan.pScanCols->length); + if (pResNode->pExecInfo) { + QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, tlen)); + } + QUERY_EXPLAIN_APPEND(EXPLAIN_LOOPS_FORMAT, pTblScanNode->scan.count); + if (pTblScanNode->scan.reverse) { + QUERY_EXPLAIN_APPEND(EXPLAIN_REVERSE_FORMAT, pTblScanNode->scan.reverse); + } + QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level)); + + QUERY_EXPLAIN_NEWLINE(EXPLAIN_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pTblScanNode->scan.order)); + QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level + 1)); + + QUERY_EXPLAIN_NEWLINE(EXPLAIN_TIMERANGE_FORMAT, pTblScanNode->scanRange.skey, pTblScanNode->scanRange.ekey); + QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level + 1)); + + if (pTblScanNode->pScanConditions) { + QUERY_EXPLAIN_NEWLINE(EXPLAIN_FILTER_FORMAT); + QRY_ERR_RET(nodesNodeToSQL(pTblScanNode->pScanConditions, tbuf, QUERY_EXPLAIN_MAX_RES_LEN, &tlen)); + QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level + 1)); + } + break; + } + case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:{ + SSystemTableScanPhysiNode *pSTblScanNode = (SSystemTableScanPhysiNode *)pNode; + QUERY_EXPLAIN_NEWLINE(EXPLAIN_SYSTBL_SCAN_FORMAT, pSTblScanNode->scan.tableName.tname, pSTblScanNode->scan.pScanCols->length); + if (pResNode->pExecInfo) { + QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, tlen)); + } + QUERY_EXPLAIN_APPEND(EXPLAIN_LOOPS_FORMAT, pSTblScanNode->scan.count); + if (pSTblScanNode->scan.reverse) { + QUERY_EXPLAIN_APPEND(EXPLAIN_REVERSE_FORMAT, pSTblScanNode->scan.reverse); + } + QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level)); + + QUERY_EXPLAIN_NEWLINE(EXPLAIN_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pSTblScanNode->scan.order)); + QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level + 1)); + break; + } + case QUERY_NODE_PHYSICAL_PLAN_PROJECT:{ + SProjectPhysiNode *pPrjNode = (SProjectPhysiNode *)pNode; + QUERY_EXPLAIN_NEWLINE(EXPLAIN_PROJECTION_FORMAT, pPrjNode->pProjections->length, pPrjNode->node.pOutputDataBlockDesc->resultRowSize); + if (pResNode->pExecInfo) { + QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, tlen)); + } + QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level)); + + if (pPrjNode->node.pConditions) { + QUERY_EXPLAIN_NEWLINE(EXPLAIN_FILTER_FORMAT); + QRY_ERR_RET(nodesNodeToSQL(pPrjNode->node.pConditions, tbuf, QUERY_EXPLAIN_MAX_RES_LEN, &tlen)); + QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level + 1)); + } + break; + } + case QUERY_NODE_PHYSICAL_PLAN_JOIN:{ + SJoinPhysiNode *pJoinNode = (SJoinPhysiNode *)pNode; + QUERY_EXPLAIN_NEWLINE(EXPLAIN_JOIN_FORMAT, EXPLAIN_JOIN_STRING(pJoinNode->joinType), pJoinNode->pTargets->length, pJoinNode->node.pOutputDataBlockDesc->resultRowSize); + if (pResNode->pExecInfo) { + QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, tlen)); + } + QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level)); + + if (pJoinNode->node.pConditions) { + QUERY_EXPLAIN_NEWLINE(EXPLAIN_FILTER_FORMAT); + QRY_ERR_RET(nodesNodeToSQL(pJoinNode->node.pConditions, tbuf, QUERY_EXPLAIN_MAX_RES_LEN, &tlen)); + QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level + 1)); + } + + QUERY_EXPLAIN_NEWLINE(EXPLAIN_ON_CONDITIONS_FORMAT); + QRY_ERR_RET(nodesNodeToSQL(pJoinNode->pOnConditions, tbuf, QUERY_EXPLAIN_MAX_RES_LEN, &tlen)); + QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level + 1)); + break; + } + case QUERY_NODE_PHYSICAL_PLAN_AGG:{ + SAggPhysiNode *pAggNode = (SAggPhysiNode *)pNode; + QUERY_EXPLAIN_NEWLINE(EXPLAIN_AGG_FORMAT, pAggNode->pAggFuncs->length, pAggNode->pGroupKeys->length, pAggNode->node.pOutputDataBlockDesc->resultRowSize); + if (pResNode->pExecInfo) { + QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, tlen)); + } + QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level)); + + if (pAggNode->node.pConditions) { + QUERY_EXPLAIN_NEWLINE(EXPLAIN_FILTER_FORMAT); + QRY_ERR_RET(nodesNodeToSQL(pAggNode->node.pConditions, tbuf, QUERY_EXPLAIN_MAX_RES_LEN, &tlen)); + QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level + 1)); + } + break; + } + case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:{ + SExchangePhysiNode *pExchNode = (SExchangePhysiNode *)pNode; + QUERY_EXPLAIN_NEWLINE(EXPLAIN_EXCHANGE_FORMAT, pExchNode->pSrcEndPoints->length, pExchNode->node.pOutputDataBlockDesc->resultRowSize); + if (pResNode->pExecInfo) { + QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, tlen)); + } + QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level)); + + if (pExchNode->node.pConditions) { + QUERY_EXPLAIN_NEWLINE(EXPLAIN_FILTER_FORMAT); + QRY_ERR_RET(nodesNodeToSQL(pExchNode->node.pConditions, tbuf, QUERY_EXPLAIN_MAX_RES_LEN, &tlen)); + QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level + 1)); + } + break; + } + case QUERY_NODE_PHYSICAL_PLAN_SORT:{ + SSortPhysiNode *pSortNode = (SSortPhysiNode *)pNode; + QUERY_EXPLAIN_NEWLINE(EXPLAIN_SORT_FORMAT, pSortNode->pSortKeys->length, pSortNode->node.pOutputDataBlockDesc->resultRowSize); + if (pResNode->pExecInfo) { + QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, tlen)); + } + QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level)); + + if (pSortNode->node.pConditions) { + QUERY_EXPLAIN_NEWLINE(EXPLAIN_FILTER_FORMAT); + QRY_ERR_RET(nodesNodeToSQL(pSortNode->node.pConditions, tbuf, QUERY_EXPLAIN_MAX_RES_LEN, &tlen)); + QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level + 1)); + } + break; + } + case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:{ + SIntervalPhysiNode *pIntNode = (SIntervalPhysiNode *)pNode; + QUERY_EXPLAIN_NEWLINE(EXPLAIN_INTERVAL_FORMAT, qGetNameFromColumnNode(pIntNode->pTspk), pIntNode->window.pFuncs->length, + pIntNode->interval, pIntNode->intervalUnit, pIntNode->offset, pIntNode->intervalUnit, pIntNode->sliding, pIntNode->slidingUnit, pIntNode->window.node.pOutputDataBlockDesc->resultRowSize); + if (pResNode->pExecInfo) { + QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, tlen)); + } + QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level)); + + if (pIntNode->pFill) { + QUERY_EXPLAIN_NEWLINE(EXPLAIN_FILL_FORMAT, qFillModeString(pIntNode->pFill->mode)); + QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level + 1)); + } + + if (pIntNode->window.node.pConditions) { + QUERY_EXPLAIN_NEWLINE(EXPLAIN_FILTER_FORMAT); + QRY_ERR_RET(nodesNodeToSQL(pIntNode->window.node.pConditions, tbuf, QUERY_EXPLAIN_MAX_RES_LEN, &tlen)); + QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level + 1)); + } + break; + } + case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW:{ + SSessionWinodwPhysiNode *pIntNode = (SSessionWinodwPhysiNode *)pNode; + QUERY_EXPLAIN_NEWLINE(EXPLAIN_SESSION_FORMAT, pIntNode->gap, pIntNode->window.pFuncs->length, pIntNode->window.node.pOutputDataBlockDesc->resultRowSize); + if (pResNode->pExecInfo) { + QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, tlen)); + } + QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level)); + + if (pIntNode->window.node.pConditions) { + QUERY_EXPLAIN_NEWLINE(EXPLAIN_FILTER_FORMAT); + QRY_ERR_RET(nodesNodeToSQL(pIntNode->window.node.pConditions, tbuf, QUERY_EXPLAIN_MAX_RES_LEN, &tlen)); + QRY_ERR_RET(qExplainResAppendRow(pRows, tbuf, tlen, level + 1)); + } + break; + } default: qError("not supported physical node type %d", pNode->type); return TSDB_CODE_QRY_APP_ERROR; @@ -135,7 +389,7 @@ int32_t qExplainResNodeToRowsImpl(SPhysiNodeExplainResNode *pResNode, SArray *pR } -int32_t qExplainResNodeToRows(SPhysiNodeExplainResNode *pResNode, SArray *pRsp, char *tbuf, int32_t level) { +int32_t qExplainResNodeToRows(SExplainResNode *pResNode, SArray *pRsp, char *tbuf, int32_t level) { if (NULL == pResNode) { qError("explain res node is NULL"); QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR); @@ -146,14 +400,22 @@ int32_t qExplainResNodeToRows(SPhysiNodeExplainResNode *pResNode, SArray *pRsp, SNode* pNode = NULL; FOREACH(pNode, pResNode->pChildren) { - QRY_ERR_RET(qExplainResNodeToRows((SPhysiNodeExplainResNode *)pNode, pRsp, tbuf, level + 1)); + QRY_ERR_RET(qExplainResNodeToRows((SExplainResNode *)pNode, pRsp, tbuf, level + 1)); } return TSDB_CODE_SUCCESS; } +int32_t qExplainRowsToRsp(SArray *rows, SRetrieveTableRsp **pRsp) { + int32_t rspSize = sizeof(SRetrieveTableRsp) + ; + SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, rspSize); + if (NULL == rsp) { + qError("malloc SRetrieveTableRsp failed"); + QRY_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } +} -int32_t qMakeTaskExplainResRows(SPhysiNodeExplainResNode *pResNode, SRetrieveTableRsp **pRsp) { +int32_t qMakeTaskExplainResRows(SExplainResNode *pResNode, SRetrieveTableRsp **pRsp) { if (NULL == pResNode) { qError("explain res node is NULL"); QRY_RET(TSDB_CODE_QRY_APP_ERROR); @@ -174,22 +436,11 @@ int32_t qMakeTaskExplainResRows(SPhysiNodeExplainResNode *pResNode, SRetrieveTab QRY_ERR_JRET(qExplainResNodeToRows(pResNode, rows, tbuf, 0)); - int32_t rspSize = sizeof(SRetrieveTableRsp) + ; - SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, rspSize); - if (NULL == rsp) { - qError("malloc SRetrieveTableRsp failed"); - QRY_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - QRY_ERR_JRET(qExplainRowsToRsp(rows, rsp)); - - *pRsp = rsp; - rsp = NULL; + QRY_ERR_JRET(qExplainRowsToRsp(rows, pRsp)); _return: taosMemoryFree(tbuf); - taosMemoryFree(rsp); taosArrayDestroy(rows); QRY_RET(code); diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 5af13d97ca..fdb0b6498c 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -2126,19 +2126,24 @@ static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pD tsem_init(&pJob->rspSem, 0, 0); - pJob->refId = taosAddRef(schMgmt.jobRef, pJob); - if (pJob->refId < 0) { - SCH_JOB_ELOG("taosHashPut job failed, error:%s", tstrerror(terrno)); + int64_t refId = taosAddRef(schMgmt.jobRef, pJob); + if (refId < 0) { + SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno)); SCH_ERR_JRET(terrno); } + if (NULL == schAcquireJob(refId)) { + SCH_JOB_ELOG("schAcquireJob job failed, refId:%" PRIx64, refId); + SCH_RET(TSDB_CODE_SCH_STATUS_ERROR); + } + + pJob->refId = refId; + SCH_JOB_DLOG("job refId:%" PRIx64, pJob->refId); pJob->status = JOB_TASK_STATUS_NOT_START; SCH_ERR_JRET(schLaunchJob(pJob)); - schAcquireJob(pJob->refId); - *job = pJob->refId; if (syncSchedule) { -- GitLab