提交 31fb2a31 编写于 作者: C Cary Xu

enh: drop child tables support for rsma

上级 91bbca0b
...@@ -104,7 +104,7 @@ int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* ...@@ -104,7 +104,7 @@ int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq*
int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList); int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList);
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp** pMetaRsp); int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp** pMetaRsp);
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids); int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids, int64_t *tbUid);
int metaTtlDropTable(SMeta* pMeta, int64_t ttl, SArray* tbUids); int metaTtlDropTable(SMeta* pMeta, int64_t ttl, SArray* tbUids);
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp); int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline); SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline);
...@@ -208,7 +208,7 @@ int32_t tdProcessRSmaCreate(SSma* pSma, SVCreateStbReq* pReq); ...@@ -208,7 +208,7 @@ int32_t tdProcessRSmaCreate(SSma* pSma, SVCreateStbReq* pReq);
int32_t tdProcessRSmaSubmit(SSma* pSma, void* pMsg, int32_t inputType); int32_t tdProcessRSmaSubmit(SSma* pSma, void* pMsg, int32_t inputType);
int32_t tdProcessRSmaDrop(SSma* pSma, SVDropStbReq* pReq); int32_t tdProcessRSmaDrop(SSma* pSma, SVDropStbReq* pReq);
int32_t tdFetchTbUidList(SSma* pSma, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid); int32_t tdFetchTbUidList(SSma* pSma, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid);
int32_t tdUpdateTbUidList(SSma* pSma, STbUidStore* pUidStore); int32_t tdUpdateTbUidList(SSma* pSma, STbUidStore* pUidStore, bool isAdd);
void tdUidStoreDestory(STbUidStore* pStore); void tdUidStoreDestory(STbUidStore* pStore);
void* tdUidStoreFree(STbUidStore* pStore); void* tdUidStoreFree(STbUidStore* pStore);
......
...@@ -474,7 +474,7 @@ _err: ...@@ -474,7 +474,7 @@ _err:
return -1; return -1;
} }
int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUids) { int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUids, tb_uid_t *tbUid) {
void *pData = NULL; void *pData = NULL;
int nData = 0; int nData = 0;
int rc = 0; int rc = 0;
...@@ -496,6 +496,10 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi ...@@ -496,6 +496,10 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi
taosArrayPush(tbUids, &uid); taosArrayPush(tbUids, &uid);
} }
if ((type == TSDB_CHILD_TABLE) && tbUid) {
*tbUid = uid;
}
tdbFree(pData); tdbFree(pData);
return 0; return 0;
} }
......
...@@ -34,7 +34,7 @@ typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem; ...@@ -34,7 +34,7 @@ typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem;
typedef struct SRSmaQTaskInfoIter SRSmaQTaskInfoIter; typedef struct SRSmaQTaskInfoIter SRSmaQTaskInfoIter;
static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid); static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids); static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, bool isAdd);
static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo, static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo,
int8_t idx); int8_t idx);
static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t inputType, SRSmaInfo *pInfo, static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t inputType, SRSmaInfo *pInfo,
...@@ -175,7 +175,7 @@ static FORCE_INLINE int32_t tdUidStoreInit(STbUidStore **pStore) { ...@@ -175,7 +175,7 @@ static FORCE_INLINE int32_t tdUidStoreInit(STbUidStore **pStore) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids) { static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, bool isAdd) {
SRSmaInfo *pRSmaInfo = NULL; SRSmaInfo *pRSmaInfo = NULL;
if (!suid || !tbUids) { if (!suid || !tbUids) {
...@@ -199,7 +199,7 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids) ...@@ -199,7 +199,7 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids)
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pRSmaInfo->taskInfo[i]) { if (pRSmaInfo->taskInfo[i]) {
if (((terrno = qUpdateQualifiedTableId(pRSmaInfo->taskInfo[i], tbUids, true)) < 0)) { if (((terrno = qUpdateQualifiedTableId(pRSmaInfo->taskInfo[i], tbUids, isAdd)) < 0)) {
tdReleaseRSmaInfo(pSma, pRSmaInfo); tdReleaseRSmaInfo(pSma, pRSmaInfo);
smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " level %d since %s", SMA_VID(pSma), *suid, i, smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " level %d since %s", SMA_VID(pSma), *suid, i,
terrstr()); terrstr());
...@@ -215,12 +215,12 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids) ...@@ -215,12 +215,12 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids)
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t tdUpdateTbUidList(SSma *pSma, STbUidStore *pStore) { int32_t tdUpdateTbUidList(SSma *pSma, STbUidStore *pStore, bool isAdd) {
if (!pStore || (taosArrayGetSize(pStore->tbUids) == 0)) { if (!pStore || (taosArrayGetSize(pStore->tbUids) == 0)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (tdUpdateTbUidListImpl(pSma, &pStore->suid, pStore->tbUids) != TSDB_CODE_SUCCESS) { if (tdUpdateTbUidListImpl(pSma, &pStore->suid, pStore->tbUids, isAdd) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
...@@ -229,7 +229,7 @@ int32_t tdUpdateTbUidList(SSma *pSma, STbUidStore *pStore) { ...@@ -229,7 +229,7 @@ int32_t tdUpdateTbUidList(SSma *pSma, STbUidStore *pStore) {
tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
SArray *pTbUids = *(SArray **)pIter; SArray *pTbUids = *(SArray **)pIter;
if (tdUpdateTbUidListImpl(pSma, pTbSuid, pTbUids) != TSDB_CODE_SUCCESS) { if (tdUpdateTbUidListImpl(pSma, pTbSuid, pTbUids, isAdd) != TSDB_CODE_SUCCESS) {
taosHashCancelIterate(pStore->uidHash, pIter); taosHashCancelIterate(pStore->uidHash, pIter);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
...@@ -1118,7 +1118,7 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) { ...@@ -1118,7 +1118,7 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) {
goto _err; goto _err;
} }
if (tdUpdateTbUidList(pVnode->pSma, &uidStore) < 0) { if (tdUpdateTbUidList(pVnode->pSma, &uidStore, true) < 0) {
smaError("vgId:%d, rsma restore, update tb uid list failed for %" PRIi64 " since %s", TD_VID(pVnode), suid, smaError("vgId:%d, rsma restore, update tb uid list failed for %" PRIi64 " since %s", TD_VID(pVnode), suid,
terrstr()); terrstr());
goto _err; goto _err;
......
...@@ -39,7 +39,7 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp * ...@@ -39,7 +39,7 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp *
SSubmitBlkRsp r = {0}; SSubmitBlkRsp r = {0};
tGetSubmitMsgNext(&msgIter, &pBlock); tGetSubmitMsgNext(&msgIter, &pBlock);
if (pBlock == NULL) break; if (pBlock == NULL) break;
if (tsdbInsertTableData(pTsdb, version, &msgIter, pBlock, &r) < 0) { if ((terrno = tsdbInsertTableData(pTsdb, version, &msgIter, pBlock, &r)) < 0) {
return -1; return -1;
} }
......
...@@ -534,7 +534,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pR ...@@ -534,7 +534,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pR
} }
tqUpdateTbUidList(pVnode->pTq, tbUids, true); tqUpdateTbUidList(pVnode->pTq, tbUids, true);
if (tdUpdateTbUidList(pVnode->pSma, pStore) < 0) { if (tdUpdateTbUidList(pVnode->pSma, pStore, true) < 0) {
goto _exit; goto _exit;
} }
tdUidStoreFree(pStore); tdUidStoreFree(pStore);
...@@ -692,6 +692,7 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq ...@@ -692,6 +692,7 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq
SEncoder encoder = {0}; SEncoder encoder = {0};
int32_t ret; int32_t ret;
SArray *tbUids = NULL; SArray *tbUids = NULL;
STbUidStore *pStore = NULL;
pRsp->msgType = TDMT_VND_DROP_TABLE_RSP; pRsp->msgType = TDMT_VND_DROP_TABLE_RSP;
pRsp->pCont = NULL; pRsp->pCont = NULL;
...@@ -715,9 +716,10 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq ...@@ -715,9 +716,10 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
SVDropTbReq *pDropTbReq = req.pReqs + iReq; SVDropTbReq *pDropTbReq = req.pReqs + iReq;
SVDropTbRsp dropTbRsp = {0}; SVDropTbRsp dropTbRsp = {0};
tb_uid_t tbUid = 0;
/* code */ /* code */
ret = metaDropTable(pVnode->pMeta, version, pDropTbReq, tbUids); ret = metaDropTable(pVnode->pMeta, version, pDropTbReq, tbUids, VND_IS_RSMA(pVnode) ? &tbUid : NULL);
if (ret < 0) { if (ret < 0) {
if (pDropTbReq->igNotExists && terrno == TSDB_CODE_VND_TABLE_NOT_EXIST) { if (pDropTbReq->igNotExists && terrno == TSDB_CODE_VND_TABLE_NOT_EXIST) {
dropTbRsp.code = TSDB_CODE_SUCCESS; dropTbRsp.code = TSDB_CODE_SUCCESS;
...@@ -726,15 +728,18 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq ...@@ -726,15 +728,18 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq
} }
} else { } else {
dropTbRsp.code = TSDB_CODE_SUCCESS; dropTbRsp.code = TSDB_CODE_SUCCESS;
if (tbUid > 0) tdFetchTbUidList(pVnode->pSma, &pStore, pDropTbReq->suid, tbUid);
} }
taosArrayPush(rsp.pArray, &dropTbRsp); taosArrayPush(rsp.pArray, &dropTbRsp);
} }
tqUpdateTbUidList(pVnode->pTq, tbUids, false); tqUpdateTbUidList(pVnode->pTq, tbUids, false);
tdUpdateTbUidList(pVnode->pSma, pStore, false);
_exit: _exit:
taosArrayDestroy(tbUids); taosArrayDestroy(tbUids);
tdUidStoreFree(pStore);
tDecoderClear(&decoder); tDecoderClear(&decoder);
tEncodeSize(tEncodeSVDropTbBatchRsp, &rsp, pRsp->contLen, ret); tEncodeSize(tEncodeSVDropTbBatchRsp, &rsp, pRsp->contLen, ret);
pRsp->pCont = rpcMallocCont(pRsp->contLen); pRsp->pCont = rpcMallocCont(pRsp->contLen);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册