未验证 提交 1753f76d 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #12699 from taosdata/feature/tq

enh(tq): update tb uid when droping table
......@@ -1435,7 +1435,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) {
while (1) {
tmqHandleAllDelayedTask(tmq);
tmqPollImpl(tmq, wait_time);
if (tmqPollImpl(tmq, wait_time) < 0) return NULL;
rspObj = tmqHandleAllRsp(tmq, wait_time, false);
if (rspObj) {
......
......@@ -848,7 +848,7 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)cgroup, false);
// app id
// client id
char clientId[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
tstrncpy(varDataVal(clientId), pConsumer->clientId, TSDB_CGROUP_LEN);
varDataSetLen(clientId, strlen(varDataVal(clientId)));
......
......@@ -128,6 +128,7 @@ int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList
int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
bool tqNextDataBlock(STqReadHandle *pHandle);
bool tqNextDataBlockFilterOut(STqReadHandle *pHandle, SHashObj *filterOutUids);
int32_t tqRetrieveDataBlock(SArray **ppCols, STqReadHandle *pHandle, uint64_t *pGroupId, uint64_t *pUid,
int32_t *pNumOfRows, int16_t *pNumOfCols);
......
......@@ -163,6 +163,7 @@ typedef struct {
int8_t withSchema;
int8_t withTag;
char* qmsg;
SHashObj* pDropTbUid;
STqPushHandle pushHandle;
// SRWLatch lock;
SWalReadHandle* pWalReader;
......
......@@ -82,7 +82,7 @@ int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq*
int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq);
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq);
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq);
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids);
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq);
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline);
STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver);
......@@ -104,7 +104,7 @@ int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeep
int tsdbClose(STsdb** pTsdb);
int tsdbBegin(STsdb* pTsdb);
int tsdbCommit(STsdb* pTsdb);
int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, const SSubmitReq *pMsg);
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, const SSubmitReq* pMsg);
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
int tsdbInsertTableData(STsdb* pTsdb, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkRsp* pRsp);
tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableGroupInfo* groupList, uint64_t qId,
......@@ -118,7 +118,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal);
void tqClose(STQ*);
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
int tqCommit(STQ*);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId);
......
......@@ -255,7 +255,7 @@ _err:
return -1;
}
int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq) {
int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUids) {
TBC *pTbDbc = NULL;
TBC *pUidIdxc = NULL;
TBC *pNameIdxc = NULL;
......@@ -336,6 +336,7 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq) {
if (type == TSDB_CHILD_TABLE) {
ctime = me.ctbEntry.ctime;
suid = me.ctbEntry.suid;
taosArrayPush(tbUids, &me.uid);
} else if (type == TSDB_NORMAL_TABLE) {
ctime = me.ntbEntry.ctime;
suid = 0;
......@@ -906,4 +907,4 @@ static int metaHandleEntry(SMeta *pMeta, const SMetaEntry *pME) {
_err:
metaULock(pMeta);
return -1;
}
\ No newline at end of file
}
......@@ -104,16 +104,26 @@ static void tdSRowDemo() {
taosMemoryFree(pTSChema);
}
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList) {
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
void* pIter = NULL;
STqExec* pExec = NULL;
while (1) {
pIter = taosHashIterate(pTq->execs, pIter);
if (pIter == NULL) break;
pExec = (STqExec*)pIter;
if (pExec->subType == TOPIC_SUB_TYPE__DB) continue;
if (pExec->subType == TOPIC_SUB_TYPE__DB) {
if (isAdd) {
continue;
} else {
int32_t sz = taosArrayGetSize(tbUidList);
for (int32_t i = 0; i < sz; i++) {
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
taosHashPut(pExec->pDropTbUid, &tbUid, sizeof(int64_t), NULL, 0);
}
}
}
for (int32_t i = 0; i < 5; i++) {
int32_t code = qUpdateQualifiedTableId(pExec->task[i], tbUidList, true);
int32_t code = qUpdateQualifiedTableId(pExec->task[i], tbUidList, isAdd);
ASSERT(code == 0);
}
}
......@@ -582,7 +592,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
rsp.withSchema = 1;
STqReadHandle* pReader = pExec->pExecReader[workerId];
tqReadHandleSetMsg(pReader, pCont, 0);
while (tqNextDataBlock(pReader)) {
while (tqNextDataBlockFilterOut(pReader, pExec->pDropTbUid)) {
SSDataBlock block = {0};
if (tqRetrieveDataBlock(&block.pDataBlock, pReader, &block.info.groupId, &block.info.uid, &block.info.rows,
&block.info.numOfCols) < 0) {
......@@ -915,9 +925,10 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
req.qmsg = NULL;
pExec->pWalReader = walOpenReadHandle(pTq->pVnode->pWal);
for (int32_t i = 0; i < 5; i++) {
pExec->pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
for (int32_t i = 0; i < 5; i++) {
pExec->pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
SReadHandle handle = {
.reader = pExec->pExecReader[i],
.meta = pTq->pVnode->pMeta,
......@@ -925,9 +936,12 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
};
pExec->task[i] = qCreateStreamExecTaskInfo(pExec->qmsg, &handle);
ASSERT(pExec->task[i]);
} else {
pExec->task[i] = NULL;
}
} else {
for (int32_t i = 0; i < 5; i++) {
pExec->pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
}
pExec->pDropTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
}
taosHashPut(pTq->execs, req.subKey, strlen(req.subKey), pExec, sizeof(STqExec));
return 0;
......
......@@ -64,22 +64,28 @@ bool tqNextDataBlock(STqReadHandle* pHandle) {
}
if (pHandle->pBlock == NULL) return false;
/*pHandle->pBlock->uid = htobe64(pHandle->pBlock->uid);*/
/*if (pHandle->tbUid == pHandle->pBlock->uid) {*/
if (pHandle->tbIdHash == NULL) {
return true;
}
void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->msgIter.uid, sizeof(int64_t));
if (ret != NULL) {
/*printf("retrieve one tb %ld\n", pHandle->pBlock->uid);*/
/*pHandle->pBlock->tid = htonl(pHandle->pBlock->tid);*/
/*pHandle->pBlock->sversion = htonl(pHandle->pBlock->sversion);*/
/*pHandle->pBlock->dataLen = htonl(pHandle->pBlock->dataLen);*/
/*pHandle->pBlock->schemaLen = htonl(pHandle->pBlock->schemaLen);*/
/*pHandle->pBlock->numOfRows = htons(pHandle->pBlock->numOfRows);*/
return true;
/*} else {*/
/*printf("skip one tb %ld\n", pHandle->pBlock->uid);*/
}
}
return false;
}
bool tqNextDataBlockFilterOut(STqReadHandle* pHandle, SHashObj* filterOutUids) {
while (1) {
if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
return false;
}
if (pHandle->pBlock == NULL) return false;
ASSERT(pHandle->tbIdHash == NULL);
void* ret = taosHashGet(filterOutUids, &pHandle->msgIter.uid, sizeof(int64_t));
if (ret == NULL) {
return true;
}
}
return false;
......
......@@ -125,6 +125,11 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
vDebug("vgId:%d process %s request success, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version);
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
vError("vgId:%d failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
return -1;
}
// commit if need
if (vnodeShouldCommit(pVnode)) {
vInfo("vgId:%d commit at version %" PRId64, TD_VID(pVnode), version);
......@@ -386,7 +391,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq,
tDecoderClear(&decoder);
tqUpdateTbUidList(pVnode->pTq, tbUids);
tqUpdateTbUidList(pVnode->pTq, tbUids, true);
tdUpdateTbUidList(pVnode->pSma, pStore);
tdUidStoreFree(pStore);
......@@ -517,6 +522,7 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in
SDecoder decoder = {0};
SEncoder encoder = {0};
int ret;
SArray *tbUids = NULL;
pRsp->msgType = TDMT_VND_DROP_TABLE_RSP;
pRsp->pCont = NULL;
......@@ -533,13 +539,16 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in
}
// process req
tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
rsp.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbRsp));
if (tbUids == NULL || rsp.pArray == NULL) goto _exit;
for (int iReq = 0; iReq < req.nReqs; iReq++) {
SVDropTbReq *pDropTbReq = req.pReqs + iReq;
SVDropTbRsp dropTbRsp = {0};
/* code */
ret = metaDropTable(pVnode->pMeta, version, pDropTbReq);
ret = metaDropTable(pVnode->pMeta, version, pDropTbReq, tbUids);
if (ret < 0) {
if (pDropTbReq->igNotExists && terrno == TSDB_CODE_VND_TABLE_NOT_EXIST) {
dropTbRsp.code = TSDB_CODE_SUCCESS;
......@@ -553,7 +562,10 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in
taosArrayPush(rsp.pArray, &dropTbRsp);
}
tqUpdateTbUidList(pVnode->pTq, tbUids, false);
_exit:
taosArrayDestroy(tbUids);
tDecoderClear(&decoder);
tEncodeSize(tEncodeSVDropTbBatchRsp, &rsp, pRsp->contLen, ret);
pRsp->pCont = rpcMallocCont(pRsp->contLen);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册