diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index af21ba83b77c2566f373e6033035f3a5e1c68906..0ed9ec210f77b77579959f4bb7791b51067301f6 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -671,6 +671,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma while (1) { uint64_t ts; bool hasMore = false; +// ASSERT(0); int32_t code = qExecTaskOpt(taskInfo, pResList, &ts, &hasMore, NULL); if (code < 0) { if (code == TSDB_CODE_QRY_IN_EXEC) { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index fc9524e8282af67feee1cf5057c8fac4087dda41..50c62b9017802b61f903fea6e719f5528b7af294 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -340,21 +340,23 @@ typedef struct STableScanInfo { } STableScanInfo; typedef struct STableMergeScanInfo { - STableListInfo* tableListInfo; - int32_t tableStartIndex; - int32_t tableEndIndex; - bool hasGroupId; - uint64_t groupId; - SArray* dataReaders; // array of tsdbReaderT* - SReadHandle readHandle; - int32_t bufPageSize; - uint32_t sortBufSize; // max buffer size for in-memory sort - SArray* pSortInfo; - SSortHandle* pSortHandle; - SSDataBlock* pSortInputBlock; - int64_t startTs; // sort start time - SArray* sortSourceParams; - SLimitInfo limitInfo; + STableListInfo* tableListInfo; + int32_t tableStartIndex; + int32_t tableEndIndex; + bool hasGroupId; + uint64_t groupId; + SArray* dataReaders; // array of tsdbReaderT* + SArray* queryConds; // array of queryTableDataCond + STsdbReader* pReader; + SReadHandle readHandle; + int32_t bufPageSize; + uint32_t sortBufSize; // max buffer size for in-memory sort + SArray* pSortInfo; + SSortHandle* pSortHandle; + SSDataBlock* pSortInputBlock; + int64_t startTs; // sort start time + SArray* sortSourceParams; + SLimitInfo limitInfo; SFileBlockLoadRecorder readRecorder; int64_t numOfRows; SScanInfo scanInfo; @@ -371,7 +373,6 @@ typedef struct STableMergeScanInfo { SQueryTableDataCond cond; int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan int32_t dataBlockLoadFlag; - STsdbReader* pReader; // if the upstream is an interval operator, the interval info is also kept here to get the time // window to check if current data block needs to be loaded. diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index dbef0320419431c9da67de833e9627d896256a45..56158e567eb2bc7dc85e7c9394dcc4417eb071a0 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1681,9 +1681,14 @@ int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t STableKeyInfo keyInfo = {.uid = uid, .groupId = gid}; taosArrayPush(pTableList->pTableList, &keyInfo); - if (pTableList->oneTableForEachGroup || pTableList->numOfOuputGroups > 1) { + if (!pTableList->oneTableForEachGroup) { + if (pTableList->map == NULL) { + pTableList->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + } + taosHashPut(pTableList->map, &uid, sizeof(uid), &keyInfo.groupId, sizeof(keyInfo.groupId)); } + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index fec9217bdbf8783ab9e9fc27464673caa1638746..32d2684a0a6e3a22bccdd0176915e9f3b6766f64 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -624,11 +624,6 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { ASSERT(binfo.uid != 0); pBlock->info.groupId = getTableGroupId(&pTaskInfo->tableqinfoList, pBlock->info.uid); - ASSERT(pBlock->info.groupId != 0); -// uint64_t* groupId = taosHashGet(pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); -// if (groupId) { -// pBlock->info.groupId = *groupId; -// } uint32_t status = 0; int32_t code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status); @@ -1570,6 +1565,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock if (groupIdPre) { pInfo->pRes->info.groupId = *groupIdPre; } else { + ASSERT(0); pInfo->pRes->info.groupId = 0; } @@ -4438,6 +4434,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc typedef struct STableMergeScanSortSourceParam { SOperatorInfo* pOperator; int32_t readerIdx; + int64_t uid; SSDataBlock* inputBlock; } STableMergeScanSortSourceParam; @@ -4514,8 +4511,10 @@ static SSDataBlock* getTableDataBlockTemp(void* param) { } tsdbReaderClose(pInfo->pReader); pInfo->pReader = NULL; + return NULL; } + static SSDataBlock* getTableDataBlock2(void* param) { STableMergeScanSortSourceParam* source = param; SOperatorInfo* pOperator = source->pOperator; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 8e1b15f3154d59b509b7b486dc39b924294659c7..45e3c959ce087fe4d5a05ea2ef90c1d08f9846de 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -5384,6 +5384,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { maxTs = TMAX(maxTs, pBlock->info.window.ekey); minTs = TMIN(minTs, pBlock->info.window.skey); + qDebug("-------------------------groupId:%ld", pBlock->info.groupId); + doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap); } pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);