“9c3bcfa027cb32421ed20ab77553860b922b82fc”上不存在“compatible/apps/dsa.c”
提交 fb944e85 编写于 作者: D dapan1121

fix explain res issue

上级 b2fc04b8
...@@ -72,7 +72,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg); ...@@ -72,7 +72,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg);
* @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr * @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr
* @return * @return
*/ */
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, int64_t startTs, bool needRes, SQueryResult *pRes); int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, int64_t startTs, SQueryResult *pRes);
/** /**
* Process the query job, generated according to the query physical plan. * Process the query job, generated according to the query physical plan.
......
...@@ -291,7 +291,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList ...@@ -291,7 +291,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf}; SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr, int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr,
pRequest->metric.start, NULL != pRes, &res); pRequest->metric.start, &res);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
if (pRequest->body.queryJob != 0) { if (pRequest->body.queryJob != 0) {
schedulerFreeJob(pRequest->body.queryJob); schedulerFreeJob(pRequest->body.queryJob);
......
...@@ -39,6 +39,12 @@ enum { ...@@ -39,6 +39,12 @@ enum {
SCH_WRITE, SCH_WRITE,
}; };
typedef enum {
SCH_RES_TYPE_QUERY,
SCH_RES_TYPE_FETCH,
} SCH_RES_TYPE;
typedef struct SSchTrans { typedef struct SSchTrans {
void *transInst; void *transInst;
void *transHandle; void *transHandle;
...@@ -159,7 +165,6 @@ typedef struct SSchTask { ...@@ -159,7 +165,6 @@ typedef struct SSchTask {
typedef struct SSchJobAttr { typedef struct SSchJobAttr {
EExplainMode explainMode; EExplainMode explainMode;
bool needRes;
bool syncSchedule; bool syncSchedule;
bool queryJob; bool queryJob;
bool needFlowCtrl; bool needFlowCtrl;
...@@ -192,6 +197,7 @@ typedef struct SSchJob { ...@@ -192,6 +197,7 @@ typedef struct SSchJob {
int32_t errCode; int32_t errCode;
SArray *errList; // SArray<SQueryErrorInfo> SArray *errList; // SArray<SQueryErrorInfo>
SRWLatch resLock; SRWLatch resLock;
SCH_RES_TYPE resType;
void *resData; //TODO free it or not void *resData; //TODO free it or not
int32_t resNumOfRows; int32_t resNumOfRows;
const char *sql; const char *sql;
......
...@@ -70,7 +70,7 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel * ...@@ -70,7 +70,7 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *
} }
int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray *pNodeList, const char *sql, int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray *pNodeList, const char *sql,
int64_t startTs, bool needRes, bool syncSchedule) { int64_t startTs, bool syncSchedule) {
int32_t code = 0; int32_t code = 0;
int64_t refId = -1; int64_t refId = -1;
SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob)); SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
...@@ -81,7 +81,6 @@ int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray ...@@ -81,7 +81,6 @@ int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray
pJob->attr.explainMode = pDag->explainInfo.mode; pJob->attr.explainMode = pDag->explainInfo.mode;
pJob->attr.syncSchedule = syncSchedule; pJob->attr.syncSchedule = syncSchedule;
pJob->attr.needRes = needRes;
pJob->transport = transport; pJob->transport = transport;
pJob->sql = sql; pJob->sql = sql;
...@@ -1059,6 +1058,8 @@ _return: ...@@ -1059,6 +1058,8 @@ _return:
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) { int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) {
SCH_TASK_DLOG("got explain rsp, rows:%d, complete:%d", htonl(pRsp->numOfRows), pRsp->completed); SCH_TASK_DLOG("got explain rsp, rows:%d, complete:%d", htonl(pRsp->numOfRows), pRsp->completed);
pJob->resType = SCH_RES_TYPE_FETCH;
atomic_store_32(&pJob->resNumOfRows, htonl(pRsp->numOfRows)); atomic_store_32(&pJob->resNumOfRows, htonl(pRsp->numOfRows));
atomic_store_ptr(&pJob->resData, pRsp); atomic_store_ptr(&pJob->resData, pRsp);
...@@ -1179,23 +1180,20 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch ...@@ -1179,23 +1180,20 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows); atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows);
SCH_TASK_DLOG("submit succeed, affectedRows:%d", rsp->affectedRows); SCH_TASK_DLOG("submit succeed, affectedRows:%d", rsp->affectedRows);
if (pJob->attr.needRes) { pJob->resType = SCH_RES_TYPE_QUERY;
SCH_LOCK(SCH_WRITE, &pJob->resLock); SCH_LOCK(SCH_WRITE, &pJob->resLock);
if (pJob->resData) { if (pJob->resData) {
SSubmitRsp *sum = pJob->resData; SSubmitRsp *sum = pJob->resData;
sum->affectedRows += rsp->affectedRows; sum->affectedRows += rsp->affectedRows;
sum->nBlocks += rsp->nBlocks; sum->nBlocks += rsp->nBlocks;
sum->pBlocks = taosMemoryRealloc(sum->pBlocks, sum->nBlocks * sizeof(*sum->pBlocks)); sum->pBlocks = taosMemoryRealloc(sum->pBlocks, sum->nBlocks * sizeof(*sum->pBlocks));
memcpy(sum->pBlocks + sum->nBlocks - rsp->nBlocks, rsp->pBlocks, rsp->nBlocks * sizeof(*sum->pBlocks)); memcpy(sum->pBlocks + sum->nBlocks - rsp->nBlocks, rsp->pBlocks, rsp->nBlocks * sizeof(*sum->pBlocks));
taosMemoryFree(rsp->pBlocks); taosMemoryFree(rsp->pBlocks);
taosMemoryFree(rsp); taosMemoryFree(rsp);
} else {
pJob->resData = rsp;
}
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
} else { } else {
tFreeSSubmitRsp(rsp); pJob->resData = rsp;
} }
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
} }
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
...@@ -2412,7 +2410,7 @@ void schFreeJobImpl(void *job) { ...@@ -2412,7 +2410,7 @@ void schFreeJobImpl(void *job) {
} }
static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql, static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
int64_t startTs, bool needRes, bool syncSchedule) { int64_t startTs, bool syncSchedule) {
qDebug("QID:0x%" PRIx64 " job started", pDag->queryId); qDebug("QID:0x%" PRIx64 " job started", pDag->queryId);
if (pNodeList == NULL || taosArrayGetSize(pNodeList) <= 0) { if (pNodeList == NULL || taosArrayGetSize(pNodeList) <= 0) {
...@@ -2421,7 +2419,7 @@ static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pD ...@@ -2421,7 +2419,7 @@ static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pD
int32_t code = 0; int32_t code = 0;
SSchJob *pJob = NULL; SSchJob *pJob = NULL;
SCH_ERR_JRET(schInitJob(&pJob, pDag, transport, pNodeList, sql, startTs, needRes, syncSchedule)); SCH_ERR_JRET(schInitJob(&pJob, pDag, transport, pNodeList, sql, startTs, syncSchedule));
SCH_ERR_JRET(schLaunchJob(pJob)); SCH_ERR_JRET(schLaunchJob(pJob));
...@@ -2463,6 +2461,8 @@ int32_t schExecStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDa ...@@ -2463,6 +2461,8 @@ int32_t schExecStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDa
SCH_ERR_JRET(qExecStaticExplain(pDag, (SRetrieveTableRsp **)&pJob->resData)); SCH_ERR_JRET(qExecStaticExplain(pDag, (SRetrieveTableRsp **)&pJob->resData));
pJob->resType = SCH_RES_TYPE_FETCH;
int64_t refId = taosAddRef(schMgmt.jobRef, pJob); int64_t refId = taosAddRef(schMgmt.jobRef, pJob);
if (refId < 0) { if (refId < 0) {
SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno)); SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno));
...@@ -2535,7 +2535,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { ...@@ -2535,7 +2535,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
} }
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql,
int64_t startTs, bool needRes, SQueryResult *pRes) { int64_t startTs, SQueryResult *pRes) {
if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) { if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) {
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
...@@ -2543,14 +2543,14 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, in ...@@ -2543,14 +2543,14 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, in
if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) { if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) {
SCH_ERR_RET(schExecStaticExplain(transport, nodeList, pDag, pJob, sql, true)); SCH_ERR_RET(schExecStaticExplain(transport, nodeList, pDag, pJob, sql, true));
} else { } else {
SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, startTs, needRes, true)); SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, startTs, true));
} }
SSchJob *job = schAcquireJob(*pJob); SSchJob *job = schAcquireJob(*pJob);
pRes->code = atomic_load_32(&job->errCode); pRes->code = atomic_load_32(&job->errCode);
pRes->numOfRows = job->resNumOfRows; pRes->numOfRows = job->resNumOfRows;
if (needRes) { if (SCH_RES_TYPE_QUERY == job->resType) {
pRes->res = job->resData; pRes->res = job->resData;
job->resData = NULL; job->resData = NULL;
} }
...@@ -2568,7 +2568,7 @@ int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan *pD ...@@ -2568,7 +2568,7 @@ int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan *pD
if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) { if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) {
SCH_ERR_RET(schExecStaticExplain(transport, pNodeList, pDag, pJob, sql, false)); SCH_ERR_RET(schExecStaticExplain(transport, pNodeList, pDag, pJob, sql, false));
} else { } else {
SCH_ERR_RET(schExecJobImpl(transport, pNodeList, pDag, pJob, sql, 0, false, false)); SCH_ERR_RET(schExecJobImpl(transport, pNodeList, pDag, pJob, sql, 0, false));
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
......
...@@ -985,7 +985,7 @@ TEST(insertTest, normalCase) { ...@@ -985,7 +985,7 @@ TEST(insertTest, normalCase) {
taosThreadCreate(&(thread1), &thattr, schtSendRsp, &insertJobRefId); taosThreadCreate(&(thread1), &thattr, schtSendRsp, &insertJobRefId);
SQueryResult res = {0}; SQueryResult res = {0};
code = schedulerExecJob(mockPointer, qnodeList, &dag, &insertJobRefId, "insert into tb values(now,1)", 0, false, &res); code = schedulerExecJob(mockPointer, qnodeList, &dag, &insertJobRefId, "insert into tb values(now,1)", 0, &res);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_EQ(res.numOfRows, 20); ASSERT_EQ(res.numOfRows, 20);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册