diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index 9cdc3171f5c68f967400cef45a0854396f55bf55..09cbed73920c8142b8dbb16d35aa729f8ac9f381 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -85,12 +85,15 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, S tqAddBlockDataToRsp(pDataBlock, pRsp); if (pRsp->withTbName) { + pRsp->withTbName = 0; +#if 1 int64_t uid; int64_t ts; if (qGetStreamScanStatus(task, &uid, &ts) < 0) { ASSERT(0); } tqAddTbNameToRsp(pTq, uid, pRsp, workerId); +#endif } pRsp->blockNum++; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index c057485f51c8d74c38322cb116ae66e5e25149f2..10bd0520ad407146a251466e8f8b9ed49735203a 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -392,6 +392,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { binfo.capacity = binfo.rows; blockDataEnsureCapacity(pBlock, binfo.rows); pBlock->info = binfo; + ASSERT(binfo.uid != 0); uint32_t status = 0; int32_t code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status); @@ -419,6 +420,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { pTableScanInfo->lastStatus.uid = pBlock->info.uid; pTableScanInfo->lastStatus.ts = pBlock->info.window.ekey; + ASSERT(pBlock->info.uid != 0); return pBlock; } return NULL; @@ -438,6 +440,7 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) { while (pTableScanInfo->curTWinIdx < pTableScanInfo->cond.numOfTWindows) { SSDataBlock* p = doTableScanImpl(pOperator); if (p != NULL) { + ASSERT(p->info.uid != 0); return p; } pTableScanInfo->curTWinIdx += 1; @@ -517,6 +520,35 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) { // check status if (pInfo->lastStatus.uid == pInfo->expStatus.uid && pInfo->lastStatus.ts == pInfo->expStatus.ts) { + while (1) { + SSDataBlock* result = doTableScanGroup(pOperator); + if (result) { + return result; + } + // if no data, switch to next table and continue scan + pInfo->currentTable++; + if (pInfo->currentTable >= taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList)) { + return NULL; + } + STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, pInfo->currentTable); + /*pTableInfo->uid */ + tsdbSetTableId(pInfo->dataReader, pTableInfo->uid); + tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0); + pInfo->scanTimes = 0; + pInfo->curTWinIdx = 0; + } + } + // reset to exp table and window start from ts + tsdbSetTableId(pInfo->dataReader, pInfo->expStatus.uid); + SQueryTableDataCond tmpCond = pInfo->cond; + tmpCond.twindows[0] = (STimeWindow){ + .skey = pInfo->expStatus.ts, + .ekey = INT64_MAX, + }; + tsdbResetReadHandle(pInfo->dataReader, &tmpCond, 0); + pInfo->scanTimes = 0; + pInfo->curTWinIdx = 0; + while (1) { SSDataBlock* result = doTableScanGroup(pOperator); if (result) { return result; @@ -532,23 +564,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0); pInfo->scanTimes = 0; pInfo->curTWinIdx = 0; - pInfo->lastStatus.ts = pInfo->expStatus.ts; - pInfo->lastStatus.uid = pInfo->expStatus.uid; - return doTableScan(pOperator); } - // reset to exp table and window start from ts - tsdbSetTableId(pInfo->dataReader, pInfo->expStatus.uid); - SQueryTableDataCond tmpCond = pInfo->cond; - tmpCond.twindows[0] = (STimeWindow){ - .skey = pInfo->expStatus.ts, - .ekey = INT64_MAX, - }; - tsdbResetReadHandle(pInfo->dataReader, &tmpCond, 0); - pInfo->scanTimes = 0; - pInfo->curTWinIdx = 0; - pInfo->lastStatus.ts = pInfo->expStatus.ts; - pInfo->lastStatus.uid = pInfo->expStatus.uid; - return doTableScan(pOperator); } if (pInfo->currentGroupId == -1) {