diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index be8f417df7dddf2dc5c6d8e0a5a15339898d3006..ca7341d8e91e600d8436acea53f2417241c3d016 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -58,6 +58,8 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ pSubQueryInfo1->tsBuf = output1; pSubQueryInfo2->tsBuf = output2; + TSKEY st = taosGetTimestampUs(); + // no result generated, return directly if (pSupporter1->pTSBuf == NULL || pSupporter2->pTSBuf == NULL) { tscDebug("%p at least one ts-comp is empty, 0 for secondary query after ts blocks intersecting", pSql); @@ -123,6 +125,9 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ tsBufAppend(output1, elem1.vnode, &elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts)); tsBufAppend(output2, elem2.vnode, &elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts)); + + tVariantDestroy(&elem1.tag); + tVariantDestroy(&elem2.tag); } else { pLimit->offset -= 1; } @@ -157,9 +162,10 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSupporter* pSupporter1, SJ tsBufDestroy(pSupporter1->pTSBuf); tsBufDestroy(pSupporter2->pTSBuf); + TSKEY et = taosGetTimestampUs(); tscDebug("%p input1:%" PRId64 ", input2:%" PRId64 ", final:%" PRId64 " in %d vnodes for secondary query after ts blocks " - "intersecting, skey:%" PRId64 ", ekey:%" PRId64 ", numOfVnode:%d", pSql, numOfInput1, numOfInput2, output1->numOfTotal, - output1->numOfVnodes, win->skey, win->ekey, tsBufGetNumOfVnodes(output1)); + "intersecting, skey:%" PRId64 ", ekey:%" PRId64 ", numOfVnode:%d, elasped time:%"PRId64" us", pSql, numOfInput1, numOfInput2, output1->numOfTotal, + output1->numOfVnodes, win->skey, win->ekey, tsBufGetNumOfVnodes(output1), et - st); return output1->numOfTotal; } @@ -952,11 +958,21 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR // update the records for each subquery in parent sql object. for (int32_t i = 0; i < pState->numOfSub; ++i) { if (pParentSql->pSubs[i] == NULL) { + tscDebug("%p %p sub:%d not retrieve data", pParentSql, NULL, i); continue; } SSqlRes* pRes1 = &pParentSql->pSubs[i]->res; - pRes1->numOfClauseTotal += pRes1->numOfRows; + + if (pRes1->row > 0 && pRes1->numOfRows > 0) { + tscDebug("%p sub:%p index:%d numOfRows:%"PRId64" total:%"PRId64 " (not retrieve)", pParentSql, pParentSql->pSubs[i], i, + pRes1->numOfRows, pRes1->numOfTotal); + assert(pRes1->row < pRes1->numOfRows); + } else { + pRes1->numOfClauseTotal += pRes1->numOfRows; + tscDebug("%p sub:%p index:%d numOfRows:%"PRId64" total:%"PRId64, pParentSql, pParentSql->pSubs[i], i, + pRes1->numOfRows, pRes1->numOfTotal); + } } // data has retrieved to client, build the join results @@ -998,8 +1014,11 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { if (!tscHasReachLimitation(pQueryInfo, pRes)) { if (pRes->row >= pRes->numOfRows) { + // no data left in current result buffer hasData = false; + // The current query is completed for the active vnode, try next vnode if exists + // If it is completed, no need to fetch anymore. if (!pRes->completed) { numOfFetch++; } @@ -1016,20 +1035,24 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { if (hasData) { tscBuildResFromSubqueries(pSql); return; - } else if (numOfFetch <= 0) { + } + // If at least one subquery is completed in current vnode, try the next vnode in case of multi-vnode + // super table projection query. + if (numOfFetch <= 0) { bool tryNextVnode = false; SSqlObj* pp = pSql->pSubs[0]; SQueryInfo* pi = tscGetQueryInfoDetail(&pp->cmd, 0); - if (tscNonOrderedProjectionQueryOnSTable(pi, 0)) { + // get the number of subquery that need to retrieve the next vnode. + if (tscNonOrderedProjectionQueryOnSTable(pi, 0)) { for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { - if (pSql->pSubs[i] != NULL) { + SSqlObj* pSub = pSql->pSubs[i]; + if (pSub != NULL && pSub->res.row >= pSub->res.numOfRows && pSub->res.completed) { pSql->subState.numOfRemain++; } } - } for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { @@ -1040,7 +1063,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSub->cmd, 0); - if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0)) { + if (tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && pSub->res.row >= pSub->res.numOfRows && pSub->res.completed) { STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); assert(pQueryInfo->numOfTables == 1); @@ -1068,11 +1091,11 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { if (tryNextVnode) { return; - } else { - pSql->res.completed = true; - freeJoinSubqueryObj(pSql); } - + + pSql->res.completed = true; + freeJoinSubqueryObj(pSql); + if (pSql->res.code == TSDB_CODE_SUCCESS) { (*pSql->fp)(pSql->param, pSql, 0); } else { @@ -2130,7 +2153,7 @@ static char* getResultBlockPosition(SSqlCmd* pCmd, SSqlRes* pRes, int32_t column assert(pInfo->pSqlExpr != NULL); *bytes = pInfo->pSqlExpr->resBytes; - char* pData = pRes->data + pInfo->pSqlExpr->offset * pRes->numOfRows; + char* pData = pRes->data + pInfo->pSqlExpr->offset * pRes->numOfRows + pRes->row * (*bytes); return pData; } @@ -2142,11 +2165,13 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) { int32_t numOfRes = INT32_MAX; for (int32_t i = 0; i < pSql->subState.numOfSub; ++i) { - if (pSql->pSubs[i] == NULL) { + SSqlObj* pSub = pSql->pSubs[i]; + if (pSub == NULL) { continue; } - numOfRes = (int32_t)(MIN(numOfRes, pSql->pSubs[i]->res.numOfRows)); + int32_t remain = pSub->res.numOfRows - pSub->res.row; + numOfRes = (int32_t)(MIN(numOfRes, remain)); } if (numOfRes == 0) { @@ -2172,14 +2197,23 @@ static void doBuildResFromSubqueries(SSqlObj* pSql) { size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo); for(int32_t i = 0; i < numOfExprs; ++i) { SColumnIndex* pIndex = &pRes->pColumnIndex[i]; - SSqlRes *pRes1 = &pSql->pSubs[pIndex->tableIndex]->res; - SSqlCmd *pCmd1 = &pSql->pSubs[pIndex->tableIndex]->cmd; + SSqlRes* pRes1 = &pSql->pSubs[pIndex->tableIndex]->res; + SSqlCmd* pCmd1 = &pSql->pSubs[pIndex->tableIndex]->cmd; char* pData = getResultBlockPosition(pCmd1, pRes1, pIndex->columnIndex, &bytes); memcpy(data, pData, bytes * numOfRes); data += bytes * numOfRes; - pRes1->row = numOfRes; + } + + for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) { + SSqlObj* pSub = pSql->pSubs[i]; + if (pSub == NULL) { + continue; + } + + pSub->res.row += numOfRes; + assert(pSub->res.row <= pSub->res.numOfRows); } pRes->numOfRows = numOfRes; diff --git a/src/query/src/qTsbuf.c b/src/query/src/qTsbuf.c index 740bbfcf2d8142e6e8bff163114f39ffc360f654..eefb0b56a43f97a9bd21e66f1d8d99f7662e3af3 100644 --- a/src/query/src/qTsbuf.c +++ b/src/query/src/qTsbuf.c @@ -400,11 +400,11 @@ void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag, const char* pDa if ((tVariantCompare(&pTSBuf->block.tag, tag) != 0) && ptsData->len > 0) { // new arrived data with different tags value, save current value into disk first writeDataToDisk(pTSBuf); + tVariantAssign(&pTSBuf->block.tag, tag); } else { expandBuffer(ptsData, len); } - tVariantAssign(&pTSBuf->block.tag, tag); memcpy(ptsData->rawBuf + ptsData->len, pData, (size_t)len); // todo check return value @@ -662,7 +662,7 @@ bool tsBufNextPos(STSBuf* pTSBuf) { return false; } - int32_t blockIndex = pCur->order == TSDB_ORDER_ASC ? 0 : pBlockInfo->numOfBlocks - 1; + int32_t blockIndex = (pCur->order == TSDB_ORDER_ASC) ? 0 : (pBlockInfo->numOfBlocks - 1); tsBufGetBlock(pTSBuf, pCur->vgroupIndex + step, blockIndex); break; @@ -688,8 +688,7 @@ void tsBufResetPos(STSBuf* pTSBuf) { } STSElem tsBufGetElem(STSBuf* pTSBuf) { - STSElem elem1 = {.vnode = -1}; - + STSElem elem1 = {.vnode = -1}; if (pTSBuf == NULL) { return elem1; }