提交 d40f37b5 编写于 作者: C Cary Xu

separate sma to smad and smal

上级 24cbab3e
......@@ -38,7 +38,15 @@
#define TSDB_FILE_IS_OK(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_OK)
#define TSDB_FILE_IS_BAD(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_BAD)
#define TSDB_FSET_NFILES_VALID(s) (((s)->nFiles >= TSDB_FILE_MIN) && ((s)->nFiles <= TSDB_FILE_MAX))
typedef enum { TSDB_FILE_HEAD = 0, TSDB_FILE_DATA, TSDB_FILE_LAST, TSDB_FILE_SMA, TSDB_FILE_MAX, TSDB_FILE_META } TSDB_FILE_T;
typedef enum {
TSDB_FILE_HEAD = 0,
TSDB_FILE_DATA,
TSDB_FILE_LAST,
TSDB_FILE_SMAD, // sma for .data
TSDB_FILE_SMAL, // sma for .last
TSDB_FILE_MAX,
TSDB_FILE_META
} TSDB_FILE_T;
#define TSDB_FILE_MIN 3U // min number of files in one DFileSet
......
......@@ -126,7 +126,7 @@ typedef struct {
int32_t len;
uint32_t type : 8;
uint32_t offset : 24;
char padding[];
// char padding[];
} SBlockColV1;
#define SBlockCol SBlockColV1 // latest SBlockCol definition
......@@ -191,7 +191,8 @@ struct SReadH {
#define TSDB_READ_HEAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_HEAD)
#define TSDB_READ_DATA_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_DATA)
#define TSDB_READ_LAST_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_LAST)
#define TSDB_READ_AGGR_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_SMA)
#define TSDB_READ_SMAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_SMAD)
#define TSDB_READ_SMAL_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_SMAL)
#define TSDB_READ_BUF(rh) ((rh)->pBuf)
#define TSDB_READ_COMP_BUF(rh) ((rh)->pCBuf)
#define TSDB_READ_EXBUF(rh) ((rh)->pExBuf)
......
......@@ -35,7 +35,6 @@ typedef struct {
SDFileSet wSet;
bool isDFileSame;
bool isLFileSame;
bool isSmaFileSame;
TSKEY minKey;
TSKEY maxKey;
SArray * aBlkIdx; // SBlockIdx array
......@@ -52,7 +51,8 @@ typedef struct {
#define TSDB_COMMIT_HEAD_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_HEAD)
#define TSDB_COMMIT_DATA_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_DATA)
#define TSDB_COMMIT_LAST_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_LAST)
#define TSDB_COMMIT_AGGR_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_SMA)
#define TSDB_COMMIT_SMAD_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_SMAD)
#define TSDB_COMMIT_SMAL_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_SMAL)
#define TSDB_COMMIT_BUF(ch) TSDB_READ_BUF(&((ch)->readh))
#define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&((ch)->readh))
#define TSDB_COMMIT_EXBUF(ch) TSDB_READ_EXBUF(&((ch)->readh))
......@@ -1055,7 +1055,9 @@ static int tsdbComparKeyBlock(const void *arg1, const void *arg2) {
return 0;
}
}
static int originalDataFileSize = 0;
static int originalLastFileSize = 0;
static char latestLastFile[TSDB_FILENAME_LEN] = {0};
int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDFileAggr, SDataCols *pDataCols,
SBlock *pBlock, bool isLast, bool isSuper, void **ppBuf, void **ppCBuf, void **ppExBuf) {
STsdbCfg * pCfg = REPO_CFG(pRepo);
......@@ -1064,6 +1066,11 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile
int64_t offset = 0, offsetAggr = 0;
int rowsToWrite = pDataCols->numOfRows;
int blkSizeBefore = 0;
int blkSizeAfter = 0;
int aggrSizeBefore = 0;
int aggrSizeAfter = 0;
ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRowsPerFileBlock);
ASSERT((!isLast) || rowsToWrite < pCfg->minRowsPerFileBlock);
......@@ -1115,6 +1122,7 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile
// Compress the data if neccessary
int tcol = 0; // counter of not all NULL and written columns
uint32_t toffset = 0;
int32_t tsizeV0 = (int32_t)tsdbBlockStatisSize(nColsNotAllNull, TSDB_SBLK_VER_0);
int32_t tsize = (int32_t)tsdbBlockStatisSize(nColsNotAllNull, SBlockVerLatest);
int32_t lsize = tsize;
int32_t keyLen = 0;
......@@ -1173,6 +1181,7 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile
toffset += flen;
lsize += flen;
tsizeV0 += flen;
}
pBlockData->delimiter = TSDB_FILE_DELIMITER;
......@@ -1181,16 +1190,17 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile
taosCalcChecksumAppend(0, (uint8_t *)pBlockData, tsize);
tsdbUpdateDFileMagic(pDFile, POINTER_SHIFT(pBlockData, tsize - sizeof(TSCKSUM)));
blkSizeBefore = pDFile->info.size;
// Write the whole block to file
if (tsdbAppendDFile(pDFile, (void *)pBlockData, lsize, &offset) < lsize) {
return -1;
}
blkSizeAfter = pDFile->info.size;
#ifdef __TD_6117__
// pAggrBlkData->delimiter = TSDB_FILE_DELIMITER;
// pAggrBlkData->uid = TABLE_UID(pTable);
int aggrStatus = ((aggrNum > 0) && (rowsToWrite > 5)) ? 1 : 0; // TODO: How to make the decision?
aggrSizeBefore = pDFileAggr->info.size;
if (aggrStatus > 0) {
pAggrBlkData->numOfCols = nColsNotAllNull;
......@@ -1203,6 +1213,7 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile
}
}
#endif
aggrSizeAfter = pDFileAggr->info.size;
// Update pBlock membership vairables
pBlock->last = isLast;
......@@ -1222,22 +1233,45 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile
pBlock->aggrLen = tsizeAggr;
#endif
#ifndef __TD_6117__
tsdbDebug("vgId:%d tid:%d a block of data is written to file %s, offset %" PRId64
" numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64,
REPO_ID(pRepo), TABLE_TID(pTable), TSDB_FILE_FULL_NAME(pDFile), offset, rowsToWrite, pBlock->len,
pBlock->numOfCols, pBlock->keyFirst, pBlock->keyLast);
#else
if (isLast) {
if (strncmp(TSDB_FILE_FULL_NAME(pDFile), latestLastFile, TSDB_FILENAME_LEN) == 0) {
originalLastFileSize += tsizeV0;
} else {
originalLastFileSize = tsizeV0;
strncpy(latestLastFile, TSDB_FILE_FULL_NAME(pDFile), TSDB_FILENAME_LEN);
}
} else {
originalDataFileSize += tsizeV0;
}
tsdbDebug("vgId:%d tid:%d a block of data is written to file %s, offset %" PRId64
" numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64,
REPO_ID(pRepo), TABLE_TID(pTable), TSDB_FILE_FULL_NAME(pDFile), offset, rowsToWrite, pBlock->len,
pBlock->numOfCols, pBlock->keyFirst, pBlock->keyLast);
tsdbDebug(
"prop:vgId:%d tid:%d a block of data is written to file %s(len:%d, size: %d -> %d), SMA (len:%d, size: %d -> "
"%d), .lastSize = %d, .dataSize = %d",
REPO_ID(pRepo), TABLE_TID(pTable), TSDB_FILE_FULL_NAME(pDFile), pBlock->len, blkSizeBefore, blkSizeAfter,
pBlock->aggrLen, aggrSizeBefore, aggrSizeAfter, originalLastFileSize, originalDataFileSize);
#endif
return 0;
}
static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast,
bool isSuper) {
return tsdbWriteBlockImpl(TSDB_COMMIT_REPO(pCommith), TSDB_COMMIT_TABLE(pCommith), pDFile, TSDB_COMMIT_AGGR_FILE(pCommith), pDataCols, pBlock, isLast,
isSuper, (void **)(&(TSDB_COMMIT_BUF(pCommith))),
return tsdbWriteBlockImpl(TSDB_COMMIT_REPO(pCommith), TSDB_COMMIT_TABLE(pCommith), pDFile,
isLast ? TSDB_COMMIT_SMAL_FILE(pCommith) : TSDB_COMMIT_SMAD_FILE(pCommith), pDataCols,
pBlock, isLast, isSuper, (void **)(&(TSDB_COMMIT_BUF(pCommith))),
(void **)(&(TSDB_COMMIT_COMP_BUF(pCommith))), (void **)(&(TSDB_COMMIT_EXBUF(pCommith))));
}
static int tsdbWriteBlockInfo(SCommitH *pCommih) {
SDFile * pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih);
SBlockIdx blkIdx;
......@@ -1577,7 +1611,6 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
pCommith->isDFileSame = false;
pCommith->isLFileSame = false;
pCommith->isSmaFileSame = false;
tsdbDebug("vgId:%d FSET %d at level %d disk id %d is created to commit", REPO_ID(pRepo), TSDB_FSET_FID(pWSet),
TSDB_FSET_LEVEL(pWSet), TSDB_FSET_ID(pWSet));
} else {
......@@ -1653,18 +1686,16 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
}
}
// TSDB_FILE_SMA
ASSERT(pWSet->nFiles >= TSDB_FILE_SMA);
SDFile *pRSmaf = TSDB_READ_AGGR_FILE(&(pCommith->readh));
SDFile *pWSmaf = TSDB_COMMIT_AGGR_FILE(pCommith);
// TSDB_FILE_SMAD
SDFile *pRSmadF = TSDB_READ_SMAD_FILE(&(pCommith->readh));
SDFile *pWSmadF = TSDB_COMMIT_SMAD_FILE(pCommith);
if (access(TSDB_FILE_FULL_NAME(pRSmaf), F_OK) != 0) {
tsdbDebug("vgId:%d create data file %s as not exist", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pRSmaf));
tsdbInitDFile(pWSmaf, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_SMA);
pCommith->isLFileSame = false;
if (access(TSDB_FILE_FULL_NAME(pRSmadF), F_OK) != 0) {
tsdbDebug("vgId:%d create data file %s as not exist", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pRSmadF));
tsdbInitDFile(pWSmadF, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_SMAD);
if (tsdbCreateDFile(pWSmaf, true, TSDB_FILE_SMA) < 0) {
tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmaf),
if (tsdbCreateDFile(pWSmadF, true, TSDB_FILE_SMAD) < 0) {
tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmadF),
tstrerror(terrno));
tsdbCloseDFileSet(pWSet);
......@@ -1675,10 +1706,29 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
}
}
} else {
tsdbInitDFileEx(pWSmaf, pRSmaf);
pCommith->isSmaFileSame = true;
if (tsdbOpenDFile(pWSmaf, O_RDWR) < 0) {
tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmaf),
tsdbInitDFileEx(pWSmadF, pRSmadF);
if (tsdbOpenDFile(pWSmadF, O_RDWR) < 0) {
tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmadF),
tstrerror(terrno));
tsdbCloseDFileSet(pWSet);
tsdbRemoveDFile(pWHeadf);
if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
return -1;
}
}
}
// TSDB_FILE_SMAL
ASSERT(pWSet->nFiles >= TSDB_FILE_SMAL);
SDFile *pRSmalF = TSDB_READ_SMAL_FILE(&(pCommith->readh));
SDFile *pWSmalF = TSDB_COMMIT_SMAL_FILE(pCommith);
if ((pCommith->isLFileSame) && access(TSDB_FILE_FULL_NAME(pRSmalF), F_OK) == 0) {
tsdbInitDFileEx(pWSmalF, pRSmalF);
if (tsdbOpenDFile(pWSmalF, O_RDWR) < 0) {
tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmalF),
tstrerror(terrno));
tsdbCloseDFileSet(pWSet);
......@@ -1688,6 +1738,21 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid
return -1;
}
}
} else {
tsdbDebug("vgId:%d create data file %s as not exist", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pRSmalF));
tsdbInitDFile(pWSmalF, did, REPO_ID(pRepo), fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_SMAL);
if (tsdbCreateDFile(pWSmalF, true, TSDB_FILE_SMAL) < 0) {
tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWSmalF),
tstrerror(terrno));
tsdbCloseDFileSet(pWSet);
(void)tsdbRemoveDFile(pWHeadf);
if (pCommith->isRFileSet) {
tsdbCloseAndUnsetFSet(&(pCommith->readh));
return -1;
}
}
}
}
......
......@@ -37,7 +37,8 @@ typedef struct {
#define TSDB_COMPACT_HEAD_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_HEAD)
#define TSDB_COMPACT_DATA_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_DATA)
#define TSDB_COMPACT_LAST_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_LAST)
#define TSDB_COMPACT_AGGR_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_SMA)
#define TSDB_COMPACT_SMAD_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_SMAD)
#define TSDB_COMPACT_SMAL_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_SMAL)
#define TSDB_COMPACT_BUF(pComph) TSDB_READ_BUF(&((pComph)->readh))
#define TSDB_COMPACT_COMP_BUF(pComph) TSDB_READ_COMP_BUF(&((pComph)->readh))
#define TSDB_COMPACT_EXBUF(pComph) TSDB_READ_EXBUF(&((pComph)->readh))
......@@ -523,8 +524,9 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
isLast = false;
}
if (tsdbWriteBlockImpl(pRepo, pTable, pDFile, TSDB_COMPACT_AGGR_FILE(pComph), pDataCols, &block, isLast, true,
ppBuf, ppCBuf, ppExBuf) < 0) {
if (tsdbWriteBlockImpl(pRepo, pTable, pDFile,
isLast ? TSDB_COMPACT_SMAL_FILE(pComph) : TSDB_COMPACT_SMAD_FILE(pComph), pDataCols, &block,
isLast, true, ppBuf, ppCBuf, ppExBuf) < 0) {
return -1;
}
......
......@@ -16,12 +16,13 @@
#include "tsdbint.h"
static const char *TSDB_FNAME_SUFFIX[] = {
"head", // TSDB_FILE_HEAD
"data", // TSDB_FILE_DATA
"last", // TSDB_FILE_LAST
"sma", // TSDB_FILE_SMA(Small Materialized Aggregate)
"", // TSDB_FILE_MAX
"meta", // TSDB_FILE_META
"head", // TSDB_FILE_HEAD
"data", // TSDB_FILE_DATA
"last", // TSDB_FILE_LAST
"smad", // TSDB_FILE_SMA_DATA(Small Materialized Aggregate for .data File)
"smal", // TSDB_FILE_SMA_LAST(Small Materialized Aggregate for .last File)
"", // TSDB_FILE_MAX
"meta", // TSDB_FILE_META
};
static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, char *fname);
......
......@@ -450,7 +450,7 @@ static int tsdbLoadBlockStatisFromDFile(SReadH *pReadh, SBlock *pBlock) {
static int tsdbLoadBlockStatisFromAggr(SReadH *pReadh, SBlock *pBlock) {
ASSERT((pBlock->blkVer > TSDB_SBLK_VER_0) && (pBlock->aggrStat)); // TODO: remove after pass all the test
SDFile *pDFileAggr = TSDB_READ_AGGR_FILE(pReadh);
SDFile *pDFileAggr = pBlock->last ? TSDB_READ_SMAL_FILE(pReadh) : TSDB_READ_SMAD_FILE(pReadh);
if (tsdbSeekDFile(pDFileAggr, pBlock->aggrOffset, SEEK_SET) < 0) {
tsdbError("vgId:%d failed to load block statis part while seek file %s to offset %" PRId64 " since %s",
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册