From cfcbefba48ac61db673c4e0aca0a4dc54da80ef5 Mon Sep 17 00:00:00 2001 From: hjLiao Date: Wed, 13 May 2020 16:32:33 +0800 Subject: [PATCH] [td-225] fix bugs in multi-vnode projection query. --- src/client/src/tscAsync.c | 27 +++++++- src/client/src/tscServer.c | 3 +- src/client/src/tscSql.c | 6 +- src/client/src/tscSubquery.c | 4 +- src/client/src/tscUtil.c | 126 ++++++++++++++++++----------------- src/query/inc/qsqltype.h | 2 +- 6 files changed, 96 insertions(+), 72 deletions(-) diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 9f207936df..8fba577909 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -145,7 +145,7 @@ static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows) { return; } - // local reducer has handle this situation during super table non-projection query. + // local merge has handle this situation during super table non-projection query. if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE) { pRes->numOfTotalInCurrentClause += pRes->numOfRows; } @@ -222,9 +222,30 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi tscResetForNextRetrieve(pRes); // handle the sub queries of join query - if (pCmd->command == TSDB_SQL_METRIC_JOIN_RETRIEVE) { + if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) { tscFetchDatablockFromSubquery(pSql); - } else { + } else if (pRes->completed && pCmd->command == TSDB_SQL_FETCH) { + if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from remain vnodes. + tscTryQueryNextVnode(pSql, tscAsyncQueryRowsForNextVnode); + return; + } else { + /* + * all available virtual node has been checked already, now we need to check + * for the next subclause queries + */ + if (pCmd->clauseIndex < pCmd->numOfClause - 1) { + tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode); + return; + } + + /* + * 1. has reach the limitation + * 2. no remain virtual nodes to be retrieved anymore + */ + (*pSql->fetchFp)(param, pSql, 0); + } + return; + } else { // current query is not completed, continue retrieve from node if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) { pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index b958bbe15c..889433770a 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1444,8 +1444,9 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) { } pRes->row = 0; + pRes->completed = (pRes->numOfRows == 0); - uint8_t code = pRes->code; + int32_t code = pRes->code; if (pRes->code == TSDB_CODE_SUCCESS) { (*pSql->fp)(pSql->param, pSql, pRes->numOfRows); } else { diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 14f0fa07ca..87292f4fe6 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -463,11 +463,11 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { return NULL; } - // current data are exhausted, fetch more data - if (pRes->row >= pRes->numOfRows && pRes->completed != true && + // current data set are exhausted, fetch more data from node + if (pRes->row >= pRes->numOfRows && (pRes->completed != true || hasMoreVnodesToTry(pSql)) && (pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE || - pCmd->command == TSDB_SQL_METRIC_JOIN_RETRIEVE || + pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE || pCmd->command == TSDB_SQL_FETCH || pCmd->command == TSDB_SQL_SHOW || pCmd->command == TSDB_SQL_SELECT || diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 1e4a142a90..cf14525c49 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -1203,7 +1203,7 @@ int32_t tscHandleMasterJoinQuery(SSqlObj* pSql) { } } - pSql->cmd.command = (pSql->numOfSubs <= 0)? TSDB_SQL_RETRIEVE_EMPTY_RESULT:TSDB_SQL_METRIC_JOIN_RETRIEVE; + pSql->cmd.command = (pSql->numOfSubs <= 0)? TSDB_SQL_RETRIEVE_EMPTY_RESULT:TSDB_SQL_TABLE_JOIN_RETRIEVE; return TSDB_CODE_SUCCESS; } @@ -1949,7 +1949,7 @@ void **doSetResultRowData(SSqlObj *pSql, bool finalResult) { assert(pRes->row >= 0 && pRes->row <= pRes->numOfRows); - if(pCmd->command == TSDB_SQL_METRIC_JOIN_RETRIEVE) { + if(pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) { if (pRes->completed) { tfree(pRes->tsrow); } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 171d40b932..f0bc8d9755 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -386,7 +386,7 @@ void tscPartiallyFreeSqlObj(SSqlObj* pSql) { int32_t cmd = pCmd->command; if (cmd < TSDB_SQL_INSERT || cmd == TSDB_SQL_RETRIEVE_LOCALMERGE || cmd == TSDB_SQL_RETRIEVE_EMPTY_RESULT || - cmd == TSDB_SQL_METRIC_JOIN_RETRIEVE) { + cmd == TSDB_SQL_TABLE_JOIN_RETRIEVE) { tscRemoveFromSqlList(pSql); } @@ -1644,9 +1644,11 @@ void doRemoveTableMetaInfo(SQueryInfo* pQueryInfo, int32_t index, bool removeFro void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache) { tscTrace("%p deref the table meta in cache, numOfTables:%d", address, pQueryInfo->numOfTables); - int32_t index = pQueryInfo->numOfTables; - while (index >= 0) { - doRemoveTableMetaInfo(pQueryInfo, --index, removeFromCache); + for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { + STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i); + + tscClearTableMetaInfo(pTableMetaInfo, removeFromCache); + free(pTableMetaInfo); } tfree(pQueryInfo->pTableMetaInfo); @@ -1698,16 +1700,8 @@ void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache) taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), removeFromCache); tfree(pTableMetaInfo->vgroupList); - if (pTableMetaInfo->tagColList != NULL) { - size_t numOfTags = taosArrayGetSize(pTableMetaInfo->tagColList); - for(int32_t i = 0; i < numOfTags; ++i) { // todo do NOT use the allocated object - SColumn* pCol = taosArrayGetP(pTableMetaInfo->tagColList, i); - tfree(pCol); - } - - taosArrayDestroy(pTableMetaInfo->tagColList); - pTableMetaInfo->tagColList = NULL; - } + tscColumnListDestroy(pTableMetaInfo->tagColList); + pTableMetaInfo->tagColList = NULL; } void tscResetForNextRetrieve(SSqlRes* pRes) { @@ -1991,22 +1985,28 @@ char* tscGetErrorMsgPayload(SSqlCmd* pCmd) { return pCmd->payload; } /** * If current vnode query does not return results anymore (pRes->numOfRows == 0), try the next vnode if exists, - * in case of multi-vnode super table projection query and the result does not reach the limitation. + * while multi-vnode super table projection query and the result does not reach the limitation. */ bool hasMoreVnodesToTry(SSqlObj* pSql) { -// SSqlCmd* pCmd = &pSql->cmd; -// SSqlRes* pRes = &pSql->res; + SSqlCmd* pCmd = &pSql->cmd; + SSqlRes* pRes = &pSql->res; + if (pCmd->command != TSDB_SQL_FETCH) { + return false; + } -// SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); -// STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); -// if (!UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo) || (pTableMetaInfo->pMetricMeta == NULL)) { + STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + assert(pRes->completed); + + // for normal table, do not try any more if result are exhausted + if (!UTIL_TABLE_IS_SUPERTABLE(pTableMetaInfo) || (pTableMetaInfo->vgroupList == NULL)) { return false; -// } + } -// int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes; -// return pRes->numOfRows == 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && -// (!tscHasReachLimitation(pQueryInfo, pRes)) && (pTableMetaInfo->vgroupIndex < totalVnode - 1); + int32_t numOfVgroups = pTableMetaInfo->vgroupList->numOfVgroups; + return tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && + (!tscHasReachLimitation(pQueryInfo, pRes)) && (pTableMetaInfo->vgroupIndex < numOfVgroups - 1); } void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) { @@ -2022,12 +2022,11 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) { assert(pRes->numOfRows == 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && !tscHasReachLimitation(pQueryInfo, pRes)); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - int32_t totalVnode = 0; -// int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes; - - while (++pTableMetaInfo->vgroupIndex < totalVnode) { + + int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups; + while (++pTableMetaInfo->vgroupIndex < totalVgroups) { tscTrace("%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d", pSql, - pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVnode, pRes->numOfTotalInCurrentClause); + pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVgroups, pRes->numOfTotalInCurrentClause); /* * update the limit and offset value for the query on the next vnode, @@ -2043,10 +2042,10 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) { } pQueryInfo->limit.offset = pRes->offset; - assert((pRes->offset >= 0 && pRes->numOfRows == 0) || (pRes->offset == 0 && pRes->numOfRows >= 0)); - tscTrace("%p new query to next vnode, vnode index:%d, limit:%" PRId64 ", offset:%" PRId64 ", glimit:%" PRId64, pSql, - pTableMetaInfo->vgroupIndex, pQueryInfo->limit.limit, pQueryInfo->limit.offset, pQueryInfo->clauseLimit); + + tscTrace("%p new query to next vgroup, index:%d, limit:%" PRId64 ", offset:%" PRId64 ", glimit:%" PRId64, + pSql, pTableMetaInfo->vgroupIndex, pQueryInfo->limit.limit, pQueryInfo->limit.offset, pQueryInfo->clauseLimit); /* * For project query with super table join, the numOfSub is equalled to the number of all subqueries. @@ -2060,43 +2059,46 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) { tscResetForNextRetrieve(pRes); // in case of async query, set the callback function - void* fp1 = pSql->fp; +// void* fp1 = pSql->fp; pSql->fp = fp; - if (fp1 != NULL) { - assert(fp != NULL); - } - - int32_t ret = tscProcessSql(pSql); // todo check for failure +// if (fp1 != NULL) { +// assert(fp != NULL); +// } - // in case of async query, return now - if (fp != NULL) { - return; - } - - if (ret != TSDB_CODE_SUCCESS) { - pSql->res.code = ret; + int32_t ret = tscProcessSql(pSql); + if (ret == TSDB_CODE_SUCCESS) { return; + } else {// todo check for failure } - - // retrieve data - assert(pCmd->command == TSDB_SQL_SELECT); - pCmd->command = TSDB_SQL_FETCH; - - if ((ret = tscProcessSql(pSql)) != TSDB_CODE_SUCCESS) { - pSql->res.code = ret; - return; - } - - // if the result from current virtual node are empty, try next if exists. otherwise, return the results. - if (pRes->numOfRows > 0) { - break; - } + // in case of async query, return now +// if (fp != NULL) { +// return; +// } +// +// if (ret != TSDB_CODE_SUCCESS) { +// pSql->res.code = ret; +// return; +// } +// +// // retrieve data +// assert(pCmd->command == TSDB_SQL_SELECT); +// pCmd->command = TSDB_SQL_FETCH; +// +// if ((ret = tscProcessSql(pSql)) != TSDB_CODE_SUCCESS) { +// pSql->res.code = ret; +// return; +// } +// +// // if the result from current virtual node are empty, try next if exists. otherwise, return the results. +// if (pRes->numOfRows > 0) { +// break; +// } } - if (pRes->numOfRows == 0) { - tscTrace("%p all vnodes exhausted, prj query completed. total res:%d", pSql, totalVnode, pRes->numOfTotal); - } +// if (pRes->numOfRows == 0) { +// tscTrace("%p all vnodes exhausted, prj query completed. total res:%d", pSql, totalVnode, pRes->numOfTotal); +// } } void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)()) { diff --git a/src/query/inc/qsqltype.h b/src/query/inc/qsqltype.h index 08d30be925..4087be49ee 100644 --- a/src/query/inc/qsqltype.h +++ b/src/query/inc/qsqltype.h @@ -61,7 +61,7 @@ enum _sql_type { TSDB_SQL_LOCAL, // SQL below for client local TSDB_SQL_DESCRIBE_TABLE, TSDB_SQL_RETRIEVE_LOCALMERGE, - TSDB_SQL_METRIC_JOIN_RETRIEVE, + TSDB_SQL_TABLE_JOIN_RETRIEVE, /* * build empty result instead of accessing dnode to fetch result -- GitLab