提交 471d853f 编写于 作者: A Alex Duan

[TS-238-D]<fix>(tsdb): single table del function ok

上级 4755a907
...@@ -245,6 +245,7 @@ static int tsdbTruncateTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) { ...@@ -245,6 +245,7 @@ static int tsdbTruncateTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) {
static int tsdbFSetDelete(STruncateH *ptru, SDFileSet *pSet) { static int tsdbFSetDelete(STruncateH *ptru, SDFileSet *pSet) {
STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(ptru); STsdbRepo *pRepo = TSDB_TRUNCATE_REPO(ptru);
SDiskID did = {0}; SDiskID did = {0};
SDFileSet *pWSet = TSDB_TRUNCATE_WSET(ptru);
tsdbDebug("vgId:%d start to truncate data in FSET %d on level %d id %d", REPO_ID(pRepo), pSet->fid, tsdbDebug("vgId:%d start to truncate data in FSET %d on level %d id %d", REPO_ID(pRepo), pSet->fid,
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet)); TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet));
...@@ -253,21 +254,32 @@ static int tsdbFSetDelete(STruncateH *ptru, SDFileSet *pSet) { ...@@ -253,21 +254,32 @@ static int tsdbFSetDelete(STruncateH *ptru, SDFileSet *pSet) {
return -1; return -1;
} }
// Create new fset as deleted fset // Create new fset as truncated fset
tfsAllocDisk(tsdbGetFidLevel(pSet->fid, &(ptru->rtn)), &(did.level), &(did.id)); tfsAllocDisk(tsdbGetFidLevel(pSet->fid, &(ptru->rtn)), &(did.level), &(did.id));
if (did.level == TFS_UNDECIDED_LEVEL) { if (did.level == TFS_UNDECIDED_LEVEL) {
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK; terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
tsdbError("vgId:%d failed to truncate data in FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno)); tsdbError("vgId:%d failed to truncate table in FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
tsdbTruncateFSetEnd(ptru); tsdbTruncateFSetEnd(ptru);
return -1; return -1;
} }
tsdbInitDFileSet(TSDB_TRUNCATE_WSET(ptru), did, REPO_ID(pRepo), TSDB_FSET_FID(pSet), // Only .head is created, use original .data/.last/.smad/.smal
FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_LATEST_FSET_VER); tsdbInitDFileSetEx(pWSet, pSet);
pWSet->state = 0;
SDFile *pHeadFile = TSDB_DFILE_IN_SET(pWSet, TSDB_FILE_HEAD);
tsdbInitDFile(pHeadFile, did, REPO_ID(pRepo), TSDB_FSET_FID(pSet), FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_HEAD);
if (tsdbCreateDFileSet(TSDB_TRUNCATE_WSET(ptru), true) < 0) { if (tsdbCreateDFile(pHeadFile, true, TSDB_FILE_HEAD) < 0) {
tsdbError("vgId:%d failed to truncate data in FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno)); tsdbError("vgId:%d failed to truncate table in FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
tsdbTruncateFSetEnd(ptru); tsdbCloseDFile(pHeadFile);
tsdbRemoveDFile(pHeadFile);
return -1;
}
tsdbCloseDFile(pHeadFile);
if (tsdbOpenDFileSet(pWSet, O_RDWR) < 0) {
tsdbError("vgId:%d failed to open file set %d since %s", REPO_ID(pRepo), TSDB_FSET_FID(pWSet), tstrerror(terrno));
return -1; return -1;
} }
...@@ -628,7 +640,7 @@ int tsdbRemoveDelBlocks(STruncateH *ptru, STableTruncateH * pItem) { ...@@ -628,7 +640,7 @@ int tsdbRemoveDelBlocks(STruncateH *ptru, STableTruncateH * pItem) {
} }
if(from != -1) { if(from != -1) {
int delCnt = from; int delCnt = from + 1;
memmove(pItem->pInfo->blocks, pItem->pInfo->blocks + delCnt, (numOfBlocks - delCnt) * sizeof(SBlock)); memmove(pItem->pInfo->blocks, pItem->pInfo->blocks + delCnt, (numOfBlocks - delCnt) * sizeof(SBlock));
delAll += delCnt; delAll += delCnt;
numOfBlocks -= delCnt; numOfBlocks -= delCnt;
...@@ -641,14 +653,19 @@ int tsdbRemoveDelBlocks(STruncateH *ptru, STableTruncateH * pItem) { ...@@ -641,14 +653,19 @@ int tsdbRemoveDelBlocks(STruncateH *ptru, STableTruncateH * pItem) {
} }
static void tsdbAddBlock(STruncateH *ptru, STableTruncateH *pItem, SBlock *pBlock) { static void tsdbAddBlock(STruncateH *ptru, STableTruncateH *pItem, SBlock *pBlock) {
taosArrayPush(ptru->aSupBlk, (const void *)pBlock); // append sub if have
// have sub block
if (pBlock->numOfSubBlocks > 1) { if (pBlock->numOfSubBlocks > 1) {
int64_t offset = taosArrayGetSize(ptru->aSubBlk) * sizeof(SBlock);
SBlock *jBlock = POINTER_SHIFT(pItem->pInfo, pBlock->offset);; SBlock *jBlock = POINTER_SHIFT(pItem->pInfo, pBlock->offset);;
for (int j = 0; j < pBlock->numOfSubBlocks; j++) { for (int j = 0; j < pBlock->numOfSubBlocks; j++) {
taosArrayPush(ptru->aSubBlk, (const void *)jBlock++); taosArrayPush(ptru->aSubBlk, (const void *)jBlock++);
} }
// set new offset if have sub
pBlock->offset = offset;
} }
// append super
taosArrayPush(ptru->aSupBlk, (const void *)pBlock);
} }
// need modify blocks // need modify blocks
...@@ -690,7 +707,7 @@ static int tsdbModifyBlocks(STruncateH *ptru, STableTruncateH *pItem) { ...@@ -690,7 +707,7 @@ static int tsdbModifyBlocks(STruncateH *ptru, STableTruncateH *pItem) {
if (solve == BLOCK_READ) { if (solve == BLOCK_READ) {
tsdbAddBlock(ptru, pItem, pBlock); tsdbAddBlock(ptru, pItem, pBlock);
continue; continue;
} }
// border block need load to delete no-use data // border block need load to delete no-use data
if (tsdbLoadBlockData(pReadh, pBlock, pItem->pInfo) < 0) { if (tsdbLoadBlockData(pReadh, pBlock, pItem->pInfo) < 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册