From 38c48654ae04b78f857e935573f781f80a2f0421 Mon Sep 17 00:00:00 2001 From: wpan Date: Sat, 28 Aug 2021 13:49:36 +0800 Subject: [PATCH] fix join retry issue --- src/client/inc/tsclient.h | 1 + src/client/src/tscAsync.c | 1 + src/client/src/tscServer.c | 11 +++++++-- src/client/src/tscSubquery.c | 48 ++++++++++++++++++++++++++++++------ src/client/src/tscUtil.c | 33 +++++++++++++++---------- 5 files changed, 71 insertions(+), 23 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 4249155eab..cd2cde1ada 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -912,3 +912,4 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok #endif #endif + \ No newline at end of file diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index a1f2d3c761..7a7e1adc1e 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -44,6 +44,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para pSql->maxRetry = TSDB_MAX_REPLICA; pSql->fp = fp; pSql->fetchFp = fp; + pSql->rootObj = pSql; registerSqlObj(pSql); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index c2202f1e67..96ac352020 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -577,7 +577,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { if ((TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY | TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) && !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_PROJECTION_QUERY)) || - (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_NEST_SUBQUERY)) || (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_SUBQUERY) && pQueryInfo->distinct)) { + (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_NEST_SUBQUERY)) || (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_SUBQUERY) && pQueryInfo->distinct) + || (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_QUERY))) { // do nothing in case of super table subquery } else { pSql->retry += 1; @@ -2972,7 +2973,7 @@ int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVg pNew->self, numOfTable, numOfVgroupList, numOfUdf, pNew->cmd.payloadLen); pNew->fp = fp; - pNew->param = (void *)pSql->self; + pNew->param = (void *)pSql->rootObj->self; tscDebug("0x%"PRIx64" metaRid from 0x%" PRIx64 " to 0x%" PRIx64 , pSql->self, pSql->metaRid, pNew->self); @@ -3118,6 +3119,12 @@ int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) { pCmd->pTableMetaMap = tscCleanupTableMetaMap(pCmd->pTableMetaMap); pCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + SSqlCmd* pCmd2 = &pSql->rootObj->cmd; + pCmd2->pTableMetaMap = tscCleanupTableMetaMap(pCmd2->pTableMetaMap); + pCmd2->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + + pSql->rootObj->retryReason = pSql->retryReason; + SArray* pNameList = taosArrayInit(1, POINTER_BYTES); SArray* vgroupList = taosArrayInit(1, POINTER_BYTES); diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index de9296b8c9..8fbf6aa1c8 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -1728,6 +1728,37 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) { // tscFieldInfoUpdateOffset(pQueryInfo); } + +bool tscReparseSql(SSqlObj *sql, int32_t code){ + if (!((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && sql->retry < sql->maxRetry)) { + return true; + } + + sql->res.code = TSDB_CODE_SUCCESS; + sql->retry++; + + tscDebug("0x%"PRIx64" retry parse sql and send query, prev error: %s, retry:%d", sql->self, + tstrerror(code), sql->retry); + + tscResetSqlCmd(&sql->cmd, true); + code = tsParseSql(sql, true); + if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { + return false; + } + + if (code != TSDB_CODE_SUCCESS) { + sql->res.code = code; + tscAsyncResultOnError(sql); + return false; + } + + SQueryInfo* pQueryInfo = tscGetQueryInfo(&sql->cmd); + executeQuery(sql, pQueryInfo); + + return false; +} + + void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { SSqlObj* pSql = (SSqlObj*)tres; @@ -1747,6 +1778,10 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { return; } + if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) { + return; + } + tscAsyncResultOnError(pParentSql); return; @@ -1762,6 +1797,10 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { if (quitAllSubquery(pSql, pParentSql, pSupporter)) { return; } + + if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) { + return; + } tscAsyncResultOnError(pParentSql); @@ -2767,14 +2806,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) { int32_t code = pParentSql->res.code; - SSqlObj *userSql = NULL; - if (pParentSql->param) { - userSql = ((SRetrieveSupport*)pParentSql->param)->pParentSql; - } - - if (userSql == NULL) { - userSql = pParentSql; - } + SSqlObj *userSql = pParentSql->rootObj; if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && userSql->retry < userSql->maxRetry) { if (userSql != pParentSql) { diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 1b4dff4b1b..2098e9aa87 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -3538,7 +3538,8 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t pNew->pTscObj = pSql->pTscObj; pNew->signature = pNew; - pNew->sqlstr = strdup(pSql->sqlstr); + pNew->sqlstr = strdup(pSql->sqlstr); + pNew->rootObj = pSql->rootObj; tsem_init(&pNew->rspSem, 0, 0); SSqlCmd* pnCmd = &pNew->cmd; @@ -3814,7 +3815,9 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) { // todo refactor tscDebug("0x%"PRIx64" all subquery response received, retry", pParentSql->self); - if (code && !((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && pParentSql->retry < pParentSql->maxRetry)) { + SSqlObj *rootObj = pParentSql->rootObj; + + if (code && !((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && rootObj->retry < rootObj->maxRetry)) { pParentSql->res.code = code; tscAsyncResultOnError(pParentSql); @@ -3824,29 +3827,32 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) { tscFreeSubobj(pParentSql); tfree(pParentSql->pSubs); - pParentSql->res.code = TSDB_CODE_SUCCESS; - pParentSql->retry++; + tscFreeSubobj(rootObj); + tfree(rootObj->pSubs); + + rootObj->res.code = TSDB_CODE_SUCCESS; + rootObj->retry++; - tscDebug("0x%"PRIx64" retry parse sql and send query, prev error: %s, retry:%d", pParentSql->self, - tstrerror(code), pParentSql->retry); + tscDebug("0x%"PRIx64" retry parse sql and send query, prev error: %s, retry:%d", rootObj->self, + tstrerror(code), rootObj->retry); - tscResetSqlCmd(&pParentSql->cmd, true); + tscResetSqlCmd(&rootObj->cmd, true); - code = tsParseSql(pParentSql, true); + code = tsParseSql(rootObj, true); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { return; } if (code != TSDB_CODE_SUCCESS) { - pParentSql->res.code = code; - tscAsyncResultOnError(pParentSql); + rootObj->res.code = code; + tscAsyncResultOnError(rootObj); return; } - SQueryInfo *pQueryInfo = tscGetQueryInfo(&pParentSql->cmd); + SQueryInfo *pQueryInfo = tscGetQueryInfo(&rootObj->cmd); - executeQuery(pParentSql, pQueryInfo); + executeQuery(rootObj, pQueryInfo); return; } @@ -3898,7 +3904,8 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { pNew->sqlstr = strdup(pSql->sqlstr); pNew->fp = tscSubqueryCompleteCallback; pNew->fetchFp = tscSubqueryCompleteCallback; - pNew->maxRetry = pSql->maxRetry; + pNew->maxRetry = pSql->maxRetry; + pNew->rootObj = pSql->rootObj; pNew->cmd.resColumnId = TSDB_RES_COL_ID; -- GitLab