From b0eae2956fb3b8131fe5fb90e67826a4c34349d5 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 15 Mar 2023 15:14:27 +0800 Subject: [PATCH] fix: issues caused of renewing table meta --- src/client/inc/tsclient.h | 1 + src/client/src/tscParseInsert.c | 10 +++++++ src/client/src/tscServer.c | 19 +++++++++++-- src/client/src/tscSubquery.c | 49 +++++++++++++++++++++++++++++++-- 4 files changed, 74 insertions(+), 5 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index c4d095dc67..73703fed8e 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -406,6 +406,7 @@ typedef struct SSqlObj { int64_t lastAlive; void * pPrevContext; bool enableBatch; + bool needUpdateMeta; } SSqlObj; typedef struct SSqlStream { diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 6bf0b07328..a2b72f0e8c 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -1736,6 +1736,16 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow goto _error; } } else if (code != TSDB_CODE_SUCCESS) { + SSqlObj *rootObj = pSql->rootObj; + if (rootObj->res.code && rootObj->needUpdateMeta) { + rootObj->needUpdateMeta = false; + + if (pSql->retry < pSql->maxRetry) { + tscRenewTableMeta(pSql); + return; + } + } + goto _error; } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index dc534c275b..e01348204e 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -532,6 +532,20 @@ bool shouldRewTableMeta(SSqlObj* pSql, SRpcMsg* rpcMsg) { return true; } +int tscHandleRenewTableMeta(SSqlObj *pSql) { + SSqlObj *rootObj = pSql->rootObj; + + if (rootObj == pSql) { + return tscRenewTableMeta(pSql); + } + + rootObj->res.code = pSql->res.code; + rootObj->needUpdateMeta = true; + + return rootObj->res.code; +} + + void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle; SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle); @@ -611,7 +625,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { } pSql->retryReason = rpcMsg->code; - rpcMsg->code = tscRenewTableMeta(pSql); + rpcMsg->code = tscHandleRenewTableMeta(pSql); // if there is an error occurring, proceed to the following error handling procedure. if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { taosReleaseRef(tscObjRef, handle); @@ -3425,7 +3439,8 @@ int tscRenewTableMeta(SSqlObj *pSql) { SSqlObj *rootSql = pSql->rootObj; tscFreeSubobj(rootSql); tscResetSqlCmd(&rootSql->cmd, true, rootSql->self); - + rootSql->res.code = 0; + code = getMultiTableMetaFromMnode(rootSql, pNameList, vgroupList, NULL, tscTableMetaCallBack, true); taosArrayDestroyEx(&pNameList, freeElem); taosArrayDestroyEx(&vgroupList, freeElem); diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index a9197a33f1..df3e5a5b4a 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -2046,6 +2046,19 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { goto _return; } + SSqlObj *rootObj = pSql->rootObj; + if (rootObj->needUpdateMeta) { + rootObj->needUpdateMeta = false; + + if (pSql->retry < pSql->maxRetry) { + tscRenewTableMeta(pSql); + } else { + tscAsyncResultOnError(pParentSql); + } + + goto _return; + } + if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) { goto _return; } @@ -2598,6 +2611,16 @@ void tscFirstRoundCallback(void* param, TAOS_RES* tres, int code) { SSqlObj* pSql = (SSqlObj*) tres; int32_t c = taos_errno(pSql); + SSqlObj *rootObj = pSql->rootObj; + if (rootObj->res.code && rootObj->needUpdateMeta) { + rootObj->needUpdateMeta = false; + + if (pSql->retry < pSql->maxRetry) { + tscRenewTableMeta(pSql); + return; + } + } + if (c != TSDB_CODE_SUCCESS) { SSqlObj* parent = pSup->pParent; @@ -3105,6 +3128,16 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO tscDestroyGlobalMergerEnv(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, pState->numOfSub); tscFreeRetrieveSup(&pSql->param); + SSqlObj *rootObj = pSql->rootObj; + if (rootObj->needUpdateMeta) { + rootObj->needUpdateMeta = false; + + if (pSql->retry < pSql->maxRetry) { + tscRenewTableMeta(pSql); + return; + } + } + // in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes SQueryInfo *pQueryInfo = tscGetQueryInfo(&pParentSql->cmd); @@ -3206,7 +3239,19 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p tscFreeRetrieveSup(&pSql->param); return; } - + + tscFreeRetrieveSup(&pSql->param); + + SSqlObj *rootObj = pSql->rootObj; + if (rootObj->needUpdateMeta) { + rootObj->needUpdateMeta = false; + + if (pSql->retry < pSql->maxRetry) { + tscRenewTableMeta(pSql); + return; + } + } + // all sub-queries are returned, start to local merge process pDesc->pColumnModel->capacity = trsupport->pExtMemBuffer[idx]->numOfElemsPerPage; @@ -3232,8 +3277,6 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p pParentSql->res.numOfRows = 0; pParentSql->res.row = 0; - tscFreeRetrieveSup(&pSql->param); - // set the command flag must be after the semaphore been correctly set. if (pParentSql->cmd.command != TSDB_SQL_RETRIEVE_EMPTY_RESULT) { pParentSql->cmd.command = TSDB_SQL_RETRIEVE_GLOBALMERGE; -- GitLab