提交 473ac84f 编写于 作者: D dapan1121

fix: fix compile errors

上级 cd0cf055
...@@ -68,6 +68,13 @@ typedef struct SIndexMeta { ...@@ -68,6 +68,13 @@ typedef struct SIndexMeta {
} SIndexMeta; } SIndexMeta;
typedef struct SExecResult {
int32_t code;
uint64_t numOfRows;
int32_t msgType;
void* res;
} SExecResult;
typedef struct STbVerInfo { typedef struct STbVerInfo {
char tbFName[TSDB_TABLE_FNAME_LEN]; char tbFName[TSDB_TABLE_FNAME_LEN];
int32_t sversion; int32_t sversion;
......
...@@ -53,13 +53,6 @@ typedef struct SQueryProfileSummary { ...@@ -53,13 +53,6 @@ typedef struct SQueryProfileSummary {
uint64_t resultSize; // generated result size in Kb. uint64_t resultSize; // generated result size in Kb.
} SQueryProfileSummary; } SQueryProfileSummary;
typedef struct SExecResult {
int32_t code;
uint64_t numOfRows;
int32_t msgType;
void* res;
} SExecResult;
typedef struct STaskInfo { typedef struct STaskInfo {
SQueryNodeAddr addr; SQueryNodeAddr addr;
SSubQueryMsg *msg; SSubQueryMsg *msg;
...@@ -70,7 +63,7 @@ typedef struct SSchdFetchParam { ...@@ -70,7 +63,7 @@ typedef struct SSchdFetchParam {
int32_t* code; int32_t* code;
} SSchdFetchParam; } SSchdFetchParam;
typedef void (*schedulerExecFp)(SQueryResult* pResult, void* param, int32_t code); typedef void (*schedulerExecFp)(SExecResult* pResult, void* param, int32_t code);
typedef void (*schedulerFetchFp)(void* pResult, void* param, int32_t code); typedef void (*schedulerFetchFp)(void* pResult, void* param, int32_t code);
typedef bool (*schedulerChkKillFp)(void* param); typedef bool (*schedulerChkKillFp)(void* param);
...@@ -87,7 +80,7 @@ typedef struct SSchedulerReq { ...@@ -87,7 +80,7 @@ typedef struct SSchedulerReq {
schedulerChkKillFp chkKillFp; schedulerChkKillFp chkKillFp;
void* chkKillParam; void* chkKillParam;
SExecResult* pExecRes; SExecResult* pExecRes;
char** pFetchRes; void** pFetchRes;
} SSchedulerReq; } SSchedulerReq;
...@@ -95,7 +88,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg); ...@@ -95,7 +88,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg);
int32_t schedulerExecJob(SSchedulerReq *pReq, int64_t *pJob); int32_t schedulerExecJob(SSchedulerReq *pReq, int64_t *pJob);
int32_t schedulerFetchRows(int64_t job, void **data); int32_t schedulerFetchRows(int64_t jobId, SSchedulerReq *pReq);
void schedulerFetchRowsA(int64_t job, schedulerFetchFp fp, void* param); void schedulerFetchRowsA(int64_t job, schedulerFetchFp fp, void* param);
...@@ -119,7 +112,7 @@ void schedulerFreeJob(int64_t* job, int32_t errCode); ...@@ -119,7 +112,7 @@ void schedulerFreeJob(int64_t* job, int32_t errCode);
void schedulerDestroy(void); void schedulerDestroy(void);
void schdExecCallback(SQueryResult* pResult, void* param, int32_t code); void schdExecCallback(SExecResult* pResult, void* param, int32_t code);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -628,7 +628,7 @@ _return: ...@@ -628,7 +628,7 @@ _return:
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) { int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) {
void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter; void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
SQueryResult res = {0}; SExecResult res = {0};
SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter, SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId, .requestId = pRequest->requestId,
.requestObjRefId = pRequest->self}; .requestObjRefId = pRequest->self};
...@@ -640,14 +640,14 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList ...@@ -640,14 +640,14 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
.sql = pRequest->sqlstr, .sql = pRequest->sqlstr,
.startTs = pRequest->metric.start, .startTs = pRequest->metric.start,
.execFp = NULL, .execFp = NULL,
.execParam = NULL, .cbParam = NULL,
.chkKillFp = chkRequestKilled, .chkKillFp = chkRequestKilled,
.chkKillParam = (void*)pRequest->self .chkKillParam = (void*)pRequest->self,
.pQueryRes = &res, .pExecRes = &res,
}; };
int32_t code = schedulerExecJob(&req, &pRequest->body.queryJob); int32_t code = schedulerExecJob(&req, &pRequest->body.queryJob);
pRequest->body.resInfo.execRes = res.res; memcpy(&pRequest->body.resInfo.execRes, &res, sizeof(res));
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
schedulerFreeJob(&pRequest->body.queryJob, 0); schedulerFreeJob(&pRequest->body.queryJob, 0);
...@@ -784,10 +784,10 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) { ...@@ -784,10 +784,10 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
return code; return code;
} }
void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) { void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
SRequestObj* pRequest = (SRequestObj*)param; SRequestObj* pRequest = (SRequestObj*)param;
pRequest->code = code; pRequest->code = code;
pRequest->body.resInfo.execRes = pResult->res; memcpy(&pRequest->body.resInfo.execRes, pResult, sizeof(*pResult));
if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type || if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type ||
TDMT_VND_CREATE_TABLE == pRequest->type) { TDMT_VND_CREATE_TABLE == pRequest->type) {
...@@ -952,10 +952,10 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM ...@@ -952,10 +952,10 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM
.sql = pRequest->sqlstr, .sql = pRequest->sqlstr,
.startTs = pRequest->metric.start, .startTs = pRequest->metric.start,
.execFp = schedulerExecCb, .execFp = schedulerExecCb,
.execParam = pRequest, .cbParam = pRequest,
.chkKillFp = chkRequestKilled, .chkKillFp = chkRequestKilled,
.chkKillParam = (void*)pRequest->self, .chkKillParam = (void*)pRequest->self,
.pQueryRes = NULL, .pExecRes = NULL,
}; };
code = schedulerExecJob(&req, &pRequest->body.queryJob); code = schedulerExecJob(&req, &pRequest->body.queryJob);
taosArrayDestroy(pNodeList); taosArrayDestroy(pNodeList);
...@@ -1398,7 +1398,7 @@ void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) ...@@ -1398,7 +1398,7 @@ void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4)
SReqResultInfo* pResInfo = &pRequest->body.resInfo; SReqResultInfo* pResInfo = &pRequest->body.resInfo;
SSchedulerReq req = { SSchedulerReq req = {
.syncReq = true, .syncReq = true,
.pFetchRes = &pResInfo->pData, .pFetchRes = (void**)&pResInfo->pData,
}; };
pRequest->code = schedulerFetchRows(pRequest->body.queryJob, &req); pRequest->code = schedulerFetchRows(pRequest->body.queryJob, &req);
if (pRequest->code != TSDB_CODE_SUCCESS) { if (pRequest->code != TSDB_CODE_SUCCESS) {
......
...@@ -862,7 +862,7 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { ...@@ -862,7 +862,7 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
SSchedulerReq req = { SSchedulerReq req = {
.syncReq = false, .syncReq = false,
.fetchFp = fetchCallback, .fetchFp = fetchCallback,
.execParam = pRequest, .cbParam = pRequest,
}; };
schedulerFetchRows(pRequest->body.queryJob, &req); schedulerFetchRows(pRequest->body.queryJob, &req);
} }
......
...@@ -98,7 +98,7 @@ typedef struct SSchStat { ...@@ -98,7 +98,7 @@ typedef struct SSchStat {
} SSchStat; } SSchStat;
typedef struct SSchResInfo { typedef struct SSchResInfo {
SQueryResult* queryRes; SExecResult* execRes;
void** fetchRes; void** fetchRes;
schedulerExecFp execFp; schedulerExecFp execFp;
schedulerFetchFp fetchFp; schedulerFetchFp fetchFp;
...@@ -111,11 +111,6 @@ typedef struct SSchOpEvent { ...@@ -111,11 +111,6 @@ typedef struct SSchOpEvent {
SSchedulerReq *pReq; SSchedulerReq *pReq;
} SSchOpEvent; } SSchOpEvent;
typedef struct SSchEvent {
SCH_EVENT_TYPE event;
void* info;
} SSchEvent;
typedef int32_t (*schStatusEnterFp)(void* pHandle, void* pParam); typedef int32_t (*schStatusEnterFp)(void* pHandle, void* pParam);
typedef int32_t (*schStatusLeaveFp)(void* pHandle, void* pParam); typedef int32_t (*schStatusLeaveFp)(void* pHandle, void* pParam);
typedef int32_t (*schStatusEventFp)(void* pHandle, void* pParam, void* pEvent); typedef int32_t (*schStatusEventFp)(void* pHandle, void* pParam, void* pEvent);
...@@ -315,9 +310,9 @@ extern SSchedulerMgmt schMgmt; ...@@ -315,9 +310,9 @@ extern SSchedulerMgmt schMgmt;
#define SCH_GET_JOB_STATUS(job) atomic_load_8(&(job)->status) #define SCH_GET_JOB_STATUS(job) atomic_load_8(&(job)->status)
#define SCH_GET_JOB_STATUS_STR(job) jobTaskStatusStr(SCH_GET_JOB_STATUS(job)) #define SCH_GET_JOB_STATUS_STR(job) jobTaskStatusStr(SCH_GET_JOB_STATUS(job))
#define SCH_JOB_IN_SYNC_OP(job) ((job)->opStatus.op && (job)->opStatus.sync) #define SCH_JOB_IN_SYNC_OP(job) ((job)->opStatus.op && (job)->opStatus.syncReq)
#define SCH_JOB_IN_ASYNC_EXEC_OP(job) (((job)->opStatus.op == SCH_OP_EXEC) && (!(job)->opStatus.sync)) #define SCH_JOB_IN_ASYNC_EXEC_OP(job) ((SCH_OP_EXEC == atomic_val_compare_exchange_32(&(job)->opStatus.op, SCH_OP_EXEC, SCH_OP_NULL)) && (!(job)->opStatus.syncReq))
#define SCH_JOB_IN_ASYNC_FETCH_OP(job) (((job)->opStatus.op == SCH_OP_FETCH) && (!(job)->opStatus.sync)) #define SCH_JOB_IN_ASYNC_FETCH_OP(job) ((SCH_OP_FETCH == atomic_val_compare_exchange_32(&(job)->opStatus.op, SCH_OP_FETCH, SCH_OP_NULL)) && (!(job)->opStatus.syncReq))
#define SCH_SET_JOB_NEED_FLOW_CTRL(_job) (_job)->attr.needFlowCtrl = true #define SCH_SET_JOB_NEED_FLOW_CTRL(_job) (_job)->attr.needFlowCtrl = true
#define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl) #define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl)
...@@ -355,7 +350,7 @@ extern SSchedulerMgmt schMgmt; ...@@ -355,7 +350,7 @@ extern SSchedulerMgmt schMgmt;
#define SCH_SET_ERRNO(_err) do { if (TSDB_CODE_SCH_IGNORE_ERROR != (_err)) { terrno = (_err); } } while (0) #define SCH_SET_ERRNO(_err) do { if (TSDB_CODE_SCH_IGNORE_ERROR != (_err)) { terrno = (_err); } } while (0)
#define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { SCH_SET_ERRNO(_code); return _code; } } while (0) #define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { SCH_SET_ERRNO(_code); return _code; } } while (0)
#define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { SCH_SET_ERRNO(_code); } return _code; } while (0) #define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { SCH_SET_ERRNO(_code); } return _code; } while (0)
#define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { SCH_SET_ERRNO(_code); goto _return; } } while (0) #define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { SCH_SET_ERRNO(code); goto _return; } } while (0)
#define SCH_LOCK(type, _lock) (SCH_READ == (type) ? taosRLockLatch(_lock) : taosWLockLatch(_lock)) #define SCH_LOCK(type, _lock) (SCH_READ == (type) ? taosRLockLatch(_lock) : taosWLockLatch(_lock))
#define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock)) #define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock))
...@@ -408,11 +403,32 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList); ...@@ -408,11 +403,32 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList);
void schFreeSMsgSendInfo(SMsgSendInfo *msgSendInfo); void schFreeSMsgSendInfo(SMsgSendInfo *msgSendInfo);
char* schGetOpStr(SCH_OP_TYPE type); char* schGetOpStr(SCH_OP_TYPE type);
int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync); int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync);
int32_t schInitJob(SSchJob **pJob, SSchedulerReq *pReq); int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq);
int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq); int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq);
int32_t schDumpJobExecRes(SSchJob* pJob, SQueryResult* pRes); int32_t schDumpJobExecRes(SSchJob* pJob, SExecResult* pRes);
int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet* pEpSet); int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet* pEpSet);
int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32_t rspCode); int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32_t rspCode);
void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int32_t errCode);
int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq);
void schProcessOnCbEnd(SSchJob *pJob, SSchTask *pTask, int32_t errCode);
int32_t schProcessOnCbBegin(SSchJob** job, SSchTask** task, uint64_t qId, int64_t rId, uint64_t tId);
void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask);
bool schJobDone(SSchJob *pJob);
int32_t schRemoveTaskFromExecList(SSchJob *pJob, SSchTask *pTask);
int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask);
int32_t schSwitchJobStatus(SSchJob* pJob, int32_t status, void* param);
int32_t schHandleOpBeginEvent(int64_t jobId, SSchJob** job, SCH_OP_TYPE type, SSchedulerReq* pReq);
int32_t schHandleOpEndEvent(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int32_t errCode);
int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask);
void schUpdateJobErrCode(SSchJob *pJob, int32_t errCode);
int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry);
int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode);
int32_t schProcessOnJobPartialSuccess(SSchJob *pJob);
void schFreeTask(SSchJob *pJob, SSchTask *pTask);
void schDropTaskInHashList(SSchJob *pJob, SHashObj *list);
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level);
int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask);
int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -14,16 +14,16 @@ ...@@ -14,16 +14,16 @@
*/ */
#include "query.h" #include "query.h"
#include "schedulerInt.h" #include "schInt.h"
tsem_t schdRspSem; tsem_t schdRspSem;
void schdExecCallback(SQueryResult* pResult, void* param, int32_t code) { void schdExecCallback(SExecResult* pResult, void* param, int32_t code) {
if (code) { if (code) {
pResult->code = code; pResult->code = code;
} }
*(SQueryResult*)param = *pResult; *(SExecResult*)param = *pResult;
taosMemoryFree(pResult); taosMemoryFree(pResult);
......
...@@ -13,7 +13,7 @@ ...@@ -13,7 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "schedulerInt.h" #include "schInt.h"
#include "tmsg.h" #include "tmsg.h"
#include "query.h" #include "query.h"
#include "catalog.h" #include "catalog.h"
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
#include "catalog.h" #include "catalog.h"
#include "command.h" #include "command.h"
#include "query.h" #include "query.h"
#include "schedulerInt.h" #include "schInt.h"
#include "tmsg.h" #include "tmsg.h"
#include "tref.h" #include "tref.h"
#include "trpc.h" #include "trpc.h"
...@@ -72,6 +72,8 @@ FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) { ...@@ -72,6 +72,8 @@ FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) {
schUpdateJobErrCode(pJob, TSDB_CODE_TSC_QUERY_KILLED); schUpdateJobErrCode(pJob, TSDB_CODE_TSC_QUERY_KILLED);
return true; return true;
} }
return false;
} }
int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus) { int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
...@@ -369,7 +371,7 @@ _return: ...@@ -369,7 +371,7 @@ _return:
int32_t schDumpJobExecRes(SSchJob* pJob, SExecResult* pRes) { int32_t schDumpJobExecRes(SSchJob* pJob, SExecResult* pRes) {
pRes->code = atomic_load_32(&pJob->errCode); pRes->code = atomic_load_32(&pJob->errCode);
pRes->numOfRows = pJob->resNumOfRows; pRes->numOfRows = pJob->resNumOfRows;
pRes->res = pJob->execRes; memcpy(pRes, &pJob->execRes, sizeof(pJob->execRes));
pJob->execRes.res = NULL; pJob->execRes.res = NULL;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -406,15 +408,13 @@ int32_t schDumpJobFetchRes(SSchJob* pJob, void** pData) { ...@@ -406,15 +408,13 @@ int32_t schDumpJobFetchRes(SSchJob* pJob, void** pData) {
} }
int32_t schNotifyUserExecRes(SSchJob* pJob) { int32_t schNotifyUserExecRes(SSchJob* pJob) {
SQueryResult* pRes = taosMemoryCalloc(1, sizeof(SQueryResult)); SExecResult* pRes = taosMemoryCalloc(1, sizeof(SExecResult));
if (pRes) { if (pRes) {
schDumpJobExecRes(pJob, pRes); schDumpJobExecRes(pJob, pRes);
} }
schEndOperation(pJob);
SCH_JOB_DLOG("sch start to invoke exec cb, code: %s", tstrerror(pJob->errCode)); SCH_JOB_DLOG("sch start to invoke exec cb, code: %s", tstrerror(pJob->errCode));
(*pJob->userRes.execFp)(pRes, pJob->userRes.userParam, atomic_load_32(&pJob->errCode)); (*pJob->userRes.execFp)(pRes, pJob->userRes.cbParam, atomic_load_32(&pJob->errCode));
SCH_JOB_DLOG("sch end from exec cb, code: %s", tstrerror(pJob->errCode)); SCH_JOB_DLOG("sch end from exec cb, code: %s", tstrerror(pJob->errCode));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -425,10 +425,8 @@ int32_t schNotifyUserFetchRes(SSchJob* pJob) { ...@@ -425,10 +425,8 @@ int32_t schNotifyUserFetchRes(SSchJob* pJob) {
schDumpJobFetchRes(pJob, &pRes); schDumpJobFetchRes(pJob, &pRes);
schEndOperation(pJob);
SCH_JOB_DLOG("sch start to invoke fetch cb, code: %s", tstrerror(pJob->errCode)); SCH_JOB_DLOG("sch start to invoke fetch cb, code: %s", tstrerror(pJob->errCode));
(*pJob->userRes.fetchFp)(pRes, pJob->userRes.userParam, atomic_load_32(&pJob->errCode)); (*pJob->userRes.fetchFp)(pRes, pJob->userRes.cbParam, atomic_load_32(&pJob->errCode));
SCH_JOB_DLOG("sch end from fetch cb, code: %s", tstrerror(pJob->errCode)); SCH_JOB_DLOG("sch end from fetch cb, code: %s", tstrerror(pJob->errCode));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -627,7 +625,7 @@ void schFreeJobImpl(void *job) { ...@@ -627,7 +625,7 @@ void schFreeJobImpl(void *job) {
qDestroyQueryPlan(pJob->pDag); qDestroyQueryPlan(pJob->pDag);
taosMemoryFreeClear(pJob->userRes.queryRes); taosMemoryFreeClear(pJob->userRes.execRes);
taosMemoryFreeClear(pJob->resData); taosMemoryFreeClear(pJob->resData);
taosMemoryFree(pJob); taosMemoryFree(pJob);
...@@ -648,10 +646,14 @@ int32_t schJobFetchRows(SSchJob *pJob) { ...@@ -648,10 +646,14 @@ int32_t schJobFetchRows(SSchJob *pJob) {
if (pJob->opStatus.syncReq) { if (pJob->opStatus.syncReq) {
SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob)); SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
tsem_wait(&pJob->rspSem); tsem_wait(&pJob->rspSem);
schPostJobRes(pJob, SCH_OP_FETCH); SCH_RET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes));
} }
} else { } else {
schPostJobRes(pJob, SCH_OP_FETCH); if (pJob->opStatus.syncReq) {
SCH_RET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes));
} else {
schPostJobRes(pJob, SCH_OP_FETCH);
}
} }
SCH_RET(code); SCH_RET(code);
...@@ -674,8 +676,6 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) { ...@@ -674,8 +676,6 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
pJob->chkKillParam = pReq->chkKillParam; pJob->chkKillParam = pReq->chkKillParam;
pJob->userRes.execFp = pReq->execFp; pJob->userRes.execFp = pReq->execFp;
pJob->userRes.cbParam = pReq->cbParam; pJob->userRes.cbParam = pReq->cbParam;
pJob->opStatus.op = SCH_OP_EXEC;
pJob->opStatus.syncReq = pReq->syncReq;
if (pReq->pNodeList == NULL || taosArrayGetSize(pReq->pNodeList) <= 0) { if (pReq->pNodeList == NULL || taosArrayGetSize(pReq->pNodeList) <= 0) {
qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pReq->pDag->queryId); qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pReq->pDag->queryId);
...@@ -750,22 +750,27 @@ int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq) { ...@@ -750,22 +750,27 @@ int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq) {
void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int32_t errCode) { void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int32_t errCode) {
int32_t op = 0;
switch (type) { switch (type) {
case SCH_OP_EXEC: case SCH_OP_EXEC:
int32_t op = atomic_val_compare_exchange_32(&pJob->opStatus.op, type, SCH_OP_NULL); /*
op = atomic_val_compare_exchange_32(&pJob->opStatus.op, type, SCH_OP_NULL);
if (SCH_OP_NULL == op || op != type) { if (SCH_OP_NULL == op || op != type) {
SCH_JOB_ELOG("job not in %s operation, op:%s, status:%s", schGetOpStr(type), schGetOpStr(op), jobTaskStatusStr(pJob->status)); SCH_JOB_ELOG("job not in %s operation, op:%s, status:%s", schGetOpStr(type), schGetOpStr(op), jobTaskStatusStr(pJob->status));
} }
*/
if (pReq) { if (pReq && pReq->syncReq) {
schDumpJobExecRes(pJob, pReq->pExecRes); schDumpJobExecRes(pJob, pReq->pExecRes);
} }
break; break;
case SCH_OP_FETCH: case SCH_OP_FETCH:
int32_t op = atomic_val_compare_exchange_32(&pJob->opStatus.op, type, SCH_OP_NULL); /*
op = atomic_val_compare_exchange_32(&pJob->opStatus.op, type, SCH_OP_NULL);
if (SCH_OP_NULL == op || op != type) { if (SCH_OP_NULL == op || op != type) {
SCH_JOB_ELOG("job not in %s operation, op:%s, status:%s", schGetOpStr(type), schGetOpStr(op), jobTaskStatusStr(pJob->status)); SCH_JOB_ELOG("job not in %s operation, op:%s, status:%s", schGetOpStr(type), schGetOpStr(op), jobTaskStatusStr(pJob->status));
} }
*/
break; break;
case SCH_OP_GET_STATUS: case SCH_OP_GET_STATUS:
errCode = TSDB_CODE_SUCCESS; errCode = TSDB_CODE_SUCCESS;
...@@ -775,7 +780,7 @@ void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int ...@@ -775,7 +780,7 @@ void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int
} }
if (errCode) { if (errCode) {
schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, errCode); schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, (void*)&errCode);
} }
SCH_JOB_DLOG("job end %s operation with code %s", schGetOpStr(type), tstrerror(errCode)); SCH_JOB_DLOG("job end %s operation with code %s", schGetOpStr(type), tstrerror(errCode));
...@@ -846,7 +851,7 @@ void schProcessOnCbEnd(SSchJob *pJob, SSchTask *pTask, int32_t errCode) { ...@@ -846,7 +851,7 @@ void schProcessOnCbEnd(SSchJob *pJob, SSchTask *pTask, int32_t errCode) {
} }
if (errCode) { if (errCode) {
schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, errCode); schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, (void*)&errCode);
} }
if (pJob) { if (pJob) {
...@@ -865,7 +870,6 @@ int32_t schProcessOnCbBegin(SSchJob** job, SSchTask** task, uint64_t qId, int64_ ...@@ -865,7 +870,6 @@ int32_t schProcessOnCbBegin(SSchJob** job, SSchTask** task, uint64_t qId, int64_
SCH_ERR_RET(TSDB_CODE_QRY_JOB_NOT_EXIST); SCH_ERR_RET(TSDB_CODE_QRY_JOB_NOT_EXIST);
} }
int8_t status = 0;
if (schJobNeedToStop(pJob, &status)) { if (schJobNeedToStop(pJob, &status)) {
SCH_TASK_ELOG("will not do further processing cause of job status %s", jobTaskStatusStr(status)); SCH_TASK_ELOG("will not do further processing cause of job status %s", jobTaskStatusStr(status));
SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR); SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
...@@ -875,6 +879,9 @@ int32_t schProcessOnCbBegin(SSchJob** job, SSchTask** task, uint64_t qId, int64_ ...@@ -875,6 +879,9 @@ int32_t schProcessOnCbBegin(SSchJob** job, SSchTask** task, uint64_t qId, int64_
SCH_LOCK_TASK(pTask); SCH_LOCK_TASK(pTask);
*job = pJob;
*task = pTask;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_return: _return:
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
#include "catalog.h" #include "catalog.h"
#include "command.h" #include "command.h"
#include "query.h" #include "query.h"
#include "schedulerInt.h" #include "schInt.h"
#include "tmsg.h" #include "tmsg.h"
#include "tref.h" #include "tref.h"
#include "trpc.h" #include "trpc.h"
...@@ -378,7 +378,7 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { ...@@ -378,7 +378,7 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
SSchTask *pTask = NULL; SSchTask *pTask = NULL;
SSchJob *pJob = NULL; SSchJob *pJob = NULL;
SCH_TASK_DLOG("begin to handle rsp msg, type:%s, handle:%p, code:%s", TMSG_INFO(pMsg->msgType), pMsg->handle, tstrerror(rspCode)); qDebug("begin to handle rsp msg, type:%s, handle:%p, code:%s", TMSG_INFO(pMsg->msgType), pMsg->handle, tstrerror(rspCode));
SCH_ERR_RET(schProcessOnCbBegin(&pJob, &pTask, pParam->queryId, pParam->refId, pParam->taskId)); SCH_ERR_RET(schProcessOnCbBegin(&pJob, &pTask, pParam->queryId, pParam->refId, pParam->taskId));
...@@ -390,7 +390,7 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { ...@@ -390,7 +390,7 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
taosMemoryFreeClear(pMsg->pData); taosMemoryFreeClear(pMsg->pData);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
SCH_TASK_DLOG("end to handle rsp msg, type:%s, handle:%p, code:%s", TMSG_INFO(pMsg->msgType), pMsg->handle, tstrerror(rspCode)); qDebug("end to handle rsp msg, type:%s, handle:%p, code:%s", TMSG_INFO(pMsg->msgType), pMsg->handle, tstrerror(rspCode));
SCH_RET(code); SCH_RET(code);
} }
......
...@@ -37,10 +37,10 @@ int32_t schSwitchJobStatus(SSchJob* pJob, int32_t status, void* param) { ...@@ -37,10 +37,10 @@ int32_t schSwitchJobStatus(SSchJob* pJob, int32_t status, void* param) {
case JOB_TASK_STATUS_SUCC: case JOB_TASK_STATUS_SUCC:
break; break;
case JOB_TASK_STATUS_FAIL: case JOB_TASK_STATUS_FAIL:
SCH_RET(schProcessOnJobFailure(pJob, (int32_t)param)); SCH_RET(schProcessOnJobFailure(pJob, (param ? *(int32_t*)param : 0)));
break; break;
case JOB_TASK_STATUS_DROP: case JOB_TASK_STATUS_DROP:
SCH_ERR_JRET(schProcessOnJobDropped(pJob, (int32_t)param)); SCH_ERR_JRET(schProcessOnJobDropped(pJob, *(int32_t*)param));
if (taosRemoveRef(schMgmt.jobRef, pJob->refId)) { if (taosRemoveRef(schMgmt.jobRef, pJob->refId)) {
SCH_JOB_ELOG("remove job from job list failed, refId:0x%" PRIx64, pJob->refId); SCH_JOB_ELOG("remove job from job list failed, refId:0x%" PRIx64, pJob->refId);
...@@ -73,14 +73,22 @@ int32_t schHandleOpBeginEvent(int64_t jobId, SSchJob** job, SCH_OP_TYPE type, SS ...@@ -73,14 +73,22 @@ int32_t schHandleOpBeginEvent(int64_t jobId, SSchJob** job, SCH_OP_TYPE type, SS
SCH_RET(schProcessOnOpBegin(pJob, type, pReq)); SCH_RET(schProcessOnOpBegin(pJob, type, pReq));
} }
void schHandleOpEndEvent(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int32_t errCode) { int32_t schHandleOpEndEvent(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int32_t errCode) {
int32_t code = errCode;
if (NULL == pJob) { if (NULL == pJob) {
return; SCH_RET(code);
} }
schProcessOnOpEnd(pJob, type, pReq, errCode); schProcessOnOpEnd(pJob, type, pReq, errCode);
if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) {
code = pJob->errCode;
}
schReleaseJob(pJob->refId); schReleaseJob(pJob->refId);
return code;
} }
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
#include "catalog.h" #include "catalog.h"
#include "command.h" #include "command.h"
#include "query.h" #include "query.h"
#include "schedulerInt.h" #include "schInt.h"
#include "tmsg.h" #include "tmsg.h"
#include "tref.h" #include "tref.h"
#include "trpc.h" #include "trpc.h"
...@@ -226,7 +226,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { ...@@ -226,7 +226,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
} }
if (pTask->level->taskFailed > 0) { if (pTask->level->taskFailed > 0) {
SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, 0)); SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, NULL));
} else { } else {
SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL)); SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL));
} }
...@@ -294,7 +294,7 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32 ...@@ -294,7 +294,7 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32
if ((pTask->execId + 1) >= pTask->maxExecTimes) { if ((pTask->execId + 1) >= pTask->maxExecTimes) {
SCH_TASK_DLOG("task no more retry since reach max try times, execId:%d", pTask->execId); SCH_TASK_DLOG("task no more retry since reach max try times, execId:%d", pTask->execId);
schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, (void*)rspCode); schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, (void*)&rspCode);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
#include "catalog.h" #include "catalog.h"
#include "command.h" #include "command.h"
#include "query.h" #include "query.h"
#include "schedulerInt.h" #include "schInt.h"
#include "tmsg.h" #include "tmsg.h"
#include "tref.h" #include "tref.h"
#include "trpc.h" #include "trpc.h"
......
...@@ -16,7 +16,7 @@ ...@@ -16,7 +16,7 @@
#include "catalog.h" #include "catalog.h"
#include "command.h" #include "command.h"
#include "query.h" #include "query.h"
#include "schedulerInt.h" #include "schInt.h"
#include "tmsg.h" #include "tmsg.h"
#include "tref.h" #include "tref.h"
#include "trpc.h" #include "trpc.h"
...@@ -148,7 +148,7 @@ void schedulerFreeJob(int64_t* jobId, int32_t errCode) { ...@@ -148,7 +148,7 @@ void schedulerFreeJob(int64_t* jobId, int32_t errCode) {
return; return;
} }
schSwitchJobStatus(pJob, JOB_TASK_STATUS_DROP, (void*)errCode); schSwitchJobStatus(pJob, JOB_TASK_STATUS_DROP, (void*)&errCode);
*jobId = 0; *jobId = 0;
} }
......
...@@ -50,7 +50,7 @@ ...@@ -50,7 +50,7 @@
#pragma GCC diagnostic ignored "-Wreturn-type" #pragma GCC diagnostic ignored "-Wreturn-type"
#pragma GCC diagnostic ignored "-Wformat" #pragma GCC diagnostic ignored "-Wformat"
#include "schedulerInt.h" #include "schInt.h"
#include "stub.h" #include "stub.h"
#include "tref.h" #include "tref.h"
...@@ -87,7 +87,7 @@ void schtInitLogFile() { ...@@ -87,7 +87,7 @@ void schtInitLogFile() {
} }
void schtQueryCb(SQueryResult* pResult, void* param, int32_t code) { void schtQueryCb(SExecResult* pResult, void* param, int32_t code) {
assert(TSDB_CODE_SUCCESS == code); assert(TSDB_CODE_SUCCESS == code);
*(int32_t*)param = 1; *(int32_t*)param = 1;
} }
...@@ -585,7 +585,10 @@ void* schtRunJobThread(void *aa) { ...@@ -585,7 +585,10 @@ void* schtRunJobThread(void *aa) {
atomic_store_32(&schtStartFetch, 1); atomic_store_32(&schtStartFetch, 1);
void *data = NULL; void *data = NULL;
code = schedulerFetchRows(queryJobRefId, &data); req.syncReq = true;
req.pFetchRes = &data;
code = schedulerFetchRows(queryJobRefId, &req);
assert(code == 0 || code); assert(code == 0 || code);
if (0 == code) { if (0 == code) {
...@@ -595,7 +598,7 @@ void* schtRunJobThread(void *aa) { ...@@ -595,7 +598,7 @@ void* schtRunJobThread(void *aa) {
} }
data = NULL; data = NULL;
code = schedulerFetchRows(queryJobRefId, &data); code = schedulerFetchRows(queryJobRefId, &req);
assert(code == 0 || code); assert(code == 0 || code);
schtFreeQueryJob(0); schtFreeQueryJob(0);
...@@ -710,7 +713,10 @@ TEST(queryTest, normalCase) { ...@@ -710,7 +713,10 @@ TEST(queryTest, normalCase) {
taosThreadCreate(&(thread1), &thattr, schtCreateFetchRspThread, &job); taosThreadCreate(&(thread1), &thattr, schtCreateFetchRspThread, &job);
void *data = NULL; void *data = NULL;
code = schedulerFetchRows(job, &data); req.syncReq = true;
req.pFetchRes = &data;
code = schedulerFetchRows(job, &req);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data; SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
...@@ -719,7 +725,7 @@ TEST(queryTest, normalCase) { ...@@ -719,7 +725,7 @@ TEST(queryTest, normalCase) {
taosMemoryFreeClear(data); taosMemoryFreeClear(data);
data = NULL; data = NULL;
code = schedulerFetchRows(job, &data); code = schedulerFetchRows(job, &req);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_TRUE(data == NULL); ASSERT_TRUE(data == NULL);
...@@ -814,7 +820,9 @@ TEST(queryTest, readyFirstCase) { ...@@ -814,7 +820,9 @@ TEST(queryTest, readyFirstCase) {
taosThreadCreate(&(thread1), &thattr, schtCreateFetchRspThread, &job); taosThreadCreate(&(thread1), &thattr, schtCreateFetchRspThread, &job);
void *data = NULL; void *data = NULL;
code = schedulerFetchRows(job, &data); req.syncReq = true;
req.pFetchRes = &data;
code = schedulerFetchRows(job, &req);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data; SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
...@@ -823,7 +831,7 @@ TEST(queryTest, readyFirstCase) { ...@@ -823,7 +831,7 @@ TEST(queryTest, readyFirstCase) {
taosMemoryFreeClear(data); taosMemoryFreeClear(data);
data = NULL; data = NULL;
code = schedulerFetchRows(job, &data); code = schedulerFetchRows(job, &req);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_TRUE(data == NULL); ASSERT_TRUE(data == NULL);
...@@ -926,7 +934,9 @@ TEST(queryTest, flowCtrlCase) { ...@@ -926,7 +934,9 @@ TEST(queryTest, flowCtrlCase) {
taosThreadCreate(&(thread1), &thattr, schtCreateFetchRspThread, &job); taosThreadCreate(&(thread1), &thattr, schtCreateFetchRspThread, &job);
void *data = NULL; void *data = NULL;
code = schedulerFetchRows(job, &data); req.syncReq = true;
req.pFetchRes = &data;
code = schedulerFetchRows(job, &req);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data; SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
...@@ -935,7 +945,7 @@ TEST(queryTest, flowCtrlCase) { ...@@ -935,7 +945,7 @@ TEST(queryTest, flowCtrlCase) {
taosMemoryFreeClear(data); taosMemoryFreeClear(data);
data = NULL; data = NULL;
code = schedulerFetchRows(job, &data); code = schedulerFetchRows(job, &req);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_TRUE(data == NULL); ASSERT_TRUE(data == NULL);
...@@ -979,7 +989,7 @@ TEST(insertTest, normalCase) { ...@@ -979,7 +989,7 @@ TEST(insertTest, normalCase) {
TdThread thread1; TdThread thread1;
taosThreadCreate(&(thread1), &thattr, schtSendRsp, &insertJobRefId); taosThreadCreate(&(thread1), &thattr, schtSendRsp, &insertJobRefId);
SQueryResult res = {0}; SExecResult res = {0};
SRequestConnInfo conn = {0}; SRequestConnInfo conn = {0};
conn.pTrans = mockPointer; conn.pTrans = mockPointer;
...@@ -991,7 +1001,7 @@ TEST(insertTest, normalCase) { ...@@ -991,7 +1001,7 @@ TEST(insertTest, normalCase) {
req.execFp = schtQueryCb; req.execFp = schtQueryCb;
req.cbParam = NULL; req.cbParam = NULL;
code = schedulerExecJob(&req, &insertJobRefId, &res); code = schedulerExecJob(&req, &insertJobRefId);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_EQ(res.numOfRows, 20); ASSERT_EQ(res.numOfRows, 20);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册