diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b556733254ae96ae4c7aac2005bd5116d53d6846..94c5b2e0b7ac1542542ee769a831d0f6124ba2a7 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1573,7 +1573,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { qError("submit msg messed up when initing stream submit block %p", pSubmit); pInfo->tqReader->pMsg = NULL; pTaskInfo->streamInfo.pReq = NULL; - ASSERT(0); + return NULL; } } @@ -1628,11 +1628,14 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) { while (1) { SFetchRet ret = {0}; - tqNextBlock(pInfo->tqReader, &ret); + if (tqNextBlock(pInfo->tqReader, &ret) < 0) { + qError("failed to get next log block since %s", terrstr()); + return NULL; + } if (ret.fetchType == FETCH_TYPE__DATA) { blockDataCleanup(pInfo->pRes); if (setBlockIntoRes(pInfo, &ret.data, true) < 0) { - ASSERT(0); + return NULL; } if (pInfo->pRes->info.rows > 0) { pOperator->status = OP_EXEC_RECV; @@ -1640,7 +1643,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { return pInfo->pRes; } } else if (ret.fetchType == FETCH_TYPE__META) { - ASSERT(0); + qError("unexpected ret.fetchType:%d", ret.fetchType); + return NULL; // pTaskInfo->streamInfo.lastStatus = ret.offset; // pTaskInfo->streamInfo.metaBlk = ret.meta; // return NULL; @@ -1667,7 +1671,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { return NULL; #endif } else { - ASSERT(0); + qError("unexpected streamInfo prepare type: %d", pTaskInfo->streamInfo.prepareStatus.type); return NULL; } } diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 0f8f140bd8a2a24e82f997b0029b8b946b1f2f1b..5e09af5b2e9e8e5c55eb8e3f4414ecd48582f6b9 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -241,7 +241,6 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) { if (pRead->curInvalid || pRead->curVersion != fetchVer) { if (walReadSeekVer(pRead, fetchVer) < 0) { - ASSERT(0); pRead->curVersion = fetchVer; pRead->curInvalid = 1; return -1; @@ -262,7 +261,6 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) { } else { terrno = TSDB_CODE_WAL_FILE_CORRUPTED; } - ASSERT(0); pRead->curInvalid = 1; return -1; } @@ -299,7 +297,6 @@ static int32_t walFetchBodyNew(SWalReader *pRead) { terrno = TSDB_CODE_WAL_FILE_CORRUPTED; } pRead->curInvalid = 1; - ASSERT(0); return -1; } @@ -308,7 +305,6 @@ static int32_t walFetchBodyNew(SWalReader *pRead) { pRead->pHead->head.version, ver); pRead->curInvalid = 1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; - ASSERT(0); return -1; } @@ -316,7 +312,6 @@ static int32_t walFetchBodyNew(SWalReader *pRead) { wError("vgId:%d, wal fetch body error:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver); pRead->curInvalid = 1; terrno = TSDB_CODE_WAL_FILE_CORRUPTED; - ASSERT(0); return -1; } @@ -335,7 +330,6 @@ static int32_t walSkipFetchBodyNew(SWalReader *pRead) { if (code < 0) { terrno = TAOS_SYSTEM_ERROR(errno); pRead->curInvalid = 1; - ASSERT(0); return -1; } @@ -384,7 +378,6 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { } else { terrno = TSDB_CODE_WAL_FILE_CORRUPTED; } - ASSERT(0); pRead->curInvalid = 1; return -1; } @@ -447,7 +440,6 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) { if (pReadHead->bodyLen != taosReadFile(pRead->pLogFile, pReadHead->body, pReadHead->bodyLen)) { if (pReadHead->bodyLen < 0) { - ASSERT(0); terrno = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s", pRead->pWal->cfg.vgId, pReadHead->version, ver, tstrerror(terrno)); @@ -457,12 +449,10 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) { terrno = TSDB_CODE_WAL_FILE_CORRUPTED; } pRead->curInvalid = 1; - ASSERT(0); return -1; } if (pReadHead->version != ver) { - ASSERT(0); wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId, pReadHead->version, ver); pRead->curInvalid = 1; @@ -471,7 +461,6 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) { } if (walValidBodyCksum(*ppHead) != 0) { - ASSERT(0); wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver); pRead->curInvalid = 1;