未验证 提交 50f394de 编写于 作者: X Xiaoyu Wang 提交者: GitHub

Merge pull request #20012 from taosdata/FIX/TS-2656-main

enh: refactor some asserts in walRead.c and doQueueScan.
...@@ -1573,7 +1573,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { ...@@ -1573,7 +1573,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
qError("submit msg messed up when initing stream submit block %p", pSubmit); qError("submit msg messed up when initing stream submit block %p", pSubmit);
pInfo->tqReader->pMsg = NULL; pInfo->tqReader->pMsg = NULL;
pTaskInfo->streamInfo.pReq = NULL; pTaskInfo->streamInfo.pReq = NULL;
ASSERT(0); return NULL;
} }
} }
...@@ -1628,11 +1628,14 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { ...@@ -1628,11 +1628,14 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) { if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) {
while (1) { while (1) {
SFetchRet ret = {0}; SFetchRet ret = {0};
tqNextBlock(pInfo->tqReader, &ret); if (tqNextBlock(pInfo->tqReader, &ret) < 0) {
qError("failed to get next log block since %s", terrstr());
return NULL;
}
if (ret.fetchType == FETCH_TYPE__DATA) { if (ret.fetchType == FETCH_TYPE__DATA) {
blockDataCleanup(pInfo->pRes); blockDataCleanup(pInfo->pRes);
if (setBlockIntoRes(pInfo, &ret.data, true) < 0) { if (setBlockIntoRes(pInfo, &ret.data, true) < 0) {
ASSERT(0); return NULL;
} }
if (pInfo->pRes->info.rows > 0) { if (pInfo->pRes->info.rows > 0) {
pOperator->status = OP_EXEC_RECV; pOperator->status = OP_EXEC_RECV;
...@@ -1640,7 +1643,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { ...@@ -1640,7 +1643,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
return pInfo->pRes; return pInfo->pRes;
} }
} else if (ret.fetchType == FETCH_TYPE__META) { } else if (ret.fetchType == FETCH_TYPE__META) {
ASSERT(0); qError("unexpected ret.fetchType:%d", ret.fetchType);
return NULL;
// pTaskInfo->streamInfo.lastStatus = ret.offset; // pTaskInfo->streamInfo.lastStatus = ret.offset;
// pTaskInfo->streamInfo.metaBlk = ret.meta; // pTaskInfo->streamInfo.metaBlk = ret.meta;
// return NULL; // return NULL;
...@@ -1667,7 +1671,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { ...@@ -1667,7 +1671,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
return NULL; return NULL;
#endif #endif
} else { } else {
ASSERT(0); qError("unexpected streamInfo prepare type: %d", pTaskInfo->streamInfo.prepareStatus.type);
return NULL; return NULL;
} }
} }
......
...@@ -241,7 +241,6 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) { ...@@ -241,7 +241,6 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
if (pRead->curInvalid || pRead->curVersion != fetchVer) { if (pRead->curInvalid || pRead->curVersion != fetchVer) {
if (walReadSeekVer(pRead, fetchVer) < 0) { if (walReadSeekVer(pRead, fetchVer) < 0) {
ASSERT(0);
pRead->curVersion = fetchVer; pRead->curVersion = fetchVer;
pRead->curInvalid = 1; pRead->curInvalid = 1;
return -1; return -1;
...@@ -262,7 +261,6 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) { ...@@ -262,7 +261,6 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
} else { } else {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
} }
ASSERT(0);
pRead->curInvalid = 1; pRead->curInvalid = 1;
return -1; return -1;
} }
...@@ -299,7 +297,6 @@ static int32_t walFetchBodyNew(SWalReader *pRead) { ...@@ -299,7 +297,6 @@ static int32_t walFetchBodyNew(SWalReader *pRead) {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
} }
pRead->curInvalid = 1; pRead->curInvalid = 1;
ASSERT(0);
return -1; return -1;
} }
...@@ -308,7 +305,6 @@ static int32_t walFetchBodyNew(SWalReader *pRead) { ...@@ -308,7 +305,6 @@ static int32_t walFetchBodyNew(SWalReader *pRead) {
pRead->pHead->head.version, ver); pRead->pHead->head.version, ver);
pRead->curInvalid = 1; pRead->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0);
return -1; return -1;
} }
...@@ -316,7 +312,6 @@ static int32_t walFetchBodyNew(SWalReader *pRead) { ...@@ -316,7 +312,6 @@ static int32_t walFetchBodyNew(SWalReader *pRead) {
wError("vgId:%d, wal fetch body error:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver); wError("vgId:%d, wal fetch body error:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver);
pRead->curInvalid = 1; pRead->curInvalid = 1;
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
ASSERT(0);
return -1; return -1;
} }
...@@ -335,7 +330,6 @@ static int32_t walSkipFetchBodyNew(SWalReader *pRead) { ...@@ -335,7 +330,6 @@ static int32_t walSkipFetchBodyNew(SWalReader *pRead) {
if (code < 0) { if (code < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
pRead->curInvalid = 1; pRead->curInvalid = 1;
ASSERT(0);
return -1; return -1;
} }
...@@ -384,7 +378,6 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) { ...@@ -384,7 +378,6 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
} else { } else {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
} }
ASSERT(0);
pRead->curInvalid = 1; pRead->curInvalid = 1;
return -1; return -1;
} }
...@@ -447,7 +440,6 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) { ...@@ -447,7 +440,6 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
if (pReadHead->bodyLen != taosReadFile(pRead->pLogFile, pReadHead->body, pReadHead->bodyLen)) { if (pReadHead->bodyLen != taosReadFile(pRead->pLogFile, pReadHead->body, pReadHead->bodyLen)) {
if (pReadHead->bodyLen < 0) { if (pReadHead->bodyLen < 0) {
ASSERT(0);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s", wError("vgId:%d, wal fetch body error:%" PRId64 ", read request index:%" PRId64 ", since %s",
pRead->pWal->cfg.vgId, pReadHead->version, ver, tstrerror(terrno)); pRead->pWal->cfg.vgId, pReadHead->version, ver, tstrerror(terrno));
...@@ -457,12 +449,10 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) { ...@@ -457,12 +449,10 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
terrno = TSDB_CODE_WAL_FILE_CORRUPTED; terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
} }
pRead->curInvalid = 1; pRead->curInvalid = 1;
ASSERT(0);
return -1; return -1;
} }
if (pReadHead->version != ver) { if (pReadHead->version != ver) {
ASSERT(0);
wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId, wError("vgId:%d, wal fetch body error, index:%" PRId64 ", read request index:%" PRId64, pRead->pWal->cfg.vgId,
pReadHead->version, ver); pReadHead->version, ver);
pRead->curInvalid = 1; pRead->curInvalid = 1;
...@@ -471,7 +461,6 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) { ...@@ -471,7 +461,6 @@ int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
} }
if (walValidBodyCksum(*ppHead) != 0) { if (walValidBodyCksum(*ppHead) != 0) {
ASSERT(0);
wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, wError("vgId:%d, wal fetch body error, index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId,
ver); ver);
pRead->curInvalid = 1; pRead->curInvalid = 1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册