diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index a72489f338908e8396776b5fba85d8738005b6b7..0d32cce20b6e489249fa79080e6144754c17218b 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -23,6 +23,8 @@ extern "C" { #include "catalog.h" #include "planner.h" +extern tsem_t schdRspSem; + typedef struct SSchedulerCfg { uint32_t maxJobNum; int32_t maxNodeTableNum; @@ -62,6 +64,11 @@ typedef struct STaskInfo { SSubQueryMsg *msg; } STaskInfo; +typedef struct SSchdFetchParam { + void **pData; + int32_t* code; +} SSchdFetchParam; + typedef void (*schedulerExecCallback)(SQueryResult* pResult, void* param, int32_t code); typedef void (*schedulerFetchCallback)(void* pResult, void* param, int32_t code); @@ -113,23 +120,8 @@ void schedulerFreeJob(int64_t job); void schedulerDestroy(void); -/** - * convert dag to task list - * @param pDag - * @param pTasks SArray** - * @return - */ -int32_t schedulerConvertDagToTaskList(SQueryPlan* pDag, SArray **pTasks); - -/** - * make one task info's multiple copies - * @param src - * @param dst SArray** - * @return - */ -int32_t schedulerCopyTask(STaskInfo *src, SArray **dst, int32_t copyNum); - -void schedulerFreeTaskList(SArray *taskList); +void schdExecCallback(SQueryResult* pResult, void* param, int32_t code); +void schdFetchCallback(void* pResult, void* param, int32_t code); #ifdef __cplusplus diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index f6ba72db521aec60b1ab02b806bf90d7f1e4c2e6..96adba9cfc10ef06077e74a44ffccc490bd635b1 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -289,6 +289,52 @@ void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) { pResInfo->precision = precision; } +int32_t scheduleAsyncQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList, void** pRes) { + void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter; + + tsem_init(&schdRspSem, 0, 0); + + SQueryResult res = {.code = 0, .numOfRows = 0}; + int32_t code = schedulerAsyncExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr, + pRequest->metric.start, schdExecCallback, &res); + while (true) { + if (code != TSDB_CODE_SUCCESS) { + if (pRequest->body.queryJob != 0) { + schedulerFreeJob(pRequest->body.queryJob); + } + + *pRes = res.res; + + pRequest->code = code; + terrno = code; + return pRequest->code; + } else { + tsem_wait(&schdRspSem); + + if (res.code) { + code = res.code; + } else { + break; + } + } + } + + if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_CREATE_TABLE == pRequest->type) { + pRequest->body.resInfo.numOfRows = res.numOfRows; + + if (pRequest->body.queryJob != 0) { + schedulerFreeJob(pRequest->body.queryJob); + } + } + + *pRes = res.res; + + pRequest->code = res.code; + terrno = res.code; + return pRequest->code; +} + + int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList, void** pRes) { void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter; @@ -796,7 +842,58 @@ void doSetOneRowPtr(SReqResultInfo* pResultInfo) { } } +void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) { + assert(pRequest != NULL); + + SReqResultInfo* pResultInfo = &pRequest->body.resInfo; + if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) { + // All data has returned to App already, no need to try again + if (pResultInfo->completed) { + pResultInfo->numOfRows = 0; + return NULL; + } + + tsem_init(&schdRspSem, 0, 0); + + SReqResultInfo* pResInfo = &pRequest->body.resInfo; + SSchdFetchParam param = {.pData = (void**)&pResInfo->pData, .code = &pRequest->code}; + pRequest->code = schedulerAsyncFetchRows(pRequest->body.queryJob, schdFetchCallback, ¶m); + if (pRequest->code != TSDB_CODE_SUCCESS) { + pResultInfo->numOfRows = 0; + return NULL; + } + + tsem_wait(&schdRspSem); + if (pRequest->code != TSDB_CODE_SUCCESS) { + pResultInfo->numOfRows = 0; + return NULL; + } + + pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData, convertUcs4); + if (pRequest->code != TSDB_CODE_SUCCESS) { + pResultInfo->numOfRows = 0; + return NULL; + } + + tscDebug("0x%" PRIx64 " fetch results, numOfRows:%d total Rows:%" PRId64 ", complete:%d, reqId:0x%" PRIx64, + pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId); + + if (pResultInfo->numOfRows == 0) { + return NULL; + } + } + + if (setupOneRowPtr) { + doSetOneRowPtr(pResultInfo); + pResultInfo->current += 1; + } + + return pResultInfo->row; +} + + void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) { + //return doAsyncFetchRows(pRequest, setupOneRowPtr, convertUcs4); assert(pRequest != NULL); SReqResultInfo* pResultInfo = &pRequest->body.resInfo; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index b26aeb35bd6f1108db9a5ba2ec5c0d3c43b18cc9..73abe6cd9d0e90299c6d32c3145ba31134379559 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -2791,6 +2791,7 @@ static int32_t buildRollupAst(STranslateContext* pCxt, SCreateTableStmt* pStmt, int32_t code = getDBCfg(pCxt, pStmt->dbName, &dbCfg); int32_t num = taosArrayGetSize(dbCfg.pRetensions); if (TSDB_CODE_SUCCESS != code || num < 2) { + taosArrayDestroy(dbCfg.pRetensions); return code; } for (int32_t i = 1; i < num; ++i) { diff --git a/source/libs/scheduler/src/schDbg.c b/source/libs/scheduler/src/schDbg.c new file mode 100644 index 0000000000000000000000000000000000000000..4b5f74114d2ae7d4ec47b09f8a48da2f3f61de8d --- /dev/null +++ b/source/libs/scheduler/src/schDbg.c @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "query.h" +#include "schedulerInt.h" + +tsem_t schdRspSem; + +void schdExecCallback(SQueryResult* pResult, void* param, int32_t code) { + if (code) { + pResult->code = code; + } + + *(SQueryResult*)param = *pResult; + + taosMemoryFree(pResult); + + tsem_post(&schdRspSem); +} + +void schdFetchCallback(void* pResult, void* param, int32_t code) { + SSchdFetchParam* fParam = (SSchdFetchParam*)param; + + *fParam->pData = pResult; + *fParam->code = code; + + tsem_post(&schdRspSem); +} + + diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index d64c944994bbcb2817bb711b768242d02db71934..af249334b76f47604d4c38e76e40cc3cb582f1ed 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -856,7 +856,12 @@ _return: void schProcessOnDataFetched(SSchJob *job) { atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0); - tsem_post(&job->rspSem); + + if (job->attr.syncSchedule) { + tsem_post(&job->rspSem); + } else if (SCH_FETCH_CB == atomic_val_compare_exchange_32(&job->userCb, SCH_FETCH_CB, 0)) { + schNotifyUserFetchRes(job); + } } // Note: no more task error processing, handled in function internal diff --git a/source/libs/scheduler/src/schUtil.c b/source/libs/scheduler/src/schUtil.c index 3862ba76f61c372d0287a8d75a7986a31ee02fd7..cec754bdcdc76a83bf637448c4303037b8b74447 100644 --- a/source/libs/scheduler/src/schUtil.c +++ b/source/libs/scheduler/src/schUtil.c @@ -66,6 +66,7 @@ void schFreeRpcCtxVal(const void *arg) { SMsgSendInfo *pMsgSendInfo = (SMsgSendInfo *)arg; taosMemoryFreeClear(pMsgSendInfo->param); + taosMemoryFreeClear(pMsgSendInfo->msgInfo.pData); taosMemoryFreeClear(pMsgSendInfo); } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 8d802980ea2e9bdf16e6cfc7e22fe217d8791743..3ecc4f4a301fa3a36b17a1d920bcf1c6352507b1 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -124,7 +124,7 @@ int32_t schedulerAsyncFetchRows(int64_t job, schedulerFetchCallback fp, void* pa pJob->userRes.fetchFp = fp; pJob->userRes.userParam = param; - code = schFetchRows(pJob); + code = schAsyncFetchRows(pJob); schReleaseJob(job);