diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index ba028232e8d2731fde01c49979c4c4820cf14d7e..9a2867d78cfe3313094a5a3d96a18ec3aef1a813 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -257,8 +257,6 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFS); int32_t tsdbFSRef(STsdb *pTsdb, STsdbFS *pFS); void tsdbFSUnref(STsdb *pTsdb, STsdbFS *pFS); -int32_t tsdbFSRollback(STsdbFS *pFS); - int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet); int32_t tsdbFSUpsertDelFile(STsdbFS *pFS, SDelFile *pDelFile); // tsdbReaderWriter.c ============================================================================================== diff --git a/source/dnode/vnode/src/tsdb/tsdbFS.c b/source/dnode/vnode/src/tsdb/tsdbFS.c index 57609cd02521c13a0b5a7f54b008a3455bd53831..514a150aa1cf7ce785fbf300f7c9bcd2715fb425 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS.c @@ -16,7 +16,7 @@ #include "tsdb.h" // ================================================================================================= -static int32_t tsdbEncodeFS(uint8_t *p, STsdbFS *pFS) { +static int32_t tsdbFSToBinary(uint8_t *p, STsdbFS *pFS) { int32_t n = 0; int8_t hasDel = pFS->pDelFile ? 1 : 0; uint32_t nSet = taosArrayGetSize(pFS->aDFileSet); @@ -39,50 +39,96 @@ static int32_t tsdbEncodeFS(uint8_t *p, STsdbFS *pFS) { return n; } -static int32_t tsdbGnrtCurrent(STsdb *pTsdb, STsdbFS *pFS, char *fname) { - int32_t code = 0; - int64_t n; - int64_t size; - uint8_t *pData = NULL; - TdFilePtr pFD = NULL; +static int32_t tsdbBinaryToFS(uint8_t *pData, int64_t nData, STsdbFS *pFS) { + int32_t code = 0; + int32_t n = 0; - // to binary - size = tsdbEncodeFS(NULL, pFS) + sizeof(TSCKSUM); - pData = taosMemoryMalloc(size); + // version + n += tGetI8(pData + n, NULL); + + // SDelFile + int8_t hasDel = 0; + n += tGetI8(pData + n, &hasDel); + if (hasDel) { + pFS->pDelFile = (SDelFile *)taosMemoryCalloc(1, sizeof(SDelFile)); + if (pFS->pDelFile == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + n += tGetDelFile(pData + n, pFS->pDelFile); + pFS->pDelFile->nRef = 1; + } else { + pFS->pDelFile = NULL; + } + + // aDFileSet + taosArrayClear(pFS->aDFileSet); + uint32_t nSet = 0; + n += tGetU32v(pData + n, &nSet); + for (uint32_t iSet = 0; iSet < nSet; iSet++) { + SDFileSet fSet = {0}; + + int32_t nt = tGetDFileSet(pData + n, &fSet); + if (nt < 0) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + n += nt; + if (taosArrayPush(pFS->aDFileSet, &fSet) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + } + + ASSERT(n + sizeof(TSCKSUM) == nData); + +_exit: + return code; +} + +static int32_t tsdbSaveFSToFile(STsdbFS *pFS, const char *fname) { + int32_t code = 0; + int32_t lino = 0; + + // encode to binary + int32_t size = tsdbFSToBinary(NULL, pFS) + sizeof(TSCKSUM); + uint8_t *pData = taosMemoryMalloc(size); if (pData == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } - n = tsdbEncodeFS(pData, pFS); - ASSERT(n + sizeof(TSCKSUM) == size); + tsdbFSToBinary(pData, pFS); taosCalcChecksumAppend(0, pData, size); - // create and write - pFD = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); + // save to file + TdFilePtr pFD = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); if (pFD == NULL) { code = TAOS_SYSTEM_ERROR(errno); - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } - n = taosWriteFile(pFD, pData, size); + int64_t n = taosWriteFile(pFD, pData, size); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); - goto _err; + taosCloseFile(&pFD); + TSDB_CHECK_CODE(code, lino, _exit); } if (taosFsyncFile(pFD) < 0) { code = TAOS_SYSTEM_ERROR(errno); - goto _err; + taosCloseFile(&pFD); + TSDB_CHECK_CODE(code, lino, _exit); } taosCloseFile(&pFD); +_exit: if (pData) taosMemoryFree(pData); - return code; - -_err: - tsdbError("vgId:%d, tsdb gnrt current failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); - if (pData) taosMemoryFree(pData); + if (code) { + tsdbError("%s failed at line %d since %s, fname:%s", __func__, lino, tstrerror(code), fname); + } return code; } @@ -120,76 +166,81 @@ void tsdbFSDestroy(STsdbFS *pFS) { static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) { int32_t code = 0; - int64_t size; - char fname[TSDB_FILENAME_LEN]; + int32_t lino = 0; + int64_t size = 0; + char fname[TSDB_FILENAME_LEN] = {0}; // SDelFile if (pTsdb->fs.pDelFile) { tsdbDelFileName(pTsdb, pTsdb->fs.pDelFile, fname); if (taosStatFile(fname, &size, NULL)) { code = TAOS_SYSTEM_ERROR(errno); - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } if (size != tsdbLogicToFileSize(pTsdb->fs.pDelFile->size, pTsdb->pVnode->config.tsdbPageSize)) { code = TSDB_CODE_FILE_CORRUPTED; - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } } // SArray + int32_t fid = 0; for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs.aDFileSet); iSet++) { SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, iSet); + fid = pSet->fid; // head ========= tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname); if (taosStatFile(fname, &size, NULL)) { code = TAOS_SYSTEM_ERROR(errno); - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } if (size != tsdbLogicToFileSize(pSet->pHeadF->size, pTsdb->pVnode->config.tsdbPageSize)) { code = TSDB_CODE_FILE_CORRUPTED; - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } // data ========= tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname); if (taosStatFile(fname, &size, NULL)) { code = TAOS_SYSTEM_ERROR(errno); - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } if (size < tsdbLogicToFileSize(pSet->pDataF->size, pTsdb->pVnode->config.tsdbPageSize)) { code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } else if (size > tsdbLogicToFileSize(pSet->pDataF->size, pTsdb->pVnode->config.tsdbPageSize)) { - code = tsdbDFileRollback(pTsdb, pSet, TSDB_DATA_FILE); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } + // else if (size > tsdbLogicToFileSize(pSet->pDataF->size, pTsdb->pVnode->config.tsdbPageSize)) { + // code = tsdbDFileRollback(pTsdb, pSet, TSDB_DATA_FILE); + // TSDB_CHECK_CODE(code, lino, _exit); + // } // sma ============= tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname); if (taosStatFile(fname, &size, NULL)) { code = TAOS_SYSTEM_ERROR(errno); - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } if (size < tsdbLogicToFileSize(pSet->pSmaF->size, pTsdb->pVnode->config.tsdbPageSize)) { code = TSDB_CODE_FILE_CORRUPTED; - goto _err; - } else if (size > tsdbLogicToFileSize(pSet->pSmaF->size, pTsdb->pVnode->config.tsdbPageSize)) { - code = tsdbDFileRollback(pTsdb, pSet, TSDB_SMA_FILE); - if (code) goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } + // else if (size > tsdbLogicToFileSize(pSet->pSmaF->size, pTsdb->pVnode->config.tsdbPageSize)) { + // code = tsdbDFileRollback(pTsdb, pSet, TSDB_SMA_FILE); + // TSDB_CHECK_CODE(code, lino, _exit); + // } // stt =========== for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) { tsdbSttFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSttF[iStt], fname); if (taosStatFile(fname, &size, NULL)) { code = TAOS_SYSTEM_ERROR(errno); - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } if (size != tsdbLogicToFileSize(pSet->aSttF[iStt]->size, pTsdb->pVnode->config.tsdbPageSize)) { code = TSDB_CODE_FILE_CORRUPTED; - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } } } @@ -198,10 +249,11 @@ static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) { // remove those invalid files (todo) } - return code; - -_err: - tsdbError("vgId:%d, tsdb scan and try fix fs failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s, fid:%d", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code), + fid); + } return code; } @@ -215,57 +267,6 @@ int32_t tDFileSetCmprFn(const void *p1, const void *p2) { return 0; } -static int32_t tsdbRecoverFS(STsdb *pTsdb, uint8_t *pData, int64_t nData) { - int32_t code = 0; - int8_t hasDel; - uint32_t nSet; - int32_t n = 0; - - // version - n += tGetI8(pData + n, NULL); - - // SDelFile - n += tGetI8(pData + n, &hasDel); - if (hasDel) { - pTsdb->fs.pDelFile = (SDelFile *)taosMemoryMalloc(sizeof(SDelFile)); - if (pTsdb->fs.pDelFile == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - - pTsdb->fs.pDelFile->nRef = 1; - n += tGetDelFile(pData + n, pTsdb->fs.pDelFile); - } else { - pTsdb->fs.pDelFile = NULL; - } - - // SArray - taosArrayClear(pTsdb->fs.aDFileSet); - n += tGetU32v(pData + n, &nSet); - for (uint32_t iSet = 0; iSet < nSet; iSet++) { - SDFileSet fSet = {0}; - - int32_t nt = tGetDFileSet(pData + n, &fSet); - if (nt < 0) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - - n += nt; - - if (taosArrayPush(pTsdb->fs.aDFileSet, &fSet) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - } - - ASSERT(n + sizeof(TSCKSUM) == nData); - return code; - -_err: - return code; -} - static void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t) { SVnode *pVnode = pTsdb->pVnode; if (pVnode->pTfs) { @@ -287,72 +288,502 @@ static void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t) { } } -// EXPOSED APIS ==================================================================================== -int32_t tsdbFSOpen(STsdb *pTsdb, int8_t rollback) { +static int32_t tsdbLoadFSFromFile(const char *fname, STsdbFS *pFS) { + int32_t code = 0; + int32_t lino = 0; + uint8_t *pData = NULL; + + // load binary + TdFilePtr pFD = taosOpenFile(fname, TD_FILE_READ); + if (pFD == NULL) { + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); + } + + int64_t size; + if (taosFStatFile(pFD, &size, NULL) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + taosCloseFile(&pFD); + TSDB_CHECK_CODE(code, lino, _exit); + } + + pData = taosMemoryMalloc(size); + if (pData == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + taosCloseFile(&pFD); + TSDB_CHECK_CODE(code, lino, _exit); + } + + if (taosReadFile(pFD, pData, size) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + taosCloseFile(&pFD); + TSDB_CHECK_CODE(code, lino, _exit); + } + + if (!taosCheckChecksumWhole(pData, size)) { + code = TSDB_CODE_FILE_CORRUPTED; + taosCloseFile(&pFD); + TSDB_CHECK_CODE(code, lino, _exit); + } + + taosCloseFile(&pFD); + + // decode binary + code = tsdbBinaryToFS(pData, size, pFS); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (pData) taosMemoryFree(pData); + if (code) { + tsdbError("%s failed at line %d since %s, fname:%s", __func__, lino, tstrerror(code), fname); + } + return code; +} + +static int32_t tsdbRemoveFileSet(STsdb *pTsdb, SDFileSet *pSet) { + int32_t code = 0; + char fname[TSDB_FILENAME_LEN] = {0}; + + int32_t nRef = atomic_sub_fetch_32(&pSet->pHeadF->nRef, 1); + if (nRef == 0) { + tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname); + (void)taosRemoveFile(fname); + taosMemoryFree(pSet->pHeadF); + } + + nRef = atomic_sub_fetch_32(&pSet->pDataF->nRef, 1); + if (nRef == 0) { + tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname); + taosRemoveFile(fname); + taosMemoryFree(pSet->pDataF); + } + + nRef = atomic_sub_fetch_32(&pSet->pSmaF->nRef, 1); + if (nRef == 0) { + tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname); + taosRemoveFile(fname); + taosMemoryFree(pSet->pSmaF); + } + + for (int8_t iStt = 0; iStt < pSet->nSttF; iStt++) { + nRef = atomic_sub_fetch_32(&pSet->aSttF[iStt]->nRef, 1); + if (nRef == 0) { + tsdbSttFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSttF[iStt], fname); + taosRemoveFile(fname); + taosMemoryFree(pSet->aSttF[iStt]); + } + } + +_exit: + return code; +} + +static int32_t tsdbNewFileSet(STsdb *pTsdb, SDFileSet *pSetTo, SDFileSet *pSetFrom) { int32_t code = 0; int32_t lino = 0; - SVnode *pVnode = pTsdb->pVnode; - // open handle - code = tsdbFSCreate(&pTsdb->fs); - TSDB_CHECK_CODE(code, lino, _exit); + *pSetTo = (SDFileSet){.diskId = pSetFrom->diskId, .fid = pSetFrom->fid, .nSttF = 0}; - // load fs or keep empty - char current[TSDB_FILENAME_LEN] = {0}; - char current_t[TSDB_FILENAME_LEN] = {0}; - tsdbGetCurrentFName(pTsdb, current, current_t); + // head + pSetTo->pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile)); + if (pSetTo->pHeadF == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + *pSetTo->pHeadF = *pSetFrom->pHeadF; + pSetTo->pHeadF->nRef = 1; - if (taosCheckExistFile(current)) { - // read - TdFilePtr pFD = taosOpenFile(current, TD_FILE_READ); - if (pFD == NULL) { - code = TAOS_SYSTEM_ERROR(errno); + // data + pSetTo->pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile)); + if (pSetTo->pDataF == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + *pSetTo->pDataF = *pSetFrom->pDataF; + pSetTo->pDataF->nRef = 1; + + // sma + pSetTo->pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile)); + if (pSetTo->pSmaF == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + *pSetTo->pSmaF = *pSetFrom->pSmaF; + pSetTo->pSmaF->nRef = 1; + + // stt + for (int32_t iStt = 0; iStt < pSetFrom->nSttF; iStt++) { + pSetTo->aSttF[iStt] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); + if (pSetTo->aSttF[iStt] == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit); } - int64_t size; - if (taosFStatFile(pFD, &size, NULL) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - taosCloseFile(&pFD); + pSetTo->nSttF++; + *pSetTo->aSttF[iStt] = *pSetFrom->aSttF[iStt]; + pSetTo->aSttF[iStt]->nRef = 1; + } + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } + return code; +} + +static int32_t tsdbMergeFileSet(STsdb *pTsdb, SDFileSet *pSetOld, SDFileSet *pSetNew) { + int32_t code = 0; + int32_t lino = 0; + int32_t nRef = 0; + bool sameDisk = ((pSetOld->diskId.level == pSetNew->diskId.level) && (pSetOld->diskId.id == pSetNew->diskId.id)); + char fname[TSDB_FILENAME_LEN] = {0}; + + // head + SHeadFile *pHeadF = pSetOld->pHeadF; + if ((!sameDisk) || (pHeadF->commitID != pSetNew->pHeadF->commitID)) { + pSetOld->pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile)); + if (pSetOld->pHeadF == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit); } + *pSetOld->pHeadF = *pSetNew->pHeadF; + pSetOld->pHeadF->nRef = 1; - uint8_t *pData = taosMemoryMalloc(size); - if (pData == NULL) { + nRef = atomic_sub_fetch_32(&pHeadF->nRef, 1); + if (nRef == 0) { + tsdbHeadFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pHeadF, fname); + (void)taosRemoveFile(fname); + taosMemoryFree(pHeadF); + } + } else { + nRef = pHeadF->nRef; + *pHeadF = *pSetNew->pHeadF; + pHeadF->nRef = nRef; + } + + // data + SDataFile *pDataF = pSetOld->pDataF; + if ((!sameDisk) || (pDataF->commitID != pSetNew->pDataF->commitID)) { + pSetOld->pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile)); + if (pSetOld->pDataF == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - taosCloseFile(&pFD); TSDB_CHECK_CODE(code, lino, _exit); } + *pSetOld->pDataF = *pSetNew->pDataF; + pSetOld->pDataF->nRef = 1; - int64_t n = taosReadFile(pFD, pData, size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - taosMemoryFree(pData); - taosCloseFile(&pFD); - TSDB_CHECK_CODE(code, lino, _exit); + nRef = atomic_sub_fetch_32(&pDataF->nRef, 1); + if (nRef == 0) { + tsdbDataFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pDataF, fname); + (void)taosRemoveFile(fname); + taosMemoryFree(pDataF); } + } else { + nRef = pDataF->nRef; + *pDataF = *pSetNew->pDataF; + pDataF->nRef = nRef; + } - if (!taosCheckChecksumWhole(pData, size)) { - code = TSDB_CODE_FILE_CORRUPTED; - taosMemoryFree(pData); - taosCloseFile(&pFD); + // sma + SSmaFile *pSmaF = pSetOld->pSmaF; + if ((!sameDisk) || (pSmaF->commitID != pSetNew->pSmaF->commitID)) { + pSetOld->pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile)); + if (pSetOld->pSmaF == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit); } + *pSetOld->pSmaF = *pSetNew->pSmaF; + pSetOld->pSmaF->nRef = 1; - taosCloseFile(&pFD); + nRef = atomic_sub_fetch_32(&pSmaF->nRef, 1); + if (nRef == 0) { + tsdbSmaFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSmaF, fname); + (void)taosRemoveFile(fname); + taosMemoryFree(pSmaF); + } + } else { + nRef = pSmaF->nRef; + *pSmaF = *pSetNew->pSmaF; + pSmaF->nRef = nRef; + } + + // stt + if (sameDisk) { + if (pSetNew->nSttF > pSetOld->nSttF) { + ASSERT(pSetNew->nSttF == pSetOld->nSttF + 1); + pSetOld->aSttF[pSetOld->nSttF] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); + if (pSetOld->aSttF[pSetOld->nSttF] == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + *pSetOld->aSttF[pSetOld->nSttF] = *pSetNew->aSttF[pSetOld->nSttF]; + pSetOld->aSttF[pSetOld->nSttF]->nRef = 1; + pSetOld->nSttF++; + } else if (pSetNew->nSttF < pSetOld->nSttF) { + ASSERT(pSetNew->nSttF == 1); + for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) { + SSttFile *pSttFile = pSetOld->aSttF[iStt]; + nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1); + if (nRef == 0) { + tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname); + taosRemoveFile(fname); + taosMemoryFree(pSttFile); + } + pSetOld->aSttF[iStt] = NULL; + } + + pSetOld->nSttF = 1; + pSetOld->aSttF[0] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); + if (pSetOld->aSttF[0] == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + *pSetOld->aSttF[0] = *pSetNew->aSttF[0]; + pSetOld->aSttF[0]->nRef = 1; + } else { + for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) { + if (pSetOld->aSttF[iStt]->commitID != pSetNew->aSttF[iStt]->commitID) { + SSttFile *pSttFile = pSetOld->aSttF[iStt]; + nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1); + if (nRef == 0) { + tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname); + taosRemoveFile(fname); + taosMemoryFree(pSttFile); + } + + pSetOld->aSttF[iStt] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); + if (pSetOld->aSttF[iStt] == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + *pSetOld->aSttF[iStt] = *pSetNew->aSttF[iStt]; + pSetOld->aSttF[iStt]->nRef = 1; + } else { + ASSERT(pSetOld->aSttF[iStt]->size == pSetOld->aSttF[iStt]->size); + ASSERT(pSetOld->aSttF[iStt]->offset == pSetOld->aSttF[iStt]->offset); + } + } + } + } else { + for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) { + SSttFile *pSttFile = pSetOld->aSttF[iStt]; + nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1); + if (nRef == 0) { + tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname); + (void)taosRemoveFile(fname); + taosMemoryFree(pSttFile); + } + } + + pSetOld->nSttF = 0; + for (int32_t iStt = 0; iStt < pSetNew->nSttF; iStt++) { + pSetOld->aSttF[iStt] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); + if (pSetOld->aSttF[iStt] == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + *pSetOld->aSttF[iStt] = *pSetNew->aSttF[iStt]; + pSetOld->aSttF[iStt]->nRef = 1; + + pSetOld->nSttF++; + } + } + + if (!sameDisk) { + pSetOld->diskId = pSetNew->diskId; + } + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } + return code; +} + +static int32_t tsdbFSApplyChange(STsdb *pTsdb, STsdbFS *pFS) { + int32_t code = 0; + int32_t lino = 0; + + int32_t nRef = 0; + char fname[TSDB_FILENAME_LEN] = {0}; + + // SDelFile + if (pFS->pDelFile) { + SDelFile *pDelFile = pTsdb->fs.pDelFile; - // recover fs - code = tsdbRecoverFS(pTsdb, pData, size); - if (code) { - taosMemoryFree(pData); + if (pDelFile == NULL || (pDelFile->commitID != pFS->pDelFile->commitID)) { + pTsdb->fs.pDelFile = (SDelFile *)taosMemoryMalloc(sizeof(SDelFile)); + if (pTsdb->fs.pDelFile == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + *pTsdb->fs.pDelFile = *pFS->pDelFile; + pTsdb->fs.pDelFile->nRef = 1; + + if (pDelFile) { + nRef = atomic_sub_fetch_32(&pDelFile->nRef, 1); + if (nRef == 0) { + tsdbDelFileName(pTsdb, pDelFile, fname); + (void)taosRemoveFile(fname); + taosMemoryFree(pDelFile); + } + } + } + } else { + ASSERT(pTsdb->fs.pDelFile == NULL); + } + + // aDFileSet + int32_t iOld = 0; + int32_t iNew = 0; + while (true) { + int32_t nOld = taosArrayGetSize(pTsdb->fs.aDFileSet); + int32_t nNew = taosArrayGetSize(pFS->aDFileSet); + SDFileSet fSet = {0}; + int8_t sameDisk = 0; + + if (iOld >= nOld && iNew >= nNew) break; + + SDFileSet *pSetOld = (iOld < nOld) ? taosArrayGet(pTsdb->fs.aDFileSet, iOld) : NULL; + SDFileSet *pSetNew = (iNew < nNew) ? taosArrayGet(pFS->aDFileSet, iNew) : NULL; + + if (pSetOld && pSetNew) { + if (pSetOld->fid == pSetNew->fid) { + code = tsdbMergeFileSet(pTsdb, pSetOld, pSetNew); + TSDB_CHECK_CODE(code, lino, _exit); + + iOld++; + iNew++; + } else if (pSetOld->fid < pSetNew->fid) { + code = tsdbRemoveFileSet(pTsdb, pSetOld); + TSDB_CHECK_CODE(code, lino, _exit); + taosArrayRemove(pTsdb->fs.aDFileSet, iOld); + } else { + code = tsdbNewFileSet(pTsdb, &fSet, pSetNew); + TSDB_CHECK_CODE(code, lino, _exit) + + if (taosArrayInsert(pTsdb->fs.aDFileSet, iOld, &fSet) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + iOld++; + iNew++; + } + } else if (pSetOld) { + code = tsdbRemoveFileSet(pTsdb, pSetOld); TSDB_CHECK_CODE(code, lino, _exit); + taosArrayRemove(pTsdb->fs.aDFileSet, iOld); + } else { + code = tsdbNewFileSet(pTsdb, &fSet, pSetNew); + TSDB_CHECK_CODE(code, lino, _exit) + + if (taosArrayInsert(pTsdb->fs.aDFileSet, iOld, &fSet) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } + + iOld++; + iNew++; } + } + +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } + return code; +} + +static int32_t tsdbFSCommit(STsdb *pTsdb) { + int32_t code = 0; + int32_t lino = 0; + + char current[TSDB_FILENAME_LEN] = {0}; + char current_t[TSDB_FILENAME_LEN] = {0}; + tsdbGetCurrentFName(pTsdb, current, current_t); + + // rename the file + if (taosRenameFile(current_t, current) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + TSDB_CHECK_CODE(code, lino, _exit); + } + + // Load the new FS + STsdbFS fs = {0}; + code = tsdbFSCreate(&fs); + TSDB_CHECK_CODE(code, lino, _exit); + + code = tsdbLoadFSFromFile(current_t, &fs); + TSDB_CHECK_CODE(code, lino, _exit); + + // apply file change + code = tsdbFSApplyChange(pTsdb, &fs); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + tsdbFSDestroy(&fs); + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(errno)); + } + return code; +} + +static int32_t tsdbFSRollback(STsdb *pTsdb) { + int32_t code = 0; + int32_t lino = 0; + + char current_t[TSDB_FILENAME_LEN] = {0}; + tsdbGetCurrentFName(pTsdb, NULL, current_t); + + if (taosRemoveFile(current_t) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + TSDB_CHECK_CODE(code, lino, _exit); + } - taosMemoryFree(pData); +_exit: + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(errno)); + } + return code; +} + +// EXPOSED APIS ==================================================================================== +int32_t tsdbFSOpen(STsdb *pTsdb, int8_t rollback) { + int32_t code = 0; + int32_t lino = 0; + SVnode *pVnode = pTsdb->pVnode; + + // open handle + code = tsdbFSCreate(&pTsdb->fs); + TSDB_CHECK_CODE(code, lino, _exit); + + // open impl + char current[TSDB_FILENAME_LEN] = {0}; + char current_t[TSDB_FILENAME_LEN] = {0}; + tsdbGetCurrentFName(pTsdb, current, current_t); + + if (taosCheckExistFile(current)) { + code = tsdbLoadFSFromFile(current, &pTsdb->fs); + TSDB_CHECK_CODE(code, lino, _exit); + + if (taosCheckExistFile(current_t)) { + if (rollback) { + code = tsdbFSRollback(pTsdb); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + code = tsdbFSCommit(pTsdb); + TSDB_CHECK_CODE(code, lino, _exit); + } + } } else { // empty one - code = tsdbGnrtCurrent(pTsdb, &pTsdb->fs, current); + code = tsdbSaveFSToFile(&pTsdb->fs, current); TSDB_CHECK_CODE(code, lino, _exit); + + ASSERT(!rollback); } // scan and fix FS @@ -363,7 +794,6 @@ _exit: if (code) { tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); } - return code; } @@ -404,19 +834,24 @@ int32_t tsdbFSClose(STsdb *pTsdb) { int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) { int32_t code = 0; + int32_t lino = 0; pFS->pDelFile = NULL; - pFS->aDFileSet = taosArrayInit(taosArrayGetSize(pTsdb->fs.aDFileSet), sizeof(SDFileSet)); - if (pFS->aDFileSet == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; + if (pFS->aDFileSet) { + taosArrayClear(pFS->aDFileSet); + } else { + pFS->aDFileSet = taosArrayInit(taosArrayGetSize(pTsdb->fs.aDFileSet), sizeof(SDFileSet)); + if (pFS->aDFileSet == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); + } } if (pTsdb->fs.pDelFile) { pFS->pDelFile = (SDelFile *)taosMemoryMalloc(sizeof(SDelFile)); if (pFS->pDelFile == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } *pFS->pDelFile = *pTsdb->fs.pDelFile; @@ -430,7 +865,7 @@ int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) { fSet.pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile)); if (fSet.pHeadF == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } *fSet.pHeadF = *pSet->pHeadF; @@ -438,7 +873,7 @@ int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) { fSet.pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile)); if (fSet.pDataF == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } *fSet.pDataF = *pSet->pDataF; @@ -446,7 +881,7 @@ int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) { fSet.pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile)); if (fSet.pSmaF == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } *fSet.pSmaF = *pSet->pSmaF; @@ -455,26 +890,21 @@ int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) { fSet.aSttF[fSet.nSttF] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); if (fSet.aSttF[fSet.nSttF] == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } *fSet.aSttF[fSet.nSttF] = *pSet->aSttF[fSet.nSttF]; } if (taosArrayPush(pFS->aDFileSet, &fSet) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; + TSDB_CHECK_CODE(code, lino, _exit); } } _exit: - return code; -} - -int32_t tsdbFSRollback(STsdbFS *pFS) { - int32_t code = 0; - - ASSERT(0); - + if (code) { + tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); + } return code; } @@ -591,7 +1021,7 @@ int32_t tsdbFSCommit1(STsdb *pTsdb, STsdbFS *pFSNew) { pTsdb->path, TD_DIRSEP); // gnrt CURRENT.t - code = tsdbGnrtCurrent(pTsdb, pFSNew, tfname); + code = tsdbSaveFSToFile(pFSNew, tfname); if (code) goto _err; // rename diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index d332d913ca89ff0a80cefd9fececdb2c6279114e..be862308f116da45c53f53a25f89192dd25432c2 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -343,6 +343,7 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) { TdFilePtr pFile = (TdFilePtr)taosMemoryMalloc(sizeof(TdFile)); if (pFile == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; if (fd >= 0) close(fd); if (fp != NULL) fclose(fp); return NULL;