未验证 提交 c6913c62 编写于 作者: D dapan1121 提交者: GitHub

Merge pull request #7672 from taosdata/hotfix/TD-6276

[td-6276]fix join retry issue
...@@ -376,6 +376,7 @@ typedef struct SSqlObj { ...@@ -376,6 +376,7 @@ typedef struct SSqlObj {
SSubqueryState subState; SSubqueryState subState;
struct SSqlObj **pSubs; struct SSqlObj **pSubs;
struct SSqlObj *rootObj;
int64_t metaRid; int64_t metaRid;
int64_t svgroupRid; int64_t svgroupRid;
......
...@@ -44,6 +44,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para ...@@ -44,6 +44,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para
pSql->maxRetry = TSDB_MAX_REPLICA; pSql->maxRetry = TSDB_MAX_REPLICA;
pSql->fp = fp; pSql->fp = fp;
pSql->fetchFp = fp; pSql->fetchFp = fp;
pSql->rootObj = pSql;
registerSqlObj(pSql); registerSqlObj(pSql);
......
...@@ -577,7 +577,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { ...@@ -577,7 +577,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
if ((TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY | if ((TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY |
TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) && TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) &&
!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_PROJECTION_QUERY)) || !TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_PROJECTION_QUERY)) ||
(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_NEST_SUBQUERY)) || (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_SUBQUERY) && pQueryInfo->distinct)) { (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_NEST_SUBQUERY)) || (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_SUBQUERY) && pQueryInfo->distinct)
|| (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_QUERY))) {
// do nothing in case of super table subquery // do nothing in case of super table subquery
} else { } else {
pSql->retry += 1; pSql->retry += 1;
...@@ -2972,7 +2973,7 @@ int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVg ...@@ -2972,7 +2973,7 @@ int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVg
pNew->self, numOfTable, numOfVgroupList, numOfUdf, pNew->cmd.payloadLen); pNew->self, numOfTable, numOfVgroupList, numOfUdf, pNew->cmd.payloadLen);
pNew->fp = fp; pNew->fp = fp;
pNew->param = (void *)pSql->self; pNew->param = (void *)pSql->rootObj->self;
tscDebug("0x%"PRIx64" metaRid from 0x%" PRIx64 " to 0x%" PRIx64 , pSql->self, pSql->metaRid, pNew->self); tscDebug("0x%"PRIx64" metaRid from 0x%" PRIx64 " to 0x%" PRIx64 , pSql->self, pSql->metaRid, pNew->self);
...@@ -3120,6 +3121,12 @@ int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) { ...@@ -3120,6 +3121,12 @@ int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) {
pCmd->pTableMetaMap = tscCleanupTableMetaMap(pCmd->pTableMetaMap); pCmd->pTableMetaMap = tscCleanupTableMetaMap(pCmd->pTableMetaMap);
pCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); pCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
SSqlCmd* pCmd2 = &pSql->rootObj->cmd;
pCmd2->pTableMetaMap = tscCleanupTableMetaMap(pCmd2->pTableMetaMap);
pCmd2->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
pSql->rootObj->retryReason = pSql->retryReason;
SArray* pNameList = taosArrayInit(1, POINTER_BYTES); SArray* pNameList = taosArrayInit(1, POINTER_BYTES);
SArray* vgroupList = taosArrayInit(1, POINTER_BYTES); SArray* vgroupList = taosArrayInit(1, POINTER_BYTES);
......
...@@ -874,6 +874,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) { ...@@ -874,6 +874,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
pSql->pTscObj = taos; pSql->pTscObj = taos;
pSql->signature = pSql; pSql->signature = pSql;
pSql->rootObj = pSql;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
pCmd->resColumnId = TSDB_RES_COL_ID; pCmd->resColumnId = TSDB_RES_COL_ID;
...@@ -982,6 +983,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { ...@@ -982,6 +983,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
pSql->pTscObj = taos; pSql->pTscObj = taos;
pSql->signature = pSql; pSql->signature = pSql;
pSql->rootObj = pSql;
int32_t code = (uint8_t) tscTransferTableNameList(pSql, str, length, plist); int32_t code = (uint8_t) tscTransferTableNameList(pSql, str, length, plist);
free(str); free(str);
......
...@@ -679,6 +679,7 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c ...@@ -679,6 +679,7 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c
pSql->signature = pSql; pSql->signature = pSql;
pSql->pTscObj = pObj; pSql->pTscObj = pObj;
pSql->rootObj = pSql;
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
SSqlRes *pRes = &pSql->res; SSqlRes *pRes = &pSql->res;
......
...@@ -127,6 +127,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char* ...@@ -127,6 +127,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
pSql->signature = pSql; pSql->signature = pSql;
pSql->pTscObj = pObj; pSql->pTscObj = pObj;
pSql->pSubscription = pSub; pSql->pSubscription = pSub;
pSql->rootObj = pSql;
pSub->pSql = pSql; pSub->pSql = pSql;
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
......
...@@ -860,6 +860,37 @@ static bool checkForDuplicateTagVal(SSchema* pColSchema, SJoinSupporter* p1, SSq ...@@ -860,6 +860,37 @@ static bool checkForDuplicateTagVal(SSchema* pColSchema, SJoinSupporter* p1, SSq
return true; return true;
} }
bool tscReparseSql(SSqlObj *sql, int32_t code){
if (!((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && sql->retry < sql->maxRetry)) {
return true;
}
sql->res.code = TSDB_CODE_SUCCESS;
sql->retry++;
tscDebug("0x%"PRIx64" retry parse sql and send query, prev error: %s, retry:%d", sql->self,
tstrerror(code), sql->retry);
tscResetSqlCmd(&sql->cmd, true);
code = tsParseSql(sql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return false;
}
if (code != TSDB_CODE_SUCCESS) {
sql->res.code = code;
tscAsyncResultOnError(sql);
return false;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(&sql->cmd);
executeQuery(sql, pQueryInfo);
return false;
}
static void setTidTagType(SJoinSupporter* p, uint8_t type) { static void setTidTagType(SJoinSupporter* p, uint8_t type) {
for (int32_t i = 0; i < p->num; ++i) { for (int32_t i = 0; i < p->num; ++i) {
STidTags * tag = (STidTags*) varDataVal(p->pIdTagList + i * p->tagSize); STidTags * tag = (STidTags*) varDataVal(p->pIdTagList + i * p->tagSize);
...@@ -1102,6 +1133,10 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ...@@ -1102,6 +1133,10 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
return; return;
} }
if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) {
return;
}
tscAsyncResultOnError(pParentSql); tscAsyncResultOnError(pParentSql);
return; return;
...@@ -1119,6 +1154,10 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ...@@ -1119,6 +1154,10 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
return; return;
} }
if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) {
return;
}
tscAsyncResultOnError(pParentSql); tscAsyncResultOnError(pParentSql);
return; return;
} }
...@@ -1256,6 +1295,10 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ...@@ -1256,6 +1295,10 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
return; return;
} }
if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) {
return;
}
tscAsyncResultOnError(pParentSql); tscAsyncResultOnError(pParentSql);
return; return;
...@@ -1272,6 +1315,10 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ...@@ -1272,6 +1315,10 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
return; return;
} }
if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) {
return;
}
tscAsyncResultOnError(pParentSql); tscAsyncResultOnError(pParentSql);
return; return;
} }
...@@ -1401,6 +1448,10 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR ...@@ -1401,6 +1448,10 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
if (quitAllSubquery(pSql, pParentSql, pSupporter)) { if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
return; return;
} }
if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) {
return;
}
tscAsyncResultOnError(pParentSql); tscAsyncResultOnError(pParentSql);
return; return;
...@@ -1728,6 +1779,7 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) { ...@@ -1728,6 +1779,7 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
// tscFieldInfoUpdateOffset(pQueryInfo); // tscFieldInfoUpdateOffset(pQueryInfo);
} }
void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
SSqlObj* pSql = (SSqlObj*)tres; SSqlObj* pSql = (SSqlObj*)tres;
...@@ -1747,6 +1799,10 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { ...@@ -1747,6 +1799,10 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
return; return;
} }
if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) {
return;
}
tscAsyncResultOnError(pParentSql); tscAsyncResultOnError(pParentSql);
return; return;
...@@ -1762,6 +1818,10 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { ...@@ -1762,6 +1818,10 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
if (quitAllSubquery(pSql, pParentSql, pSupporter)) { if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
return; return;
} }
if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) {
return;
}
tscAsyncResultOnError(pParentSql); tscAsyncResultOnError(pParentSql);
...@@ -2767,14 +2827,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO ...@@ -2767,14 +2827,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) { if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
int32_t code = pParentSql->res.code; int32_t code = pParentSql->res.code;
SSqlObj *userSql = NULL; SSqlObj *userSql = pParentSql->rootObj;
if (pParentSql->param) {
userSql = ((SRetrieveSupport*)pParentSql->param)->pParentSql;
}
if (userSql == NULL) {
userSql = pParentSql;
}
if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && userSql->retry < userSql->maxRetry) { if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && userSql->retry < userSql->maxRetry) {
if (userSql != pParentSql) { if (userSql != pParentSql) {
......
...@@ -3458,6 +3458,7 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, __async_cb_func_t fp, void* param, in ...@@ -3458,6 +3458,7 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, __async_cb_func_t fp, void* param, in
pNew->pTscObj = pSql->pTscObj; pNew->pTscObj = pSql->pTscObj;
pNew->signature = pNew; pNew->signature = pNew;
pNew->rootObj = pSql->rootObj;
SSqlCmd* pCmd = &pNew->cmd; SSqlCmd* pCmd = &pNew->cmd;
pCmd->command = cmd; pCmd->command = cmd;
...@@ -3538,7 +3539,8 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t ...@@ -3538,7 +3539,8 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
pNew->pTscObj = pSql->pTscObj; pNew->pTscObj = pSql->pTscObj;
pNew->signature = pNew; pNew->signature = pNew;
pNew->sqlstr = strdup(pSql->sqlstr); pNew->sqlstr = strdup(pSql->sqlstr);
pNew->rootObj = pSql->rootObj;
tsem_init(&pNew->rspSem, 0, 0); tsem_init(&pNew->rspSem, 0, 0);
SSqlCmd* pnCmd = &pNew->cmd; SSqlCmd* pnCmd = &pNew->cmd;
...@@ -3814,7 +3816,9 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) { ...@@ -3814,7 +3816,9 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
// todo refactor // todo refactor
tscDebug("0x%"PRIx64" all subquery response received, retry", pParentSql->self); tscDebug("0x%"PRIx64" all subquery response received, retry", pParentSql->self);
if (code && !((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && pParentSql->retry < pParentSql->maxRetry)) { SSqlObj *rootObj = pParentSql->rootObj;
if (code && !((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && rootObj->retry < rootObj->maxRetry)) {
pParentSql->res.code = code; pParentSql->res.code = code;
tscAsyncResultOnError(pParentSql); tscAsyncResultOnError(pParentSql);
...@@ -3824,29 +3828,32 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) { ...@@ -3824,29 +3828,32 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
tscFreeSubobj(pParentSql); tscFreeSubobj(pParentSql);
tfree(pParentSql->pSubs); tfree(pParentSql->pSubs);
pParentSql->res.code = TSDB_CODE_SUCCESS; tscFreeSubobj(rootObj);
pParentSql->retry++; tfree(rootObj->pSubs);
rootObj->res.code = TSDB_CODE_SUCCESS;
rootObj->retry++;
tscDebug("0x%"PRIx64" retry parse sql and send query, prev error: %s, retry:%d", pParentSql->self, tscDebug("0x%"PRIx64" retry parse sql and send query, prev error: %s, retry:%d", rootObj->self,
tstrerror(code), pParentSql->retry); tstrerror(code), rootObj->retry);
tscResetSqlCmd(&pParentSql->cmd, true); tscResetSqlCmd(&rootObj->cmd, true);
code = tsParseSql(pParentSql, true); code = tsParseSql(rootObj, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return; return;
} }
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pParentSql->res.code = code; rootObj->res.code = code;
tscAsyncResultOnError(pParentSql); tscAsyncResultOnError(rootObj);
return; return;
} }
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pParentSql->cmd); SQueryInfo *pQueryInfo = tscGetQueryInfo(&rootObj->cmd);
executeQuery(pParentSql, pQueryInfo); executeQuery(rootObj, pQueryInfo);
return; return;
} }
...@@ -3898,7 +3905,8 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { ...@@ -3898,7 +3905,8 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
pNew->sqlstr = strdup(pSql->sqlstr); pNew->sqlstr = strdup(pSql->sqlstr);
pNew->fp = tscSubqueryCompleteCallback; pNew->fp = tscSubqueryCompleteCallback;
pNew->fetchFp = tscSubqueryCompleteCallback; pNew->fetchFp = tscSubqueryCompleteCallback;
pNew->maxRetry = pSql->maxRetry; pNew->maxRetry = pSql->maxRetry;
pNew->rootObj = pSql->rootObj;
pNew->cmd.resColumnId = TSDB_RES_COL_ID; pNew->cmd.resColumnId = TSDB_RES_COL_ID;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册