From 2a08e2a5492ae2b9a0dec42bbd9c2b49cdc24356 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Thu, 16 Feb 2023 15:02:53 +0800 Subject: [PATCH] enh: refactor some asserts in doQueueScan --- source/libs/executor/src/scanoperator.c | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b556733254..94c5b2e0b7 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1573,7 +1573,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { qError("submit msg messed up when initing stream submit block %p", pSubmit); pInfo->tqReader->pMsg = NULL; pTaskInfo->streamInfo.pReq = NULL; - ASSERT(0); + return NULL; } } @@ -1628,11 +1628,14 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) { while (1) { SFetchRet ret = {0}; - tqNextBlock(pInfo->tqReader, &ret); + if (tqNextBlock(pInfo->tqReader, &ret) < 0) { + qError("failed to get next log block since %s", terrstr()); + return NULL; + } if (ret.fetchType == FETCH_TYPE__DATA) { blockDataCleanup(pInfo->pRes); if (setBlockIntoRes(pInfo, &ret.data, true) < 0) { - ASSERT(0); + return NULL; } if (pInfo->pRes->info.rows > 0) { pOperator->status = OP_EXEC_RECV; @@ -1640,7 +1643,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { return pInfo->pRes; } } else if (ret.fetchType == FETCH_TYPE__META) { - ASSERT(0); + qError("unexpected ret.fetchType:%d", ret.fetchType); + return NULL; // pTaskInfo->streamInfo.lastStatus = ret.offset; // pTaskInfo->streamInfo.metaBlk = ret.meta; // return NULL; @@ -1667,7 +1671,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { return NULL; #endif } else { - ASSERT(0); + qError("unexpected streamInfo prepare type: %d", pTaskInfo->streamInfo.prepareStatus.type); return NULL; } } -- GitLab