提交 4cddf201 编写于 作者: A Alex Duan

[TS-238]<feature><tsdb>: compatible with no sma file data version

上级 69767f17
...@@ -25,6 +25,7 @@ typedef struct { ...@@ -25,6 +25,7 @@ typedef struct {
// addition info // addition info
tsem_t* pSem; tsem_t* pSem;
bool memNull; // pRepo->mem is NULL, this is true bool memNull; // pRepo->mem is NULL, this is true
int32_t affectedRows;
SShellSubmitRspMsg *pRsp; SShellSubmitRspMsg *pRsp;
// base ControlData // base ControlData
......
...@@ -123,7 +123,7 @@ static int tsdbDeleteImplCommon(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) { ...@@ -123,7 +123,7 @@ static int tsdbDeleteImplCommon(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) {
if ((REPO_FS(pRepo)->cstatus->pmf == NULL) || (taosArrayGetSize(REPO_FS(pRepo)->cstatus->df) <= 0)) { if ((REPO_FS(pRepo)->cstatus->pmf == NULL) || (taosArrayGetSize(REPO_FS(pRepo)->cstatus->df) <= 0)) {
pRepo->deleteState = TSDB_NO_DELETE; pRepo->deleteState = TSDB_NO_DELETE;
tsem_post(&(pRepo->readyToCommit)); tsem_post(&(pRepo->readyToCommit));
tsdbInfo("vgId:%d delete over, no meta or data file", REPO_ID(pRepo)); tsdbInfo(":SDEL vgId:%d delete over, no meta or data file", REPO_ID(pRepo));
return -1; return -1;
} }
...@@ -134,21 +134,25 @@ static int tsdbDeleteImplCommon(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) { ...@@ -134,21 +134,25 @@ static int tsdbDeleteImplCommon(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo) {
tsdbStartDeleteTrans(pRepo); tsdbStartDeleteTrans(pRepo);
if (tsdbDeleteMeta(pRepo) < 0) { if (tsdbDeleteMeta(pRepo) < 0) {
tsdbError("vgId:%d failed to delete META data since %s", REPO_ID(pRepo), tstrerror(terrno)); tsdbError(":SDEL vgId:%d failed to delete META data since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err; goto _err;
} }
tsdbError(":SDEL vgId:%d delete meta ok", REPO_ID(pRepo));
if (tsdbDeleteTSData(pRepo, pCtlInfo, aUpdates, affectedTables) < 0) { if (tsdbDeleteTSData(pRepo, pCtlInfo, aUpdates, affectedTables) < 0) {
tsdbError("vgId:%d failed to delete TS data since %s", REPO_ID(pRepo), tstrerror(terrno)); tsdbError(":SDEL vgId:%d failed to delete TS data since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err; goto _err;
} }
tsdbInfo("SDEL: vgId:%d Deleted %d row(s) from %d table(s)", REPO_ID(pRepo), pCtlInfo->affectedRows, pCtlInfo->numOfTables);
// end transaction // end transaction
tsdbEndDeleteTrans(pRepo, TSDB_CODE_SUCCESS); tsdbEndDeleteTrans(pRepo, TSDB_CODE_SUCCESS);
// set affected tables number // set affected tables number
if(pCtlInfo->pRsp) { if(pCtlInfo->pRsp) {
pCtlInfo->pRsp->numOfTables = (int32_t)taosArrayGetSize(affectedTables); pCtlInfo->pRsp->numOfTables = (int32_t)taosArrayGetSize(affectedTables);
pCtlInfo->pRsp->affectedRows = pCtlInfo->affectedRows;
} }
// update last row // update last row
...@@ -167,7 +171,7 @@ _err: ...@@ -167,7 +171,7 @@ _err:
static void tsdbStartDeleteTrans(STsdbRepo *pRepo) { static void tsdbStartDeleteTrans(STsdbRepo *pRepo) {
assert(pRepo->deleteState != TSDB_IN_DELETE); assert(pRepo->deleteState != TSDB_IN_DELETE);
tsdbInfo("vgId:%d start to delete!", REPO_ID(pRepo)); tsdbInfo(":SDEL vgId:%d start delete transaction!", REPO_ID(pRepo));
tsdbStartFSTxn(pRepo, 0, 0); tsdbStartFSTxn(pRepo, 0, 0);
pRepo->code = TSDB_CODE_SUCCESS; pRepo->code = TSDB_CODE_SUCCESS;
pRepo->deleteState = TSDB_IN_DELETE; pRepo->deleteState = TSDB_IN_DELETE;
...@@ -180,7 +184,7 @@ static void tsdbEndDeleteTrans(STsdbRepo *pRepo, int eno) { ...@@ -180,7 +184,7 @@ static void tsdbEndDeleteTrans(STsdbRepo *pRepo, int eno) {
tsdbEndFSTxn(pRepo); tsdbEndFSTxn(pRepo);
} }
pRepo->deleteState = TSDB_NO_DELETE; pRepo->deleteState = TSDB_NO_DELETE;
tsdbInfo("vgId:%d delete over, %s", REPO_ID(pRepo), (eno == TSDB_CODE_SUCCESS) ? "succeed" : "failed"); tsdbInfo(":SDEL vgId:%d end delete transaction, %s", REPO_ID(pRepo), (eno == TSDB_CODE_SUCCESS) ? "succeed" : "failed");
tsem_post(&(pRepo->readyToCommit)); tsem_post(&(pRepo->readyToCommit));
} }
...@@ -189,8 +193,6 @@ static int tsdbDeleteTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo, SArray ...@@ -189,8 +193,6 @@ static int tsdbDeleteTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo, SArray
SDeleteH deleteH = {0}; SDeleteH deleteH = {0};
SDFileSet * pSet = NULL; SDFileSet * pSet = NULL;
tsdbDebug("vgId:%d start to delete TS data for %d", REPO_ID(pRepo), pCtlInfo->tids[0]);
if (tsdbInitDeleteH(&deleteH, pRepo) < 0) { if (tsdbInitDeleteH(&deleteH, pRepo) < 0) {
return -1; return -1;
} }
...@@ -207,13 +209,13 @@ static int tsdbDeleteTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo, SArray ...@@ -207,13 +209,13 @@ static int tsdbDeleteTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo, SArray
while ((pSet = tsdbFSIterNext(&(deleteH.fsIter)))) { while ((pSet = tsdbFSIterNext(&(deleteH.fsIter)))) {
// remove expired files // remove expired files
if (pSet->fid < deleteH.rtn.minFid) { if (pSet->fid < deleteH.rtn.minFid) {
tsdbInfo("vgId:%d FSET %d on level %d disk id %d expires, remove it", REPO_ID(pRepo), pSet->fid, tsdbInfo(":SDEL vgId:%d FSET %d on level %d disk id %d expires, remove it", REPO_ID(pRepo), pSet->fid,
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet)); TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet));
continue; continue;
} }
if ((pSet->fid < sFid) || (pSet->fid > eFid)) { if ((pSet->fid < sFid) || (pSet->fid > eFid)) {
tsdbDebug("vgId:%d no need to delete FSET %d, sFid %d, eFid %d", REPO_ID(pRepo), pSet->fid, sFid, eFid); tsdbDebug(":SDEL vgId:%d no need to delete FSET %d, sFid %d, eFid %d", REPO_ID(pRepo), pSet->fid, sFid, eFid);
if (tsdbApplyRtnOnFSet(pRepo, pSet, &(deleteH.rtn)) < 0) { if (tsdbApplyRtnOnFSet(pRepo, pSet, &(deleteH.rtn)) < 0) {
return -1; return -1;
} }
...@@ -231,7 +233,7 @@ static int tsdbDeleteTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo, SArray ...@@ -231,7 +233,7 @@ static int tsdbDeleteTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo, SArray
if (pCtlInfo->command & CMD_DELETE_DATA) { if (pCtlInfo->command & CMD_DELETE_DATA) {
if (tsdbFSetDelete(&deleteH, pSet) < 0) { if (tsdbFSetDelete(&deleteH, pSet) < 0) {
tsdbDestroyDeleteH(&deleteH); tsdbDestroyDeleteH(&deleteH);
tsdbError("vgId:%d failed to delete data in FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno)); tsdbError(":SDEL vgId:%d failed to delete data in FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
return -1; return -1;
} }
} else { } else {
...@@ -241,7 +243,27 @@ static int tsdbDeleteTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo, SArray ...@@ -241,7 +243,27 @@ static int tsdbDeleteTSData(STsdbRepo *pRepo, SControlDataInfo* pCtlInfo, SArray
} }
tsdbDestroyDeleteH(&deleteH); tsdbDestroyDeleteH(&deleteH);
tsdbDebug("vgId:%d delete TS data over", REPO_ID(pRepo)); tsdbDebug(":SDEL vgId:%d delete TS data over", REPO_ID(pRepo));
return 0;
}
static int tsdbSDFileCreate(STsdbRepo* pRepo, SDFileSet *pWSet, SDFileSet *pSet, SDiskID did, TSDB_FILE_T ftype) {
SDFile *pSDFile = TSDB_DFILE_IN_SET(pWSet, ftype);
tsdbInitDFile(pSDFile, did, REPO_ID(pRepo), TSDB_FSET_FID(pSet), FS_VERSION(REPO_FS(pRepo)), ftype);
struct stat st;
if (stat(pSDFile->f.aname, &st) == 0) {
tsdbError(":SDEL vgId:%d file exist no need create fid=%d name=%s", REPO_ID(pRepo), pSet->fid, pSDFile->f.aname);
return 0;
}
if (tsdbCreateDFile(pSDFile, true, ftype) < 0) {
tsdbError(":SDEL vgId:%d failed to delete table in FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
tsdbCloseDFile(pSDFile);
tsdbRemoveDFile(pSDFile);
return -1;
}
tsdbCloseDFile(pSDFile);
return 0; return 0;
} }
...@@ -250,11 +272,11 @@ static int tsdbFSetDelete(SDeleteH *pdh, SDFileSet *pSet) { ...@@ -250,11 +272,11 @@ static int tsdbFSetDelete(SDeleteH *pdh, SDFileSet *pSet) {
SDiskID did = {0}; SDiskID did = {0};
SDFileSet *pWSet = TSDB_DELETE_WSET(pdh); SDFileSet *pWSet = TSDB_DELETE_WSET(pdh);
tsdbDebug("vgId:%d start to delete data in FSET %d on level %d id %d", REPO_ID(pRepo), pSet->fid, tsdbDebug(":SDEL vgId:%d start to delete data in FSET %d on level %d id %d", REPO_ID(pRepo), pSet->fid,
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet)); TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet));
if (tsdbFSetInit(pdh, pSet) < 0) { if (tsdbFSetInit(pdh, pSet) < 0) {
tsdbError("vgId:%d fset init failed. FSET %d on level %d id %d", REPO_ID(pRepo), pSet->fid, tsdbError(":SDEL vgId:%d fset init failed. FSET %d on level %d id %d", REPO_ID(pRepo), pSet->fid,
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet)); TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet));
return -1; return -1;
} }
...@@ -263,7 +285,7 @@ static int tsdbFSetDelete(SDeleteH *pdh, SDFileSet *pSet) { ...@@ -263,7 +285,7 @@ static int tsdbFSetDelete(SDeleteH *pdh, SDFileSet *pSet) {
tfsAllocDisk(tsdbGetFidLevel(pSet->fid, &(pdh->rtn)), &(did.level), &(did.id)); tfsAllocDisk(tsdbGetFidLevel(pSet->fid, &(pdh->rtn)), &(did.level), &(did.id));
if (did.level == TFS_UNDECIDED_LEVEL) { if (did.level == TFS_UNDECIDED_LEVEL) {
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK; terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
tsdbError("vgId:%d failed to delete table in FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno)); tsdbError(":SDEL vgId:%d failed to delete table in FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
tsdbDeleteFSetEnd(pdh); tsdbDeleteFSetEnd(pdh);
return -1; return -1;
} }
...@@ -271,20 +293,37 @@ static int tsdbFSetDelete(SDeleteH *pdh, SDFileSet *pSet) { ...@@ -271,20 +293,37 @@ static int tsdbFSetDelete(SDeleteH *pdh, SDFileSet *pSet) {
// Only .head is created, use original .data/.last/.smad/.smal // Only .head is created, use original .data/.last/.smad/.smal
tsdbInitDFileSetEx(pWSet, pSet); tsdbInitDFileSetEx(pWSet, pSet);
pWSet->state = 0; pWSet->state = 0;
// old pset
if(pWSet->ver == TSDB_FSET_VER_0) {
tsdbDebug(":SDEL vgId:%d pWSet is ver0. fid=%d", REPO_ID(pRepo), pSet->fid);
if(tsdbSDFileCreate(pRepo, pWSet, pSet, did, TSDB_FILE_SMAD) < 0 ) {
tsdbError(":SDEL vgId:%d failed to create sma files. fid=%d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
return -1;
}
if(tsdbSDFileCreate(pRepo, pWSet, pSet, did, TSDB_FILE_SMAL) < 0 ) {
tsdbError(":SDEL vgId:%d failed to create sma files. fid=%d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
return -1;
}
pWSet->ver = TSDB_FS_VER_1;
}
// create new .head file FS_TXN_VERSION is nstatus.meta.version, so version is +=1
SDFile *pHeadFile = TSDB_DFILE_IN_SET(pWSet, TSDB_FILE_HEAD); SDFile *pHeadFile = TSDB_DFILE_IN_SET(pWSet, TSDB_FILE_HEAD);
tsdbInitDFile(pHeadFile, did, REPO_ID(pRepo), TSDB_FSET_FID(pSet), FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_HEAD); tsdbInitDFile(pHeadFile, did, REPO_ID(pRepo), TSDB_FSET_FID(pSet), FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_HEAD);
if (tsdbCreateDFile(pHeadFile, true, TSDB_FILE_HEAD) < 0) { if (tsdbCreateDFile(pHeadFile, true, TSDB_FILE_HEAD) < 0) {
tsdbError("vgId:%d failed to delete table in FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno)); tsdbError(":SDEL vgId:%d failed to delete table in FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
tsdbCloseDFile(pHeadFile); tsdbCloseDFile(pHeadFile);
tsdbRemoveDFile(pHeadFile); tsdbRemoveDFile(pHeadFile);
return -1; return -1;
} }
tsdbCloseDFile(pHeadFile); tsdbCloseDFile(pHeadFile);
if (tsdbOpenDFileSet(pWSet, O_RDWR) < 0) { if (tsdbOpenDFileSet(pWSet, O_RDWR) < 0) {
tsdbError("vgId:%d failed to open file set %d since %s", REPO_ID(pRepo), TSDB_FSET_FID(pWSet), tstrerror(terrno)); tsdbError(":SDEL vgId:%d failed to open file set %d since %s", REPO_ID(pRepo), TSDB_FSET_FID(pWSet), tstrerror(terrno));
return -1; return -1;
} }
...@@ -297,7 +336,7 @@ static int tsdbFSetDelete(SDeleteH *pdh, SDFileSet *pSet) { ...@@ -297,7 +336,7 @@ static int tsdbFSetDelete(SDeleteH *pdh, SDFileSet *pSet) {
tsdbCloseDFileSet(TSDB_DELETE_WSET(pdh)); tsdbCloseDFileSet(TSDB_DELETE_WSET(pdh));
tsdbUpdateDFileSet(REPO_FS(pRepo), TSDB_DELETE_WSET(pdh)); tsdbUpdateDFileSet(REPO_FS(pRepo), TSDB_DELETE_WSET(pdh));
tsdbDebug("vgId:%d FSET %d delete data over", REPO_ID(pRepo), pSet->fid); tsdbDebug(":SDEL vgId:%d FSET %d delete data over", REPO_ID(pRepo), pSet->fid);
tsdbDeleteFSetEnd(pdh); tsdbDeleteFSetEnd(pdh);
return 0; return 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册