From bad9d5511984558c3a13a3e3ab097cbfb6a2cdca Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 8 Jan 2021 23:45:24 +0800 Subject: [PATCH] more work --- src/tsdb/src/tsdbCommit.c | 219 +++++++++++++++++++++++++++++++++----- 1 file changed, 194 insertions(+), 25 deletions(-) diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 1f600d4920..d9743542ff 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -15,6 +15,7 @@ #include "tsdbMain.h" #define TSDB_IVLD_FID INT_MIN +#define TSDB_MAX_SUBBLOCKS 8 typedef struct { int minFid; @@ -495,14 +496,7 @@ static int tsdbCommitToTable(SCommitH *pch, int tid) { pBlock = NULL; } } else if ((cidx < nBlocks) && (pBlock->last || tsdbComparKeyBlock((void *)(&nextKey), pBlock) == 0)) { - TSKEY keyLimit; - if (cidx == nBlocks - 1) { - keyLimit = pch->maxKey; - } else { - keyLimit = pBlock[1].keyFirst - 1; - } - - if (tsdbMergeMemData(pch, pIter, pBlock, keyLimit) < 0) { + if (tsdbMergeMemData(pch, pIter, cidx) < 0) { TSDB_RUNLOCK_TABLE(pIter->pTable); return -1; } @@ -874,8 +868,74 @@ static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLi return 0; } -static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, SBlock *pBlock, TSKEY keyLimit) { - // TODO +static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) { + STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); + STsdbCfg * pCfg = REPO_CFG(pRepo); + int nBlocks = pCommith->readh.pBlkIdx->numOfBlocks; + SBlock * pBlock = pCommith->readh.pBlkInfo->blocks + bidx; + TSKEY keyLimit; + int16_t colId = 0; + SMergeInfo mInfo; + SBlock subBlocks[TSDB_MAX_SUBBLOCKS]; + SBlock block, supBlock; + SDFile * pDFile; + + if (bidx == nBlocks - 1) { + keyLimit = pCommith->maxKey; + } else { + keyLimit = pBlock[1].keyFirst - 1; + } + + SSkipListIterator titer = *(pIter->pIter); + if (tsdbLoadBlockDataCols(&(pCommith->readh), pBlock, NULL, &colId, 1) < 0) return -1; + + tsdbLoadDataFromCache(pIter->pTable, &titer, keyLimit, INT32_MAX, NULL, pCommith->readh.pDCols[0]->cols[0].pData, + pCommith->readh.pDCols[0]->numOfRows, pCfg->update, &mInfo); + + if (mInfo.nOperations == 0) { + // no new data to insert (all updates denied) + if (tsdbMoveBlock(pCommith, bidx) < 0) { + return -1; + } + *(pIter->pIter) = titer; + } else if (pBlock->numOfRows + mInfo.rowsInserted - mInfo.rowsDeleteSucceed == 0) { + // Ignore the block + ASSERT(0); + *(pIter->pIter) = titer; + } else if (tsdbCanAddSubBlock()) { + // Add a sub-block + tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, keyLimit, INT32_MAX, pCommith->pDataCols, + pCommith->readh.pDCols[0]->cols[0].pData, pCommith->readh.pDCols[0]->numOfRows, pCfg->update, + &mInfo); + if (pBlock->last) { + pDFile = TSDB_COMMIT_LAST_FILE(pCommith); + } else { + pDFile = TSDB_COMMIT_DATA_FILE(pCommith); + } + + if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, pBlock->last, false) < 0) return -1; + + if (pBlock->numOfSubBlocks == 1) { + subBlocks[0] = *pBlock; + subBlocks[0].numOfSubBlocks = 0; + } else { + memcpy(subBlocks, POINTER_SHIFT(pCommith->readh.pBlkInfo, pBlock->offset), + sizeof(SBlock) * pBlock->numOfSubBlocks); + } + subBlocks[pBlock->numOfSubBlocks] = block; + supBlock = *pBlock; + supBlock.keyFirst = mInfo.keyFirst; + supBlock.keyLast = mInfo.keyLast; + supBlock.numOfSubBlocks++; + supBlock.numOfRows = pBlock->numOfRows + mInfo.rowsInserted - mInfo.rowsDeleteSucceed; + supBlock.offset = taosArrayGetSize(pCommith->aSubBlk) * sizeof(SBlock); + + if (tsdbCommitAddBlock(pCommith, &supBlock, subBlocks, pBlock->numOfSubBlocks + 1) < 0) return -1; + } else { + if (tsdbLoadBlockData(&(pCommith->readh), pBlock, NULL) < 0) return -1; + if (tsdbMergeBlockData(pCommith, pIter, pCommith->readh.pDCols[0], keyLimit, bidx == (nBlocks - 1)) < 0) return -1; + } + return 0; } @@ -887,32 +947,141 @@ static int tsdbMoveBlock(SCommitH *pCommith, int bidx) { if (tfsIsSameFile(&(pCommitF->f), &(pReadF->f))) { if (pBlock->numOfSubBlocks == 1) { - if (taosArrayPush(pCommith->aSupBlk, (void *)pBlock) == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } + if (tsdbCommitAddBlock(pCommith, pBlock, NULL, 0) < 0) return -1; } else { block = *pBlock; block.offset = sizeof(SBlock) * taosArrayGetSize(pCommith->aSupBlock); - if (taosArrayPush(pCommith->aSupBlk, (void *)(&block)) == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } - - if (taosArrayPushBatch(pCommith->aSubBlk, POINTER_SHIFT(pCommith->readh.pBlkInfo, pBlock->offset), pBlock->numOfSubBlocks) == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + if (tsdbCommitAddBlock(pCommith, &block, POINTER_SHIFT(pCommith->readh.pBlkInfo, pBlock->offset), + pBlock->numOfSubBlocks) < 0) { return -1; } } } else { if (tsdbLoadBlockData(&(pCommith->readh), pBlock, NULL) < 0) return -1; if (tsdbWriteBlock(pCommith, pCommitF, pCommith->readh.pDCols[0], &block, pBlock->last, true) < 0) return -1; - if (taosArrayPush(pCommith->aSupBlk, (void *)(&block)) == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } + if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) return -1; + } + + return 0; +} + +static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks) { + ASSERT(pSupBlock != NULL); + + if (taosArrayPush(pCommith->aSupBlk, pSupBlock) < 0) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + if (pSubBlocks && taosArrayPushBatch(pCommith->aSupBlk, pSubBlocks, nSubBlocks) < 0) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; } return 0; +} + +static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit, bool isLastOneBlock) { + STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); + STsdbCfg * pCfg = REPO_CFG(pRepo); + SBlock block; + SDFile * pDFile; + bool isLast; + int32_t defaultRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith); + + int biter = 0; + while (true) { + tsdbLoadAndMergeFromCache(pCommith->readh.pDCols[0], &biter, pIter, pCommith->pDataCols, keyLimit, defaultRows, + pCfg->update); + + if (pCommith->pDataCols->numOfRows == 0) break; + + if (isLastOneBlock) { + if (pCommith->pDataCols->numOfRows < pCfg->minRowsPerFileBlock) { + pDFile = TSDB_COMMIT_LAST_FILE(pCommith); + isLast = true; + } else { + pDFile = TSDB_COMMIT_DATA_FILE(pCommith); + isLast = false; + } + } else { + pDFile = TSDB_COMMIT_DATA_FILE(pCommith); + isLast = false; + } + + if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, isLast, true) < 0) return -1; + if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) return -1; + } + + +} + +static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, + TSKEY maxKey, int maxRows, int8_t update) { + TSKEY key1 = INT64_MAX; + TSKEY key2 = INT64_MAX; + STSchema *pSchema = NULL; + + ASSERT(maxRows > 0 && dataColsKeyLast(pDataCols) <= maxKey); + tdResetDataCols(pTarget); + + while (true) { + key1 = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter); + bool isRowDel = false; + SDataRow row = tsdbNextIterRow(pCommitIter->pIter); + if (row == NULL || dataRowKey(row) > maxKey) { + key2 = INT64_MAX; + } else { + key2 = dataRowKey(row); + isRowDel = dataRowDeleted(row); + } + + if (key1 == INT64_MAX && key2 == INT64_MAX) break; + + if (key1 < key2) { + for (int i = 0; i < pDataCols->numOfCols; i++) { + dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows, + pTarget->maxPoints); + } + + pTarget->numOfRows++; + (*iter)++; + } else if (key1 > key2) { + if (!isRowDel) { + if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) { + pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, dataRowVersion(row)); + ASSERT(pSchema != NULL); + } + + tdAppendDataRowToDataCol(row, pSchema, pTarget); + } + + tSkipListIterNext(pCommitIter->pIter); + } else { + if (update) { + if (!isRowDel) { + if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) { + pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, dataRowVersion(row)); + ASSERT(pSchema != NULL); + } + + tdAppendDataRowToDataCol(row, pSchema, pTarget); + } + } else { + ASSERT(!isRowDel); + + for (int i = 0; i < pDataCols->numOfCols; i++) { + dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows, + pTarget->maxPoints); + } + + pTarget->numOfRows++; + } + (*iter)++; + tSkipListIterNext(pCommitIter->pIter); + } + + if (pTarget->numOfRows >= maxRows) break; + } } \ No newline at end of file -- GitLab