diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index a7831f301cd42fc0d746fa78725f4ceacce1368f..87dbaa8d53df5ae325dc5b2bbaa6a4f4ad27a0c1 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -183,6 +183,9 @@ bool tsdbTbDataIterNext(STbDataIter *pIter); // tsdbFile.c ============================================================================================== typedef enum { TSDB_HEAD_FILE = 0, TSDB_DATA_FILE, TSDB_LAST_FILE, TSDB_SMA_FILE } EDataFileT; void tsdbDataFileName(STsdb *pTsdb, SDFileSet *pDFileSet, EDataFileT ftype, char fname[]); +bool tsdbFileIsSame(SDFileSet *pDFileSet1, SDFileSet *pDFileSet2, EDataFileT ftype); +int32_t tsdbUpdateDFileHdr(TdFilePtr pFD, SDFileSet *pSet, EDataFileT ftype); +int32_t tsdbDFileRollback(STsdb *pTsdb, SDFileSet *pSet, EDataFileT ftype); int32_t tPutDataFileHdr(uint8_t *p, SDFileSet *pSet, EDataFileT ftype); int32_t tPutDelFile(uint8_t *p, SDelFile *pDelFile); int32_t tGetDelFile(uint8_t *p, SDelFile *pDelFile); @@ -205,7 +208,7 @@ SDFileSet *tsdbFSStateGetDFileSet(STsdbFSState *pState, int32_t fid); // SDataFWriter int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet); int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync); -int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter, uint8_t **ppBuf); +int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter); int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf); int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, uint8_t **ppBuf, SBlockIdx *pBlockIdx); int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_t **ppBuf1, uint8_t **ppBuf2, diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 4371cded6464e2c3213f92f917ab582dc3d84c50..492fcdbaa53f23abb52fdb71942771ee6eb228e5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -782,7 +782,13 @@ static int32_t tsdbCommitDiskData(SCommitter *pCommitter, SBlockIdx *oBlockIdx) if (code) goto _err; tBlockReset(pBlockN); - pBlockN->last = 1; + pBlockN->minKey = pBlockO->minKey; + pBlockN->maxKey = pBlockO->maxKey; + pBlockN->minVersion = pBlockO->minVersion; + pBlockN->maxVersion = pBlockO->maxVersion; + pBlockN->nRow = pBlockO->nRow; + pBlockN->last = pBlockO->last; + pBlockN->hasDup = pBlockO->hasDup; code = tsdbWriteBlockData(pCommitter->pWriter, pBlockDataO, NULL, NULL, pBlockIdx, pBlockN, pCommitter->cmprAlg); if (code) goto _err; @@ -964,7 +970,7 @@ static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) { if (code) goto _err; // update file header - code = tsdbUpdateDFileSetHeader(pCommitter->pWriter, NULL); + code = tsdbUpdateDFileSetHeader(pCommitter->pWriter); if (code) goto _err; // upsert SDFileSet diff --git a/source/dnode/vnode/src/tsdb/tsdbFS.c b/source/dnode/vnode/src/tsdb/tsdbFS.c index 27c2f3fa7a0ff32157f5006f7075fce53f386a59..c2af9c53e4b42f440512adcb8914606dc0755dda 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS.c @@ -171,7 +171,71 @@ _err: static int32_t tsdbApplyDFileSetChange(STsdbFS *pFS, SDFileSet *pFrom, SDFileSet *pTo) { int32_t code = 0; - // TODO + char fname[TSDB_FILENAME_LEN]; + + if (pFrom && pTo) { + // head + if (tsdbFileIsSame(pFrom, pTo, TSDB_HEAD_FILE)) { + ASSERT(0); + } else { + tsdbDataFileName(pFS->pTsdb, pFrom, TSDB_HEAD_FILE, fname); + taosRemoveFile(fname); + } + + // data + if (tsdbFileIsSame(pFrom, pTo, TSDB_DATA_FILE)) { + if (pFrom->fData.size > pTo->fData.size) { + code = tsdbDFileRollback(pFS->pTsdb, pTo, TSDB_DATA_FILE); + if (code) goto _err; + } + } else { + tsdbDataFileName(pFS->pTsdb, pFrom, TSDB_DATA_FILE, fname); + taosRemoveFile(fname); + } + + // last + if (tsdbFileIsSame(pFrom, pTo, TSDB_LAST_FILE)) { + if (pFrom->fLast.size > pTo->fLast.size) { + code = tsdbDFileRollback(pFS->pTsdb, pTo, TSDB_LAST_FILE); + if (code) goto _err; + } + } else { + tsdbDataFileName(pFS->pTsdb, pFrom, TSDB_LAST_FILE, fname); + taosRemoveFile(fname); + } + + // sma + if (tsdbFileIsSame(pFrom, pTo, TSDB_SMA_FILE)) { + if (pFrom->fSma.size > pTo->fSma.size) { + code = tsdbDFileRollback(pFS->pTsdb, pTo, TSDB_SMA_FILE); + if (code) goto _err; + } + } else { + tsdbDataFileName(pFS->pTsdb, pFrom, TSDB_SMA_FILE, fname); + taosRemoveFile(fname); + } + } else if (pFrom) { + // head + tsdbDataFileName(pFS->pTsdb, pFrom, TSDB_HEAD_FILE, fname); + taosRemoveFile(fname); + + // data + tsdbDataFileName(pFS->pTsdb, pFrom, TSDB_DATA_FILE, fname); + taosRemoveFile(fname); + + // last + tsdbDataFileName(pFS->pTsdb, pFrom, TSDB_LAST_FILE, fname); + taosRemoveFile(fname); + + // fsm + tsdbDataFileName(pFS->pTsdb, pFrom, TSDB_SMA_FILE, fname); + taosRemoveFile(fname); + } + + return code; + +_err: + tsdbError("vgId:%d tsdb apply disk file set change failed since %s", TD_VID(pFS->pTsdb->pVnode), tstrerror(code)); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbFile.c b/source/dnode/vnode/src/tsdb/tsdbFile.c index 18b215afadd144b69e66a1b0084505de6909269a..5a4dd830729eb441caed938fe5283f75f7c2f495 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFile.c +++ b/source/dnode/vnode/src/tsdb/tsdbFile.c @@ -120,6 +120,107 @@ void tsdbDataFileName(STsdb *pTsdb, SDFileSet *pDFileSet, EDataFileT ftype, char } } +bool tsdbFileIsSame(SDFileSet *pDFileSet1, SDFileSet *pDFileSet2, EDataFileT ftype) { + if (pDFileSet1->diskId.level != pDFileSet2->diskId.level || pDFileSet1->diskId.id != pDFileSet2->diskId.id) { + return false; + } + + switch (ftype) { + case TSDB_HEAD_FILE: + return pDFileSet1->fHead.commitID == pDFileSet2->fHead.commitID; + case TSDB_DATA_FILE: + return pDFileSet1->fData.commitID == pDFileSet2->fData.commitID; + case TSDB_LAST_FILE: + return pDFileSet1->fLast.commitID == pDFileSet2->fLast.commitID; + case TSDB_SMA_FILE: + return pDFileSet1->fSma.commitID == pDFileSet2->fSma.commitID; + default: + ASSERT(0); + break; + } +} + +int32_t tsdbUpdateDFileHdr(TdFilePtr pFD, SDFileSet *pSet, EDataFileT ftype) { + int32_t code = 0; + int64_t n; + char hdr[TSDB_FHDR_SIZE]; + + memset(hdr, 0, TSDB_FHDR_SIZE); + tPutDataFileHdr(hdr, pSet, ftype); + taosCalcChecksumAppend(0, hdr, TSDB_FHDR_SIZE); + + n = taosLSeekFile(pFD, 0, SEEK_SET); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _exit; + } + + n = taosWriteFile(pFD, hdr, TSDB_FHDR_SIZE); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _exit; + } + +_exit: + return code; +} + +int32_t tsdbDFileRollback(STsdb *pTsdb, SDFileSet *pSet, EDataFileT ftype) { + int32_t code = 0; + int64_t size; + TdFilePtr pFD; + char fname[TSDB_FILENAME_LEN]; + + tsdbDataFileName(pTsdb, pSet, ftype, fname); + + // open + pFD = taosOpenFile(fname, TD_FILE_WRITE); + if (pFD == NULL) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + // truncate + switch (ftype) { + case TSDB_HEAD_FILE: + size = pSet->fHead.size; + break; + case TSDB_DATA_FILE: + size = pSet->fData.size; + break; + case TSDB_LAST_FILE: + size = pSet->fLast.size; + break; + case TSDB_SMA_FILE: + size = pSet->fSma.size; + break; + default: + ASSERT(0); + } + if (taosFtruncateFile(pFD, size) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + // update header + code = tsdbUpdateDFileHdr(pFD, pSet, ftype); + if (code) goto _err; + + // sync + if (taosFsyncFile(pFD) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + // close + taosCloseFile(&pFD); + + return code; + +_err: + return code; +} + int32_t tPutDataFileHdr(uint8_t *p, SDFileSet *pSet, EDataFileT ftype) { int32_t n = 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index fb500ca1e7ad5151fa67fa776fc052be6139c9ac..0cfb18f9812874cee3a0d71db1a30233c566c254 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -1047,99 +1047,35 @@ _err: return code; } -int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter, uint8_t **ppBuf) { +int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter) { int32_t code = 0; int64_t size = TSDB_FHDR_SIZE; int64_t n; - uint8_t *pBuf = NULL; + uint8_t hdr[TSDB_FHDR_SIZE]; SHeadFile *pHeadFile = &pWriter->wSet.fHead; SDataFile *pDataFile = &pWriter->wSet.fData; SLastFile *pLastFile = &pWriter->wSet.fLast; SSmaFile *pSmaFile = &pWriter->wSet.fSma; - // alloc - if (!ppBuf) ppBuf = &pBuf; - code = tsdbRealloc(ppBuf, size); - if (code) goto _err; - // head ============== - // build - memset(*ppBuf, 0, size); - tPutDataFileHdr(*ppBuf, &pWriter->wSet, TSDB_HEAD_FILE); - taosCalcChecksumAppend(0, *ppBuf, size); - - // seek - if (taosLSeekFile(pWriter->pHeadFD, 0, SEEK_SET) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - // write - n = taosWriteFile(pWriter->pHeadFD, *ppBuf, size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tsdbUpdateDFileHdr(pWriter->pHeadFD, &pWriter->wSet, TSDB_HEAD_FILE); + if (code) goto _err; // data ============== - memset(*ppBuf, 0, size); - tPutDataFileHdr(*ppBuf, &pWriter->wSet, TSDB_DATA_FILE); - taosCalcChecksumAppend(0, *ppBuf, size); - - // seek - if (taosLSeekFile(pWriter->pDataFD, 0, SEEK_SET) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - // write - n = taosWriteFile(pWriter->pDataFD, *ppBuf, size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tsdbUpdateDFileHdr(pWriter->pHeadFD, &pWriter->wSet, TSDB_DATA_FILE); + if (code) goto _err; // last ============== - memset(*ppBuf, 0, size); - tPutDataFileHdr(*ppBuf, &pWriter->wSet, TSDB_LAST_FILE); - taosCalcChecksumAppend(0, *ppBuf, size); - - // seek - if (taosLSeekFile(pWriter->pLastFD, 0, SEEK_SET) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - // write - n = taosWriteFile(pWriter->pLastFD, *ppBuf, size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tsdbUpdateDFileHdr(pWriter->pHeadFD, &pWriter->wSet, TSDB_LAST_FILE); + if (code) goto _err; // sma ============== - memset(*ppBuf, 0, size); - tPutDataFileHdr(*ppBuf, &pWriter->wSet, TSDB_SMA_FILE); - taosCalcChecksumAppend(0, *ppBuf, size); - - // seek - if (taosLSeekFile(pWriter->pSmaFD, 0, SEEK_SET) < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - // write - n = taosWriteFile(pWriter->pSmaFD, *ppBuf, size); - if (n < 0) { - code = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + code = tsdbUpdateDFileHdr(pWriter->pHeadFD, &pWriter->wSet, TSDB_SMA_FILE); + if (code) goto _err; - tsdbFree(pBuf); return code; _err: - tsdbFree(pBuf); tsdbError("vgId:%d update DFileSet header failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 424f7fc5e8b5fa1c2e26a7b9409036947d7a3199..54a90f8184a43acb47d1c008f52ffdbc5840efab 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -355,6 +355,7 @@ void tBlockReset(SBlock *pBlock) { pBlock->aSubBlock[iSubBlock].nRow = 0; pBlock->aSubBlock[iSubBlock].cmprAlg = -1; pBlock->aSubBlock[iSubBlock].offset = -1; + pBlock->aSubBlock[iSubBlock].vsize = -1; pBlock->aSubBlock[iSubBlock].ksize = -1; pBlock->aSubBlock[iSubBlock].bsize = -1; tMapDataReset(&pBlock->aSubBlock->mBlockCol);