未验证 提交 c3cc2599 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #18157 from taosdata/feature/stream

fix: reset cache when table is dropped
......@@ -6027,9 +6027,13 @@ int32_t tDecodeSMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
void tDeleteSMqDataRsp(SMqDataRsp *pRsp) {
taosArrayDestroy(pRsp->blockDataLen);
pRsp->blockDataLen = NULL;
taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree);
pRsp->blockData = NULL;
taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSSchemaWrapper);
pRsp->blockSchema = NULL;
taosArrayDestroyP(pRsp->blockTbName, (FDelete)taosMemoryFree);
pRsp->blockTbName = NULL;
}
int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const STaosxRsp *pRsp) {
......
......@@ -471,8 +471,12 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
pConsumerNew->updateType = CONSUMER_UPDATE__TOUCH;
mndReleaseConsumer(pMnode, pConsumerOld);
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) {
tDeleteSMqConsumerObj(pConsumerNew);
taosMemoryFree(pConsumerNew);
goto REB_FAIL;
}
tDeleteSMqConsumerObj(pConsumerNew);
taosMemoryFree(pConsumerNew);
}
// 3.2 set new consumer
consumerNum = taosArrayGetSize(pOutput->newConsumers);
......
......@@ -68,6 +68,7 @@ static void destroySTqHandle(void* data) {
static void tqPushEntryFree(void* data) {
STqPushEntry* p = *(void**)data;
tDeleteSMqDataRsp(&p->dataRsp);
taosMemoryFree(p);
}
......@@ -576,8 +577,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
return 0;
}
}
taosWUnLockLatch(&pTq->pushLock);
#endif
taosWUnLockLatch(&pTq->pushLock);
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
code = -1;
......
......@@ -425,6 +425,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader) {
tqWarn("cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64 "), version %d, possibly dropped table",
pReader->msgIter.uid, pReader->msgIter.suid, pReader->cachedSchemaVer);
/*ASSERT(0);*/
pReader->cachedSchemaSuid = 0;
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
return -1;
}
......@@ -435,6 +436,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader) {
tqWarn("cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
pReader->msgIter.uid, pReader->cachedSchemaVer);
/*ASSERT(0);*/
pReader->cachedSchemaSuid = 0;
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
return -1;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册