提交 159f1c7b 编写于 作者: L Liu Jicong

fix(tmq): drop stb after subscribing database

上级 b6f55fa0
......@@ -89,7 +89,7 @@ int metaBegin(SMeta* pMeta);
int metaCommit(SMeta* pMeta);
int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq);
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList);
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq);
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids);
int metaTtlDropTable(SMeta* pMeta, int64_t ttl, SArray* tbUids);
......
......@@ -212,7 +212,7 @@ _err:
return -1;
}
int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq) {
int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq, SArray *tbUidList) {
void *pKey = NULL;
int nKey = 0;
void *pData = NULL;
......@@ -229,7 +229,6 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq) {
// drop all child tables
TBC *pCtbIdxc = NULL;
SArray *pArray = taosArrayInit(8, sizeof(tb_uid_t));
tdbTbcOpen(pMeta->pCtbIdx, &pCtbIdxc, &pMeta->txn);
rc = tdbTbcMoveTo(pCtbIdxc, &(SCtbIdxKey){.suid = pReq->suid, .uid = INT64_MIN}, sizeof(SCtbIdxKey), &c);
......@@ -249,20 +248,18 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq) {
break;
}
taosArrayPush(pArray, &(((SCtbIdxKey *)pKey)->uid));
taosArrayPush(tbUidList, &(((SCtbIdxKey *)pKey)->uid));
}
tdbTbcClose(pCtbIdxc);
metaWLock(pMeta);
for (int32_t iChild = 0; iChild < taosArrayGetSize(pArray); iChild++) {
tb_uid_t uid = *(tb_uid_t *)taosArrayGet(pArray, iChild);
for (int32_t iChild = 0; iChild < taosArrayGetSize(tbUidList); iChild++) {
tb_uid_t uid = *(tb_uid_t *)taosArrayGet(tbUidList, iChild);
metaDropTableByUid(pMeta, uid, NULL);
}
taosArrayDestroy(pArray);
// drop super table
_drop_super_table:
tdbTbGet(pMeta->pUidIdx, &pReq->suid, sizeof(tb_uid_t), &pData, &nData);
......
......@@ -49,8 +49,8 @@ static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, int32_t workerI
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) {
SMetaReader mr = {0};
metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
// TODO add reference to gurantee success
if (metaGetTableEntryByUid(&mr, uid) < 0) {
ASSERT(0);
return -1;
}
char* tbName = strdup(mr.me.name);
......@@ -87,16 +87,18 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa
tqDebug("task execute end, get %p", pDataBlock);
if (pDataBlock != NULL) {
tqAddBlockDataToRsp(pDataBlock, pRsp);
pRsp->blockNum++;
if (pRsp->withTbName) {
if (pOffset->type == TMQ_OFFSET__LOG) {
int64_t uid = pExec->pExecReader[0]->msgIter.uid;
tqAddTbNameToRsp(pTq, uid, pRsp);
if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) {
continue;
}
} else {
pRsp->withTbName = 0;
}
}
tqAddBlockDataToRsp(pDataBlock, pRsp);
pRsp->blockNum++;
if (pOffset->type == TMQ_OFFSET__LOG) {
continue;
} else {
......@@ -193,13 +195,14 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
SSDataBlock block = {0};
if (tqRetrieveDataBlock(&block, pReader) < 0) {
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
ASSERT(0);
}
tqAddBlockDataToRsp(&block, pRsp);
if (pRsp->withTbName) {
int64_t uid = pExec->pExecReader[workerId]->msgIter.uid;
tqAddTbNameToRsp(pTq, uid, pRsp);
if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) {
continue;
}
}
tqAddBlockDataToRsp(&block, pRsp);
tqAddBlockSchemaToRsp(pExec, workerId, pRsp);
pRsp->blockNum++;
}
......@@ -211,13 +214,14 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
SSDataBlock block = {0};
if (tqRetrieveDataBlock(&block, pReader) < 0) {
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
ASSERT(0);
}
tqAddBlockDataToRsp(&block, pRsp);
if (pRsp->withTbName) {
int64_t uid = pExec->pExecReader[workerId]->msgIter.uid;
tqAddTbNameToRsp(pTq, uid, pRsp);
if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) {
continue;
}
}
tqAddBlockDataToRsp(&block, pRsp);
tqAddBlockSchemaToRsp(pExec, workerId, pRsp);
pRsp->blockNum++;
}
......
......@@ -557,6 +557,7 @@ static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pRe
SVDropStbReq req = {0};
int32_t rcode = TSDB_CODE_SUCCESS;
SDecoder decoder = {0};
SArray *tbUidList = NULL;
pRsp->msgType = TDMT_VND_CREATE_STB_RSP;
pRsp->pCont = NULL;
......@@ -570,7 +571,14 @@ static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pRe
}
// process request
if (metaDropSTable(pVnode->pMeta, version, &req) < 0) {
tbUidList = taosArrayInit(8, sizeof(int64_t));
if (tbUidList == NULL) goto _exit;
if (metaDropSTable(pVnode->pMeta, version, &req, tbUidList) < 0) {
rcode = terrno;
goto _exit;
}
if (tqUpdateTbUidList(pVnode->pTq, tbUidList, false) < 0) {
rcode = terrno;
goto _exit;
}
......@@ -582,6 +590,7 @@ static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pRe
// return rsp
_exit:
if (tbUidList) taosArrayDestroy(tbUidList);
pRsp->code = rcode;
tDecoderClear(&decoder);
return 0;
......
......@@ -88,7 +88,7 @@ class TDTestCase:
tmqCom.startTmqSimProcess(self.pollDelay,self.paraDict["dbName"],self.showMsg, self.showRow,self.cdbName)
tdLog.info("After waiting for a period of time, drop one stable")
time.sleep(10)
time.sleep(3)
tdSql.execute("drop table %s.%s" %(self.paraDict['dbName'], self.paraDict['stbName']))
tdLog.info("wait result from consumer, then check it")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册