提交 4c82ce18 编写于 作者: D dapan1121

fix: fix request freed issue

上级 16c6427e
......@@ -69,18 +69,20 @@ typedef struct SSchdFetchParam {
int32_t* code;
} SSchdFetchParam;
typedef void (*schedulerExecCallback)(SQueryResult* pResult, void* param, int32_t code);
typedef void (*schedulerFetchCallback)(void* pResult, void* param, int32_t code);
typedef void (*schedulerExecFp)(SQueryResult* pResult, void* param, int32_t code);
typedef void (*schedulerFetchFp)(void* pResult, void* param, int32_t code);
typedef bool (*schedulerChkKillFp)(void* param);
typedef struct SSchedulerReq {
bool *reqKilled;
SRequestConnInfo *pConn;
SArray *pNodeList;
SQueryPlan *pDag;
const char *sql;
int64_t startTs;
schedulerExecCallback fp;
void* cbParam;
schedulerExecFp execFp;
void* execParam;
schedulerChkKillFp chkKillFp;
void* chkKillParam;
} SSchedulerReq;
......@@ -110,7 +112,7 @@ int32_t schedulerExecJob(SSchedulerReq *pReq, int64_t *pJob, SQueryResult *pRes)
*/
int32_t schedulerFetchRows(int64_t job, void **data);
void schedulerAsyncFetchRows(int64_t job, schedulerFetchCallback fp, void* param);
void schedulerAsyncFetchRows(int64_t job, schedulerFetchFp fp, void* param);
int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub);
......
......@@ -55,6 +55,18 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i
return strdup(key);
}
bool chkRequestKilled(void* param) {
bool killed = false;
SRequestObj* pRequest = acquireRequest((int64_t)param);
if (NULL == pRequest || pRequest->killed) {
killed = true;
}
releaseRequest((int64_t)param);
return killed;
}
static STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
SAppInstInfo* pAppInfo, int connType);
......@@ -612,58 +624,6 @@ _return:
return code;
}
int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
tsem_init(&schdRspSem, 0, 0);
SQueryResult res = {.code = 0, .numOfRows = 0};
SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId,
.requestObjRefId = pRequest->self};
SSchedulerReq req = {.pConn = &conn,
.pNodeList = pNodeList,
.pDag = pDag,
.sql = pRequest->sqlstr,
.startTs = pRequest->metric.start,
.fp = schdExecCallback,
.cbParam = &res};
int32_t code = schedulerAsyncExecJob(&req, &pRequest->body.queryJob);
pRequest->body.resInfo.execRes = res.res;
while (true) {
if (code != TSDB_CODE_SUCCESS) {
if (pRequest->body.queryJob != 0) {
schedulerFreeJob(pRequest->body.queryJob, 0);
}
pRequest->code = code;
terrno = code;
return pRequest->code;
} else {
tsem_wait(&schdRspSem);
if (res.code) {
code = res.code;
} else {
break;
}
}
}
if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_CREATE_TABLE == pRequest->type) {
pRequest->body.resInfo.numOfRows = res.numOfRows;
if (pRequest->body.queryJob != 0) {
schedulerFreeJob(pRequest->body.queryJob, 0);
}
}
pRequest->code = res.code;
terrno = res.code;
return pRequest->code;
}
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
......@@ -676,9 +636,10 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
.pDag = pDag,
.sql = pRequest->sqlstr,
.startTs = pRequest->metric.start,
.fp = NULL,
.cbParam = NULL,
.reqKilled = &pRequest->killed};
.execFp = NULL,
.execParam = NULL,
.chkKillFp = chkRequestKilled,
.chkKillParam = (void*)pRequest->self};
int32_t code = schedulerExecJob(&req, &pRequest->body.queryJob, &res);
pRequest->body.resInfo.execRes = res.res;
......@@ -986,9 +947,10 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM
.pDag = pDag,
.sql = pRequest->sqlstr,
.startTs = pRequest->metric.start,
.fp = schedulerExecCb,
.cbParam = pRequest,
.reqKilled = &pRequest->killed};
.execFp = schedulerExecCb,
.execParam = pRequest,
.chkKillFp = chkRequestKilled,
.chkKillParam = (void*)pRequest->self};
code = schedulerAsyncExecJob(&req, &pRequest->body.queryJob);
taosArrayDestroy(pNodeList);
} else {
......
......@@ -99,8 +99,8 @@ typedef struct SSchStat {
typedef struct SSchResInfo {
SQueryResult* queryRes;
void** fetchRes;
schedulerExecCallback execFp;
schedulerFetchCallback fetchFp;
schedulerExecFp execFp;
schedulerFetchFp fetchFp;
void* userParam;
} SSchResInfo;
......@@ -204,37 +204,38 @@ typedef struct {
} SSchOpStatus;
typedef struct SSchJob {
int64_t refId;
uint64_t queryId;
SSchJobAttr attr;
int32_t levelNum;
int32_t taskNum;
SRequestConnInfo conn;
SArray *nodeList; // qnode/vnode list, SArray<SQueryNodeLoad>
SArray *levels; // starting from 0. SArray<SSchLevel>
SQueryPlan *pDag;
SArray *dataSrcTasks; // SArray<SQueryTask*>
int32_t levelIdx;
SEpSet dataSrcEps;
SHashObj *taskList;
SHashObj *execTasks; // executing and executed tasks, key:taskid, value:SQueryTask*
SHashObj *flowCtrl; // key is ep, element is SSchFlowControl
SExplainCtx *explainCtx;
int8_t status;
SQueryNodeAddr resNode;
tsem_t rspSem;
SSchOpStatus opStatus;
bool *reqKilled;
SSchTask *fetchTask;
int32_t errCode;
SRWLatch resLock;
SQueryExecRes execRes;
void *resData; //TODO free it or not
int32_t resNumOfRows;
SSchResInfo userRes;
const char *sql;
int64_t refId;
uint64_t queryId;
SSchJobAttr attr;
int32_t levelNum;
int32_t taskNum;
SRequestConnInfo conn;
SArray *nodeList; // qnode/vnode list, SArray<SQueryNodeLoad>
SArray *levels; // starting from 0. SArray<SSchLevel>
SQueryPlan *pDag;
SArray *dataSrcTasks; // SArray<SQueryTask*>
int32_t levelIdx;
SEpSet dataSrcEps;
SHashObj *taskList;
SHashObj *execTasks; // executing and executed tasks, key:taskid, value:SQueryTask*
SHashObj *flowCtrl; // key is ep, element is SSchFlowControl
SExplainCtx *explainCtx;
int8_t status;
SQueryNodeAddr resNode;
tsem_t rspSem;
SSchOpStatus opStatus;
schedulerChkKillFp chkKillFp;
void* chkKillParam;
SSchTask *fetchTask;
int32_t errCode;
SRWLatch resLock;
SQueryExecRes execRes;
void *resData; //TODO free it or not
int32_t resNumOfRows;
SSchResInfo userRes;
const char *sql;
SQueryProfileSummary summary;
} SSchJob;
......
......@@ -55,9 +55,10 @@ int32_t schInitJob(SSchedulerReq *pReq, SSchJob **pSchJob) {
pJob->conn = *pReq->pConn;
pJob->sql = pReq->sql;
pJob->pDag = pReq->pDag;
pJob->reqKilled = pReq->reqKilled;
pJob->userRes.execFp = pReq->fp;
pJob->userRes.userParam = pReq->cbParam;
pJob->chkKillFp = pReq->chkKillFp;
pJob->chkKillParam = pReq->chkKillParam;
pJob->userRes.execFp = pReq->execFp;
pJob->userRes.userParam = pReq->execParam;
if (pReq->pNodeList == NULL || taosArrayGetSize(pReq->pNodeList) <= 0) {
qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pReq->pDag->queryId);
......@@ -182,7 +183,7 @@ FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) {
*pStatus = status;
}
if (*pJob->reqKilled) {
if ((*pJob->chkKillFp)(pJob->chkKillParam)) {
schUpdateJobStatus(pJob, JOB_TASK_STATUS_DROPPING);
schUpdateJobErrCode(pJob, TSDB_CODE_TSC_QUERY_KILLED);
......@@ -1584,7 +1585,7 @@ _return:
schEndOperation(pJob);
if (!sync) {
pReq->fp(NULL, pReq->cbParam, code);
pReq->execFp(NULL, pReq->execParam, code);
}
schFreeJobImpl(pJob);
......@@ -1651,7 +1652,7 @@ int32_t schExecJobImpl(SSchedulerReq *pReq, SSchJob *pJob, bool sync) {
_return:
if (!sync) {
pReq->fp(NULL, pReq->cbParam, code);
pReq->execFp(NULL, pReq->execParam, code);
}
SCH_RET(code);
......
......@@ -140,7 +140,7 @@ int32_t schedulerFetchRows(int64_t job, void **pData) {
SCH_RET(code);
}
void schedulerAsyncFetchRows(int64_t job, schedulerFetchCallback fp, void* param) {
void schedulerAsyncFetchRows(int64_t job, schedulerFetchFp fp, void* param) {
qDebug("scheduler async fetch rows start");
int32_t code = 0;
......
......@@ -511,8 +511,8 @@ void* schtRunJobThread(void *aa) {
req.pNodeList = qnodeList;
req.pDag = &dag;
req.sql = "select * from tb";
req.fp = schtQueryCb;
req.cbParam = &queryDone;
req.execFp = schtQueryCb;
req.execParam = &queryDone;
code = schedulerAsyncExecJob(&req, &queryJobRefId);
assert(code == 0);
......@@ -663,8 +663,8 @@ TEST(queryTest, normalCase) {
req.pNodeList = qnodeList;
req.pDag = &dag;
req.sql = "select * from tb";
req.fp = schtQueryCb;
req.cbParam = &queryDone;
req.execFp = schtQueryCb;
req.execParam = &queryDone;
code = schedulerAsyncExecJob(&req, &job);
ASSERT_EQ(code, 0);
......@@ -767,8 +767,8 @@ TEST(queryTest, readyFirstCase) {
req.pNodeList = qnodeList;
req.pDag = &dag;
req.sql = "select * from tb";
req.fp = schtQueryCb;
req.cbParam = &queryDone;
req.execFp = schtQueryCb;
req.execParam = &queryDone;
code = schedulerAsyncExecJob(&req, &job);
ASSERT_EQ(code, 0);
......@@ -874,8 +874,8 @@ TEST(queryTest, flowCtrlCase) {
req.pNodeList = qnodeList;
req.pDag = &dag;
req.sql = "select * from tb";
req.fp = schtQueryCb;
req.cbParam = &queryDone;
req.execFp = schtQueryCb;
req.execParam = &queryDone;
code = schedulerAsyncExecJob(&req, &job);
ASSERT_EQ(code, 0);
......@@ -987,8 +987,8 @@ TEST(insertTest, normalCase) {
req.pNodeList = qnodeList;
req.pDag = &dag;
req.sql = "insert into tb values(now,1)";
req.fp = schtQueryCb;
req.cbParam = NULL;
req.execFp = schtQueryCb;
req.execParam = NULL;
code = schedulerExecJob(&req, &insertJobRefId, &res);
ASSERT_EQ(code, 0);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册