From 3fff8aba71e783acdaead2303e1b97f035d7ee2a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 23 Oct 2020 22:54:47 +0800 Subject: [PATCH] [td-1373] --- src/client/src/tscSubquery.c | 78 +++++++++++++++++++++++++++++++----- src/query/src/qExecutor.c | 8 ++++ 2 files changed, 76 insertions(+), 10 deletions(-) diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index d141a3e807..be8f417df7 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -465,6 +465,8 @@ static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSupporter* pSupporter) { static void updateQueryTimeRange(SQueryInfo* pQueryInfo, STimeWindow* win) { assert(pQueryInfo->window.skey <= win->skey && pQueryInfo->window.ekey >= win->ekey); pQueryInfo->window = *win; + + } int32_t tscCompareTidTags(const void* p1, const void* p2) { @@ -963,19 +965,18 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR static SJoinSupporter* tscUpdateSubqueryStatus(SSqlObj* pSql, int32_t numOfFetch) { int32_t notInvolved = 0; - SJoinSupporter* pSupporter = NULL; SSubqueryState* pState = &pSql->subState; for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { if (pSql->pSubs[i] == NULL) { notInvolved++; - } else { - pSupporter = (SJoinSupporter*)pSql->pSubs[i]->param; +// } else { +// (SJoinSupporter*)pSql->pSubs[i]->param; } } pState->numOfRemain = numOfFetch; - return pSupporter; + return NULL; } void tscFetchDatablockFromSubquery(SSqlObj* pSql) { @@ -983,14 +984,16 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { int32_t numOfFetch = 0; bool hasData = true; + + // if the subquery is NULL, it does not involved in the final result generation for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { - // if the subquery is NULL, it does not involved in the final result generation SSqlObj* pSub = pSql->pSubs[i]; if (pSub == NULL) { continue; } - + SSqlRes *pRes = &pSub->res; + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSub->cmd, 0); if (!tscHasReachLimitation(pQueryInfo, pRes)) { @@ -1014,8 +1017,61 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { tscBuildResFromSubqueries(pSql); return; } else if (numOfFetch <= 0) { - pSql->res.completed = true; - freeJoinSubqueryObj(pSql); + + bool tryNextVnode = false; + + SSqlObj* pp = pSql->pSubs[0]; + SQueryInfo* pi = tscGetQueryInfoDetail(&pp->cmd, 0); + if (tscNonOrderedProjectionQueryOnSTable(pi, 0)) { + + for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { + if (pSql->pSubs[i] != NULL) { + pSql->subState.numOfRemain++; + } + } + + } + + for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { + SSqlObj* pSub = pSql->pSubs[i]; + if (pSub == NULL) { + continue; + } + + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSub->cmd, 0); + + if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { + STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + assert(pQueryInfo->numOfTables == 1); + + // for projection query, need to try next vnode if current vnode is exhausted + int32_t numOfVgroups = 0; // TODO refactor + if (pTableMetaInfo->pVgroupTables != NULL) { + numOfVgroups = taosArrayGetSize(pTableMetaInfo->pVgroupTables); + } else { + numOfVgroups = pTableMetaInfo->vgroupList->numOfVgroups; + } + + if ((++pTableMetaInfo->vgroupIndex) < numOfVgroups) { + tscDebug("%p no result in current vnode anymore, try next vnode, vgIndex:%d", pSub, + pTableMetaInfo->vgroupIndex); + pSub->cmd.command = TSDB_SQL_SELECT; + pSub->fp = tscJoinQueryCallback; + + tscProcessSql(pSub); + tryNextVnode = true; + } else { + tscDebug("%p no result in current subquery anymore", pSub); + } + } + } + + if (tryNextVnode) { + return; + } else { + pSql->res.completed = true; + freeJoinSubqueryObj(pSql); + } if (pSql->res.code == TSDB_CODE_SUCCESS) { (*pSql->fp)(pSql->param, pSql, 0); @@ -1027,15 +1083,17 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { } // TODO multi-vnode retrieve for projection query with limitation has bugs, since the global limiation is not handled + // retrieve data from current vnode. tscDebug("%p retrieve data from %d subqueries", pSql, numOfFetch); - SJoinSupporter* pSupporter = tscUpdateSubqueryStatus(pSql, numOfFetch); + SJoinSupporter* pSupporter = NULL; + tscUpdateSubqueryStatus(pSql, numOfFetch); for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { SSqlObj* pSql1 = pSql->pSubs[i]; if (pSql1 == NULL) { continue; } - + SSqlRes* pRes1 = &pSql1->res; SSqlCmd* pCmd1 = &pSql1->cmd; diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index a71cb39826..dd14523c2a 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -4775,6 +4775,14 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) { return false; } } else { + STSElem elem = tsBufGetElem(pRuntimeEnv->pTSBuf); + if (tVariantCompare(&elem.tag, &pRuntimeEnv->pCtx[0].tag) != 0) { + STSElem elem1 = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, pQInfo->vgId, &pRuntimeEnv->pCtx[0].tag); + if (elem1.vnode < 0) { + return false; + } + } + tsBufSetCursor(pRuntimeEnv->pTSBuf, &pRuntimeEnv->cur); } } -- GitLab