提交 38c48654 编写于 作者: W wpan

fix join retry issue

上级 2e452fce
......@@ -44,6 +44,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para
pSql->maxRetry = TSDB_MAX_REPLICA;
pSql->fp = fp;
pSql->fetchFp = fp;
pSql->rootObj = pSql;
registerSqlObj(pSql);
......
......@@ -577,7 +577,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
if ((TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY |
TSDB_QUERY_TYPE_TAG_FILTER_QUERY)) &&
!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_PROJECTION_QUERY)) ||
(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_NEST_SUBQUERY)) || (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_SUBQUERY) && pQueryInfo->distinct)) {
(TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_NEST_SUBQUERY)) || (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_STABLE_SUBQUERY) && pQueryInfo->distinct)
|| (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_QUERY))) {
// do nothing in case of super table subquery
} else {
pSql->retry += 1;
......@@ -2972,7 +2973,7 @@ int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVg
pNew->self, numOfTable, numOfVgroupList, numOfUdf, pNew->cmd.payloadLen);
pNew->fp = fp;
pNew->param = (void *)pSql->self;
pNew->param = (void *)pSql->rootObj->self;
tscDebug("0x%"PRIx64" metaRid from 0x%" PRIx64 " to 0x%" PRIx64 , pSql->self, pSql->metaRid, pNew->self);
......@@ -3118,6 +3119,12 @@ int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) {
pCmd->pTableMetaMap = tscCleanupTableMetaMap(pCmd->pTableMetaMap);
pCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
SSqlCmd* pCmd2 = &pSql->rootObj->cmd;
pCmd2->pTableMetaMap = tscCleanupTableMetaMap(pCmd2->pTableMetaMap);
pCmd2->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
pSql->rootObj->retryReason = pSql->retryReason;
SArray* pNameList = taosArrayInit(1, POINTER_BYTES);
SArray* vgroupList = taosArrayInit(1, POINTER_BYTES);
......
......@@ -1728,6 +1728,37 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
// tscFieldInfoUpdateOffset(pQueryInfo);
}
bool tscReparseSql(SSqlObj *sql, int32_t code){
if (!((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && sql->retry < sql->maxRetry)) {
return true;
}
sql->res.code = TSDB_CODE_SUCCESS;
sql->retry++;
tscDebug("0x%"PRIx64" retry parse sql and send query, prev error: %s, retry:%d", sql->self,
tstrerror(code), sql->retry);
tscResetSqlCmd(&sql->cmd, true);
code = tsParseSql(sql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return false;
}
if (code != TSDB_CODE_SUCCESS) {
sql->res.code = code;
tscAsyncResultOnError(sql);
return false;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(&sql->cmd);
executeQuery(sql, pQueryInfo);
return false;
}
void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
SSqlObj* pSql = (SSqlObj*)tres;
......@@ -1747,6 +1778,10 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
return;
}
if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) {
return;
}
tscAsyncResultOnError(pParentSql);
return;
......@@ -1762,6 +1797,10 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
return;
}
if (!tscReparseSql(pParentSql->rootObj, pParentSql->res.code)) {
return;
}
tscAsyncResultOnError(pParentSql);
......@@ -2767,14 +2806,7 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
if (!TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_JOIN_SEC_STAGE)) {
int32_t code = pParentSql->res.code;
SSqlObj *userSql = NULL;
if (pParentSql->param) {
userSql = ((SRetrieveSupport*)pParentSql->param)->pParentSql;
}
if (userSql == NULL) {
userSql = pParentSql;
}
SSqlObj *userSql = pParentSql->rootObj;
if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && userSql->retry < userSql->maxRetry) {
if (userSql != pParentSql) {
......
......@@ -3538,7 +3538,8 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
pNew->pTscObj = pSql->pTscObj;
pNew->signature = pNew;
pNew->sqlstr = strdup(pSql->sqlstr);
pNew->sqlstr = strdup(pSql->sqlstr);
pNew->rootObj = pSql->rootObj;
tsem_init(&pNew->rspSem, 0, 0);
SSqlCmd* pnCmd = &pNew->cmd;
......@@ -3814,7 +3815,9 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
// todo refactor
tscDebug("0x%"PRIx64" all subquery response received, retry", pParentSql->self);
if (code && !((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && pParentSql->retry < pParentSql->maxRetry)) {
SSqlObj *rootObj = pParentSql->rootObj;
if (code && !((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && rootObj->retry < rootObj->maxRetry)) {
pParentSql->res.code = code;
tscAsyncResultOnError(pParentSql);
......@@ -3824,29 +3827,32 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
tscFreeSubobj(pParentSql);
tfree(pParentSql->pSubs);
pParentSql->res.code = TSDB_CODE_SUCCESS;
pParentSql->retry++;
tscFreeSubobj(rootObj);
tfree(rootObj->pSubs);
rootObj->res.code = TSDB_CODE_SUCCESS;
rootObj->retry++;
tscDebug("0x%"PRIx64" retry parse sql and send query, prev error: %s, retry:%d", pParentSql->self,
tstrerror(code), pParentSql->retry);
tscDebug("0x%"PRIx64" retry parse sql and send query, prev error: %s, retry:%d", rootObj->self,
tstrerror(code), rootObj->retry);
tscResetSqlCmd(&pParentSql->cmd, true);
tscResetSqlCmd(&rootObj->cmd, true);
code = tsParseSql(pParentSql, true);
code = tsParseSql(rootObj, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return;
}
if (code != TSDB_CODE_SUCCESS) {
pParentSql->res.code = code;
tscAsyncResultOnError(pParentSql);
rootObj->res.code = code;
tscAsyncResultOnError(rootObj);
return;
}
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pParentSql->cmd);
SQueryInfo *pQueryInfo = tscGetQueryInfo(&rootObj->cmd);
executeQuery(pParentSql, pQueryInfo);
executeQuery(rootObj, pQueryInfo);
return;
}
......@@ -3898,7 +3904,8 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
pNew->sqlstr = strdup(pSql->sqlstr);
pNew->fp = tscSubqueryCompleteCallback;
pNew->fetchFp = tscSubqueryCompleteCallback;
pNew->maxRetry = pSql->maxRetry;
pNew->maxRetry = pSql->maxRetry;
pNew->rootObj = pSql->rootObj;
pNew->cmd.resColumnId = TSDB_RES_COL_ID;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册