提交 2a4e788d 编写于 作者: H Hongze Cheng

refactor more code

上级 5f21e2e7
...@@ -23,6 +23,8 @@ ...@@ -23,6 +23,8 @@
#define TSDB_DATA_FILE_CHANGE 0 #define TSDB_DATA_FILE_CHANGE 0
#define TSDB_META_FILE_CHANGE 1 #define TSDB_META_FILE_CHANGE 1
#define TSDB_DEFAULT_ROWS_TO_COMMIT(maxRows) ((maxRows) * 4 / 5)
typedef struct { typedef struct {
int maxIters; int maxIters;
SCommitIter *pIters; SCommitIter *pIters;
...@@ -998,12 +1000,18 @@ static int tsdbCopyBlocks(STSCommitHandle *pTSCh, int sidx, int eidx) { ...@@ -998,12 +1000,18 @@ static int tsdbCopyBlocks(STSCommitHandle *pTSCh, int sidx, int eidx) {
} }
static int tsdbAppendCommit(STSCommitHandle *pTSCh) { static int tsdbAppendCommit(STSCommitHandle *pTSCh) {
SDataCols *pDataCols = pTSCh->pDataCols; SDataCols * pDataCols = pTSCh->pDataCols;
SBlock block = {0}; SReadHandle *pReadH = pTSCh->pReadH;
SBlock * pBlock = █ STsdbRepo * pRepo = pReadH->pRepo;
STable * pTable = pReadH->pTable;
SBlock block = {0};
SBlock * pBlock = █
STsdbCfg * pCfg = &(pRepo->config);
SCommitIter *pIter = pTSCh->pIters + TABLE_TID(pTable);
int blockCommitRows = TSDB_DEFAULT_ROWS_TO_COMMIT(pCfg->maxRowsPerFileBlock);
tdResetDataCols(pDataCols); tdResetDataCols(pDataCols);
int rowsToRead = tsdbLoadDataFromCache(); int rowsToRead = tsdbLoadDataFromCache(pTable, pIter->pIter, pReadH->maxKey, blockCommitRows, pDataCols, NULL, 0);
ASSERT(rowsToRead > 0); ASSERT(rowsToRead > 0);
if (tsdbWriteBlockToProperFile(pTSCh, pDataCols, pBlock) < 0) return -1; if (tsdbWriteBlockToProperFile(pTSCh, pDataCols, pBlock) < 0) return -1;
...@@ -1013,15 +1021,10 @@ static int tsdbAppendCommit(STSCommitHandle *pTSCh) { ...@@ -1013,15 +1021,10 @@ static int tsdbAppendCommit(STSCommitHandle *pTSCh) {
} }
static int tsdbMergeCommit(STSCommitHandle *pTSCh, SBlock *pBlock) { static int tsdbMergeCommit(STSCommitHandle *pTSCh, SBlock *pBlock) {
STable * pTable = pTSCh->pReadH->pTable; if (pBlock->last) { // merge with the last block
SCommitIter *pIter = pTSCh->pIters + TABLE_TID(pTable); if (tsdbCommitMergeLastBlock(pTSCh, pBlock) < 0) return -1;
} else { // merge with a data block
TSKEY nextKey = tsdbNextIterKey(pIter->pIter); if (tsdbCommitMergeDataBlock(pTSCh, pBlock) < 0) return -1;
if (pBlock->last) {
} else {
} }
return 0; return 0;
...@@ -1163,5 +1166,150 @@ static int tsdbWriteBlockToFile(STSCommitHandle *pTSCh, SFile *pFile, SDataCols ...@@ -1163,5 +1166,150 @@ static int tsdbWriteBlockToFile(STSCommitHandle *pTSCh, SFile *pFile, SDataCols
pFile->info.size += pBlock->len; pFile->info.size += pBlock->len;
return 0;
}
static int tsdbCommitMergeLastBlock(STSCommitHandle *pTSCh, SBlock *pBlock) {
SDataCols * pDataCols = pTSCh->pDataCols;
SReadHandle *pReadH = pTSCh->pReadH;
STable * pTable = pTSCh->pReadH->pTable;
SCommitIter *pIter = pTSCh->pIters + TABLE_TID(pTable);
STsdbRepo * pRepo = pReadH->pRepo;
STsdbCfg * pCfg = &(pRepo->config);
int blockCommitRows = TSDB_DEFAULT_ROWS_TO_COMMIT(pCfg->maxRowsPerFileBlock);
SBlock nblock = {0};
SBlock * pNBlock = &nblock;
TSKEY nextKey = tsdbNextIterKey(pIter->pIter);
int bidx = POINTER_DISTANCE(pBlock, pReadH->pBlockInfo->blocks) / sizeof(SBlock);
if (nextKey > pBlock->keyLast) { // just merge and append
tdResetDataCols(pDataCols);
int rowsToRead = tsdbLoadDataFromCache(pTable, pIter->pIter, pReadH->maxKey, blockCommitRows - pBlock->numOfRows,
pDataCols, NULL, 0);
ASSERT(rowsToRead > 0);
if (rowsToRead + pBlock->numOfRows < pCfg->minRowsPerFileBlock && pBlock->numOfSubBlocks < TSDB_MAX_SUB_BLOCKS &&
/* No new last file is opened*/) {
if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_LAST), pDataCols, pNBlock, true,
false) < 0) {
return -1;
};
if (tsdbAddSubBlock(pTSCh, pNBlock, NULL) < 0) return -1;
} else {
if (tsdbLoadBlockData(pReadH, pBlock, NULL) < 0) return -1;
if (tdMergeDataCols(pReadH->pDataCols[0], pDataCols, pDataCols->numOfRows) < 0) return -1;
if (tsdbWriteBlockToProperFile(pReadH, pReadH->pDataCols[0], pNBlock) < 0) return -1;
if (tsdbAddSuperBlock(pTSCh, pNBlock) < 0) return -1;
}
} else {
if (/* append the old last file */) {
SSkipListIterator titer = *(pIter->pIter);
int16_t colId = 0;
if (tsdbLoadBlockDataCols(pReadH, pBlock, NULL, &colId, 1) < 0) return -1;
int rowsToRead = tsdbLoadDataFromCache();
if (rowsToRead == 0) {
*(pIter->pIter) = titer;
tsdbCopyBlocks();
} else {
if (pBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && rowsToRead + Block->numOfRows < pCfg->minRowsPerFileBlock) {
tsdbLoadDataFromCache();
if (tsdbWriteBlockToFile() < 0) return -1;
if (tsdbaddsubblock() < 0) return -1;
} else {
if (tasdbloadblockdata() < 0) return -1;
while (true)
{
tsdbLoadAndMergeFromCache();
}
}
}
} else {
if (tsdbLoadBlockData(pReadH, pBlock, NULL) < 0) return -1;
while (true) {
tsdbLoadAndMergeFromCache();
}
}
}
return 0;
}
static int tsdbCommitMergeDataBlock(STSCommitHandle *pTSCh, SBlock *pBlock) {
TSKEY keyLimit = (tblkIdx == pIdx->numOfBlocks - 1) ? maxKey : (pCompBlock[1].keyFirst - 1);
TSKEY blkKeyFirst = pCompBlock->keyFirst;
TSKEY blkKeyLast = pCompBlock->keyLast;
if (keyFirst < blkKeyFirst) {
while (true) {
tdResetDataCols(pDataCols);
int rowsRead =
tsdbLoadDataFromCache(pTable, pCommitIter->pIter, blkKeyFirst - 1, defaultRowsInBlock, pDataCols, NULL, 0);
if (rowsRead == 0) break;
ASSERT(rowsRead == pDataCols->numOfRows);
if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1;
if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
tblkIdx++;
}
ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 ||
tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast));
} else {
ASSERT(keyFirst <= blkKeyLast);
int16_t colId = 0;
if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1;
slIter = *(pCommitIter->pIter);
int rows1 = (pCfg->maxRowsPerFileBlock - pCompBlock->numOfRows);
int rows2 = tsdbLoadDataFromCache(pTable, &slIter, blkKeyLast, INT_MAX, NULL, pDataCols0->cols[0].pData,
pDataCols0->numOfRows);
if (rows2 == 0) { // all filtered out
*(pCommitIter->pIter) = slIter;
ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 ||
tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast));
} else {
int rows3 = tsdbLoadDataFromCache(pTable, &slIter, keyLimit, INT_MAX, NULL, NULL, 0) + rows2;
if (pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && rows1 >= rows2) {
int rows = (rows1 >= rows3) ? rows3 : rows2;
tdResetDataCols(pDataCols);
int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, rows, pDataCols,
pDataCols0->cols[0].pData, pDataCols0->numOfRows);
ASSERT(rowsRead == rows && rowsRead == pDataCols->numOfRows);
if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, false) < 0) return -1;
if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1;
tblkIdx++;
ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 ||
tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast));
} else {
if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1;
int round = 0;
int dIter = 0;
while (true) {
int rowsRead =
tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, keyLimit, defaultRowsInBlock);
if (rowsRead == 0) break;
if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1;
if (round == 0) {
if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
} else {
if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
}
round++;
tblkIdx++;
}
ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 ||
tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast));
}
}
}
return 0; return 0;
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册