提交 6982f2b7 编写于 作者: D dapan1121

feature/qnode

上级 c8e13205
......@@ -210,6 +210,8 @@ int32_t nodesStringToNode(const char* pStr, SNode** pNode);
int32_t nodesListToString(const SNodeList* pList, bool format, char** pStr, int32_t* pLen);
int32_t nodesStringToList(const char* pStr, SNodeList** pList);
int32_t nodesNodeToSQL(SNode *pNode, char *buf, int32_t bufSize, int32_t *len);
#ifdef __cplusplus
}
#endif
......
......@@ -283,11 +283,23 @@ typedef struct SSubplan {
SDataSinkNode* pDataSink; // data of the subplan flow into the datasink
} SSubplan;
typedef enum EExplainMode {
EXPLAIN_MODE_DISABLE = 1,
EXPLAIN_MODE_STATIC,
EXPLAIN_MODE_ANALYZE
} EExplainMode;
typedef struct SExplainInfo {
EExplainMode mode;
bool verbose;
} SExplainInfo;
typedef struct SQueryPlan {
ENodeType type;
uint64_t queryId;
int32_t numOfSubplans;
SNodeList* pSubplans; // Element is SNodeListNode. The execution level of subplan, starting from 0.
SExplainInfo explainInfo;
} SQueryPlan;
#ifdef __cplusplus
......
......@@ -300,6 +300,7 @@ bool nodesIsTimeorderQuery(const SNode* pQuery);
bool nodesIsTimelineQuery(const SNode* pQuery);
void* nodesGetValueFromNode(SValueNode *pNode);
char* nodesGetStrValueFromNode(SValueNode *pNode);
#ifdef __cplusplus
}
......
......@@ -53,11 +53,6 @@ typedef struct SIndexMeta {
} SIndexMeta;
typedef struct SExplainResNode {
SNodeList* pChildren;
SPhysiNode* pNode;
void* pExecInfo;
} SExplainResNode;
/*
* ASSERT(sizeof(SCTableMeta) == 24)
......@@ -179,6 +174,10 @@ bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_
int32_t queryCreateTableMetaFromMsg(STableMetaRsp* msg, bool isSuperTable, STableMeta** pMeta);
char *jobTaskStatusStr(int32_t status);
int32_t qAppendTaskExplainResRows(void **pRowCtx, struct SSubplan *plan, void *pExecTree, int32_t level);
int32_t qGetExplainRspFromRowCtx(void *ctx, SRetrieveTableRsp **pRsp);
void qFreeExplainRowCtx(void *ctx);
SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* name);
extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char** msg, int32_t msgSize, int32_t* msgLen);
......
......@@ -224,6 +224,7 @@ typedef enum ELogicConditionType {
#define TSDB_APP_NAME_LEN TSDB_UNI_LEN
#define TSDB_STB_COMMENT_LEN 1024
/**
* In some scenarios uint16_t (0~65535) is used to store the row len.
* - Firstly, we use 65531(65535 - 4), as the SDataRow/SKVRow contains 4 bits header.
......@@ -471,6 +472,9 @@ enum {
#define QND_VGID 1
#define VND_VGID 0
#define MAX_NUM_STR_SIZE 40
#ifdef __cplusplus
}
#endif
......
......@@ -20,12 +20,16 @@
extern "C" {
#endif
#define nodesFatal(param, ...) qFatal("NODES: " param, __VA_ARGS__)
#define nodesError(param, ...) qError("NODES: " param, __VA_ARGS__)
#define nodesWarn(param, ...) qWarn("NODES: " param, __VA_ARGS__)
#define nodesInfo(param, ...) qInfo("NODES: " param, __VA_ARGS__)
#define nodesDebug(param, ...) qDebug("NODES: " param, __VA_ARGS__)
#define nodesTrace(param, ...) qTrace("NODES: " param, __VA_ARGS__)
#define nodesFatal(...) qFatal("NODES: " __VA_ARGS__)
#define nodesError(...) qError("NODES: " __VA_ARGS__)
#define nodesWarn(...) qWarn("NODES: " __VA_ARGS__)
#define nodesInfo(...) qInfo("NODES: " __VA_ARGS__)
#define nodesDebug(...) qDebug("NODES: " __VA_ARGS__)
#define nodesTrace(...) qTrace("NODES: " __VA_ARGS__)
#define NODES_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define NODES_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define NODES_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#ifdef __cplusplus
}
......
......@@ -50,7 +50,7 @@ int32_t nodesNodeToSQL(SNode *pNode, char *buf, int32_t bufSize, int32_t *len) {
char *t = nodesGetStrValueFromNode(colNode);
if (NULL == t) {
nodesError("fail to get str value from valueNode");
return TSDB_CODE_QRY_APP_ERROR;
NODES_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
*len += snprintf(buf, bufSize - *len, "%s", t);
......@@ -62,18 +62,18 @@ int32_t nodesNodeToSQL(SNode *pNode, char *buf, int32_t bufSize, int32_t *len) {
SOperatorNode* pOpNode = (SOperatorNode*)pNode;
*len += snprintf(buf, bufSize - *len, "(");
if (pOpNode->pLeft) {
QRY_ERR_RET(nodesNodeToSQL(pOpNode->pLeft, buf, bufSize, len));
NODES_ERR_RET(nodesNodeToSQL(pOpNode->pLeft, buf, bufSize, len));
}
if (pOpNode->opType >= (sizeof(gOperatorStr) / sizeof(gOperatorStr[0]))) {
nodesError("unknown operation type:%d", pOpNode->opType);
return TSDB_CODE_QRY_APP_ERROR;
NODES_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
*len += snprintf(buf, bufSize - *len, " %s ", gOperatorStr[pOpNode->opType]);
if (pOpNode->pRight) {
QRY_ERR_RET(nodesNodeToSQL(pOpNode->pRight, buf, bufSize, len));
NODES_ERR_RET(nodesNodeToSQL(pOpNode->pRight, buf, bufSize, len));
}
*len += snprintf(buf, bufSize - *len, ")");
......@@ -91,7 +91,7 @@ int32_t nodesNodeToSQL(SNode *pNode, char *buf, int32_t bufSize, int32_t *len) {
if (!first) {
*len += snprintf(buf, bufSize - *len, " %s ", gLogicConditionStr[pLogicNode->condType]);
}
QRY_ERR_RET(nodesNodeToSQL(node, buf, bufSize, len));
NODES_ERR_RET(nodesNodeToSQL(node, buf, bufSize, len));
first = false;
}
......@@ -110,7 +110,7 @@ int32_t nodesNodeToSQL(SNode *pNode, char *buf, int32_t bufSize, int32_t *len) {
if (!first) {
*len += snprintf(buf, bufSize - *len, ", ");
}
QRY_ERR_RET(nodesNodeToSQL(node, buf, bufSize, len));
NODES_ERR_RET(nodesNodeToSQL(node, buf, bufSize, len));
first = false;
}
......@@ -120,8 +120,20 @@ int32_t nodesNodeToSQL(SNode *pNode, char *buf, int32_t bufSize, int32_t *len) {
}
case QUERY_NODE_NODE_LIST:{
SNodeListNode* pListNode = (SNodeListNode *)pNode;
SNode* node = NULL;
bool first = true;
*len += snprintf(buf, bufSize - *len, "(");
//TODO
FOREACH(node, pListNode->pNodeList) {
if (!first) {
*len += snprintf(buf, bufSize - *len, ", ");
}
NODES_ERR_RET(nodesNodeToSQL(node, buf, bufSize, len));
first = false;
}
*len += snprintf(buf, bufSize - *len, ")");
return TSDB_CODE_SUCCESS;
}
......@@ -130,5 +142,5 @@ int32_t nodesNodeToSQL(SNode *pNode, char *buf, int32_t bufSize, int32_t *len) {
}
nodesError("nodesNodeToSQL unknown node = %s", nodesNodeName(pNode->type));
return TSDB_CODE_QRY_APP_ERROR;
NODES_RET(TSDB_CODE_QRY_APP_ERROR);
}
......@@ -469,7 +469,7 @@ void* nodesGetValueFromNode(SValueNode *pNode) {
char* nodesGetStrValueFromNode(SValueNode *pNode) {
switch (pNode->node.resType.type) {
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_BOOL: {
void *buf = taosMemoryMalloc(MAX_NUM_STR_SIZE);
if (NULL == buf) {
return NULL;
......@@ -477,6 +477,7 @@ char* nodesGetStrValueFromNode(SValueNode *pNode) {
sprintf(buf, "%s", pNode->datum.b ? "true" : "false");
return buf;
}
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_INT:
......
......@@ -8,7 +8,7 @@ target_include_directories(
target_link_libraries(
qcom
PRIVATE os util transport
PRIVATE os util transport nodes
)
if(${BUILD_TEST})
......
......@@ -19,9 +19,12 @@
#ifdef __cplusplus
extern "C" {
#endif
#include "nodes.h"
#include "plannodes.h"
#define QUERY_EXPLAIN_MAX_RES_LEN 1024
//newline area
#define EXPLAIN_TAG_SCAN_FORMAT "Tag scan on %s columns=%d"
#define EXPLAIN_TBL_SCAN_FORMAT "Table scan on %s columns=%d"
#define EXPLAIN_SYSTBL_SCAN_FORMAT "System table scan on %s columns=%d"
......@@ -30,29 +33,47 @@ extern "C" {
#define EXPLAIN_AGG_FORMAT "Aggragate functions=%d groups=%d width=%d"
#define EXPLAIN_EXCHANGE_FORMAT "Exchange %d:1 width=%d"
#define EXPLAIN_SORT_FORMAT "Sort on %d columns width=%d"
#define EXPLAIN_INTERVAL_FORMAT "Interval on column %s functions=%d interval=%d%c offset=%d%c sliding=%d%c width=%d"
#define EXPLAIN_INTERVAL_FORMAT "Interval on column %s functions=%d interval=%" PRId64 "%c offset=%" PRId64 "%c sliding=%" PRId64 "%c width=%d"
#define EXPLAIN_SESSION_FORMAT "Session gap=%" PRId64 " functions=%d width=%d"
#define EXPLAIN_ORDER_FORMAT "Order: %s"
#define EXPLAIN_FILTER_FORMAT "Filter: "
#define EXPLAIN_FILL_FORMAT "Fill: %s"
#define EXPLAIN_ON_CONDITIONS_FORMAT "ON Conditions: "
#define EXPLAIN_ON_CONDITIONS_FORMAT "Join Cond: "
#define EXPLAIN_TIMERANGE_FORMAT "Time range: [%" PRId64 ", %" PRId64 "]"
//append area
#define EXPLAIN_LOOPS_FORMAT "loops %d"
#define EXPLAIN_REVERSE_FORMAT "reverse %d"
typedef struct SExplainResNode {
SNodeList* pChildren;
SPhysiNode* pNode;
void* pExecInfo;
} SExplainResNode;
typedef struct SQueryExplainRowInfo {
int32_t level;
int32_t len;
char *buf;
} SQueryExplainRowInfo;
typedef struct SExplainRowCtx {
int32_t totalSize;
SArray *rows;
} SExplainRowCtx;
#define EXPLAIN_ORDER_STRING(_order) ((TSDB_ORDER_ASC == _order) ? "Ascending" : "Descending")
#define EXPLAIN_JOIN_STRING(_type) ((JOIN_TYPE_INNER == _type) ? "Inner join" : "Join")
#define QUERY_EXPLAIN_NEWLINE(...) tlen = snprintf(tbuf, QUERY_EXPLAIN_MAX_RES_LEN, __VA_ARGS__)
#define QUERY_EXPLAIN_APPEND(...) tlen += snprintf(tbuf + tlen, QUERY_EXPLAIN_MAX_RES_LEN - tlen, __VA_ARGS__)
#define EXPLAIN_ROW_NEW(level, ...) \
do { \
tlen = snprintf(tbuf + VARSTR_HEADER_SIZE, QUERY_EXPLAIN_MAX_RES_LEN, "%*s", level, ""); \
tlen += snprintf(tbuf + VARSTR_HEADER_SIZE + tlen, QUERY_EXPLAIN_MAX_RES_LEN - tlen, __VA_ARGS__); \
} while (0)
#define EXPLAIN_ROW_APPEND(...) tlen += snprintf(tbuf + VARSTR_HEADER_SIZE + tlen, QUERY_EXPLAIN_MAX_RES_LEN - tlen, __VA_ARGS__)
#define EXPLAIN_ROW_END() do { varDataSetLen(tbuf, tlen); tlen += VARSTR_HEADER_SIZE; } while (0)
#define QRY_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
#define QRY_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
......
此差异已折叠。
......@@ -36,8 +36,6 @@ extern "C" {
#define FILTER_DUMMY_EMPTY_OPTR 127
#define MAX_NUM_STR_SIZE 40
#define FILTER_RM_UNIT_MIN_ROWS 100
enum {
......
......@@ -38,6 +38,11 @@ enum {
SCH_WRITE,
};
typedef struct SSchExplainGroup {
int32_t nodeNum;
SSubplan *plan;
} SSchExplainGroup;
typedef struct SSchTrans {
void *transInst;
void *transHandle;
......@@ -142,7 +147,7 @@ typedef struct SSchTask {
} SSchTask;
typedef struct SSchJobAttr {
bool needFetch;
bool analyzeExplain;
bool syncSchedule;
bool queryJob;
bool needFlowCtrl;
......
......@@ -476,6 +476,90 @@ _return:
SCH_RET(code);
}
int32_t schValidateAndBuildJobExplain(SQueryPlan *pDag, SSchJob *pJob) {
int32_t code = 0;
pJob->queryId = pDag->queryId;
if (pDag->numOfSubplans <= 0) {
SCH_JOB_ELOG("invalid subplan num:%d", pDag->numOfSubplans);
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
int32_t levelNum = (int32_t)LIST_LENGTH(pDag->pSubplans);
if (levelNum <= 0) {
SCH_JOB_ELOG("invalid level num:%d", levelNum);
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SHashObj *groupHash = taosHashInit(SCHEDULE_DEFAULT_MAX_TASK_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
if (NULL == groupHash) {
SCH_JOB_ELOG("groupHash %d failed", SCHEDULE_DEFAULT_MAX_TASK_NUM);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pJob->subPlans = pDag->pSubplans;
SNodeListNode *plans = NULL;
int32_t taskNum = 0;
SSchExplainGroup *pGroup = NULL;
void *rowCtx = NULL;
for (int32_t i = 0; i < levelNum; ++i) {
plans = (SNodeListNode *)nodesListGetNode(pDag->pSubplans, i);
if (NULL == plans) {
SCH_JOB_ELOG("empty level plan, level:%d", i);
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
taskNum = (int32_t)LIST_LENGTH(plans->pNodeList);
if (taskNum <= 0) {
SCH_JOB_ELOG("invalid level plan number:%d, level:%d", taskNum, i);
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
for (int32_t n = 0; n < taskNum; ++n) {
SSubplan *plan = (SSubplan *)nodesListGetNode(plans->pNodeList, n);
pGroup = taosHashGet(groupHash, &plan->id.groupId, sizeof(plan->id.groupId));
if (pGroup) {
++pGroup->nodeNum;
continue;
}
SSchExplainGroup group = {.nodeNum = 1, .plan = plan};
if (0 != taosHashPut(groupHash, &plan->id.groupId, sizeof(plan->id.groupId), &group, sizeof(group))) {
SCH_TASK_ELOG("taosHashPut to explainGroupHash failed, taskIdx:%d", n);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
}
void *pIter = taosHashIterate(groupHash, NULL);
while (pIter) {
pGroup = (SSchExplainGroup *)pIter;
SCH_ERR_JRET(qAppendTaskExplainResRows(&rowCtx, pGroup->plan, NULL, i));
pIter = taosHashIterate(groupHash, pIter);
}
taosHashClear(groupHash);
SCH_JOB_DLOG("level initialized, taskNum:%d", taskNum);
}
SRetrieveTableRsp *pRsp = NULL;
SCH_ERR_JRET(qGetExplainRspFromRowCtx(rowCtx, &pRsp));
pJob->resData = pRsp;
_return:
taosHashCleanup(groupHash);
qFreeExplainRowCtx(rowCtx);
SCH_RET(code);
}
int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
if (NULL != pTask->candidateAddrs) {
return TSDB_CODE_SUCCESS;
......@@ -2093,6 +2177,7 @@ static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pD
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pJob->attr.analyzeExplain = (EXPLAIN_MODE_ANALYZE == pDag->explainInfo.mode);
pJob->attr.syncSchedule = syncSchedule;
pJob->transport = transport;
pJob->sql = sql;
......@@ -2163,6 +2248,51 @@ _return:
SCH_RET(code);
}
int32_t schStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
bool syncSchedule) {
qDebug("QID:0x%" PRIx64 " job started", pDag->queryId);
int32_t code = 0;
SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
if (NULL == pJob) {
qError("QID:%" PRIx64 " calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob));
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pJob->sql = sql;
pJob->attr.queryJob = true;
SCH_ERR_JRET(schValidateAndBuildJobExplain(pDag, pJob));
int64_t refId = taosAddRef(schMgmt.jobRef, pJob);
if (refId < 0) {
SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno));
SCH_ERR_JRET(terrno);
}
if (NULL == schAcquireJob(refId)) {
SCH_JOB_ELOG("schAcquireJob job failed, refId:%" PRIx64, refId);
SCH_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
pJob->refId = refId;
SCH_JOB_DLOG("job refId:%" PRIx64, pJob->refId);
pJob->status = JOB_TASK_STATUS_PARTIAL_SUCCEED;
*job = pJob->refId;
SCH_JOB_DLOG("job exec done, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
schReleaseJob(pJob->refId);
return TSDB_CODE_SUCCESS;
_return:
schFreeJobImpl(pJob);
SCH_RET(code);
}
int32_t schedulerInit(SSchedulerCfg *cfg) {
if (schMgmt.jobRef) {
qError("scheduler already initialized");
......@@ -2211,7 +2341,11 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, in
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) {
SCH_ERR_RET(schStaticExplain(transport, nodeList, pDag, pJob, sql, true));
} else {
SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, true));
}
SSchJob *job = schAcquireJob(*pJob);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册