diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index aa05c782cc35b59ffcc82bbf0078d1116db392b6..09936607d9d36a71d7bd109fb6e6c24b828eb4be 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -238,6 +238,11 @@ void sortRemoveDuplicates(STableDataBlocks* dataBuf); void tscPrintSelectClause(SSqlCmd* pCmd, int32_t subClauseIndex); +bool hasMoreVnodesToTry(SSqlObj *pSql); +void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp); +void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows); + + #ifdef __cplusplus } #endif diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 823fd359cc7af8aa2cd899eeea7e6c1d25651eae..1d06d382641c719312da447cfa011dafbbd42060 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -457,6 +457,8 @@ extern int tsInsertHeadSize; extern int tscNumOfThreads; extern SIpStrList tscMgmtIpList; +typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int numOfRows); + #ifdef __cplusplus } #endif diff --git a/src/client/src/tscAst.c b/src/client/src/tscAst.c index b9b00bb944fda52a5e28edda69922763795897fa..59a8bab2041a78750da4043e24b315469954aaba 100644 --- a/src/client/src/tscAst.c +++ b/src/client/src/tscAst.c @@ -17,6 +17,7 @@ #include "taosmsg.h" #include "tast.h" #include "tlog.h" +#include "tscSQLParser.h" #include "tscSyntaxtreefunction.h" #include "tschemautil.h" #include "tsdb.h" @@ -26,7 +27,6 @@ #include "tstoken.h" #include "ttypes.h" #include "tutil.h" -#include "tscSQLParser.h" /* * @@ -117,7 +117,7 @@ static tSQLSyntaxNode *tSQLSyntaxNodeCreate(SSchema *pSchema, int32_t numOfCols, do { SSQLToken tableToken = {0}; extractTableNameFromToken(pToken, &tableToken); - + size_t len = strlen(pSchema[i].name); if (strncmp(pToken->z, pSchema[i].name, pToken->n) == 0 && pToken->n == len) break; } while (++i < numOfCols); @@ -271,7 +271,7 @@ static tSQLSyntaxNode *createSyntaxTree(SSchema *pSchema, int32_t numOfCols, cha } // get the operator of expr - uint8_t optr = getBinaryExprOptr(&t0); + uint8_t optr = getBinaryExprOptr(&t0); if (optr == 0) { pError("not support binary operator:%d", t0.type); tSQLSyntaxNodeDestroy(pLeft, NULL); @@ -326,7 +326,7 @@ static tSQLSyntaxNode *createSyntaxTree(SSchema *pSchema, int32_t numOfCols, cha pn->colId = -1; return pn; } else { - uint8_t localOptr = getBinaryExprOptr(&t0); + uint8_t localOptr = getBinaryExprOptr(&t0); if (localOptr == 0) { pError("not support binary operator:%d", t0.type); free(pBinExpr); @@ -422,17 +422,17 @@ void tSQLBinaryExprToString(tSQLBinaryExpr *pExpr, char *dst, int32_t *len) { if (pExpr == NULL) { *dst = 0; *len = 0; - return; + return; } - int32_t lhs = tSQLBinaryExprToStringImpl(pExpr->pLeft, dst, pExpr->pLeft->nodeType); + int32_t lhs = tSQLBinaryExprToStringImpl(pExpr->pLeft, dst, pExpr->pLeft->nodeType); dst += lhs; *len = lhs; - char *start = tSQLOptrToString(pExpr->nSQLBinaryOptr, dst); + char *start = tSQLOptrToString(pExpr->nSQLBinaryOptr, dst); *len += (start - dst); - *len += tSQLBinaryExprToStringImpl(pExpr->pRight, start, pExpr->pRight->nodeType); + *len += tSQLBinaryExprToStringImpl(pExpr->pRight, start, pExpr->pRight->nodeType); } static void UNUSED_FUNC destroySyntaxTree(tSQLSyntaxNode *pNode) { tSQLSyntaxNodeDestroy(pNode, NULL); } @@ -648,7 +648,7 @@ int32_t intersect(tQueryResultset *pLeft, tQueryResultset *pRight, tQueryResults /* * traverse the result and apply the function to each item to check if the item is qualified or not */ -static void tSQLListTraverseOnResult(struct tSQLBinaryExpr *pExpr, __result_filter_fn_t fp, tQueryResultset *pResult) { +static void tSQLListTraverseOnResult(struct tSQLBinaryExpr *pExpr, __result_filter_fn_t fp, tQueryResultset *pResult) { assert(pExpr->pLeft->nodeType == TSQL_NODE_COL && pExpr->pRight->nodeType == TSQL_NODE_VALUE); // brutal force scan the result list and check for each item in the list diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 0e22214147af8c7595c08fe86b1e3db2828a7740..ad60635afe463c1f2bdbb80ff0f40a53f34f1fc2 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -26,19 +26,18 @@ #include "tutil.h" #include "tnote.h" -void tscProcessFetchRow(SSchedMsg *pMsg); -void tscProcessAsyncRetrieve(void *param, TAOS_RES *tres, int numOfRows); -static void tscProcessAsyncRetrieveNextVnode(void *param, TAOS_RES *tres, int numOfRows); -static void tscProcessAsyncContinueRetrieve(void *param, TAOS_RES *tres, int numOfRows); +static void tscProcessFetchRow(SSchedMsg *pMsg); +static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows); static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRows, void (*fp)()); /* - * proxy function to perform sequentially query&retrieve operation. - * If sql queries upon metric and two-stage merge procedure is not needed, - * it will sequentially query&retrieve data for all vnodes in pCmd->pMetricMeta + * Proxy function to perform sequentially query&retrieve operation. + * If sql queries upon a super table and two-stage merge procedure is not involved (when employ the projection + * query), it will sequentially query&retrieve data for all vnodes */ static void tscProcessAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows); +static void tscProcessAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows); // TODO return the correct error code to client in tscQueueAsyncError void taos_query_a(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *, int), void *param) { @@ -118,38 +117,23 @@ static void tscProcessAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOf SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - - // sequentially retrieve data from remain vnodes first, query vnode specified by vnodeIdx - if (numOfRows == 0 && tscProjectionQueryOnSTable(pQueryInfo, 0)) { - // vnode is denoted by vnodeIdx, continue to query vnode specified by vnodeIdx - SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); - assert(pMeterMetaInfo->vnodeIndex >= 0); - - /* reach the maximum number of output rows, abort */ - if (pCmd->globalLimit > 0 && pRes->numOfTotal >= pCmd->globalLimit) { - (*pSql->fetchFp)(param, tres, 0); - return; - } - - /* update the limit value according to current retrieval results */ - pQueryInfo->limit.limit = pCmd->globalLimit - pRes->numOfTotal; - pQueryInfo->limit.offset = pRes->offset; - - if ((++(pMeterMetaInfo->vnodeIndex)) < pMeterMetaInfo->pMetricMeta->numOfVnodes) { - tscTrace("%p retrieve data from next vnode:%d", pSql, pMeterMetaInfo->vnodeIndex); - - pSql->cmd.command = TSDB_SQL_SELECT; // reset flag to launch query first. - - tscResetForNextRetrieve(pRes); - pSql->fp = tscProcessAsyncRetrieveNextVnode; - tscProcessSql(pSql); - return; - } - } else { // localreducer has handle this situation - if (pCmd->command != TSDB_SQL_RETRIEVE_METRIC) { - pRes->numOfTotal += pRes->numOfRows; + if (numOfRows == 0) { + if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from remain vnodes. + tscTryQueryNextVnode(pSql, tscAsyncQueryRowsForNextVnode); + } else { + /* + * 1. has reach the limitation + * 2. no remain virtual nodes to be retrieved anymore + */ + (*pSql->fetchFp)(param, pSql, 0); } + + return; + } + + // local reducer has handle this situation during super table non-projection query. + if (pCmd->command != TSDB_SQL_RETRIEVE_METRIC) { + pRes->numOfTotal += pRes->numOfRows; } (*pSql->fetchFp)(param, tres, numOfRows); @@ -185,14 +169,18 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo } /* - * retrieve callback for fetch rows proxy. It serves as the callback function of querying vnode + * retrieve callback for fetch rows proxy. + * The below two functions both serve as the callback function of query virtual node. + * query callback first, and then followed by retrieve callback */ -static void tscProcessAsyncRetrieveNextVnode(void *param, TAOS_RES *tres, int numOfRows) { +static void tscAsyncQueryRowsForNextVnode(void *param, TAOS_RES *tres, int numOfRows) { + // query completed, continue to retrieve tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscProcessAsyncFetchRowsProxy); } -static void tscProcessAsyncContinueRetrieve(void *param, TAOS_RES *tres, int numOfRows) { - tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscProcessAsyncRetrieve); +void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows) { + // query completed, continue to retrieve + tscProcessAsyncRetrieveImpl(param, tres, numOfRows, tscProcessAsyncFetchSingleRowProxy); } void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), void *param) { @@ -247,11 +235,15 @@ void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW), pSql->fetchFp = fp; pSql->param = param; - + if (pRes->row >= pRes->numOfRows) { tscResetForNextRetrieve(pRes); - pSql->fp = tscProcessAsyncRetrieve; - pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; + pSql->fp = tscProcessAsyncFetchSingleRowProxy; + + if (pCmd->command != TSDB_SQL_RETRIEVE_METRIC && pCmd->command < TSDB_SQL_LOCAL) { + pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; + } + tscProcessSql(pSql); } else { SSchedMsg schedMsg; @@ -263,49 +255,31 @@ void taos_fetch_row_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, TAOS_ROW), } } -void tscProcessAsyncRetrieve(void *param, TAOS_RES *tres, int numOfRows) { +void tscProcessAsyncFetchSingleRowProxy(void *param, TAOS_RES *tres, int numOfRows) { SSqlObj *pSql = (SSqlObj *)tres; SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + if (numOfRows == 0) { - // sequentially retrieve data from remain vnodes. - if (tscProjectionQueryOnSTable(pQueryInfo, 0)) { + if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from remain vnodes. + tscTryQueryNextVnode(pSql, tscAsyncQuerySingleRowForNextVnode); + } else { /* - * vnode is denoted by vnodeIdx, continue to query vnode specified by vnodeIdx till all vnode have been retrieved + * 1. has reach the limitation + * 2. no remain virtual nodes to be retrieved anymore */ - SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); - assert(pMeterMetaInfo->vnodeIndex >= 0); - - /* reach the maximum number of output rows, abort */ - if (pCmd->globalLimit > 0 && pRes->numOfTotal >= pCmd->globalLimit) { - (*pSql->fetchFp)(pSql->param, pSql, NULL); - return; - } - - /* update the limit value according to current retrieval results */ - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - pQueryInfo->limit.limit = pCmd->globalLimit - pRes->numOfTotal; - - if ((++pMeterMetaInfo->vnodeIndex) <= pMeterMetaInfo->pMetricMeta->numOfVnodes) { - pSql->cmd.command = TSDB_SQL_SELECT; // reset flag to launch query first. - - tscResetForNextRetrieve(pRes); - pSql->fp = tscProcessAsyncContinueRetrieve; - tscProcessSql(pSql); - return; - } - } else { (*pSql->fetchFp)(pSql->param, pSql, NULL); } - } else { - for (int i = 0; i < pCmd->numOfCols; ++i) - pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i, pQueryInfo->order) + pRes->bytes[i] * pRes->row; - pRes->row++; - - (*pSql->fetchFp)(pSql->param, pSql, pSql->res.tsrow); + return; } + + for (int i = 0; i < pCmd->numOfCols; ++i) + pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i, pQueryInfo->order) + pRes->bytes[i] * pRes->row; + pRes->row++; + + (*pSql->fetchFp)(pSql->param, pSql, pSql->res.tsrow); } void tscProcessFetchRow(SSchedMsg *pMsg) { @@ -314,12 +288,12 @@ void tscProcessFetchRow(SSchedMsg *pMsg) { SSqlCmd *pCmd = &pSql->cmd; SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - assert(pCmd->numOfCols == pQueryInfo->fieldsInfo.numOfOutputCols); - for (int i = 0; i < pCmd->numOfCols; ++i) + for (int i = 0; i < pCmd->numOfCols; ++i) { pRes->tsrow[i] = TSC_GET_RESPTR_BASE(pRes, pQueryInfo, i, pQueryInfo->order) + pRes->bytes[i] * pRes->row; + } + pRes->row++; - (*pSql->fetchFp)(pSql->param, pSql, pRes->tsrow); } diff --git a/src/client/src/tscJoinProcess.c b/src/client/src/tscJoinProcess.c index 3000be213a335b8683e6e7a31cea3579df5bad28..ce5c490e8730e680bb2f405b8dcda9a2f33b0ccd 100644 --- a/src/client/src/tscJoinProcess.c +++ b/src/client/src/tscJoinProcess.c @@ -164,8 +164,7 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSuppor } // todo handle failed to create sub query -SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, - /*int32_t* numOfComplete, int32_t* gc,*/ int32_t index) { +SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index) { SJoinSubquerySupporter* pSupporter = calloc(1, sizeof(SJoinSubquerySupporter)); if (pSupporter == NULL) { return NULL; @@ -175,13 +174,15 @@ SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pS pSupporter->pState = pState; pSupporter->subqueryIndex = index; - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); pSupporter->interval = pQueryInfo->nAggTimeInterval; pSupporter->limit = pQueryInfo->limit; - SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, index); + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, pSql->cmd.clauseIndex, index); pSupporter->uid = pMeterMetaInfo->pMeterMeta->uid; + + assert (pSupporter->uid != 0); getTmpfilePath("join-", pSupporter->path); pSupporter->f = fopen(pSupporter->path, "w"); @@ -235,38 +236,48 @@ bool needSecondaryQuery(SQueryInfo* pQueryInfo) { int32_t tscLaunchSecondSubquery(SSqlObj* pSql) { int32_t numOfSub = 0; SJoinSubquerySupporter* pSupporter = NULL; - + + /* + * If the columns are not involved in the final select clause, the secondary query will not be launched + * for the subquery. + */ + SSubqueryState* pState = NULL; + for (int32_t i = 0; i < pSql->numOfSubs; ++i) { pSupporter = pSql->pSubs[i]->param; - pSupporter->pState->numOfCompleted = 0; - - /* - * If the columns are not involved in the final select clause, the secondary query will not be launched - * for the subquery. - */ if (pSupporter->exprsInfo.numOfExprs > 0) { ++numOfSub; } } - + + assert(numOfSub > 0); + // scan all subquery, if one sub query has only ts, ignore it - tscTrace( - "%p start to launch secondary subqueries, total:%d, only:%d needs to query, others are not retrieve in " - "select clause", - pSql, pSql->numOfSubs, numOfSub); + tscTrace("%p start to launch secondary subqueries, total:%d, only:%d needs to query, others are not retrieve in " + "select clause", pSql, pSql->numOfSubs, numOfSub); - int32_t j = 0; + /* + * the subqueries that do not actually launch the secondary query to virtual node is set as completed. + */ + pState = pSupporter->pState; + pState->numOfTotal = pSql->numOfSubs; + pState->numOfCompleted = (pSql->numOfSubs - numOfSub); + for (int32_t i = 0; i < pSql->numOfSubs; ++i) { SSqlObj* pSub = pSql->pSubs[i]; pSupporter = pSub->param; - pSupporter->pState->numOfTotal = numOfSub; if (pSupporter->exprsInfo.numOfExprs == 0) { + tscTrace("%p subquery %d, not need to launch query, ignore it", pSql, i); + tscDestroyJoinSupporter(pSupporter); - taos_free_result(pSub); + tscFreeSqlObj(pSub); + + pSql->pSubs[i] = NULL; continue; } + // todo refactor to avoid the memory problem handling SSqlObj* pNew = createSubqueryObj(pSql, (int16_t)i, tscJoinQueryCallback, pSupporter, NULL); if (pNew == NULL) { pSql->numOfSubs = i; // revise the number of subquery @@ -279,7 +290,8 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) { tscClearSubqueryInfo(&pNew->cmd); - pSql->pSubs[j++] = pNew; + pSql->pSubs[i] = pNew; + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); SQueryInfo* pSubQueryInfo = tscGetQueryInfoDetail(&pSub->cmd, 0); @@ -307,6 +319,7 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) { // todo refactor function name SQueryInfo* pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); + assert(pNew->numOfSubs == 0 && pNew->cmd.numOfClause == 1 && pNewQueryInfo->numOfTables == 1); tscAddTimestampColumn(pQueryInfo, TSDB_FUNC_TS, 0); tscFieldInfoCalOffset(pNewQueryInfo); @@ -333,13 +346,16 @@ int32_t tscLaunchSecondSubquery(SSqlObj* pSql) { #ifdef _DEBUG_VIEW tscPrintSelectClause(&pNew->cmd, 0); #endif - + + tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, " + "exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s", + pSql, pNew, 0, pMeterMetaInfo->vnodeIndex, pNewQueryInfo->type, + pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols, + pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pMeterInfo[0]->name); + tscProcessSql(pNew); } - // revise the number of subs - pSql->numOfSubs = j; - return 0; } @@ -537,13 +553,35 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) { } } +static SJoinSubquerySupporter* tscUpdateSubqueryStatus(SSqlObj* pSql, int32_t numOfFetch) { + int32_t notInvolved = 0; + SJoinSubquerySupporter* pSupporter = NULL; + SSubqueryState* pState = NULL; + + for(int32_t i = 0; i < pSql->numOfSubs; ++i) { + if (pSql->pSubs[i] == NULL) { + notInvolved++; + } else { + pSupporter = (SJoinSubquerySupporter*)pSql->pSubs[i]->param; + pState = pSupporter->pState; + } + } + + pState->numOfTotal = pSql->numOfSubs; + pState->numOfCompleted = pSql->numOfSubs - numOfFetch; + + return pSupporter; +} + void tscFetchDatablockFromSubquery(SSqlObj* pSql) { int32_t numOfFetch = 0; - assert(pSql->numOfSubs >= 1); - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + if (pSql->pSubs[i] == NULL) { // this subquery does not need to involve in secondary query + continue; + } + SSqlRes *pRes = &pSql->pSubs[i]->res; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->pSubs[i]->cmd, 0); @@ -562,28 +600,27 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { } if (numOfFetch <= 0) { - return ; + return; } // TODO multi-vnode retrieve for projection query with limitation has bugs, since the global limiation is not handled tscTrace("%p retrieve data from %d subqueries", pSql, numOfFetch); - SJoinSubquerySupporter* pSupporter = (SJoinSubquerySupporter*)pSql->pSubs[0]->param; - pSupporter->pState->numOfTotal = numOfFetch; // wait for all subqueries completed - pSupporter->pState->numOfCompleted = 0; - + SJoinSubquerySupporter* pSupporter = tscUpdateSubqueryStatus(pSql, numOfFetch); + for (int32_t i = 0; i < pSql->numOfSubs; ++i) { SSqlObj* pSql1 = pSql->pSubs[i]; - + if (pSql1 == NULL) { + continue; + } + SSqlRes* pRes1 = &pSql1->res; SSqlCmd* pCmd1 = &pSql1->cmd; pSupporter = (SJoinSubquerySupporter*)pSql1->param; // wait for all subqueries completed - pSupporter->pState->numOfTotal = numOfFetch; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd1, 0); - assert(pRes1->numOfRows >= 0 && pQueryInfo->numOfTables == 1); SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); @@ -605,6 +642,16 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { // wait for all subquery completed tsem_wait(&pSql->rspSem); + + // update the records for each subquery + for(int32_t i = 0; i < pSql->numOfSubs; ++i) { + if (pSql->pSubs[i] == NULL) { + continue; + } + + SSqlRes* pRes1 = &pSql->pSubs[i]->res; + pRes1->numOfTotalInCurrentClause += pRes1->numOfRows; + } } // all subqueries return, set the result output index @@ -618,7 +665,7 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) { return; // the column transfer support struct has been built } - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, 0); + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); pRes->pColumnIndex = calloc(1, sizeof(SColumnIndex) * pQueryInfo->fieldsInfo.numOfOutputCols); for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutputCols; ++i) { @@ -626,15 +673,15 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) { int32_t tableIndexOfSub = -1; for (int32_t j = 0; j < pQueryInfo->numOfTables; ++j) { - SSqlObj* pSub = pSql->pSubs[j]; - - SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pSub->cmd, 0, 0); + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, j); if (pMeterMetaInfo->pMeterMeta->uid == pExpr->uid) { tableIndexOfSub = j; break; } } + assert(tableIndexOfSub >= 0 && tableIndexOfSub < pQueryInfo->numOfTables); + SSqlCmd* pSubCmd = &pSql->pSubs[tableIndexOfSub]->cmd; SQueryInfo* pSubQueryInfo = tscGetQueryInfoDetail(pSubCmd, 0); @@ -710,7 +757,6 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { if (atomic_add_fetch_32(&pSupporter->pState->numOfCompleted, 1) >= pSupporter->pState->numOfTotal) { tscSetupOutputColumnIndex(pParentSql); - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); /** diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index f67319bc43efa460c2bae9b0130ccf176aa2c520..cc46f0216768accc26e692c3cdf730a25dee4d4d 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -989,7 +989,6 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { pMeterMetaInfo = tscAddEmptyMeterMetaInfo(pQueryInfo); } else { pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); -// assert(pQueryInfo->numOfTables == 1); } if ((code = tscAllocPayload(pCmd, TSDB_PAYLOAD_SIZE)) != TSDB_CODE_SUCCESS) { @@ -1053,6 +1052,8 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { } void *fp = pSql->fp; + ptrdiff_t pos = pSql->asyncTblPos - pSql->sqlstr; + if ((code = tscCheckIfCreateTable(&str, pSql)) != TSDB_CODE_SUCCESS) { /* * For async insert, after get the metermeta from server, the sql string will not be @@ -1062,7 +1063,7 @@ int doParseInsertSql(SSqlObj *pSql, char *str) { */ if (fp != NULL) { if (TSDB_CODE_ACTION_IN_PROGRESS == code) { - tscTrace("async insert and waiting to get meter meta, then continue parse sql: %s", pSql->asyncTblPos); + tscTrace("async insert and waiting to get meter meta, then continue parse sql from offset: %" PRId64, pos); return code; } @@ -1277,11 +1278,8 @@ int tsParseInsertSql(SSqlObj *pSql) { SQueryInfo *pQueryInfo = NULL; tscGetQueryInfoDetailSafely(pCmd, pCmd->clauseIndex, &pQueryInfo); - if (sToken.type == TK_INSERT) { - TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT); - } else { - TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_IMPORT); - } + uint16_t type = (sToken.type == TK_INSERT)? TSDB_QUERY_TYPE_INSERT:TSDB_QUERY_TYPE_IMPORT; + TSDB_QUERY_SET_TYPE(pQueryInfo->type, type); sToken = tStrGetToken(pSql->sqlstr, &index, false, 0, NULL); if (sToken.type != TK_INTO) { diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 2c31c42157461c47272d40ed04093ec833640481..70ebced203acd5d3257ef028061bd5d4bcb68323 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -42,12 +42,6 @@ typedef struct SColumnList { SColumnIndex ids[TSDB_MAX_COLUMNS]; } SColumnList; -typedef struct SColumnIdListRes { - SSchema* pSchema; - int32_t numOfCols; - SColumnList list; -} SColumnIdListRes; - static SSqlExpr* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t outputIndex, int32_t colIdx, int32_t tableIndex); static int32_t setShowInfo(SSqlObj* pSql, SSqlInfo* pInfo); @@ -518,6 +512,8 @@ int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo) { if ((code = doCheckForQuery(pSql, pQuerySql, i)) != TSDB_CODE_SUCCESS) { return code; } + + tscPrintSelectClause(pCmd, i); } // set the command/global limit parameters from the first subclause to the sqlcmd object @@ -1078,9 +1074,10 @@ static void extractColumnNameFromString(tSQLExprItem* pItem) { int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSelection, bool isSTable) { assert(pSelection != NULL && pCmd != NULL); - const char* msg1 = "invalid column name/illegal column type in arithmetic expression"; + const char* msg1 = "invalid column name, or illegal column type"; const char* msg2 = "functions can not be mixed up"; const char* msg3 = "not support query expression"; + const char* msg4 = "columns from different table mixed up in arithmetic expression"; SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex); @@ -1112,17 +1109,17 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel } else if (pItem->pNode->nSQLOptr >= TK_PLUS && pItem->pNode->nSQLOptr <= TK_REM) { // arithmetic function in select - SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); - SSchema* pSchema = tsGetSchema(pMeterMetaInfo->pMeterMeta); - - SColumnIdListRes columnList = {.pSchema = pSchema, .numOfCols = pMeterMetaInfo->pMeterMeta->numOfColumns, - .list = {0}}; - - int32_t ret = - validateArithmeticSQLExpr(pItem->pNode, pQueryInfo, &columnList.list); - if (ret != TSDB_CODE_SUCCESS) { + SColumnList columnList = {0}; + if (validateArithmeticSQLExpr(pItem->pNode, pQueryInfo, &columnList) != TSDB_CODE_SUCCESS) { return invalidSqlErrMsg(pQueryInfo->msg, msg1); } + + int32_t tableIndex = columnList.ids[0].tableIndex; + for(int32_t f = 1; f < columnList.num; ++f) { + if (columnList.ids[f].tableIndex != tableIndex) { + return invalidSqlErrMsg(pQueryInfo->msg, msg4); + } + } char arithmeticExprStr[1024] = {0}; char* p = arithmeticExprStr; @@ -1132,10 +1129,10 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel } // expr string is set as the parameter of function - SColumnIndex index = {0}; + SColumnIndex index = {.tableIndex = tableIndex}; SSqlExpr* pExpr = tscSqlExprInsert(pQueryInfo, outputIndex, TSDB_FUNC_ARITHM, &index, TSDB_DATA_TYPE_DOUBLE, sizeof(double), sizeof(double)); - addExprParams(pExpr, arithmeticExprStr, TSDB_DATA_TYPE_BINARY, strlen(arithmeticExprStr), 0); + addExprParams(pExpr, arithmeticExprStr, TSDB_DATA_TYPE_BINARY, strlen(arithmeticExprStr), index.tableIndex); /* todo alias name should use the original sql string */ if (pItem->aliasName != NULL) { @@ -1144,7 +1141,7 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel strncpy(pExpr->aliasName, arithmeticExprStr, TSDB_COL_NAME_LEN); } - insertResultField(pQueryInfo, i, &columnList.list, sizeof(double), TSDB_DATA_TYPE_DOUBLE, pExpr->aliasName); + insertResultField(pQueryInfo, i, &columnList, sizeof(double), TSDB_DATA_TYPE_DOUBLE, pExpr->aliasName); } else { /* * not support such expression @@ -2883,7 +2880,7 @@ static int32_t validateSQLExpr(tSQLExpr* pExpr, SQueryInfo* pQueryInfo, SColumnL return TSDB_CODE_INVALID_SQL; } - pList->ids[pList->num++].columnIndex = index.columnIndex; + pList->ids[pList->num++] = index; } else if (pExpr->nSQLOptr == TK_FLOAT && (isnan(pExpr->val.dKey) || isinf(pExpr->val.dKey))) { return TSDB_CODE_INVALID_SQL; } else if (pExpr->nSQLOptr >= TK_MIN && pExpr->nSQLOptr <= TK_LAST_ROW) { @@ -5142,7 +5139,8 @@ void tscPrintSelectClause(SSqlCmd* pCmd, int32_t subClauseIndex) { for (int32_t i = 0; i < pQueryInfo->exprsInfo.numOfExprs; ++i) { SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i); - int32_t size = sprintf(str + offset, "%s(%d)", aAggs[pExpr->functionId].aName, pExpr->colInfo.colId); + int32_t size = sprintf(str + offset, "%s(uid:%" PRId64 ", %d)", aAggs[pExpr->functionId].aName, + pExpr->uid, pExpr->colInfo.colId); offset += size; if (i < pQueryInfo->exprsInfo.numOfExprs - 1) { diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 3b88ab250ff432b3075f0fa58e90bc28c8c8d692..16a8931dd62e429e732d7b35c2a1ea28b952a9cb 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -725,6 +725,13 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu pNewQueryInfo->colList.numOfCols++; } } + + tscTrace("%p subquery:%p tableIndex:%d, vnodeIdx:%d, type:%d, transfer to ts_comp query to retrieve timestamps, " + "exprInfo:%d, colList:%d, fieldsInfo:%d, name:%s", + pSql, pNew, tableIndex, pMeterMetaInfo->vnodeIndex, pNewQueryInfo->type, + pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols, + pNewQueryInfo->fieldsInfo.numOfOutputCols, pNewQueryInfo->pMeterInfo[0]->name); + } else { SQueryInfo *pNewQueryInfo = tscGetQueryInfoDetail(&pNew->cmd, 0); pNewQueryInfo->type |= TSDB_QUERY_TYPE_SUBQUERY; @@ -734,6 +741,7 @@ int32_t tscLaunchJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSubquerySu tscPrintSelectClause(&pNew->cmd, 0); #endif + return tscProcessSql(pNew); } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 84adb40c00df884a1939be1cd549c0f1b62c951d..b89c6e49b0ada2de4aa6d7ea6d9481b947151f02 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -201,7 +201,7 @@ int taos_query_imp(STscObj *pObj, SSqlObj *pSql) { pRes->numOfRows = 1; pRes->numOfTotal = 0; pRes->numOfTotalInCurrentClause = 0; - + pSql->asyncTblPos = NULL; if (NULL != pSql->pTableHashList) { taosCleanUpHashTable(pSql->pTableHashList); @@ -446,6 +446,10 @@ static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) { bool allSubqueryExhausted = true; for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + if (pSql->pSubs[i] == NULL) { + continue; + } + SSqlRes *pRes1 = &pSql->pSubs[i]->res; SSqlCmd *pCmd1 = &pSql->pSubs[i]->cmd; @@ -456,7 +460,7 @@ static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) { /* * if the global limitation is not reached, and current result has not exhausted, or next more vnodes are - * available, go on + * available, goes on */ if (pMetaInfo->vnodeIndex < pMetaInfo->pMetricMeta->numOfVnodes && pRes1->row < pRes1->numOfRows && (!tscHasReachLimitation(pQueryInfo1, pRes1))) { @@ -483,37 +487,25 @@ static bool tscHashRemainDataInSubqueryResultSet(SSqlObj *pSql) { return hasData; } -static void **tscJoinResultsetFromBuf(SSqlObj *pSql) { +static void **tscBuildResFromSubqueries(SSqlObj *pSql) { SSqlRes *pRes = &pSql->res; while (1) { - if (!tscHashRemainDataInSubqueryResultSet(pSql)) { // free all sub sqlobj - tscTrace("%p at least one subquery exhausted, free all other %d subqueries", pSql, pSql->numOfSubs - 1); - - SSubqueryState *pState = NULL; - - for (int32_t i = 0; i < pSql->numOfSubs; ++i) { - SSqlObj *pChildObj = pSql->pSubs[i]; - - SJoinSubquerySupporter *pSupporter = (SJoinSubquerySupporter *)pChildObj->param; - pState = pSupporter->pState; - - tscDestroyJoinSupporter(pChildObj->param); - taos_free_result(pChildObj); - } - - free(pState); - return NULL; - } - - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); - + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex); if (pRes->tsrow == NULL) { pRes->tsrow = malloc(POINTER_BYTES * pQueryInfo->exprsInfo.numOfExprs); } bool success = false; - if (pSql->numOfSubs >= 2) { // do merge result + + int32_t numOfTableHasRes = 0; + for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + if (pSql->pSubs[i] != 0) { + numOfTableHasRes++; + } + } + + if (numOfTableHasRes >= 2) { // do merge result SSqlRes *pRes1 = &pSql->pSubs[0]->res; SSqlRes *pRes2 = &pSql->pSubs[1]->res; @@ -528,8 +520,13 @@ static void **tscJoinResultsetFromBuf(SSqlObj *pSql) { pRes2->row++; } } else { // only one subquery - SSqlRes *pRes1 = &pSql->pSubs[0]->res; - doSetResultRowData(pSql->pSubs[0]); + SSqlObj *pSub = pSql->pSubs[0]; + if (pSub == NULL) { + pSub = pSql->pSubs[1]; + } + + SSqlRes *pRes1 = &pSub->res; + doSetResultRowData(pSub); success = (pRes1->row++ < pRes1->numOfRows); } @@ -543,104 +540,40 @@ static void **tscJoinResultsetFromBuf(SSqlObj *pSql) { pRes->tsrow[i] = pRes1->tsrow[columnIndex]; } - break; - } else { // continue retrieve data from vnode - tscFetchDatablockFromSubquery(pSql); - if (pRes->code != TSDB_CODE_SUCCESS) { - return NULL; - } - } - } - - return pRes->tsrow; -} - -/** - * If current vnode query does not return results anymore (pRes->numOfRows == 0), try the next vnode if exists, - * in case of multi-vnode super table projection query and the result does not reach the limitation. - */ -static bool hasMoreVnodesToTry(SSqlObj *pSql) { - SSqlCmd *pCmd = &pSql->cmd; - SSqlRes *pRes = &pSql->res; - - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - - return pRes->numOfRows == 0 && tscProjectionQueryOnSTable(pQueryInfo, 0) && !tscHasReachLimitation(pQueryInfo, pRes); -} - -static void tscTryQueryNextVnode(SSqlObj *pSql) { - SSqlCmd *pCmd = &pSql->cmd; - SSqlRes *pRes = &pSql->res; - - SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - - /* - * no result returned from the current virtual node anymore, try the next vnode if exists - * if case of: multi-vnode super table projection query - */ - assert(pRes->numOfRows == 0 && tscProjectionQueryOnSTable(pQueryInfo, 0) && !tscHasReachLimitation(pQueryInfo, pRes)); - - SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); - int32_t totalVnode = pMeterMetaInfo->pMetricMeta->numOfVnodes; - - while (++pMeterMetaInfo->vnodeIndex < totalVnode) { - tscTrace("%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d", pSql, - pMeterMetaInfo->vnodeIndex - 1, pMeterMetaInfo->vnodeIndex, totalVnode, pRes->numOfTotalInCurrentClause); + pRes->numOfTotalInCurrentClause++; - /* - * update the limit and offset value for the query on the next vnode, - * according to current retrieval results - * - * NOTE: - * if the pRes->offset is larger than 0, the start returned position has not reached yet. - * Therefore, the pRes->numOfRows, as well as pRes->numOfTotalInCurrentClause, must be 0. - * The pRes->offset value will be updated by virtual node, during query execution. - */ - if (pQueryInfo->clauseLimit >= 0) { - pQueryInfo->limit.limit = pQueryInfo->clauseLimit - pRes->numOfTotalInCurrentClause; - } - - pQueryInfo->limit.offset = pRes->offset; + break; + } else { // continue retrieve data from vnode + if (!tscHashRemainDataInSubqueryResultSet(pSql)) { // free all sub sqlobj + tscTrace("%p at least one subquery exhausted, free all other %d subqueries", pSql, pSql->numOfSubs - 1); - assert((pRes->offset >= 0 && pRes->numOfRows == 0) || (pRes->offset == 0 && pRes->numOfRows >= 0)); - tscTrace("%p new query to next vnode, vnode index:%d, limit:%" PRId64 ", offset:%" PRId64 ", glimit:%" PRId64, pSql, - pMeterMetaInfo->vnodeIndex, pQueryInfo->limit.limit, pQueryInfo->limit.offset, pQueryInfo->clauseLimit); + SSubqueryState *pState = NULL; - /* - * For project query with super table join, the numOfSub is equalled to the number of all subqueries. - * Therefore, we need to reset the value of numOfSubs to be 0. - * - * For super table join with projection query, if anyone of the subquery is exhausted, the query completed. - */ - pSql->numOfSubs = 0; + for (int32_t i = 0; i < pSql->numOfSubs; ++i) { + SSqlObj *pChildObj = pSql->pSubs[i]; + if (pChildObj == NULL) { + continue; + } - pCmd->command = TSDB_SQL_SELECT; - assert(pSql->fp == NULL); + SJoinSubquerySupporter *pSupporter = (SJoinSubquerySupporter *)pChildObj->param; + pState = pSupporter->pState; - int32_t ret = tscProcessSql(pSql); // todo check for failure - if (ret != TSDB_CODE_SUCCESS) { - pSql->res.code = ret; - return; - } + tscDestroyJoinSupporter(pChildObj->param); + taos_free_result(pChildObj); + } - // retrieve data - assert(pCmd->command == TSDB_SQL_SELECT); - pCmd->command = TSDB_SQL_FETCH; - - if ((ret = tscProcessSql(pSql)) != TSDB_CODE_SUCCESS) { - pSql->res.code = ret; - return; - } + free(pState); + return NULL; + } - // if the result from current virtual node are empty, try next if exists. otherwise, return the results. - if (pRes->numOfRows > 0) { - break; + tscFetchDatablockFromSubquery(pSql); + if (pRes->code != TSDB_CODE_SUCCESS) { + return NULL; + } } } - if (pRes->numOfRows == 0) { - tscTrace("%p all vnodes exhausted, prj query completed. total res:%d", pSql, totalVnode, pRes->numOfTotal); - } + return pRes->tsrow; } TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) { @@ -657,7 +590,7 @@ TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) { if (pRes->code == TSDB_CODE_SUCCESS) { tscTrace("%p data from all subqueries have been retrieved to client", pSql); - return tscJoinResultsetFromBuf(pSql); + return tscBuildResFromSubqueries(pSql); } else { tscTrace("%p retrieve data from subquery failed, code:%d", pSql, pRes->code); return NULL; @@ -679,7 +612,7 @@ TAOS_ROW taos_fetch_row_impl(TAOS_RES *res) { tscProcessSql(pSql); // retrieve data from virtual node if (hasMoreVnodesToTry(pSql)) { - tscTryQueryNextVnode(pSql); + tscTryQueryNextVnode(pSql, NULL); } /* @@ -702,7 +635,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { SSqlObj *pSql = (SSqlObj *)res; SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; - + if (pSql == NULL || pSql->signature != pSql) { globalCode = TSDB_CODE_DISCONNECTED; return NULL; @@ -713,23 +646,23 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { * instead of two-stage merge */ TAOS_ROW rows = taos_fetch_row_impl(res); - + // current subclause is completed, try the next subclause while (rows == NULL && pCmd->clauseIndex < pCmd->numOfClause - 1) { - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + pSql->cmd.command = pQueryInfo->command; pCmd->clauseIndex++; assert(pSql->fp == NULL); - + pRes->numOfTotal += pRes->numOfTotalInCurrentClause; pRes->numOfTotalInCurrentClause = 0; pRes->rspType = 0; - + pSql->numOfSubs = 0; tfree(pSql->pSubs); - + tscTrace("%p try data in the next subclause:%d, total subclause:%d", pSql, pCmd->clauseIndex, pCmd->numOfClause); tscProcessSql(pSql); @@ -744,7 +677,7 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { SSqlObj *pSql = (SSqlObj *)res; SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; - + int nRows = 0; if (pSql == NULL || pSql->signature != pSql) { @@ -759,26 +692,26 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { // current subclause is completed, try the next subclause while (rows == NULL && pCmd->clauseIndex < pCmd->numOfClause - 1) { - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); - + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + pSql->cmd.command = pQueryInfo->command; pCmd->clauseIndex++; - + pRes->numOfTotal += pRes->numOfTotalInCurrentClause; pRes->numOfTotalInCurrentClause = 0; pRes->rspType = 0; - + pSql->numOfSubs = 0; tfree(pSql->pSubs); - + assert(pSql->fp == NULL); - + tscTrace("%p try data in the next subclause:%d, total subclause:%d", pSql, pCmd->clauseIndex, pCmd->numOfClause); tscProcessSql(pSql); - + nRows = taos_fetch_block_impl(res, rows); } - + return nRows; } @@ -822,6 +755,11 @@ void taos_free_result(TAOS_RES *res) { // set freeFlag to 1 in retrieve message if there are un-retrieved results SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + if (pQueryInfo == NULL) { + tscFreeSqlObjPartial(pSql); + return; + } + pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE; SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); @@ -1041,7 +979,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) { pRes->numOfRows = 1; pRes->numOfTotal = 0; pRes->numOfTotalInCurrentClause = 0; - + tscTrace("%p Valid SQL: %s pObj:%p", pSql, sql, pObj); int32_t sqlLen = strlen(sql); @@ -1172,7 +1110,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { pRes->numOfTotal = 0; // the number of getting table meta from server pRes->numOfTotalInCurrentClause = 0; - + pRes->code = 0; assert(pSql->fp == NULL); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 7b1a71d0ce8cf61d6d007508b53dadef93e634c4..19016497ac0f42c1ac77ffe5c5ede310db09943d 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1888,7 +1888,6 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void #endif char* name = pMeterMetaInfo->name; - SMeterMetaInfo* pFinalInfo = NULL; if (pPrevSql == NULL) { @@ -1907,13 +1906,16 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void pMeterMetaInfo->tagColumnIndex); } - assert(pFinalInfo->pMeterMeta != NULL); + assert(pFinalInfo->pMeterMeta != NULL && pNewQueryInfo->numOfTables == 1); if (UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo)) { assert(pFinalInfo->pMetricMeta != NULL); } - tscTrace("%p new subquery %p, tableIndex:%d, vnodeIdx:%d, type:%d", pSql, pNew, tableIndex, - pMeterMetaInfo->vnodeIndex, pNewQueryInfo->type); + tscTrace("%p new subquery %p, tableIndex:%d, vnodeIdx:%d, type:%d, exprInfo:%d, colList:%d," + "fieldInfo:%d, name:%s", pSql, pNew, tableIndex, + pMeterMetaInfo->vnodeIndex, pNewQueryInfo->type, pNewQueryInfo->exprsInfo.numOfExprs, pNewQueryInfo->colList.numOfCols, + pNewQueryInfo->fieldsInfo.numOfOutputCols, pFinalInfo->name); + return pNew; } @@ -1990,3 +1992,102 @@ bool tscHasReachLimitation(SQueryInfo* pQueryInfo, SSqlRes* pRes) { } char* tscGetErrorMsgPayload(SSqlCmd* pCmd) { return pCmd->payload; } + +/** + * If current vnode query does not return results anymore (pRes->numOfRows == 0), try the next vnode if exists, + * in case of multi-vnode super table projection query and the result does not reach the limitation. + */ +bool hasMoreVnodesToTry(SSqlObj *pSql) { + SSqlCmd *pCmd = &pSql->cmd; + SSqlRes *pRes = &pSql->res; + + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + + return pRes->numOfRows == 0 && tscProjectionQueryOnSTable(pQueryInfo, 0) && !tscHasReachLimitation(pQueryInfo, pRes); +} + +void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp) { + SSqlCmd *pCmd = &pSql->cmd; + SSqlRes *pRes = &pSql->res; + + SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + + /* + * no result returned from the current virtual node anymore, try the next vnode if exists + * if case of: multi-vnode super table projection query + */ + assert(pRes->numOfRows == 0 && tscProjectionQueryOnSTable(pQueryInfo, 0) && !tscHasReachLimitation(pQueryInfo, pRes)); + + SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, 0); + int32_t totalVnode = pMeterMetaInfo->pMetricMeta->numOfVnodes; + + while (++pMeterMetaInfo->vnodeIndex < totalVnode) { + tscTrace("%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d", pSql, + pMeterMetaInfo->vnodeIndex - 1, pMeterMetaInfo->vnodeIndex, totalVnode, pRes->numOfTotalInCurrentClause); + + /* + * update the limit and offset value for the query on the next vnode, + * according to current retrieval results + * + * NOTE: + * if the pRes->offset is larger than 0, the start returned position has not reached yet. + * Therefore, the pRes->numOfRows, as well as pRes->numOfTotalInCurrentClause, must be 0. + * The pRes->offset value will be updated by virtual node, during query execution. + */ + if (pQueryInfo->clauseLimit >= 0) { + pQueryInfo->limit.limit = pQueryInfo->clauseLimit - pRes->numOfTotalInCurrentClause; + } + + pQueryInfo->limit.offset = pRes->offset; + + assert((pRes->offset >= 0 && pRes->numOfRows == 0) || (pRes->offset == 0 && pRes->numOfRows >= 0)); + tscTrace("%p new query to next vnode, vnode index:%d, limit:%" PRId64 ", offset:%" PRId64 ", glimit:%" PRId64, pSql, + pMeterMetaInfo->vnodeIndex, pQueryInfo->limit.limit, pQueryInfo->limit.offset, pQueryInfo->clauseLimit); + + /* + * For project query with super table join, the numOfSub is equalled to the number of all subqueries. + * Therefore, we need to reset the value of numOfSubs to be 0. + * + * For super table join with projection query, if anyone of the subquery is exhausted, the query completed. + */ + pSql->numOfSubs = 0; + pCmd->command = TSDB_SQL_SELECT; + + tscResetForNextRetrieve(pRes); + + // in case of async query, set the callback function + void* fp1 = pSql->fp; + if (fp1 != NULL) { + assert(fp != NULL); + pSql->fp = fp; + } + + int32_t ret = tscProcessSql(pSql); // todo check for failure + if (fp != NULL) { + return; + } + + if (ret != TSDB_CODE_SUCCESS) { + pSql->res.code = ret; + return; + } + + // retrieve data + assert(pCmd->command == TSDB_SQL_SELECT); + pCmd->command = TSDB_SQL_FETCH; + + if ((ret = tscProcessSql(pSql)) != TSDB_CODE_SUCCESS) { + pSql->res.code = ret; + return; + } + + // if the result from current virtual node are empty, try next if exists. otherwise, return the results. + if (pRes->numOfRows > 0) { + break; + } + } + + if (pRes->numOfRows == 0) { + tscTrace("%p all vnodes exhausted, prj query completed. total res:%d", pSql, totalVnode, pRes->numOfTotal); + } +}