diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index ebff9a0935619e66c6e59c3b37ec5f17a8419372..4752248d68434cb6431ad7f3f0a545d40939a0da 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -513,7 +513,7 @@ int tsdbLoadBlockDataInfo(SReadHandle* pReadH, SBlock* pBlock); static FORCE_INLINE int tsdbAllocBuf(void **ppBuf, uint32_t size) { ASSERT(size > 0); - void *pBuf = *pBuf; + void *pBuf = *ppBuf; uint32_t tsize = taosTSizeof(pBuf); if (tsize >= size) return 0; diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index ee294f33a509d8e5886a9e8971fd980b258fd144..d6729eb767c533ec7e2c92132ac9a6a9777d40a0 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -25,6 +25,7 @@ #define TSDB_META_FILE_CHANGE 1 #define TSDB_COMMIT_OVER 2 +#define TSDB_MAX_SUBBLOCKS 8 #define TSDB_DEFAULT_ROWS_TO_COMMIT(maxRows) ((maxRows) * 4 / 5) typedef struct { @@ -243,7 +244,7 @@ static int tsdbCommitMetaData(SCommitHandle *pCommitH) { } } - if (tdKVStoreEndCommit(pMeta->pStore, false /*hasError = false*/) < 0) { + if (tdKVStoreEndCommit(pStore, false /*hasError = false*/) < 0) { tsdbError("vgId:%d failed to commit data while end commit meta since %s", REPO_ID(pRepo), tstrerror(terrno)); return -1; } @@ -651,12 +652,12 @@ static int tsdbCommitTableData(STSCommitHandle *pTSCh, int tid) { return 0; } - if (tsdbLoadBlockInfo(pReadH, tid) < 0) { + if (tsdbLoadBlockInfo(pReadH) < 0) { taosRUnLockLatch(&(pIter->pTable->latch)); return -1; } - if (tsdbCommitTableDataImpl(pTSCh) < 0) { + if (tsdbCommitTableDataImpl(pTSCh, tid) < 0) { taosRUnLockLatch(&(pIter->pTable->latch)); return -1; } @@ -736,173 +737,6 @@ static int tsdbWriteBlockToRightFile(STSCommitHandle *pTSCh, SDataCols *pDataCol return 0; } -static int tsdbInsertSuperBlock(SRWHelper *pHelper, SBlock *pCompBlock, int blkIdx) { - SBlockIdx *pIdx = &(pHelper->curCompIdx); - - ASSERT(blkIdx >= 0 && blkIdx <= (int)pIdx->numOfBlocks); - ASSERT(pCompBlock->numOfSubBlocks == 1); - - // Adjust memory if no more room - if (pIdx->len == 0) pIdx->len = sizeof(SBlockInfo) + sizeof(TSCKSUM); - if (tsdbAdjustInfoSizeIfNeeded(pHelper, pIdx->len + sizeof(SBlockInfo)) < 0) goto _err; - - // Change the offset - for (uint32_t i = 0; i < pIdx->numOfBlocks; i++) { - SBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i]; - if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += sizeof(SBlock); - } - - // Memmove if needed - int tsize = pIdx->len - (sizeof(SBlockInfo) + sizeof(SBlock) * blkIdx); - if (tsize > 0) { - ASSERT(sizeof(SBlockInfo) + sizeof(SBlock) * (blkIdx + 1) < taosTSizeof(pHelper->pCompInfo)); - ASSERT(sizeof(SBlockInfo) + sizeof(SBlock) * (blkIdx + 1) + tsize <= taosTSizeof(pHelper->pCompInfo)); - memmove(POINTER_SHIFT(pHelper->pCompInfo, sizeof(SBlockInfo) + sizeof(SBlock) * (blkIdx + 1)), - POINTER_SHIFT(pHelper->pCompInfo, sizeof(SBlockInfo) + sizeof(SBlock) * blkIdx), tsize); - } - pHelper->pCompInfo->blocks[blkIdx] = *pCompBlock; - - pIdx->numOfBlocks++; - pIdx->len += sizeof(SBlock); - ASSERT(pIdx->len <= taosTSizeof(pHelper->pCompInfo)); - pIdx->maxKey = blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->keyLast; - pIdx->hasLast = (uint32_t)blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->last; - - if (pIdx->numOfBlocks > 1) { - ASSERT(pHelper->pCompInfo->blocks[0].keyLast < pHelper->pCompInfo->blocks[1].keyFirst); - } - - tsdbDebug("vgId:%d tid:%d a super block is inserted at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid, - blkIdx); - - return 0; - -_err: - return -1; -} - -static int tsdbAddSubBlock(SRWHelper *pHelper, SBlock *pCompBlock, int blkIdx, int rowsAdded) { - ASSERT(pCompBlock->numOfSubBlocks == 0); - - SBlockIdx *pIdx = &(pHelper->curCompIdx); - ASSERT(blkIdx >= 0 && blkIdx < (int)pIdx->numOfBlocks); - - SBlock *pSCompBlock = pHelper->pCompInfo->blocks + blkIdx; - ASSERT(pSCompBlock->numOfSubBlocks >= 1 && pSCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS); - - size_t spaceNeeded = - (pSCompBlock->numOfSubBlocks == 1) ? pIdx->len + sizeof(SBlock) * 2 : pIdx->len + sizeof(SBlock); - if (tsdbAdjustInfoSizeIfNeeded(pHelper, spaceNeeded) < 0) goto _err; - - pSCompBlock = pHelper->pCompInfo->blocks + blkIdx; - - // Add the sub-block - if (pSCompBlock->numOfSubBlocks > 1) { - size_t tsize = (size_t)(pIdx->len - (pSCompBlock->offset + pSCompBlock->len)); - if (tsize > 0) { - memmove((void *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len + sizeof(SBlock)), - (void *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len), tsize); - - for (uint32_t i = blkIdx + 1; i < pIdx->numOfBlocks; i++) { - SBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i]; - if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += sizeof(SBlock); - } - } - - *(SBlock *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len) = *pCompBlock; - - pSCompBlock->numOfSubBlocks++; - ASSERT(pSCompBlock->numOfSubBlocks <= TSDB_MAX_SUBBLOCKS); - pSCompBlock->len += sizeof(SBlock); - pSCompBlock->numOfRows += rowsAdded; - pSCompBlock->keyFirst = MIN(pSCompBlock->keyFirst, pCompBlock->keyFirst); - pSCompBlock->keyLast = MAX(pSCompBlock->keyLast, pCompBlock->keyLast); - pIdx->len += sizeof(SBlock); - } else { // Need to create two sub-blocks - void *ptr = NULL; - for (uint32_t i = blkIdx + 1; i < pIdx->numOfBlocks; i++) { - SBlock *pTCompBlock = pHelper->pCompInfo->blocks + i; - if (pTCompBlock->numOfSubBlocks > 1) { - ptr = POINTER_SHIFT(pHelper->pCompInfo, pTCompBlock->offset); - break; - } - } - - if (ptr == NULL) ptr = POINTER_SHIFT(pHelper->pCompInfo, pIdx->len - sizeof(TSCKSUM)); - - size_t tsize = pIdx->len - ((char *)ptr - (char *)(pHelper->pCompInfo)); - if (tsize > 0) { - memmove(POINTER_SHIFT(ptr, sizeof(SBlock) * 2), ptr, tsize); - for (uint32_t i = blkIdx + 1; i < pIdx->numOfBlocks; i++) { - SBlock *pTCompBlock = pHelper->pCompInfo->blocks + i; - if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += (sizeof(SBlock) * 2); - } - } - - ((SBlock *)ptr)[0] = *pSCompBlock; - ((SBlock *)ptr)[0].numOfSubBlocks = 0; - - ((SBlock *)ptr)[1] = *pCompBlock; - - pSCompBlock->numOfSubBlocks = 2; - pSCompBlock->numOfRows += rowsAdded; - pSCompBlock->offset = ((char *)ptr) - ((char *)pHelper->pCompInfo); - pSCompBlock->len = sizeof(SBlock) * 2; - pSCompBlock->keyFirst = MIN(((SBlock *)ptr)[0].keyFirst, ((SBlock *)ptr)[1].keyFirst); - pSCompBlock->keyLast = MAX(((SBlock *)ptr)[0].keyLast, ((SBlock *)ptr)[1].keyLast); - - pIdx->len += (sizeof(SBlock) * 2); - } - - pIdx->maxKey = pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].keyLast; - pIdx->hasLast = (uint32_t)pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].last; - - tsdbDebug("vgId:%d tid:%d a subblock is added at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid, blkIdx); - - return 0; - -_err: - return -1; -} - -static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SBlock *pCompBlock, int blkIdx) { - ASSERT(pCompBlock->numOfSubBlocks == 1); - - SBlockIdx *pIdx = &(pHelper->curCompIdx); - - ASSERT(blkIdx >= 0 && blkIdx < (int)pIdx->numOfBlocks); - - SBlock *pSCompBlock = pHelper->pCompInfo->blocks + blkIdx; - - ASSERT(pSCompBlock->numOfSubBlocks >= 1); - - // Delete the sub blocks it has - if (pSCompBlock->numOfSubBlocks > 1) { - size_t tsize = (size_t)(pIdx->len - (pSCompBlock->offset + pSCompBlock->len)); - if (tsize > 0) { - memmove(POINTER_SHIFT(pHelper->pCompInfo, pSCompBlock->offset), - POINTER_SHIFT(pHelper->pCompInfo, pSCompBlock->offset + pSCompBlock->len), tsize); - } - - for (uint32_t i = blkIdx + 1; i < pIdx->numOfBlocks; i++) { - SBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i]; - if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset -= (sizeof(SBlock) * pSCompBlock->numOfSubBlocks); - } - - pIdx->len -= (sizeof(SBlock) * pSCompBlock->numOfSubBlocks); - } - - *pSCompBlock = *pCompBlock; - - pIdx->maxKey = blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->keyLast; - pIdx->hasLast = (uint32_t)blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->last; - - tsdbDebug("vgId:%d tid:%d a super block is updated at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid, - blkIdx); - - return 0; -} - static int tsdbSetAndOpenCommitFGroup(STSCommitHandle *pTSCh, SFileGroup *pOldGroup, SFileGroup *pNewGroup) { ASSERT(pOldGroup->fileId == pNewGroup->fileId); @@ -976,13 +810,21 @@ static int tsdbWriteBlockInfo(STSCommitHandle *pTSCh) { pBlockInfo->tid = TABLE_TID(pReadH->pTable); if (pTSCh->nSubBlocks > 0) { - if (tsdbAllocBuf(&((void *)pTSCh->pBlockInfo), tlen) < 0) { + if (tsdbAllocBuf(&(pTSCh->pBlockInfo), tlen) < 0) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } memcpy(POINTER_SHIFT(pTSCh->pBlockInfo, sizeof(SBlockInfo) + sizeof(SBlock) * pTSCh->nBlocks), (void *)pTSCh->pSubBlock, sizeof(SBlock) * pTSCh->nSubBlocks); + + int64_t oShift = sizeof(SBlockInfo) + sizeof(SBlock) * pTSCh->nBlocks; + for (int i = 0; i < pTSCh->nBlocks; i++) { + SBlock *pBlock = pTSCh->pBlockInfo->blocks + i; + ASSERT(pBlock->numOfSubBlocks >= 1 && pBlock->numOfSubBlocks <= TSDB_MAX_SUBBLOCKS); + + if (pBlock->numOfSubBlocks > 1) pBlock->offset += oShift; + } } taosCalcChecksumAppend(0, (uint8_t *)(pTSCh->pBlockInfo), tlen); @@ -1142,7 +984,7 @@ static int tsdbAppendCommit(STSCommitHandle *pTSCh) { int dbrows = TSDB_DEFAULT_ROWS_TO_COMMIT(pCfg->maxRowsPerFileBlock); // default block rows tdResetDataCols(pDataCols); - int rows = tsdbLoadDataFromCache(pTable, pIter->pIter, pReadH->maxKey, dbrows, pDataCols, NULL, 0); + int rows = tsdbLoadDataFromCache(pTable, pIter->pIter, pTSCh->maxKey, dbrows, pDataCols, NULL, 0); ASSERT(rows > 0 && pDataCols->numOfRows == rows); if (tsdbWriteBlockToRightFile(pTSCh, pDataCols, pBlock) < 0) return -1; @@ -1152,10 +994,13 @@ static int tsdbAppendCommit(STSCommitHandle *pTSCh) { } static int tsdbMergeCommit(STSCommitHandle *pTSCh, SBlock *pBlock) { - if (pBlock->last) { // merge with the last block - if (tsdbCommitMergeLastBlock(pTSCh, pBlock) < 0) return -1; - } else { // merge with a data block - if (tsdbCommitMergeDataBlock(pTSCh, pBlock) < 0) return -1; + SReadHandle *pReadH = pTSCh->pReadH; + if (pBlock->last) { + ASSERT(POINTER_DISTANCE(pBlock, pReadH->pBlockInfo->blocks) / sizeof(SBlock) == (pReadH->nBlockIdx - 1)); + ASSERT(pReadH->pCurBlockIdx->hasLast); + if (tsdbMergeLastBlock(pTSCh, pBlock) < 0) return -1; + } else { + if (tsdbMergeDataBlock(pTSCh, pBlock) < 0) return -1; } return 0; @@ -1175,7 +1020,7 @@ static int tsdbWriteBlockToFile(STSCommitHandle *pTSCh, SFile *pFile, SDataCols ASSERT(pDataCols->numOfRows > 0 && pDataCols->numOfRows <= pCfg->maxRowsPerFileBlock); ASSERT(isLast ? pDataCols->numOfRows < pCfg->minRowsPerFileBlock : true); - if (tsdbAllocBuf(&((void *)pReadH->pBlockData), csize) < 0) { + if (tsdbAllocBuf(&(pReadH->pBlockData), csize) < 0) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } @@ -1192,7 +1037,7 @@ static int tsdbWriteBlockToFile(STSCommitHandle *pTSCh, SFile *pFile, SDataCols nColsNotAllNull++; csize = TSDB_BLOCK_DATA_LEN(nColsNotAllNull); - if (tsdbAllocBuf(&((void *)pReadH->pBlockData), csize) < 0) { + if (tsdbAllocBuf(&(pReadH->pBlockData), csize) < 0) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } @@ -1298,150 +1143,150 @@ static int tsdbWriteBlockToFile(STSCommitHandle *pTSCh, SFile *pFile, SDataCols return 0; } -static int tsdbCommitMergeLastBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { - SDataCols * pDataCols = pTSCh->pDataCols; - SReadHandle *pReadH = pTSCh->pReadH; - STable * pTable = pTSCh->pReadH->pTable; - SCommitIter *pIter = pTSCh->pIters + TABLE_TID(pTable); - STsdbRepo * pRepo = pReadH->pRepo; - STsdbCfg * pCfg = &(pRepo->config); - int blockCommitRows = TSDB_DEFAULT_ROWS_TO_COMMIT(pCfg->maxRowsPerFileBlock); - SBlock nblock = {0}; - SBlock * pNBlock = &nblock; - - TSKEY nextKey = tsdbNextIterKey(pIter->pIter); - int bidx = POINTER_DISTANCE(pBlock, pReadH->pBlockInfo->blocks) / sizeof(SBlock); - - if (nextKey > pBlock->keyLast) { // just merge and append - tdResetDataCols(pDataCols); - - int rowsToRead = tsdbLoadDataFromCache(pTable, pIter->pIter, pReadH->maxKey, blockCommitRows - pBlock->numOfRows, - pDataCols, NULL, 0); - ASSERT(rowsToRead > 0); - - if (rowsToRead + pBlock->numOfRows < pCfg->minRowsPerFileBlock && pBlock->numOfSubBlocks < TSDB_MAX_SUB_BLOCKS && - /* No new last file is opened*/) { - if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_LAST), pDataCols, pNBlock, true, - false) < 0) { - return -1; - }; - if (tsdbAddSubBlock(pTSCh, pNBlock, NULL) < 0) return -1; - } else { - if (tsdbLoadBlockData(pReadH, pBlock, NULL) < 0) return -1; - - if (tdMergeDataCols(pReadH->pDataCols[0], pDataCols, pDataCols->numOfRows) < 0) return -1; - - if (tsdbWriteBlockToProperFile(pReadH, pReadH->pDataCols[0], pNBlock) < 0) return -1; - if (tsdbAddSuperBlock(pTSCh, pNBlock) < 0) return -1; - } - } else { - if (/* append the old last file */) { - SSkipListIterator titer = *(pIter->pIter); - int16_t colId = 0; - if (tsdbLoadBlockDataCols(pReadH, pBlock, NULL, &colId, 1) < 0) return -1; - - int rowsToRead = tsdbLoadDataFromCache(); - if (rowsToRead == 0) { - *(pIter->pIter) = titer; - tsdbCopyBlocks(); - } else { - if (pBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && rowsToRead + Block->numOfRows < pCfg->minRowsPerFileBlock) { - tsdbLoadDataFromCache(); - if (tsdbWriteBlockToFile() < 0) return -1; - if (tsdbaddsubblock() < 0) return -1; - } else { - if (tasdbloadblockdata() < 0) return -1; - - while (true) - { - tsdbLoadAndMergeFromCache(); - } - } - } - } else { - if (tsdbLoadBlockData(pReadH, pBlock, NULL) < 0) return -1; - - while (true) { - tsdbLoadAndMergeFromCache(); - } - } - } - return 0; -} - -static int tsdbCommitMergeDataBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { - TSKEY keyLimit = (tblkIdx == pIdx->numOfBlocks - 1) ? maxKey : (pCompBlock[1].keyFirst - 1); - TSKEY blkKeyFirst = pCompBlock->keyFirst; - TSKEY blkKeyLast = pCompBlock->keyLast; - - if (keyFirst < blkKeyFirst) { - while (true) { - tdResetDataCols(pDataCols); - int rowsRead = - tsdbLoadDataFromCache(pTable, pCommitIter->pIter, blkKeyFirst - 1, defaultRowsInBlock, pDataCols, NULL, 0); - if (rowsRead == 0) break; - - ASSERT(rowsRead == pDataCols->numOfRows); - if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1; - if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; - tblkIdx++; - } - ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || - tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); - } else { - ASSERT(keyFirst <= blkKeyLast); - int16_t colId = 0; - if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1; - - slIter = *(pCommitIter->pIter); - int rows1 = (pCfg->maxRowsPerFileBlock - pCompBlock->numOfRows); - int rows2 = tsdbLoadDataFromCache(pTable, &slIter, blkKeyLast, INT_MAX, NULL, pDataCols0->cols[0].pData, - pDataCols0->numOfRows); - - if (rows2 == 0) { // all filtered out - *(pCommitIter->pIter) = slIter; - ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || - tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); - } else { - int rows3 = tsdbLoadDataFromCache(pTable, &slIter, keyLimit, INT_MAX, NULL, NULL, 0) + rows2; - - if (pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && rows1 >= rows2) { - int rows = (rows1 >= rows3) ? rows3 : rows2; - tdResetDataCols(pDataCols); - int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, rows, pDataCols, - pDataCols0->cols[0].pData, pDataCols0->numOfRows); - ASSERT(rowsRead == rows && rowsRead == pDataCols->numOfRows); - if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, false) < 0) return -1; - if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1; - tblkIdx++; - ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || - tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); - } else { - if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; - int round = 0; - int dIter = 0; - while (true) { - int rowsRead = - tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, keyLimit, defaultRowsInBlock); - if (rowsRead == 0) break; - - if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1; - if (round == 0) { - if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; - } else { - if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; - } - - round++; - tblkIdx++; - } - ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || - tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); - } - } - } - return 0; -} +// static int tsdbCommitMergeLastBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { +// SDataCols * pDataCols = pTSCh->pDataCols; +// SReadHandle *pReadH = pTSCh->pReadH; +// STable * pTable = pTSCh->pReadH->pTable; +// SCommitIter *pIter = pTSCh->pIters + TABLE_TID(pTable); +// STsdbRepo * pRepo = pReadH->pRepo; +// STsdbCfg * pCfg = &(pRepo->config); +// int blockCommitRows = TSDB_DEFAULT_ROWS_TO_COMMIT(pCfg->maxRowsPerFileBlock); +// SBlock nblock = {0}; +// SBlock * pNBlock = &nblock; + +// TSKEY nextKey = tsdbNextIterKey(pIter->pIter); +// int bidx = POINTER_DISTANCE(pBlock, pReadH->pBlockInfo->blocks) / sizeof(SBlock); + +// if (nextKey > pBlock->keyLast) { // just merge and append +// tdResetDataCols(pDataCols); + +// int rowsToRead = tsdbLoadDataFromCache(pTable, pIter->pIter, pReadH->maxKey, blockCommitRows - pBlock->numOfRows, +// pDataCols, NULL, 0); +// ASSERT(rowsToRead > 0); + +// if (rowsToRead + pBlock->numOfRows < pCfg->minRowsPerFileBlock && pBlock->numOfSubBlocks < TSDB_MAX_SUB_BLOCKS && +// /* No new last file is opened*/) { +// if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_LAST), pDataCols, pNBlock, true, +// false) < 0) { +// return -1; +// }; +// if (tsdbAddSubBlock(pTSCh, pNBlock, NULL) < 0) return -1; +// } else { +// if (tsdbLoadBlockData(pReadH, pBlock, NULL) < 0) return -1; + +// if (tdMergeDataCols(pReadH->pDataCols[0], pDataCols, pDataCols->numOfRows) < 0) return -1; + +// if (tsdbWriteBlockToProperFile(pReadH, pReadH->pDataCols[0], pNBlock) < 0) return -1; +// if (tsdbAddSuperBlock(pTSCh, pNBlock) < 0) return -1; +// } +// } else { +// if (/* append the old last file */) { +// SSkipListIterator titer = *(pIter->pIter); +// int16_t colId = 0; +// if (tsdbLoadBlockDataCols(pReadH, pBlock, NULL, &colId, 1) < 0) return -1; + +// int rowsToRead = tsdbLoadDataFromCache(); +// if (rowsToRead == 0) { +// *(pIter->pIter) = titer; +// tsdbCopyBlocks(); +// } else { +// if (pBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && rowsToRead + Block->numOfRows < pCfg->minRowsPerFileBlock) { +// tsdbLoadDataFromCache(); +// if (tsdbWriteBlockToFile() < 0) return -1; +// if (tsdbaddsubblock() < 0) return -1; +// } else { +// if (tasdbloadblockdata() < 0) return -1; + +// while (true) +// { +// tsdbLoadAndMergeFromCache(); +// } +// } +// } +// } else { +// if (tsdbLoadBlockData(pReadH, pBlock, NULL) < 0) return -1; + +// while (true) { +// tsdbLoadAndMergeFromCache(); +// } +// } +// } +// return 0; +// } + +// static int tsdbCommitMergeDataBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { +// TSKEY keyLimit = (tblkIdx == pIdx->numOfBlocks - 1) ? maxKey : (pCompBlock[1].keyFirst - 1); +// TSKEY blkKeyFirst = pCompBlock->keyFirst; +// TSKEY blkKeyLast = pCompBlock->keyLast; + +// if (keyFirst < blkKeyFirst) { +// while (true) { +// tdResetDataCols(pDataCols); +// int rowsRead = +// tsdbLoadDataFromCache(pTable, pCommitIter->pIter, blkKeyFirst - 1, defaultRowsInBlock, pDataCols, NULL, 0); +// if (rowsRead == 0) break; + +// ASSERT(rowsRead == pDataCols->numOfRows); +// if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1; +// if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; +// tblkIdx++; +// } +// ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || +// tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); +// } else { +// ASSERT(keyFirst <= blkKeyLast); +// int16_t colId = 0; +// if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1; + +// slIter = *(pCommitIter->pIter); +// int rows1 = (pCfg->maxRowsPerFileBlock - pCompBlock->numOfRows); +// int rows2 = tsdbLoadDataFromCache(pTable, &slIter, blkKeyLast, INT_MAX, NULL, pDataCols0->cols[0].pData, +// pDataCols0->numOfRows); + +// if (rows2 == 0) { // all filtered out +// *(pCommitIter->pIter) = slIter; +// ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || +// tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); +// } else { +// int rows3 = tsdbLoadDataFromCache(pTable, &slIter, keyLimit, INT_MAX, NULL, NULL, 0) + rows2; + +// if (pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && rows1 >= rows2) { +// int rows = (rows1 >= rows3) ? rows3 : rows2; +// tdResetDataCols(pDataCols); +// int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, rows, pDataCols, +// pDataCols0->cols[0].pData, pDataCols0->numOfRows); +// ASSERT(rowsRead == rows && rowsRead == pDataCols->numOfRows); +// if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, false) < 0) return -1; +// if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1; +// tblkIdx++; +// ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || +// tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); +// } else { +// if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; +// int round = 0; +// int dIter = 0; +// while (true) { +// int rowsRead = +// tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, keyLimit, defaultRowsInBlock); +// if (rowsRead == 0) break; + +// if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1; +// if (round == 0) { +// if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; +// } else { +// if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; +// } + +// round++; +// tblkIdx++; +// } +// ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || +// tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); +// } +// } +// } +// return 0; +// } static int tsdbEncodeBlockIdxArray(STSCommitHandle *pTSCh) { SReadHandle *pReadH = pTSCh->pReadH; @@ -1474,7 +1319,7 @@ static int tsdbUpdateFileGroupInfo(SFileGroup *pFileGroup) { } static int tsdbAppendBlockIdx(STSCommitHandle *pTSCh) { - if (tsdbAllocBuf(&((void *)(pTSCh->pBlockIdx)), sizeof(SBlockIdx) * (pTSCh->nBlockIdx + 1)) < 0) { + if (tsdbAllocBuf(&(pTSCh->pBlockIdx), sizeof(SBlockIdx) * (pTSCh->nBlockIdx + 1)) < 0) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } @@ -1491,6 +1336,8 @@ static int tsdbCopyBlock(STSCommitHandle *pTSCh, int bidx) { SFile * pRFile = NULL; SBlock newBlock = {0}; + ASSERT(pBlock->numOfSubBlocks >= 1); + if (pBlock->last) { pWFile = TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_LAST); pRFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_LAST); @@ -1502,14 +1349,19 @@ static int tsdbCopyBlock(STSCommitHandle *pTSCh, int bidx) { // TODO: use flag to omit this string compare. this may cause a lot of time if (strncmp(pWFile->fname, pRFile->fname, TSDB_FILENAME_LEN) == 0) { if (pBlock->numOfSubBlocks == 1) { + if (tsdbAddSuperBlock(pTSCh, pBlock) < 0) return -1; + } else { // need to copy both super block and sub-blocks + newBlock = *pBlock; + newBlock.offset = sizeof(SBlock) * pTSCh->nSubBlocks; - } else { // need to copy both super block and sub-blocks - + if (tsdbAddSuperBlock(pTSCh, &newBlock) < 0) return -1; + if (tsdbAddSubBlocks(pTSCh, POINTER_SHIFT(pReadH->pBlockInfo, pBlock->offset), pBlock->numOfSubBlocks) < 0) + return -1; } } else { if (tsdbLoadBlockData(pReadH, pBlock, NULL) < 0) return -1; if (tsdbWriteBlockToFile(pTSCh, pWFile, pReadH->pDataCols[0], &newBlock, pBlock->last, true) < 0) return -1; - // TODO: add a super block + if (tsdbAddSuperBlock(pTSCh, &newBlock) < 0) return -1; } return 0; @@ -1529,8 +1381,10 @@ static int compareKeyBlock(const void *arg1, const void *arg2) { } static int tsdbAddSuperBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { - int tsize = sizeof(SBlockInfo) + sizeof(SBlock) * (pTSCh->nBlocks + 1) + sizeof(TSCKSUM); - if (tsdbAllocBuf(&((void *)pTSCh->pBlockInfo), tsize) < 0) { + ASSERT(pBlock->numOfSubBlocks > 0); + + int tsize = TSDB_BLOCK_INFO_LEN(pTSCh->nBlocks + 1); + if (tsdbAllocBuf(&(pTSCh->pBlockInfo), tsize) < 0) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } @@ -1538,5 +1392,64 @@ static int tsdbAddSuperBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { ASSERT(pTSCh->nBlocks == 0 || pBlock->keyFirst > pTSCh->pBlockInfo->blocks[pTSCh->nBlocks - 1].keyLast); pTSCh->pBlockInfo->blocks[pTSCh->nBlocks++] = *pBlock; + return 0; +} + +static int tsdbAddSubBlocks(STSCommitHandle *pTSCh, SBlock *pBlocks, int nBlocks) { + int tBlocks = pTSCh->nSubBlocks + nBlocks; + int tsize = sizeof(SBlock) * tBlocks; + + if (tsdbAllocBuf(&(pTSCh->pSubBlock), tsize) < 0) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + memcpy((void *)(&pTSCh->pSubBlock[pTSCh->nSubBlocks]), (void *)pBlocks, sizeof(SBlock) * nBlocks); + pTSCh->nSubBlocks += nBlocks; + + return 0; +} + +static int tsdbMergeLastBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { + SReadHandle *pReadH = pTSCh->pReadH; + STsdbRepo * pRepo = pReadH->pRepo; + STsdbCfg * pCfg = &(pRepo->config); + SDataCols * pDataCols = pTSCh->pDataCols; + STable * pTable = pReadH->pTable; + SCommitIter *pIter = pTSCh->pIters + TABLE_TID(pTable); + int dbrows = TSDB_DEFAULT_ROWS_TO_COMMIT(pCfg->maxRowsPerFileBlock); + SBlock newBlock = {0}; + SFile * pFile = NULL; + + TSKEY keyNext = tsdbNextIterKey(pIter->pIter); + if (keyNext > pBlock->keyLast) { + tdResetDataCols(pDataCols); + int rows = tsdbLoadDataFromCache(pTable, pIter->pIter, pTSCh->maxKey, dbrows-pBlock->numOfRows, pDataCols, NULL, 0); + ASSERT(rows > 0); + + if (pBlock->numOfRows + pDataCols->numOfRows < pCfg->minRowsPerFileBlock && + pBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && true /*TODO: check if same file*/) { + pFile = TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_LAST); + if (tsdbWriteBlockToFile(pTSCh, pFile, pDataCols, &newBlock, true, false) < 0) return -1; + // TODO: refactor code here + if (tsdbInsertSubBlock(pTSCh, &newBlock) < 0) return -1; + } else { + if (tsdbLoadBlockData(pReadH, pBlock, NULL) < 0) return -1; + if (tdMergeDataCols(pReadH->pDataCols[0], pDataCols, pDataCols->numOfRows) < 0) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + if (tsdbWriteBlockToRightFile(pTSCh, pReadH->pDataCols[0], &newBlock) < 0) return -1; + if (tsdbAddSuperBlock(pTSCh, &newBlock) < 0) return -1; + } + } else { + + } + + return 0; +} + +static int tsdbMergeDataBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { + // TODO return 0; } \ No newline at end of file