提交 8fb2c5a2 编写于 作者: H Hongze Cheng

refactor more code

上级 d2b5bf71
...@@ -513,7 +513,7 @@ int tsdbLoadBlockDataInfo(SReadHandle* pReadH, SBlock* pBlock); ...@@ -513,7 +513,7 @@ int tsdbLoadBlockDataInfo(SReadHandle* pReadH, SBlock* pBlock);
static FORCE_INLINE int tsdbAllocBuf(void **ppBuf, uint32_t size) { static FORCE_INLINE int tsdbAllocBuf(void **ppBuf, uint32_t size) {
ASSERT(size > 0); ASSERT(size > 0);
void *pBuf = *pBuf; void *pBuf = *ppBuf;
uint32_t tsize = taosTSizeof(pBuf); uint32_t tsize = taosTSizeof(pBuf);
if (tsize >= size) return 0; if (tsize >= size) return 0;
......
...@@ -25,6 +25,7 @@ ...@@ -25,6 +25,7 @@
#define TSDB_META_FILE_CHANGE 1 #define TSDB_META_FILE_CHANGE 1
#define TSDB_COMMIT_OVER 2 #define TSDB_COMMIT_OVER 2
#define TSDB_MAX_SUBBLOCKS 8
#define TSDB_DEFAULT_ROWS_TO_COMMIT(maxRows) ((maxRows) * 4 / 5) #define TSDB_DEFAULT_ROWS_TO_COMMIT(maxRows) ((maxRows) * 4 / 5)
typedef struct { typedef struct {
...@@ -243,7 +244,7 @@ static int tsdbCommitMetaData(SCommitHandle *pCommitH) { ...@@ -243,7 +244,7 @@ static int tsdbCommitMetaData(SCommitHandle *pCommitH) {
} }
} }
if (tdKVStoreEndCommit(pMeta->pStore, false /*hasError = false*/) < 0) { if (tdKVStoreEndCommit(pStore, false /*hasError = false*/) < 0) {
tsdbError("vgId:%d failed to commit data while end commit meta since %s", REPO_ID(pRepo), tstrerror(terrno)); tsdbError("vgId:%d failed to commit data while end commit meta since %s", REPO_ID(pRepo), tstrerror(terrno));
return -1; return -1;
} }
...@@ -651,12 +652,12 @@ static int tsdbCommitTableData(STSCommitHandle *pTSCh, int tid) { ...@@ -651,12 +652,12 @@ static int tsdbCommitTableData(STSCommitHandle *pTSCh, int tid) {
return 0; return 0;
} }
if (tsdbLoadBlockInfo(pReadH, tid) < 0) { if (tsdbLoadBlockInfo(pReadH) < 0) {
taosRUnLockLatch(&(pIter->pTable->latch)); taosRUnLockLatch(&(pIter->pTable->latch));
return -1; return -1;
} }
if (tsdbCommitTableDataImpl(pTSCh) < 0) { if (tsdbCommitTableDataImpl(pTSCh, tid) < 0) {
taosRUnLockLatch(&(pIter->pTable->latch)); taosRUnLockLatch(&(pIter->pTable->latch));
return -1; return -1;
} }
...@@ -736,173 +737,6 @@ static int tsdbWriteBlockToRightFile(STSCommitHandle *pTSCh, SDataCols *pDataCol ...@@ -736,173 +737,6 @@ static int tsdbWriteBlockToRightFile(STSCommitHandle *pTSCh, SDataCols *pDataCol
return 0; return 0;
} }
static int tsdbInsertSuperBlock(SRWHelper *pHelper, SBlock *pCompBlock, int blkIdx) {
SBlockIdx *pIdx = &(pHelper->curCompIdx);
ASSERT(blkIdx >= 0 && blkIdx <= (int)pIdx->numOfBlocks);
ASSERT(pCompBlock->numOfSubBlocks == 1);
// Adjust memory if no more room
if (pIdx->len == 0) pIdx->len = sizeof(SBlockInfo) + sizeof(TSCKSUM);
if (tsdbAdjustInfoSizeIfNeeded(pHelper, pIdx->len + sizeof(SBlockInfo)) < 0) goto _err;
// Change the offset
for (uint32_t i = 0; i < pIdx->numOfBlocks; i++) {
SBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i];
if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += sizeof(SBlock);
}
// Memmove if needed
int tsize = pIdx->len - (sizeof(SBlockInfo) + sizeof(SBlock) * blkIdx);
if (tsize > 0) {
ASSERT(sizeof(SBlockInfo) + sizeof(SBlock) * (blkIdx + 1) < taosTSizeof(pHelper->pCompInfo));
ASSERT(sizeof(SBlockInfo) + sizeof(SBlock) * (blkIdx + 1) + tsize <= taosTSizeof(pHelper->pCompInfo));
memmove(POINTER_SHIFT(pHelper->pCompInfo, sizeof(SBlockInfo) + sizeof(SBlock) * (blkIdx + 1)),
POINTER_SHIFT(pHelper->pCompInfo, sizeof(SBlockInfo) + sizeof(SBlock) * blkIdx), tsize);
}
pHelper->pCompInfo->blocks[blkIdx] = *pCompBlock;
pIdx->numOfBlocks++;
pIdx->len += sizeof(SBlock);
ASSERT(pIdx->len <= taosTSizeof(pHelper->pCompInfo));
pIdx->maxKey = blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->keyLast;
pIdx->hasLast = (uint32_t)blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->last;
if (pIdx->numOfBlocks > 1) {
ASSERT(pHelper->pCompInfo->blocks[0].keyLast < pHelper->pCompInfo->blocks[1].keyFirst);
}
tsdbDebug("vgId:%d tid:%d a super block is inserted at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid,
blkIdx);
return 0;
_err:
return -1;
}
static int tsdbAddSubBlock(SRWHelper *pHelper, SBlock *pCompBlock, int blkIdx, int rowsAdded) {
ASSERT(pCompBlock->numOfSubBlocks == 0);
SBlockIdx *pIdx = &(pHelper->curCompIdx);
ASSERT(blkIdx >= 0 && blkIdx < (int)pIdx->numOfBlocks);
SBlock *pSCompBlock = pHelper->pCompInfo->blocks + blkIdx;
ASSERT(pSCompBlock->numOfSubBlocks >= 1 && pSCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS);
size_t spaceNeeded =
(pSCompBlock->numOfSubBlocks == 1) ? pIdx->len + sizeof(SBlock) * 2 : pIdx->len + sizeof(SBlock);
if (tsdbAdjustInfoSizeIfNeeded(pHelper, spaceNeeded) < 0) goto _err;
pSCompBlock = pHelper->pCompInfo->blocks + blkIdx;
// Add the sub-block
if (pSCompBlock->numOfSubBlocks > 1) {
size_t tsize = (size_t)(pIdx->len - (pSCompBlock->offset + pSCompBlock->len));
if (tsize > 0) {
memmove((void *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len + sizeof(SBlock)),
(void *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len), tsize);
for (uint32_t i = blkIdx + 1; i < pIdx->numOfBlocks; i++) {
SBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i];
if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += sizeof(SBlock);
}
}
*(SBlock *)((char *)(pHelper->pCompInfo) + pSCompBlock->offset + pSCompBlock->len) = *pCompBlock;
pSCompBlock->numOfSubBlocks++;
ASSERT(pSCompBlock->numOfSubBlocks <= TSDB_MAX_SUBBLOCKS);
pSCompBlock->len += sizeof(SBlock);
pSCompBlock->numOfRows += rowsAdded;
pSCompBlock->keyFirst = MIN(pSCompBlock->keyFirst, pCompBlock->keyFirst);
pSCompBlock->keyLast = MAX(pSCompBlock->keyLast, pCompBlock->keyLast);
pIdx->len += sizeof(SBlock);
} else { // Need to create two sub-blocks
void *ptr = NULL;
for (uint32_t i = blkIdx + 1; i < pIdx->numOfBlocks; i++) {
SBlock *pTCompBlock = pHelper->pCompInfo->blocks + i;
if (pTCompBlock->numOfSubBlocks > 1) {
ptr = POINTER_SHIFT(pHelper->pCompInfo, pTCompBlock->offset);
break;
}
}
if (ptr == NULL) ptr = POINTER_SHIFT(pHelper->pCompInfo, pIdx->len - sizeof(TSCKSUM));
size_t tsize = pIdx->len - ((char *)ptr - (char *)(pHelper->pCompInfo));
if (tsize > 0) {
memmove(POINTER_SHIFT(ptr, sizeof(SBlock) * 2), ptr, tsize);
for (uint32_t i = blkIdx + 1; i < pIdx->numOfBlocks; i++) {
SBlock *pTCompBlock = pHelper->pCompInfo->blocks + i;
if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset += (sizeof(SBlock) * 2);
}
}
((SBlock *)ptr)[0] = *pSCompBlock;
((SBlock *)ptr)[0].numOfSubBlocks = 0;
((SBlock *)ptr)[1] = *pCompBlock;
pSCompBlock->numOfSubBlocks = 2;
pSCompBlock->numOfRows += rowsAdded;
pSCompBlock->offset = ((char *)ptr) - ((char *)pHelper->pCompInfo);
pSCompBlock->len = sizeof(SBlock) * 2;
pSCompBlock->keyFirst = MIN(((SBlock *)ptr)[0].keyFirst, ((SBlock *)ptr)[1].keyFirst);
pSCompBlock->keyLast = MAX(((SBlock *)ptr)[0].keyLast, ((SBlock *)ptr)[1].keyLast);
pIdx->len += (sizeof(SBlock) * 2);
}
pIdx->maxKey = pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].keyLast;
pIdx->hasLast = (uint32_t)pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].last;
tsdbDebug("vgId:%d tid:%d a subblock is added at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid, blkIdx);
return 0;
_err:
return -1;
}
static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SBlock *pCompBlock, int blkIdx) {
ASSERT(pCompBlock->numOfSubBlocks == 1);
SBlockIdx *pIdx = &(pHelper->curCompIdx);
ASSERT(blkIdx >= 0 && blkIdx < (int)pIdx->numOfBlocks);
SBlock *pSCompBlock = pHelper->pCompInfo->blocks + blkIdx;
ASSERT(pSCompBlock->numOfSubBlocks >= 1);
// Delete the sub blocks it has
if (pSCompBlock->numOfSubBlocks > 1) {
size_t tsize = (size_t)(pIdx->len - (pSCompBlock->offset + pSCompBlock->len));
if (tsize > 0) {
memmove(POINTER_SHIFT(pHelper->pCompInfo, pSCompBlock->offset),
POINTER_SHIFT(pHelper->pCompInfo, pSCompBlock->offset + pSCompBlock->len), tsize);
}
for (uint32_t i = blkIdx + 1; i < pIdx->numOfBlocks; i++) {
SBlock *pTCompBlock = &pHelper->pCompInfo->blocks[i];
if (pTCompBlock->numOfSubBlocks > 1) pTCompBlock->offset -= (sizeof(SBlock) * pSCompBlock->numOfSubBlocks);
}
pIdx->len -= (sizeof(SBlock) * pSCompBlock->numOfSubBlocks);
}
*pSCompBlock = *pCompBlock;
pIdx->maxKey = blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->keyLast;
pIdx->hasLast = (uint32_t)blockAtIdx(pHelper, pIdx->numOfBlocks - 1)->last;
tsdbDebug("vgId:%d tid:%d a super block is updated at index %d", REPO_ID(pHelper->pRepo), pHelper->tableInfo.tid,
blkIdx);
return 0;
}
static int tsdbSetAndOpenCommitFGroup(STSCommitHandle *pTSCh, SFileGroup *pOldGroup, SFileGroup *pNewGroup) { static int tsdbSetAndOpenCommitFGroup(STSCommitHandle *pTSCh, SFileGroup *pOldGroup, SFileGroup *pNewGroup) {
ASSERT(pOldGroup->fileId == pNewGroup->fileId); ASSERT(pOldGroup->fileId == pNewGroup->fileId);
...@@ -976,13 +810,21 @@ static int tsdbWriteBlockInfo(STSCommitHandle *pTSCh) { ...@@ -976,13 +810,21 @@ static int tsdbWriteBlockInfo(STSCommitHandle *pTSCh) {
pBlockInfo->tid = TABLE_TID(pReadH->pTable); pBlockInfo->tid = TABLE_TID(pReadH->pTable);
if (pTSCh->nSubBlocks > 0) { if (pTSCh->nSubBlocks > 0) {
if (tsdbAllocBuf(&((void *)pTSCh->pBlockInfo), tlen) < 0) { if (tsdbAllocBuf(&(pTSCh->pBlockInfo), tlen) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1; return -1;
} }
memcpy(POINTER_SHIFT(pTSCh->pBlockInfo, sizeof(SBlockInfo) + sizeof(SBlock) * pTSCh->nBlocks), memcpy(POINTER_SHIFT(pTSCh->pBlockInfo, sizeof(SBlockInfo) + sizeof(SBlock) * pTSCh->nBlocks),
(void *)pTSCh->pSubBlock, sizeof(SBlock) * pTSCh->nSubBlocks); (void *)pTSCh->pSubBlock, sizeof(SBlock) * pTSCh->nSubBlocks);
int64_t oShift = sizeof(SBlockInfo) + sizeof(SBlock) * pTSCh->nBlocks;
for (int i = 0; i < pTSCh->nBlocks; i++) {
SBlock *pBlock = pTSCh->pBlockInfo->blocks + i;
ASSERT(pBlock->numOfSubBlocks >= 1 && pBlock->numOfSubBlocks <= TSDB_MAX_SUBBLOCKS);
if (pBlock->numOfSubBlocks > 1) pBlock->offset += oShift;
}
} }
taosCalcChecksumAppend(0, (uint8_t *)(pTSCh->pBlockInfo), tlen); taosCalcChecksumAppend(0, (uint8_t *)(pTSCh->pBlockInfo), tlen);
...@@ -1142,7 +984,7 @@ static int tsdbAppendCommit(STSCommitHandle *pTSCh) { ...@@ -1142,7 +984,7 @@ static int tsdbAppendCommit(STSCommitHandle *pTSCh) {
int dbrows = TSDB_DEFAULT_ROWS_TO_COMMIT(pCfg->maxRowsPerFileBlock); // default block rows int dbrows = TSDB_DEFAULT_ROWS_TO_COMMIT(pCfg->maxRowsPerFileBlock); // default block rows
tdResetDataCols(pDataCols); tdResetDataCols(pDataCols);
int rows = tsdbLoadDataFromCache(pTable, pIter->pIter, pReadH->maxKey, dbrows, pDataCols, NULL, 0); int rows = tsdbLoadDataFromCache(pTable, pIter->pIter, pTSCh->maxKey, dbrows, pDataCols, NULL, 0);
ASSERT(rows > 0 && pDataCols->numOfRows == rows); ASSERT(rows > 0 && pDataCols->numOfRows == rows);
if (tsdbWriteBlockToRightFile(pTSCh, pDataCols, pBlock) < 0) return -1; if (tsdbWriteBlockToRightFile(pTSCh, pDataCols, pBlock) < 0) return -1;
...@@ -1152,10 +994,13 @@ static int tsdbAppendCommit(STSCommitHandle *pTSCh) { ...@@ -1152,10 +994,13 @@ static int tsdbAppendCommit(STSCommitHandle *pTSCh) {
} }
static int tsdbMergeCommit(STSCommitHandle *pTSCh, SBlock *pBlock) { static int tsdbMergeCommit(STSCommitHandle *pTSCh, SBlock *pBlock) {
if (pBlock->last) { // merge with the last block SReadHandle *pReadH = pTSCh->pReadH;
if (tsdbCommitMergeLastBlock(pTSCh, pBlock) < 0) return -1; if (pBlock->last) {
} else { // merge with a data block ASSERT(POINTER_DISTANCE(pBlock, pReadH->pBlockInfo->blocks) / sizeof(SBlock) == (pReadH->nBlockIdx - 1));
if (tsdbCommitMergeDataBlock(pTSCh, pBlock) < 0) return -1; ASSERT(pReadH->pCurBlockIdx->hasLast);
if (tsdbMergeLastBlock(pTSCh, pBlock) < 0) return -1;
} else {
if (tsdbMergeDataBlock(pTSCh, pBlock) < 0) return -1;
} }
return 0; return 0;
...@@ -1175,7 +1020,7 @@ static int tsdbWriteBlockToFile(STSCommitHandle *pTSCh, SFile *pFile, SDataCols ...@@ -1175,7 +1020,7 @@ static int tsdbWriteBlockToFile(STSCommitHandle *pTSCh, SFile *pFile, SDataCols
ASSERT(pDataCols->numOfRows > 0 && pDataCols->numOfRows <= pCfg->maxRowsPerFileBlock); ASSERT(pDataCols->numOfRows > 0 && pDataCols->numOfRows <= pCfg->maxRowsPerFileBlock);
ASSERT(isLast ? pDataCols->numOfRows < pCfg->minRowsPerFileBlock : true); ASSERT(isLast ? pDataCols->numOfRows < pCfg->minRowsPerFileBlock : true);
if (tsdbAllocBuf(&((void *)pReadH->pBlockData), csize) < 0) { if (tsdbAllocBuf(&(pReadH->pBlockData), csize) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1; return -1;
} }
...@@ -1192,7 +1037,7 @@ static int tsdbWriteBlockToFile(STSCommitHandle *pTSCh, SFile *pFile, SDataCols ...@@ -1192,7 +1037,7 @@ static int tsdbWriteBlockToFile(STSCommitHandle *pTSCh, SFile *pFile, SDataCols
nColsNotAllNull++; nColsNotAllNull++;
csize = TSDB_BLOCK_DATA_LEN(nColsNotAllNull); csize = TSDB_BLOCK_DATA_LEN(nColsNotAllNull);
if (tsdbAllocBuf(&((void *)pReadH->pBlockData), csize) < 0) { if (tsdbAllocBuf(&(pReadH->pBlockData), csize) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1; return -1;
} }
...@@ -1298,150 +1143,150 @@ static int tsdbWriteBlockToFile(STSCommitHandle *pTSCh, SFile *pFile, SDataCols ...@@ -1298,150 +1143,150 @@ static int tsdbWriteBlockToFile(STSCommitHandle *pTSCh, SFile *pFile, SDataCols
return 0; return 0;
} }
static int tsdbCommitMergeLastBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { // static int tsdbCommitMergeLastBlock(STSCommitHandle *pTSCh, SBlock *pBlock) {
SDataCols * pDataCols = pTSCh->pDataCols; // SDataCols * pDataCols = pTSCh->pDataCols;
SReadHandle *pReadH = pTSCh->pReadH; // SReadHandle *pReadH = pTSCh->pReadH;
STable * pTable = pTSCh->pReadH->pTable; // STable * pTable = pTSCh->pReadH->pTable;
SCommitIter *pIter = pTSCh->pIters + TABLE_TID(pTable); // SCommitIter *pIter = pTSCh->pIters + TABLE_TID(pTable);
STsdbRepo * pRepo = pReadH->pRepo; // STsdbRepo * pRepo = pReadH->pRepo;
STsdbCfg * pCfg = &(pRepo->config); // STsdbCfg * pCfg = &(pRepo->config);
int blockCommitRows = TSDB_DEFAULT_ROWS_TO_COMMIT(pCfg->maxRowsPerFileBlock); // int blockCommitRows = TSDB_DEFAULT_ROWS_TO_COMMIT(pCfg->maxRowsPerFileBlock);
SBlock nblock = {0}; // SBlock nblock = {0};
SBlock * pNBlock = &nblock; // SBlock * pNBlock = &nblock;
TSKEY nextKey = tsdbNextIterKey(pIter->pIter); // TSKEY nextKey = tsdbNextIterKey(pIter->pIter);
int bidx = POINTER_DISTANCE(pBlock, pReadH->pBlockInfo->blocks) / sizeof(SBlock); // int bidx = POINTER_DISTANCE(pBlock, pReadH->pBlockInfo->blocks) / sizeof(SBlock);
if (nextKey > pBlock->keyLast) { // just merge and append // if (nextKey > pBlock->keyLast) { // just merge and append
tdResetDataCols(pDataCols); // tdResetDataCols(pDataCols);
int rowsToRead = tsdbLoadDataFromCache(pTable, pIter->pIter, pReadH->maxKey, blockCommitRows - pBlock->numOfRows, // int rowsToRead = tsdbLoadDataFromCache(pTable, pIter->pIter, pReadH->maxKey, blockCommitRows - pBlock->numOfRows,
pDataCols, NULL, 0); // pDataCols, NULL, 0);
ASSERT(rowsToRead > 0); // ASSERT(rowsToRead > 0);
if (rowsToRead + pBlock->numOfRows < pCfg->minRowsPerFileBlock && pBlock->numOfSubBlocks < TSDB_MAX_SUB_BLOCKS && // if (rowsToRead + pBlock->numOfRows < pCfg->minRowsPerFileBlock && pBlock->numOfSubBlocks < TSDB_MAX_SUB_BLOCKS &&
/* No new last file is opened*/) { // /* No new last file is opened*/) {
if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_LAST), pDataCols, pNBlock, true, // if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_LAST), pDataCols, pNBlock, true,
false) < 0) { // false) < 0) {
return -1; // return -1;
}; // };
if (tsdbAddSubBlock(pTSCh, pNBlock, NULL) < 0) return -1; // if (tsdbAddSubBlock(pTSCh, pNBlock, NULL) < 0) return -1;
} else { // } else {
if (tsdbLoadBlockData(pReadH, pBlock, NULL) < 0) return -1; // if (tsdbLoadBlockData(pReadH, pBlock, NULL) < 0) return -1;
if (tdMergeDataCols(pReadH->pDataCols[0], pDataCols, pDataCols->numOfRows) < 0) return -1; // if (tdMergeDataCols(pReadH->pDataCols[0], pDataCols, pDataCols->numOfRows) < 0) return -1;
if (tsdbWriteBlockToProperFile(pReadH, pReadH->pDataCols[0], pNBlock) < 0) return -1; // if (tsdbWriteBlockToProperFile(pReadH, pReadH->pDataCols[0], pNBlock) < 0) return -1;
if (tsdbAddSuperBlock(pTSCh, pNBlock) < 0) return -1; // if (tsdbAddSuperBlock(pTSCh, pNBlock) < 0) return -1;
} // }
} else { // } else {
if (/* append the old last file */) { // if (/* append the old last file */) {
SSkipListIterator titer = *(pIter->pIter); // SSkipListIterator titer = *(pIter->pIter);
int16_t colId = 0; // int16_t colId = 0;
if (tsdbLoadBlockDataCols(pReadH, pBlock, NULL, &colId, 1) < 0) return -1; // if (tsdbLoadBlockDataCols(pReadH, pBlock, NULL, &colId, 1) < 0) return -1;
int rowsToRead = tsdbLoadDataFromCache(); // int rowsToRead = tsdbLoadDataFromCache();
if (rowsToRead == 0) { // if (rowsToRead == 0) {
*(pIter->pIter) = titer; // *(pIter->pIter) = titer;
tsdbCopyBlocks(); // tsdbCopyBlocks();
} else { // } else {
if (pBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && rowsToRead + Block->numOfRows < pCfg->minRowsPerFileBlock) { // if (pBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && rowsToRead + Block->numOfRows < pCfg->minRowsPerFileBlock) {
tsdbLoadDataFromCache(); // tsdbLoadDataFromCache();
if (tsdbWriteBlockToFile() < 0) return -1; // if (tsdbWriteBlockToFile() < 0) return -1;
if (tsdbaddsubblock() < 0) return -1; // if (tsdbaddsubblock() < 0) return -1;
} else { // } else {
if (tasdbloadblockdata() < 0) return -1; // if (tasdbloadblockdata() < 0) return -1;
while (true) // while (true)
{ // {
tsdbLoadAndMergeFromCache(); // tsdbLoadAndMergeFromCache();
} // }
} // }
} // }
} else { // } else {
if (tsdbLoadBlockData(pReadH, pBlock, NULL) < 0) return -1; // if (tsdbLoadBlockData(pReadH, pBlock, NULL) < 0) return -1;
while (true) { // while (true) {
tsdbLoadAndMergeFromCache(); // tsdbLoadAndMergeFromCache();
} // }
} // }
} // }
return 0; // return 0;
} // }
static int tsdbCommitMergeDataBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { // static int tsdbCommitMergeDataBlock(STSCommitHandle *pTSCh, SBlock *pBlock) {
TSKEY keyLimit = (tblkIdx == pIdx->numOfBlocks - 1) ? maxKey : (pCompBlock[1].keyFirst - 1); // TSKEY keyLimit = (tblkIdx == pIdx->numOfBlocks - 1) ? maxKey : (pCompBlock[1].keyFirst - 1);
TSKEY blkKeyFirst = pCompBlock->keyFirst; // TSKEY blkKeyFirst = pCompBlock->keyFirst;
TSKEY blkKeyLast = pCompBlock->keyLast; // TSKEY blkKeyLast = pCompBlock->keyLast;
if (keyFirst < blkKeyFirst) { // if (keyFirst < blkKeyFirst) {
while (true) { // while (true) {
tdResetDataCols(pDataCols); // tdResetDataCols(pDataCols);
int rowsRead = // int rowsRead =
tsdbLoadDataFromCache(pTable, pCommitIter->pIter, blkKeyFirst - 1, defaultRowsInBlock, pDataCols, NULL, 0); // tsdbLoadDataFromCache(pTable, pCommitIter->pIter, blkKeyFirst - 1, defaultRowsInBlock, pDataCols, NULL, 0);
if (rowsRead == 0) break; // if (rowsRead == 0) break;
ASSERT(rowsRead == pDataCols->numOfRows); // ASSERT(rowsRead == pDataCols->numOfRows);
if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1; // if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1;
if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; // if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
tblkIdx++; // tblkIdx++;
} // }
ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || // ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 ||
tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); // tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast));
} else { // } else {
ASSERT(keyFirst <= blkKeyLast); // ASSERT(keyFirst <= blkKeyLast);
int16_t colId = 0; // int16_t colId = 0;
if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1; // if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1;
slIter = *(pCommitIter->pIter); // slIter = *(pCommitIter->pIter);
int rows1 = (pCfg->maxRowsPerFileBlock - pCompBlock->numOfRows); // int rows1 = (pCfg->maxRowsPerFileBlock - pCompBlock->numOfRows);
int rows2 = tsdbLoadDataFromCache(pTable, &slIter, blkKeyLast, INT_MAX, NULL, pDataCols0->cols[0].pData, // int rows2 = tsdbLoadDataFromCache(pTable, &slIter, blkKeyLast, INT_MAX, NULL, pDataCols0->cols[0].pData,
pDataCols0->numOfRows); // pDataCols0->numOfRows);
if (rows2 == 0) { // all filtered out // if (rows2 == 0) { // all filtered out
*(pCommitIter->pIter) = slIter; // *(pCommitIter->pIter) = slIter;
ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || // ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 ||
tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); // tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast));
} else { // } else {
int rows3 = tsdbLoadDataFromCache(pTable, &slIter, keyLimit, INT_MAX, NULL, NULL, 0) + rows2; // int rows3 = tsdbLoadDataFromCache(pTable, &slIter, keyLimit, INT_MAX, NULL, NULL, 0) + rows2;
if (pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && rows1 >= rows2) { // if (pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && rows1 >= rows2) {
int rows = (rows1 >= rows3) ? rows3 : rows2; // int rows = (rows1 >= rows3) ? rows3 : rows2;
tdResetDataCols(pDataCols); // tdResetDataCols(pDataCols);
int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, rows, pDataCols, // int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, rows, pDataCols,
pDataCols0->cols[0].pData, pDataCols0->numOfRows); // pDataCols0->cols[0].pData, pDataCols0->numOfRows);
ASSERT(rowsRead == rows && rowsRead == pDataCols->numOfRows); // ASSERT(rowsRead == rows && rowsRead == pDataCols->numOfRows);
if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, false) < 0) return -1; // if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, false) < 0) return -1;
if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1; // if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1;
tblkIdx++; // tblkIdx++;
ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || // ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 ||
tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); // tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast));
} else { // } else {
if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; // if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1;
int round = 0; // int round = 0;
int dIter = 0; // int dIter = 0;
while (true) { // while (true) {
int rowsRead = // int rowsRead =
tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, keyLimit, defaultRowsInBlock); // tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, keyLimit, defaultRowsInBlock);
if (rowsRead == 0) break; // if (rowsRead == 0) break;
if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1; // if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1;
if (round == 0) { // if (round == 0) {
if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; // if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
} else { // } else {
if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; // if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
} // }
round++; // round++;
tblkIdx++; // tblkIdx++;
} // }
ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || // ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 ||
tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); // tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast));
} // }
} // }
} // }
return 0; // return 0;
} // }
static int tsdbEncodeBlockIdxArray(STSCommitHandle *pTSCh) { static int tsdbEncodeBlockIdxArray(STSCommitHandle *pTSCh) {
SReadHandle *pReadH = pTSCh->pReadH; SReadHandle *pReadH = pTSCh->pReadH;
...@@ -1474,7 +1319,7 @@ static int tsdbUpdateFileGroupInfo(SFileGroup *pFileGroup) { ...@@ -1474,7 +1319,7 @@ static int tsdbUpdateFileGroupInfo(SFileGroup *pFileGroup) {
} }
static int tsdbAppendBlockIdx(STSCommitHandle *pTSCh) { static int tsdbAppendBlockIdx(STSCommitHandle *pTSCh) {
if (tsdbAllocBuf(&((void *)(pTSCh->pBlockIdx)), sizeof(SBlockIdx) * (pTSCh->nBlockIdx + 1)) < 0) { if (tsdbAllocBuf(&(pTSCh->pBlockIdx), sizeof(SBlockIdx) * (pTSCh->nBlockIdx + 1)) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1; return -1;
} }
...@@ -1491,6 +1336,8 @@ static int tsdbCopyBlock(STSCommitHandle *pTSCh, int bidx) { ...@@ -1491,6 +1336,8 @@ static int tsdbCopyBlock(STSCommitHandle *pTSCh, int bidx) {
SFile * pRFile = NULL; SFile * pRFile = NULL;
SBlock newBlock = {0}; SBlock newBlock = {0};
ASSERT(pBlock->numOfSubBlocks >= 1);
if (pBlock->last) { if (pBlock->last) {
pWFile = TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_LAST); pWFile = TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_LAST);
pRFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_LAST); pRFile = TSDB_FILE_IN_FGROUP(&(pReadH->fGroup), TSDB_FILE_TYPE_LAST);
...@@ -1502,14 +1349,19 @@ static int tsdbCopyBlock(STSCommitHandle *pTSCh, int bidx) { ...@@ -1502,14 +1349,19 @@ static int tsdbCopyBlock(STSCommitHandle *pTSCh, int bidx) {
// TODO: use flag to omit this string compare. this may cause a lot of time // TODO: use flag to omit this string compare. this may cause a lot of time
if (strncmp(pWFile->fname, pRFile->fname, TSDB_FILENAME_LEN) == 0) { if (strncmp(pWFile->fname, pRFile->fname, TSDB_FILENAME_LEN) == 0) {
if (pBlock->numOfSubBlocks == 1) { if (pBlock->numOfSubBlocks == 1) {
if (tsdbAddSuperBlock(pTSCh, pBlock) < 0) return -1;
} else { // need to copy both super block and sub-blocks
newBlock = *pBlock;
newBlock.offset = sizeof(SBlock) * pTSCh->nSubBlocks;
} else { // need to copy both super block and sub-blocks if (tsdbAddSuperBlock(pTSCh, &newBlock) < 0) return -1;
if (tsdbAddSubBlocks(pTSCh, POINTER_SHIFT(pReadH->pBlockInfo, pBlock->offset), pBlock->numOfSubBlocks) < 0)
return -1;
} }
} else { } else {
if (tsdbLoadBlockData(pReadH, pBlock, NULL) < 0) return -1; if (tsdbLoadBlockData(pReadH, pBlock, NULL) < 0) return -1;
if (tsdbWriteBlockToFile(pTSCh, pWFile, pReadH->pDataCols[0], &newBlock, pBlock->last, true) < 0) return -1; if (tsdbWriteBlockToFile(pTSCh, pWFile, pReadH->pDataCols[0], &newBlock, pBlock->last, true) < 0) return -1;
// TODO: add a super block if (tsdbAddSuperBlock(pTSCh, &newBlock) < 0) return -1;
} }
return 0; return 0;
...@@ -1529,8 +1381,10 @@ static int compareKeyBlock(const void *arg1, const void *arg2) { ...@@ -1529,8 +1381,10 @@ static int compareKeyBlock(const void *arg1, const void *arg2) {
} }
static int tsdbAddSuperBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { static int tsdbAddSuperBlock(STSCommitHandle *pTSCh, SBlock *pBlock) {
int tsize = sizeof(SBlockInfo) + sizeof(SBlock) * (pTSCh->nBlocks + 1) + sizeof(TSCKSUM); ASSERT(pBlock->numOfSubBlocks > 0);
if (tsdbAllocBuf(&((void *)pTSCh->pBlockInfo), tsize) < 0) {
int tsize = TSDB_BLOCK_INFO_LEN(pTSCh->nBlocks + 1);
if (tsdbAllocBuf(&(pTSCh->pBlockInfo), tsize) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1; return -1;
} }
...@@ -1538,5 +1392,64 @@ static int tsdbAddSuperBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { ...@@ -1538,5 +1392,64 @@ static int tsdbAddSuperBlock(STSCommitHandle *pTSCh, SBlock *pBlock) {
ASSERT(pTSCh->nBlocks == 0 || pBlock->keyFirst > pTSCh->pBlockInfo->blocks[pTSCh->nBlocks - 1].keyLast); ASSERT(pTSCh->nBlocks == 0 || pBlock->keyFirst > pTSCh->pBlockInfo->blocks[pTSCh->nBlocks - 1].keyLast);
pTSCh->pBlockInfo->blocks[pTSCh->nBlocks++] = *pBlock; pTSCh->pBlockInfo->blocks[pTSCh->nBlocks++] = *pBlock;
return 0;
}
static int tsdbAddSubBlocks(STSCommitHandle *pTSCh, SBlock *pBlocks, int nBlocks) {
int tBlocks = pTSCh->nSubBlocks + nBlocks;
int tsize = sizeof(SBlock) * tBlocks;
if (tsdbAllocBuf(&(pTSCh->pSubBlock), tsize) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
memcpy((void *)(&pTSCh->pSubBlock[pTSCh->nSubBlocks]), (void *)pBlocks, sizeof(SBlock) * nBlocks);
pTSCh->nSubBlocks += nBlocks;
return 0;
}
static int tsdbMergeLastBlock(STSCommitHandle *pTSCh, SBlock *pBlock) {
SReadHandle *pReadH = pTSCh->pReadH;
STsdbRepo * pRepo = pReadH->pRepo;
STsdbCfg * pCfg = &(pRepo->config);
SDataCols * pDataCols = pTSCh->pDataCols;
STable * pTable = pReadH->pTable;
SCommitIter *pIter = pTSCh->pIters + TABLE_TID(pTable);
int dbrows = TSDB_DEFAULT_ROWS_TO_COMMIT(pCfg->maxRowsPerFileBlock);
SBlock newBlock = {0};
SFile * pFile = NULL;
TSKEY keyNext = tsdbNextIterKey(pIter->pIter);
if (keyNext > pBlock->keyLast) {
tdResetDataCols(pDataCols);
int rows = tsdbLoadDataFromCache(pTable, pIter->pIter, pTSCh->maxKey, dbrows-pBlock->numOfRows, pDataCols, NULL, 0);
ASSERT(rows > 0);
if (pBlock->numOfRows + pDataCols->numOfRows < pCfg->minRowsPerFileBlock &&
pBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && true /*TODO: check if same file*/) {
pFile = TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_LAST);
if (tsdbWriteBlockToFile(pTSCh, pFile, pDataCols, &newBlock, true, false) < 0) return -1;
// TODO: refactor code here
if (tsdbInsertSubBlock(pTSCh, &newBlock) < 0) return -1;
} else {
if (tsdbLoadBlockData(pReadH, pBlock, NULL) < 0) return -1;
if (tdMergeDataCols(pReadH->pDataCols[0], pDataCols, pDataCols->numOfRows) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
if (tsdbWriteBlockToRightFile(pTSCh, pReadH->pDataCols[0], &newBlock) < 0) return -1;
if (tsdbAddSuperBlock(pTSCh, &newBlock) < 0) return -1;
}
} else {
}
return 0;
}
static int tsdbMergeDataBlock(STSCommitHandle *pTSCh, SBlock *pBlock) {
// TODO
return 0; return 0;
} }
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册