From 236b8adad3d7e460b1dca075b057fb082e2ce5eb Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 31 Mar 2022 16:00:47 +0800 Subject: [PATCH] feature/qnode --- include/common/ttime.h | 1 + include/libs/qcom/query.h | 7 +- include/os/osTime.h | 8 + source/common/src/ttime.c | 29 +- source/libs/planner/src/planLogicCreater.c | 2 +- source/libs/qcom/inc/queryInt.h | 52 ++-- source/libs/qcom/src/queryExplain.c | 292 ++++++++++++--------- source/libs/scheduler/inc/schedulerInt.h | 6 +- source/libs/scheduler/src/scheduler.c | 42 +-- 9 files changed, 264 insertions(+), 175 deletions(-) diff --git a/include/common/ttime.h b/include/common/ttime.h index 57af24e635..2209cc998f 100644 --- a/include/common/ttime.h +++ b/include/common/ttime.h @@ -62,6 +62,7 @@ int32_t taosParseTime(const char* timestr, int64_t* time, int32_t len, int32_t t void deltaToUtcInitOnce(); int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision); +int64_t convertTimeFromPrecisionToUnit(int64_t time, int32_t fromPrecision, char toUnit); void taosFormatUtcTime(char *buf, int32_t bufLen, int64_t time, int32_t precision); diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index dd2dccc02d..5908b0b875 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -174,9 +174,10 @@ bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_ int32_t queryCreateTableMetaFromMsg(STableMetaRsp* msg, bool isSuperTable, STableMeta** pMeta); char *jobTaskStatusStr(int32_t status); -int32_t qAppendTaskExplainResRows(void **pRowCtx, void *plan, void *pExecTree, int32_t level); -int32_t qGetExplainRspFromRowCtx(void *ctx, SRetrieveTableRsp **pRsp); -void qFreeExplainRowCtx(void *ctx); +int32_t qInitExplainCtx(void **pCtx, SHashObj *groupHash, bool verbose); +int32_t qAppendTaskExplainResRows(void *pCtx, int32_t groupId, int32_t level); +int32_t qGetExplainRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp); +void qFreeExplainCtx(void *ctx); SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* name); diff --git a/include/os/osTime.h b/include/os/osTime.h index 9e426455dc..766fec0fbd 100644 --- a/include/os/osTime.h +++ b/include/os/osTime.h @@ -46,6 +46,14 @@ extern "C" { #define MILLISECOND_PER_DAY (MILLISECOND_PER_HOUR * 24) #define MILLISECOND_PER_WEEK (MILLISECOND_PER_DAY * 7) +#define NANOSECOND_PER_USEC (1000L) +#define NANOSECOND_PER_MSEC (1000000L) +#define NANOSECOND_PER_SEC (1000000000L) +#define NANOSECOND_PER_MINUTE (NANOSECOND_PER_SEC * 60) +#define NANOSECOND_PER_HOUR (NANOSECOND_PER_MINUTE * 60) +#define NANOSECOND_PER_DAY (NANOSECOND_PER_HOUR * 24) +#define NANOSECOND_PER_WEEK (NANOSECOND_PER_DAY * 7) + int32_t taosGetTimeOfDay(struct timeval *tv); //@return timestamp in second diff --git a/source/common/src/ttime.c b/source/common/src/ttime.c index 510a2809e7..23b19b55e7 100644 --- a/source/common/src/ttime.c +++ b/source/common/src/ttime.c @@ -370,6 +370,33 @@ int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrec return (int64_t)((double)time * factors[fromPrecision][toPrecision]); } +int64_t convertTimeFromPrecisionToUnit(int64_t time, int32_t fromPrecision, char toUnit) { + assert(fromPrecision == TSDB_TIME_PRECISION_MILLI || fromPrecision == TSDB_TIME_PRECISION_MICRO || + fromPrecision == TSDB_TIME_PRECISION_NANO); + static double factors[3] = {1000000., 1000., 1.}; + switch (toUnit) { + case 's': + return time * factors[fromPrecision] / NANOSECOND_PER_SEC; + case 'm': + return time * factors[fromPrecision] / NANOSECOND_PER_MINUTE; + case 'h': + return time * factors[fromPrecision] / NANOSECOND_PER_HOUR; + case 'd': + return time * factors[fromPrecision] / NANOSECOND_PER_DAY; + case 'w': + return time * factors[fromPrecision] / NANOSECOND_PER_WEEK; + case 'a': + return time * factors[fromPrecision] / NANOSECOND_PER_MSEC; + case 'u': + return time * factors[fromPrecision] / NANOSECOND_PER_USEC; + case 'b': + return time * factors[fromPrecision]; + default: { + return -1; + } + } +} + static int32_t getDuration(int64_t val, char unit, int64_t* result, int32_t timePrecision) { switch (unit) { case 's': @@ -688,4 +715,4 @@ void taosFormatUtcTime(char* buf, int32_t bufLen, int64_t t, int32_t precision) length += (int32_t)strftime(ts + length, 40 - length, "%z", ptm); tstrncpy(buf, ts, bufLen); -} \ No newline at end of file +} diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 7f6d8826e6..f57f476d51 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -288,7 +288,7 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect // set the output if (TSDB_CODE_SUCCESS == code) { pJoin->node.pTargets = nodesCloneList(pLeft->pTargets); - if (NULL == pJoin->pOnConditions) { + if (NULL == pJoin->node.pTargets) { code = TSDB_CODE_OUT_OF_MEMORY; } if (TSDB_CODE_SUCCESS == code) { diff --git a/source/libs/qcom/inc/queryInt.h b/source/libs/qcom/inc/queryInt.h index 510ba92a86..06d5b07dc7 100644 --- a/source/libs/qcom/inc/queryInt.h +++ b/source/libs/qcom/inc/queryInt.h @@ -23,25 +23,34 @@ extern "C" { #include "plannodes.h" //newline area -#define EXPLAIN_TAG_SCAN_FORMAT "Tag scan on %s columns=%d" -#define EXPLAIN_TBL_SCAN_FORMAT "Table scan on %s columns=%d" -#define EXPLAIN_SYSTBL_SCAN_FORMAT "System table scan on %s columns=%d" +#define EXPLAIN_TAG_SCAN_FORMAT "Tag Scan on %s columns=%d" +#define EXPLAIN_TBL_SCAN_FORMAT "Table Scan on %s columns=%d" +#define EXPLAIN_SYSTBL_SCAN_FORMAT "System Table Scan on %s columns=%d" #define EXPLAIN_PROJECTION_FORMAT "Projection columns=%d width=%d" #define EXPLAIN_JOIN_FORMAT "%s between %d tables width=%d" -#define EXPLAIN_AGG_FORMAT "Aggragate functions=%d groups=%d width=%d" -#define EXPLAIN_EXCHANGE_FORMAT "Exchange %d:1 width=%d" -#define EXPLAIN_SORT_FORMAT "Sort on %d columns width=%d" -#define EXPLAIN_INTERVAL_FORMAT "Interval on column %s functions=%d interval=%" PRId64 "%c offset=%" PRId64 "%c sliding=%" PRId64 "%c width=%d" +#define EXPLAIN_AGG_FORMAT "Aggragate functions=%d" +#define EXPLAIN_EXCHANGE_FORMAT "Data Exchange %d:1 width=%d" +#define EXPLAIN_SORT_FORMAT "Sort on %d Column(s) width=%d" +#define EXPLAIN_INTERVAL_FORMAT "Interval on Column %s functions=%d interval=%" PRId64 "%c offset=%" PRId64 "%c sliding=%" PRId64 "%c width=%d" #define EXPLAIN_SESSION_FORMAT "Session gap=%" PRId64 " functions=%d width=%d" #define EXPLAIN_ORDER_FORMAT "Order: %s" #define EXPLAIN_FILTER_FORMAT "Filter: " #define EXPLAIN_FILL_FORMAT "Fill: %s" #define EXPLAIN_ON_CONDITIONS_FORMAT "Join Cond: " -#define EXPLAIN_TIMERANGE_FORMAT "Time range: [%" PRId64 ", %" PRId64 "]" +#define EXPLAIN_TIMERANGE_FORMAT "Time Range: [%" PRId64 ", %" PRId64 "]" //append area -#define EXPLAIN_LOOPS_FORMAT "loops=%d" -#define EXPLAIN_REVERSE_FORMAT "reverse=%d" +#define EXPLAIN_GROUPS_FORMAT " groups=%d" +#define EXPLAIN_WIDTH_FORMAT " width=%d" +#define EXPLAIN_LOOPS_FORMAT " loops=%d" +#define EXPLAIN_REVERSE_FORMAT " reverse=%d" + +//TODO MOVE TO LIB +typedef struct SExplainGroup { + int32_t nodeNum; + SSubplan *plan; + void *execInfo; //TODO +} SExplainGroup; typedef struct SExplainResNode { @@ -56,22 +65,27 @@ typedef struct SQueryExplainRowInfo { char *buf; } SQueryExplainRowInfo; -typedef struct SExplainRowCtx { - int32_t totalSize; - SArray *rows; -} SExplainRowCtx; +typedef struct SExplainCtx { + int32_t totalSize; + bool verbose; + char *tbuf; + SArray *rows; + SHashObj *groupHash; +} SExplainCtx; #define EXPLAIN_ORDER_STRING(_order) ((TSDB_ORDER_ASC == _order) ? "Ascending" : "Descending") #define EXPLAIN_JOIN_STRING(_type) ((JOIN_TYPE_INNER == _type) ? "Inner join" : "Join") -#define EXPLAIN_ROW_NEW(level, ...) \ - do { \ - tlen = snprintf(tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, "%*s", (level) * 2, ""); \ - tlen += snprintf(tbuf + VARSTR_HEADER_SIZE + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, __VA_ARGS__); \ +#define INVERAL_TIME_FROM_PRECISION_TO_UNIT(_t, _u, _p) (((_u) == 'n' || (_u) == 'y') ? (_t) : (convertTimeFromPrecisionToUnit(_t, _p, _u))) + +#define EXPLAIN_ROW_NEW(level, ...) \ + do { \ + tlen = snprintf(tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, "%*s%s", (level) * 2, "", (isVerboseLine ? "" : "-> ")); \ + tlen += snprintf(tbuf + VARSTR_HEADER_SIZE + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, __VA_ARGS__); \ } while (0) #define EXPLAIN_ROW_APPEND(...) tlen += snprintf(tbuf + VARSTR_HEADER_SIZE + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, __VA_ARGS__) -#define EXPLAIN_ROW_END() do { varDataSetLen(tbuf, tlen); tlen += VARSTR_HEADER_SIZE; } while (0) +#define EXPLAIN_ROW_END() do { varDataSetLen(tbuf, tlen); tlen += VARSTR_HEADER_SIZE; isVerboseLine = true; } while (0) #define QRY_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) #define QRY_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) diff --git a/source/libs/qcom/src/queryExplain.c b/source/libs/qcom/src/queryExplain.c index b2b061cee0..af092e842e 100644 --- a/source/libs/qcom/src/queryExplain.c +++ b/source/libs/qcom/src/queryExplain.c @@ -36,22 +36,62 @@ void qFreeExplainResTree(SExplainResNode *res) { taosMemoryFreeClear(res); } -void qFreeExplainRowCtx(void *ctx) { +void qFreeExplainCtx(void *ctx) { if (NULL == ctx) { return; } - SExplainRowCtx *pCtx = (SExplainRowCtx *)ctx; + SExplainCtx *pCtx = (SExplainCtx *)ctx; int32_t rowSize = taosArrayGetSize(pCtx->rows); for (int32_t i = 0; i < rowSize; ++i) { SQueryExplainRowInfo *row = taosArrayGet(pCtx->rows, i); taosMemoryFreeClear(row->buf); } + taosHashCleanup(pCtx->groupHash); taosArrayDestroy(pCtx->rows); taosMemoryFree(pCtx); } +int32_t qInitExplainCtx(void **pCtx, SHashObj *groupHash, bool verbose) { + int32_t code = 0; + SExplainCtx *ctx = taosMemoryCalloc(1, sizeof(SExplainCtx)); + if (NULL == ctx) { + qError("calloc SExplainCtx failed"); + QRY_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + SArray *rows = taosArrayInit(10, sizeof(SQueryExplainRowInfo)); + if (NULL == rows) { + qError("taosArrayInit SQueryExplainRowInfo failed"); + QRY_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + char *tbuf = taosMemoryMalloc(TSDB_EXPLAIN_RESULT_ROW_SIZE); + if (NULL == tbuf) { + qError("malloc size %d failed", TSDB_EXPLAIN_RESULT_ROW_SIZE); + QRY_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + ctx->verbose = verbose; + ctx->tbuf = tbuf; + ctx->rows = rows; + ctx->groupHash = groupHash; + + *pCtx = ctx; + + return TSDB_CODE_SUCCESS; + +_return: + + taosArrayDestroy(rows); + taosHashCleanup(groupHash); + taosMemoryFree(ctx); + + QRY_RET(code); +} + + char *qFillModeString(EFillMode mode) { switch (mode) { case FILL_MODE_NONE: @@ -187,11 +227,6 @@ _return: QRY_RET(code); } -int32_t qGenerateExplainResNodeTree(struct SSubplan *plan, void *pExecTree, SExplainResNode **pRes) { - void *pExecInfo = NULL; // TODO - QRY_RET(qGenerateExplainResNode(plan->pNode, pExecInfo, pRes)); -} - int32_t qExplainBufAppendExecInfo(void *pExecInfo, char *tbuf, int32_t *len) { int32_t tlen = *len; @@ -202,7 +237,7 @@ int32_t qExplainBufAppendExecInfo(void *pExecInfo, char *tbuf, int32_t *len) { return TSDB_CODE_SUCCESS; } -int32_t qExplainResAppendRow(SExplainRowCtx *ctx, char *tbuf, int32_t len, int32_t level) { +int32_t qExplainResAppendRow(SExplainCtx *ctx, char *tbuf, int32_t len, int32_t level) { SQueryExplainRowInfo row = {0}; row.buf = taosMemoryMalloc(len); if (NULL == row.buf) { @@ -225,8 +260,11 @@ int32_t qExplainResAppendRow(SExplainRowCtx *ctx, char *tbuf, int32_t len, int32 } -int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainRowCtx *ctx, char *tbuf, int32_t level) { +int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, int32_t level) { int32_t tlen = 0; + bool isVerboseLine = false; + char *tbuf = ctx->tbuf; + bool verbose = ctx->verbose; SPhysiNode* pNode = pResNode->pNode; if (NULL == pNode) { qError("pyhsical node in explain res node is NULL"); @@ -246,10 +284,12 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainRowCtx *ctx } EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); - - EXPLAIN_ROW_NEW(level + 1, EXPLAIN_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pTagScanNode->order)); - EXPLAIN_ROW_END(); - QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + + if (verbose) { + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pTagScanNode->order)); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + } break; } case QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN: @@ -265,20 +305,22 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainRowCtx *ctx } EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); - - EXPLAIN_ROW_NEW(level + 1, EXPLAIN_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pTblScanNode->scan.order)); - EXPLAIN_ROW_END(); - QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); - - EXPLAIN_ROW_NEW(level + 1, EXPLAIN_TIMERANGE_FORMAT, pTblScanNode->scanRange.skey, pTblScanNode->scanRange.ekey); - EXPLAIN_ROW_END(); - QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); - if (pTblScanNode->scan.node.pConditions) { - EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); - QRY_ERR_RET(nodesNodeToSQL(pTblScanNode->scan.node.pConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); + if (verbose) { + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pTblScanNode->scan.order)); EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_TIMERANGE_FORMAT, pTblScanNode->scanRange.skey, pTblScanNode->scanRange.ekey); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + + if (pTblScanNode->scan.node.pConditions) { + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); + QRY_ERR_RET(nodesNodeToSQL(pTblScanNode->scan.node.pConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + } } break; } @@ -294,10 +336,12 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainRowCtx *ctx } EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); - - EXPLAIN_ROW_NEW(level + 1, EXPLAIN_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pSTblScanNode->scan.order)); - EXPLAIN_ROW_END(); - QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + + if (verbose) { + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pSTblScanNode->scan.order)); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + } break; } case QUERY_NODE_PHYSICAL_PLAN_PROJECT:{ @@ -309,11 +353,13 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainRowCtx *ctx EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); - if (pPrjNode->node.pConditions) { - EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); - QRY_ERR_RET(nodesNodeToSQL(pPrjNode->node.pConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); - EXPLAIN_ROW_END(); - QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + if (verbose) { + if (pPrjNode->node.pConditions) { + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); + QRY_ERR_RET(nodesNodeToSQL(pPrjNode->node.pConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + } } break; } @@ -326,51 +372,70 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainRowCtx *ctx EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); - if (pJoinNode->node.pConditions) { - EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); - QRY_ERR_RET(nodesNodeToSQL(pJoinNode->node.pConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); + if (verbose) { + if (pJoinNode->node.pConditions) { + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); + QRY_ERR_RET(nodesNodeToSQL(pJoinNode->node.pConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + } + + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_ON_CONDITIONS_FORMAT); + QRY_ERR_RET(nodesNodeToSQL(pJoinNode->pOnConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); } - - EXPLAIN_ROW_NEW(level + 1, EXPLAIN_ON_CONDITIONS_FORMAT); - QRY_ERR_RET(nodesNodeToSQL(pJoinNode->pOnConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); - EXPLAIN_ROW_END(); - QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); break; } case QUERY_NODE_PHYSICAL_PLAN_AGG:{ SAggPhysiNode *pAggNode = (SAggPhysiNode *)pNode; - EXPLAIN_ROW_NEW(level, EXPLAIN_AGG_FORMAT, pAggNode->pAggFuncs->length, pAggNode->pGroupKeys->length, pAggNode->node.pOutputDataBlockDesc->outputRowSize); + EXPLAIN_ROW_NEW(level, EXPLAIN_AGG_FORMAT, pAggNode->pAggFuncs->length); + if (pAggNode->pGroupKeys) { + EXPLAIN_ROW_APPEND(EXPLAIN_GROUPS_FORMAT, pAggNode->pGroupKeys->length); + } + EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pAggNode->node.pOutputDataBlockDesc->outputRowSize); + if (pResNode->pExecInfo) { QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); } EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); - if (pAggNode->node.pConditions) { - EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); - QRY_ERR_RET(nodesNodeToSQL(pAggNode->node.pConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); - EXPLAIN_ROW_END(); - QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + if (verbose) { + if (pAggNode->node.pConditions) { + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); + QRY_ERR_RET(nodesNodeToSQL(pAggNode->node.pConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + } } break; } case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:{ SExchangePhysiNode *pExchNode = (SExchangePhysiNode *)pNode; - EXPLAIN_ROW_NEW(level, EXPLAIN_EXCHANGE_FORMAT, pExchNode->pSrcEndPoints->length, pExchNode->node.pOutputDataBlockDesc->outputRowSize); + SExplainGroup *group = taosHashGet(ctx->groupHash, &pExchNode->srcGroupId, sizeof(pExchNode->srcGroupId)); + if (NULL == group) { + qError("exchange src group %d not in groupHash", pExchNode->srcGroupId); + QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + } + + EXPLAIN_ROW_NEW(level, EXPLAIN_EXCHANGE_FORMAT, group->nodeNum, pExchNode->node.pOutputDataBlockDesc->outputRowSize); if (pResNode->pExecInfo) { QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); } EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); - if (pExchNode->node.pConditions) { - EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); - QRY_ERR_RET(nodesNodeToSQL(pExchNode->node.pConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); - EXPLAIN_ROW_END(); - QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + if (verbose) { + if (pExchNode->node.pConditions) { + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); + QRY_ERR_RET(nodesNodeToSQL(pExchNode->node.pConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + } } + + QRY_ERR_RET(qAppendTaskExplainResRows(ctx, pExchNode->srcGroupId, level + 1)); break; } case QUERY_NODE_PHYSICAL_PLAN_SORT:{ @@ -382,35 +447,42 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainRowCtx *ctx EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); - if (pSortNode->node.pConditions) { - EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); - QRY_ERR_RET(nodesNodeToSQL(pSortNode->node.pConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); - EXPLAIN_ROW_END(); - QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + if (verbose) { + if (pSortNode->node.pConditions) { + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); + QRY_ERR_RET(nodesNodeToSQL(pSortNode->node.pConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + } } break; } case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:{ SIntervalPhysiNode *pIntNode = (SIntervalPhysiNode *)pNode; EXPLAIN_ROW_NEW(level, EXPLAIN_INTERVAL_FORMAT, qGetNameFromColumnNode(pIntNode->pTspk), pIntNode->window.pFuncs->length, - pIntNode->interval, pIntNode->intervalUnit, pIntNode->offset, pIntNode->intervalUnit, pIntNode->sliding, pIntNode->slidingUnit, pIntNode->window.node.pOutputDataBlockDesc->outputRowSize); + INVERAL_TIME_FROM_PRECISION_TO_UNIT(pIntNode->interval, pIntNode->intervalUnit, pIntNode->precision), pIntNode->intervalUnit, + INVERAL_TIME_FROM_PRECISION_TO_UNIT(pIntNode->offset, pIntNode->intervalUnit, pIntNode->precision), pIntNode->intervalUnit, + INVERAL_TIME_FROM_PRECISION_TO_UNIT(pIntNode->sliding, pIntNode->slidingUnit, pIntNode->precision), pIntNode->slidingUnit, + pIntNode->window.node.pOutputDataBlockDesc->outputRowSize); if (pResNode->pExecInfo) { QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); } EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); - if (pIntNode->pFill) { - EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILL_FORMAT, qFillModeString(pIntNode->pFill->mode)); - EXPLAIN_ROW_END(); - QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); - } - - if (pIntNode->window.node.pConditions) { - EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); - QRY_ERR_RET(nodesNodeToSQL(pIntNode->window.node.pConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); - EXPLAIN_ROW_END(); - QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + if (verbose) { + if (pIntNode->pFill) { + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILL_FORMAT, qFillModeString(pIntNode->pFill->mode)); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + } + + if (pIntNode->window.node.pConditions) { + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); + QRY_ERR_RET(nodesNodeToSQL(pIntNode->window.node.pConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + } } break; } @@ -423,11 +495,13 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainRowCtx *ctx EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); - if (pIntNode->window.node.pConditions) { - EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); - QRY_ERR_RET(nodesNodeToSQL(pIntNode->window.node.pConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); - EXPLAIN_ROW_END(); - QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + if (verbose) { + if (pIntNode->window.node.pConditions) { + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); + QRY_ERR_RET(nodesNodeToSQL(pIntNode->window.node.pConditions, tbuf, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + } } break; } @@ -440,88 +514,48 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainRowCtx *ctx } -int32_t qExplainResNodeToRows(SExplainResNode *pResNode, SExplainRowCtx *ctx, char *tbuf, int32_t level) { +int32_t qExplainResNodeToRows(SExplainResNode *pResNode, SExplainCtx *ctx, int32_t level) { if (NULL == pResNode) { qError("explain res node is NULL"); QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } int32_t code = 0; - QRY_ERR_RET(qExplainResNodeToRowsImpl(pResNode, ctx, tbuf, level)); + QRY_ERR_RET(qExplainResNodeToRowsImpl(pResNode, ctx, level)); SNode* pNode = NULL; FOREACH(pNode, pResNode->pChildren) { - QRY_ERR_RET(qExplainResNodeToRows((SExplainResNode *)pNode, ctx, tbuf, level + 1)); + QRY_ERR_RET(qExplainResNodeToRows((SExplainResNode *)pNode, ctx, level + 1)); } return TSDB_CODE_SUCCESS; } -int32_t qGenerateExplainResRowsFromNode(SExplainResNode *pResNode, SExplainRowCtx *ctx, int32_t level) { - if (NULL == pResNode) { - qError("explain res node is NULL"); - QRY_RET(TSDB_CODE_QRY_APP_ERROR); - } - - int32_t code = 0; - char *tbuf = taosMemoryMalloc(TSDB_EXPLAIN_RESULT_ROW_SIZE); - if (NULL == tbuf) { - qError("malloc size %d failed", TSDB_EXPLAIN_RESULT_ROW_SIZE); - QRY_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - QRY_ERR_JRET(qExplainResNodeToRows(pResNode, ctx, tbuf, level)); - -_return: - - taosMemoryFree(tbuf); - - QRY_RET(code); -} - -int32_t qAppendTaskExplainResRows(void **pRowCtx, void *plan, void *pExecTree, int32_t level) { +int32_t qAppendTaskExplainResRows(void *pCtx, int32_t groupId, int32_t level) { SExplainResNode *node = NULL; int32_t code = 0; - struct SSubplan *pPlan = (struct SSubplan *)plan; - SExplainRowCtx *ctx = (SExplainRowCtx *)*pRowCtx; - - QRY_ERR_RET(qGenerateExplainResNodeTree(pPlan, pExecTree, &node)); - - if (NULL == *pRowCtx) { - *pRowCtx = taosMemoryCalloc(1, sizeof(SExplainRowCtx)); - if (NULL == *pRowCtx) { - qError("calloc SExplainRowCtx failed"); - QRY_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - ctx = (SExplainRowCtx *)*pRowCtx; + SExplainCtx *ctx = (SExplainCtx *)pCtx; - SArray *rows = taosArrayInit(10, sizeof(SQueryExplainRowInfo)); - if (NULL == rows) { - qError("taosArrayInit SQueryExplainRowInfo failed"); - QRY_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } - - ctx->rows = rows; + SExplainGroup *group = taosHashGet(ctx->groupHash, &groupId, sizeof(groupId)); + if (NULL == group) { + qError("group %d not in groupHash", groupId); + QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } + + QRY_ERR_RET(qGenerateExplainResNode(group->plan->pNode, group->execInfo, &node)); - QRY_ERR_JRET(qGenerateExplainResRowsFromNode(node, ctx, level)); + QRY_ERR_JRET(qExplainResNodeToRows(node, ctx, level)); _return: qFreeExplainResTree(node); - - if (code) { - qFreeExplainRowCtx(*pRowCtx); - *pRowCtx = NULL; - } QRY_RET(code); } -int32_t qGetExplainRspFromRowCtx(void *ctx, SRetrieveTableRsp **pRsp) { - SExplainRowCtx *pCtx = (SExplainRowCtx *)ctx; +int32_t qGetExplainRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) { + SExplainCtx *pCtx = (SExplainCtx *)ctx; int32_t rowNum = taosArrayGetSize(pCtx->rows); if (rowNum <= 0) { qError("empty explain res rows"); diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 1b2c7d54e2..f19e822974 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -38,10 +38,12 @@ enum { SCH_WRITE, }; -typedef struct SSchExplainGroup { +//TODO MOVE TO LIB +typedef struct SExplainGroup { int32_t nodeNum; SSubplan *plan; -} SSchExplainGroup; + void *execInfo; //TODO +} SExplainGroup; typedef struct SSchTrans { void *transInst; diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 4a84536d00..940aaed7c1 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -478,7 +478,14 @@ _return: int32_t schValidateAndBuildJobExplain(SQueryPlan *pDag, SSchJob *pJob) { int32_t code = 0; + SNodeListNode *plans = NULL; + int32_t taskNum = 0; + SExplainGroup *pGroup = NULL; + void *pCtx = NULL; + int32_t rootGroupId = 0; + pJob->queryId = pDag->queryId; + pJob->subPlans = pDag->pSubplans; if (pDag->numOfSubplans <= 0) { SCH_JOB_ELOG("invalid subplan num:%d", pDag->numOfSubplans); @@ -497,12 +504,7 @@ int32_t schValidateAndBuildJobExplain(SQueryPlan *pDag, SSchJob *pJob) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - pJob->subPlans = pDag->pSubplans; - - SNodeListNode *plans = NULL; - int32_t taskNum = 0; - SSchExplainGroup *pGroup = NULL; - void *rowCtx = NULL; + SCH_ERR_JRET(qInitExplainCtx(&pCtx, groupHash, pDag->explainInfo.verbose)); for (int32_t i = 0; i < levelNum; ++i) { plans = (SNodeListNode *)nodesListGetNode(pDag->pSubplans, i); @@ -517,44 +519,44 @@ int32_t schValidateAndBuildJobExplain(SQueryPlan *pDag, SSchJob *pJob) { SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } + SSubplan *plan = NULL; for (int32_t n = 0; n < taskNum; ++n) { - SSubplan *plan = (SSubplan *)nodesListGetNode(plans->pNodeList, n); + plan = (SSubplan *)nodesListGetNode(plans->pNodeList, n); pGroup = taosHashGet(groupHash, &plan->id.groupId, sizeof(plan->id.groupId)); if (pGroup) { ++pGroup->nodeNum; continue; } - SSchExplainGroup group = {.nodeNum = 1, .plan = plan}; + SExplainGroup group = {.nodeNum = 1, .plan = plan, .execInfo = NULL}; if (0 != taosHashPut(groupHash, &plan->id.groupId, sizeof(plan->id.groupId), &group, sizeof(group))) { SCH_JOB_ELOG("taosHashPut to explainGroupHash failed, taskIdx:%d", n); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } } - void *pIter = taosHashIterate(groupHash, NULL); - while (pIter) { - pGroup = (SSchExplainGroup *)pIter; + if (0 == i) { + if (taskNum > 1) { + SCH_JOB_ELOG("invalid taskNum %d for level 0", taskNum); + SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); + } - SCH_ERR_JRET(qAppendTaskExplainResRows(&rowCtx, pGroup->plan, NULL, i)); - - pIter = taosHashIterate(groupHash, pIter); + rootGroupId = plan->id.groupId; } - taosHashClear(groupHash); - - SCH_JOB_DLOG("level initialized, taskNum:%d", taskNum); + SCH_JOB_DLOG("level %d group handled, taskNum:%d", i, taskNum); } + SCH_ERR_JRET(qAppendTaskExplainResRows(pCtx, rootGroupId, 0)); + SRetrieveTableRsp *pRsp = NULL; - SCH_ERR_JRET(qGetExplainRspFromRowCtx(rowCtx, &pRsp)); + SCH_ERR_JRET(qGetExplainRspFromCtx(pCtx, &pRsp)); pJob->resData = pRsp; _return: - taosHashCleanup(groupHash); - qFreeExplainRowCtx(rowCtx); + qFreeExplainCtx(pCtx); SCH_RET(code); } -- GitLab