diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 772f9049e582577b23faffe399e9ac0961e4b9fd..fd2abbb5595bdd6df254b0f3bbb3ab26b56bf806 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -400,7 +400,7 @@ TEST(testCase, show_vgroup_Test) { taos_free_result(pRes); taos_close(pConn); } -#endif + TEST(testCase, create_multiple_tables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); @@ -653,6 +653,7 @@ TEST(testCase, projection_query_stables) { taos_free_result(pRes); taos_close(pConn); } +#endif TEST(testCase, agg_query_tables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); @@ -661,7 +662,7 @@ TEST(testCase, agg_query_tables) { TAOS_RES* pRes = taos_query(pConn, "use abc1"); taos_free_result(pRes); - pRes = taos_query(pConn, "select count(*) from tu"); + pRes = taos_query(pConn, "select length('abc') from tu"); if (taos_errno(pRes) != 0) { printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); taos_free_result(pRes); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 32a5140da2526c3bcd266aea4ed73ba5b689f0c4..b1f15482a9a8da98808aee1b50ff168d6ccba674 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -609,6 +609,12 @@ typedef struct SSessionAggOperatorInfo { SColumnInfoData timeWindowData; // query time window info for scalar function execution. } SSessionAggOperatorInfo; +typedef struct STimeSliceOperatorInfo { + SOptrBasicInfo binfo; + SInterval interval; + SGroupResInfo groupResInfo; // multiple results build supporter +} STimeSliceOperatorInfo; + typedef struct SStateWindowOperatorInfo { SOptrBasicInfo binfo; SAggSupporter aggSup; @@ -707,6 +713,8 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); +SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo); + #if 0 SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, @@ -716,7 +724,7 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntim SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); -SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput); +SOperatorInfo* createTagScanOperatorInfo(SReaderHandle* pReaderHandle, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema, int32_t numOfOutput); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index ee5c6759890d8691c4025755d83705eed6fbf342..1bc84f85d7a480e5ef6bd0a2d82ef416add67593 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -5429,58 +5429,48 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) { return NULL; } - STableIntervalOperatorInfo* pIntervalInfo = pOperator->info; - - STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; + STimeSliceOperatorInfo* pSliceInfo = pOperator->info; if (pOperator->status == OP_RES_TO_RETURN) { // toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); - - if (pIntervalInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { + if (pSliceInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pSliceInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } - return pIntervalInfo->binfo.pRes; + return pSliceInfo->binfo.pRes; } - STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; - int32_t order = pQueryAttr->order.order; - STimeWindow win = pQueryAttr->window; - + int32_t order = TSDB_ORDER_ASC; +// STimeWindow win = pQueryAttr->window; SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); - if (pBlock == NULL) { break; } // setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput); - // the pDataBlock are always the same one, no need to call this again - setInputDataBlock(pOperator, pIntervalInfo->binfo.pCtx, pBlock, pQueryAttr->order.order); - hashAllIntervalAgg(pOperator, &pIntervalInfo->binfo.resultRowInfo, pBlock, 0); + setInputDataBlock(pOperator, pSliceInfo->binfo.pCtx, pBlock, order); + hashAllIntervalAgg(pOperator, &pSliceInfo->binfo.resultRowInfo, pBlock, 0); } // restore the value - pQueryAttr->order.order = order; - pQueryAttr->window = win; - pOperator->status = OP_RES_TO_RETURN; - closeAllResultRows(&pIntervalInfo->binfo.resultRowInfo); + closeAllResultRows(&pSliceInfo->binfo.resultRowInfo); setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); - finalizeQueryResult(pIntervalInfo->binfo.pCtx, pOperator->numOfOutput); + finalizeQueryResult(pSliceInfo->binfo.pCtx, pOperator->numOfOutput); - initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->binfo.resultRowInfo); - // toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); + initGroupResInfo(&pSliceInfo->groupResInfo, &pSliceInfo->binfo.resultRowInfo); + // toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pSliceInfo->pRes); - if (pIntervalInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { + if (pSliceInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pSliceInfo->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } - return pIntervalInfo->binfo.pRes->info.rows == 0 ? NULL : pIntervalInfo->binfo.pRes; + return pSliceInfo->binfo.pRes->info.rows == 0 ? NULL : pSliceInfo->binfo.pRes; } static SSDataBlock* doSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup) { @@ -6238,28 +6228,34 @@ _error: return NULL; } -SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, - SExprInfo* pExpr, int32_t numOfOutput) { - STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo)); +SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo) { + STimeSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STimeSliceOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL || pInfo == NULL) { + goto _error; + } - // pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); - // pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pResultInfo->capacity); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - - pOperator->name = "AllTimeIntervalAggOperator"; + pOperator->name = "TimeSliceOperator"; // pOperator->operatorType = OP_AllTimeWindow; pOperator->blockingOptr = true; - pOperator->status = OP_NOT_OPENED; - pOperator->pExpr = pExpr; - pOperator->numOfOutput = numOfOutput; - pOperator->info = pInfo; - pOperator->getNextFn = doAllIntervalAgg; - pOperator->closeFn = destroyBasicOperatorInfo; + pOperator->status = OP_NOT_OPENED; + pOperator->pExpr = pExprInfo; + pOperator->numOfOutput = numOfCols; + pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; + pOperator->getNextFn = doAllIntervalAgg; + pOperator->closeFn = destroyBasicOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; + + _error: + taosMemoryFree(pInfo); + taosMemoryFree(pOperator); + pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + return NULL; } SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) {