提交 fea49884 编写于 作者: H Hongze Cheng

refactor more code

上级 9bfa475d
...@@ -680,17 +680,15 @@ static int tsdbCommitTableData(STSCommitHandle *pTSCh, int tid) { ...@@ -680,17 +680,15 @@ static int tsdbCommitTableData(STSCommitHandle *pTSCh, int tid) {
static int tsdbWriteBlockToRightFile(STSCommitHandle *pTSCh, SDataCols *pDataCols, SBlock *pBlock) { static int tsdbWriteBlockToRightFile(STSCommitHandle *pTSCh, SDataCols *pDataCols, SBlock *pBlock) {
STsdbCfg *pCfg = &(pTSCh->pReadH->pRepo->config); STsdbCfg *pCfg = &(pTSCh->pReadH->pRepo->config);
SFile * pFile = NULL; int ftype = 0;
bool islast = false;
if (pDataCols->numOfRows >= pCfg->minRowsPerFileBlock) { if (pDataCols->numOfRows >= pCfg->minRowsPerFileBlock) {
pFile = TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_DATA); ftype = TSDB_FILE_TYPE_DATA;
} else { } else {
islast = true; ftype = TSDB_FILE_TYPE_LAST;
pFile = TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_LAST);
} }
if (tsdbWriteBlockToFile(pTSCh, pFile, pDataCols, pBlock, islast, true) < 0) return -1; if (tsdbWriteBlockToFile(pTSCh, ftype, pDataCols, pBlock, true) < 0) return -1;
return 0; return 0;
} }
...@@ -964,8 +962,8 @@ static int tsdbMergeCommit(STSCommitHandle *pTSCh, SBlock *pBlock) { ...@@ -964,8 +962,8 @@ static int tsdbMergeCommit(STSCommitHandle *pTSCh, SBlock *pBlock) {
return 0; return 0;
} }
static int tsdbWriteBlockToFile(STSCommitHandle *pTSCh, SFile *pFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast, static int tsdbWriteBlockToFile(STSCommitHandle *pTSCh, int ftype, SDataCols *pDataCols, SBlock *pBlock, bool isSuperBlock) {
bool isSuperBlock) { SFile * pFile = TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, ftype);
SReadHandle *pReadH = pTSCh->pReadH; SReadHandle *pReadH = pTSCh->pReadH;
STsdbRepo * pRepo = pReadH->pRepo; STsdbRepo * pRepo = pReadH->pRepo;
STsdbCfg * pCfg = &(pRepo->config); STsdbCfg * pCfg = &(pRepo->config);
...@@ -973,6 +971,7 @@ static int tsdbWriteBlockToFile(STSCommitHandle *pTSCh, SFile *pFile, SDataCols ...@@ -973,6 +971,7 @@ static int tsdbWriteBlockToFile(STSCommitHandle *pTSCh, SFile *pFile, SDataCols
int nColsNotAllNull = 0; int nColsNotAllNull = 0;
int csize = TSDB_BLOCK_DATA_LEN(nColsNotAllNull); // column size int csize = TSDB_BLOCK_DATA_LEN(nColsNotAllNull); // column size
int32_t keyLen = 0; int32_t keyLen = 0;
bool isLast = (ftype == TSDB_FILE_TYPE_LAST);
ASSERT(offset == lseek(pFile->fd, 0, SEEK_END)); ASSERT(offset == lseek(pFile->fd, 0, SEEK_END));
ASSERT(pDataCols->numOfRows > 0 && pDataCols->numOfRows <= pCfg->maxRowsPerFileBlock); ASSERT(pDataCols->numOfRows > 0 && pDataCols->numOfRows <= pCfg->maxRowsPerFileBlock);
...@@ -1101,151 +1100,6 @@ static int tsdbWriteBlockToFile(STSCommitHandle *pTSCh, SFile *pFile, SDataCols ...@@ -1101,151 +1100,6 @@ static int tsdbWriteBlockToFile(STSCommitHandle *pTSCh, SFile *pFile, SDataCols
return 0; return 0;
} }
// static int tsdbCommitMergeLastBlock(STSCommitHandle *pTSCh, SBlock *pBlock) {
// SDataCols * pDataCols = pTSCh->pDataCols;
// SReadHandle *pReadH = pTSCh->pReadH;
// STable * pTable = pTSCh->pReadH->pTable;
// SCommitIter *pIter = pTSCh->pIters + TABLE_TID(pTable);
// STsdbRepo * pRepo = pReadH->pRepo;
// STsdbCfg * pCfg = &(pRepo->config);
// int blockCommitRows = TSDB_DEFAULT_ROWS_TO_COMMIT(pCfg->maxRowsPerFileBlock);
// SBlock nblock = {0};
// SBlock * pNBlock = &nblock;
// TSKEY nextKey = tsdbNextIterKey(pIter->pIter);
// int bidx = POINTER_DISTANCE(pBlock, pReadH->pBlockInfo->blocks) / sizeof(SBlock);
// if (nextKey > pBlock->keyLast) { // just merge and append
// tdResetDataCols(pDataCols);
// int rowsToRead = tsdbLoadDataFromCache(pTable, pIter->pIter, pReadH->maxKey, blockCommitRows - pBlock->numOfRows,
// pDataCols, NULL, 0);
// ASSERT(rowsToRead > 0);
// if (rowsToRead + pBlock->numOfRows < pCfg->minRowsPerFileBlock && pBlock->numOfSubBlocks < TSDB_MAX_SUB_BLOCKS &&
// /* No new last file is opened*/) {
// if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_LAST), pDataCols, pNBlock, true,
// false) < 0) {
// return -1;
// };
// if (tsdbAddSubBlock(pTSCh, pNBlock, NULL) < 0) return -1;
// } else {
// if (tsdbLoadBlockData(pReadH, pBlock, NULL) < 0) return -1;
// if (tdMergeDataCols(pReadH->pDataCols[0], pDataCols, pDataCols->numOfRows) < 0) return -1;
// if (tsdbWriteBlockToProperFile(pReadH, pReadH->pDataCols[0], pNBlock) < 0) return -1;
// if (tsdbAddSuperBlock(pTSCh, pNBlock) < 0) return -1;
// }
// } else {
// if (/* append the old last file */) {
// SSkipListIterator titer = *(pIter->pIter);
// int16_t colId = 0;
// if (tsdbLoadBlockDataCols(pReadH, pBlock, NULL, &colId, 1) < 0) return -1;
// int rowsToRead = tsdbLoadDataFromCache();
// if (rowsToRead == 0) {
// *(pIter->pIter) = titer;
// tsdbCopyBlocks();
// } else {
// if (pBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && rowsToRead + Block->numOfRows < pCfg->minRowsPerFileBlock) {
// tsdbLoadDataFromCache();
// if (tsdbWriteBlockToFile() < 0) return -1;
// if (tsdbaddsubblock() < 0) return -1;
// } else {
// if (tasdbloadblockdata() < 0) return -1;
// while (true)
// {
// tsdbLoadAndMergeFromCache();
// }
// }
// }
// } else {
// if (tsdbLoadBlockData(pReadH, pBlock, NULL) < 0) return -1;
// while (true) {
// tsdbLoadAndMergeFromCache();
// }
// }
// }
// return 0;
// }
// static int tsdbCommitMergeDataBlock(STSCommitHandle *pTSCh, SBlock *pBlock) {
// TSKEY keyLimit = (tblkIdx == pIdx->numOfBlocks - 1) ? maxKey : (pCompBlock[1].keyFirst - 1);
// TSKEY blkKeyFirst = pCompBlock->keyFirst;
// TSKEY blkKeyLast = pCompBlock->keyLast;
// if (keyFirst < blkKeyFirst) {
// while (true) {
// tdResetDataCols(pDataCols);
// int rowsRead =
// tsdbLoadDataFromCache(pTable, pCommitIter->pIter, blkKeyFirst - 1, defaultRowsInBlock, pDataCols, NULL, 0);
// if (rowsRead == 0) break;
// ASSERT(rowsRead == pDataCols->numOfRows);
// if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1;
// if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
// tblkIdx++;
// }
// ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 ||
// tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast));
// } else {
// ASSERT(keyFirst <= blkKeyLast);
// int16_t colId = 0;
// if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1;
// slIter = *(pCommitIter->pIter);
// int rows1 = (pCfg->maxRowsPerFileBlock - pCompBlock->numOfRows);
// int rows2 = tsdbLoadDataFromCache(pTable, &slIter, blkKeyLast, INT_MAX, NULL, pDataCols0->cols[0].pData,
// pDataCols0->numOfRows);
// if (rows2 == 0) { // all filtered out
// *(pCommitIter->pIter) = slIter;
// ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 ||
// tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast));
// } else {
// int rows3 = tsdbLoadDataFromCache(pTable, &slIter, keyLimit, INT_MAX, NULL, NULL, 0) + rows2;
// if (pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && rows1 >= rows2) {
// int rows = (rows1 >= rows3) ? rows3 : rows2;
// tdResetDataCols(pDataCols);
// int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, rows, pDataCols,
// pDataCols0->cols[0].pData, pDataCols0->numOfRows);
// ASSERT(rowsRead == rows && rowsRead == pDataCols->numOfRows);
// if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, false) < 0) return -1;
// if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1;
// tblkIdx++;
// ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 ||
// tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast));
// } else {
// if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1;
// int round = 0;
// int dIter = 0;
// while (true) {
// int rowsRead =
// tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, keyLimit, defaultRowsInBlock);
// if (rowsRead == 0) break;
// if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1;
// if (round == 0) {
// if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
// } else {
// if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
// }
// round++;
// tblkIdx++;
// }
// ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 ||
// tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast));
// }
// }
// }
// return 0;
// }
static int tsdbEncodeBlockIdxArray(STSCommitHandle *pTSCh) { static int tsdbEncodeBlockIdxArray(STSCommitHandle *pTSCh) {
SReadHandle *pReadH = pTSCh->pReadH; SReadHandle *pReadH = pTSCh->pReadH;
int len = 0; int len = 0;
...@@ -1293,13 +1147,16 @@ static int tsdbCopyBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { ...@@ -1293,13 +1147,16 @@ static int tsdbCopyBlock(STSCommitHandle *pTSCh, SBlock *pBlock) {
SFile * pWFile = NULL; SFile * pWFile = NULL;
SFile * pRFile = NULL; SFile * pRFile = NULL;
SBlock newBlock = {0}; SBlock newBlock = {0};
int ftype = 0;
if (pBlock->last) { if (pBlock->last) {
pWFile = TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_LAST); pWFile = TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_LAST);
pRFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_LAST); pRFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_LAST);
ftype = TSDB_FILE_TYPE_LAST;
} else { } else {
pWFile = TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_DATA); pWFile = TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_DATA);
pRFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_DATA); pRFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_DATA);
ftype = TSDB_FILE_TYPE_DATA;
} }
// TODO: use flag to omit this string compare. this may cause a lot of time // TODO: use flag to omit this string compare. this may cause a lot of time
...@@ -1316,7 +1173,7 @@ static int tsdbCopyBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { ...@@ -1316,7 +1173,7 @@ static int tsdbCopyBlock(STSCommitHandle *pTSCh, SBlock *pBlock) {
} }
} else { } else {
if (tsdbLoadBlockData(pReadH, pBlock, NULL) < 0) return -1; if (tsdbLoadBlockData(pReadH, pBlock, NULL) < 0) return -1;
if (tsdbWriteBlockToFile(pTSCh, pWFile, pReadH->pDataCols[0], &newBlock, pBlock->last, true) < 0) return -1; if (tsdbWriteBlockToFile(pTSCh, ftype, pReadH->pDataCols[0], &newBlock, true) < 0) return -1;
if (tsdbAddSuperBlock(pTSCh, &newBlock) < 0) return -1; if (tsdbAddSuperBlock(pTSCh, &newBlock) < 0) return -1;
} }
...@@ -1385,8 +1242,7 @@ static int tsdbMergeLastBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { ...@@ -1385,8 +1242,7 @@ static int tsdbMergeLastBlock(STSCommitHandle *pTSCh, SBlock *pBlock) {
if (pBlock->numOfRows + pDataCols->numOfRows < pCfg->minRowsPerFileBlock && if (pBlock->numOfRows + pDataCols->numOfRows < pCfg->minRowsPerFileBlock &&
pBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && true /*TODO: check if same file*/) { pBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && true /*TODO: check if same file*/) {
pFile = TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_LAST); if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_TYPE_LAST, pDataCols, &newBlock, false) < 0) return -1;
if (tsdbWriteBlockToFile(pTSCh, pFile, pDataCols, &newBlock, true, false) < 0) return -1;
// TODO: refactor code here // TODO: refactor code here
if (tsdbInsertSubBlock(pTSCh, &newBlock) < 0) return -1; if (tsdbInsertSubBlock(pTSCh, &newBlock) < 0) return -1;
} else { } else {
...@@ -1412,9 +1268,7 @@ static int tsdbMergeLastBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { ...@@ -1412,9 +1268,7 @@ static int tsdbMergeLastBlock(STSCommitHandle *pTSCh, SBlock *pBlock) {
tsdbLoadDataFromCache(pTable, pIter->pIter, pTSCh->maxKey, INT32_MAX, pDataCols, tsdbLoadDataFromCache(pTable, pIter->pIter, pTSCh->maxKey, INT32_MAX, pDataCols,
pReadH->pDataCols[0]->cols[0].pData, pBlock->numOfRows); pReadH->pDataCols[0]->cols[0].pData, pBlock->numOfRows);
ASSERT(pDataCols->numOfRows == rows); ASSERT(pDataCols->numOfRows == rows);
if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_LAST), pDataCols, &newBlock, if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_TYPE_LAST, pDataCols, &newBlock, false) < 0) return -1;
true, false) < 0)
return -1;
if (tsdbCopyBlock(pTSCh, pBlock) < 0) return -1; if (tsdbCopyBlock(pTSCh, pBlock) < 0) return -1;
if (tsdbInsertSubBlock() < 0) return -1; // TODO if (tsdbInsertSubBlock() < 0) return -1; // TODO
} else { } else {
...@@ -1457,8 +1311,7 @@ static int tsdbMergeDataBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { ...@@ -1457,8 +1311,7 @@ static int tsdbMergeDataBlock(STSCommitHandle *pTSCh, SBlock *pBlock) {
tdResetDataCols(pDataCols); tdResetDataCols(pDataCols);
rows = tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, pBlock->keyFirst - 1, dbrows, pDataCols, NULL, 0); rows = tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, pBlock->keyFirst - 1, dbrows, pDataCols, NULL, 0);
ASSERT(rows > 0); ASSERT(rows > 0);
if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_DATA), pDataCols, &newBlock, if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_TYPE_DATA, pDataCols, &newBlock, true) < 0)
false, true) < 0)
return -1; return -1;
if (tsdbAddSuperBlock(pTSCh, &newBlock) < 0) return -1; if (tsdbAddSuperBlock(pTSCh, &newBlock) < 0) return -1;
...@@ -1487,9 +1340,7 @@ static int tsdbMergeDataBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { ...@@ -1487,9 +1340,7 @@ static int tsdbMergeDataBlock(STSCommitHandle *pTSCh, SBlock *pBlock) {
tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, keyLimit, rows, pDataCols, pReadH->pDataCols[0]->cols[0].pData, tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, keyLimit, rows, pDataCols, pReadH->pDataCols[0]->cols[0].pData,
pBlock->numOfRows); pBlock->numOfRows);
ASSERT(pDataCols->numOfRows == rows); ASSERT(pDataCols->numOfRows == rows);
if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_DATA), pDataCols, &newBlock, if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_TYPE_DATA, pDataCols, &newBlock, false) < 0) return -1;
false, false) < 0)
return -1;
if (tsdbCopyBlock(pTSCh, pBlock) < 0) return -1; if (tsdbCopyBlock(pTSCh, pBlock) < 0) return -1;
if (tsdbInsertSubBlock() < 0) return -1; // TODO if (tsdbInsertSubBlock() < 0) return -1; // TODO
} else { } else {
...@@ -1498,9 +1349,7 @@ static int tsdbMergeDataBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { ...@@ -1498,9 +1349,7 @@ static int tsdbMergeDataBlock(STSCommitHandle *pTSCh, SBlock *pBlock) {
tdResetDataCols(pDataCols); tdResetDataCols(pDataCols);
rows = tsdbLoadMergeFromCache(pTSCh, keyLimit); rows = tsdbLoadMergeFromCache(pTSCh, keyLimit);
if (rows == 0) break; if (rows == 0) break;
if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_DATA), pDataCols, &newBlock, if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_TYPE_DATA, pDataCols, &newBlock, true) < 0) return -1;
false, true) < 0)
return -1;
if (tsdbAddSuperBlock(pTSCh, &newBlock) < 0) return -1; if (tsdbAddSuperBlock(pTSCh, &newBlock) < 0) return -1;
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册