diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index 91421a9174f4428fb28d3e40e57a30ea3791501e..da70f2149896e68310b0417ed1fbf9facc57e163 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -150,7 +150,7 @@ struct SQueryNode; * @param requestId * @return */ -int32_t qCreateQueryDag(const struct SQueryNode* pQueryInfo, struct SQueryDag** pDag, SSchema** pSchema, uint32_t* numOfResCols, uint64_t requestId); +int32_t qCreateQueryDag(const struct SQueryNode* pQueryInfo, struct SQueryDag** pDag, uint64_t requestId); // Set datasource of this subplan, multiple calls may be made to a subplan. // @subplan subplan to be schedule diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 74b7813465659479edd343238b45a5b9917b18ee..ceb54eabef1bc41d2e8a81324b6f5905e57ba5b2 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -20,8 +20,10 @@ extern "C" { #endif -#include "planner.h" #include "catalog.h" +#include "planner.h" + +struct SSchJob; typedef struct SSchedulerCfg { uint32_t maxJobNum; @@ -65,7 +67,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg); * @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr * @return */ -int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob, SQueryResult *pRes); +int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, SQueryResult *pRes); /** * Process the query job, generated according to the query physical plan. @@ -73,7 +75,7 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void * @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr * @return */ -int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob); +int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob); /** * Fetch query result from the remote query executor @@ -81,7 +83,7 @@ int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, * @param data * @return */ -int32_t scheduleFetchRows(void *pJob, void **data); +int32_t scheduleFetchRows(struct SSchJob *pJob, void **data); /** diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 26afe237c9f4e42c1b1b7b16fb34d50942f2852e..7292375e7fd1f839f309c49c772438eb66e7ce97 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -113,6 +113,7 @@ typedef struct SRequestSendRecvBody { tsem_t rspSem; // not used now void* fp; SShowReqInfo showInfo; // todo this attribute will be removed after the query framework being completed. + struct SSchJob *pQueryJob; // query job, created according to sql query DAG. SDataBuf requestMsg; SReqResultInfo resInfo; } SRequestSendRecvBody; @@ -129,7 +130,7 @@ typedef struct SRequestObj { char *msgBuf; void *pInfo; // sql parse info, generated by parser module int32_t code; - uint64_t affectedRows; + uint64_t affectedRows; // todo remove it SQueryExecMetric metric; SRequestSendRecvBody body; } SRequestObj; @@ -161,12 +162,13 @@ int taos_options_imp(TSDB_OPTION option, const char *str); void* openTransporter(const char *user, const char *auth, int32_t numOfThreads); void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet); + void initMsgHandleFp(); TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port); -TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen); void *doFetchRow(SRequestObj* pRequest); + void setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows); #ifdef __cplusplus diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 26f97cc6ef1c28061ab2edcc120773a6b81748b5..14cb35355fa4a2d18effaebc1543419e6215975a 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1,6 +1,10 @@ +#include "../../libs/scheduler/inc/schedulerInt.h" #include "clientInt.h" #include "clientLog.h" +#include "parser.h" +#include "planner.h" +#include "scheduler.h" #include "tdef.h" #include "tep.h" #include "tglobal.h" @@ -8,9 +12,6 @@ #include "tnote.h" #include "tpagedfile.h" #include "tref.h" -#include "parser.h" -#include "planner.h" -#include "scheduler.h" #define CHECK_CODE_GOTO(expr, lable) \ do { \ @@ -57,6 +58,7 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i } static STscObj* taosConnectImpl(const char *ip, const char *user, const char *auth, const char *db, uint16_t port, __taos_async_fn_t fp, void *param, SAppInstInfo* pAppInfo); +static void setResSchemaInfo(SReqResultInfo* pResInfo, const SDataBlockSchema* pDataBlockSchema); TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, const char *auth, const char *db, uint16_t port) { if (taos_init() != TSDB_CODE_SUCCESS) { @@ -198,36 +200,48 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) { int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQueryNode, SQueryDag** pDag) { pRequest->type = pQueryNode->type; - SSchema *pSchema = NULL; SReqResultInfo* pResInfo = &pRequest->body.resInfo; - - int32_t code = qCreateQueryDag(pQueryNode, pDag, &pSchema, &pResInfo->numOfCols, pRequest->requestId); + int32_t code = qCreateQueryDag(pQueryNode, pDag, pRequest->requestId); if (code != 0) { return code; } if (pQueryNode->type == TSDB_SQL_SELECT) { - pResInfo->fields = calloc(1, sizeof(TAOS_FIELD)); - for (int32_t i = 0; i < pResInfo->numOfCols; ++i) { - pResInfo->fields[i].bytes = pSchema[i].bytes; - pResInfo->fields[i].type = pSchema[i].type; - tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name)); - } + SArray* pa = taosArrayGetP((*pDag)->pSubplans, 0); + + SSubplan* pPlan = taosArrayGetP(pa, 0); + SDataBlockSchema* pDataBlockSchema = &(pPlan->pDataSink->schema); + setResSchemaInfo(pResInfo, pDataBlockSchema); + pRequest->type = TDMT_VND_QUERY; } return code; } -int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) { +void setResSchemaInfo(SReqResultInfo* pResInfo, const SDataBlockSchema* pDataBlockSchema) { + assert(pDataBlockSchema != NULL && pDataBlockSchema->numOfCols > 0); + + pResInfo->numOfCols = pDataBlockSchema->numOfCols; + pResInfo->fields = calloc(pDataBlockSchema->numOfCols, sizeof(pDataBlockSchema->pSchema[0])); + + for (int32_t i = 0; i < pResInfo->numOfCols; ++i) { + SSchema* pSchema = &pDataBlockSchema->pSchema[i]; + pResInfo->fields[i].bytes = pSchema->bytes; + pResInfo->fields[i].type = pSchema->type; + tstrncpy(pResInfo->fields[i].name, pSchema[i].name, tListLen(pResInfo->fields[i].name)); + } +} + +int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) { if (TSDB_SQL_INSERT == pRequest->type || TSDB_SQL_CREATE_TABLE == pRequest->type) { SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf}; - int32_t code = scheduleExecJob(pRequest->pTscObj->pTransporter, NULL, pDag, pJob, &res); + int32_t code = scheduleExecJob(pRequest->pTscObj->pTransporter, NULL, pDag, &pRequest->body.pQueryJob, &res); if (code != TSDB_CODE_SUCCESS) { // handle error and retry } else { - if (*pJob != NULL) { - scheduleFreeJob(*pJob); + if (pRequest->body.pQueryJob != NULL) { + scheduleFreeJob(pRequest->body.pQueryJob); } } @@ -235,7 +249,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) { return res.code; } - return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL /*todo appInfo.xxx*/, pDag, pJob); + return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL, pDag, &pRequest->body.pQueryJob); } TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen) { @@ -312,7 +326,6 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { SRequestObj *pRequest = NULL; SQueryNode *pQuery = NULL; SQueryDag *pDag = NULL; - void *pJob = NULL; terrno = TSDB_CODE_SUCCESS; CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return); @@ -322,9 +335,8 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { CHECK_CODE_GOTO(execDdlQuery(pRequest, pQuery), _return); } else { CHECK_CODE_GOTO(getPlan(pRequest, pQuery, &pDag), _return); - CHECK_CODE_GOTO(scheduleQuery(pRequest, pDag, &pJob), _return); + CHECK_CODE_GOTO(scheduleQuery(pRequest, pDag), _return); pRequest->code = terrno; - return pRequest; } _return: @@ -333,6 +345,7 @@ _return: if (NULL != pRequest && TSDB_CODE_SUCCESS != terrno) { pRequest->code = terrno; } + return pRequest; } @@ -531,7 +544,10 @@ void* doFetchRow(SRequestObj* pRequest) { SReqResultInfo* pResultInfo = &pRequest->body.resInfo; if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) { - if (pRequest->type == TDMT_MND_SHOW) { + if (pRequest->type == TDMT_VND_QUERY) { + pRequest->type = TDMT_VND_FETCH; + scheduleFetchRows(pRequest->body.pQueryJob, &pRequest->body.resInfo.pData); + } else if (pRequest->type == TDMT_MND_SHOW) { pRequest->type = TDMT_MND_SHOW_RETRIEVE; } else if (pRequest->type == TDMT_VND_SHOW_TABLES) { pRequest->type = TDMT_VND_SHOW_TABLES_FETCH; diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index af5ad874f80021c6673e8e647723d72428e6fef0..d507c565df07beaac5102b9553a3b7da71733b47 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -568,7 +568,7 @@ TEST(testCase, projection_query_tables) { TAOS_RES* pRes = taos_query(pConn, "use abc1"); // pRes = taos_query(pConn, "create table m1 (ts timestamp, k int) tags(a int)"); -// taos_free_result(pRes); + taos_free_result(pRes); // // pRes = taos_query(pConn, "create table tu using m1 tags(1)"); // taos_free_result(pRes); diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index 7ef434ccf5e1129b2d660a6680d92fc1e74151fa..01fb3c25130b34086427976ed51f86c25290b1f0 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -89,7 +89,7 @@ static bool copySchema(SDataBlockSchema* dst, const SDataBlockSchema* src) { static bool toDataBlockSchema(SQueryPlanNode* pPlanNode, SDataBlockSchema* dataBlockSchema) { dataBlockSchema->numOfCols = pPlanNode->numOfExpr; - dataBlockSchema->pSchema = malloc(sizeof(SSlotSchema) * pPlanNode->numOfCols); + dataBlockSchema->pSchema = malloc(sizeof(SSlotSchema) * pPlanNode->numOfExpr); if (NULL == dataBlockSchema->pSchema) { return false; } @@ -306,8 +306,6 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { case QNODE_TABLESCAN: node = createTableScanNode(pCxt, pPlanNode); break; - case QNODE_PROJECT: -// node = create case QNODE_MODIFY: // Insert is not an operator in a physical plan. break; diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index de3454ddbbf4b063b7eb49c283950cd25fb9cb55..3047ef4f5ac5b8cf6eba9f3036aee44f55021d9e 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -56,7 +56,7 @@ void qDestroyQueryDag(struct SQueryDag* pDag) { tfree(pDag); } -int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, SSchema** pSchema, uint32_t* numOfResCols, uint64_t requestId) { +int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, uint64_t requestId) { SQueryPlanNode* pLogicPlan; int32_t code = createQueryPlan(pNode, &pLogicPlan); if (TSDB_CODE_SUCCESS != code) { @@ -70,15 +70,6 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag, printf("%s\n", str); } - int32_t numOfOutput = pLogicPlan->numOfExpr; - *pSchema = calloc(numOfOutput, sizeof(SSchema)); - *numOfResCols = numOfOutput; - - for(int32_t i = 0; i < numOfOutput; ++i) { - SExprInfo* pExprInfo = taosArrayGetP(pLogicPlan->pExpr, i); - memcpy(&(*pSchema)[i], pExprInfo->pExpr->pSchema, sizeof(SSchema)); - } - code = optimizeQueryPlan(pLogicPlan); if (TSDB_CODE_SUCCESS != code) { destroyQueryPlan(pLogicPlan); diff --git a/source/libs/planner/test/phyPlanTests.cpp b/source/libs/planner/test/phyPlanTests.cpp index 0d3820cc77f421717a34af326664cc7bd8db8aee..2733a73a3f0dc4f6384009a91956a567ecbee35d 100644 --- a/source/libs/planner/test/phyPlanTests.cpp +++ b/source/libs/planner/test/phyPlanTests.cpp @@ -65,7 +65,7 @@ protected: SSchema *schema = NULL; uint32_t numOfOutput = 0; - code = qCreateQueryDag(query, &dag, &schema, &numOfOutput, requestId); + code = qCreateQueryDag(query, &dag, requestId); dag_.reset(dag); return code; } diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index ada9b247cef96fa56780d95e725084e93ccfa4b1..b2115b5c6d1bbca67d0e55ba080a179b004ad640 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -58,7 +58,6 @@ typedef struct SSchLevel { SArray *subTasks; // Element is SQueryTask } SSchLevel; - typedef struct SSchTask { uint64_t taskId; // task id SSchLevel *level; // level diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index f9406a04b6d895a53de193aa48ad1c8485c55ffa..2ce555525f2a1fd67c8377389b825ddbbb789478 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -606,8 +606,8 @@ int32_t schProcessRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *ms case TDMT_VND_QUERY_RSP: { SQueryTableRsp *rsp = (SQueryTableRsp *)msg; - if (rsp->code != TSDB_CODE_SUCCESS || NULL == msg) { - SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code)); + if (rspCode != TSDB_CODE_SUCCESS || rsp->code != TSDB_CODE_SUCCESS || NULL == msg) { + SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode)); } else { code = schBuildAndSendMsg(job, task, TDMT_VND_RES_READY); if (code) { @@ -986,7 +986,7 @@ void schDropJobAllTasks(SSchJob *job) { } } -int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, void** job, bool syncSchedule) { +int32_t schExecJobImpl(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** job, bool syncSchedule) { if (nodeList && taosArrayGetSize(nodeList) <= 0) { qInfo("QID:%"PRIx64" input nodeList is empty", pDag->queryId); } @@ -1092,7 +1092,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { return TSDB_CODE_SUCCESS; } -int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob, SQueryResult *pRes) { +int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob, SQueryResult *pRes) { if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -1107,7 +1107,7 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void return TSDB_CODE_SUCCESS; } -int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob) { +int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, struct SSchJob** pJob) { if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -1116,53 +1116,51 @@ int32_t scheduleAsyncExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, } -int32_t scheduleFetchRows(void *pJob, void **data) { - if (NULL == pJob || NULL == data) { +int32_t scheduleFetchRows(SSchJob *pJob, void** pData) { + if (NULL == pJob || NULL == pData) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - SSchJob *job = pJob; int32_t code = 0; - - if (!SCH_JOB_NEED_FETCH(&job->attr)) { + if (!SCH_JOB_NEED_FETCH(&pJob->attr)) { qError("no need to fetch data"); SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } - if (job->status == JOB_TASK_STATUS_FAILED) { - job->res = NULL; - SCH_RET(atomic_load_32(&job->errCode)); + if (pJob->status == JOB_TASK_STATUS_FAILED) { + pJob->res = NULL; + SCH_RET(atomic_load_32(&pJob->errCode)); } - if (job->status == JOB_TASK_STATUS_SUCCEED) { - job->res = NULL; + if (pJob->status == JOB_TASK_STATUS_SUCCEED) { + pJob->res = NULL; return TSDB_CODE_SUCCESS; } - if (atomic_val_compare_exchange_32(&job->userFetch, 0, 1) != 0) { + if (atomic_val_compare_exchange_32(&pJob->userFetch, 0, 1) != 0) { qError("prior fetching not finished"); SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } - if (job->status == JOB_TASK_STATUS_PARTIAL_SUCCEED) { - SCH_ERR_JRET(schFetchFromRemote(job)); + if (pJob->status == JOB_TASK_STATUS_PARTIAL_SUCCEED) { + SCH_ERR_JRET(schFetchFromRemote(pJob)); } - tsem_wait(&job->rspSem); + tsem_wait(&pJob->rspSem); - if (job->status == JOB_TASK_STATUS_FAILED) { - code = atomic_load_32(&job->errCode); + if (pJob->status == JOB_TASK_STATUS_FAILED) { + code = atomic_load_32(&pJob->errCode); } - if (job->res && ((SRetrieveTableRsp *)job->res)->completed) { - job->status = JOB_TASK_STATUS_SUCCEED; + if (pJob->res && ((SRetrieveTableRsp *)pJob->res)->completed) { + pJob->status = JOB_TASK_STATUS_SUCCEED; } - *data = job->res; - job->res = NULL; + *pData = pJob->res; + pJob->res = NULL; _return: - atomic_val_compare_exchange_32(&job->userFetch, 1, 0); + atomic_val_compare_exchange_32(&pJob->userFetch, 1, 0); SCH_RET(code); }