From 114dd6ea13d59e7892aba93d31561156de2a5d6f Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Wed, 15 Jan 2020 10:48:37 +0800 Subject: [PATCH] fix bugs and refactor code --- src/client/src/tscJoinProcess.c | 28 ++++++++++++++++++-------- src/system/detail/src/vnodeQueryImpl.c | 16 ++++++++++----- src/util/src/hash.c | 8 +++----- 3 files changed, 34 insertions(+), 18 deletions(-) diff --git a/src/client/src/tscJoinProcess.c b/src/client/src/tscJoinProcess.c index d1e74f6599..0e93eac0cc 100644 --- a/src/client/src/tscJoinProcess.c +++ b/src/client/src/tscJoinProcess.c @@ -390,7 +390,7 @@ static void doQuitSubquery(SSqlObj* pParentSql) { } static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSubquerySupporter* pSupporter) { - int32_t numOfTotal = pSupporter->pState->numOfCompleted; + int32_t numOfTotal = pSupporter->pState->numOfTotal; int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1); if (finished >= numOfTotal) { @@ -479,8 +479,13 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { return; } } - - if (atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) { + + int32_t numOfTotal = pSupporter->pState->numOfTotal; + int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1); + + if (finished >= numOfTotal) { + assert(finished == numOfTotal); + if (pSupporter->pState->code != TSDB_CODE_SUCCESS) { tscTrace("%p sub:%p, numOfSub:%d, quit from further procedure due to other queries failure", pParentSql, tres, pSupporter->subqueryIndex); @@ -538,10 +543,12 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { return; } } - - if (atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) { - assert(pSupporter->pState->numOfCompleted == pSupporter->pState->numOfTotal); - + + int32_t numOfTotal = pSupporter->pState->numOfTotal; + int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1); + + if (finished >= numOfTotal) { + assert(finished == numOfTotal); tscTrace("%p all %d secondary retrieves are completed, global code:%d", tres, pSupporter->pState->numOfTotal, pParentSql->res.code); @@ -756,7 +763,12 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { quitAllSubquery(pParentSql, pSupporter); } else { - if (atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) { + int32_t numOfTotal = pSupporter->pState->numOfTotal; + int32_t finished = atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1); + + if (finished >= numOfTotal) { + assert(finished == numOfTotal); + tscSetupOutputColumnIndex(pParentSql); SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index ca9bbfae10..ac6f845f91 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -2006,9 +2006,9 @@ int32_t getNextDataFileCompInfo(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeter // no files left, abort if (fileIndex < 0) { if (step == QUERY_ASC_FORWARD_STEP) { - dTrace("QInfo:%p no file to access, try data in cache", GET_QINFO_ADDR(pQuery)); + dTrace("QInfo:%p no more file to access, try data in cache", GET_QINFO_ADDR(pQuery)); } else { - dTrace("QInfo:%p no file to access in desc order, query completed", GET_QINFO_ADDR(pQuery)); + dTrace("QInfo:%p no more file to access in desc order, query completed", GET_QINFO_ADDR(pQuery)); } vnodeFreeFieldsEx(pRuntimeEnv); @@ -2596,6 +2596,7 @@ int64_t getQueryStartPositionInCache(SQueryRuntimeEnv *pRuntimeEnv, int32_t *slo // cache block has been flushed to disk, no required data block in cache. SCacheBlock* pBlock = getCacheDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot); if (pBlock == NULL) { + pQuery->skey = rawskey; // restore the skey return -1; } @@ -2868,8 +2869,8 @@ static bool doGetQueryPos(TSKEY key, SMeterQuerySupportObj *pSupporter, SPointIn } } -static bool doSetDataInfo(SMeterQuerySupportObj *pSupporter, - SPointInterpoSupporter *pPointInterpSupporter, SMeterObj *pMeterObj,TSKEY nextKey) { +static bool doSetDataInfo(SMeterQuerySupportObj *pSupporter, SPointInterpoSupporter *pPointInterpSupporter, + SMeterObj *pMeterObj,TSKEY nextKey) { SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; @@ -5441,6 +5442,9 @@ static void queryStatusSave(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pStatus pStatus->overStatus = pQuery->over; pStatus->lastKey = pQuery->lastKey; + + pStatus->skey = pQuery->skey; + pStatus->ekey = pQuery->ekey; pStatus->start = pRuntimeEnv->startPos; pStatus->next = pRuntimeEnv->nextPos; @@ -5465,7 +5469,9 @@ static void queryStatusRestore(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus *pSta SWAP(pQuery->skey, pQuery->ekey, TSKEY); pQuery->lastKey = pStatus->lastKey; - + pQuery->skey = pStatus->skey; + pQuery->ekey = pStatus->ekey; + pQuery->over = pStatus->overStatus; pRuntimeEnv->startPos = pStatus->start; diff --git a/src/util/src/hash.c b/src/util/src/hash.c index 42948f3563..a57f04e11d 100644 --- a/src/util/src/hash.c +++ b/src/util/src/hash.c @@ -339,13 +339,11 @@ static void doAddToHashTable(HashObj *pObj, SHashNode *pNode) { pNode->prev1 = pEntry; pEntry->num++; - pObj->size++; - char key[512] = {0}; - memcpy(key, pNode->key, MIN(512, pNode->keyLen)); - - pTrace("key:%s %p add to hash table", key, pNode); +// char key[512] = {0}; +// memcpy(key, pNode->key, MIN(512, pNode->keyLen)); +// pTrace("key:%s %p add to hash table", key, pNode); } /** -- GitLab