From 1c99eb1d4d56086622c32abefdeb809660aeb471 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 31 Aug 2022 15:27:41 +0800 Subject: [PATCH] fix set schema and tbname --- source/dnode/vnode/src/tq/tqExec.c | 4 ++-- source/libs/executor/src/executor.c | 4 ++++ source/libs/executor/src/scanoperator.c | 11 ++++++----- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index dd98fe3194..da596d07f9 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -85,11 +85,11 @@ int64_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs while (1) { SSDataBlock* pDataBlock = NULL; uint64_t ts = 0; - tqDebug("tmqsnap task start to execute"); + tqDebug("tmq task start to execute"); if (qExecTask(task, &pDataBlock, &ts) < 0) { ASSERT(0); } - tqDebug("tmqsnap task execute end, get %p", pDataBlock); + tqDebug("tmq task execute end, get %p", pDataBlock); if (pDataBlock) { tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 7e631ab3e9..f1ac9ef8b1 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -856,6 +856,10 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pTaskInfo->tableqinfoList.pTableList, &pInfo->dataReader, NULL); + strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName); + tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema); + pTaskInfo->streamInfo.schema = mtInfo.schema; + qDebug("tmqsnap qStreamPrepareScan snapshot data uid %ld ts %ld", mtInfo.uid, pOffset->ts); } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) { SStreamRawScanInfo* pInfo = pOperator->info; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index a6b5d68a84..de6768b83a 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1280,18 +1280,18 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamScanInfo* pInfo = pOperator->info; - qDebug("stream scan called"); + qDebug("queue scan called"); if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) { SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); if (pResult && pResult->info.rows > 0) { - qDebug("stream scan tsdb return %d rows", pResult->info.rows); + qDebug("queue scan tsdb return %d rows", pResult->info.rows); return pResult; } else { STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; tsdbReaderClose(pTSInfo->dataReader); pTSInfo->dataReader = NULL; tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer); - qDebug("stream scan tsdb over, switch to wal ver %d", pTaskInfo->streamInfo.snapshotVer + 1); + qDebug("queue scan tsdb over, switch to wal ver %d", pTaskInfo->streamInfo.snapshotVer + 1); if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1) < 0) { return NULL; } @@ -1310,7 +1310,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { } // TODO clean data block if (pInfo->pRes->info.rows > 0) { - qDebug("stream scan log return %d rows", pInfo->pRes->info.rows); + qDebug("queue scan log return %d rows", pInfo->pRes->info.rows); return pInfo->pRes; } } else if (ret.fetchType == FETCH_TYPE__META) { @@ -1324,7 +1324,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { ASSERT(pTaskInfo->streamInfo.lastStatus.version + 1 == pInfo->tqReader->pWalReader->curVersion); char formatBuf[80]; tFormatOffset(formatBuf, 80, &ret.offset); - qDebug("stream scan log return null, offset %s", formatBuf); + qDebug("queue scan log return null, offset %s", formatBuf); return NULL; } else { ASSERT(0); @@ -1349,6 +1349,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamScanInfo* pInfo = pOperator->info; + qDebug("stream scan called"); #if 0 SStreamState* pState = pTaskInfo->streamInfo.pState; if (pState) { -- GitLab