提交 93be707c 编写于 作者: D dapan1121

feature/qnode

上级 068697bf
...@@ -546,7 +546,7 @@ void* doFetchRow(SRequestObj* pRequest) { ...@@ -546,7 +546,7 @@ void* doFetchRow(SRequestObj* pRequest) {
if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) { if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
if (pRequest->type == TDMT_VND_QUERY) { if (pRequest->type == TDMT_VND_QUERY) {
pRequest->type = TDMT_VND_FETCH; pRequest->type = TDMT_VND_FETCH;
scheduleFetchRows(pRequest->body.pQueryJob, &pRequest->body.resInfo.pData); scheduleFetchRows(pRequest->body.pQueryJob, (void **)&pRequest->body.resInfo.pData);
} else if (pRequest->type == TDMT_MND_SHOW) { } else if (pRequest->type == TDMT_MND_SHOW) {
pRequest->type = TDMT_MND_SHOW_RETRIEVE; pRequest->type = TDMT_MND_SHOW_RETRIEVE;
} else if (pRequest->type == TDMT_VND_SHOW_TABLES) { } else if (pRequest->type == TDMT_VND_SHOW_TABLES) {
......
...@@ -191,9 +191,9 @@ int32_t qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) { ...@@ -191,9 +191,9 @@ int32_t qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) {
return pTaskInfo->code; return pTaskInfo->code;
} }
int32_t status = 0; bool qcontinue = false;
SInputData inputData = {.pData = pRes, .pTableRetrieveTsMap = NULL}; SInputData inputData = {.pData = pRes, .pTableRetrieveTsMap = NULL};
pTaskInfo->code = dsPutDataBlock(pTaskInfo->dsHandle, &inputData, &status); pTaskInfo->code = dsPutDataBlock(pTaskInfo->dsHandle, &inputData, &qcontinue);
if (isTaskKilled(pTaskInfo)) { if (isTaskKilled(pTaskInfo)) {
qDebug("QInfo:0x%" PRIx64 " task is killed", GET_TASKID(pTaskInfo)); qDebug("QInfo:0x%" PRIx64 " task is killed", GET_TASKID(pTaskInfo));
...@@ -202,7 +202,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) { ...@@ -202,7 +202,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) {
// pRuntimeEnv->resultInfo.total); // pRuntimeEnv->resultInfo.total);
} }
if (status == DS_DATA_FULL) { if (!qcontinue) {
qDebug("QInfo:0x%"PRIx64" query paused, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d", GET_TASKID(pTaskInfo), qDebug("QInfo:0x%"PRIx64" query paused, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d", GET_TASKID(pTaskInfo),
0, 0L, 0); 0, 0L, 0);
return pTaskInfo->code; return pTaskInfo->code;
......
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
#include "qworkerInt.h" #include "qworkerInt.h"
#include "tmsg.h" #include "tmsg.h"
#include "tname.h" #include "tname.h"
#include "dataSinkMgt.h"
int32_t qwValidateStatus(int8_t oriStatus, int8_t newStatus) { int32_t qwValidateStatus(int8_t oriStatus, int8_t newStatus) {
int32_t code = 0; int32_t code = 0;
...@@ -891,14 +892,10 @@ int32_t qwHandleFetch(SQWorkerTaskHandlesCache *handles, SQWorkerMgmt *mgmt, uin ...@@ -891,14 +892,10 @@ int32_t qwHandleFetch(SQWorkerTaskHandlesCache *handles, SQWorkerMgmt *mgmt, uin
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
} }
code = dsGetDataLength(handles->sinkHandle, &dataLength, &queryEnd); dsGetDataLength(handles->sinkHandle, &dataLength, &queryEnd);
if (TSDB_CODE_SUCCESS != code) {
qError("dsGetDataLength failed, code:%x", code);
QW_ERR_JRET(code);
}
if (dataLength > 0) { if (dataLength > 0) {
SOutPutData output = {0}; SOutputData output = {0};
QW_ERR_JRET(qwInitFetchRsp(dataLength, &rsp)); QW_ERR_JRET(qwInitFetchRsp(dataLength, &rsp));
output.pData = rsp->data; output.pData = rsp->data;
......
...@@ -509,6 +509,27 @@ int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, b ...@@ -509,6 +509,27 @@ int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, b
} }
// Note: no more error processing, handled in function internal
int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) {
// if already FAILED, no more processing
SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_FAILED));
if (errCode) {
atomic_store_32(&pJob->errCode, errCode);
}
if (atomic_load_8(&pJob->userFetch) || ((!SCH_JOB_NEED_FETCH(&pJob->attr)) && pJob->attr.syncSchedule)) {
tsem_post(&pJob->rspSem);
}
SCH_ERR_RET(atomic_load_32(&pJob->errCode));
assert(0);
}
// Note: no more error processing, handled in function internal // Note: no more error processing, handled in function internal
int32_t schFetchFromRemote(SSchJob *pJob) { int32_t schFetchFromRemote(SSchJob *pJob) {
int32_t code = 0; int32_t code = 0;
...@@ -540,25 +561,6 @@ _return: ...@@ -540,25 +561,6 @@ _return:
} }
// Note: no more error processing, handled in function internal
int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) {
// if already FAILED, no more processing
SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_FAILED));
if (errCode) {
atomic_store_32(&pJob->errCode, errCode);
}
if (atomic_load_8(&pJob->userFetch) || ((!SCH_JOB_NEED_FETCH(&pJob->attr)) && pJob->attr.syncSchedule)) {
tsem_post(&pJob->rspSem);
}
SCH_ERR_RET(atomic_load_32(&pJob->errCode));
assert(0);
}
// Note: no more error processing, handled in function internal // Note: no more error processing, handled in function internal
int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) { int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) {
int32_t code = 0; int32_t code = 0;
...@@ -760,8 +762,6 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch ...@@ -760,8 +762,6 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
break; break;
} }
break;
}
case TDMT_VND_QUERY_RSP: { case TDMT_VND_QUERY_RSP: {
SQueryTableRsp *rsp = (SQueryTableRsp *)msg; SQueryTableRsp *rsp = (SQueryTableRsp *)msg;
...@@ -784,8 +784,6 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch ...@@ -784,8 +784,6 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
break; break;
} }
break;
}
case TDMT_VND_FETCH_RSP: { case TDMT_VND_FETCH_RSP: {
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg; SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
...@@ -1316,7 +1314,7 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, stru ...@@ -1316,7 +1314,7 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, stru
SSchJob *job = NULL; SSchJob *job = NULL;
SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, (void **)&job, true)); SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, &job, true));
*pJob = job; *pJob = job;
...@@ -1333,7 +1331,7 @@ int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, ...@@ -1333,7 +1331,7 @@ int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag,
SSchJob *job = NULL; SSchJob *job = NULL;
SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, (void **)&job, false)); SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, &job, false));
*pJob = job; *pJob = job;
...@@ -1368,11 +1366,11 @@ int32_t scheduleFetchRows(SSchJob *pJob, void** pData) { ...@@ -1368,11 +1366,11 @@ int32_t scheduleFetchRows(SSchJob *pJob, void** pData) {
} }
if (status == JOB_TASK_STATUS_FAILED) { if (status == JOB_TASK_STATUS_FAILED) {
*data = atomic_load_ptr(&pJob->res); *pData = atomic_load_ptr(&pJob->res);
atomic_store_ptr(&pJob->res, NULL); atomic_store_ptr(&pJob->res, NULL);
SCH_ERR_JRET(atomic_load_32(&pJob->errCode)); SCH_ERR_JRET(atomic_load_32(&pJob->errCode));
} else if (status == JOB_TASK_STATUS_SUCCEED) { } else if (status == JOB_TASK_STATUS_SUCCEED) {
*data = atomic_load_ptr(&pJob->res); *pData = atomic_load_ptr(&pJob->res);
atomic_store_ptr(&pJob->res, NULL); atomic_store_ptr(&pJob->res, NULL);
goto _return; goto _return;
} else if (status == JOB_TASK_STATUS_PARTIAL_SUCCEED) { } else if (status == JOB_TASK_STATUS_PARTIAL_SUCCEED) {
...@@ -1392,9 +1390,9 @@ int32_t scheduleFetchRows(SSchJob *pJob, void** pData) { ...@@ -1392,9 +1390,9 @@ int32_t scheduleFetchRows(SSchJob *pJob, void** pData) {
} }
while (true) { while (true) {
*data = atomic_load_ptr(&pJob->res); *pData = atomic_load_ptr(&pJob->res);
if (*data != atomic_val_compare_exchange_ptr(&pJob->res, *data, NULL)) { if (*pData != atomic_val_compare_exchange_ptr(&pJob->res, *pData, NULL)) {
continue; continue;
} }
...@@ -1472,17 +1470,21 @@ void scheduleFreeJob(void *job) { ...@@ -1472,17 +1470,21 @@ void scheduleFreeJob(void *job) {
taosArrayDestroy(pLevel->subTasks); taosArrayDestroy(pLevel->subTasks);
} }
taosHashCleanup(pJob->execTasks); taosHashCleanup(pJob->execTasks);
taosHashCleanup(pJob->failTasks); taosHashCleanup(pJob->failTasks);
taosHashCleanup(pJob->succTasks); taosHashCleanup(pJob->succTasks);
taosArrayDestroy(pJob->levels);
tfree(pJob->res); taosArrayDestroy(pJob->levels);
tfree(pJob); tfree(pJob->res);
}
void schedulerDestroy(void) { tfree(pJob);
if (schMgmt.jobs) { }
taosHashCleanup(sch
void schedulerDestroy(void) {
if (schMgmt.jobs) {
taosHashCleanup(schMgmt.jobs); //TODO
schMgmt.jobs = NULL;
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册