diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 133bd116ba5e8f5468711b3d43a56d6ef5331d3d..82b0f852ab5b9d615051c2b2589ab4efda8af1a5 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -145,7 +145,7 @@ static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows) { return; } - // local reducer has handle this situation during super table non-projection query. + // local merge has handle this situation during super table non-projection query. if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE) { pRes->numOfTotalInCurrentClause += pRes->numOfRows; } @@ -222,9 +222,30 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi tscResetForNextRetrieve(pRes); // handle the sub queries of join query - if (pCmd->command == TSDB_SQL_METRIC_JOIN_RETRIEVE) { + if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) { tscFetchDatablockFromSubquery(pSql); - } else { + } else if (pRes->completed && pCmd->command == TSDB_SQL_FETCH) { + if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from remain vnodes. + tscTryQueryNextVnode(pSql, tscAsyncQueryRowsForNextVnode); + return; + } else { + /* + * all available virtual node has been checked already, now we need to check + * for the next subclause queries + */ + if (pCmd->clauseIndex < pCmd->numOfClause - 1) { + tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode); + return; + } + + /* + * 1. has reach the limitation + * 2. no remain virtual nodes to be retrieved anymore + */ + (*pSql->fetchFp)(param, pSql, 0); + } + return; + } else { // current query is not completed, continue retrieve from node if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) { pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH; } diff --git a/src/client/src/tscSecondaryMerge.c b/src/client/src/tscSecondaryMerge.c index 71d50b56f91e54ba6e7ae8bfccc167b35dbaae9a..a3a0441ff899a4c4cf317b1e9d7d39909069bddf 100644 --- a/src/client/src/tscSecondaryMerge.c +++ b/src/client/src/tscSecondaryMerge.c @@ -283,7 +283,6 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd // used to keep the latest input row pReducer->pTempBuffer = (tFilePage *)calloc(1, pReducer->rowSize + sizeof(tFilePage)); - pReducer->discardData = (tFilePage *)calloc(1, pReducer->rowSize + sizeof(tFilePage)); pReducer->discard = false; @@ -311,6 +310,8 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd return; } + size = tscSqlExprNumOfExprs(pQueryInfo); + pReducer->pTempBuffer->numOfElems = 0; pReducer->pResInfo = calloc(size, sizeof(SResultInfo)); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 43370c70372d9f86bb3c318772afc67beb6f12d3..bc717ed88cda2a06ce8ce055e8aeb59b5b335f62 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1445,8 +1445,9 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) { } pRes->row = 0; + pRes->completed = (pRes->numOfRows == 0); - uint8_t code = pRes->code; + int32_t code = pRes->code; if (pRes->code == TSDB_CODE_SUCCESS) { (*pSql->fp)(pSql->param, pSql, pRes->numOfRows); } else { diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 14f0fa07caab1daac854cc9372fedc88e161ee12..87292f4fe659adf2c0d54ad08143fbcf2ba36126 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -463,11 +463,11 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { return NULL; } - // current data are exhausted, fetch more data - if (pRes->row >= pRes->numOfRows && pRes->completed != true && + // current data set are exhausted, fetch more data from node + if (pRes->row >= pRes->numOfRows && (pRes->completed != true || hasMoreVnodesToTry(pSql)) && (pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE || - pCmd->command == TSDB_SQL_METRIC_JOIN_RETRIEVE || + pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE || pCmd->command == TSDB_SQL_FETCH || pCmd->command == TSDB_SQL_SHOW || pCmd->command == TSDB_SQL_SELECT || diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 02af71ee66a8b24f6a0a42170316c00296c2abb7..4bcfd6e8225e67adce509d8f61ce426dabffe26d 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -1177,7 +1177,7 @@ int32_t tscHandleMasterJoinQuery(SSqlObj* pSql) { } } - pSql->cmd.command = (pSql->numOfSubs <= 0)? TSDB_SQL_RETRIEVE_EMPTY_RESULT:TSDB_SQL_METRIC_JOIN_RETRIEVE; + pSql->cmd.command = (pSql->numOfSubs <= 0)? TSDB_SQL_RETRIEVE_EMPTY_RESULT:TSDB_SQL_TABLE_JOIN_RETRIEVE; return TSDB_CODE_SUCCESS; } @@ -1923,7 +1923,7 @@ void **doSetResultRowData(SSqlObj *pSql, bool finalResult) { assert(pRes->row >= 0 && pRes->row <= pRes->numOfRows); - if(pCmd->command == TSDB_SQL_METRIC_JOIN_RETRIEVE) { + if(pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) { if (pRes->completed) { tfree(pRes->tsrow); } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index f9531ec1aa57688860006770ba375fd599eddeb4..0062c4dfc12857c175b82d274b79af32ece7b9ee 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -386,7 +386,7 @@ void tscPartiallyFreeSqlObj(SSqlObj* pSql) { int32_t cmd = pCmd->command; if (cmd < TSDB_SQL_INSERT || cmd == TSDB_SQL_RETRIEVE_LOCALMERGE || cmd == TSDB_SQL_RETRIEVE_EMPTY_RESULT || - cmd == TSDB_SQL_METRIC_JOIN_RETRIEVE) { + cmd == TSDB_SQL_TABLE_JOIN_RETRIEVE) { tscRemoveFromSqlList(pSql); } @@ -1647,9 +1647,11 @@ void doRemoveTableMetaInfo(SQueryInfo* pQueryInfo, int32_t index, bool removeFro void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, const char* address, bool removeFromCache) { tscTrace("%p deref the table meta in cache, numOfTables:%d", address, pQueryInfo->numOfTables); - int32_t index = pQueryInfo->numOfTables; - while (index >= 0) { - doRemoveTableMetaInfo(pQueryInfo, --index, removeFromCache); + for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { + STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i); + + tscClearTableMetaInfo(pTableMetaInfo, removeFromCache); + free(pTableMetaInfo); } tfree(pQueryInfo->pTableMetaInfo); @@ -1701,16 +1703,8 @@ void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo, bool removeFromCache) taosCacheRelease(tscCacheHandle, (void**)&(pTableMetaInfo->pTableMeta), removeFromCache); tfree(pTableMetaInfo->vgroupList); - if (pTableMetaInfo->tagColList != NULL) { - size_t numOfTags = taosArrayGetSize(pTableMetaInfo->tagColList); - for(int32_t i = 0; i < numOfTags; ++i) { // todo do NOT use the allocated object - SColumn* pCol = taosArrayGetP(pTableMetaInfo->tagColList, i); - tfree(pCol); - } - - taosArrayDestroy(pTableMetaInfo->tagColList); - pTableMetaInfo->tagColList = NULL; - } + tscColumnListDestroy(pTableMetaInfo->tagColList); + pTableMetaInfo->tagColList = NULL; } void tscResetForNextRetrieve(SSqlRes* pRes) { @@ -2001,22 +1995,28 @@ char* tscGetErrorMsgPayload(SSqlCmd* pCmd) { return pCmd->payload; } /** * If current vnode query does not return results anymore (pRes->numOfRows == 0), try the next vnode if exists, - * in case of multi-vnode super table projection query and the result does not reach the limitation. + * while multi-vnode super table projection query and the result does not reach the limitation. */ bool hasMoreVnodesToTry(SSqlObj* pSql) { -// SSqlCmd* pCmd = &pSql->cmd; -// SSqlRes* pRes = &pSql->res; + SSqlCmd* pCmd = &pSql->cmd; + SSqlRes* pRes = &pSql->res; + if (pCmd->command != TSDB_SQL_FETCH) { + return false; + } -// SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex); -// STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); -// if (!UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo) || (pTableMetaInfo->pMetricMeta == NULL)) { + STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); + assert(pRes->completed); + + // for normal table, do not try any more if result are exhausted + if (!UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo) || (pTableMetaInfo->vgroupList == NULL)) { return false; -// } + } -// int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes; -// return pRes->numOfRows == 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && -// (!tscHasReachLimitation(pQueryInfo, pRes)) && (pTableMetaInfo->vgroupIndex < totalVnode - 1); + int32_t numOfVgroups = pTableMetaInfo->vgroupList->numOfVgroups; + return tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && + (!tscHasReachLimitation(pQueryInfo, pRes)) && (pTableMetaInfo->vgroupIndex < numOfVgroups - 1); } void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) { @@ -2032,12 +2032,11 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) { assert(pRes->numOfRows == 0 && tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) && !tscHasReachLimitation(pQueryInfo, pRes)); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); - int32_t totalVnode = 0; -// int32_t totalVnode = pTableMetaInfo->pMetricMeta->numOfVnodes; - - while (++pTableMetaInfo->vgroupIndex < totalVnode) { + + int32_t totalVgroups = pTableMetaInfo->vgroupList->numOfVgroups; + while (++pTableMetaInfo->vgroupIndex < totalVgroups) { tscTrace("%p current vnode:%d exhausted, try next:%d. total vnode:%d. current numOfRes:%d", pSql, - pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVnode, pRes->numOfTotalInCurrentClause); + pTableMetaInfo->vgroupIndex - 1, pTableMetaInfo->vgroupIndex, totalVgroups, pRes->numOfTotalInCurrentClause); /* * update the limit and offset value for the query on the next vnode, @@ -2053,10 +2052,10 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) { } pQueryInfo->limit.offset = pRes->offset; - assert((pRes->offset >= 0 && pRes->numOfRows == 0) || (pRes->offset == 0 && pRes->numOfRows >= 0)); - tscTrace("%p new query to next vnode, vnode index:%d, limit:%" PRId64 ", offset:%" PRId64 ", glimit:%" PRId64, pSql, - pTableMetaInfo->vgroupIndex, pQueryInfo->limit.limit, pQueryInfo->limit.offset, pQueryInfo->clauseLimit); + + tscTrace("%p new query to next vgroup, index:%d, limit:%" PRId64 ", offset:%" PRId64 ", glimit:%" PRId64, + pSql, pTableMetaInfo->vgroupIndex, pQueryInfo->limit.limit, pQueryInfo->limit.offset, pQueryInfo->clauseLimit); /* * For project query with super table join, the numOfSub is equalled to the number of all subqueries. @@ -2070,43 +2069,46 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) { tscResetForNextRetrieve(pRes); // in case of async query, set the callback function - void* fp1 = pSql->fp; +// void* fp1 = pSql->fp; pSql->fp = fp; - if (fp1 != NULL) { - assert(fp != NULL); - } - - int32_t ret = tscProcessSql(pSql); // todo check for failure +// if (fp1 != NULL) { +// assert(fp != NULL); +// } - // in case of async query, return now - if (fp != NULL) { - return; - } - - if (ret != TSDB_CODE_SUCCESS) { - pSql->res.code = ret; + int32_t ret = tscProcessSql(pSql); + if (ret == TSDB_CODE_SUCCESS) { return; + } else {// todo check for failure } - - // retrieve data - assert(pCmd->command == TSDB_SQL_SELECT); - pCmd->command = TSDB_SQL_FETCH; - - if ((ret = tscProcessSql(pSql)) != TSDB_CODE_SUCCESS) { - pSql->res.code = ret; - return; - } - - // if the result from current virtual node are empty, try next if exists. otherwise, return the results. - if (pRes->numOfRows > 0) { - break; - } + // in case of async query, return now +// if (fp != NULL) { +// return; +// } +// +// if (ret != TSDB_CODE_SUCCESS) { +// pSql->res.code = ret; +// return; +// } +// +// // retrieve data +// assert(pCmd->command == TSDB_SQL_SELECT); +// pCmd->command = TSDB_SQL_FETCH; +// +// if ((ret = tscProcessSql(pSql)) != TSDB_CODE_SUCCESS) { +// pSql->res.code = ret; +// return; +// } +// +// // if the result from current virtual node are empty, try next if exists. otherwise, return the results. +// if (pRes->numOfRows > 0) { +// break; +// } } - if (pRes->numOfRows == 0) { - tscTrace("%p all vnodes exhausted, prj query completed. total res:%d", pSql, totalVnode, pRes->numOfTotal); - } +// if (pRes->numOfRows == 0) { +// tscTrace("%p all vnodes exhausted, prj query completed. total res:%d", pSql, totalVnode, pRes->numOfTotal); +// } } void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)()) { diff --git a/src/query/inc/qsqltype.h b/src/query/inc/qsqltype.h index 08d30be925796654ec127de997e6a236ea39f6fc..4087be49eeac3042f04013b9e929fce30d323cea 100644 --- a/src/query/inc/qsqltype.h +++ b/src/query/inc/qsqltype.h @@ -61,7 +61,7 @@ enum _sql_type { TSDB_SQL_LOCAL, // SQL below for client local TSDB_SQL_DESCRIBE_TABLE, TSDB_SQL_RETRIEVE_LOCALMERGE, - TSDB_SQL_METRIC_JOIN_RETRIEVE, + TSDB_SQL_TABLE_JOIN_RETRIEVE, /* * build empty result instead of accessing dnode to fetch result diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index be86f64899786aa5bbe68e62971aa997fa6799eb..ef6ff8a70bf3cb9ba08fa63e34b29bdfaa7693f4 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -2398,7 +2398,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { // todo extract methods if (isIntervalQuery(pQuery) && pRuntimeEnv->windowResInfo.prevSKey == 0) { TSKEY skey1, ekey1; - STimeWindow w = {0}; + STimeWindow w = TSWINDOW_INITIALIZER; SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; if (QUERY_IS_ASC_QUERY(pQuery)) { diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 1349095ea56fca9668e300675f5d43b57257b68f..a5cba70219a542963cc92b7e276866a8f1461c53 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -54,12 +54,12 @@ typedef struct SQueryFilePos { typedef struct SDataBlockLoadInfo { SFileGroup* fileGroup; int32_t slot; - int32_t sid; + int32_t tid; SArray* pLoadedCols; } SDataBlockLoadInfo; typedef struct SLoadCompBlockInfo { - int32_t sid; /* table sid */ + int32_t tid; /* table tid */ int32_t fileId; int32_t fileListIndex; } SLoadCompBlockInfo; @@ -127,12 +127,12 @@ static void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle); static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) { pBlockLoadInfo->slot = -1; - pBlockLoadInfo->sid = -1; + pBlockLoadInfo->tid = -1; pBlockLoadInfo->fileGroup = NULL; } static void tsdbInitCompBlockLoadInfo(SLoadCompBlockInfo* pCompBlockLoadInfo) { - pCompBlockLoadInfo->sid = -1; + pCompBlockLoadInfo->tid = -1; pCompBlockLoadInfo->fileId = -1; pCompBlockLoadInfo->fileListIndex = -1; } @@ -423,7 +423,7 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo bool blockLoaded = false; SArray* sa = getDefaultLoadColumns(pQueryHandle, true); - if (pCheckInfo->pDataCols == NULL) { + if (pCheckInfo->pDataCols == NULL) { // todo: why not the real data? pCheckInfo->pDataCols = tdNewDataCols(pRepo->tsdbMeta->maxRowBytes, pRepo->tsdbMeta->maxCols, pRepo->config.maxRowsPerFileBlock); } @@ -434,7 +434,7 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo pBlockLoadInfo->fileGroup = pQueryHandle->pFileGroup; pBlockLoadInfo->slot = pQueryHandle->cur.slot; - pBlockLoadInfo->sid = pCheckInfo->pTableObj->tableId.tid; + pBlockLoadInfo->tid = pCheckInfo->pTableObj->tableId.tid; blockLoaded = true; } @@ -614,7 +614,7 @@ static void filterDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInf SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i); int32_t bytes = pCol->info.bytes; - for (int32_t j = 0; j < numOfCols; ++j) { + for (int32_t j = 0; j < numOfCols; ++j) { //todo opt performance SDataCol* src = &pQueryHandle->rhelper.pDataCols[0]->cols[j]; if (pCol->info.colId == src->colId) { @@ -1197,7 +1197,9 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) { } else { // data block has been loaded, todo extract method SDataBlockLoadInfo* pBlockLoadInfo = &pHandle->dataBlockLoadInfo; - if (pBlockLoadInfo->slot == pHandle->cur.slot && pBlockLoadInfo->sid == pCheckInfo->pTableObj->tableId.tid) { + + if (pBlockLoadInfo->slot == pHandle->cur.slot && pBlockLoadInfo->fileGroup->fileId == pHandle->cur.fid && + pBlockLoadInfo->tid == pCheckInfo->pTableObj->tableId.tid) { return pHandle->pColumns; } else { SCompBlock* pBlock = pBlockInfoEx->pBlock.compBlock; diff --git a/tests/script/general/import/replica1.sim b/tests/script/general/import/replica1.sim index b2b7291623a7a18320c78b36a14e20d940eedab6..1bd1419496aebd532682c5632e52776dca538566 100644 --- a/tests/script/general/import/replica1.sim +++ b/tests/script/general/import/replica1.sim @@ -111,7 +111,8 @@ sql import into tb values(1520000034001, 34001) sql import into tb values(1520000050001, 50001) sql select * from tb; print $rows -if $rows != 19 then +if $rows != 19 then + print expect 19, actual: $rows return -1 endi