提交 9bfa475d 编写于 作者: H Hongze Cheng

refactor more code

上级 8fb2c5a2
......@@ -529,6 +529,7 @@ static FORCE_INLINE int tsdbAllocBuf(void **ppBuf, uint32_t size) {
int tsdbEncodeBlockIdx(void** buf, SBlockIdx* pBlockIdx);
void* tsdbDecodeBlockIdx(void* buf, SBlockIdx* pBlockIdx);
int tsdbLoadKeyCol(SReadHandle* pReadH, SBlockInfo* pBlockInfo, SBlock* pBlock);
#ifdef __cplusplus
}
......
......@@ -14,6 +14,7 @@
*/
#include <fcntl.h>
#include <limits.h>
#include <sys/stat.h>
#include <sys/types.h>
......@@ -677,49 +678,6 @@ static int tsdbCommitTableData(STSCommitHandle *pTSCh, int tid) {
return 0;
}
static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget,
TSKEY maxKey, int maxRows) {
int numOfRows = 0;
TSKEY key1 = INT64_MAX;
TSKEY key2 = INT64_MAX;
STSchema *pSchema = NULL;
ASSERT(maxRows > 0 && dataColsKeyLast(pDataCols) <= maxKey);
tdResetDataCols(pTarget);
while (true) {
key1 = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter);
SDataRow row = tsdbNextIterRow(pCommitIter->pIter);
key2 = (row == NULL || dataRowKey(row) > maxKey) ? INT64_MAX : dataRowKey(row);
if (key1 == INT64_MAX && key2 == INT64_MAX) break;
if (key1 <= key2) {
for (int i = 0; i < pDataCols->numOfCols; i++) {
dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows,
pTarget->maxPoints);
}
pTarget->numOfRows++;
(*iter)++;
if (key1 == key2) tSkipListIterNext(pCommitIter->pIter);
} else {
if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) {
pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, dataRowVersion(row));
ASSERT(pSchema != NULL);
}
tdAppendDataRowToDataCol(row, pSchema, pTarget);
tSkipListIterNext(pCommitIter->pIter);
}
numOfRows++;
if (numOfRows >= maxRows) break;
ASSERT(numOfRows == pTarget->numOfRows && numOfRows <= pTarget->maxPoints);
}
return numOfRows;
}
static int tsdbWriteBlockToRightFile(STSCommitHandle *pTSCh, SDataCols *pDataCols, SBlock *pBlock) {
STsdbCfg *pCfg = &(pTSCh->pReadH->pRepo->config);
SFile * pFile = NULL;
......@@ -967,7 +925,7 @@ static int tsdbCopyBlocks(STSCommitHandle *pTSCh, int sidx, int eidx) {
ASSERT(sidx <= eidx);
for (int idx = sidx; idx < eidx; idx++) {
if (tsdbCopyBlock(pTSCh, idx) < 0) return -1;
if (tsdbCopyBlock(pTSCh, pTSCh->pReadH->pBlockInfo->blocks + idx) < 0) return -1;
}
return 0;
......@@ -1328,16 +1286,14 @@ static int tsdbAppendBlockIdx(STSCommitHandle *pTSCh) {
return 0;
}
static int tsdbCopyBlock(STSCommitHandle *pTSCh, int bidx) {
SReadHandle *pReadH = pTSCh->pReadH;
SBlock *pBlock = pReadH->pBlockInfo->blocks + bidx;
SFile * pWFile = NULL;
SFile * pRFile = NULL;
SBlock newBlock = {0};
static int tsdbCopyBlock(STSCommitHandle *pTSCh, SBlock *pBlock) {
ASSERT(pBlock->numOfSubBlocks >= 1);
SReadHandle *pReadH = pTSCh->pReadH;
SFile * pWFile = NULL;
SFile * pRFile = NULL;
SBlock newBlock = {0};
if (pBlock->last) {
pWFile = TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_LAST);
pRFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_LAST);
......@@ -1422,7 +1378,7 @@ static int tsdbMergeLastBlock(STSCommitHandle *pTSCh, SBlock *pBlock) {
SFile * pFile = NULL;
TSKEY keyNext = tsdbNextIterKey(pIter->pIter);
if (keyNext > pBlock->keyLast) {
if (keyNext > pBlock->keyLast) { // append merge last block
tdResetDataCols(pDataCols);
int rows = tsdbLoadDataFromCache(pTable, pIter->pIter, pTSCh->maxKey, dbrows-pBlock->numOfRows, pDataCols, NULL, 0);
ASSERT(rows > 0);
......@@ -1442,14 +1398,117 @@ static int tsdbMergeLastBlock(STSCommitHandle *pTSCh, SBlock *pBlock) {
if (tsdbWriteBlockToRightFile(pTSCh, pReadH->pDataCols[0], &newBlock) < 0) return -1;
if (tsdbAddSuperBlock(pTSCh, &newBlock) < 0) return -1;
}
} else {
} else { // sort merge last block
SSkipListIterator titer = *(pIter->pIter);
if (tsdbLoadKeyCol(pReadH, NULL, pBlock) < 0) return -1;
int rows = tsdbLoadDataFromCache(pTable, &titer, pTSCh->maxKey, INT32_MAX, NULL,
pReadH->pDataCols[0]->cols[0].pData, pBlock->numOfRows);
if (rows == 0) { // all data duplicate
*pIter->pIter = titer;
if (tsdbCopyBlock(pTSCh, pBlock) < 0) return -1;
} else if (pBlock->numOfRows + rows < pCfg->minRowsPerFileBlock && pBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && true/*TODO: if have same file*/){
tdResetDataCols(pDataCols);
tsdbLoadDataFromCache(pTable, pIter->pIter, pTSCh->maxKey, INT32_MAX, pDataCols,
pReadH->pDataCols[0]->cols[0].pData, pBlock->numOfRows);
ASSERT(pDataCols->numOfRows == rows);
if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_LAST), pDataCols, &newBlock,
true, false) < 0)
return -1;
if (tsdbCopyBlock(pTSCh, pBlock) < 0) return -1;
if (tsdbInsertSubBlock() < 0) return -1; // TODO
} else {
if (tsdbLoadBlockData(pReadH, pBlock, NULL) < 0) return -1;
while (true) {
tdResetDataCols(pDataCols);
rows = tsdbLoadMergeFromCache(pTSCh, pTSCh->maxKey);
if (rows == 0) break;
if (tsdbWriteBlockToRightFile(pTSCh, pDataCols, &newBlock) < 0) return -1;
if (tsdbAddSuperBlock(pTSCh, &newBlock) < 0) return -1;
}
}
}
return 0;
}
static int tsdbMergeDataBlock(STSCommitHandle *pTSCh, SBlock *pBlock) {
SReadHandle *pReadH = pTSCh->pReadH;
STsdbRepo * pRepo = pReadH->pRepo;
STsdbCfg * pCfg = &(pRepo->config);
SCommitIter *pIter = pTSCh->pIters + TABLE_TID(pReadH->pTable);
SDataCols * pDataCols = pTSCh->pDataCols;
SBlock newBlock = {0};
int dbrows = TSDB_DEFAULT_ROWS_TO_COMMIT(pCfg->maxRowsPerFileBlock);
int bidx = POINTER_DISTANCE(pBlock, pReadH->pBlockInfo->blocks) / sizeof(SBlock);
int rows = 0;
TSKEY keyLimit = (bidx == pReadH->pCurBlockIdx->numOfBlocks - 1) ? pTSCh->maxKey : (pBlock[1].keyFirst - 1);
TSKEY keyNext = tsdbNextIterKey(pIter->pIter);
SSkipListIterator titer = *(pIter->pIter);
ASSERT(bidx < pReadH->pCurBlockIdx->numOfBlocks && pBlock->numOfSubBlocks >= 1);
// Commit data to pBlock->keyFirst - 1 included
if (keyNext < pBlock->keyFirst) {
while (true) {
if (TSDB_KEY_BEYOND_RANGE(keyNext, pBlock->keyFirst - 1)) break;
tdResetDataCols(pDataCols);
rows = tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, pBlock->keyFirst - 1, dbrows, pDataCols, NULL, 0);
ASSERT(rows > 0);
if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_DATA), pDataCols, &newBlock,
false, true) < 0)
return -1;
if (tsdbAddSuperBlock(pTSCh, &newBlock) < 0) return -1;
keyNext = tsdbNextIterKey(pIter->pIter);
}
}
if (keyNext > pBlock->keyLast) {
if (tsdbCopyBlock(pTSCh, pBlock) < 0) return -1;
return 0;
}
// Commit data to keyLimit included
if (tsdbLoadKeyCol(pReadH, pBlock, NULL) < 0) return -1;
rows = tsdbLoadDataFromCache(pIter->pTable, &titer, pBlock->keyLast, INT32_MAX, NULL,
pReadH->pDataCols[0]->cols[0].pData, pBlock->numOfRows);
if (rows == 0) {
*(pIter->pIter) = titer;
if (tsdbCopyBlock(pTSCh, pBlock) < 0) return -1;
} else if (pBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && rows + pBlock->numOfRows <= pCfg->maxRowsPerFileBlock &&
true /*TODO: the same block*/) {
int trow = tsdbLoadDataFromCache(pIter->pTable, &titer, keyLimit, INT_MAX, NULL, NULL, 0) + rows;
if (trow + pBlock->numOfRows <= pCfg->maxRowsPerFileBlock) rows = trow;
tdResetDataCols(pDataCols);
tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, keyLimit, rows, pDataCols, pReadH->pDataCols[0]->cols[0].pData,
pBlock->numOfRows);
ASSERT(pDataCols->numOfRows == rows);
if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_DATA), pDataCols, &newBlock,
false, false) < 0)
return -1;
if (tsdbCopyBlock(pTSCh, pBlock) < 0) return -1;
if (tsdbInsertSubBlock() < 0) return -1; // TODO
} else {
if (tsdbLoadBlockData(pReadH, pBlock, NULL) < 0) return -1;
while (true) {
tdResetDataCols(pDataCols);
rows = tsdbLoadMergeFromCache(pTSCh, keyLimit);
if (rows == 0) break;
if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_DATA), pDataCols, &newBlock,
false, true) < 0)
return -1;
if (tsdbAddSuperBlock(pTSCh, &newBlock) < 0) return -1;
}
}
return 0;
}
static int tsdbLoadMergeFromCache(STSCommitHandle *pTSCh, TSKEY maxKey) {
// TODO
return 0;
}
\ No newline at end of file
......@@ -344,6 +344,11 @@ int tsdbLoadBlockDataInfo(SReadHandle *pReadH, SBlock *pBlock) {
return 0;
}
int tsdbLoadKeyCol(SReadHandle *pReadH, SBlockInfo *pBlockInfo, SBlock *pBlock) {
int16_t colId = 0;
return tsdbLoadBlockDataCols(pReadH, pBlock, pBlockInfo, &colId, 1);
}
static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols *pDataCols) {
ASSERT(pBlock->numOfSubBlocks <= 1);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册