diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index e9225f3d6edf1bea49ae7aee163eb4bca536ae9a..24e2b13f46f4c1217d20b92ce52ed7205c80f338 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -87,9 +87,9 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { } int32_t numOfChildEp = taosArrayGetSize(pTask->childEpInfo); - SReadHandle mgHandle = { .vnode = NULL, .numOfVgroups = numOfChildEp, .pStateBackend = pTask->pState }; + SReadHandle handle = { .vnode = NULL, .numOfVgroups = numOfChildEp, .pStateBackend = pTask->pState }; - pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle, 0); + pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, 0); ASSERT(pTask->exec.pExecutor); streamSetupTrigger(pTask); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 362b8204fccf6dda706bf56b2d4136cf9e5844b3..d98fbab1929bb0e1af43deb04933e479889f9a9b 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -804,9 +804,10 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { } int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo); - SReadHandle mgHandle = {.vnode = NULL, .numOfVgroups = numOfVgroups, .pStateBackend = pTask->pState}; + SReadHandle handle = {.vnode = NULL, .numOfVgroups = numOfVgroups, .pStateBackend = pTask->pState}; + initStorageAPI(&handle.api); - pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle, vgId); + pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId); if (pTask->exec.pExecutor == NULL) { return -1; } diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 0b89dbc80bf602e725b0fbe9204cf5d34c238a13..b448e8f2b0095cd5dfc02347960d01caf78bd07e 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -443,7 +443,6 @@ void* destroyStreamFillSupporter(SStreamFillSupporter* pFillSup) { pFillSup->pAllColInfo = destroyFillColumnInfo(pFillSup->pAllColInfo, pFillSup->numOfFillCols, pFillSup->numOfAllCols); tSimpleHashCleanup(pFillSup->pResMap); pFillSup->pResMap = NULL; - ASSERT(0); releaseOutputBuf(NULL, NULL, (SResultRow*)pFillSup->cur.pRowVal, &pFillSup->pAPI->stateStore); //????? pFillSup->cur.pRowVal = NULL; cleanupExprSupp(&pFillSup->notFillExprSup); @@ -493,7 +492,6 @@ static void resetFillWindow(SResultRowData* pRowData) { void resetPrevAndNextWindow(SStreamFillSupporter* pFillSup, void* pState, SStorageAPI* pAPI) { resetFillWindow(&pFillSup->prev); - ASSERT(0); releaseOutputBuf(NULL, NULL, (SResultRow*)pFillSup->cur.pRowVal, &pAPI->stateStore); //??? resetFillWindow(&pFillSup->cur); resetFillWindow(&pFillSup->next); @@ -1335,7 +1333,7 @@ static int32_t initResultBuf(SStreamFillSupporter* pFillSup) { } static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNode, SInterval* pInterval, - SExprInfo* pFillExprInfo, int32_t numOfFillCols) { + SExprInfo* pFillExprInfo, int32_t numOfFillCols, SStorageAPI* pAPI) { SStreamFillSupporter* pFillSup = taosMemoryCalloc(1, sizeof(SStreamFillSupporter)); if (!pFillSup) { return NULL; @@ -1348,6 +1346,7 @@ static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNod pFillSup->type = convertFillType(pPhyFillNode->mode); pFillSup->numOfAllCols = pFillSup->numOfFillCols + numOfNotFillCols; pFillSup->interval = *pInterval; + pFillSup->pAPI = pAPI; int32_t code = initResultBuf(pFillSup); if (code != TSDB_CODE_SUCCESS) { @@ -1427,7 +1426,7 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi SInterval* pInterval = &((SStreamIntervalOperatorInfo*)downstream->info)->interval; int32_t numOfFillCols = 0; SExprInfo* pFillExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &numOfFillCols); - pInfo->pFillSup = initStreamFillSup(pPhyFillNode, pInterval, pFillExprInfo, numOfFillCols); + pInfo->pFillSup = initStreamFillSup(pPhyFillNode, pInterval, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI); if (!pInfo->pFillSup) { goto _error; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 4bd8faf8e935a1354ceaacc408bd9c4226aa37a4..39d619172b0602d4b315520ac574772708b6cb29 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1070,9 +1070,9 @@ static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t grou pInfo->groupId = groupCol[rowIndex]; } -void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin, uint64_t version) { +void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin, uint64_t ver) { pTableScanInfo->base.cond.twindows = *pWin; - pTableScanInfo->base.cond.endVersion = version; + pTableScanInfo->base.cond.endVersion = ver; pTableScanInfo->scanTimes = 0; pTableScanInfo->currentGroupId = -1; pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader); @@ -1182,17 +1182,19 @@ static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_ win.ekey = TMAX(win.ekey, endData[*pRowIndex]); continue; } + if (win.skey == endData[*pRowIndex] && groupId == gpData[*pRowIndex]) { win.skey = TMIN(win.skey, startData[*pRowIndex]); continue; } + ASSERT(!(win.skey > startData[*pRowIndex] && win.ekey < endData[*pRowIndex]) || !(isInTimeWindow(&win, startData[*pRowIndex], 0) || isInTimeWindow(&win, endData[*pRowIndex], 0))); break; } - ASSERT(0); -// resetTableScanInfo(pInfo->pTableScanOp->info, &win, pInfo->pUpdateInfo->maxDataVersion); + STableScanInfo* pTScanInfo = pInfo->pTableScanOp->info; + resetTableScanInfo(pInfo->pTableScanOp->info, &win, pInfo->pUpdateInfo->maxDataVersion); pInfo->pTableScanOp->status = OP_OPENED; return true; } @@ -1650,13 +1652,13 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) { SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); if (pResult && pResult->info.rows > 0) { - ASSERT(0); // qDebug("queue scan tsdb return %" PRId64 " rows min:%" PRId64 " max:%" PRId64 " wal curVersion:%" PRId64, // pResult->info.rows, pResult->info.window.skey, pResult->info.window.ekey, // pInfo->tqReader->pWalReader->curVersion); tqOffsetResetToData(&pTaskInfo->streamInfo.currentOffset, pResult->info.id.uid, pResult->info.window.ekey); return pResult; } + STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; pAPI->tsdReader.tsdReaderClose(pTSInfo->base.dataReader); @@ -1673,7 +1675,6 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { while (1) { bool hasResult = pAPI->tqReaderFn.tqReaderNextBlockInWal(pInfo->tqReader, id); - ASSERT(0); SSDataBlock* pRes = NULL; struct SWalReader* pWalReader = pAPI->tqReaderFn.tqReaderGetWalReader(pInfo->tqReader); @@ -1757,8 +1758,7 @@ static void setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) { static void doCheckUpdate(SStreamScanInfo* pInfo, TSKEY endKey, SSDataBlock* pBlock) { if (!pInfo->igCheckUpdate && pInfo->pUpdateInfo) { - ASSERT(0); -// pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pBlock->info.version); + pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pBlock->info.version); checkUpdateData(pInfo, true, pBlock, true); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, endKey); if (pInfo->pUpdateDataRes->info.rows > 0) { @@ -1934,8 +1934,7 @@ FETCH_NEXT_BLOCK: pBlock->info.calWin.ekey = INT64_MAX; pBlock->info.dataLoad = 1; if (pInfo->pUpdateInfo) { - ASSERT(0); -// pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pBlock->info.version); + pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pBlock->info.version); } blockDataUpdateTsWindow(pBlock, 0); switch (pBlock->info.type) { @@ -3097,9 +3096,9 @@ _error: static SSDataBlock* doTableCountScan(SOperatorInfo* pOperator); static void destoryTableCountScanOperator(void* param); static void buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp, - SSDataBlock* pRes, char* dbName, tb_uid_t stbUid); + SSDataBlock* pRes, char* dbName, tb_uid_t stbUid, SStorageAPI* pAPI); static void buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp, - SSDataBlock* pRes, char* dbName); + SSDataBlock* pRes, char* dbName, SStorageAPI* pAPI); static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp, SSDataBlock* pRes, char* dbName); static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountScanOperatorInfo* pInfo, @@ -3385,11 +3384,11 @@ static void buildVnodeGroupedTableCount(SOperatorInfo* pOperator, STableCountSca } if (pInfo->currGrpIdx < taosArrayGetSize(pInfo->stbUidList)) { tb_uid_t stbUid = *(tb_uid_t*)taosArrayGet(pInfo->stbUidList, pInfo->currGrpIdx); - buildVnodeGroupedStbTableCount(pInfo, pSupp, pRes, dbName, stbUid); + buildVnodeGroupedStbTableCount(pInfo, pSupp, pRes, dbName, stbUid, pAPI); pInfo->currGrpIdx++; } else if (pInfo->currGrpIdx == taosArrayGetSize(pInfo->stbUidList)) { - buildVnodeGroupedNtbTableCount(pInfo, pSupp, pRes, dbName); + buildVnodeGroupedNtbTableCount(pInfo, pSupp, pRes, dbName, pAPI); pInfo->currGrpIdx++; } else { @@ -3434,7 +3433,7 @@ static void buildVnodeFilteredTbCount(SOperatorInfo* pOperator, STableCountScanO } static void buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp, - SSDataBlock* pRes, char* dbName) { + SSDataBlock* pRes, char* dbName, SStorageAPI* pAPI) { char fullStbName[TSDB_TABLE_FNAME_LEN] = {0}; if (pSupp->groupByDbName) { snprintf(fullStbName, TSDB_TABLE_FNAME_LEN, "%s.%s", dbName, ""); @@ -3442,18 +3441,18 @@ static void buildVnodeGroupedNtbTableCount(STableCountScanOperatorInfo* pInfo, S uint64_t groupId = calcGroupId(fullStbName, strlen(fullStbName)); pRes->info.id.groupId = groupId; - int64_t ntbNum = 0;//metaGetNtbNum(pInfo->readHandle.vnode); - ASSERT(0); - if (ntbNum != 0) { - fillTableCountScanDataBlock(pSupp, dbName, "", ntbNum, pRes); + + int64_t numOfTables = 0;//metaGetNtbNum(pInfo->readHandle.vnode); + pAPI->metaFn.getBasicInfo(pInfo->readHandle.vnode, NULL, NULL, NULL, &numOfTables); + if (numOfTables != 0) { + fillTableCountScanDataBlock(pSupp, dbName, "", numOfTables, pRes); } } static void buildVnodeGroupedStbTableCount(STableCountScanOperatorInfo* pInfo, STableCountScanSupp* pSupp, - SSDataBlock* pRes, char* dbName, tb_uid_t stbUid) { + SSDataBlock* pRes, char* dbName, tb_uid_t stbUid, SStorageAPI* pAPI) { char stbName[TSDB_TABLE_NAME_LEN] = {0}; - ASSERT(0); -// metaGetTableSzNameByUid(pInfo->readHandle.vnode, stbUid, stbName); + pAPI->metaFn.getTableNameByUid(pInfo->readHandle.vnode, stbUid, stbName); char fullStbName[TSDB_TABLE_FNAME_LEN] = {0}; if (pSupp->groupByDbName) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index cf58892920c772cb959fb66e8516107b31ef6b19..290bc43cbe540dbc222b99658732e4ac0c1b3da8 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2755,7 +2755,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); initBasicInfo(&pInfo->binfo, pResBlock); - pInfo->pState = taosMemoryCalloc(1, sizeof(void)); + pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState)); *(pInfo->pState) = *(pTaskInfo->streamInfo.pState); pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1); int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, @@ -2917,8 +2917,10 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SqlFunctionCtx* pCtx, return TSDB_CODE_OUT_OF_MEMORY; } + pSup->stateStore = *pStore; + initDummyFunction(pSup->pDummyCtx, pCtx, numOfOutput); - pSup->pState = taosMemoryCalloc(1, sizeof(void)); + pSup->pState = taosMemoryCalloc(1, sizeof(SStreamState)); *(pSup->pState) = *pState; pSup->stateStore.streamStateSetNumber(pSup->pState, -1); @@ -2940,7 +2942,6 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SqlFunctionCtx* pCtx, return terrno; } - pSup->stateStore = *pStore; int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, "function", tsTempDir); for (int32_t i = 0; i < numOfOutput; ++i) { pCtx[i].saveHandle.pBuf = pSup->pResultBuf;