From 335cb6513edc8044e31efaf6e19218b82885ba8c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 28 Oct 2022 19:32:57 +0800 Subject: [PATCH] other:merge 3.0 --- source/dnode/vnode/src/sma/smaRollup.c | 1 + source/libs/executor/inc/executorimpl.h | 33 ++++++++++--------- source/libs/executor/src/executil.c | 7 +++- source/libs/executor/src/scanoperator.c | 9 +++-- source/libs/executor/src/timewindowoperator.c | 2 ++ 5 files changed, 30 insertions(+), 22 deletions(-) diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index af21ba83b7..0ed9ec210f 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -671,6 +671,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma while (1) { uint64_t ts; bool hasMore = false; +// ASSERT(0); int32_t code = qExecTaskOpt(taskInfo, pResList, &ts, &hasMore, NULL); if (code < 0) { if (code == TSDB_CODE_QRY_IN_EXEC) { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index fc9524e828..50c62b9017 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -340,21 +340,23 @@ typedef struct STableScanInfo { } STableScanInfo; typedef struct STableMergeScanInfo { - STableListInfo* tableListInfo; - int32_t tableStartIndex; - int32_t tableEndIndex; - bool hasGroupId; - uint64_t groupId; - SArray* dataReaders; // array of tsdbReaderT* - SReadHandle readHandle; - int32_t bufPageSize; - uint32_t sortBufSize; // max buffer size for in-memory sort - SArray* pSortInfo; - SSortHandle* pSortHandle; - SSDataBlock* pSortInputBlock; - int64_t startTs; // sort start time - SArray* sortSourceParams; - SLimitInfo limitInfo; + STableListInfo* tableListInfo; + int32_t tableStartIndex; + int32_t tableEndIndex; + bool hasGroupId; + uint64_t groupId; + SArray* dataReaders; // array of tsdbReaderT* + SArray* queryConds; // array of queryTableDataCond + STsdbReader* pReader; + SReadHandle readHandle; + int32_t bufPageSize; + uint32_t sortBufSize; // max buffer size for in-memory sort + SArray* pSortInfo; + SSortHandle* pSortHandle; + SSDataBlock* pSortInputBlock; + int64_t startTs; // sort start time + SArray* sortSourceParams; + SLimitInfo limitInfo; SFileBlockLoadRecorder readRecorder; int64_t numOfRows; SScanInfo scanInfo; @@ -371,7 +373,6 @@ typedef struct STableMergeScanInfo { SQueryTableDataCond cond; int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan int32_t dataBlockLoadFlag; - STsdbReader* pReader; // if the upstream is an interval operator, the interval info is also kept here to get the time // window to check if current data block needs to be loaded. diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index dbef032041..56158e567e 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1681,9 +1681,14 @@ int32_t addTableIntoTableList(STableListInfo* pTableList, uint64_t uid, uint64_t STableKeyInfo keyInfo = {.uid = uid, .groupId = gid}; taosArrayPush(pTableList->pTableList, &keyInfo); - if (pTableList->oneTableForEachGroup || pTableList->numOfOuputGroups > 1) { + if (!pTableList->oneTableForEachGroup) { + if (pTableList->map == NULL) { + pTableList->map = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + } + taosHashPut(pTableList->map, &uid, sizeof(uid), &keyInfo.groupId, sizeof(keyInfo.groupId)); } + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index fec9217bdb..32d2684a0a 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -624,11 +624,6 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { ASSERT(binfo.uid != 0); pBlock->info.groupId = getTableGroupId(&pTaskInfo->tableqinfoList, pBlock->info.uid); - ASSERT(pBlock->info.groupId != 0); -// uint64_t* groupId = taosHashGet(pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); -// if (groupId) { -// pBlock->info.groupId = *groupId; -// } uint32_t status = 0; int32_t code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status); @@ -1570,6 +1565,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock if (groupIdPre) { pInfo->pRes->info.groupId = *groupIdPre; } else { + ASSERT(0); pInfo->pRes->info.groupId = 0; } @@ -4438,6 +4434,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc typedef struct STableMergeScanSortSourceParam { SOperatorInfo* pOperator; int32_t readerIdx; + int64_t uid; SSDataBlock* inputBlock; } STableMergeScanSortSourceParam; @@ -4514,8 +4511,10 @@ static SSDataBlock* getTableDataBlockTemp(void* param) { } tsdbReaderClose(pInfo->pReader); pInfo->pReader = NULL; + return NULL; } + static SSDataBlock* getTableDataBlock2(void* param) { STableMergeScanSortSourceParam* source = param; SOperatorInfo* pOperator = source->pOperator; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 8e1b15f315..45e3c959ce 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -5384,6 +5384,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { maxTs = TMAX(maxTs, pBlock->info.window.ekey); minTs = TMIN(minTs, pBlock->info.window.skey); + qDebug("-------------------------groupId:%ld", pBlock->info.groupId); + doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.groupId, pUpdatedMap); } pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); -- GitLab