未验证 提交 72b6a474 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #20475 from taosdata/fix/TS-2851

fix: issues caused of renewing table meta
...@@ -406,6 +406,7 @@ typedef struct SSqlObj { ...@@ -406,6 +406,7 @@ typedef struct SSqlObj {
int64_t lastAlive; int64_t lastAlive;
void * pPrevContext; void * pPrevContext;
bool enableBatch; bool enableBatch;
bool needUpdateMeta;
} SSqlObj; } SSqlObj;
typedef struct SSqlStream { typedef struct SSqlStream {
......
...@@ -1736,6 +1736,16 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow ...@@ -1736,6 +1736,16 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
goto _error; goto _error;
} }
} else if (code != TSDB_CODE_SUCCESS) { } else if (code != TSDB_CODE_SUCCESS) {
SSqlObj *rootObj = pSql->rootObj;
if (rootObj->res.code && rootObj->needUpdateMeta) {
rootObj->needUpdateMeta = false;
if (pSql->retry < pSql->maxRetry) {
tscRenewTableMeta(pSql);
return;
}
}
goto _error; goto _error;
} }
......
...@@ -532,6 +532,20 @@ bool shouldRewTableMeta(SSqlObj* pSql, SRpcMsg* rpcMsg) { ...@@ -532,6 +532,20 @@ bool shouldRewTableMeta(SSqlObj* pSql, SRpcMsg* rpcMsg) {
return true; return true;
} }
int tscHandleRenewTableMeta(SSqlObj *pSql) {
SSqlObj *rootObj = pSql->rootObj;
if (rootObj == pSql) {
return tscRenewTableMeta(pSql);
}
rootObj->res.code = pSql->res.code;
rootObj->needUpdateMeta = true;
return rootObj->res.code;
}
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle; TSDB_CACHE_PTR_TYPE handle = (TSDB_CACHE_PTR_TYPE) rpcMsg->ahandle;
SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle); SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, handle);
...@@ -611,7 +625,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { ...@@ -611,7 +625,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
} }
pSql->retryReason = rpcMsg->code; pSql->retryReason = rpcMsg->code;
rpcMsg->code = tscRenewTableMeta(pSql); rpcMsg->code = tscHandleRenewTableMeta(pSql);
// if there is an error occurring, proceed to the following error handling procedure. // if there is an error occurring, proceed to the following error handling procedure.
if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (rpcMsg->code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, handle); taosReleaseRef(tscObjRef, handle);
...@@ -3425,7 +3439,8 @@ int tscRenewTableMeta(SSqlObj *pSql) { ...@@ -3425,7 +3439,8 @@ int tscRenewTableMeta(SSqlObj *pSql) {
SSqlObj *rootSql = pSql->rootObj; SSqlObj *rootSql = pSql->rootObj;
tscFreeSubobj(rootSql); tscFreeSubobj(rootSql);
tscResetSqlCmd(&rootSql->cmd, true, rootSql->self); tscResetSqlCmd(&rootSql->cmd, true, rootSql->self);
rootSql->res.code = 0;
code = getMultiTableMetaFromMnode(rootSql, pNameList, vgroupList, NULL, tscTableMetaCallBack, true); code = getMultiTableMetaFromMnode(rootSql, pNameList, vgroupList, NULL, tscTableMetaCallBack, true);
taosArrayDestroyEx(&pNameList, freeElem); taosArrayDestroyEx(&pNameList, freeElem);
taosArrayDestroyEx(&vgroupList, freeElem); taosArrayDestroyEx(&vgroupList, freeElem);
......
...@@ -2046,6 +2046,19 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { ...@@ -2046,6 +2046,19 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
goto _return; goto _return;
} }
SSqlObj *rootObj = pSql->rootObj;
if (rootObj->needUpdateMeta) {
rootObj->needUpdateMeta = false;
if (pSql->retry < pSql->maxRetry) {
tscRenewTableMeta(pSql);
} else {
tscAsyncResultOnError(pParentSql);
}
goto _return;
}
if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) { if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) {
goto _return; goto _return;
} }
...@@ -2598,6 +2611,16 @@ void tscFirstRoundCallback(void* param, TAOS_RES* tres, int code) { ...@@ -2598,6 +2611,16 @@ void tscFirstRoundCallback(void* param, TAOS_RES* tres, int code) {
SSqlObj* pSql = (SSqlObj*) tres; SSqlObj* pSql = (SSqlObj*) tres;
int32_t c = taos_errno(pSql); int32_t c = taos_errno(pSql);
SSqlObj *rootObj = pSql->rootObj;
if (rootObj->res.code && rootObj->needUpdateMeta) {
rootObj->needUpdateMeta = false;
if (pSql->retry < pSql->maxRetry) {
tscRenewTableMeta(pSql);
return;
}
}
if (c != TSDB_CODE_SUCCESS) { if (c != TSDB_CODE_SUCCESS) {
SSqlObj* parent = pSup->pParent; SSqlObj* parent = pSup->pParent;
...@@ -3105,6 +3128,16 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO ...@@ -3105,6 +3128,16 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
tscDestroyGlobalMergerEnv(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, pState->numOfSub); tscDestroyGlobalMergerEnv(trsupport->pExtMemBuffer, trsupport->pOrderDescriptor, pState->numOfSub);
tscFreeRetrieveSup(&pSql->param); tscFreeRetrieveSup(&pSql->param);
SSqlObj *rootObj = pSql->rootObj;
if (rootObj->needUpdateMeta) {
rootObj->needUpdateMeta = false;
if (pSql->retry < pSql->maxRetry) {
tscRenewTableMeta(pSql);
return;
}
}
// in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes // in case of second stage join subquery, invoke its callback function instead of regular QueueAsyncRes
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pParentSql->cmd); SQueryInfo *pQueryInfo = tscGetQueryInfo(&pParentSql->cmd);
...@@ -3206,7 +3239,18 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p ...@@ -3206,7 +3239,18 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
tscFreeRetrieveSup(&pSql->param); tscFreeRetrieveSup(&pSql->param);
return; return;
} }
SSqlObj *rootObj = pSql->rootObj;
if (rootObj->needUpdateMeta) {
rootObj->needUpdateMeta = false;
if (pSql->retry < pSql->maxRetry) {
tscRenewTableMeta(pSql);
tscFreeRetrieveSup(&pSql->param);
return;
}
}
// all sub-queries are returned, start to local merge process // all sub-queries are returned, start to local merge process
pDesc->pColumnModel->capacity = trsupport->pExtMemBuffer[idx]->numOfElemsPerPage; pDesc->pColumnModel->capacity = trsupport->pExtMemBuffer[idx]->numOfElemsPerPage;
...@@ -3354,6 +3398,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR ...@@ -3354,6 +3398,7 @@ static void tscRetrieveFromDnodeCallBack(void *param, TAOS_RES *tres, int numOfR
if (pModelDesc == NULL) { if (pModelDesc == NULL) {
tscError("0x%"PRIx64" sub:0x%"PRIx64" column model has been freed", pParentSql->self, pSql->self); tscError("0x%"PRIx64" sub:0x%"PRIx64" column model has been freed", pParentSql->self, pSql->self);
tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_QRY_APP_ERROR); tscAbortFurtherRetryRetrieval(trsupport, tres, TSDB_CODE_QRY_APP_ERROR);
return;
} }
SColumnModel *pModelMemBuf = trsupport->pExtMemBuffer[idx]->pColumnModel; SColumnModel *pModelMemBuf = trsupport->pExtMemBuffer[idx]->pColumnModel;
if (pModelDesc->numOfCols != pModelMemBuf->numOfCols || if (pModelDesc->numOfCols != pModelMemBuf->numOfCols ||
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册