diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 9e93c362659c5aac7b1e596811d5c27208a4693f..5cb6dbd3bcb55f7ad2411fb8eb72754475b59eab 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -100,7 +100,7 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, in */ int32_t schedulerFetchRows(int64_t job, void **data); -int32_t schedulerAsyncFetchRows(int64_t job, schedulerFetchCallback fp, void* param); +void schedulerAsyncFetchRows(int64_t job, schedulerFetchCallback fp, void* param); int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 26ee63cb4901be6efabddede88253f3849d4d1b2..20c7d41db3cf17b87faad95b906d9acb09dfb0b1 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1086,7 +1086,7 @@ void* doAsyncFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUc tsem_wait(&pParam->sem); } - if (setupOneRowPtr) { + if (pRequest->code == TSDB_CODE_SUCCESS && setupOneRowPtr) { doSetOneRowPtr(pResultInfo); pResultInfo->current += 1; } diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 1647a3fc6065aa8e993c53c2e4ddb1767c6861b6..20f14f619dd0224d441f068423f290ddc06bed98 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -793,7 +793,7 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { } } - pRequest->code = schedulerAsyncFetchRows(pRequest->body.queryJob, fetchCallback, pRequest); + schedulerAsyncFetchRows(pRequest->body.queryJob, fetchCallback, pRequest); } TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char *topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 467215108eae56238e11ebf55bc24611020c53e6..1e81a78129b9bee7b59058ab6c5cfe90187aadc4 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -474,7 +474,7 @@ TEST(testCase, create_multiple_tables) { taos_free_result(pRes); - pRes = taos_query(pConn, "create table t_2 using st1 tags(1)"); + pRes = taos_query(pConn, "create table if not exists t_2 using st1 tags(1)"); if (taos_errno(pRes) != 0) { printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); taos_free_result(pRes); @@ -482,7 +482,7 @@ TEST(testCase, create_multiple_tables) { } taos_free_result(pRes); - pRes = taos_query(pConn, "create table t_3 using st1 tags(2)"); + pRes = taos_query(pConn, "create table if not exists t_3 using st1 tags(2)"); if (taos_errno(pRes) != 0) { printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes)); taos_free_result(pRes); @@ -590,6 +590,7 @@ TEST(testCase, generated_request_id_test) { taosHashCleanup(phash); } + TEST(testCase, insert_test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); @@ -645,7 +646,7 @@ TEST(testCase, projection_query_tables) { } taos_free_result(pRes); - for(int32_t i = 0; i < 10000; i += 20) { + for(int32_t i = 0; i < 1000; i += 20) { char sql[1024] = {0}; sprintf(sql, "insert into tu values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 7e0efe22dbfa2dd61a5c662c1103a1b98f579da8..e9927fcaca73cf136d9c5806a9b878ad6a903972 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -1034,10 +1034,10 @@ int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmt CTG_ERR_JRET(ctgLaunchJob(pJob)); - *jobId = pJob->refId; + // NOTE: here the assignment of jobId is invalid, may over-write the true scheduler created query job. +// *jobId = pJob->refId; _return: - if (pJob) { taosReleaseRef(gCtgMgmt.jobPool, pJob->refId); diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index a7588302fb91e8c838f933bc25cadbebba15e468..e383026f335b32ee501380bf39846d29fa6e6e12 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -21,9 +21,9 @@ #include "tref.h" #include "trpc.h" -FORCE_INLINE SSchJob *schAcquireJob(int64_t refId) { qDebug("acquire job:0x%"PRIx64, refId); return (SSchJob *)taosAcquireRef(schMgmt.jobRef, refId); } +FORCE_INLINE SSchJob *schAcquireJob(int64_t refId) { qDebug("acquire jobId:0x%"PRIx64, refId); return (SSchJob *)taosAcquireRef(schMgmt.jobRef, refId); } -FORCE_INLINE int32_t schReleaseJob(int64_t refId) { qDebug("release job:0x%"PRIx64, refId); return taosReleaseRef(schMgmt.jobRef, refId); } +FORCE_INLINE int32_t schReleaseJob(int64_t refId) { qDebug("release jobId:0x%"PRIx64, refId); return taosReleaseRef(schMgmt.jobRef, refId); } int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel) { pTask->plan = pPlan; @@ -1365,8 +1365,6 @@ void schFreeJobImpl(void *job) { int32_t schExecJobImpl(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql, SSchResInfo *pRes, int64_t startTs, bool sync) { - qDebug("QID:0x%" PRIx64 " job started", pDag->queryId); - if (pNodeList == NULL || taosArrayGetSize(pNodeList) <= 0) { qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pDag->queryId); } @@ -1375,6 +1373,7 @@ int32_t schExecJobImpl(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_ SSchJob *pJob = NULL; SCH_ERR_RET(schInitJob(&pJob, pDag, pTrans, pNodeList, sql, pRes, startTs, sync)); + qDebug("QID:0x%" PRIx64 " jobId:0x%"PRIx64 " started", pDag->queryId, pJob->refId); *job = pJob->refId; SCH_ERR_JRET(schLaunchJob(pJob)); @@ -1386,7 +1385,7 @@ int32_t schExecJobImpl(void *pTrans, SArray *pNodeList, SQueryPlan *pDag, int64_ pJob->userCb = SCH_EXEC_CB; } - SCH_JOB_DLOG("job exec done, job status:%s", SCH_GET_JOB_STATUS_STR(pJob)); + SCH_JOB_DLOG("job exec done, job status:%s, jobId:0x%"PRIx64, SCH_GET_JOB_STATUS_STR(pJob), pJob->refId); _return: diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index a787a2ec1f8fc32175e0a03b63a58137deb82410..32e00c1b700a3eadfa9e15580d76950798791927 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -102,7 +102,7 @@ int32_t schedulerFetchRows(int64_t job, void **pData) { int32_t code = 0; SSchJob *pJob = schAcquireJob(job); if (NULL == pJob) { - qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job); + qError("acquire job from jobRef list failed, may be dropped, jobId:0x%" PRIx64, job); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } @@ -115,28 +115,26 @@ int32_t schedulerFetchRows(int64_t job, void **pData) { SCH_RET(code); } -int32_t schedulerAsyncFetchRows(int64_t job, schedulerFetchCallback fp, void* param) { +void schedulerAsyncFetchRows(int64_t job, schedulerFetchCallback fp, void* param) { if (NULL == fp || NULL == param) { - SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + fp(NULL, param, TSDB_CODE_QRY_INVALID_INPUT); + return; } - int32_t code = 0; SSchJob *pJob = schAcquireJob(job); if (NULL == pJob) { - qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job); + qError("acquire job from jobRef list failed, may be dropped, jobId:0x%" PRIx64, job); fp(NULL, param, TSDB_CODE_SCH_STATUS_ERROR); - return TSDB_CODE_SCH_STATUS_ERROR; + return; } pJob->attr.syncSchedule = false; pJob->userRes.fetchFp = fp; pJob->userRes.userParam = param; - code = schAsyncFetchRows(pJob); + /*code = */schAsyncFetchRows(pJob); schReleaseJob(job); - - SCH_RET(code); } int32_t schedulerGetTasksStatus(int64_t job, SArray *pSub) { @@ -173,7 +171,7 @@ _return: int32_t scheduleCancelJob(int64_t job) { SSchJob *pJob = schAcquireJob(job); if (NULL == pJob) { - qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job); + qError("acquire job from jobRef list failed, may be dropped, jobId:0x%" PRIx64, job); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } @@ -187,7 +185,7 @@ int32_t scheduleCancelJob(int64_t job) { void schedulerFreeJob(int64_t job) { SSchJob *pJob = schAcquireJob(job); if (NULL == pJob) { - qDebug("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job); + qError("acquire job from jobRef list failed, may be dropped, jobId:0x%" PRIx64, job); return; }