提交 7602f916 编写于 作者: D dapan1121

feature/qnode

上级 a60ce69b
......@@ -915,6 +915,12 @@ typedef struct {
char data[];
} SRetrieveMetaTableRsp;
typedef struct {
int32_t numOfItems;
char data[];
} SExplainRsp;
typedef struct {
char fqdn[TSDB_FQDN_LEN]; // end point, hostname:port
int32_t port;
......@@ -1052,6 +1058,7 @@ typedef struct SSubQueryMsg {
uint64_t taskId;
int64_t refId;
int8_t taskType;
int8_t explain;
uint32_t sqlLen; // the query sql,
uint32_t phyLen;
char msg[];
......
......@@ -188,6 +188,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_SHOW_TABLES_FETCH, "vnode-show-tables-fetch", SVShowTablesFetchReq, SVShowTablesFetchRsp)
TD_DEF_MSG_TYPE(TDMT_VND_QUERY_CONTINUE, "vnode-query-continue", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_QUERY_HEARTBEAT, "vnode-query-heartbeat", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_EXPLAIN, "vnode-explain", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp)
......
......@@ -17,8 +17,20 @@
#include "tmsg.h"
#include "plannodes.h"
typedef struct SExplainCtx SExplainCtx;
typedef struct SExplainExecInfo {
uint64_t startupCost;
uint64_t totalCost;
uint64_t numOfRows;
} SExplainExecInfo;
int32_t qExecCommand(SNode* pStmt, SRetrieveTableRsp** pRsp);
int32_t qExecStaticExplain(SQueryPlan *pDag, SRetrieveTableRsp **pRsp);
int32_t qExecExplainBegin(SQueryPlan *pDag, SExplainCtx **pCtx, int32_t startTs);
int32_t qExecExplainEnd(SExplainCtx *pCtx);
int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t groupId, SRetrieveTableRsp **pRsp);
void qExplainFreeCtx(SExplainCtx *pCtx);
......@@ -174,6 +174,8 @@ void** qReleaseTask(void* pMgmt, void* pQInfo, bool freeHandle);
void qProcessFetchRsp(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet);
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t *resNum, SExplainExecInfo **pRes);
#ifdef __cplusplus
}
#endif
......
......@@ -216,6 +216,7 @@ int32_t nodesListToString(const SNodeList* pList, bool format, char** pStr, int3
int32_t nodesStringToList(const char* pStr, SNodeList** pList);
int32_t nodesNodeToSQL(SNode *pNode, char *buf, int32_t bufSize, int32_t *len);
char *nodesGetNameFromColumnNode(SNode *pNode);
#ifdef __cplusplus
}
......
......@@ -308,6 +308,7 @@ typedef enum EExplainMode {
typedef struct SExplainInfo {
EExplainMode mode;
bool verbose;
double ratio;
} SExplainInfo;
typedef struct SQueryPlan {
......
......@@ -315,6 +315,7 @@ bool nodesIsTimelineQuery(const SNode* pQuery);
void* nodesGetValueFromNode(SValueNode *pNode);
char* nodesGetStrValueFromNode(SValueNode *pNode);
char *getFillModeString(EFillMode mode);
#ifdef __cplusplus
}
......
......@@ -71,7 +71,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg);
* @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr
* @return
*/
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan* pDag, int64_t* pJob, const char* sql, SQueryResult *pRes);
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, int64_t startTs, SQueryResult *pRes);
/**
* Process the query job, generated according to the query physical plan.
......
......@@ -241,7 +241,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr, &res);
int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr, pRequest->metric.start, &res);
if (code != TSDB_CODE_SUCCESS) {
if (pRequest->body.queryJob != 0) {
schedulerFreeJob(pRequest->body.queryJob);
......
......@@ -50,8 +50,9 @@ extern "C" {
typedef struct SExplainGroup {
int32_t nodeNum;
SRWLatch lock;
SSubplan *plan;
void *execInfo; //TODO
SArray *execInfo;
} SExplainGroup;
typedef struct SExplainResNode {
......@@ -67,10 +68,18 @@ typedef struct SQueryExplainRowInfo {
} SQueryExplainRowInfo;
typedef struct SExplainCtx {
int32_t totalSize;
double ratio;
bool verbose;
int32_t rootGroupId;
int32_t dataSize;
bool execDone;
int64_t reqStartTs;
int64_t jobStartTs;
int64_t jobDoneTs;
char *tbuf;
SArray *rows;
int32_t groupDoneNum;
SHashObj *groupHash;
} SExplainCtx;
......
......@@ -17,32 +17,31 @@
#include "plannodes.h"
#include "commandInt.h"
int32_t qGenerateExplainResNode(SPhysiNode *pNode, void *pExecInfo, SExplainResNode **pRes);
int32_t qAppendTaskExplainResRows(void *pCtx, int32_t groupId, int32_t level);
int32_t qExplainGenerateResNode(SPhysiNode *pNode, void *pExecInfo, SExplainResNode **pRes);
int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level);
void qFreeExplainResTree(SExplainResNode *res) {
if (NULL == res) {
void qExplainFreeResNode(SExplainResNode *resNode) {
if (NULL == resNode) {
return;
}
taosMemoryFreeClear(res->pExecInfo);
taosMemoryFreeClear(resNode->pExecInfo);
SNode* node = NULL;
FOREACH(node, res->pChildren) {
qFreeExplainResTree((SExplainResNode *)node);
FOREACH(node, resNode->pChildren) {
qExplainFreeResNode((SExplainResNode *)node);
}
nodesClearList(res->pChildren);
nodesClearList(resNode->pChildren);
taosMemoryFreeClear(res);
taosMemoryFreeClear(resNode);
}
void qFreeExplainCtx(void *ctx) {
if (NULL == ctx) {
void qExplainFreeCtx(SExplainCtx *pCtx) {
if (NULL == pCtx) {
return;
}
SExplainCtx *pCtx = (SExplainCtx *)ctx;
int32_t rowSize = taosArrayGetSize(pCtx->rows);
for (int32_t i = 0; i < rowSize; ++i) {
SQueryExplainRowInfo *row = taosArrayGet(pCtx->rows, i);
......@@ -54,7 +53,7 @@ void qFreeExplainCtx(void *ctx) {
taosMemoryFree(pCtx);
}
int32_t qInitExplainCtx(void **pCtx, SHashObj *groupHash, bool verbose) {
int32_t qExplainInitCtx(SExplainCtx **pCtx, SHashObj *groupHash, bool verbose, double ratio) {
int32_t code = 0;
SExplainCtx *ctx = taosMemoryCalloc(1, sizeof(SExplainCtx));
if (NULL == ctx) {
......@@ -75,6 +74,7 @@ int32_t qInitExplainCtx(void **pCtx, SHashObj *groupHash, bool verbose) {
}
ctx->verbose = verbose;
ctx->ratio = ratio;
ctx->tbuf = tbuf;
ctx->rows = rows;
ctx->groupHash = groupHash;
......@@ -92,35 +92,7 @@ _return:
QRY_RET(code);
}
char *qFillModeString(EFillMode mode) {
switch (mode) {
case FILL_MODE_NONE:
return "none";
case FILL_MODE_VALUE:
return "value";
case FILL_MODE_PREV:
return "prev";
case FILL_MODE_NULL:
return "null";
case FILL_MODE_LINEAR:
return "linear";
case FILL_MODE_NEXT:
return "next";
default:
return "unknown";
}
}
char *qGetNameFromColumnNode(SNode *pNode) {
if (NULL == pNode || QUERY_NODE_COLUMN != pNode->type) {
return "NULL";
}
return ((SColumnNode *)pNode)->colName;
}
int32_t qGenerateExplainResChildren(SPhysiNode *pNode, void *pExecInfo, SNodeList **pChildren) {
int32_t qExplainGenerateResChildren(SPhysiNode *pNode, void *pExecInfo, SNodeList **pChildren) {
int32_t tlen = 0;
SNodeList *pPhysiChildren = NULL;
......@@ -192,38 +164,38 @@ int32_t qGenerateExplainResChildren(SPhysiNode *pNode, void *pExecInfo, SNodeLis
SNode* node = NULL;
SExplainResNode *pResNode = NULL;
FOREACH(node, pPhysiChildren) {
QRY_ERR_RET(qGenerateExplainResNode((SPhysiNode *)node, pExecInfo, &pResNode));
QRY_ERR_RET(qExplainGenerateResNode((SPhysiNode *)node, pExecInfo, &pResNode));
QRY_ERR_RET(nodesListAppend(*pChildren, pResNode));
}
return TSDB_CODE_SUCCESS;
}
int32_t qGenerateExplainResNode(SPhysiNode *pNode, void *pExecInfo, SExplainResNode **pRes) {
int32_t qExplainGenerateResNode(SPhysiNode *pNode, void *pExecInfo, SExplainResNode **pResNode) {
if (NULL == pNode) {
*pRes = NULL;
*pResNode = NULL;
qError("physical node is NULL");
return TSDB_CODE_QRY_APP_ERROR;
}
SExplainResNode *res = taosMemoryCalloc(1, sizeof(SExplainResNode));
if (NULL == res) {
SExplainResNode *resNode = taosMemoryCalloc(1, sizeof(SExplainResNode));
if (NULL == resNode) {
qError("calloc SPhysiNodeExplainRes failed");
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
int32_t code = 0;
res->pNode = pNode;
res->pExecInfo = pExecInfo;
QRY_ERR_JRET(qGenerateExplainResChildren(pNode, pExecInfo, &res->pChildren));
resNode->pNode = pNode;
resNode->pExecInfo = pExecInfo;
QRY_ERR_JRET(qExplainGenerateResChildren(pNode, pExecInfo, &resNode->pChildren));
*pRes = res;
*pResNode = resNode;
return TSDB_CODE_SUCCESS;
_return:
qFreeExplainResTree(res);
qExplainFreeResNode(resNode);
QRY_RET(code);
}
......@@ -249,7 +221,7 @@ int32_t qExplainResAppendRow(SExplainCtx *ctx, char *tbuf, int32_t len, int32_t
memcpy(row.buf, tbuf, len);
row.level = level;
row.len = len;
ctx->totalSize += len;
ctx->dataSize += len;
if (NULL == taosArrayPush(ctx->rows, &row)) {
qError("taosArrayPush row to explain res rows failed");
......@@ -444,7 +416,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
}
}
QRY_ERR_RET(qAppendTaskExplainResRows(ctx, pExchNode->srcGroupId, level + 1));
QRY_ERR_RET(qExplainAppendGroupResRows(ctx, pExchNode->srcGroupId, level + 1));
break;
}
case QUERY_NODE_PHYSICAL_PLAN_SORT:{
......@@ -468,7 +440,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
}
case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:{
SIntervalPhysiNode *pIntNode = (SIntervalPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_INTERVAL_FORMAT, qGetNameFromColumnNode(pIntNode->pTspk), pIntNode->window.pFuncs->length,
EXPLAIN_ROW_NEW(level, EXPLAIN_INTERVAL_FORMAT, nodesGetNameFromColumnNode(pIntNode->pTspk), pIntNode->window.pFuncs->length,
INVERAL_TIME_FROM_PRECISION_TO_UNIT(pIntNode->interval, pIntNode->intervalUnit, pIntNode->precision), pIntNode->intervalUnit,
pIntNode->offset, getPrecisionUnit(pIntNode->precision),
INVERAL_TIME_FROM_PRECISION_TO_UNIT(pIntNode->sliding, pIntNode->slidingUnit, pIntNode->precision), pIntNode->slidingUnit,
......@@ -481,7 +453,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
if (verbose) {
if (pIntNode->pFill) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILL_FORMAT, qFillModeString(pIntNode->pFill->mode));
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILL_FORMAT, getFillModeString(pIntNode->pFill->mode));
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
}
......@@ -540,7 +512,7 @@ int32_t qExplainResNodeToRows(SExplainResNode *pResNode, SExplainCtx *ctx, int32
return TSDB_CODE_SUCCESS;
}
int32_t qAppendTaskExplainResRows(void *pCtx, int32_t groupId, int32_t level) {
int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level) {
SExplainResNode *node = NULL;
int32_t code = 0;
SExplainCtx *ctx = (SExplainCtx *)pCtx;
......@@ -551,19 +523,19 @@ int32_t qAppendTaskExplainResRows(void *pCtx, int32_t groupId, int32_t level) {
QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
}
QRY_ERR_RET(qGenerateExplainResNode(group->plan->pNode, group->execInfo, &node));
QRY_ERR_RET(qExplainGenerateResNode(group->plan->pNode, group->execInfo, &node));
QRY_ERR_JRET(qExplainResNodeToRows(node, ctx, level));
_return:
qFreeExplainResTree(node);
qExplainFreeResNode(node);
QRY_RET(code);
}
int32_t qGetExplainRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
SExplainCtx *pCtx = (SExplainCtx *)ctx;
int32_t rowNum = taosArrayGetSize(pCtx->rows);
if (rowNum <= 0) {
......@@ -572,7 +544,7 @@ int32_t qGetExplainRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
}
int32_t colNum = 1;
int32_t rspSize = sizeof(SRetrieveTableRsp) + sizeof(int32_t) * colNum + sizeof(int32_t) * rowNum + pCtx->totalSize;
int32_t rspSize = sizeof(SRetrieveTableRsp) + sizeof(int32_t) * colNum + sizeof(int32_t) * rowNum + pCtx->dataSize;
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, rspSize);
if (NULL == rsp) {
qError("malloc SRetrieveTableRsp failed, size:%d", rspSize);
......@@ -582,7 +554,7 @@ int32_t qGetExplainRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
rsp->completed = 1;
rsp->numOfRows = htonl(rowNum);
*(int32_t *)rsp->data = htonl(pCtx->totalSize);
*(int32_t *)rsp->data = htonl(pCtx->dataSize);
int32_t *offset = (int32_t *)((char *)rsp->data + sizeof(int32_t));
char *data = (char *)(offset + rowNum);
......@@ -604,13 +576,13 @@ int32_t qGetExplainRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
return TSDB_CODE_SUCCESS;
}
int32_t qExecStaticExplain(SQueryPlan *pDag, SRetrieveTableRsp **pRsp) {
int32_t qExplainPrepareCtx(SQueryPlan *pDag, SExplainCtx **pCtx) {
int32_t code = 0;
SNodeListNode *plans = NULL;
int32_t taskNum = 0;
SExplainGroup *pGroup = NULL;
void *pCtx = NULL;
int32_t rootGroupId = 0;
SExplainCtx *ctx = NULL;
if (pDag->numOfSubplans <= 0) {
qError("invalid subplan num:%d", pDag->numOfSubplans);
......@@ -629,7 +601,7 @@ int32_t qExecStaticExplain(SQueryPlan *pDag, SRetrieveTableRsp **pRsp) {
QRY_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
QRY_ERR_JRET(qInitExplainCtx(&pCtx, groupHash, pDag->explainInfo.verbose));
QRY_ERR_JRET(qExplainInitCtx(&ctx, groupHash, pDag->explainInfo.verbose, pDag->explainInfo.ratio));
for (int32_t i = 0; i < levelNum; ++i) {
plans = (SNodeListNode *)nodesListGetNode(pDag->pSubplans, i);
......@@ -666,22 +638,62 @@ int32_t qExecStaticExplain(SQueryPlan *pDag, SRetrieveTableRsp **pRsp) {
QRY_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
rootGroupId = plan->id.groupId;
ctx->rootGroupId = plan->id.groupId;
}
qDebug("level %d group handled, taskNum:%d", i, taskNum);
}
QRY_ERR_JRET(qAppendTaskExplainResRows(pCtx, rootGroupId, 0));
*pCtx = ctx;
return TSDB_CODE_SUCCESS;
_return:
qExplainFreeCtx(ctx);
QRY_RET(code);
}
int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t groupId, SRetrieveTableRsp **pRsp) {
}
int32_t qExecStaticExplain(SQueryPlan *pDag, SRetrieveTableRsp **pRsp) {
int32_t code = 0;
SExplainCtx *pCtx = NULL;
QRY_ERR_RET(qExplainPrepareCtx(pDag, &pCtx));
QRY_ERR_JRET(qExplainAppendGroupResRows(pCtx, pCtx->rootGroupId, 0));
QRY_ERR_JRET(qGetExplainRspFromCtx(pCtx, pRsp));
QRY_ERR_JRET(qExplainGetRspFromCtx(pCtx, pRsp));
_return:
qFreeExplainCtx(pCtx);
qExplainFreeCtx(pCtx);
QRY_RET(code);
}
int32_t qExecExplainBegin(SQueryPlan *pDag, SExplainCtx **pCtx, int32_t startTs) {
QRY_ERR_RET(qExplainPrepareCtx(pDag, pCtx));
(*pCtx)->reqStartTs = startTs;
(*pCtx)->jobStartTs = taosGetTimestampMs();
return TSDB_CODE_SUCCESS;
}
int32_t qExecExplainEnd(SExplainCtx *pCtx) {
pCtx->jobDoneTs = taosGetTimestampMs();
atomic_store_8((int8_t *)&pCtx->execDone, true);
return TSDB_CODE_SUCCESS;
}
......@@ -229,3 +229,12 @@ void qDestroyTask(qTaskInfo_t qTaskHandle) {
queryCostStatis(pTaskInfo); // print the query cost summary
doDestroyTask(pTaskInfo);
}
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t *resNum, SExplainExecInfo **pRes) {
SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)tinfo;
int32_t capacity = 0;
return getOperatorExplainExecInfo(pTaskInfo->pRoot, pRes, &capacity, resNum);
}
......@@ -9516,3 +9516,18 @@ void releaseQueryBuf(size_t numOfTables) {
// restore value is not enough buffer available
atomic_add_fetch_64(&tsQueryBufferSizeBytes, t);
}
int32_t getOperatorExplainExecInfo(SOperatorInfo *operator, SExplainExecInfo **pRes, int32_t *capacity, int32_t *resNum) {
if (*resNum >= *capacity) {
*capacity += 10;
*pRes = taosMemoryRealloc(*pRes, (*capacity) * sizeof(SExplainExecInfo));
if (NULL == *pRes) {
qError("malloc %d failed", capacity * sizeof(SExplainExecInfo));
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
}
}
......@@ -31,6 +31,7 @@ extern "C" {
#define NODES_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define NODES_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
#ifdef __cplusplus
}
#endif
......
......@@ -1023,3 +1023,33 @@ int32_t nodesCollectFuncs(SSelectStmt* pSelect, FFuncClassifier classifier, SNod
return TSDB_CODE_SUCCESS;
}
char *getFillModeString(EFillMode mode) {
switch (mode) {
case FILL_MODE_NONE:
return "none";
case FILL_MODE_VALUE:
return "value";
case FILL_MODE_PREV:
return "prev";
case FILL_MODE_NULL:
return "null";
case FILL_MODE_LINEAR:
return "linear";
case FILL_MODE_NEXT:
return "next";
default:
return "unknown";
}
}
char *nodesGetNameFromColumnNode(SNode *pNode) {
if (NULL == pNode || QUERY_NODE_COLUMN != pNode->type) {
return "NULL";
}
return ((SColumnNode *)pNode)->colName;
}
......@@ -107,15 +107,17 @@ typedef struct SQWTaskCtx {
SRWLatch lock;
int8_t phase;
int8_t taskType;
int8_t explain;
bool emptyRes;
bool queryFetched;
bool queryEnd;
bool queryContinue;
bool queryInQueue;
int32_t rspCode;
SQWConnInfo connInfo;
SQWConnInfo ctrlConnInfo;
SQWConnInfo dataConnInfo;
int8_t events[QW_EVENT_MAX];
qTaskInfo_t taskHandle;
......
......@@ -432,8 +432,10 @@ int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
tmsgReleaseHandle(ctx->connInfo.handle, TAOS_CONN_SERVER);
ctx->connInfo.handle = NULL;
tmsgReleaseHandle(ctx->ctrlConnInfo.handle, TAOS_CONN_SERVER);
ctx->ctrlConnInfo.handle = NULL;
// NO need to release dataConnInfo
qwFreeTaskHandle(QW_FPARAMS(), &ctx->taskHandle);
......@@ -537,6 +539,26 @@ int32_t qwDropTask(QW_FPARAMS_DEF) {
return TSDB_CODE_SUCCESS;
}
int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
qTaskInfo_t *taskHandle = &ctx->taskHandle;
if (TASK_TYPE_TEMP == ctx->taskType) {
if (ctx->explain) {
SExplainExecInfo *execInfo = NULL;
int32_t resNum = 0;
QW_ERR_RET(qGetExplainExecInfo(ctx->taskHandle, &resNum, &execInfo));
QW_ERR_RET(qwBuildAndSendExplainRsp(&ctx->ctrlConnInfo, &rsp));
}
qwFreeTaskHandle(QW_FPARAMS(), taskHandle);
}
return TSDB_CODE_SUCCESS;
}
int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
int32_t code = 0;
bool qcontinue = true;
......@@ -562,10 +584,8 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
QW_TASK_DLOG("qExecTask end with empty res, useconds:%"PRIu64, useconds);
dsEndPut(sinkHandle, useconds);
if (TASK_TYPE_TEMP == ctx->taskType) {
qwFreeTaskHandle(QW_FPARAMS(), taskHandle);
}
QW_ERR_RET(qwHandleTaskComplete(QW_FPARAMS(), ctx));
if (queryEnd) {
*queryEnd = true;
......@@ -658,19 +678,6 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
bool queryEnd = false;
int32_t code = 0;
if (ctx->emptyRes) {
QW_TASK_DLOG_E("query end with empty result");
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED);
QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
*rspMsg = rsp;
*dataLen = 0;
pOutput->queryEnd = true;
return TSDB_CODE_SUCCESS;
}
dsGetDataLength(ctx->sinkHandle, &len, &queryEnd);
if (len < 0) {
......@@ -760,12 +767,12 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
}
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
dropConnection = &ctx->connInfo;
dropConnection = &ctx->ctrlConnInfo;
QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
dropConnection = NULL;
qwBuildAndSendDropRsp(&ctx->connInfo, code);
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code));
qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
break;
......@@ -798,12 +805,12 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
}
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
dropConnection = &ctx->connInfo;
dropConnection = &ctx->ctrlConnInfo;
QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
dropConnection = NULL;
qwBuildAndSendDropRsp(&ctx->connInfo, code);
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code));
qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
}
......@@ -863,17 +870,13 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
}
if (QW_PHASE_POST_QUERY == phase) {
if (NULL == ctx->taskHandle && NULL == ctx->sinkHandle) {
ctx->emptyRes = true;
}
#if 0
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_READY)) {
readyConnection = &ctx->connInfo;
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
}
#else
connInfo.handle = ctx->connInfo.handle;
connInfo.handle = ctx->ctrlConnInfo.handle;
readyConnection = &connInfo;
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
......@@ -886,8 +889,8 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
}
qwBuildAndSendDropRsp(&ctx->connInfo, code);
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code));
qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
......@@ -931,7 +934,7 @@ _return:
QW_RET(code);
}
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) {
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain) {
int32_t code = 0;
bool queryRsped = false;
struct SSubplan *plan = NULL;
......@@ -947,9 +950,10 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) {
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
atomic_store_8(&ctx->taskType, taskType);
atomic_store_8(&ctx->explain, explain);
atomic_store_ptr(&ctx->connInfo.handle, qwMsg->connInfo.handle);
atomic_store_ptr(&ctx->connInfo.ahandle, qwMsg->connInfo.ahandle);
atomic_store_ptr(&ctx->ctrlConnInfo.handle, qwMsg->connInfo.handle);
atomic_store_ptr(&ctx->ctrlConnInfo.ahandle, qwMsg->connInfo.ahandle);
QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);
......@@ -1011,8 +1015,8 @@ int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
}
if (ctx->phase == QW_PHASE_PRE_QUERY) {
ctx->connInfo.handle == qwMsg->connInfo.handle;
ctx->connInfo.ahandle = qwMsg->connInfo.ahandle;
ctx->ctrlConnInfo.handle == qwMsg->connInfo.handle;
ctx->ctrlConnInfo.ahandle = qwMsg->connInfo.ahandle;
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_READY);
needRsp = false;
QW_TASK_DLOG_E("ready msg will not rsp now");
......@@ -1089,10 +1093,13 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
if (rsp) {
bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
atomic_store_8((int8_t*)&ctx->queryEnd, qComplete);
if (qComplete) {
atomic_store_8((int8_t*)&ctx->queryEnd, true);
}
qwMsg->connInfo = ctx->connInfo;
qwMsg->connInfo = ctx->dataConnInfo;
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code);
......@@ -1113,7 +1120,7 @@ _return:
qwFreeFetchRsp(rsp);
rsp = NULL;
qwMsg->connInfo = ctx->connInfo;
qwMsg->connInfo = ctx->dataConnInfo;
qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, 0, code);
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), 0);
}
......@@ -1151,14 +1158,17 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
if (NULL == rsp) {
atomic_store_ptr(&ctx->connInfo.handle, qwMsg->connInfo.handle);
atomic_store_ptr(&ctx->connInfo.ahandle, qwMsg->connInfo.ahandle);
atomic_store_ptr(&ctx->dataConnInfo.handle, qwMsg->connInfo.handle);
atomic_store_ptr(&ctx->dataConnInfo.ahandle, qwMsg->connInfo.ahandle);
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH);
} else {
bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
atomic_store_8((int8_t*)&ctx->queryEnd, qComplete);
if (qComplete) {
atomic_store_8((int8_t*)&ctx->queryEnd, true);
}
}
if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) {
......@@ -1236,8 +1246,8 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
}
if (!rsped) {
ctx->connInfo.handle = qwMsg->connInfo.handle;
ctx->connInfo.ahandle = qwMsg->connInfo.ahandle;
ctx->ctrlConnInfo.handle = qwMsg->connInfo.handle;
ctx->ctrlConnInfo.ahandle = qwMsg->connInfo.ahandle;
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
}
......
......@@ -327,7 +327,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
QW_SCH_TASK_DLOG("processQuery start, node:%p, handle:%p, sql:%s", node, pMsg->handle, sql);
taosMemoryFreeClear(sql);
QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, msg->taskType));
QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, msg->taskType, msg->explain));
QW_SCH_TASK_DLOG("processQuery end, node:%p", node);
......
......@@ -26,6 +26,7 @@ extern "C" {
#include "scheduler.h"
#include "thash.h"
#include "trpc.h"
#include "command.h"
#define SCHEDULE_DEFAULT_MAX_JOB_NUM 1000
#define SCHEDULE_DEFAULT_MAX_TASK_NUM 1000
......@@ -165,6 +166,7 @@ typedef struct SSchJob {
SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask*
SHashObj *failTasks; // failed tasks, key:taskid, value:SQueryTask*
SExplainCtx *explainCtx;
int8_t status;
SQueryNodeAddr resNode;
tsem_t rspSem;
......@@ -211,6 +213,7 @@ extern SSchedulerMgmt schMgmt;
#define SCH_JOB_NEED_FETCH(_job) SCH_IS_QUERY_JOB(_job)
#define SCH_IS_WAIT_ALL_JOB(_job) (!SCH_IS_QUERY_JOB(_job))
#define SCH_IS_NEED_DROP_JOB(_job) (SCH_IS_QUERY_JOB(_job))
#define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode)
#define SCH_IS_LEVEL_UNFINISHED(_level) ((_level)->taskLaunchedNum < (_level)->taskNum)
#define SCH_GET_CUR_EP(_addr) (&(_addr)->epSet.eps[(_addr)->epSet.inUse])
......@@ -251,6 +254,8 @@ int32_t schFetchFromRemote(SSchJob *pJob);
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode);
int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId);
int32_t schCloneSMsgSendInfo(void *src, void **dst);
int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob);
void schFreeJobImpl(void *job);
#ifdef __cplusplus
......
......@@ -67,6 +67,81 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *
return TSDB_CODE_SUCCESS;
}
int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray *pNodeList, const char *sql,
int64_t startTs, bool syncSchedule) {
int32_t code = 0;
SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
if (NULL == pJob) {
qError("QID:%" PRIx64 " calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob));
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pJob->attr.explainMode = pDag->explainInfo.mode;
pJob->attr.syncSchedule = syncSchedule;
pJob->transport = transport;
pJob->sql = sql;
if (pNodeList != NULL) {
pJob->nodeList = taosArrayDup(pNodeList);
}
SCH_ERR_JRET(schValidateAndBuildJob(pDag, pJob));
if (SCH_IS_EXPLAIN_JOB(pJob)) {
SCH_ERR_JRET(qExecExplainBegin(pDag, &pJob->explainCtx, startTs));
}
pJob->execTasks =
taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == pJob->execTasks) {
SCH_JOB_ELOG("taosHashInit %d execTasks failed", pDag->numOfSubplans);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pJob->succTasks =
taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == pJob->succTasks) {
SCH_JOB_ELOG("taosHashInit %d succTasks failed", pDag->numOfSubplans);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pJob->failTasks =
taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == pJob->failTasks) {
SCH_JOB_ELOG("taosHashInit %d failTasks failed", pDag->numOfSubplans);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
tsem_init(&pJob->rspSem, 0, 0);
int64_t refId = taosAddRef(schMgmt.jobRef, pJob);
if (refId < 0) {
SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno));
SCH_ERR_JRET(terrno);
}
if (NULL == schAcquireJob(refId)) {
SCH_JOB_ELOG("schAcquireJob job failed, refId:%" PRIx64, refId);
SCH_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
pJob->refId = refId;
SCH_JOB_DLOG("job refId:%" PRIx64, pJob->refId);
pJob->status = JOB_TASK_STATUS_NOT_START;
*pSchJob = pJob;
return TSDB_CODE_SUCCESS;
_return:
schFreeJobImpl(pJob);
SCH_RET(code);
}
void schFreeRpcCtx(SRpcCtx *pCtx) {
if (NULL == pCtx) {
return;
......@@ -1050,6 +1125,40 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
break;
}
case TDMT_VND_EXPLAIN_RSP: {
SExplainRsp *taskRsp = (SExplainRsp *)msg;
SCH_ERR_JRET(rspCode);
if (NULL == msg) {
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
if (!SCH_IS_EXPLAIN_JOB(pJob)) {
SCH_TASK_ELOG("invalid msg received for none explain query, msg type:%s", TMSG_INFO(msgType));
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
if (pJob->resData) {
SCH_TASK_ELOG("explain result is already generated, res:%p", pJob->resData);
taosMemoryFreeClear(taskRsp);
SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
}
SRetrieveTableRsp *pRsp = NULL;
SCH_ERR_JRET(qExplainUpdateExecInfo(pJob->explainCtx, taskRsp, pTask->plan->id.groupId, &pRsp));
if (pRsp) {
atomic_store_ptr(&pJob->resData, pRsp);
atomic_store_32(&pJob->resNumOfRows, htonl(pRsp->numOfRows));
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED);
SCH_TASK_DLOG("got explain rsp, rows:%d, complete:%d", htonl(pRsp->numOfRows), pRsp->completed);
schProcessOnDataFetched(pJob);
}
break;
}
case TDMT_VND_FETCH_RSP: {
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
......@@ -1058,6 +1167,17 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
if (SCH_IS_EXPLAIN_JOB(pJob)) {
if (rsp->completed) {
SCH_ERR_JRET(qExecExplainEnd(pJob->explainCtx));
return TSDB_CODE_SUCCESS;
}
SCH_ERR_JRET(schFetchFromRemote(pJob));
return TSDB_CODE_SUCCESS;
}
if (pJob->resData) {
SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->resData);
taosMemoryFreeClear(rsp);
......@@ -1767,6 +1887,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
pMsg->taskId = htobe64(pTask->taskId);
pMsg->refId = htobe64(pJob->refId);
pMsg->taskType = TASK_TYPE_TEMP;
pMsg->explain = SCH_IS_EXPLAIN_JOB(pJob);
pMsg->phyLen = htonl(pTask->msgLen);
pMsg->sqlLen = htonl(len);
......@@ -2083,6 +2204,8 @@ void schFreeJobImpl(void *job) {
taosArrayDestroy(pJob->levels);
taosArrayDestroy(pJob->nodeList);
qExplainFreeCtx(pJob->explainCtx);
taosMemoryFreeClear(pJob->resData);
taosMemoryFreeClear(pJob);
......@@ -2090,70 +2213,17 @@ void schFreeJobImpl(void *job) {
}
static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
bool syncSchedule) {
int64_t startTs, bool syncSchedule) {
qDebug("QID:0x%" PRIx64 " job started", pDag->queryId);
if (pNodeList == NULL || (pNodeList && taosArrayGetSize(pNodeList) <= 0)) {
if (pNodeList == NULL || taosArrayGetSize(pNodeList) <= 0) {
qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pDag->queryId);
}
int32_t code = 0;
SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
if (NULL == pJob) {
qError("QID:%" PRIx64 " calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob));
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pJob->attr.explainMode = pDag->explainInfo.mode;
pJob->attr.syncSchedule = syncSchedule;
pJob->transport = transport;
pJob->sql = sql;
if (pNodeList != NULL) {
pJob->nodeList = taosArrayDup(pNodeList);
}
SCH_ERR_JRET(schValidateAndBuildJob(pDag, pJob));
pJob->execTasks =
taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == pJob->execTasks) {
SCH_JOB_ELOG("taosHashInit %d execTasks failed", pDag->numOfSubplans);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pJob->succTasks =
taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == pJob->succTasks) {
SCH_JOB_ELOG("taosHashInit %d succTasks failed", pDag->numOfSubplans);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
pJob->failTasks =
taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
if (NULL == pJob->failTasks) {
SCH_JOB_ELOG("taosHashInit %d failTasks failed", pDag->numOfSubplans);
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
tsem_init(&pJob->rspSem, 0, 0);
SSchJob *pJob = NULL;
SCH_ERR_JRET(schInitJob(&pJob, pDag, transport, pNodeList, sql, startTs, syncSchedule));
int64_t refId = taosAddRef(schMgmt.jobRef, pJob);
if (refId < 0) {
SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno));
SCH_ERR_JRET(terrno);
}
if (NULL == schAcquireJob(refId)) {
SCH_JOB_ELOG("schAcquireJob job failed, refId:%" PRIx64, refId);
SCH_RET(TSDB_CODE_SCH_STATUS_ERROR);
}
pJob->refId = refId;
SCH_JOB_DLOG("job refId:%" PRIx64, pJob->refId);
pJob->status = JOB_TASK_STATUS_NOT_START;
SCH_ERR_JRET(schLaunchJob(pJob));
*job = pJob->refId;
......@@ -2266,7 +2336,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
}
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql,
SQueryResult *pRes) {
int64_t startTs, SQueryResult *pRes) {
if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
......@@ -2274,7 +2344,7 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, in
if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) {
SCH_ERR_RET(schExecStaticExplain(transport, nodeList, pDag, pJob, sql, true));
} else {
SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, true));
SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, startTs, true));
}
SSchJob *job = schAcquireJob(*pJob);
......@@ -2292,7 +2362,11 @@ int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan *pD
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SCH_ERR_RET(schExecJobImpl(transport, pNodeList, pDag, pJob, sql, false));
if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) {
SCH_ERR_RET(schExecStaticExplain(transport, pNodeList, pDag, pJob, sql, false));
} else {
SCH_ERR_RET(schExecJobImpl(transport, pNodeList, pDag, pJob, sql, 0, false));
}
return TSDB_CODE_SUCCESS;
}
......
......@@ -951,7 +951,7 @@ TEST(insertTest, normalCase) {
taosThreadCreate(&(thread1), &thattr, schtSendRsp, &insertJobRefId);
SQueryResult res = {0};
code = schedulerExecJob(mockPointer, qnodeList, &dag, &insertJobRefId, "insert into tb values(now,1)", &res);
code = schedulerExecJob(mockPointer, qnodeList, &dag, &insertJobRefId, "insert into tb values(now,1)", 0, &res);
ASSERT_EQ(code, 0);
ASSERT_EQ(res.numOfRows, 20);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册