提交 74f27466 编写于 作者: H Hongze Cheng

drop super table

上级 75c47328
...@@ -71,64 +71,71 @@ _err: ...@@ -71,64 +71,71 @@ _err:
} }
int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq) { int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq) {
TBC *pNameIdxc = NULL; void *pKey = NULL;
TBC *pUidIdxc = NULL; int nKey = 0;
TBC *pCtbIdxc = NULL; void *pData = NULL;
SCtbIdxKey *pCtbIdxKey; int nData = 0;
const void *pKey = NULL; int c = 0;
int nKey; int rc = 0;
const void *pData = NULL;
int nData; // check if super table exists
int c, ret; rc = tdbTbGet(pMeta->pNameIdx, pReq->name, strlen(pReq->name) + 1, &pData, &nData);
if (rc < 0 || *(tb_uid_t *)pData != pReq->suid) {
// prepare uid idx cursor terrno = TSDB_CODE_VND_TABLE_NOT_EXIST;
tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn); return -1;
ret = tdbTbcMoveTo(pUidIdxc, &pReq->suid, sizeof(tb_uid_t), &c);
if (ret < 0 || c != 0) {
terrno = TSDB_CODE_VND_TB_NOT_EXIST;
tdbTbcClose(pUidIdxc);
goto _err;
} }
// prepare name idx cursor // drop all child tables
tdbTbcOpen(pMeta->pNameIdx, &pNameIdxc, &pMeta->txn); TBC *pCtbIdxc = NULL;
ret = tdbTbcMoveTo(pNameIdxc, pReq->name, strlen(pReq->name) + 1, &c); SArray *pArray = taosArrayInit(8, sizeof(tb_uid_t));
if (ret < 0 || c != 0) {
ASSERT(0);
}
tdbTbcDelete(pUidIdxc);
tdbTbcDelete(pNameIdxc);
tdbTbcClose(pUidIdxc);
tdbTbcClose(pNameIdxc);
// loop to drop each child table
tdbTbcOpen(pMeta->pCtbIdx, &pCtbIdxc, &pMeta->txn); tdbTbcOpen(pMeta->pCtbIdx, &pCtbIdxc, &pMeta->txn);
ret = tdbTbcMoveTo(pCtbIdxc, &(SCtbIdxKey){.suid = pReq->suid, .uid = INT64_MIN}, sizeof(SCtbIdxKey), &c); rc = tdbTbcMoveTo(pCtbIdxc, &(SCtbIdxKey){.suid = pReq->suid, .uid = INT64_MIN}, sizeof(SCtbIdxKey), &c);
if (ret < 0 || (c < 0 && tdbTbcMoveToNext(pCtbIdxc) < 0)) { if (rc < 0) {
tdbTbcClose(pCtbIdxc); tdbTbcClose(pCtbIdxc);
goto _exit; metaWLock(pMeta);
goto _drop_super_table;
} }
for (;;) { for (;;) {
tdbTbcGet(pCtbIdxc, &pKey, &nKey, NULL, NULL); rc = tdbTbcNext(pCtbIdxc, &pKey, &nKey, NULL, NULL);
pCtbIdxKey = (SCtbIdxKey *)pKey; if (rc < 0) break;
if (((SCtbIdxKey *)pKey)->suid < pReq->suid) {
continue;
} else if (((SCtbIdxKey *)pKey)->suid > pReq->suid) {
break;
}
if (pCtbIdxKey->suid > pReq->suid) break; taosArrayPush(pArray, &(((SCtbIdxKey *)pKey)->uid));
}
// drop the child table (TODO) tdbTbcClose(pCtbIdxc);
if (tdbTbcMoveToNext(pCtbIdxc) < 0) break; metaWLock(pMeta);
for (int32_t iChild = 0; iChild < taosArrayGetSize(pArray); iChild++) {
tb_uid_t uid = *(tb_uid_t *)taosArrayGet(pArray, iChild);
metaDropTableByUid(pMeta, uid);
} }
taosArrayDestroy(pArray);
// drop super table
_drop_super_table:
tdbTbGet(pMeta->pUidIdx, &pReq->suid, sizeof(tb_uid_t), &pData, &nData);
tdbTbDelete(pMeta->pTbDb, &(STbDbKey){.version = *(int64_t *)pData, .uid = pReq->suid}, sizeof(STbDbKey),
&pMeta->txn);
tdbTbDelete(pMeta->pNameIdx, pReq->name, strlen(pReq->name) + 1, &pMeta->txn);
tdbTbDelete(pMeta->pUidIdx, &pReq->suid, sizeof(tb_uid_t), &pMeta->txn);
metaULock(pMeta);
_exit: _exit:
tdbFree(pKey);
tdbFree(pData);
metaDebug("vgId:%d super table %s uid:%" PRId64 " is dropped", TD_VID(pMeta->pVnode), pReq->name, pReq->suid); metaDebug("vgId:%d super table %s uid:%" PRId64 " is dropped", TD_VID(pMeta->pVnode), pReq->name, pReq->suid);
return 0; return 0;
_err:
metaError("vgId:%d failed to drop super table %s uid:%" PRId64 " since %s", TD_VID(pMeta->pVnode), pReq->name,
pReq->suid, tstrerror(terrno));
return -1;
} }
int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
...@@ -608,14 +615,14 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA ...@@ -608,14 +615,14 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
// TODO : need to update tag index // TODO : need to update tag index
} }
ctbEntry.version = version; ctbEntry.version = version;
if(pTagSchema->nCols == 1 && pTagSchema->pSchema[0].type == TSDB_DATA_TYPE_JSON){ if (pTagSchema->nCols == 1 && pTagSchema->pSchema[0].type == TSDB_DATA_TYPE_JSON) {
ctbEntry.ctbEntry.pTags = taosMemoryMalloc(pAlterTbReq->nTagVal); ctbEntry.ctbEntry.pTags = taosMemoryMalloc(pAlterTbReq->nTagVal);
if(ctbEntry.ctbEntry.pTags == NULL){ if (ctbEntry.ctbEntry.pTags == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err; goto _err;
} }
memcpy((void*)ctbEntry.ctbEntry.pTags, pAlterTbReq->pTagVal, pAlterTbReq->nTagVal); memcpy((void *)ctbEntry.ctbEntry.pTags, pAlterTbReq->pTagVal, pAlterTbReq->nTagVal);
}else{ } else {
SKVRowBuilder kvrb = {0}; SKVRowBuilder kvrb = {0};
const SKVRow pOldTag = (const SKVRow)ctbEntry.ctbEntry.pTags; const SKVRow pOldTag = (const SKVRow)ctbEntry.ctbEntry.pTags;
SKVRow pNewTag = NULL; SKVRow pNewTag = NULL;
...@@ -649,7 +656,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA ...@@ -649,7 +656,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
tDecoderClear(&dc1); tDecoderClear(&dc1);
tDecoderClear(&dc2); tDecoderClear(&dc2);
if (ctbEntry.ctbEntry.pTags) taosMemoryFree((void*)ctbEntry.ctbEntry.pTags); if (ctbEntry.ctbEntry.pTags) taosMemoryFree((void *)ctbEntry.ctbEntry.pTags);
if (ctbEntry.pBuf) taosMemoryFree(ctbEntry.pBuf); if (ctbEntry.pBuf) taosMemoryFree(ctbEntry.pBuf);
if (stbEntry.pBuf) tdbFree(stbEntry.pBuf); if (stbEntry.pBuf) tdbFree(stbEntry.pBuf);
tdbTbcClose(pTbDbc); tdbTbcClose(pTbDbc);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册