diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b556733254ae96ae4c7aac2005bd5116d53d6846..94c5b2e0b7ac1542542ee769a831d0f6124ba2a7 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1573,7 +1573,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { qError("submit msg messed up when initing stream submit block %p", pSubmit); pInfo->tqReader->pMsg = NULL; pTaskInfo->streamInfo.pReq = NULL; - ASSERT(0); + return NULL; } } @@ -1628,11 +1628,14 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) { while (1) { SFetchRet ret = {0}; - tqNextBlock(pInfo->tqReader, &ret); + if (tqNextBlock(pInfo->tqReader, &ret) < 0) { + qError("failed to get next log block since %s", terrstr()); + return NULL; + } if (ret.fetchType == FETCH_TYPE__DATA) { blockDataCleanup(pInfo->pRes); if (setBlockIntoRes(pInfo, &ret.data, true) < 0) { - ASSERT(0); + return NULL; } if (pInfo->pRes->info.rows > 0) { pOperator->status = OP_EXEC_RECV; @@ -1640,7 +1643,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { return pInfo->pRes; } } else if (ret.fetchType == FETCH_TYPE__META) { - ASSERT(0); + qError("unexpected ret.fetchType:%d", ret.fetchType); + return NULL; // pTaskInfo->streamInfo.lastStatus = ret.offset; // pTaskInfo->streamInfo.metaBlk = ret.meta; // return NULL; @@ -1667,7 +1671,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { return NULL; #endif } else { - ASSERT(0); + qError("unexpected streamInfo prepare type: %d", pTaskInfo->streamInfo.prepareStatus.type); return NULL; } }