提交 2c6dec89 编写于 作者: D dapan1121

feature/qnode

上级 27a3a1d7
...@@ -919,6 +919,7 @@ typedef struct SExplainExecInfo { ...@@ -919,6 +919,7 @@ typedef struct SExplainExecInfo {
uint64_t startupCost; uint64_t startupCost;
uint64_t totalCost; uint64_t totalCost;
uint64_t numOfRows; uint64_t numOfRows;
void *verboseInfo;
} SExplainExecInfo; } SExplainExecInfo;
typedef struct { typedef struct {
......
...@@ -40,7 +40,7 @@ extern int32_t filterInitFromNode(SNode *pNode, SFilterInfo **pinfo, uint32_t op ...@@ -40,7 +40,7 @@ extern int32_t filterInitFromNode(SNode *pNode, SFilterInfo **pinfo, uint32_t op
extern bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols); extern bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols);
extern int32_t filterSetDataFromSlotId(SFilterInfo *info, void *param); extern int32_t filterSetDataFromSlotId(SFilterInfo *info, void *param);
extern int32_t filterSetDataFromColId(SFilterInfo *info, void *param); extern int32_t filterSetDataFromColId(SFilterInfo *info, void *param);
extern int32_t filterGetTimeRange(SFilterInfo *info, STimeWindow *win); extern int32_t filterGetTimeRange(SNode *pNode, STimeWindow *win, bool *isStrict);
extern int32_t filterConverNcharColumns(SFilterInfo* pFilterInfo, int32_t rows, bool *gotNchar); extern int32_t filterConverNcharColumns(SFilterInfo* pFilterInfo, int32_t rows, bool *gotNchar);
extern int32_t filterFreeNcharColumns(SFilterInfo* pFilterInfo); extern int32_t filterFreeNcharColumns(SFilterInfo* pFilterInfo);
extern void filterFreeInfo(SFilterInfo *info); extern void filterFreeInfo(SFilterInfo *info);
......
...@@ -58,6 +58,7 @@ extern "C" { ...@@ -58,6 +58,7 @@ extern "C" {
#define EXPLAIN_LOOPS_FORMAT "loops=%d" #define EXPLAIN_LOOPS_FORMAT "loops=%d"
#define EXPLAIN_REVERSE_FORMAT "reverse=%d" #define EXPLAIN_REVERSE_FORMAT "reverse=%d"
#define EXPLAIN_FUNCTIONS_FORMAT "functions=%d" #define EXPLAIN_FUNCTIONS_FORMAT "functions=%d"
#define EXPLAIN_EXECINFO_FORMAT "cost=%" PRIu64 "..%" PRIu64 " rows=%" PRIu64
typedef struct SExplainGroup { typedef struct SExplainGroup {
int32_t nodeNum; int32_t nodeNum;
......
...@@ -228,7 +228,10 @@ int32_t qExplainGenerateResNode(SPhysiNode *pNode, SExplainGroup *group, SExplai ...@@ -228,7 +228,10 @@ int32_t qExplainGenerateResNode(SPhysiNode *pNode, SExplainGroup *group, SExplai
int32_t code = 0; int32_t code = 0;
resNode->pNode = pNode; resNode->pNode = pNode;
QRY_ERR_JRET(qExplainGenerateResNodeExecInfo(&resNode->pExecInfo, group));
if (group->nodeExecInfo) {
QRY_ERR_JRET(qExplainGenerateResNodeExecInfo(&resNode->pExecInfo, group));
}
QRY_ERR_JRET(qExplainGenerateResChildren(pNode, group, &resNode->pChildren)); QRY_ERR_JRET(qExplainGenerateResChildren(pNode, group, &resNode->pChildren));
...@@ -247,14 +250,52 @@ _return: ...@@ -247,14 +250,52 @@ _return:
int32_t qExplainBufAppendExecInfo(SArray *pExecInfo, char *tbuf, int32_t *len) { int32_t qExplainBufAppendExecInfo(SArray *pExecInfo, char *tbuf, int32_t *len) {
int32_t tlen = *len; int32_t tlen = *len;
int32_t nodeNum = taosArrayGetSize(pExecInfo);
SExplainExecInfo maxExecInfo = {0};
EXPLAIN_ROW_APPEND("(exec info here)"); for (int32_t i = 0; i < nodeNum; ++i) {
SExplainExecInfo *execInfo = taosArrayGet(pExecInfo, i);
if (execInfo->startupCost > maxExecInfo.startupCost) {
maxExecInfo.startupCost = execInfo->startupCost;
}
if (execInfo->totalCost > maxExecInfo.totalCost) {
maxExecInfo.totalCost = execInfo->totalCost;
}
if (execInfo->numOfRows > maxExecInfo.numOfRows) {
maxExecInfo.numOfRows = execInfo->numOfRows;
}
}
EXPLAIN_ROW_APPEND(EXPLAIN_EXECINFO_FORMAT, maxExecInfo.startupCost, maxExecInfo.totalCost, maxExecInfo.numOfRows);
*len = tlen; *len = tlen;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qExplainBufAppendVerboseExecInfo(SArray *pExecInfo, char *tbuf, int32_t *len) {
int32_t tlen = 0;
bool gotVerbose = false;
int32_t nodeNum = taosArrayGetSize(pExecInfo);
SExplainExecInfo maxExecInfo = {0};
for (int32_t i = 0; i < nodeNum; ++i) {
SExplainExecInfo *execInfo = taosArrayGet(pExecInfo, i);
if (execInfo->verboseInfo) {
gotVerbose = true;
}
}
if (gotVerbose) {
EXPLAIN_ROW_APPEND("exec verbose info");
}
*len = tlen;
return TSDB_CODE_SUCCESS;
}
int32_t qExplainResAppendRow(SExplainCtx *ctx, char *tbuf, int32_t len, int32_t level) { int32_t qExplainResAppendRow(SExplainCtx *ctx, char *tbuf, int32_t len, int32_t level) {
SQueryExplainRowInfo row = {0}; SQueryExplainRowInfo row = {0};
row.buf = taosMemoryMalloc(len); row.buf = taosMemoryMalloc(len);
...@@ -322,6 +363,14 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i ...@@ -322,6 +363,14 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pTagScanNode->order)); EXPLAIN_ROW_NEW(level + 1, EXPLAIN_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pTagScanNode->order));
EXPLAIN_ROW_END(); EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendVerboseExecInfo(pResNode->pExecInfo, tbuf, &tlen));
if (tlen) {
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
}
}
} }
break; break;
} }
...@@ -532,8 +581,8 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i ...@@ -532,8 +581,8 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
if (pResNode->pExecInfo) { if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
} }
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pExchNode->node.pOutputDataBlockDesc->totalRowSize); EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pExchNode->node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END(); EXPLAIN_ROW_END();
...@@ -710,7 +759,7 @@ int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level) { ...@@ -710,7 +759,7 @@ int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level) {
QRY_ERR_RET(qExplainGenerateResNode(group->plan->pNode, group, &node)); QRY_ERR_RET(qExplainGenerateResNode(group->plan->pNode, group, &node));
if (group->physiPlanNum != group->physiPlanExecNum) { if ((EXPLAIN_MODE_ANALYZE == ctx->mode) && (group->physiPlanNum != group->physiPlanExecNum)) {
qError("physiPlanNum %d mismatch with physiExecNum %d in group %d", group->physiPlanNum, group->physiPlanExecNum, groupId); qError("physiPlanNum %d mismatch with physiExecNum %d in group %d", group->physiPlanNum, group->physiPlanExecNum, groupId);
QRY_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); QRY_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
} }
......
...@@ -237,6 +237,7 @@ typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char *result ...@@ -237,6 +237,7 @@ typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char *result
typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr); typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr);
typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr, bool* newgroup); typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr, bool* newgroup);
typedef void (*__optr_close_fn_t)(void* param, int32_t num); typedef void (*__optr_close_fn_t)(void* param, int32_t num);
typedef int32_t (*__optr_get_explain_fn_t)(struct SOperatorInfo* pOptr, void **pOptrExplain);
typedef struct STaskIdInfo { typedef struct STaskIdInfo {
uint64_t queryId; // this is also a request id uint64_t queryId; // this is also a request id
...@@ -305,26 +306,27 @@ enum { ...@@ -305,26 +306,27 @@ enum {
}; };
typedef struct SOperatorInfo { typedef struct SOperatorInfo {
uint8_t operatorType; uint8_t operatorType;
bool blockingOptr; // block operator or not bool blockingOptr; // block operator or not
uint8_t status; // denote if current operator is completed uint8_t status; // denote if current operator is completed
int32_t numOfOutput; // number of columns of the current operator results int32_t numOfOutput; // number of columns of the current operator results
char* name; // name, used to show the query execution plan char* name; // name, used to show the query execution plan
void* info; // extension attribution void* info; // extension attribution
SExprInfo* pExpr; SExprInfo* pExpr;
STaskRuntimeEnv* pRuntimeEnv; // todo remove it STaskRuntimeEnv* pRuntimeEnv; // todo remove it
SExecTaskInfo* pTaskInfo; SExecTaskInfo* pTaskInfo;
SOperatorCostInfo cost; SOperatorCostInfo cost;
SResultInfo resultInfo; SResultInfo resultInfo;
struct SOperatorInfo** pDownstream; // downstram pointer list struct SOperatorInfo** pDownstream; // downstram pointer list
int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator
__optr_open_fn_t _openFn; // DO NOT invoke this function directly __optr_open_fn_t _openFn; // DO NOT invoke this function directly
__optr_fn_t getNextFn; __optr_fn_t getNextFn;
__optr_fn_t getStreamResFn; // execute the aggregate in the stream model. __optr_fn_t getStreamResFn; // execute the aggregate in the stream model.
__optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP __optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP
__optr_close_fn_t closeFn; __optr_close_fn_t closeFn;
__optr_encode_fn_t encodeResultRow; __optr_encode_fn_t encodeResultRow;
__optr_decode_fn_t decodeResultRow; __optr_decode_fn_t decodeResultRow;
__optr_get_explain_fn_t getExplainFn;
} SOperatorInfo; } SOperatorInfo;
typedef struct { typedef struct {
...@@ -715,7 +717,7 @@ int32_t getMaximumIdleDurationSec(); ...@@ -715,7 +717,7 @@ int32_t getMaximumIdleDurationSec();
void doInvokeUdf(struct SUdfInfo* pUdfInfo, SqlFunctionCtx* pCtx, int32_t idx, int32_t type); void doInvokeUdf(struct SUdfInfo* pUdfInfo, SqlFunctionCtx* pCtx, int32_t idx, int32_t type);
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, EOPTR_EXEC_MODEL model); int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, EOPTR_EXEC_MODEL model);
int32_t getOperatorExplainExecInfo(SOperatorInfo *operator, SExplainExecInfo **pRes, int32_t *capacity, int32_t *resNum); int32_t getOperatorExplainExecInfo(SOperatorInfo *operatorInfo, SExplainExecInfo **pRes, int32_t *capacity, int32_t *resNum);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -5585,6 +5585,10 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, ...@@ -5585,6 +5585,10 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
pOperator->getNextFn = doTableScan; pOperator->getNextFn = doTableScan;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
//pOperator->cost.openCost = 1314;
//pOperator->cost.totalCost = 2324;
//pOperator->resultInfo.totalRows = 3334;
return pOperator; return pOperator;
} }
...@@ -9517,7 +9521,7 @@ void releaseQueryBuf(size_t numOfTables) { ...@@ -9517,7 +9521,7 @@ void releaseQueryBuf(size_t numOfTables) {
atomic_add_fetch_64(&tsQueryBufferSizeBytes, t); atomic_add_fetch_64(&tsQueryBufferSizeBytes, t);
} }
int32_t getOperatorExplainExecInfo(SOperatorInfo *operator, SExplainExecInfo **pRes, int32_t *capacity, int32_t *resNum) { int32_t getOperatorExplainExecInfo(SOperatorInfo *operatorInfo, SExplainExecInfo **pRes, int32_t *capacity, int32_t *resNum) {
if (*resNum >= *capacity) { if (*resNum >= *capacity) {
*capacity += 10; *capacity += 10;
...@@ -9528,15 +9532,23 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo *operator, SExplainExecInfo **p ...@@ -9528,15 +9532,23 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo *operator, SExplainExecInfo **p
} }
} }
(*pRes)[*resNum].numOfRows = operator->resultInfo.totalRows; (*pRes)[*resNum].numOfRows = operatorInfo->resultInfo.totalRows;
(*pRes)[*resNum].startupCost = operator->cost.openCost; (*pRes)[*resNum].startupCost = operatorInfo->cost.openCost;
(*pRes)[*resNum].totalCost = operator->cost.totalCost; (*pRes)[*resNum].totalCost = operatorInfo->cost.totalCost;
if (operatorInfo->getExplainFn) {
int32_t code = (*operatorInfo->getExplainFn)(operatorInfo, &(*pRes)->verboseInfo);
if (code) {
qError("operator getExplainFn failed, error:%s", tstrerror(code));
return code;
}
}
++(*resNum); ++(*resNum);
int32_t code = 0; int32_t code = 0;
for (int32_t i = 0; i < operator->numOfDownstream; ++i) { for (int32_t i = 0; i < operatorInfo->numOfDownstream; ++i) {
code = getOperatorExplainExecInfo(operator->pDownstream[i], pRes, capacity, resNum); code = getOperatorExplainExecInfo(operatorInfo->pDownstream[i], pRes, capacity, resNum);
if (code) { if (code) {
taosMemoryFreeClear(*pRes); taosMemoryFreeClear(*pRes);
return TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY;
......
...@@ -3314,7 +3314,8 @@ bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pDataStatis, int32_t ...@@ -3314,7 +3314,8 @@ bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pDataStatis, int32_t
int32_t filterGetTimeRange(SFilterInfo *info, STimeWindow *win) { int32_t filterGetTimeRange(SNode *pNode, STimeWindow *win, bool *isStrict) {
SFilterInfo *info = NULL;
SFilterRange ra = {0}; SFilterRange ra = {0};
SFilterRangeCtx *prev = filterInitRangeCtx(TSDB_DATA_TYPE_TIMESTAMP, FLT_OPTION_TIMESTAMP); SFilterRangeCtx *prev = filterInitRangeCtx(TSDB_DATA_TYPE_TIMESTAMP, FLT_OPTION_TIMESTAMP);
SFilterRangeCtx *tmpc = filterInitRangeCtx(TSDB_DATA_TYPE_TIMESTAMP, FLT_OPTION_TIMESTAMP); SFilterRangeCtx *tmpc = filterInitRangeCtx(TSDB_DATA_TYPE_TIMESTAMP, FLT_OPTION_TIMESTAMP);
......
...@@ -234,15 +234,16 @@ TEST(timerangeTest, greater) { ...@@ -234,15 +234,16 @@ TEST(timerangeTest, greater) {
flttMakeValueNode(&pval, TSDB_DATA_TYPE_TIMESTAMP, &tsmall); flttMakeValueNode(&pval, TSDB_DATA_TYPE_TIMESTAMP, &tsmall);
flttMakeOpNode(&opNode1, OP_TYPE_GREATER_THAN, TSDB_DATA_TYPE_BOOL, pcol, pval); flttMakeOpNode(&opNode1, OP_TYPE_GREATER_THAN, TSDB_DATA_TYPE_BOOL, pcol, pval);
SFilterInfo *filter = NULL; //SFilterInfo *filter = NULL;
int32_t code = filterInitFromNode(opNode1, &filter, FLT_OPTION_NO_REWRITE|FLT_OPTION_TIMESTAMP); //int32_t code = filterInitFromNode(opNode1, &filter, FLT_OPTION_NO_REWRITE|FLT_OPTION_TIMESTAMP);
ASSERT_EQ(code, 0); //ASSERT_EQ(code, 0);
STimeWindow win = {0}; STimeWindow win = {0};
code = filterGetTimeRange(filter, &win); bool isStrict = false;
int32_t code = filterGetTimeRange(opNode1, &win, &isStrict);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_EQ(win.skey, tsmall); ASSERT_EQ(win.skey, tsmall);
ASSERT_EQ(win.ekey, INT64_MAX); ASSERT_EQ(win.ekey, INT64_MAX);
filterFreeInfo(filter); //filterFreeInfo(filter);
nodesDestroyNode(opNode1); nodesDestroyNode(opNode1);
} }
...@@ -263,15 +264,16 @@ TEST(timerangeTest, greater_and_lower) { ...@@ -263,15 +264,16 @@ TEST(timerangeTest, greater_and_lower) {
flttMakeLogicNode(&logicNode, LOGIC_COND_TYPE_AND, list, 2); flttMakeLogicNode(&logicNode, LOGIC_COND_TYPE_AND, list, 2);
SFilterInfo *filter = NULL; //SFilterInfo *filter = NULL;
int32_t code = filterInitFromNode(logicNode, &filter, FLT_OPTION_NO_REWRITE|FLT_OPTION_TIMESTAMP); //int32_t code = filterInitFromNode(logicNode, &filter, FLT_OPTION_NO_REWRITE|FLT_OPTION_TIMESTAMP);
ASSERT_EQ(code, 0); //ASSERT_EQ(code, 0);
STimeWindow win = {0}; STimeWindow win = {0};
code = filterGetTimeRange(filter, &win); bool isStrict = false;
int32_t code = filterGetTimeRange(logicNode, &win, &isStrict);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_EQ(win.skey, tsmall); ASSERT_EQ(win.skey, tsmall);
ASSERT_EQ(win.ekey, tbig); ASSERT_EQ(win.ekey, tbig);
filterFreeInfo(filter); //filterFreeInfo(filter);
nodesDestroyNode(logicNode); nodesDestroyNode(logicNode);
} }
......
...@@ -199,6 +199,7 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m ...@@ -199,6 +199,7 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m
int32_t reqMsgType = msgType - 1; int32_t reqMsgType = msgType - 1;
switch (msgType) { switch (msgType) {
case TDMT_SCH_LINK_BROKEN: case TDMT_SCH_LINK_BROKEN:
case TDMT_VND_EXPLAIN_RSP:
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
case TDMT_VND_QUERY_RSP: // query_rsp may be processed later than ready_rsp case TDMT_VND_QUERY_RSP: // query_rsp may be processed later than ready_rsp
if (lastMsgType != reqMsgType && -1 != lastMsgType && TDMT_VND_FETCH != lastMsgType) { if (lastMsgType != reqMsgType && -1 != lastMsgType && TDMT_VND_FETCH != lastMsgType) {
...@@ -1187,6 +1188,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch ...@@ -1187,6 +1188,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0);
SCH_ERR_JRET(schFetchFromRemote(pJob)); SCH_ERR_JRET(schFetchFromRemote(pJob));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册