diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h
index dcd058a293f0a35080335b30b38e32a792c43a74..a72489f338908e8396776b5fba85d8738005b6b7 100644
--- a/include/libs/scheduler/scheduler.h
+++ b/include/libs/scheduler/scheduler.h
@@ -54,8 +54,6 @@ typedef struct SQueryProfileSummary {
 typedef struct SQueryResult {
   int32_t  code;
   uint64_t numOfRows;
-  int32_t  msgSize;
-  char    *msg;
   void    *res;
 } SQueryResult;
 
@@ -64,6 +62,10 @@ typedef struct STaskInfo {
   SSubQueryMsg *msg;
 } STaskInfo;
 
+typedef void (*schedulerExecCallback)(SQueryResult* pResult, void* param, int32_t code);
+typedef void (*schedulerFetchCallback)(void* pResult, void* param, int32_t code);
+
+
 int32_t schedulerInit(SSchedulerCfg *cfg);
 
 /**
@@ -80,7 +82,8 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, in
  * @param pNodeList Qnode/Vnode address list, element is SQueryNodeAddr
  * @return
  */
-int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan* pDag, const char* sql, int64_t *pJob);
+int32_t schedulerAsyncExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql,
+                              int64_t startTs, schedulerExecCallback fp, void* param);
 
 /**
  * Fetch query result from the remote query executor
@@ -90,6 +93,8 @@ int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan* pD
  */
 int32_t schedulerFetchRows(int64_t job, void **data);
 
+int32_t schedulerAsyncFetchRows(int64_t job, schedulerFetchCallback fp, void* param);
+
 int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub);
 
 
diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c
index c288b5d2f88828ad8559f0ba15a8bd5f4605bed5..a9c5cd06f668ba625dee6d13c44261ef2badf8bb 100644
--- a/source/client/src/clientHb.c
+++ b/source/client/src/clientHb.c
@@ -140,7 +140,7 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
     STscObj *pTscObj = (STscObj *)acquireTscObj(pRsp->connKey.tscRid);
     if (NULL == pTscObj) {
       tscDebug("tscObj rid %" PRIx64 " not exist", pRsp->connKey.tscRid);
-    } else {  
+    } else {
       if (pRsp->query->totalDnodes > 1 && !isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &pRsp->query->epSet)) {
         updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &pRsp->query->epSet);
       }
diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c
index daa5887127ae5df63fe600b59b9ef5c8da7a592a..f6ba72db521aec60b1ab02b806bf90d7f1e4c2e6 100644
--- a/source/client/src/clientImpl.c
+++ b/source/client/src/clientImpl.c
@@ -292,7 +292,7 @@ void setResPrecision(SReqResultInfo* pResInfo, int32_t precision) {
 int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList, void** pRes) {
   void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
 
-  SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
+  SQueryResult res = {.code = 0, .numOfRows = 0};
   int32_t      code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr,
                                        pRequest->metric.start, &res);
   if (code != TSDB_CODE_SUCCESS) {
diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c
index 4afebf9951db2a895ef4b8c728e7b99fe979ce35..bbb8983713aa9933fada53326d8ad249eb6b3472 100644
--- a/source/libs/catalog/src/catalog.c
+++ b/source/libs/catalog/src/catalog.c
@@ -540,12 +540,6 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) {
       CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
     }
 
-    SHashObj *metaCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
-    if (NULL == metaCache) {
-      qError("taosHashInit failed, num:%d", gCtgMgmt.cfg.maxTblCacheNum);
-      CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
-    }
-
     code = taosHashPut(gCtgMgmt.pCluster, &clusterId, sizeof(clusterId), &clusterCtg, POINTER_BYTES);
     if (code) {
       if (HASH_NODE_EXIST(code)) {
diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c
index fd79767cc98bab3e2a6070df07391fcf011ac12a..ad36e06117ae3929ab9efe30a9eb87317dd7e086 100644
--- a/source/libs/parser/src/parInsert.c
+++ b/source/libs/parser/src/parInsert.c
@@ -1078,7 +1078,6 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
   int32_t     tbNum = 0;
   char        tbFName[TSDB_TABLE_FNAME_LEN];
   bool        autoCreateTbl = false;
-  STableMeta* pMeta = NULL;
 
   // for each table
   while (1) {
@@ -1141,12 +1140,10 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
     CHECK_CODE(getDataBlockFromList(pCxt->pTableBlockHashObj, tbFName, strlen(tbFName), TSDB_DEFAULT_PAYLOAD_SIZE,
                                     sizeof(SSubmitBlk), getTableInfo(pCxt->pTableMeta).rowSize, pCxt->pTableMeta,
                                     &dataBuf, NULL, &pCxt->createTblReq));
-    pMeta = pCxt->pTableMeta;
-    pCxt->pTableMeta = NULL;
 
     if (TK_NK_LP == sToken.type) {
       // pSql -> field1_name, ...)
-      CHECK_CODE(parseBoundColumns(pCxt, &dataBuf->boundColumnInfo, getTableColumnSchema(pMeta)));
+      CHECK_CODE(parseBoundColumns(pCxt, &dataBuf->boundColumnInfo, getTableColumnSchema(pCxt->pTableMeta)));
       NEXT_TOKEN(pCxt->pSql, sToken);
     }
 
@@ -1182,7 +1179,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
       return TSDB_CODE_TSC_OUT_OF_MEMORY;
     }
     memcpy(tags, &pCxt->tags, sizeof(pCxt->tags));
-    (*pCxt->pStmtCb->setInfoFn)(pCxt->pStmtCb->pStmt, pMeta, tags, tbFName, autoCreateTbl, pCxt->pVgroupsHashObj,
+    (*pCxt->pStmtCb->setInfoFn)(pCxt->pStmtCb->pStmt, pCxt->pTableMeta, tags, tbFName, autoCreateTbl, pCxt->pVgroupsHashObj,
                                 pCxt->pTableBlockHashObj);
 
     memset(&pCxt->tags, 0, sizeof(pCxt->tags));
diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h
index ffac0f856dbebf7b04b44c0cdfad8df75021e650..6599d00f58d530595435618e9409344970ee531a 100644
--- a/source/libs/scheduler/inc/schedulerInt.h
+++ b/source/libs/scheduler/inc/schedulerInt.h
@@ -39,9 +39,14 @@ enum {
   SCH_WRITE,
 };
 
+enum {
+  SCH_EXEC_CB = 1,
+  SCH_FETCH_CB,
+};
+
 typedef struct SSchTrans {
-  void *transInst;
-  void *transHandle;
+  void *pTrans;
+  void *pHandle;
 } SSchTrans;
 
 typedef struct SSchHbTrans {
@@ -74,12 +79,19 @@ typedef struct SSchJobStat {
 } SSchJobStat;
 
-typedef struct SSchedulerStat {
+typedef struct SSchStat {
   SSchApiStat      api;
   SSchRuntimeStat  runtime;
   SSchJobStat      job;
-} SSchedulerStat;
+} SSchStat;
 
+typedef struct SSchResInfo {
+  SQueryResult*          queryRes;
+  void**                 fetchRes;
+  schedulerExecCallback  execFp;
+  schedulerFetchCallback fetchFp;
+  void*                  userParam;
+} SSchResInfo;
 
 typedef struct SSchedulerMgmt {
   uint64_t        taskId; // sequential taksId
@@ -89,7 +101,7 @@ typedef struct SSchedulerMgmt {
   bool            exit;
   int32_t         jobRef;
   int32_t         jobNum;
-  SSchedulerStat  stat;
+  SSchStat        stat;
   SHashObj       *hbConnections;
 } SSchedulerMgmt;
 
@@ -108,7 +120,7 @@ typedef struct SSchTaskCallbackParam {
 typedef struct SSchHbCallbackParam {
   SSchCallbackParamHeader head;
   SQueryNodeEpId          nodeEpId;
-  void                   *transport;
+  void                   *pTrans;
 } SSchHbCallbackParam;
 
 typedef struct SSchFlowControl {
@@ -170,7 +182,7 @@ typedef struct SSchJob {
   SSchJobAttr      attr;
   int32_t          levelNum;
   int32_t          taskNum;
-  void            *transport;
+  void            *pTrans;
   SArray          *nodeList;   // qnode/vnode list, SArray<SQueryNodeAddr>
   SArray          *levels;     // starting from 0. SArray<SSchLevel>
   SNodeList       *subPlans;   // subplan pointer copied from DAG, no need to free it in scheduler
@@ -191,12 +203,13 @@ typedef struct SSchJob {
   int32_t          remoteFetch;
   SSchTask        *fetchTask;
   int32_t          errCode;
-  SArray          *errList;    // SArray
   SRWLatch         resLock;
   void            *queryRes;
   void            *resData;    //TODO free it or not
   int32_t          resNumOfRows;
+  SSchResInfo      userRes;
   const char      *sql;
+  int32_t          userCb;
   SQueryProfileSummary summary;
 } SSchJob;
 
@@ -292,15 +305,21 @@ int32_t schUpdateTaskExecNodeHandle(SSchTask *pTask, void *handle, int32_t rspCo
 void    schFreeRpcCtxVal(const void *arg);
 int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb);
 int32_t schRecordTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, void *handle);
-int32_t schExecStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
-                             bool syncSchedule);
-int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
-                       int64_t startTs, bool sync);
+int32_t schExecStaticExplainJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
+                                SSchResInfo *pRes, bool sync);
+int32_t schExecJobImpl(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
+                       SSchResInfo *pRes, int64_t startTs, bool sync);
 int32_t schChkUpdateJobStatus(SSchJob *pJob, int8_t newStatus);
 int32_t schCancelJob(SSchJob *pJob);
 int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode);
 uint64_t schGenTaskId(void);
 void schCloseJobRef(void);
+int32_t schExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql,
+                   int64_t startTs, SSchResInfo *pRes);
+int32_t schAsyncExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql,
+                        int64_t startTs, SSchResInfo *pRes);
+int32_t schFetchRows(SSchJob *pJob);
+int32_t schAsyncFetchRows(SSchJob *pJob);
 
 #ifdef __cplusplus
diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c
index 14f46463971fd103b1cf79a1c6bddd9ee0efa85c..d64c944994bbcb2817bb711b768242d02db71934 100644
--- a/source/libs/scheduler/src/schJob.c
+++ b/source/libs/scheduler/src/schJob.c
@@ -39,8 +39,8 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *
   return TSDB_CODE_SUCCESS;
 }
 
-int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray *pNodeList, const char *sql,
-                   int64_t startTs, bool syncSchedule) {
+int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *pTrans, SArray *pNodeList, const char *sql,
+                   SSchResInfo *pRes, int64_t startTs, bool syncSchedule) {
   int32_t  code = 0;
   int64_t  refId = -1;
   SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
@@ -51,9 +51,12 @@ int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray
   pJob->attr.explainMode = pDag->explainInfo.mode;
   pJob->attr.syncSchedule = syncSchedule;
-  pJob->transport = transport;
+  pJob->pTrans = pTrans;
   pJob->sql = sql;
-
+  if (pRes) {
+    pJob->userRes = *pRes;
+  }
+
   if (pNodeList != NULL) {
     pJob->nodeList = taosArrayDup(pNodeList);
   }
@@ -458,6 +461,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
   SCH_ERR_JRET(schBuildTaskRalation(pJob, planToTask));
 
 _return:
+
   if (planToTask) {
     taosHashCleanup(planToTask);
   }
@@ -727,6 +731,69 @@ _return:
   SCH_JOB_DLOG("job errCode updated to %x - %s", errCode, tstrerror(errCode));
 }
 
+
+int32_t schSetJobQueryRes(SSchJob* pJob, SQueryResult* pRes) {
+  pRes->code = atomic_load_32(&pJob->errCode);
+  pRes->numOfRows = pJob->resNumOfRows;
+  pRes->res = pJob->queryRes;
+  pJob->queryRes = NULL;
+
+  return TSDB_CODE_SUCCESS;
+}
+
+int32_t schSetJobFetchRes(SSchJob* pJob, void** pData) {
+  int32_t code = 0;
+  if (pJob->resData && ((SRetrieveTableRsp *)pJob->resData)->completed) {
+    SCH_ERR_RET(schChkUpdateJobStatus(pJob, JOB_TASK_STATUS_SUCCEED));
+  }
+
+  while (true) {
+    *pData = atomic_load_ptr(&pJob->resData);
+    if (*pData != atomic_val_compare_exchange_ptr(&pJob->resData, *pData, NULL)) {
+      continue;
+    }
+
+    break;
+  }
+
+  if (NULL == *pData) {
+    SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp));
+    if (rsp) {
+      rsp->completed = 1;
+    }
+
+    *pData = rsp;
+    SCH_JOB_DLOG("empty res and set query complete, code:%x", code);
+  }
+
+  SCH_JOB_DLOG("fetch done, totalRows:%d", pJob->resNumOfRows);
+
+  return TSDB_CODE_SUCCESS;
+}
+
+int32_t schNotifyUserQueryRes(SSchJob* pJob) {
+  pJob->userRes.queryRes = taosMemoryCalloc(1, sizeof(*pJob->userRes.queryRes));
+  if (pJob->userRes.queryRes) {
+    schSetJobQueryRes(pJob, pJob->userRes.queryRes);
+  }
+
+  (*pJob->userRes.execFp)(pJob->userRes.queryRes, pJob->userRes.userParam, atomic_load_32(&pJob->errCode));
+
+  pJob->userRes.queryRes = NULL;
+
+  return TSDB_CODE_SUCCESS;
+}
+
+int32_t schNotifyUserFetchRes(SSchJob* pJob) {
+  void* pRes = NULL;
+
+  SCH_ERR_RET(schSetJobFetchRes(pJob, &pRes));
+
+  (*pJob->userRes.fetchFp)(pRes, pJob->userRes.userParam, atomic_load_32(&pJob->errCode));
+
+  return TSDB_CODE_SUCCESS;
+}
+
 int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCode) {
   // if already FAILED, no more processing
   SCH_ERR_RET(schChkUpdateJobStatus(pJob, status));
@@ -741,6 +808,14 @@ int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCod
 
   SCH_JOB_DLOG("job failed with error: %s", tstrerror(code));
 
+  if (!pJob->attr.syncSchedule) {
+    if (SCH_EXEC_CB == atomic_val_compare_exchange_32(&pJob->userCb, SCH_EXEC_CB, 0)) {
+      schNotifyUserQueryRes(pJob);
+    } else if (SCH_FETCH_CB == atomic_val_compare_exchange_32(&pJob->userCb, SCH_FETCH_CB, 0)) {
+      schNotifyUserFetchRes(pJob);
+    }
+  }
+
   SCH_RET(code);
 }
 
@@ -762,6 +837,10 @@ int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) {
 
   if (pJob->attr.syncSchedule) {
     tsem_post(&pJob->rspSem);
+  } else if (SCH_EXEC_CB == atomic_val_compare_exchange_32(&pJob->userCb, SCH_EXEC_CB, 0)) {
+    schNotifyUserQueryRes(pJob);
+  } else if (SCH_FETCH_CB == atomic_val_compare_exchange_32(&pJob->userCb, SCH_FETCH_CB, 0)) {
+    schNotifyUserFetchRes(pJob);
   }
 
   if (atomic_load_8(&pJob->userFetch)) {
@@ -1218,6 +1297,7 @@ void schFreeJobImpl(void *job) {
     tFreeSSubmitRsp((SSubmitRsp*)pJob->queryRes);
   }
 
+  taosMemoryFreeClear(pJob->userRes.queryRes);
   taosMemoryFreeClear(pJob->resData);
   taosMemoryFreeClear(pJob);
 
@@ -1228,8 +1308,8 @@ void schFreeJobImpl(void *job) {
   schCloseJobRef();
 }
 
-int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
-                       int64_t startTs, bool sync) {
+int32_t schExecJobImpl(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
+                       SSchResInfo *pRes, int64_t startTs, bool sync) {
   qDebug("QID:0x%" PRIx64 " job started", pDag->queryId);
 
   if (pNodeList == NULL || taosArrayGetSize(pNodeList) <= 0) {
@@ -1238,31 +1318,68 @@ int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pDag, int
   int32_t  code = 0;
   SSchJob *pJob = NULL;
 
-  SCH_ERR_JRET(schInitJob(&pJob, pDag, transport, pNodeList, sql, startTs, sync));
-
-  SCH_ERR_JRET(schLaunchJob(pJob));
+  SCH_ERR_RET(schInitJob(&pJob, pDag, pTrans, pNodeList, sql, pRes, startTs, sync));
 
   *job = pJob->refId;
 
+  SCH_ERR_JRET(schLaunchJob(pJob));
+
   if (sync) {
     SCH_JOB_DLOG("will wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
     tsem_wait(&pJob->rspSem);
+  } else {
+    pJob->userCb = SCH_EXEC_CB;
   }
 
   SCH_JOB_DLOG("job exec done, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
 
+_return:
+
+  schReleaseJob(pJob->refId);
+
+  SCH_RET(code);
+}
 
-  return TSDB_CODE_SUCCESS;
+int32_t schExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql,
+                   int64_t startTs, SSchResInfo *pRes) {
+  int32_t code = 0;
+
+  *pJob = 0;
+
+  if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) {
+    SCH_ERR_JRET(schExecStaticExplainJob(pTrans, pNodeList, pDag, pJob, sql, NULL, true));
+  } else {
+    SCH_ERR_JRET(schExecJobImpl(pTrans, pNodeList, pDag, pJob, sql, NULL, startTs, true));
+  }
 
 _return:
 
-  schFreeJobImpl(pJob);
-  SCH_RET(code);
+  if (*pJob) {
+    SSchJob *job = schAcquireJob(*pJob);
+    schSetJobQueryRes(job, pRes->queryRes);
+    schReleaseJob(*pJob);
+  }
+
+  return code;
+}
+
+int32_t schAsyncExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql,
+                        int64_t startTs, SSchResInfo *pRes) {
+  int32_t code = 0;
+
+  *pJob = 0;
+
+  if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) {
+    SCH_ERR_RET(schExecStaticExplainJob(pTrans, pNodeList, pDag, pJob, sql, pRes, false));
+  } else {
+    SCH_ERR_RET(schExecJobImpl(pTrans, pNodeList, pDag, pJob, sql, pRes, startTs, false));
+  }
+
+  return code;
 }
 
-int32_t schExecStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
-                             bool syncSchedule) {
+int32_t schExecStaticExplainJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
+                                SSchResInfo *pRes, bool sync) {
   qDebug("QID:0x%" PRIx64 " job started", pDag->queryId);
 
   int32_t  code = 0;
@@ -1274,10 +1391,14 @@ int32_t schExecStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDa
   pJob->sql = sql;
   pJob->attr.queryJob = true;
+  pJob->attr.syncSchedule = sync;
   pJob->attr.explainMode = pDag->explainInfo.mode;
   pJob->queryId = pDag->queryId;
   pJob->subPlans = pDag->pSubplans;
-
+  if (pRes) {
+    pJob->userRes = *pRes;
+  }
+
   SCH_ERR_JRET(qExecStaticExplain(pDag, (SRetrieveTableRsp **)&pJob->resData));
 
   int64_t refId = taosAddRef(schMgmt.jobRef, pJob);
@@ -1288,7 +1409,7 @@ int32_t schExecStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDa
 
   if (NULL == schAcquireJob(refId)) {
     SCH_JOB_ELOG("schAcquireJob job failed, refId:%" PRIx64, refId);
-    SCH_RET(TSDB_CODE_SCH_STATUS_ERROR);
+    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
   }
 
   pJob->refId = refId;
@@ -1296,12 +1417,17 @@ int32_t schExecStaticExplain(void *transport, SArray *pNodeList, SQueryPlan *pDa
   SCH_JOB_DLOG("job refId:%" PRIx64, pJob->refId);
 
   pJob->status = JOB_TASK_STATUS_PARTIAL_SUCCEED;
+
   *job = pJob->refId;
   SCH_JOB_DLOG("job exec done, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
 
+  if (!pJob->attr.syncSchedule) {
+    code = schNotifyUserQueryRes(pJob);
+  }
+
   schReleaseJob(pJob->refId);
 
-  return TSDB_CODE_SUCCESS;
+  SCH_RET(code);
 
 _return:
 
@@ -1309,4 +1435,103 @@ _return:
 
   schFreeJobImpl(pJob);
   SCH_RET(code);
 }
+int32_t schFetchRows(SSchJob *pJob) {
+  int32_t  code = 0;
+
+  int8_t status = SCH_GET_JOB_STATUS(pJob);
+  if (status == JOB_TASK_STATUS_DROPPING) {
+    SCH_JOB_ELOG("job is dropping, status:%s", jobTaskStatusStr(status));
+    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
+  }
+
+  if (!SCH_JOB_NEED_FETCH(pJob)) {
+    SCH_JOB_ELOG("no need to fetch data, status:%s", SCH_GET_JOB_STATUS_STR(pJob));
+    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
+  }
+
+  if (atomic_val_compare_exchange_8(&pJob->userFetch, 0, 1) != 0) {
+    SCH_JOB_ELOG("prior fetching not finished, userFetch:%d", atomic_load_8(&pJob->userFetch));
+    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
+  }
+
+  if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) {
+    SCH_JOB_ELOG("job failed or dropping, status:%s", jobTaskStatusStr(status));
+    SCH_ERR_JRET(atomic_load_32(&pJob->errCode));
+  } else if (status == JOB_TASK_STATUS_SUCCEED) {
+    SCH_JOB_DLOG("job already succeed, status:%s", jobTaskStatusStr(status));
+    goto _return;
+  } else if (status != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
+    SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status));
+    SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
+  }
+
+  if (!(pJob->attr.explainMode == EXPLAIN_MODE_STATIC)) {
+    SCH_ERR_JRET(schFetchFromRemote(pJob));
+    tsem_wait(&pJob->rspSem);
+
+    status = SCH_GET_JOB_STATUS(pJob);
+    if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) {
+      SCH_JOB_ELOG("job failed or dropping, status:%s", jobTaskStatusStr(status));
+      SCH_ERR_JRET(atomic_load_32(&pJob->errCode));
+    }
+  }
+
+  SCH_ERR_JRET(schSetJobFetchRes(pJob, pJob->userRes.fetchRes));
+
+_return:
+
+  atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0);
+
+  SCH_RET(code);
+}
+
+int32_t schAsyncFetchRows(SSchJob *pJob) {
+  int32_t  code = 0;
+
+  int8_t status = SCH_GET_JOB_STATUS(pJob);
+  if (status == JOB_TASK_STATUS_DROPPING) {
+    SCH_JOB_ELOG("job is dropping, status:%s", jobTaskStatusStr(status));
+    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
+  }
+
+  if (!SCH_JOB_NEED_FETCH(pJob)) {
+    SCH_JOB_ELOG("no need to fetch data, status:%s", SCH_GET_JOB_STATUS_STR(pJob));
+    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
+  }
+
+  if (atomic_val_compare_exchange_8(&pJob->userFetch, 0, 1) != 0) {
+    SCH_JOB_ELOG("prior fetching not finished, userFetch:%d", atomic_load_8(&pJob->userFetch));
+    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
+  }
+
+  if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) {
+    SCH_JOB_ELOG("job failed or dropping, status:%s", jobTaskStatusStr(status));
+    SCH_ERR_JRET(atomic_load_32(&pJob->errCode));
+  } else if (status == JOB_TASK_STATUS_SUCCEED) {
+    SCH_JOB_DLOG("job already succeed, status:%s", jobTaskStatusStr(status));
+    goto _return;
+  } else if (status != JOB_TASK_STATUS_PARTIAL_SUCCEED) {
+    SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status));
+    SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
+  }
+
+  if (pJob->attr.explainMode == EXPLAIN_MODE_STATIC) {
+    SCH_ERR_JRET(schNotifyUserFetchRes(pJob));
+
+    atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0);
+  } else {
+    pJob->userCb = SCH_FETCH_CB;
+
+    SCH_ERR_JRET(schFetchFromRemote(pJob));
+  }
+
+  return TSDB_CODE_SUCCESS;
+
+_return:
+
+  atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0);
+
+  SCH_RET(code);
+}
+
diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c
index 3996771443fe0c4a8d43e781077c6b15b8d72f19..947a4fd1f27d8c663e4a017339f96e9fd1973a82 100644
--- a/source/libs/scheduler/src/schRemote.c
+++ b/source/libs/scheduler/src/schRemote.c
@@ -396,6 +396,7 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in
   SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode));
 
 _return:
+
   if (pJob) {
     schReleaseJob(pParam->refId);
   }
 
@@ -450,7 +451,7 @@ int32_t schHandleLinkBrokenCallback(void *param, const SDataBuf *pMsg, int32_t c
 
   if (head->isHbParam) {
     SSchHbCallbackParam *hbParam = (SSchHbCallbackParam *)param;
-    SSchTrans            trans = {.transInst = hbParam->transport, .transHandle = NULL};
+    SSchTrans            trans = {.pTrans = hbParam->pTrans, .pHandle = NULL};
     SCH_ERR_RET(schUpdateHbConnection(&hbParam->nodeEpId, &trans));
 
     SCH_ERR_RET(schBuildAndSendHbMsg(&hbParam->nodeEpId));
@@ -481,7 +482,7 @@ int32_t schGenerateCallBackInfo(SSchJob *pJob, SSchTask *pTask, int32_t msgType,
   param->queryId = pJob->queryId;
   param->refId = pJob->refId;
   param->taskId = SCH_TASK_ID(pTask);
-  param->transport = pJob->transport;
+  param->transport = pJob->pTrans;
 
   msgSendInfo->param = param;
   msgSendInfo->fp = fp;
@@ -556,7 +557,7 @@ int32_t schMakeHbCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) {
 
   param->nodeEpId.nodeId = addr->nodeId;
   memcpy(&param->nodeEpId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp));
-  param->transport = pJob->transport;
+  param->pTrans = pJob->pTrans;
 
   *pParam = param;
 
@@ -639,7 +640,7 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
   SCH_ERR_JRET(schGetCallbackFp(TDMT_VND_QUERY_HEARTBEAT, &fp));
 
   param->nodeEpId = epId;
-  param->transport = pJob->transport;
+  param->pTrans = pJob->pTrans;
 
   pMsgSendInfo->param = param;
   pMsgSendInfo->fp = fp;
@@ -668,7 +669,7 @@ int32_t schRegisterHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *
   int32_t  code = 0;
   SSchHbTrans hb = {0};
 
-  hb.trans.transInst = pJob->transport;
+  hb.trans.pTrans = pJob->pTrans;
 
   SCH_ERR_RET(schMakeHbRpcCtx(pJob, pTask, &hb.rpcCtx));
 
@@ -745,12 +746,12 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId) {
   __async_send_cb_fn_t fp = NULL;
   SCH_ERR_JRET(schGetCallbackFp(msgType, &fp));
 
-  param->transport = trans.transInst;
+  param->transport = trans.pTrans;
 
   pMsgSendInfo->param = param;
   pMsgSendInfo->msgInfo.pData = msg;
   pMsgSendInfo->msgInfo.len = msgSize;
-  pMsgSendInfo->msgInfo.handle = trans.transHandle;
+  pMsgSendInfo->msgInfo.handle = trans.pHandle;
   pMsgSendInfo->msgType = msgType;
   pMsgSendInfo->fp = fp;
 
@@ -758,13 +759,13 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId) {
   SEpSet epSet = {.inUse = 0, .numOfEps = 1};
   memcpy(&epSet.eps[0], &nodeEpId->ep, sizeof(nodeEpId->ep));
 
-  qDebug("start to send hb msg, instance:%p, handle:%p, fqdn:%s, port:%d", trans.transInst, trans.transHandle,
+  qDebug("start to send hb msg, pTrans:%p, pHandle:%p, fqdn:%s, port:%d", trans.pTrans, trans.pHandle,
          nodeEpId->ep.fqdn, nodeEpId->ep.port);
 
-  code = asyncSendMsgToServerExt(trans.transInst, &epSet, &transporterId, pMsgSendInfo, true, &rpcCtx);
+  code = asyncSendMsgToServerExt(trans.pTrans, &epSet, &transporterId, pMsgSendInfo, true, &rpcCtx);
   if (code) {
-    qError("fail to send hb msg, instance:%p, handle:%p, fqdn:%s, port:%d, error:%x - %s", trans.transInst,
-           trans.transHandle, nodeEpId->ep.fqdn, nodeEpId->ep.port, code, tstrerror(code));
+    qError("fail to send hb msg, pTrans:%p, pHandle:%p, fqdn:%s, port:%d, error:%x - %s", trans.pTrans,
+           trans.pHandle, nodeEpId->ep.fqdn, nodeEpId->ep.port, code, tstrerror(code));
     SCH_ERR_JRET(code);
   }
 
@@ -814,8 +815,8 @@ int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans) {
   memcpy(&hb->trans, trans, sizeof(*trans));
   SCH_UNLOCK(SCH_WRITE, &hb->lock);
 
-  qDebug("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, instance:%p, handle:%p", schMgmt.sId,
-         epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->transInst, trans->transHandle);
+  qDebug("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, pTrans:%p, pHandle:%p", schMgmt.sId,
+         epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->pTrans, trans->pHandle);
 
   return TSDB_CODE_SUCCESS;
 }
@@ -835,8 +836,8 @@ int32_t schHandleHbCallback(void *param, const SDataBuf *pMsg, int32_t code) {
   }
 
   SSchTrans trans = {0};
-  trans.transInst = pParam->transport;
-  trans.transHandle = pMsg->handle;
+  trans.pTrans = pParam->transport;
+  trans.pHandle = pMsg->handle;
 
   SCH_ERR_JRET(schUpdateHbConnection(&rsp.epId, &trans));
 
@@ -881,7 +882,7 @@ int32_t schMakeCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) {
   param->queryId = pJob->queryId;
   param->refId = pJob->refId;
   param->taskId = SCH_TASK_ID(pTask);
-  param->transport = pJob->transport;
+  param->transport = pJob->pTrans;
 
   *pParam = param;
 
@@ -1036,15 +1037,15 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet
 
   pMsgSendInfo->msgInfo.pData = msg;
   pMsgSendInfo->msgInfo.len = msgSize;
-  pMsgSendInfo->msgInfo.handle = trans->transHandle;
+  pMsgSendInfo->msgInfo.handle = trans->pHandle;
   pMsgSendInfo->msgType = msgType;
 
-  qDebug("start to send %s msg to node[%d,%s,%d], refId:%" PRIx64 "instance:%p, handle:%p", TMSG_INFO(msgType),
+  qDebug("start to send %s msg to node[%d,%s,%d], refId:%" PRIx64 "pTrans:%p, pHandle:%p", TMSG_INFO(msgType),
          ntohl(((SMsgHead *)msg)->vgId), epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port, pJob->refId,
-         trans->transInst, trans->transHandle);
+         trans->pTrans, trans->pHandle);
 
   int64_t transporterId = 0;
-  code = asyncSendMsgToServerExt(trans->transInst, epSet, &transporterId, pMsgSendInfo, persistHandle, ctx);
+  code = asyncSendMsgToServerExt(trans->pTrans, epSet, &transporterId, pMsgSendInfo, persistHandle, ctx);
   if (code) {
     SCH_ERR_JRET(code);
   }
@@ -1210,12 +1211,12 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
 
   SCH_SET_TASK_LASTMSG_TYPE(pTask, msgType);
 
-  SSchTrans trans = {.transInst = pJob->transport, .transHandle = SCH_GET_TASK_HANDLE(pTask)};
+  SSchTrans trans = {.pTrans = pJob->pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)};
   SCH_ERR_JRET(schAsyncSendMsg(pJob, pTask, &trans, &epSet, msgType, msg, msgSize, persistHandle,
                                (rpcCtx.args ? &rpcCtx : NULL)));
 
   if (msgType == TDMT_VND_QUERY) {
-    SCH_ERR_RET(schRecordTaskExecNode(pJob, pTask, addr, trans.transHandle));
+    SCH_ERR_RET(schRecordTaskExecNode(pJob, pTask, addr, trans.pHandle));
   }
 
   return TSDB_CODE_SUCCESS;
diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c
index bd2c7e5b4926d1f2fe17252713b812ebff8c4b49..8d802980ea2e9bdf16e6cfc7e22fe217d8791743 100644
--- a/source/libs/scheduler/src/scheduler.c
+++ b/source/libs/scheduler/src/scheduler.c
@@ -67,50 +67,24 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
   return TSDB_CODE_SUCCESS;
 }
 
-int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql,
+int32_t schedulerExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql,
                          int64_t startTs, SQueryResult *pRes) {
-  if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) {
+  if (NULL == pTrans || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) {
    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
   }
 
-  int32_t code = 0;
-
-  *pJob = 0;
-
-  if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) {
-    SCH_ERR_RET(schExecStaticExplain(transport, nodeList, pDag, pJob, sql, true));
-  } else {
-    SCH_ERR_JRET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, startTs, true));
-  }
-
-_return:
-
-  if (*pJob) {
-    SSchJob *job = schAcquireJob(*pJob);
-
-    pRes->code = atomic_load_32(&job->errCode);
-    pRes->numOfRows = job->resNumOfRows;
-    pRes->res = job->queryRes;
-    job->queryRes = NULL;
-
-    schReleaseJob(*pJob);
-  }
-
-  return code;
+  SSchResInfo resInfo = {.queryRes = pRes};
+  SCH_RET(schExecJob(pTrans, pNodeList, pDag, pJob, sql, startTs, &resInfo));
 }
 
-int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan *pDag, const char *sql, int64_t *pJob) {
-  if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
-    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
-  }
-
-  if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) {
-    SCH_ERR_RET(schExecStaticExplain(transport, pNodeList, pDag, pJob, sql, false));
-  } else {
-    SCH_ERR_RET(schExecJobImpl(transport, pNodeList, pDag, pJob, sql, 0, false));
-  }
-
-  return TSDB_CODE_SUCCESS;
+int32_t schedulerAsyncExecJob(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql,
+                              int64_t startTs, schedulerExecCallback fp, void* param) {
+  if (NULL == pTrans || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == fp || NULL == param) {
+    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
+  }
+
+  SSchResInfo resInfo = {.execFp = fp, .userParam = param};
+  SCH_RET(schAsyncExecJob(pTrans, pNodeList, pDag, pJob, sql, startTs, &resInfo));
 }
 
 int32_t schedulerFetchRows(int64_t job, void **pData) {
@@ -125,76 +99,32 @@ int32_t schedulerFetchRows(int64_t job, void **pData) {
     SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
   }
 
-  int8_t status = SCH_GET_JOB_STATUS(pJob);
-  if (status == JOB_TASK_STATUS_DROPPING) {
-    SCH_JOB_ELOG("job is dropping, status:%s", jobTaskStatusStr(status));
-    schReleaseJob(job);
-    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
-  }
-
-  if (!SCH_JOB_NEED_FETCH(pJob)) {
-    SCH_JOB_ELOG("no need to fetch data, status:%s", SCH_GET_JOB_STATUS_STR(pJob));
-    schReleaseJob(job);
-    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
-  }
-
-  if (atomic_val_compare_exchange_8(&pJob->userFetch, 0, 1) != 0) {
-    SCH_JOB_ELOG("prior fetching not finished, userFetch:%d", atomic_load_8(&pJob->userFetch));
-    schReleaseJob(job);
-    SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
-  }
-
-  if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) {
-    SCH_JOB_ELOG("job failed or dropping, status:%s", jobTaskStatusStr(status));
-    SCH_ERR_JRET(atomic_load_32(&pJob->errCode));
-  } else if (status == JOB_TASK_STATUS_SUCCEED) {
-    SCH_JOB_DLOG("job already succeed, status:%s", jobTaskStatusStr(status));
-    goto _return;
-  } else if (status == JOB_TASK_STATUS_PARTIAL_SUCCEED) {
-    if (!(pJob->attr.explainMode == EXPLAIN_MODE_STATIC)) {
-      SCH_ERR_JRET(schFetchFromRemote(pJob));
-      tsem_wait(&pJob->rspSem);
-    }
-  } else {
-    SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status));
-    SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
-  }
-
-  status = SCH_GET_JOB_STATUS(pJob);
-
-  if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) {
-    SCH_JOB_ELOG("job failed or dropping, status:%s", jobTaskStatusStr(status));
-    SCH_ERR_JRET(atomic_load_32(&pJob->errCode));
-  }
+  pJob->attr.syncSchedule = true;
+  pJob->userRes.fetchRes = pData;
+  code = schFetchRows(pJob);
 
-  if (pJob->resData && ((SRetrieveTableRsp *)pJob->resData)->completed) {
-    SCH_ERR_JRET(schChkUpdateJobStatus(pJob, JOB_TASK_STATUS_SUCCEED));
-  }
+  schReleaseJob(job);
 
-  while (true) {
-    *pData = atomic_load_ptr(&pJob->resData);
-    if (*pData != atomic_val_compare_exchange_ptr(&pJob->resData, *pData, NULL)) {
-      continue;
-    }
+  SCH_RET(code);
+}
 
-    break;
+int32_t schedulerAsyncFetchRows(int64_t job, schedulerFetchCallback fp, void* param) {
+  if (NULL == fp || NULL == param) {
+    SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
   }
 
-  if (NULL == *pData) {
-    SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp));
-    if (rsp) {
-      rsp->completed = 1;
-    }
-
-    *pData = rsp;
-    SCH_JOB_DLOG("empty res and set query complete, code:%x", code);
+  int32_t  code = 0;
+  SSchJob *pJob = schAcquireJob(job);
+  if (NULL == pJob) {
+    qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job);
+    SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
   }
 
-  SCH_JOB_DLOG("fetch done, totalRows:%d, code:%s", pJob->resNumOfRows, tstrerror(code));
-
-_return:
-
-  atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0);
+  pJob->attr.syncSchedule = false;
+  pJob->userRes.fetchFp = fp;
+  pJob->userRes.userParam = param;
+
+  code = schAsyncFetchRows(pJob);
 
   schReleaseJob(job);
 
diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp
index fc0e05aaf106fb11d8daa9be9a55e510aac58ff5..ec5d74372d2df681ce20c58a69dba22eaf7f8239 100644
--- a/source/libs/scheduler/test/schedulerTests.cpp
+++ b/source/libs/scheduler/test/schedulerTests.cpp
@@ -87,6 +87,11 @@ void schtInitLogFile() {
 
 }
 
+void schtQueryCb(SQueryResult* pResult, void* param, int32_t code) {
+  assert(TSDB_CODE_SUCCESS == code);
+  *(int32_t*)param = 1;
+}
+
 void schtBuildQueryDag(SQueryPlan *dag) {
   uint64_t qId = schtQueryId;
 
@@ -485,6 +490,7 @@ void* schtRunJobThread(void *aa) {
   SHashObj *execTasks = NULL;
   SDataBuf dataBuf = {0};
   uint32_t jobFinished = 0;
+  int32_t queryDone = 0;
 
   while (!schtTestStop) {
     schtBuildQueryDag(&dag);
@@ -496,7 +502,8 @@ void* schtRunJobThread(void *aa) {
     qnodeAddr.port = 6031;
    taosArrayPush(qnodeList, &qnodeAddr);
 
-    code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &queryJobRefId);
+    queryDone = 0;
+    code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, &queryJobRefId, "select * from tb", 0, schtQueryCb, &queryDone);
     assert(code == 0);
 
     pJob = schAcquireJob(queryJobRefId);
@@ -595,6 +602,14 @@ void* schtRunJobThread(void *aa) {
      pIter = taosHashIterate(execTasks, pIter);
    }
 
+    while (true) {
+      if (queryDone) {
+        break;
+      }
+
+      taosUsleep(10000);
+    }
+
    atomic_store_32(&schtStartFetch, 1);
 
    void *data = NULL;
@@ -667,8 +682,9 @@ TEST(queryTest, normalCase) {
   schtSetPlanToString();
   schtSetExecNode();
   schtSetAsyncSendMsgToServer();
-
-  code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &job);
+
+  int32_t queryDone = 0;
+  code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, &job, "select * from tb", 0, schtQueryCb, &queryDone);
 
   ASSERT_EQ(code, 0);
 
@@ -718,6 +734,14 @@ TEST(queryTest, normalCase) {
     pIter = taosHashIterate(pJob->execTasks, pIter);
   }
 
+  while (true) {
+    if (queryDone) {
+      break;
+    }
+
+    taosUsleep(10000);
+  }
+
   TdThreadAttr thattr;
   taosThreadAttrInit(&thattr);
 
@@ -773,8 +797,9 @@ TEST(queryTest, readyFirstCase) {
   schtSetPlanToString();
   schtSetExecNode();
   schtSetAsyncSendMsgToServer();
-
-  code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &job);
+
+  int32_t queryDone = 0;
+  code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, &job, "select * from tb", 0, schtQueryCb, &queryDone);
 
   ASSERT_EQ(code, 0);
 
@@ -824,6 +849,13 @@ TEST(queryTest, readyFirstCase) {
     pIter = taosHashIterate(pJob->execTasks, pIter);
   }
 
+  while (true) {
+    if (queryDone) {
+      break;
+    }
+
+    taosUsleep(10000);
+  }
 
   TdThreadAttr thattr;
 
@@ -885,16 +917,17 @@ TEST(queryTest, flowCtrlCase) {
   schtSetPlanToString();
   schtSetExecNode();
   schtSetAsyncSendMsgToServer();
-
-  code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &job);
+
+  int32_t queryDone = 0;
+  code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, &job, "select * from tb", 0, schtQueryCb, &queryDone);
 
   ASSERT_EQ(code, 0);
 
   SSchJob *pJob = schAcquireJob(job);
 
-  bool queryDone = false;
+  bool qDone = false;
 
-  while (!queryDone) {
+  while (!qDone) {
     void *pIter = taosHashIterate(pJob->execTasks, NULL);
     if (NULL == pIter) {
       break;
@@ -915,7 +948,7 @@ TEST(queryTest, flowCtrlCase) {
      code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0);
      ASSERT_EQ(code, 0);
    } else {
-      queryDone = true;
+      qDone = true;
      break;
    }
 
@@ -923,6 +956,13 @@ TEST(queryTest, flowCtrlCase) {
    }
   }
 
+  while (true) {
+    if (queryDone) {
+      break;
+    }
+
+    taosUsleep(10000);
+  }
 
  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);
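
Usage sketch (not part of the patch): the header changes above turn schedulerAsyncExecJob into a callback-driven API: schedulerExecCallback fires once the job reaches partial success or failure, and schedulerAsyncFetchRows hands the fetched block to schedulerFetchCallback instead of blocking on the job's semaphore. The fragment below shows how a caller could drive that flow. The scheduler functions, typedefs and SQueryResult come from scheduler.h as modified in this diff; SDemoQueryCtx, the demo* names and the way pTrans, the node list and the plan are obtained are illustrative assumptions only, and a real client would typically hand the fetch off to its own event loop rather than issuing it from inside the exec callback as is done here for brevity.

#include "scheduler.h"

typedef struct SDemoQueryCtx {   // hypothetical caller-side context, not a TDengine type
  int64_t queryJob;              // job refId filled in by schedulerAsyncExecJob
  int32_t done;                  // completion flag set by the callbacks below
} SDemoQueryCtx;

// Matches schedulerFetchCallback: pResult is the retrieved result block handed over by the scheduler.
static void demoFetchCb(void *pResult, void *param, int32_t code) {
  SDemoQueryCtx *pCtx = param;
  // consume pResult here (and release it when finished), then mark the query as done
  pCtx->done = 1;
}

// Matches schedulerExecCallback: invoked when the job reaches partial success or fails.
static void demoExecCb(SQueryResult *pResult, void *param, int32_t code) {
  SDemoQueryCtx *pCtx = param;
  if (code != TSDB_CODE_SUCCESS) {
    pCtx->done = 1;
    return;
  }
  // continue the pipeline by requesting rows asynchronously; the data arrives in demoFetchCb
  schedulerAsyncFetchRows(pCtx->queryJob, demoFetchCb, pCtx);
}

// Kick off an asynchronous query job; pTrans, pNodeList, pDag, sql and startTs are prepared the same
// way as for the existing synchronous schedulerExecJob path.
static int32_t demoAsyncQuery(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, const char *sql,
                              int64_t startTs, SDemoQueryCtx *pCtx) {
  return schedulerAsyncExecJob(pTrans, pNodeList, pDag, &pCtx->queryJob, sql, startTs, demoExecCb, pCtx);
}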