提交 2f9c63ae 编写于 作者: L Liu Jicong

enh(tq): update tb uid when droping table

上级 82b2ae41
...@@ -848,7 +848,7 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * ...@@ -848,7 +848,7 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)cgroup, false); colDataAppend(pColInfo, numOfRows, (const char *)cgroup, false);
// app id // client id
char clientId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0}; char clientId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
tstrncpy(varDataVal(clientId), pConsumer->clientId, TSDB_CGROUP_LEN); tstrncpy(varDataVal(clientId), pConsumer->clientId, TSDB_CGROUP_LEN);
varDataSetLen(clientId, strlen(varDataVal(clientId))); varDataSetLen(clientId, strlen(varDataVal(clientId)));
......
...@@ -104,7 +104,7 @@ int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeep ...@@ -104,7 +104,7 @@ int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeep
int tsdbClose(STsdb** pTsdb); int tsdbClose(STsdb** pTsdb);
int tsdbBegin(STsdb* pTsdb); int tsdbBegin(STsdb* pTsdb);
int tsdbCommit(STsdb* pTsdb); int tsdbCommit(STsdb* pTsdb);
int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, const SSubmitReq *pMsg); int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, const SSubmitReq* pMsg);
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp); int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
int tsdbInsertTableData(STsdb* pTsdb, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkRsp* pRsp); int tsdbInsertTableData(STsdb* pTsdb, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkRsp* pRsp);
tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId, tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
...@@ -118,7 +118,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal); ...@@ -118,7 +118,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal);
void tqClose(STQ*); void tqClose(STQ*);
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
int tqCommit(STQ*); int tqCommit(STQ*);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId); int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId);
......
...@@ -104,7 +104,7 @@ static void tdSRowDemo() { ...@@ -104,7 +104,7 @@ static void tdSRowDemo() {
taosMemoryFree(pTSChema); taosMemoryFree(pTSChema);
} }
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList) { int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
void* pIter = NULL; void* pIter = NULL;
STqExec* pExec = NULL; STqExec* pExec = NULL;
while (1) { while (1) {
...@@ -113,7 +113,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList) { ...@@ -113,7 +113,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList) {
pExec = (STqExec*)pIter; pExec = (STqExec*)pIter;
if (pExec->subType == TOPIC_SUB_TYPE__DB) continue; if (pExec->subType == TOPIC_SUB_TYPE__DB) continue;
for (int32_t i = 0; i < 5; i++) { for (int32_t i = 0; i < 5; i++) {
int32_t code = qUpdateQualifiedTableId(pExec->task[i], tbUidList, true); int32_t code = qUpdateQualifiedTableId(pExec->task[i], tbUidList, isAdd);
ASSERT(code == 0); ASSERT(code == 0);
} }
} }
......
...@@ -386,7 +386,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, ...@@ -386,7 +386,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq,
tDecoderClear(&decoder); tDecoderClear(&decoder);
tqUpdateTbUidList(pVnode->pTq, tbUids); tqUpdateTbUidList(pVnode->pTq, tbUids, true);
tdUpdateTbUidList(pVnode->pSma, pStore); tdUpdateTbUidList(pVnode->pSma, pStore);
tdUidStoreFree(pStore); tdUidStoreFree(pStore);
...@@ -517,6 +517,7 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in ...@@ -517,6 +517,7 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in
SDecoder decoder = {0}; SDecoder decoder = {0};
SEncoder encoder = {0}; SEncoder encoder = {0};
int ret; int ret;
SArray *tbUids;
pRsp->msgType = TDMT_VND_DROP_TABLE_RSP; pRsp->msgType = TDMT_VND_DROP_TABLE_RSP;
pRsp->pCont = NULL; pRsp->pCont = NULL;
...@@ -533,7 +534,10 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in ...@@ -533,7 +534,10 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in
} }
// process req // process req
tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
rsp.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbRsp)); rsp.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbRsp));
if (tbUids == NULL || rsp.pArray == NULL) goto _exit;
for (int iReq = 0; iReq < req.nReqs; iReq++) { for (int iReq = 0; iReq < req.nReqs; iReq++) {
SVDropTbReq *pDropTbReq = req.pReqs + iReq; SVDropTbReq *pDropTbReq = req.pReqs + iReq;
SVDropTbRsp dropTbRsp = {0}; SVDropTbRsp dropTbRsp = {0};
...@@ -553,7 +557,10 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in ...@@ -553,7 +557,10 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in
taosArrayPush(rsp.pArray, &dropTbRsp); taosArrayPush(rsp.pArray, &dropTbRsp);
} }
tqUpdateTbUidList(pVnode->pTq, tbUids, false);
_exit: _exit:
taosArrayDestroy(tbUids);
tDecoderClear(&decoder); tDecoderClear(&decoder);
tEncodeSize(tEncodeSVDropTbBatchRsp, &rsp, pRsp->contLen, ret); tEncodeSize(tEncodeSVDropTbBatchRsp, &rsp, pRsp->contLen, ret);
pRsp->pCont = rpcMallocCont(pRsp->contLen); pRsp->pCont = rpcMallocCont(pRsp->contLen);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册