diff --git a/include/common/tmsg.h b/include/common/tmsg.h index b16b5a2d4bdf441b2d9a176ab04a6ccaee2fa596..99c5c72e2fa53048bf37700f2b6e7b940cadf010 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -418,13 +418,17 @@ static FORCE_INLINE int32_t taosEncodeSSchemaWrapper(void** buf, const SSchemaWr static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapper* pSW) { buf = taosDecodeVariantI32(buf, &pSW->nCols); buf = taosDecodeVariantI32(buf, &pSW->version); - pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema)); - if (pSW->pSchema == NULL) { - return NULL; - } + if (pSW->nCols > 0) { + pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema)); + if (pSW->pSchema == NULL) { + return NULL; + } - for (int32_t i = 0; i < pSW->nCols; i++) { - buf = taosDecodeSSchema(buf, &pSW->pSchema[i]); + for (int32_t i = 0; i < pSW->nCols; i++) { + buf = taosDecodeSSchema(buf, &pSW->pSchema[i]); + } + } else { + pSW->pSchema = NULL; } return (void*)buf; } @@ -839,7 +843,7 @@ typedef struct { int64_t dbId; int32_t vgVersion; int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT - int64_t stateTs; // ms + int64_t stateTs; // ms } SUseDbReq; int32_t tSerializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq); @@ -2990,7 +2994,8 @@ static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicE } static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) { - if (pSubTopicEp->schema.nCols) taosMemoryFreeClear(pSubTopicEp->schema.pSchema); + taosMemoryFreeClear(pSubTopicEp->schema.pSchema); + pSubTopicEp->schema.nCols = 0; taosArrayDestroy(pSubTopicEp->vgs); } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 55837c4a6aa32fe45fa1a3a7ee5105cc3f73cc28..d2e59ea8e114c4b6c313a27598c9dbde5a3c2bda 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1773,6 +1773,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer); qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", pTaskInfo->streamInfo.snapshotVer + 1); if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1) < 0) { + tqOffsetResetToLog(&pTaskInfo->streamInfo.lastStatus, pTaskInfo->streamInfo.snapshotVer); return NULL; } ASSERT(pInfo->tqReader->pWalReader->curVersion == pTaskInfo->streamInfo.snapshotVer + 1);