提交 3fff8aba 编写于 作者: H Haojun Liao

[td-1373]

上级 a89368e6
...@@ -465,6 +465,8 @@ static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSupporter* pSupporter) { ...@@ -465,6 +465,8 @@ static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSupporter* pSupporter) {
static void updateQueryTimeRange(SQueryInfo* pQueryInfo, STimeWindow* win) { static void updateQueryTimeRange(SQueryInfo* pQueryInfo, STimeWindow* win) {
assert(pQueryInfo->window.skey <= win->skey && pQueryInfo->window.ekey >= win->ekey); assert(pQueryInfo->window.skey <= win->skey && pQueryInfo->window.ekey >= win->ekey);
pQueryInfo->window = *win; pQueryInfo->window = *win;
} }
int32_t tscCompareTidTags(const void* p1, const void* p2) { int32_t tscCompareTidTags(const void* p1, const void* p2) {
...@@ -963,19 +965,18 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR ...@@ -963,19 +965,18 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
static SJoinSupporter* tscUpdateSubqueryStatus(SSqlObj* pSql, int32_t numOfFetch) { static SJoinSupporter* tscUpdateSubqueryStatus(SSqlObj* pSql, int32_t numOfFetch) {
int32_t notInvolved = 0; int32_t notInvolved = 0;
SJoinSupporter* pSupporter = NULL;
SSubqueryState* pState = &pSql->subState; SSubqueryState* pState = &pSql->subState;
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
if (pSql->pSubs[i] == NULL) { if (pSql->pSubs[i] == NULL) {
notInvolved++; notInvolved++;
} else { // } else {
pSupporter = (SJoinSupporter*)pSql->pSubs[i]->param; // (SJoinSupporter*)pSql->pSubs[i]->param;
} }
} }
pState->numOfRemain = numOfFetch; pState->numOfRemain = numOfFetch;
return pSupporter; return NULL;
} }
void tscFetchDatablockFromSubquery(SSqlObj* pSql) { void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
...@@ -983,14 +984,16 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { ...@@ -983,14 +984,16 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
int32_t numOfFetch = 0; int32_t numOfFetch = 0;
bool hasData = true; bool hasData = true;
// if the subquery is NULL, it does not involved in the final result generation
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
// if the subquery is NULL, it does not involved in the final result generation
SSqlObj* pSub = pSql->pSubs[i]; SSqlObj* pSub = pSql->pSubs[i];
if (pSub == NULL) { if (pSub == NULL) {
continue; continue;
} }
SSqlRes *pRes = &pSub->res; SSqlRes *pRes = &pSub->res;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSub->cmd, 0); SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSub->cmd, 0);
if (!tscHasReachLimitation(pQueryInfo, pRes)) { if (!tscHasReachLimitation(pQueryInfo, pRes)) {
...@@ -1014,8 +1017,61 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { ...@@ -1014,8 +1017,61 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
tscBuildResFromSubqueries(pSql); tscBuildResFromSubqueries(pSql);
return; return;
} else if (numOfFetch <= 0) { } else if (numOfFetch <= 0) {
pSql->res.completed = true;
freeJoinSubqueryObj(pSql); bool tryNextVnode = false;
SSqlObj* pp = pSql->pSubs[0];
SQueryInfo* pi = tscGetQueryInfoDetail(&pp->cmd, 0);
if (tscNonOrderedProjectionQueryOnSTable(pi, 0)) {
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
if (pSql->pSubs[i] != NULL) {
pSql->subState.numOfRemain++;
}
}
}
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSub = pSql->pSubs[i];
if (pSub == NULL) {
continue;
}
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSub->cmd, 0);
if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
assert(pQueryInfo->numOfTables == 1);
// for projection query, need to try next vnode if current vnode is exhausted
int32_t numOfVgroups = 0; // TODO refactor
if (pTableMetaInfo->pVgroupTables != NULL) {
numOfVgroups = taosArrayGetSize(pTableMetaInfo->pVgroupTables);
} else {
numOfVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
}
if ((++pTableMetaInfo->vgroupIndex) < numOfVgroups) {
tscDebug("%p no result in current vnode anymore, try next vnode, vgIndex:%d", pSub,
pTableMetaInfo->vgroupIndex);
pSub->cmd.command = TSDB_SQL_SELECT;
pSub->fp = tscJoinQueryCallback;
tscProcessSql(pSub);
tryNextVnode = true;
} else {
tscDebug("%p no result in current subquery anymore", pSub);
}
}
}
if (tryNextVnode) {
return;
} else {
pSql->res.completed = true;
freeJoinSubqueryObj(pSql);
}
if (pSql->res.code == TSDB_CODE_SUCCESS) { if (pSql->res.code == TSDB_CODE_SUCCESS) {
(*pSql->fp)(pSql->param, pSql, 0); (*pSql->fp)(pSql->param, pSql, 0);
...@@ -1027,15 +1083,17 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { ...@@ -1027,15 +1083,17 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
} }
// TODO multi-vnode retrieve for projection query with limitation has bugs, since the global limiation is not handled // TODO multi-vnode retrieve for projection query with limitation has bugs, since the global limiation is not handled
// retrieve data from current vnode.
tscDebug("%p retrieve data from %d subqueries", pSql, numOfFetch); tscDebug("%p retrieve data from %d subqueries", pSql, numOfFetch);
SJoinSupporter* pSupporter = tscUpdateSubqueryStatus(pSql, numOfFetch); SJoinSupporter* pSupporter = NULL;
tscUpdateSubqueryStatus(pSql, numOfFetch);
for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SSqlObj* pSql1 = pSql->pSubs[i]; SSqlObj* pSql1 = pSql->pSubs[i];
if (pSql1 == NULL) { if (pSql1 == NULL) {
continue; continue;
} }
SSqlRes* pRes1 = &pSql1->res; SSqlRes* pRes1 = &pSql1->res;
SSqlCmd* pCmd1 = &pSql1->cmd; SSqlCmd* pCmd1 = &pSql1->cmd;
......
...@@ -4775,6 +4775,14 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) { ...@@ -4775,6 +4775,14 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
return false; return false;
} }
} else { } else {
STSElem elem = tsBufGetElem(pRuntimeEnv->pTSBuf);
if (tVariantCompare(&elem.tag, &pRuntimeEnv->pCtx[0].tag) != 0) {
STSElem elem1 = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, pQInfo->vgId, &pRuntimeEnv->pCtx[0].tag);
if (elem1.vnode < 0) {
return false;
}
}
tsBufSetCursor(pRuntimeEnv->pTSBuf, &pRuntimeEnv->cur); tsBufSetCursor(pRuntimeEnv->pTSBuf, &pRuntimeEnv->cur);
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册