diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index fd445ff2aec71654fa5893cd0d847a0a6d73cc3e..4485e8df0c37f14a6e8e87b431d601fcd50305e1 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -6027,9 +6027,13 @@ int32_t tDecodeSMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) { void tDeleteSMqDataRsp(SMqDataRsp *pRsp) { taosArrayDestroy(pRsp->blockDataLen); + pRsp->blockDataLen = NULL; taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree); + pRsp->blockData = NULL; taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSSchemaWrapper); + pRsp->blockSchema = NULL; taosArrayDestroyP(pRsp->blockTbName, (FDelete)taosMemoryFree); + pRsp->blockTbName = NULL; } int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const STaosxRsp *pRsp) { diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 14adeb00808234e4afd454e4a1a6fc48cad4300e..fd798461047b63f790079521159d53b8c928ed48 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -471,8 +471,12 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu pConsumerNew->updateType = CONSUMER_UPDATE__TOUCH; mndReleaseConsumer(pMnode, pConsumerOld); if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { + tDeleteSMqConsumerObj(pConsumerNew); + taosMemoryFree(pConsumerNew); goto REB_FAIL; } + tDeleteSMqConsumerObj(pConsumerNew); + taosMemoryFree(pConsumerNew); } // 3.2 set new consumer consumerNum = taosArrayGetSize(pOutput->newConsumers); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index eafbd8464daab011d1209bb68453a8882b6ed903..1cc771dbb4d021b008be4cdff58d9213f94b2a51 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -68,6 +68,7 @@ static void destroySTqHandle(void* data) { static void tqPushEntryFree(void* data) { STqPushEntry* p = *(void**)data; + tDeleteSMqDataRsp(&p->dataRsp); taosMemoryFree(p); } @@ -576,8 +577,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { return 0; } } - taosWUnLockLatch(&pTq->pushLock); #endif + taosWUnLockLatch(&pTq->pushLock); if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) { code = -1; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 48e69f8f4d303dca8071e497c3b48665400bdd9b..45873f27447e95723b7105a8edc52d2104e61a39 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -425,6 +425,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader) { tqWarn("cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64 "), version %d, possibly dropped table", pReader->msgIter.uid, pReader->msgIter.suid, pReader->cachedSchemaVer); /*ASSERT(0);*/ + pReader->cachedSchemaSuid = 0; terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; return -1; } @@ -435,6 +436,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader) { tqWarn("cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table", pReader->msgIter.uid, pReader->cachedSchemaVer); /*ASSERT(0);*/ + pReader->cachedSchemaSuid = 0; terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; return -1; }