Split the per-fileset SMA (Small Materialized Aggregate) file into two files:
  .smad - aggregates for blocks written to the .data file
  .smal - aggregates for blocks written to the .last file
Readers and writers pick the SMA file from the block's "last" flag.

NOTE(review): this patch was restored from a whitespace-collapsed copy. Hunk
line counts match the @@ headers, but indentation, column alignment and the
position of blank context lines are reconstructed - confirm against the tree,
or apply with `git apply --ignore-whitespace`.

diff --git a/src/tsdb/inc/tsdbFile.h b/src/tsdb/inc/tsdbFile.h
index 54d9bbaec21c471e8bcc06df88e4507bf9dc1be8..1713240b2928db8f082463720fd3f31df5690945 100644
--- a/src/tsdb/inc/tsdbFile.h
+++ b/src/tsdb/inc/tsdbFile.h
@@ -38,7 +38,15 @@
 #define TSDB_FILE_IS_OK(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_OK)
 #define TSDB_FILE_IS_BAD(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_BAD)
 #define TSDB_FSET_NFILES_VALID(s) (((s)->nFiles >= TSDB_FILE_MIN) && ((s)->nFiles <= TSDB_FILE_MAX))
-typedef enum { TSDB_FILE_HEAD = 0, TSDB_FILE_DATA, TSDB_FILE_LAST, TSDB_FILE_SMA, TSDB_FILE_MAX, TSDB_FILE_META } TSDB_FILE_T;
+typedef enum {
+  TSDB_FILE_HEAD = 0,
+  TSDB_FILE_DATA,
+  TSDB_FILE_LAST,
+  TSDB_FILE_SMAD,  // sma for .data
+  TSDB_FILE_SMAL,  // sma for .last
+  TSDB_FILE_MAX,
+  TSDB_FILE_META
+} TSDB_FILE_T;
 
 #define TSDB_FILE_MIN 3U  // min number of files in one DFileSet
 
diff --git a/src/tsdb/inc/tsdbReadImpl.h b/src/tsdb/inc/tsdbReadImpl.h
index 617eb38878d6693260200cdcb3a637496a2a881b..54a4c0314ad1723b16bc076de453798569f12b46 100644
--- a/src/tsdb/inc/tsdbReadImpl.h
+++ b/src/tsdb/inc/tsdbReadImpl.h
@@ -126,7 +126,7 @@ typedef struct {
   int32_t len;
   uint32_t type : 8;
   uint32_t offset : 24;
-  char padding[];
+  // char padding[];
 } SBlockColV1;
 
 #define SBlockCol SBlockColV1  // latest SBlockCol definition
@@ -191,7 +191,8 @@ struct SReadH {
 #define TSDB_READ_HEAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_HEAD)
 #define TSDB_READ_DATA_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_DATA)
 #define TSDB_READ_LAST_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_LAST)
-#define TSDB_READ_AGGR_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_SMA)
+#define TSDB_READ_SMAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_SMAD)
+#define TSDB_READ_SMAL_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_SMAL)
 #define TSDB_READ_BUF(rh) ((rh)->pBuf)
 #define TSDB_READ_COMP_BUF(rh) ((rh)->pCBuf)
 #define TSDB_READ_EXBUF(rh) ((rh)->pExBuf)
diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c
index 4d73bd4692c3cb58c01d314d561006af56e91dc9..87acea636ac34f4f2a6cb90b6ba62f7d9293e450 100644
--- a/src/tsdb/src/tsdbCommit.c
+++ b/src/tsdb/src/tsdbCommit.c
@@ -35,7 +35,6 @@ typedef struct {
   SDFileSet wSet;
   bool isDFileSame;
   bool isLFileSame;
-  bool isSmaFileSame;
   TSKEY minKey;
   TSKEY maxKey;
   SArray * aBlkIdx;  // SBlockIdx array
@@ -52,7 +51,8 @@ typedef struct {
 #define TSDB_COMMIT_HEAD_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_HEAD)
 #define TSDB_COMMIT_DATA_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_DATA)
 #define TSDB_COMMIT_LAST_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_LAST)
-#define TSDB_COMMIT_AGGR_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_SMA)
+#define TSDB_COMMIT_SMAD_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_SMAD)
+#define TSDB_COMMIT_SMAL_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_SMAL)
 #define TSDB_COMMIT_BUF(ch) TSDB_READ_BUF(&((ch)->readh))
 #define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&((ch)->readh))
 #define TSDB_COMMIT_EXBUF(ch) TSDB_READ_EXBUF(&((ch)->readh))
@@ -1055,7 +1055,9 @@ static int tsdbComparKeyBlock(const void *arg1, const void *arg2) {
     return 0;
   }
 }
-
+static int originalDataFileSize = 0;  // running V0-format byte total of non-last blocks (debug log only)
+static int originalLastFileSize = 0;  // same, for blocks of the .last file named in latestLastFile
+static char latestLastFile[TSDB_FILENAME_LEN] = {0};  // .last file the total above belongs to
 int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDFileAggr, SDataCols *pDataCols,
                        SBlock *pBlock, bool isLast, bool isSuper, void **ppBuf, void **ppCBuf, void **ppExBuf) {
   STsdbCfg * pCfg = REPO_CFG(pRepo);
@@ -1064,6 +1066,11 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile
   int64_t offset = 0, offsetAggr = 0;
   int rowsToWrite = pDataCols->numOfRows;
 
+  int blkSizeBefore = 0;
+  int blkSizeAfter = 0;
+  int aggrSizeBefore = 0;
+  int aggrSizeAfter = 0;
+
   ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRowsPerFileBlock);
   ASSERT((!isLast) || rowsToWrite < pCfg->minRowsPerFileBlock);
 
@@ -1115,6 +1122,7 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile
   // Compress the data if neccessary
   int tcol = 0;  // counter of not all NULL and written columns
   uint32_t toffset = 0;
+  int32_t tsizeV0 = (int32_t)tsdbBlockStatisSize(nColsNotAllNull, TSDB_SBLK_VER_0);
   int32_t tsize = (int32_t)tsdbBlockStatisSize(nColsNotAllNull, SBlockVerLatest);
   int32_t lsize = tsize;
   int32_t keyLen = 0;
@@ -1173,6 +1181,7 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile
 
     toffset += flen;
     lsize += flen;
+    tsizeV0 += flen;
   }
 
   pBlockData->delimiter = TSDB_FILE_DELIMITER;
@@ -1181,16 +1190,17 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile
 
   taosCalcChecksumAppend(0, (uint8_t *)pBlockData, tsize);
   tsdbUpdateDFileMagic(pDFile, POINTER_SHIFT(pBlockData, tsize - sizeof(TSCKSUM)));
-
+  blkSizeBefore = pDFile->info.size;
   // Write the whole block to file
   if (tsdbAppendDFile(pDFile, (void *)pBlockData, lsize, &offset) < lsize) {
     return -1;
   }
-
+  blkSizeAfter = pDFile->info.size;
 #ifdef __TD_6117__
   // pAggrBlkData->delimiter = TSDB_FILE_DELIMITER;
   // pAggrBlkData->uid = TABLE_UID(pTable);
   int aggrStatus = ((aggrNum > 0) && (rowsToWrite > 5)) ? 1 : 0;  // TODO: How to make the decision?
+  aggrSizeBefore = pDFileAggr->info.size;
   if (aggrStatus > 0) {
     pAggrBlkData->numOfCols = nColsNotAllNull;
 
@@ -1203,6 +1213,7 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile
     }
   }
 #endif
+  aggrSizeAfter = pDFileAggr->info.size;
 
   // Update pBlock membership vairables
   pBlock->last = isLast;
@@ -1222,22 +1233,45 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile
   pBlock->aggrLen = tsizeAggr;
 #endif
 
+#ifndef __TD_6117__
   tsdbDebug("vgId:%d tid:%d a block of data is written to file %s, offset %" PRId64
             " numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64,
             REPO_ID(pRepo), TABLE_TID(pTable), TSDB_FILE_FULL_NAME(pDFile), offset, rowsToWrite, pBlock->len,
             pBlock->numOfCols, pBlock->keyFirst, pBlock->keyLast);
+#else
+  if (isLast) {
+    if (strncmp(TSDB_FILE_FULL_NAME(pDFile), latestLastFile, TSDB_FILENAME_LEN) == 0) {
+      originalLastFileSize += tsizeV0;
+    } else {
+      originalLastFileSize = tsizeV0;
+      strncpy(latestLastFile, TSDB_FILE_FULL_NAME(pDFile), TSDB_FILENAME_LEN - 1);  // keep the final NUL intact
+    }
+  } else {
+    originalDataFileSize += tsizeV0;
+  }
+
+  tsdbDebug("vgId:%d tid:%d a block of data is written to file %s, offset %" PRId64
+            " numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64,
+            REPO_ID(pRepo), TABLE_TID(pTable), TSDB_FILE_FULL_NAME(pDFile), offset, rowsToWrite, pBlock->len,
+            pBlock->numOfCols, pBlock->keyFirst, pBlock->keyLast);
+  tsdbDebug(
+      "prop:vgId:%d tid:%d a block of data is written to file %s(len:%d, size: %d -> %d), SMA (len:%d, size: %d -> "
+      "%d), .lastSize = %d, .dataSize = %d",
+      REPO_ID(pRepo), TABLE_TID(pTable), TSDB_FILE_FULL_NAME(pDFile), pBlock->len, blkSizeBefore, blkSizeAfter,
+      pBlock->aggrLen, aggrSizeBefore, aggrSizeAfter, originalLastFileSize, originalDataFileSize);
+#endif
 
   return 0;
 }
 
 static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast,
                           bool isSuper) {
-  return tsdbWriteBlockImpl(TSDB_COMMIT_REPO(pCommith), TSDB_COMMIT_TABLE(pCommith), pDFile, TSDB_COMMIT_AGGR_FILE(pCommith), pDataCols, pBlock, isLast,
-                            isSuper, (void **)(&(TSDB_COMMIT_BUF(pCommith))),
+  return tsdbWriteBlockImpl(TSDB_COMMIT_REPO(pCommith), TSDB_COMMIT_TABLE(pCommith), pDFile,
+                            isLast ? TSDB_COMMIT_SMAL_FILE(pCommith) : TSDB_COMMIT_SMAD_FILE(pCommith), pDataCols,
+                            pBlock, isLast, isSuper, (void **)(&(TSDB_COMMIT_BUF(pCommith))),
                             (void **)(&(TSDB_COMMIT_COMP_BUF(pCommith))), (void **)(&(TSDB_COMMIT_EXBUF(pCommith))));
 }
 
-
 static int tsdbWriteBlockInfo(SCommitH *pCommih) {
   SDFile * pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih);
   SBlockIdx blkIdx;
@@ -1577,7 +1611,6 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
 
     pCommith->isDFileSame = false;
     pCommith->isLFileSame = false;
-    pCommith->isSmaFileSame = false;
     tsdbDebug("vgId:%d FSET %d at level %d disk id %d is created to commit", REPO_ID(pRepo), TSDB_FSET_FID(pWSet),
               TSDB_FSET_LEVEL(pWSet), TSDB_FSET_ID(pWSet));
   } else {
@@ -1653,18 +1686,16 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
       }
     }
 
-    // TSDB_FILE_SMA
-    ASSERT(pWSet->nFiles >= TSDB_FILE_SMA);
-    SDFile *pRSmaf = TSDB_READ_AGGR_FILE(&(pCommith->readh));
-    SDFile *pWSmaf = TSDB_COMMIT_AGGR_FILE(pCommith);
+    // TSDB_FILE_SMAD
+    SDFile *pRSmadF = TSDB_READ_SMAD_FILE(&(pCommith->readh));
+    SDFile *pWSmadF = TSDB_COMMIT_SMAD_FILE(pCommith);
 
-    if (access(TSDB_FILE_FULL_NAME(pRSmaf), F_OK) != 0) {
-      tsdbDebug("vgId:%d create data file %s as not exist", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pRSmaf));
-      tsdbInitDFile(pWSmaf, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_SMA);
-      pCommith->isLFileSame = false;
+    if (access(TSDB_FILE_FULL_NAME(pRSmadF), F_OK) != 0) {
+      tsdbDebug("vgId:%d create data file %s as not exist", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pRSmadF));
+      tsdbInitDFile(pWSmadF, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_SMAD);
 
-      if (tsdbCreateDFile(pWSmaf, true, TSDB_FILE_SMA) < 0) {
-        tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmaf),
+      if (tsdbCreateDFile(pWSmadF, true, TSDB_FILE_SMAD) < 0) {
+        tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmadF),
                   tstrerror(terrno));
 
         tsdbCloseDFileSet(pWSet);
@@ -1675,10 +1706,29 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
         }
       }
     } else {
-      tsdbInitDFileEx(pWSmaf, pRSmaf);
-      pCommith->isSmaFileSame = true;
-      if (tsdbOpenDFile(pWSmaf, O_RDWR) < 0) {
-        tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmaf),
+      tsdbInitDFileEx(pWSmadF, pRSmadF);
+      if (tsdbOpenDFile(pWSmadF, O_RDWR) < 0) {
+        tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmadF),
+                  tstrerror(terrno));
+
+        tsdbCloseDFileSet(pWSet);
+        (void)tsdbRemoveDFile(pWHeadf);
+        if (pCommith->isRFileSet) {
+          tsdbCloseAndUnsetFSet(&(pCommith->readh));
+          return -1;
+        }
+      }
+    }
+
+    // TSDB_FILE_SMAL
+    ASSERT(pWSet->nFiles >= TSDB_FILE_SMAL);
+    SDFile *pRSmalF = TSDB_READ_SMAL_FILE(&(pCommith->readh));
+    SDFile *pWSmalF = TSDB_COMMIT_SMAL_FILE(pCommith);
+
+    if ((pCommith->isLFileSame) && access(TSDB_FILE_FULL_NAME(pRSmalF), F_OK) == 0) {
+      tsdbInitDFileEx(pWSmalF, pRSmalF);
+      if (tsdbOpenDFile(pWSmalF, O_RDWR) < 0) {
+        tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmalF),
                   tstrerror(terrno));
 
         tsdbCloseDFileSet(pWSet);
@@ -1688,6 +1738,21 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
           return -1;
         }
       }
+    } else {
+      tsdbDebug("vgId:%d create data file %s as not exist", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pRSmalF));
+      tsdbInitDFile(pWSmalF, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_SMAL);
+
+      if (tsdbCreateDFile(pWSmalF, true, TSDB_FILE_SMAL) < 0) {
+        tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmalF),
+                  tstrerror(terrno));
+
+        tsdbCloseDFileSet(pWSet);
+        (void)tsdbRemoveDFile(pWHeadf);
+        if (pCommith->isRFileSet) {
+          tsdbCloseAndUnsetFSet(&(pCommith->readh));
+          return -1;
+        }
+      }
     }
   }
 
diff --git a/src/tsdb/src/tsdbCompact.c b/src/tsdb/src/tsdbCompact.c
index 4fdd293d70b2e2c348f5e99b812b844ca157cc5b..0f6502a4f527b516d560867f25fe55aa6a5c2be2 100644
--- a/src/tsdb/src/tsdbCompact.c
+++ b/src/tsdb/src/tsdbCompact.c
@@ -37,7 +37,8 @@ typedef struct {
 #define TSDB_COMPACT_HEAD_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_HEAD)
 #define TSDB_COMPACT_DATA_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_DATA)
 #define TSDB_COMPACT_LAST_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_LAST)
-#define TSDB_COMPACT_AGGR_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_SMA)
+#define TSDB_COMPACT_SMAD_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_SMAD)
+#define TSDB_COMPACT_SMAL_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_SMAL)
 #define TSDB_COMPACT_BUF(pComph) TSDB_READ_BUF(&((pComph)->readh))
 #define TSDB_COMPACT_COMP_BUF(pComph) TSDB_READ_COMP_BUF(&((pComph)->readh))
 #define TSDB_COMPACT_EXBUF(pComph) TSDB_READ_EXBUF(&((pComph)->readh))
@@ -523,8 +524,9 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
     isLast = false;
   }
 
-  if (tsdbWriteBlockImpl(pRepo, pTable, pDFile, TSDB_COMPACT_AGGR_FILE(pComph), pDataCols, &block, isLast, true,
-                         ppBuf, ppCBuf, ppExBuf) < 0) {
+  if (tsdbWriteBlockImpl(pRepo, pTable, pDFile,
+                         isLast ? TSDB_COMPACT_SMAL_FILE(pComph) : TSDB_COMPACT_SMAD_FILE(pComph), pDataCols, &block,
+                         isLast, true, ppBuf, ppCBuf, ppExBuf) < 0) {
     return -1;
   }
 
diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c
index 9924a640b089fb6cbf6518e18a74f4fcaf32ad7c..edbf91d845ad533e66ca8ed25b6bb9d901903d3d 100644
--- a/src/tsdb/src/tsdbFile.c
+++ b/src/tsdb/src/tsdbFile.c
@@ -16,12 +16,13 @@
 #include "tsdbint.h"
 
 static const char *TSDB_FNAME_SUFFIX[] = {
-    "head",  // TSDB_FILE_HEAD
-    "data",  // TSDB_FILE_DATA
-    "last",  // TSDB_FILE_LAST
-    "sma",   // TSDB_FILE_SMA(Small Materialized Aggregate)
-    "",      // TSDB_FILE_MAX
-    "meta",  // TSDB_FILE_META
+    "head",  // TSDB_FILE_HEAD
+    "data",  // TSDB_FILE_DATA
+    "last",  // TSDB_FILE_LAST
+    "smad",  // TSDB_FILE_SMAD (Small Materialized Aggregate for the .data file)
+    "smal",  // TSDB_FILE_SMAL (Small Materialized Aggregate for the .last file)
+    "",      // TSDB_FILE_MAX
+    "meta",  // TSDB_FILE_META
 };
 
 static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, char *fname);
diff --git a/src/tsdb/src/tsdbReadImpl.c b/src/tsdb/src/tsdbReadImpl.c
index 7ebba1783ae638da32cef4c8acf5734316ccc0af..13100f9079e1421faf3a22663b9a37c03db1f31c 100644
--- a/src/tsdb/src/tsdbReadImpl.c
+++ b/src/tsdb/src/tsdbReadImpl.c
@@ -450,7 +450,7 @@ static int tsdbLoadBlockStatisFromDFile(SReadH *pReadh, SBlock *pBlock) {
 
 static int tsdbLoadBlockStatisFromAggr(SReadH *pReadh, SBlock *pBlock) {
   ASSERT((pBlock->blkVer > TSDB_SBLK_VER_0) && (pBlock->aggrStat));  // TODO: remove after pass all the test
-  SDFile *pDFileAggr = TSDB_READ_AGGR_FILE(pReadh);
+  SDFile *pDFileAggr = pBlock->last ? TSDB_READ_SMAL_FILE(pReadh) : TSDB_READ_SMAD_FILE(pReadh);
 
   if (tsdbSeekDFile(pDFileAggr, pBlock->aggrOffset, SEEK_SET) < 0) {
     tsdbError("vgId:%d failed to load block statis part while seek file %s to offset %" PRId64 " since %s",