提交 2174b5e4 编写于 作者: L Liu Jicong

enh(tq): update tb uid when droping table

上级 2f9c63ae
...@@ -1435,7 +1435,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) { ...@@ -1435,7 +1435,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) {
while (1) { while (1) {
tmqHandleAllDelayedTask(tmq); tmqHandleAllDelayedTask(tmq);
tmqPollImpl(tmq, wait_time); if (tmqPollImpl(tmq, wait_time) < 0) return NULL;
rspObj = tmqHandleAllRsp(tmq, wait_time, false); rspObj = tmqHandleAllRsp(tmq, wait_time, false);
if (rspObj) { if (rspObj) {
......
...@@ -128,6 +128,7 @@ int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList ...@@ -128,6 +128,7 @@ int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList
int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver); int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
bool tqNextDataBlock(STqReadHandle *pHandle); bool tqNextDataBlock(STqReadHandle *pHandle);
bool tqNextDataBlockFilterOut(STqReadHandle *pHandle, SHashObj *filterOutUids);
int32_t tqRetrieveDataBlock(SArray **ppCols, STqReadHandle *pHandle, uint64_t *pGroupId, uint64_t *pUid, int32_t tqRetrieveDataBlock(SArray **ppCols, STqReadHandle *pHandle, uint64_t *pGroupId, uint64_t *pUid,
int32_t *pNumOfRows, int16_t *pNumOfCols); int32_t *pNumOfRows, int16_t *pNumOfCols);
......
...@@ -163,6 +163,7 @@ typedef struct { ...@@ -163,6 +163,7 @@ typedef struct {
int8_t withSchema; int8_t withSchema;
int8_t withTag; int8_t withTag;
char* qmsg; char* qmsg;
SHashObj* pDropTbUid;
STqPushHandle pushHandle; STqPushHandle pushHandle;
// SRWLatch lock; // SRWLatch lock;
SWalReadHandle* pWalReader; SWalReadHandle* pWalReader;
......
...@@ -82,7 +82,7 @@ int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* ...@@ -82,7 +82,7 @@ int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq*
int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq); int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq);
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq); int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq);
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq); int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids);
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq); int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq);
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline); SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline);
STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver); STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver);
......
...@@ -255,7 +255,7 @@ _err: ...@@ -255,7 +255,7 @@ _err:
return -1; return -1;
} }
int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq) { int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUids) {
TBC *pTbDbc = NULL; TBC *pTbDbc = NULL;
TBC *pUidIdxc = NULL; TBC *pUidIdxc = NULL;
TBC *pNameIdxc = NULL; TBC *pNameIdxc = NULL;
...@@ -336,6 +336,7 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq) { ...@@ -336,6 +336,7 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq) {
if (type == TSDB_CHILD_TABLE) { if (type == TSDB_CHILD_TABLE) {
ctime = me.ctbEntry.ctime; ctime = me.ctbEntry.ctime;
suid = me.ctbEntry.suid; suid = me.ctbEntry.suid;
taosArrayPush(tbUids, &me.uid);
} else if (type == TSDB_NORMAL_TABLE) { } else if (type == TSDB_NORMAL_TABLE) {
ctime = me.ntbEntry.ctime; ctime = me.ntbEntry.ctime;
suid = 0; suid = 0;
...@@ -906,4 +907,4 @@ static int metaHandleEntry(SMeta *pMeta, const SMetaEntry *pME) { ...@@ -906,4 +907,4 @@ static int metaHandleEntry(SMeta *pMeta, const SMetaEntry *pME) {
_err: _err:
metaULock(pMeta); metaULock(pMeta);
return -1; return -1;
} }
\ No newline at end of file
...@@ -111,7 +111,17 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { ...@@ -111,7 +111,17 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
pIter = taosHashIterate(pTq->execs, pIter); pIter = taosHashIterate(pTq->execs, pIter);
if (pIter == NULL) break; if (pIter == NULL) break;
pExec = (STqExec*)pIter; pExec = (STqExec*)pIter;
if (pExec->subType == TOPIC_SUB_TYPE__DB) continue; if (pExec->subType == TOPIC_SUB_TYPE__DB) {
if (isAdd) {
continue;
} else {
int32_t sz = taosArrayGetSize(tbUidList);
for (int32_t i = 0; i < sz; i++) {
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
taosHashPut(pExec->pDropTbUid, &tbUid, sizeof(int64_t), NULL, 0);
}
}
}
for (int32_t i = 0; i < 5; i++) { for (int32_t i = 0; i < 5; i++) {
int32_t code = qUpdateQualifiedTableId(pExec->task[i], tbUidList, isAdd); int32_t code = qUpdateQualifiedTableId(pExec->task[i], tbUidList, isAdd);
ASSERT(code == 0); ASSERT(code == 0);
...@@ -582,7 +592,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -582,7 +592,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
rsp.withSchema = 1; rsp.withSchema = 1;
STqReadHandle* pReader = pExec->pExecReader[workerId]; STqReadHandle* pReader = pExec->pExecReader[workerId];
tqReadHandleSetMsg(pReader, pCont, 0); tqReadHandleSetMsg(pReader, pCont, 0);
while (tqNextDataBlock(pReader)) { while (tqNextDataBlockFilterOut(pReader, pExec->pDropTbUid)) {
SSDataBlock block = {0}; SSDataBlock block = {0};
if (tqRetrieveDataBlock(&block.pDataBlock, pReader, &block.info.groupId, &block.info.uid, &block.info.rows, if (tqRetrieveDataBlock(&block.pDataBlock, pReader, &block.info.groupId, &block.info.uid, &block.info.rows,
&block.info.numOfCols) < 0) { &block.info.numOfCols) < 0) {
...@@ -915,9 +925,10 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -915,9 +925,10 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
req.qmsg = NULL; req.qmsg = NULL;
pExec->pWalReader = walOpenReadHandle(pTq->pVnode->pWal); pExec->pWalReader = walOpenReadHandle(pTq->pVnode->pWal);
for (int32_t i = 0; i < 5; i++) { if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
pExec->pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); for (int32_t i = 0; i < 5; i++) {
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { pExec->pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
SReadHandle handle = { SReadHandle handle = {
.reader = pExec->pExecReader[i], .reader = pExec->pExecReader[i],
.meta = pTq->pVnode->pMeta, .meta = pTq->pVnode->pMeta,
...@@ -925,9 +936,12 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -925,9 +936,12 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
}; };
pExec->task[i] = qCreateStreamExecTaskInfo(pExec->qmsg, &handle); pExec->task[i] = qCreateStreamExecTaskInfo(pExec->qmsg, &handle);
ASSERT(pExec->task[i]); ASSERT(pExec->task[i]);
} else {
pExec->task[i] = NULL;
} }
} else {
for (int32_t i = 0; i < 5; i++) {
pExec->pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
}
pExec->pDropTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
} }
taosHashPut(pTq->execs, req.subKey, strlen(req.subKey), pExec, sizeof(STqExec)); taosHashPut(pTq->execs, req.subKey, strlen(req.subKey), pExec, sizeof(STqExec));
return 0; return 0;
......
...@@ -64,22 +64,28 @@ bool tqNextDataBlock(STqReadHandle* pHandle) { ...@@ -64,22 +64,28 @@ bool tqNextDataBlock(STqReadHandle* pHandle) {
} }
if (pHandle->pBlock == NULL) return false; if (pHandle->pBlock == NULL) return false;
/*pHandle->pBlock->uid = htobe64(pHandle->pBlock->uid);*/
/*if (pHandle->tbUid == pHandle->pBlock->uid) {*/
if (pHandle->tbIdHash == NULL) { if (pHandle->tbIdHash == NULL) {
return true; return true;
} }
void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->msgIter.uid, sizeof(int64_t)); void* ret = taosHashGet(pHandle->tbIdHash, &pHandle->msgIter.uid, sizeof(int64_t));
if (ret != NULL) { if (ret != NULL) {
/*printf("retrieve one tb %ld\n", pHandle->pBlock->uid);*/
/*pHandle->pBlock->tid = htonl(pHandle->pBlock->tid);*/
/*pHandle->pBlock->sversion = htonl(pHandle->pBlock->sversion);*/
/*pHandle->pBlock->dataLen = htonl(pHandle->pBlock->dataLen);*/
/*pHandle->pBlock->schemaLen = htonl(pHandle->pBlock->schemaLen);*/
/*pHandle->pBlock->numOfRows = htons(pHandle->pBlock->numOfRows);*/
return true; return true;
/*} else {*/ }
/*printf("skip one tb %ld\n", pHandle->pBlock->uid);*/ }
return false;
}
bool tqNextDataBlockFilterOut(STqReadHandle* pHandle, SHashObj* filterOutUids) {
while (1) {
if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
return false;
}
if (pHandle->pBlock == NULL) return false;
ASSERT(pHandle->tbIdHash == NULL);
void* ret = taosHashGet(filterOutUids, &pHandle->msgIter.uid, sizeof(int64_t));
if (ret == NULL) {
return true;
} }
} }
return false; return false;
......
...@@ -62,11 +62,6 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg ...@@ -62,11 +62,6 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
len = pMsg->contLen - sizeof(SMsgHead); len = pMsg->contLen - sizeof(SMsgHead);
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
vError("vgId:%d failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
return -1;
}
switch (pMsg->msgType) { switch (pMsg->msgType) {
/* META */ /* META */
case TDMT_VND_CREATE_STB: case TDMT_VND_CREATE_STB:
...@@ -125,6 +120,11 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg ...@@ -125,6 +120,11 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
vDebug("vgId:%d process %s request success, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version); vDebug("vgId:%d process %s request success, version: %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), version);
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
vError("vgId:%d failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
return -1;
}
// commit if need // commit if need
if (vnodeShouldCommit(pVnode)) { if (vnodeShouldCommit(pVnode)) {
vInfo("vgId:%d commit at version %" PRId64, TD_VID(pVnode), version); vInfo("vgId:%d commit at version %" PRId64, TD_VID(pVnode), version);
...@@ -517,7 +517,7 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in ...@@ -517,7 +517,7 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in
SDecoder decoder = {0}; SDecoder decoder = {0};
SEncoder encoder = {0}; SEncoder encoder = {0};
int ret; int ret;
SArray *tbUids; SArray *tbUids = NULL;
pRsp->msgType = TDMT_VND_DROP_TABLE_RSP; pRsp->msgType = TDMT_VND_DROP_TABLE_RSP;
pRsp->pCont = NULL; pRsp->pCont = NULL;
...@@ -543,7 +543,7 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in ...@@ -543,7 +543,7 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in
SVDropTbRsp dropTbRsp = {0}; SVDropTbRsp dropTbRsp = {0};
/* code */ /* code */
ret = metaDropTable(pVnode->pMeta, version, pDropTbReq); ret = metaDropTable(pVnode->pMeta, version, pDropTbReq, tbUids);
if (ret < 0) { if (ret < 0) {
if (pDropTbReq->igNotExists && terrno == TSDB_CODE_VND_TABLE_NOT_EXIST) { if (pDropTbReq->igNotExists && terrno == TSDB_CODE_VND_TABLE_NOT_EXIST) {
dropTbRsp.code = TSDB_CODE_SUCCESS; dropTbRsp.code = TSDB_CODE_SUCCESS;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册