/* * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #include "tsdbUpgrade.h" // old extern void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t); extern int32_t tsdbReadDataBlockEx(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData *pBlockData); // new extern int32_t save_fs(const TFileSetArray *arr, const char *fname); extern int32_t current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype); extern int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t cmprAlg, int64_t *fileSize, TBrinBlkArray *brinBlkArray, uint8_t **bufArr); extern int32_t tsdbFileWriteBrinBlk(STsdbFD *fd, TBrinBlkArray *brinBlkArray, SFDataPtr *ptr, int64_t *fileSize); extern int32_t tsdbFileWriteHeadFooter(STsdbFD *fd, int64_t *fileSize, const SHeadFooter *footer); static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *reader, STFileSet *fset) { int32_t code = 0; int32_t lino = 0; struct { // config int32_t maxRow; int8_t cmprAlg; int32_t szPage; uint8_t *bufArr[8]; // reader SArray *aBlockIdx; SMapData mDataBlk[1]; SBlockData blockData[1]; // writer STsdbFD *fd; SBrinBlock brinBlock[1]; TBrinBlkArray brinBlkArray[1]; SHeadFooter footer[1]; } ctx[1] = {{ .maxRow = tsdb->pVnode->config.tsdbCfg.maxRows, .cmprAlg = tsdb->pVnode->config.tsdbCfg.compression, .szPage = tsdb->pVnode->config.tsdbPageSize, }}; if ((ctx->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx))) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit); } code = tsdbReadBlockIdx(reader, ctx->aBlockIdx); TSDB_CHECK_CODE(code, lino, _exit); if (taosArrayGetSize(ctx->aBlockIdx) == 0) { goto _exit; } else { STFile file = { .type = TSDB_FTYPE_HEAD, .did = pDFileSet->diskId, .fid = fset->fid, .cid = pDFileSet->pHeadF->commitID, .size = pDFileSet->pHeadF->size, }; code = tsdbTFileObjInit(tsdb, &file, &fset->farr[TSDB_FTYPE_HEAD]); TSDB_CHECK_CODE(code, lino, _exit); // open fd char fname[TSDB_FILENAME_LEN]; tsdbTFileName(tsdb, &file, fname); code = tsdbOpenFile(fname, ctx->szPage, TD_FILE_READ | TD_FILE_WRITE, &ctx->fd); TSDB_CHECK_CODE(code, lino, _exit); } for (int32_t iBlockIdx = 0; iBlockIdx < taosArrayGetSize(ctx->aBlockIdx); ++iBlockIdx) { SBlockIdx *pBlockIdx = taosArrayGet(ctx->aBlockIdx, iBlockIdx); code = tsdbReadDataBlk(reader, pBlockIdx, ctx->mDataBlk); TSDB_CHECK_CODE(code, lino, _exit); for (int32_t iDataBlk = 0; iDataBlk < ctx->mDataBlk->nItem; ++iDataBlk) { SDataBlk dataBlk[1]; tMapDataGetItemByIdx(ctx->mDataBlk, iDataBlk, dataBlk, tGetDataBlk); SBrinRecord record = { .suid = pBlockIdx->suid, .uid = pBlockIdx->uid, .firstKey = dataBlk->minKey.ts, .firstKeyVer = dataBlk->minKey.version, .lastKey = dataBlk->maxKey.ts, .lastKeyVer = dataBlk->maxKey.version, .minVer = dataBlk->minVer, .maxVer = dataBlk->maxVer, .blockOffset = dataBlk->aSubBlock->offset, .smaOffset = dataBlk->smaInfo.offset, .blockSize = dataBlk->aSubBlock->szBlock, .blockKeySize = dataBlk->aSubBlock->szKey, .smaSize = dataBlk->smaInfo.size, .numRow = dataBlk->nRow, .count = dataBlk->nRow, }; if (dataBlk->hasDup) { code = tsdbReadDataBlockEx(reader, dataBlk, ctx->blockData); TSDB_CHECK_CODE(code, lino, _exit); record.count = 1; for (int32_t i = 1; i < ctx->blockData->nRow; ++i) { if (ctx->blockData->aTSKEY[i] != ctx->blockData->aTSKEY[i - 1]) { record.count++; } } } code = tBrinBlockPut(ctx->brinBlock, &record); TSDB_CHECK_CODE(code, lino, _exit); if (BRIN_BLOCK_SIZE(ctx->brinBlock) >= ctx->maxRow) { code = tsdbFileWriteBrinBlock(ctx->fd, ctx->brinBlock, ctx->cmprAlg, &fset->farr[TSDB_FTYPE_HEAD]->f->size, ctx->brinBlkArray, ctx->bufArr); TSDB_CHECK_CODE(code, lino, _exit); } } } if (BRIN_BLOCK_SIZE(ctx->brinBlock) > 0) { code = tsdbFileWriteBrinBlock(ctx->fd, ctx->brinBlock, ctx->cmprAlg, &fset->farr[TSDB_FTYPE_HEAD]->f->size, ctx->brinBlkArray, ctx->bufArr); TSDB_CHECK_CODE(code, lino, _exit); } code = tsdbFileWriteBrinBlk(ctx->fd, ctx->brinBlkArray, ctx->footer->brinBlkPtr, &fset->farr[TSDB_FTYPE_HEAD]->f->size); TSDB_CHECK_CODE(code, lino, _exit); code = tsdbFileWriteHeadFooter(ctx->fd, &fset->farr[TSDB_FTYPE_HEAD]->f->size, ctx->footer); TSDB_CHECK_CODE(code, lino, _exit); code = tsdbFsyncFile(ctx->fd); TSDB_CHECK_CODE(code, lino, _exit); tsdbCloseFile(&ctx->fd); _exit: if (code) { TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); } TARRAY2_DESTROY(ctx->brinBlkArray, NULL); tBrinBlockDestroy(ctx->brinBlock); tBlockDataDestroy(ctx->blockData); tMapDataClear(ctx->mDataBlk); taosArrayDestroy(ctx->aBlockIdx); for (int32_t i = 0; i < ARRAY_SIZE(ctx->bufArr); ++i) { tFree(ctx->bufArr[i]); } return code; } static int32_t tsdbUpgradeData(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *reader, STFileSet *fset) { int32_t code = 0; int32_t lino = 0; // TODO _exit: if (code) { TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); } return code; } static int32_t tsdbUpgradeSma(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *reader, STFileSet *fset) { int32_t code = 0; int32_t lino = 0; // TODO _exit: if (code) { TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); } return code; } static int32_t tsdbUpgradeStt(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *reader, STFileSet *fset) { int32_t code = 0; int32_t lino = 0; // TODO _exit: if (code) { TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); } return code; } static int32_t tsdbUpgradeFileSet(STsdb *tsdb, SDFileSet *pDFileSet, TFileSetArray *fileSetArray) { int32_t code = 0; int32_t lino = 0; SDataFReader *reader; STFileSet *fset; code = tsdbTFileSetInit(pDFileSet->fid, &fset); TSDB_CHECK_CODE(code, lino, _exit); code = tsdbDataFReaderOpen(&reader, tsdb, pDFileSet); TSDB_CHECK_CODE(code, lino, _exit); // .head code = tsdbUpgradeHead(tsdb, pDFileSet, reader, fset); TSDB_CHECK_CODE(code, lino, _exit); // .data code = tsdbUpgradeData(tsdb, pDFileSet, reader, fset); TSDB_CHECK_CODE(code, lino, _exit); // .sma code = tsdbUpgradeSma(tsdb, pDFileSet, reader, fset); TSDB_CHECK_CODE(code, lino, _exit); // .stt if (pDFileSet->nSttF > 0) { code = tsdbUpgradeStt(tsdb, pDFileSet, reader, fset); TSDB_CHECK_CODE(code, lino, _exit); } tsdbDataFReaderClose(&reader); code = TARRAY2_APPEND(fileSetArray, fset); TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); } return code; } static int32_t tsdbDumpTombDataToFSet(STsdb *tsdb, SDelFReader *reader, SArray *aDelIdx, STFileSet *fset) { int32_t code = 0; int32_t lino = 0; SArray *aDelData = NULL; int64_t minKey, maxKey; STombBlock tombBlock[1] = {0}; TTombBlkArray tombBlkArray[1] = {0}; STsdbFD *fd = NULL; tsdbFidKeyRange(fset->fid, tsdb->keepCfg.days, tsdb->keepCfg.precision, &minKey, &maxKey); if ((aDelData = taosArrayInit(0, sizeof(SDelData))) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit); } for (int32_t i = 0; i < taosArrayGetSize(aDelIdx); ++i) { SDelIdx *pDelIdx = taosArrayGet(aDelIdx, i); code = tsdbReadDelData(reader, pDelIdx, aDelData); TSDB_CHECK_CODE(code, lino, _exit); for (int32_t j = 0; j < taosArrayGetSize(aDelData); ++j) { SDelData *pDelData = taosArrayGet(aDelData, j); if (pDelData->sKey > maxKey || pDelData->eKey < minKey) { continue; } STombRecord record = { .suid = pDelIdx->suid, .uid = pDelIdx->uid, .version = pDelData->version, .skey = pDelData->sKey, .ekey = pDelData->eKey, }; code = tTombBlockPut(tombBlock, &record); TSDB_CHECK_CODE(code, lino, _exit); if (TOMB_BLOCK_SIZE(tombBlock) >= tsdb->pVnode->config.tsdbCfg.maxRows) { if (fd == NULL) { STFile file = { .type = TSDB_FTYPE_TOMB, .did = {0}, // TODO .fid = fset->fid, .cid = 0, // TODO }; code = tsdbTFileObjInit(tsdb, &file, &fset->farr[TSDB_FTYPE_TOMB]); TSDB_CHECK_CODE(code, lino, _exit); code = tsdbOpenFile(fset->farr[TSDB_FTYPE_TOMB]->fname, tsdb->pVnode->config.tsdbPageSize, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC, &fd); TSDB_CHECK_CODE(code, lino, _exit); uint8_t hdr[TSDB_FHDR_SIZE] = {0}; code = tsdbWriteFile(fd, 0, hdr, TSDB_FHDR_SIZE); TSDB_CHECK_CODE(code, lino, _exit); fset->farr[TSDB_FTYPE_TOMB]->f->size += sizeof(hdr); } // TODO tTombBlockClear(tombBlock); } } } if (TOMB_BLOCK_SIZE(tombBlock) > 0) { // TODO tTombBlockClear(tombBlock); } if (TARRAY2_SIZE(tombBlkArray) > 0) { // TODO } if (fd) { // write footer // sync and close code = tsdbFsyncFile(fd); TSDB_CHECK_CODE(code, lino, _exit); tsdbCloseFile(&fd); } // clear TARRAY2_DESTROY(tombBlkArray, NULL); tTombBlockDestroy(tombBlock); taosArrayDestroy(aDelData); _exit: if (code) { TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); } return code; } static int32_t tsdbUpgradeTombFile(STsdb *tsdb, SDelFile *pDelFile, TFileSetArray *fileSetArray) { int32_t code = 0; int32_t lino = 0; SDelFReader *reader = NULL; SArray *aDelIdx = NULL; if ((aDelIdx = taosArrayInit(0, sizeof(SDelIdx))) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit); } code = tsdbDelFReaderOpen(&reader, pDelFile, tsdb); TSDB_CHECK_CODE(code, lino, _exit); code = tsdbReadDelIdx(reader, aDelIdx); TSDB_CHECK_CODE(code, lino, _exit); if (taosArrayGetSize(aDelIdx) > 0) { STFileSet *fset; TARRAY2_FOREACH(fileSetArray, fset) { code = tsdbDumpTombDataToFSet(tsdb, reader, aDelIdx, fset); TSDB_CHECK_CODE(code, lino, _exit); } } tsdbDelFReaderClose(&reader); taosArrayDestroy(aDelIdx); _exit: if (code) { TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); } return code; } static int32_t tsdbDoUpgradeFileSystem(STsdb *tsdb, TFileSetArray *fileSetArray) { int32_t code = 0; int32_t lino = 0; // upgrade each file set for (int32_t i = 0; i < taosArrayGetSize(tsdb->fs.aDFileSet); i++) { code = tsdbUpgradeFileSet(tsdb, taosArrayGet(tsdb->fs.aDFileSet, i), fileSetArray); TSDB_CHECK_CODE(code, lino, _exit); } // upgrade tomb file if (tsdb->fs.pDelFile != NULL) { code = tsdbUpgradeTombFile(tsdb, tsdb->fs.pDelFile, fileSetArray); TSDB_CHECK_CODE(code, lino, _exit); } _exit: if (code) { TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); } return code; } static int32_t tsdbUpgradeFileSystem(STsdb *tsdb, int8_t rollback) { int32_t code = 0; int32_t lino = 0; TFileSetArray fileSetArray[1] = {0}; // open old file system code = tsdbFSOpen(tsdb, rollback); TSDB_CHECK_CODE(code, lino, _exit); code = tsdbDoUpgradeFileSystem(tsdb, fileSetArray); TSDB_CHECK_CODE(code, lino, _exit); // close file system code = tsdbFSClose(tsdb); TSDB_CHECK_CODE(code, lino, _exit); // save new file system char fname[TSDB_FILENAME_LEN]; current_fname(tsdb, fname, TSDB_FCURRENT); code = save_fs(fileSetArray, fname); TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); } TARRAY2_DESTROY(fileSetArray, tsdbTFileSetClear); return code; } int32_t tsdbCheckAndUpgradeFileSystem(STsdb *tsdb, int8_t rollback) { char fname[TSDB_FILENAME_LEN]; tsdbGetCurrentFName(tsdb, fname, NULL); if (!taosCheckExistFile(fname)) return 0; int32_t code = tsdbUpgradeFileSystem(tsdb, rollback); if (code) return code; taosRemoveFile(fname); return 0; }