diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index f23b5f8526ab30b77770a12a1525e2c53c177691..18de4b75d0e57b1252c7e7ac23bbddea5c5bbd62 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -68,14 +68,14 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs int32_t vgId = TD_VID(pTq->pVnode); if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) { - tqDebug("prepare scan failed, return, consumer:0x%"PRIx64, pHandle->consumerId); + tqDebug("prepare scan failed, vgId:%d, consumer:0x%"PRIx64, vgId, pHandle->consumerId); if (pOffset->type == TMQ_OFFSET__LOG) { pRsp->rspOffset = *pOffset; return 0; } else { tqOffsetResetToLog(pOffset, pHandle->snapshotVer); if (qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType) < 0) { - tqDebug("prepare scan failed, return, consumer:0x%"PRIx64, pHandle->consumerId); + tqDebug("prepare scan failed, vgId:%d, consumer:0x%"PRIx64, vgId, pHandle->consumerId); pRsp->rspOffset = *pOffset; return 0; } diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index f99c7de93d95fa99de90a13eb2af8d527b876e0a..9b8f034e448643ddfcfd7c9eaa1a46f181078718 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -107,6 +107,7 @@ int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t uint64_t tableListGetSize(const STableListInfo* pTableList); uint64_t tableListGetSuid(const STableListInfo* pTableList); STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index); +int32_t tableListFind(const STableListInfo* pTableList, uint64_t uid, int32_t startIndex); size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput); void initResultRowInfo(SResultRowInfo* pResultRowInfo); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 3f519568c4b11a3a63d9311c7ada01541027c4a8..c269367eb266bf3365b00d111a6bf1b327a10206 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -143,10 +143,7 @@ typedef struct { SQueryTableDataCond tableCond; int64_t fillHistoryVer1; int64_t fillHistoryVer2; - - // int8_t triggerSaved; - // int64_t deleteMarkSaved; - SStreamState* pState; + SStreamState* pState; } SStreamTaskInfo; typedef struct { @@ -168,15 +165,14 @@ typedef struct STaskStopInfo { } STaskStopInfo; struct SExecTaskInfo { - STaskIdInfo id; - uint32_t status; - STimeWindow window; - STaskCostInfo cost; - int64_t owner; // if it is in execution - int32_t code; - int32_t qbufQuota; // total available buffer (in KB) during execution query - - int64_t version; // used for stream to record wal version, why not move to sschemainfo + STaskIdInfo id; + uint32_t status; + STimeWindow window; + STaskCostInfo cost; + int64_t owner; // if it is in execution + int32_t code; + int32_t qbufQuota; // total available buffer (in KB) during execution query + int64_t version; // used for stream to record wal version, why not move to sschemainfo SStreamTaskInfo streamInfo; SSchemaInfo schemaInfo; STableListInfo* pTableInfoList; // this is a table list @@ -188,6 +184,7 @@ struct SExecTaskInfo { SLocalFetch localFetch; SArray* pResultBlockList; // result block list STaskStopInfo stopInfo; + SRWLatch lock; // secure the access of STableListInfo }; enum { diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 953d61495141e9b50d29313c030e7b74fc63a482..347a38247e31f2c9261d66714f20993321a3a609 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -34,7 +34,7 @@ struct STableListInfo { int32_t numOfOuputGroups; // the data block will be generated one by one int32_t* groupOffset; // keep the offset value for each group in the tableList SArray* pTableList; - SHashObj* map; // speedup acquire the tableQueryInfo by table uid + SHashObj* map; // speedup acquire the tableQueryInfo by table uid uint64_t suid; }; @@ -1800,6 +1800,21 @@ STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index) return taosArrayGet(pTableList->pTableList, index); } +int32_t tableListFind(const STableListInfo* pTableList, uint64_t uid, int32_t startIndex) { + int32_t numOfTables = taosArrayGetSize(pTableList->pTableList); + if (startIndex >= numOfTables) { + return -1; + } + + for (int32_t i = startIndex; i < numOfTables; ++i) { + STableKeyInfo* p = taosArrayGet(pTableList->pTableList, i); + if (p->uid == uid) { + return i; + } + } + return -1; +} + uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) { int32_t* slot = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid)); ASSERT(pTableList->map != NULL && slot != NULL); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 85f17c0d5389595156d0817816c9d636b979b041..5b71cdcac9c782281425a1053b5d8e60d904c7f3 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -410,6 +410,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo } STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList; + taosWLockLatch(&pTaskInfo->lock); for (int32_t i = 0; i < numOfQualifiedTables; ++i) { uint64_t* uid = taosArrayGet(qa, i); @@ -424,6 +425,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo if (code != TSDB_CODE_SUCCESS) { taosMemoryFree(keyBuf); taosArrayDestroy(qa); + taosWUnLockLatch(&pTaskInfo->lock); return code; } } @@ -445,6 +447,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo tableListAddTableInfo(pTableListInfo, keyInfo.uid, keyInfo.groupId); } + taosWUnLockLatch(&pTaskInfo->lock); if (keyBuf != NULL) { taosMemoryFree(keyBuf); } @@ -452,7 +455,9 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo taosArrayDestroy(qa); } else { // remove the table id in current list qDebug(" %d remove child tables from the stream scanner", (int32_t)taosArrayGetSize(tableIdList)); + taosWLockLatch(&pTaskInfo->lock); code = tqReaderRemoveTbUidList(pScanInfo->tqReader, tableIdList); + taosWUnLockLatch(&pTaskInfo->lock); } return code; @@ -1000,6 +1005,7 @@ int32_t qStreamRestoreParam(qTaskInfo_t tinfo) { } return 0; } + bool qStreamRecoverScanFinished(qTaskInfo_t tinfo) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; return pTaskInfo->streamInfo.recoverScanFinished; @@ -1080,8 +1086,11 @@ int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit) { } int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) { - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - SOperatorInfo* pOperator = pTaskInfo->pRoot; + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + SOperatorInfo* pOperator = pTaskInfo->pRoot; + STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList; + const char* id = GET_TASKID(pTaskInfo); + pTaskInfo->streamInfo.prepareStatus = *pOffset; pTaskInfo->streamInfo.returned = 0; @@ -1095,20 +1104,23 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT // TODO add more check if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if(pOperator->numOfDownstream != 1){ - qError("pOperator->numOfDownstream != 1:%d", pOperator->numOfDownstream); + qError("invalid operator, number of downstream:%d, %s", pOperator->numOfDownstream, id); return -1; } pOperator = pOperator->pDownstream[0]; } SStreamScanInfo* pInfo = pOperator->info; + STableScanInfo* pScanInfo = pInfo->pTableScanOp->info; + STableScanBase* pScanBaseInfo = &pScanInfo->base; + if (pOffset->type == TMQ_OFFSET__LOG) { - STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; - tsdbReaderClose(pTSInfo->base.dataReader); - pTSInfo->base.dataReader = NULL; + tsdbReaderClose(pScanBaseInfo->dataReader); + pScanBaseInfo->dataReader = NULL; + // let's seek to the next version in wal file if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, pTaskInfo->id.str) < 0) { - qError("tqSeekVer failed ver:%" PRId64, pOffset->version + 1); + qError("tqSeekVer failed ver:%, %s" PRId64, pOffset->version + 1, id); return -1; } } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { @@ -1117,120 +1129,120 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT int64_t uid = pOffset->uid; int64_t ts = pOffset->ts; + // this value may be changed if new tables are created + taosRLockLatch(&pTaskInfo->lock); + int32_t numOfTables = tableListGetSize(pTableListInfo); + if (uid == 0) { - if (tableListGetSize(pTaskInfo->pTableInfoList) != 0) { - STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, 0); + if (numOfTables != 0) { + STableKeyInfo* pTableInfo = tableListGetInfo(pTableListInfo, 0); uid = pTableInfo->uid; ts = INT64_MIN; } else { - qError("uid == 0 and tablelist size is 0"); + taosRUnLockLatch(&pTaskInfo->lock); + qError("no table in table list, %s", id); return -1; } } - /*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/ - /*pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {*/ - STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; - int32_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList); - - qDebug("switch to table uid:%" PRId64 " ts:%" PRId64 "% "PRId64 " rows returned", uid, ts, pInfo->pTableScanOp->resultInfo.totalRows); pInfo->pTableScanOp->resultInfo.totalRows = 0; - bool found = false; - for (int32_t i = 0; i < numOfTables; i++) { - STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, i); - if (pTableInfo->uid == uid) { - found = true; - pTableScanInfo->currentTable = i; - break; - } - } + // start from current accessed position + int32_t index = tableListFind(pTableListInfo, uid, pScanInfo->currentTable); + taosRUnLockLatch(&pTaskInfo->lock); - // TODO after dropping table, table may not found - if(!found){ - qError("uid not found in tablelist %" PRId64, uid); + if (index >= 0) { + pScanInfo->currentTable = index; + } else { + qError("uid:%" PRIu64 " not found in table list, total:%d %s", uid, numOfTables, id); return -1; } - if (pTableScanInfo->base.dataReader == NULL) { - STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0); - int32_t num = tableListGetSize(pTaskInfo->pTableInfoList); - - if (tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &pTableScanInfo->base.cond, pList, num, - pTableScanInfo->pResBlock, &pTableScanInfo->base.dataReader, NULL) < 0 || - pTableScanInfo->base.dataReader == NULL) { - qError("tsdbReaderOpen failed. uid:%" PRIi64, pOffset->uid); + STableKeyInfo keyInfo = {.uid = uid}; + if (pScanBaseInfo->dataReader == NULL) { + int32_t code = tsdbReaderOpen(pScanBaseInfo->readHandle.vnode, &pScanBaseInfo->cond, &keyInfo, 1, + pScanInfo->pResBlock, &pScanBaseInfo->dataReader, NULL); + if (code != TSDB_CODE_SUCCESS) { + qError("prepare read tsdb snapshot failed, uid:%" PRId64 ", code:%s %s", pOffset->uid, tstrerror(code), id); + terrno = code; return -1; } - } + } else { + tsdbSetTableList(pScanBaseInfo->dataReader, &keyInfo, 1); + int64_t oldSkey = pScanBaseInfo->cond.twindows.skey; + + // let's start from the next ts that returned to consumer. + pScanBaseInfo->cond.twindows.skey = ts + 1; + tsdbReaderReset(pScanBaseInfo->dataReader, &pScanBaseInfo->cond); - STableKeyInfo tki = {.uid = uid}; - tsdbSetTableList(pTableScanInfo->base.dataReader, &tki, 1); - int64_t oldSkey = pTableScanInfo->base.cond.twindows.skey; - pTableScanInfo->base.cond.twindows.skey = ts + 1; - tsdbReaderReset(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond); - pTableScanInfo->base.cond.twindows.skey = oldSkey; - pTableScanInfo->scanTimes = 0; + // restore the key value + pScanBaseInfo->cond.twindows.skey = oldSkey; + pScanInfo->scanTimes = 0; - qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid, - ts, pTableScanInfo->currentTable, numOfTables); + qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 " table index:%d numOfTable:%d, %s", + uid, ts, pScanInfo->currentTable, numOfTables, id); + } } else { - qError("invalid pOffset->type:%d", pOffset->type); - return -1; - } - } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { - SStreamRawScanInfo* pInfo = pOperator->info; - SSnapContext* sContext = pInfo->sContext; - if (setForSnapShot(sContext, pOffset->uid) != 0) { - qError("setDataForSnapShot error. uid:%" PRIi64, pOffset->uid); + qError("invalid pOffset->type:%d, %s", pOffset->type, id); return -1; } - SMetaTableInfo mtInfo = getUidfromSnapShot(sContext); - tsdbReaderClose(pInfo->dataReader); - pInfo->dataReader = NULL; + } else { // subType == TOPIC_SUB_TYPE__TABLE/DB + if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { + SStreamRawScanInfo* pInfo = pOperator->info; + SSnapContext* sContext = pInfo->sContext; + if (setForSnapShot(sContext, pOffset->uid) != 0) { + qError("setDataForSnapShot error. uid:%" PRIi64, pOffset->uid); + return -1; + } - cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond); - tableListClear(pTaskInfo->pTableInfoList); + SMetaTableInfo mtInfo = getUidfromSnapShot(sContext); + tsdbReaderClose(pInfo->dataReader); + pInfo->dataReader = NULL; - if (mtInfo.uid == 0) { - return 0; // no data - } + cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond); + tableListClear(pTableListInfo); - initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo); - pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts; + if (mtInfo.uid == 0) { + return 0; // no data + } - if (pTaskInfo->pTableInfoList == NULL) { - pTaskInfo->pTableInfoList = tableListCreate(); - } + initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo); + pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts; - tableListAddTableInfo(pTaskInfo->pTableInfoList, mtInfo.uid, 0); +// if (pTableListInfo == NULL) { +// pTableListInfo = tableListCreate(); +// } - STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0); - int32_t size = tableListGetSize(pTaskInfo->pTableInfoList); + tableListAddTableInfo(pTableListInfo, mtInfo.uid, 0); - tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, &pInfo->dataReader, NULL); + STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0); + int32_t size = tableListGetSize(pTableListInfo); - cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond); - strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName); - tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema); - pTaskInfo->streamInfo.schema = mtInfo.schema; + tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, &pInfo->dataReader, NULL); - qDebug("tmqsnap qStreamPrepareScan snapshot data uid:%" PRId64 " ts %" PRId64, mtInfo.uid, pOffset->ts); - } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) { - SStreamRawScanInfo* pInfo = pOperator->info; - SSnapContext* sContext = pInfo->sContext; - if (setForSnapShot(sContext, pOffset->uid) != 0) { - qError("setForSnapShot error. uid:%" PRIu64 " ,version:%" PRId64, pOffset->uid, pOffset->version); - return -1; + cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond); + strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName); + tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema); + pTaskInfo->streamInfo.schema = mtInfo.schema; + + qDebug("tmqsnap qStreamPrepareScan snapshot data uid:%" PRId64 " ts %" PRId64, mtInfo.uid, pOffset->ts); + } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) { + SStreamRawScanInfo* pInfo = pOperator->info; + SSnapContext* sContext = pInfo->sContext; + if (setForSnapShot(sContext, pOffset->uid) != 0) { + qError("setForSnapShot error. uid:%" PRIu64 " ,version:%" PRId64, pOffset->uid, pOffset->version); + return -1; + } + qDebug("tmqsnap qStreamPrepareScan snapshot meta uid:%" PRId64 " ts %" PRId64, pOffset->uid, pOffset->ts); + } else if (pOffset->type == TMQ_OFFSET__LOG) { + SStreamRawScanInfo* pInfo = pOperator->info; + tsdbReaderClose(pInfo->dataReader); + pInfo->dataReader = NULL; + qDebug("tmqsnap qStreamPrepareScan snapshot log"); } - qDebug("tmqsnap qStreamPrepareScan snapshot meta uid:%" PRId64 " ts %" PRId64, pOffset->uid, pOffset->ts); - } else if (pOffset->type == TMQ_OFFSET__LOG) { - SStreamRawScanInfo* pInfo = pOperator->info; - tsdbReaderClose(pInfo->dataReader); - pInfo->dataReader = NULL; - qDebug("tmqsnap qStreamPrepareScan snapshot log"); } + return 0; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 24a26d575a0c99a127afdda407b76b203054d13a..3a6ab4463ad974ad54a19278bf86d366ff7fdf38 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1989,6 +1989,7 @@ static SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, in pTaskInfo->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo)); pTaskInfo->pResultBlockList = taosArrayInit(128, POINTER_BYTES); + taosInitRWLatch(&pTaskInfo->lock); pTaskInfo->id.vgId = vgId; pTaskInfo->id.queryId = queryId; pTaskInfo->id.str = buildTaskId(taskId, queryId); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 5dff1abb9799f43eddd14847cbca26bc1936cc78..b2197017dfbf920d07201fed030a71540c359503 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -766,8 +766,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->currentTable); tsdbSetTableList(pInfo->base.dataReader, pTableInfo, 1); - qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d %s", pTableInfo->uid, numOfTables, - pInfo->currentTable, pTaskInfo->id.str); + qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d/%d %s", pTableInfo->uid, numOfTables, + pInfo->currentTable, numOfTables, pTaskInfo->id.str); tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond); pInfo->scanTimes = 0; @@ -1569,19 +1569,16 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamScanInfo* pInfo = pOperator->info; + const char* id = GET_TASKID(pTaskInfo); - qDebug("start to exec queue scan"); + qDebug("start to exec queue scan, %s", id); if (pTaskInfo->streamInfo.submit.msgStr != NULL) { - if (pInfo->tqReader->msg2.msgStr == NULL) { - /*pInfo->tqReader->pMsg = pTaskInfo->streamInfo.pReq;*/ - /*const SSubmitReq* pSubmit = pInfo->tqReader->pMsg;*/ - /*if (tqReaderSetDataMsg(pInfo->tqReader, pSubmit, 0) < 0) {*/ - /*void* msgStr = pTaskInfo->streamInfo.*/ + if (pInfo->tqReader->msg2.msgStr == NULL) { SPackedData submit = pTaskInfo->streamInfo.submit; if (tqReaderSetSubmitReq2(pInfo->tqReader, submit.msgStr, submit.msgLen, submit.ver) < 0) { - qError("submit msg messed up when initing stream submit block %p", submit.msgStr); + qError("submit msg messed up when initing stream submit block %p, %s", submit.msgStr, id); pInfo->tqReader->msg2 = (SPackedData){0}; pInfo->tqReader->setMsg = 0; ASSERT(0); @@ -1615,18 +1612,20 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) { SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); if (pResult && pResult->info.rows > 0) { - qDebug("queue scan tsdb return %d rows min:%" PRId64 " max:%" PRId64 " wal curVersion:%" PRId64, pResult->info.rows, - pResult->info.window.skey, pResult->info.window.ekey, pInfo->tqReader->pWalReader->curVersion); + qDebug("queue scan tsdb return %d rows min:%" PRId64 " max:%" PRId64 " wal curVersion:%" PRId64" %s", pResult->info.rows, + pResult->info.window.skey, pResult->info.window.ekey, pInfo->tqReader->pWalReader->curVersion, id); pTaskInfo->streamInfo.returned = 1; return pResult; } else { + // no data has return already, try to extract data in the WAL if (!pTaskInfo->streamInfo.returned) { STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; tsdbReaderClose(pTSInfo->base.dataReader); - qDebug("3"); + pTSInfo->base.dataReader = NULL; tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer); - qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", pTaskInfo->streamInfo.snapshotVer + 1); + + qDebug("queue scan tsdb over, switch to wal ver:%" PRId64 " %s", pTaskInfo->streamInfo.snapshotVer + 1, id); if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1, pTaskInfo->id.str) < 0) { tqOffsetResetToLog(&pTaskInfo->streamInfo.lastStatus, pTaskInfo->streamInfo.snapshotVer); return NULL; @@ -1643,7 +1642,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { if (tqNextBlock(pInfo->tqReader, &ret) < 0) { // if the end is reached, terrno is 0 if (terrno != 0) { - qError("failed to get next log block since %s", terrstr()); + qError("failed to get next log block since %s, %s", terrstr(), id); } } @@ -1658,9 +1657,6 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { } else if (ret.fetchType == FETCH_TYPE__META) { qError("unexpected ret.fetchType:%d", ret.fetchType); continue; - // pTaskInfo->streamInfo.lastStatus = ret.offset; - // pTaskInfo->streamInfo.metaBlk = ret.meta; - // return NULL; } else if (ret.fetchType == FETCH_TYPE__NONE || (ret.fetchType == FETCH_TYPE__SEP && pOperator->status == OP_EXEC_RECV)) { pTaskInfo->streamInfo.lastStatus = ret.offset; @@ -1672,7 +1668,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { } } } else { - qError("unexpected streamInfo prepare type: %d", pTaskInfo->streamInfo.prepareStatus.type); + qError("unexpected streamInfo prepare type: %d %s", pTaskInfo->streamInfo.prepareStatus.type, id); return NULL; } }