From 9bfa475dd00fff50b5e4a01d34673445297d191b Mon Sep 17 00:00:00 2001
From: Hongze Cheng
Date: Sun, 18 Oct 2020 17:45:28 +0800
Subject: [PATCH] refactor more code

---
Review notes (text between the "---" line and the first "diff --git" line is
not part of the commit):

* tsdbCopyBlock() now receives the SBlock pointer itself instead of an index
  into pReadH->pBlockInfo->blocks; tsdbCopyBlocks() converts its index to a
  pointer at the call site.
* tsdbLoadKeyCol() is a thin wrapper that asks tsdbLoadBlockDataCols() for
  column id 0 only. Both callers afterwards pass
  pReadH->pDataCols[0]->cols[0].pData together with pBlock->numOfRows to
  tsdbLoadDataFromCache().
* tsdbMergeLastBlock() gains a "sort merge" branch and tsdbMergeDataBlock()
  gets its first real body. Both pick one of three paths from the row count
  returned by tsdbLoadDataFromCache(): copy the block unchanged (0 rows),
  write the new rows as an extra block and copy the old one, or reload the
  block and rewrite it through tsdbLoadMergeFromCache().
* Still unfinished in this patch: tsdbLoadMergeFromCache() is a stub that
  returns 0, so the two "while (true)" rewrite loops break on their first
  pass; tsdbInsertSubBlock() is called without arguments; two conditions
  contain a literal "true" placeholder marked TODO.
* Fixed relative to the patch as first posted:
  - tsdbMergeDataBlock() called tsdbLoadKeyCol(pReadH, pBlock, NULL), i.e. it
    passed the SBlock pointer in the SBlockInfo slot and NULL as the block.
    The arguments now follow the prototype, as in tsdbMergeLastBlock().
  - tsdbMergeDataBlock() copied the skip-list iterator into titer before the
    loop that advances pIter->pIter, then counted rows with that stale copy.
    The copy is now taken right before it is used, as in tsdbMergeLastBlock().
    The number of added lines is unchanged.
* NOTE(review): in this copy of the patch the header names on the #include
  lines of the first tsdbCommit.c hunk (and the author's e-mail address) are
  missing, and the indentation of context lines was reconstructed. Restore
  them from the original commit, or apply with whitespace tolerance.

 src/tsdb/inc/tsdbMain.h     |   1 +
 src/tsdb/src/tsdbCommit.c   | 169 ++++++++++++++++++++++++------------
 src/tsdb/src/tsdbReadUtil.c |   5 ++
 3 files changed, 120 insertions(+), 55 deletions(-)

diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h
index 4752248d68..a6fc636e10 100644
--- a/src/tsdb/inc/tsdbMain.h
+++ b/src/tsdb/inc/tsdbMain.h
@@ -529,6 +529,7 @@ static FORCE_INLINE int tsdbAllocBuf(void **ppBuf, uint32_t size) {
 
 int tsdbEncodeBlockIdx(void** buf, SBlockIdx* pBlockIdx);
 void* tsdbDecodeBlockIdx(void* buf, SBlockIdx* pBlockIdx);
+int tsdbLoadKeyCol(SReadHandle* pReadH, SBlockInfo* pBlockInfo, SBlock* pBlock);
 
 #ifdef __cplusplus
 }
diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c
index d6729eb767..b70666396e 100644
--- a/src/tsdb/src/tsdbCommit.c
+++ b/src/tsdb/src/tsdbCommit.c
@@ -14,6 +14,7 @@
  */
 
 #include
+#include
 #include
 #include
 
@@ -677,49 +678,6 @@ static int tsdbCommitTableData(STSCommitHandle *pTSCh, int tid) {
   return 0;
 }
 
-static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget,
-                                     TSKEY maxKey, int maxRows) {
-  int       numOfRows = 0;
-  TSKEY     key1 = INT64_MAX;
-  TSKEY     key2 = INT64_MAX;
-  STSchema *pSchema = NULL;
-
-  ASSERT(maxRows > 0 && dataColsKeyLast(pDataCols) <= maxKey);
-  tdResetDataCols(pTarget);
-
-  while (true) {
-    key1 = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter);
-    SDataRow row = tsdbNextIterRow(pCommitIter->pIter);
-    key2 = (row == NULL || dataRowKey(row) > maxKey) ? INT64_MAX : dataRowKey(row);
-
-    if (key1 == INT64_MAX && key2 == INT64_MAX) break;
-
-    if (key1 <= key2) {
-      for (int i = 0; i < pDataCols->numOfCols; i++) {
-        dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows,
-                         pTarget->maxPoints);
-      }
-      pTarget->numOfRows++;
-      (*iter)++;
-      if (key1 == key2) tSkipListIterNext(pCommitIter->pIter);
-    } else {
-      if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) {
-        pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, dataRowVersion(row));
-        ASSERT(pSchema != NULL);
-      }
-
-      tdAppendDataRowToDataCol(row, pSchema, pTarget);
-      tSkipListIterNext(pCommitIter->pIter);
-    }
-
-    numOfRows++;
-    if (numOfRows >= maxRows) break;
-    ASSERT(numOfRows == pTarget->numOfRows && numOfRows <= pTarget->maxPoints);
-  }
-
-  return numOfRows;
-}
-
 static int tsdbWriteBlockToRightFile(STSCommitHandle *pTSCh, SDataCols *pDataCols, SBlock *pBlock) {
   STsdbCfg *pCfg = &(pTSCh->pReadH->pRepo->config);
   SFile *   pFile = NULL;
@@ -967,7 +925,7 @@ static int tsdbCopyBlocks(STSCommitHandle *pTSCh, int sidx, int eidx) {
   ASSERT(sidx <= eidx);
 
   for (int idx = sidx; idx < eidx; idx++) {
-    if (tsdbCopyBlock(pTSCh, idx) < 0) return -1;
+    if (tsdbCopyBlock(pTSCh, pTSCh->pReadH->pBlockInfo->blocks + idx) < 0) return -1;
   }
 
   return 0;
@@ -1328,16 +1286,14 @@ static int tsdbAppendBlockIdx(STSCommitHandle *pTSCh) {
   return 0;
 }
 
-static int tsdbCopyBlock(STSCommitHandle *pTSCh, int bidx) {
-  SReadHandle *pReadH = pTSCh->pReadH;
-
-  SBlock *pBlock = pReadH->pBlockInfo->blocks + bidx;
-  SFile * pWFile = NULL;
-  SFile * pRFile = NULL;
-  SBlock  newBlock = {0};
-
+static int tsdbCopyBlock(STSCommitHandle *pTSCh, SBlock *pBlock) {
   ASSERT(pBlock->numOfSubBlocks >= 1);
 
+  SReadHandle *pReadH = pTSCh->pReadH;
+  SFile *      pWFile = NULL;
+  SFile *      pRFile = NULL;
+  SBlock       newBlock = {0};
+
   if (pBlock->last) {
     pWFile = TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_LAST);
     pRFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_LAST);
@@ -1422,7 +1378,7 @@ static int tsdbMergeLastBlock(STSCommitHandle *pTSCh, SBlock *pBlock) {
   SFile *      pFile = NULL;
   TSKEY        keyNext = tsdbNextIterKey(pIter->pIter);
 
-  if (keyNext > pBlock->keyLast) {
+  if (keyNext > pBlock->keyLast) {  // append merge last block
     tdResetDataCols(pDataCols);
     int rows = tsdbLoadDataFromCache(pTable, pIter->pIter, pTSCh->maxKey, dbrows-pBlock->numOfRows, pDataCols, NULL, 0);
     ASSERT(rows > 0);
@@ -1442,14 +1398,117 @@ static int tsdbMergeLastBlock(STSCommitHandle *pTSCh, SBlock *pBlock) {
       if (tsdbWriteBlockToRightFile(pTSCh, pReadH->pDataCols[0], &newBlock) < 0) return -1;
       if (tsdbAddSuperBlock(pTSCh, &newBlock) < 0) return -1;
     }
-  } else {
-
+  } else {  // sort merge last block
+    SSkipListIterator titer = *(pIter->pIter);  // scratch copy; pIter->pIter itself is not moved by the count below
+
+    if (tsdbLoadKeyCol(pReadH, NULL, pBlock) < 0) return -1;
+    int rows = tsdbLoadDataFromCache(pTable, &titer, pTSCh->maxKey, INT32_MAX, NULL,
+                                     pReadH->pDataCols[0]->cols[0].pData, pBlock->numOfRows);
+    if (rows == 0) {  // all data duplicate
+      *pIter->pIter = titer;
+      if (tsdbCopyBlock(pTSCh, pBlock) < 0) return -1;
+    } else if (pBlock->numOfRows + rows < pCfg->minRowsPerFileBlock && pBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && true/*TODO: if have same file*/){
+      tdResetDataCols(pDataCols);
+      tsdbLoadDataFromCache(pTable, pIter->pIter, pTSCh->maxKey, INT32_MAX, pDataCols,
+                            pReadH->pDataCols[0]->cols[0].pData, pBlock->numOfRows);
+      ASSERT(pDataCols->numOfRows == rows);
+      if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_LAST), pDataCols, &newBlock,
+                               true, false) < 0)
+        return -1;
+      if (tsdbCopyBlock(pTSCh, pBlock) < 0) return -1;
+      if (tsdbInsertSubBlock() < 0) return -1;  // TODO
+    } else {
+      if (tsdbLoadBlockData(pReadH, pBlock, NULL) < 0) return -1;
+      while (true) {
+        tdResetDataCols(pDataCols);
+        rows = tsdbLoadMergeFromCache(pTSCh, pTSCh->maxKey);
+        if (rows == 0) break;
+        if (tsdbWriteBlockToRightFile(pTSCh, pDataCols, &newBlock) < 0) return -1;
+        if (tsdbAddSuperBlock(pTSCh, &newBlock) < 0) return -1;
+      }
+    }
   }
 
   return 0;
 }
 
 static int tsdbMergeDataBlock(STSCommitHandle *pTSCh, SBlock *pBlock) {
+  SReadHandle *pReadH = pTSCh->pReadH;
+  STsdbRepo *  pRepo = pReadH->pRepo;
+  STsdbCfg *   pCfg = &(pRepo->config);
+  SCommitIter *pIter = pTSCh->pIters + TABLE_TID(pReadH->pTable);
+  SDataCols *  pDataCols = pTSCh->pDataCols;
+  SBlock       newBlock = {0};
+  int          dbrows = TSDB_DEFAULT_ROWS_TO_COMMIT(pCfg->maxRowsPerFileBlock);
+  int          bidx = POINTER_DISTANCE(pBlock, pReadH->pBlockInfo->blocks) / sizeof(SBlock);
+  int          rows = 0;
+  TSKEY        keyLimit = (bidx == pReadH->pCurBlockIdx->numOfBlocks - 1) ? pTSCh->maxKey : (pBlock[1].keyFirst - 1);
+  TSKEY        keyNext = tsdbNextIterKey(pIter->pIter);
+
+  ASSERT(bidx < pReadH->pCurBlockIdx->numOfBlocks && pBlock->numOfSubBlocks >= 1);
+
+  // Commit data to pBlock->keyFirst - 1 included
+  if (keyNext < pBlock->keyFirst) {
+    while (true) {
+      if (TSDB_KEY_BEYOND_RANGE(keyNext, pBlock->keyFirst - 1)) break;
+
+      tdResetDataCols(pDataCols);
+      rows = tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, pBlock->keyFirst - 1, dbrows, pDataCols, NULL, 0);
+      ASSERT(rows > 0);
+      if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_DATA), pDataCols, &newBlock,
+                               false, true) < 0)
+        return -1;
+      if (tsdbAddSuperBlock(pTSCh, &newBlock) < 0) return -1;
+
+      keyNext = tsdbNextIterKey(pIter->pIter);
+    }
+  }
+
+  if (keyNext > pBlock->keyLast) {  // next cached key lies past this block: keep the block as it is
+    if (tsdbCopyBlock(pTSCh, pBlock) < 0) return -1;
+    return 0;
+  }
+
+  // Commit data to keyLimit included
+  // Snapshot the iterator only here: the loop above may already have advanced pIter->pIter.
+  SSkipListIterator titer = *(pIter->pIter);
+  if (tsdbLoadKeyCol(pReadH, NULL, pBlock) < 0) return -1;  // argument order follows the prototype: (handle, block info, block)
+  rows = tsdbLoadDataFromCache(pIter->pTable, &titer, pBlock->keyLast, INT32_MAX, NULL,
+                               pReadH->pDataCols[0]->cols[0].pData, pBlock->numOfRows);
+
+  if (rows == 0) {  // same case as "all data duplicate" in tsdbMergeLastBlock
+    *(pIter->pIter) = titer;
+    if (tsdbCopyBlock(pTSCh, pBlock) < 0) return -1;
+  } else if (pBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && rows + pBlock->numOfRows <= pCfg->maxRowsPerFileBlock &&
+             true /*TODO: the same block*/) {
+    int trow = tsdbLoadDataFromCache(pIter->pTable, &titer, keyLimit, INT_MAX, NULL, NULL, 0) + rows;
+    if (trow + pBlock->numOfRows <= pCfg->maxRowsPerFileBlock) rows = trow;  // extend up to keyLimit only if it still fits
+    tdResetDataCols(pDataCols);
+    tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, keyLimit, rows, pDataCols, pReadH->pDataCols[0]->cols[0].pData,
+                          pBlock->numOfRows);
+    ASSERT(pDataCols->numOfRows == rows);
+    if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_DATA), pDataCols, &newBlock,
+                             false, false) < 0)
+      return -1;
+    if (tsdbCopyBlock(pTSCh, pBlock) < 0) return -1;
+    if (tsdbInsertSubBlock() < 0) return -1;  // TODO
+  } else {
+    if (tsdbLoadBlockData(pReadH, pBlock, NULL) < 0) return -1;
+    while (true) {
+      tdResetDataCols(pDataCols);
+      rows = tsdbLoadMergeFromCache(pTSCh, keyLimit);
+      if (rows == 0) break;
+      if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_DATA), pDataCols, &newBlock,
+                               false, true) < 0)
+        return -1;
+      if (tsdbAddSuperBlock(pTSCh, &newBlock) < 0) return -1;
+    }
+  }
+
+  return 0;
+}
+
+static int tsdbLoadMergeFromCache(STSCommitHandle *pTSCh, TSKEY maxKey) {
   // TODO
   return 0;
 }
\ No newline at end of file
diff --git a/src/tsdb/src/tsdbReadUtil.c b/src/tsdb/src/tsdbReadUtil.c
index 247aa350fd..9b6c855e9a 100644
--- a/src/tsdb/src/tsdbReadUtil.c
+++ b/src/tsdb/src/tsdbReadUtil.c
@@ -344,6 +344,11 @@ int tsdbLoadBlockDataInfo(SReadHandle *pReadH, SBlock *pBlock) {
   return 0;
 }
 
+int tsdbLoadKeyCol(SReadHandle *pReadH, SBlockInfo *pBlockInfo, SBlock *pBlock) {
+  int16_t colId = 0;  // request a single column, id 0
+  return tsdbLoadBlockDataCols(pReadH, pBlock, pBlockInfo, &colId, 1);
+}
+
 static int tsdbLoadBlockDataImpl(SReadHandle *pReadH, SBlock *pBlock, SDataCols *pDataCols) {
   ASSERT(pBlock->numOfSubBlocks <= 1);
 
-- 
GitLab