diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index fc5781cb4d1e13f54c61415f060db46c50613ed5..d67a361c21777e0dd164f4cdf89bd90145968bf8 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -567,6 +567,7 @@ TEST(testCase, insert_test) { taos_free_result(pRes); taos_close(pConn); } +#endif TEST(testCase, projection_query_tables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); @@ -605,7 +606,7 @@ TEST(testCase, projection_query_tables) { } taos_free_result(pRes); - for(int32_t i = 0; i < 10000000; i += 20) { + for(int32_t i = 0; i < 100000; i += 20) { char sql[1024] = {0}; sprintf(sql, "insert into tu values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" @@ -625,7 +626,7 @@ TEST(testCase, projection_query_tables) { printf("start to insert next table\n"); - for(int32_t i = 0; i < 10000000; i += 20) { + for(int32_t i = 0; i < 100000; i += 20) { char sql[1024] = {0}; sprintf(sql, "insert into tu2 values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" @@ -692,8 +693,6 @@ TEST(testCase, projection_query_stables) { taos_close(pConn); } -#endif - TEST(testCase, agg_query_tables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 68d4216bae23c5cbf766ee887d6ade0c2e24e1a1..db992f85d4cd37ffea4f26481b1c82eae65f3f3d 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -126,6 +126,8 @@ STqReadHandle *tqInitSubmitMsgScanner(SMeta *pMeta); void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SArray *pColIdList); int32_t tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); int32_t tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); +int32_t tqReadHandleRemoveTbUidList(STqReadHandle* pHandle, const SArray* tbUidList); + int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver); bool tqNextDataBlock(STqReadHandle *pHandle); bool tqNextDataBlockFilterOut(STqReadHandle *pHandle, SHashObj *filterOutUids); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index db5e90743d030776b23327f1dc1280bb1e19d3f9..096208b96196dd3a83089e8c027d576f408f0270 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -231,3 +231,14 @@ int tqReadHandleAddTbUidList(STqReadHandle* pHandle, const SArray* tbUidList) { return 0; } + +int tqReadHandleRemoveTbUidList(STqReadHandle* pHandle, const SArray* tbUidList) { + ASSERT(pHandle->tbIdHash != NULL); + + for(int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) { + int64_t* pKey = (int64_t*) taosArrayGet(tbUidList, i); + taosHashRemove(pHandle->tbIdHash, pKey, sizeof(int64_t)); + } + + return 0; +} diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index fa840e1cd61dee1f796e9569b403e94f8062bee9..6d308d7221983ab677859b037725b3161f4e51c5 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -125,6 +125,33 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) { return pTaskInfo; } +static SArray* filterQualifiedChildTables(const SStreamBlockScanInfo* pScanInfo, const SArray* tableIdList) { + SArray* qa = taosArrayInit(4, sizeof(tb_uid_t)); + + // let's discard the tables those are not created according to the queried super table. + SMetaReader mr = {0}; + metaReaderInit(&mr, pScanInfo->readHandle.meta, 0); + for (int32_t i = 0; i < taosArrayGetSize(tableIdList); ++i) { + int64_t* id = (int64_t*)taosArrayGet(tableIdList, i); + + int32_t code = metaGetTableEntryByUid(&mr, *id); + if (code != TSDB_CODE_SUCCESS) { + qError("failed to get table meta, uid:%" PRIu64 " code:%s", *id, tstrerror(terrno)); + continue; + } + + ASSERT(mr.me.type == TSDB_CHILD_TABLE); + if (mr.me.ctbEntry.suid != pScanInfo->tableUid) { + continue; + } + + taosArrayPush(qa, id); + } + + metaReaderClear(&mr); + return qa; +} + int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; @@ -134,41 +161,24 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo pInfo = pInfo->pDownstream[0]; } + int32_t code = 0; SStreamBlockScanInfo* pScanInfo = pInfo->info; - if (isAdd) { - SArray* qa = taosArrayInit(4, sizeof(tb_uid_t)); - - SMetaReader mr = {0}; - metaReaderInit(&mr, pScanInfo->readHandle.meta, 0); - for (int32_t i = 0; i < taosArrayGetSize(tableIdList); ++i) { - int64_t* id = (int64_t*)taosArrayGet(tableIdList, i); - - int32_t code = metaGetTableEntryByUid(&mr, *id); - if (code != TSDB_CODE_SUCCESS) { - qError("failed to get table meta, uid:%" PRIu64 " code:%s", *id, tstrerror(terrno)); - continue; - } - - ASSERT(mr.me.type == TSDB_CHILD_TABLE); - if (mr.me.ctbEntry.suid != pScanInfo->tableUid) { - continue; - } + if (isAdd) { // add new table id + SArray* qa = filterQualifiedChildTables(pScanInfo, tableIdList); - taosArrayPush(qa, id); - } + qDebug(" %d qualified child tables added into stream scanner", (int32_t)taosArrayGetSize(qa)); + code = tqReadHandleAddTbUidList(pScanInfo->streamBlockReader, qa); + taosArrayDestroy(qa); - metaReaderClear(&mr); + } else { // remove the table id in current list + SArray* qa = filterQualifiedChildTables(pScanInfo, tableIdList); - qDebug(" %d qualified child tables added into stream scanner", (int32_t)taosArrayGetSize(qa)); - int32_t code = tqReadHandleAddTbUidList(pScanInfo->streamBlockReader, qa); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - } else { - assert(0); + qDebug(" %d remove child tables from the stream scanner", (int32_t)taosArrayGetSize(tableIdList)); + code = tqReadHandleAddTbUidList(pScanInfo->streamBlockReader, tableIdList); + taosArrayDestroy(qa); } - return TSDB_CODE_SUCCESS; + return code; } int32_t qGetQueriedTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion, int32_t* tversion) { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 37aeb367f521d389b53a52b2e45c0dda6bcbbb13..750554e8288fb8dca1d875ca0841794ff46252c0 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2062,15 +2062,7 @@ void setExecutionContext(int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* p pAggInfo->groupId = groupId; } -/** - * For interval query of both super table and table, copy the data in ascending order, since the output results are - * ordered in SWindowResutl already. While handling the group by query for both table and super table, - * all group result are completed already. - * - * @param pQInfo - * @param result - */ -int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, +int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, int32_t* rowCellOffset, SqlFunctionCtx* pCtx, int32_t numOfExprs) { int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); int32_t start = pGroupResInfo->index; @@ -2087,6 +2079,15 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprIn continue; } + if (pBlock->info.groupId == 0) { + pBlock->info.groupId = pPos->groupId; + } else { + // current value belongs to different group, it can't be packed into one datablock + if (pBlock->info.groupId != pPos->groupId) { + break; + } + } + if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) { break; } @@ -2100,9 +2101,8 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprIn if (pCtx[j].fpSet.finalize) { int32_t code = pCtx[j].fpSet.finalize(&pCtx[j], pBlock); if (TAOS_FAILED(code)) { - qError("%s build result data block error, code %s", GET_TASKID(taskInfo), tstrerror(code)); - taskInfo->code = code; - longjmp(taskInfo->env, code); + qError("%s build result data block error, code %s", GET_TASKID(pTaskInfo), tstrerror(code)); + longjmp(pTaskInfo->env, code); } } else if (strcmp(pCtx[j].pExpr->pExpr->_function.functionName, "_select_value") == 0) { // do nothing, todo refactor @@ -2124,7 +2124,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprIn } } - // qDebug("QInfo:0x%"PRIx64" copy data to query buf completed", GET_TASKID(pRuntimeEnv)); + qDebug("%s result generated, rows:%d, groupId:%"PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows, pBlock->info.groupId); blockDataUpdateTsWindow(pBlock); return 0; } @@ -2145,10 +2145,9 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG return; } + // clear the existed group id + pBlock->info.groupId = 0; doCopyToSDataBlock(pTaskInfo, pBlock, pExprInfo, pBuf, pGroupResInfo, rowCellOffset, pCtx, numOfExprs); - - // add condition (pBlock->info.rows >= 1) just to runtime happy - blockDataUpdateTsWindow(pBlock); } static void updateNumOfRowsInResultRows(SqlFunctionCtx* pCtx, int32_t numOfOutput, SResultRowInfo* pResultRowInfo, @@ -3656,7 +3655,6 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { doSetOperatorCompleted(pOperator); } - doSetOperatorCompleted(pOperator); return (blockDataGetNumOfRows(pInfo->pRes) != 0) ? pInfo->pRes : NULL; }