From 4fd2fb06bc2803ab87b89dd1daa646b6a073f746 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 2 Jun 2022 21:20:15 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/client/inc/clientInt.h | 2 ++ source/client/src/clientImpl.c | 1 + source/client/src/clientMain.c | 25 +++++++++++++++---------- source/client/test/clientTests.cpp | 7 +++---- source/libs/scheduler/src/schJob.c | 4 ++-- source/libs/scheduler/src/scheduler.c | 3 ++- 6 files changed, 25 insertions(+), 17 deletions(-) diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 2b23b73ebc..c569eb1496 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -46,6 +46,8 @@ extern "C" { #define ERROR_MSG_BUF_DEFAULT_SIZE 512 #define HEARTBEAT_INTERVAL 1500 // ms +#define SYNC_ON_TOP_OF_ASYNC 0 + enum { RES_TYPE__QUERY = 1, RES_TYPE__TMQ, diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 2e648b25be..26ee63cb49 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1083,6 +1083,7 @@ void* doAsyncFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUc SSyncQueryParam* pParam = pRequest->body.param; taos_fetch_rows_a(pRequest, syncFetchFn, pParam); + tsem_wait(&pParam->sem); } if (setupOneRowPtr) { diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 5f39009801..1647a3fc60 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -189,7 +189,16 @@ TAOS_RES *taos_query(TAOS *taos, const char *sql) { } STscObj* pTscObj = (STscObj*)taos; -#if 0 + +#if SYNC_ON_TOP_OF_ASYNC + SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(struct SSyncQueryParam)); + tsem_init(¶m->sem, 0, 0); + + taos_query_a(pTscObj, sql, syncQueryFn, param); + tsem_wait(¶m->sem); + + return param->pRequest; +#else size_t sqlLen = strlen(sql); if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) { tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN); @@ -199,14 +208,6 @@ TAOS_RES *taos_query(TAOS *taos, const char *sql) { return execQuery(pTscObj, sql, sqlLen); #endif - - SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(struct SSyncQueryParam)); - tsem_init(¶m->sem, 0, 0); - - taos_query_a(pTscObj, sql, syncQueryFn, param); - tsem_wait(¶m->sem); - - return param->pRequest; } TAOS_ROW taos_fetch_row(TAOS_RES *res) { @@ -221,8 +222,12 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { return NULL; } +#if SYNC_ON_TOP_OF_ASYNC return doAsyncFetchRow(pRequest, true, true); -// return doFetchRows(pRequest, true, true); +#else + return doFetchRows(pRequest, true, true); +#endif + } else if (TD_RES_TMQ(res)) { SMqRspObj *msg = ((SMqRspObj *)res); SReqResultInfo *pResultInfo; diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 13420d626b..467215108e 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -94,7 +94,7 @@ TEST(testCase, driverInit_Test) { // taosInitGlobalCfg(); // taos_init(); } -#if 0 + TEST(testCase, connect_Test) { // taos_options(TSDB_OPTION_CONFIGDIR, "/home/ubuntu/first/cfg"); @@ -645,7 +645,7 @@ TEST(testCase, projection_query_tables) { } taos_free_result(pRes); - for(int32_t i = 0; i < 1000000; i += 20) { + for(int32_t i = 0; i < 10000; i += 20) { char sql[1024] = {0}; sprintf(sql, "insert into tu values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" @@ -772,10 +772,9 @@ TEST(testCase, agg_query_tables) { taos_free_result(pRes); taos_close(pConn); } -#endif /* ---- copy the following script in the shell to setup the environment. +--- copy the following script in the shell to setup the environment --- create database test; use test; diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index dbad053c65..a7588302fb 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -21,9 +21,9 @@ #include "tref.h" #include "trpc.h" -FORCE_INLINE SSchJob *schAcquireJob(int64_t refId) { return (SSchJob *)taosAcquireRef(schMgmt.jobRef, refId); } +FORCE_INLINE SSchJob *schAcquireJob(int64_t refId) { qDebug("acquire job:0x%"PRIx64, refId); return (SSchJob *)taosAcquireRef(schMgmt.jobRef, refId); } -FORCE_INLINE int32_t schReleaseJob(int64_t refId) { return taosReleaseRef(schMgmt.jobRef, refId); } +FORCE_INLINE int32_t schReleaseJob(int64_t refId) { qDebug("release job:0x%"PRIx64, refId); return taosReleaseRef(schMgmt.jobRef, refId); } int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel) { pTask->plan = pPlan; diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 6afa146c0a..a787a2ec1f 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -124,7 +124,8 @@ int32_t schedulerAsyncFetchRows(int64_t job, schedulerFetchCallback fp, void* pa SSchJob *pJob = schAcquireJob(job); if (NULL == pJob) { qError("acquire job from jobRef list failed, may be dropped, refId:%" PRIx64, job); - SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); + fp(NULL, param, TSDB_CODE_SCH_STATUS_ERROR); + return TSDB_CODE_SCH_STATUS_ERROR; } pJob->attr.syncSchedule = false; -- GitLab