提交 62386223 编写于 作者: H Hongze Cheng

refactor more code

上级 55ad0357
...@@ -29,7 +29,10 @@ typedef struct { ...@@ -29,7 +29,10 @@ typedef struct {
SReadHandle *pReadH; SReadHandle *pReadH;
SFileGroup * pFGroup; SFileGroup * pFGroup;
SBlockIdx * pBlockIdx; SBlockIdx * pBlockIdx;
int nBlockIdx;
SBlockInfo * pBlockInfo; SBlockInfo * pBlockInfo;
SBlock * pSubBlock;
int nSubBlock;
SDataCols * pDataCols; SDataCols * pDataCols;
} STSCommitHandle; } STSCommitHandle;
...@@ -60,17 +63,18 @@ int tsdbCommitData(STsdbRepo *pRepo) { ...@@ -60,17 +63,18 @@ int tsdbCommitData(STsdbRepo *pRepo) {
if (tsdbStartCommit(pCommitH) < 0) return -1; if (tsdbStartCommit(pCommitH) < 0) return -1;
if (tsdbCommitTimeSeriesData(pCommitH) < 0) goto _err; if (tsdbCommitTimeSeriesData(pCommitH) < 0) {
tsdbEndCommit(pCommitH, true);
return -1;
}
if (tsdbCommitMetaData(pCommitH) < 0) goto _err; if (tsdbCommitMetaData(pCommitH) < 0) {
tsdbEndCommit(pCommitH, true);
return -1;
}
tsdbEndCommit(pCommitH, false); tsdbEndCommit(pCommitH, false);
return 0; return 0;
_err:
tsdbEndCommit(pCommitH, true);
return -1;
} }
static int tsdbStartCommit(SCommitHandle *pCommitH) { static int tsdbStartCommit(SCommitHandle *pCommitH) {
...@@ -302,28 +306,21 @@ static int tsdbCommitToFileGroup(STsdbRepo *pRepo, SFileGroup *pOldGroup, SFileG ...@@ -302,28 +306,21 @@ static int tsdbCommitToFileGroup(STsdbRepo *pRepo, SFileGroup *pOldGroup, SFileG
} }
for (int tid = 1; tid < pTSCh->maxIters; tid++) { for (int tid = 1; tid < pTSCh->maxIters; tid++) {
if (tsdbCommitTableData(pTSCh, tid) < 0) { SCommitIter *pIter = iters + tid;
tsdbCloseAndUnsetCommitFGroup(pTSCh, true /* hasError = true */); if (pIter->pTable == NULL) continue;
return -1;
}
if (tsdbTryMoveLastBlock(pTSCh) < 0) {
tsdbCloseAndUnsetCommitFGroup(pTSCh, true /* hasError = true */);
return -1;
}
if (tsdbWriteBlockInfo(pWHelper) < 0) { if (tsdbCommitTableData(pTSCh, tid) < 0) {
tsdbCloseAndUnsetCommitFGroup(pTSCh, true /* hasError = true */); tsdbCloseAndUnsetCommitFGroup(pTSCh, true /* hasError = true */);
return -1; return -1;
} }
} }
if (tsdbWriteBlockIdx(pWHelper) < 0) { if (tsdbWriteBlockIdx(pTSCh) < 0) {
tsdbCloseAndUnsetCommitFGroup(pTSCh, true /* hasError = true */); tsdbCloseAndUnsetCommitFGroup(pTSCh, true /* hasError = true */);
return -1; return -1;
} }
tsdbCloseAndUnsetCommitFGroup(pTSCh, false /* hasError = true */); tsdbCloseAndUnsetCommitFGroup(pTSCh, false /* hasError = false */);
return 0; return 0;
} }
...@@ -610,236 +607,228 @@ static void tsdbGetNextCommitFileGroup(SFileGroup *pOldGroup, SFileGroup *pNewGr ...@@ -610,236 +607,228 @@ static void tsdbGetNextCommitFileGroup(SFileGroup *pOldGroup, SFileGroup *pNewGr
static int tsdbCommitTableData(STSCommitHandle *pTSCh, int tid) { static int tsdbCommitTableData(STSCommitHandle *pTSCh, int tid) {
SCommitIter *pIter = pTSCh->pIters + tid; SCommitIter *pIter = pTSCh->pIters + tid;
if (pIter->pTable == NULL) return 0; SReadHandle *pReadH = pTSCh->pReadH;
SDataCols * pDataCols = pTSCh->pDataCols;
taosRLockLatch(&(pIter->pTable->latch)); taosRLockLatch(&(pIter->pTable->latch));
if (pIter->pIter == NULL) { if (tsdbSetCommitTable(pTSCh, pIter->pTable) < 0) {
// TODO taosRUnLockLatch(&(pIter->pTable->latch));
return -1;
} }
if (tdInitDataCols(pTSCh->pDataCols, tsdbGetTableSchemaImpl(pIter->pTable, false, false, -1)) < 0) { if (tsdbLoadBlockInfo(pReadH) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; taosRUnLockLatch(&(pIter->pTable->latch));
goto _err; return -1;
} }
if (tsdbReadBlockInfo() < 0) { if (tsdbCommitTableDataImpl(pTSCh) < 0) {
goto _err; taosRUnLockLatch(&(pIter->pTable->latch));
return -1;
} }
while (true) { if (tsdbWriteBlockInfo(pTSCh) < 0) {
TSKEY keyNext = tsdbNextIterKey(pIter->pIter); taosRUnLockLatch(&(pIter->pTable->latch));
if (keyNext < 0 || keyNext > maxKey) break; return -1;
if (/* no block info exists*/ || keyNext > pIdx->maxKey) {
if (tsdbProcessAppendCommit() < 0) goto _err;
} else {
if (tsdbProcessMergeCommit() < 0) goto _err;
}
} }
taosRUnLockLatch(&(pIter->pTable->latch)); taosRUnLockLatch(&(pIter->pTable->latch));
return 0; return 0;
_err:
taosRUnLockLatch(&(pIter->pTable->latch));
return -1;
} }
static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey) { // static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey) {
STsdbCfg * pCfg = &(pHelper->pRepo->config); // STsdbCfg * pCfg = &(pHelper->pRepo->config);
STable * pTable = pCommitIter->pTable; // STable * pTable = pCommitIter->pTable;
SBlockIdx * pIdx = &(pHelper->curCompIdx); // SBlockIdx * pIdx = &(pHelper->curCompIdx);
TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter); // TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter);
int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5; // int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5;
SBlock compBlock = {0}; // SBlock compBlock = {0};
ASSERT(pIdx->len <= 0 || keyFirst > pIdx->maxKey); // ASSERT(pIdx->len <= 0 || keyFirst > pIdx->maxKey);
if (pIdx->hasLast) { // append to with last block // if (pIdx->hasLast) { // append to with last block
ASSERT(pIdx->len > 0); // ASSERT(pIdx->len > 0);
SBlock *pCompBlock = blockAtIdx(pHelper, pIdx->numOfBlocks - 1); // SBlock *pCompBlock = blockAtIdx(pHelper, pIdx->numOfBlocks - 1);
ASSERT(pCompBlock->last && pCompBlock->numOfRows < pCfg->minRowsPerFileBlock); // ASSERT(pCompBlock->last && pCompBlock->numOfRows < pCfg->minRowsPerFileBlock);
tdResetDataCols(pDataCols); // tdResetDataCols(pDataCols);
int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock - pCompBlock->numOfRows, // int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock - pCompBlock->numOfRows,
pDataCols, NULL, 0); // pDataCols, NULL, 0);
ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows); // ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows);
if (rowsRead + pCompBlock->numOfRows < pCfg->minRowsPerFileBlock && // if (rowsRead + pCompBlock->numOfRows < pCfg->minRowsPerFileBlock &&
pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) { // pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) {
if (tsdbWriteBlockToFile(pHelper, helperLastF(pHelper), pDataCols, &compBlock, true, false) < 0) return -1; // if (tsdbWriteBlockToFile(pHelper, helperLastF(pHelper), pDataCols, &compBlock, true, false) < 0) return -1;
if (tsdbAddSubBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1, rowsRead) < 0) return -1; // if (tsdbAddSubBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1, rowsRead) < 0) return -1;
} else { // } else {
if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; // if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1;
ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows); // ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows);
if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, pDataCols->numOfRows) < 0) return -1; // if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, pDataCols->numOfRows) < 0) return -1;
ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows + pDataCols->numOfRows); // ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows + pDataCols->numOfRows);
if (tsdbWriteBlockToProperFile(pHelper, pHelper->pDataCols[0], &compBlock) < 0) return -1; // if (tsdbWriteBlockToProperFile(pHelper, pHelper->pDataCols[0], &compBlock) < 0) return -1;
if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1; // if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1;
} // }
if (pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false; // if (pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false;
} else { // } else {
ASSERT(!pHelper->hasOldLastBlock); // ASSERT(!pHelper->hasOldLastBlock);
tdResetDataCols(pDataCols); // tdResetDataCols(pDataCols);
int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock, pDataCols, NULL, 0); // int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock, pDataCols, NULL, 0);
ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows); // ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows);
if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1; // if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1;
if (tsdbInsertSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks) < 0) return -1; // if (tsdbInsertSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks) < 0) return -1;
} // }
#ifndef NDEBUG // #ifndef NDEBUG
TSKEY keyNext = tsdbNextIterKey(pCommitIter->pIter); // TSKEY keyNext = tsdbNextIterKey(pCommitIter->pIter);
ASSERT(keyNext < 0 || keyNext > pIdx->maxKey); // ASSERT(keyNext < 0 || keyNext > pIdx->maxKey);
#endif // #endif
return 0; // return 0;
} // }
static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey, // static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey,
int *blkIdx) { // int *blkIdx) {
STsdbCfg * pCfg = &(pHelper->pRepo->config); // STsdbCfg * pCfg = &(pHelper->pRepo->config);
STable * pTable = pCommitIter->pTable; // STable * pTable = pCommitIter->pTable;
SBlockIdx * pIdx = &(pHelper->curCompIdx); // SBlockIdx * pIdx = &(pHelper->curCompIdx);
SBlock compBlock = {0}; // SBlock compBlock = {0};
TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter); // TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter);
int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5; // int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5;
SDataCols *pDataCols0 = pHelper->pDataCols[0]; // SDataCols *pDataCols0 = pHelper->pDataCols[0];
SSkipListIterator slIter = {0}; // SSkipListIterator slIter = {0};
ASSERT(keyFirst <= pIdx->maxKey); // ASSERT(keyFirst <= pIdx->maxKey);
SBlock *pCompBlock = taosbsearch((void *)(&keyFirst), (void *)blockAtIdx(pHelper, *blkIdx), // SBlock *pCompBlock = taosbsearch((void *)(&keyFirst), (void *)blockAtIdx(pHelper, *blkIdx),
pIdx->numOfBlocks - *blkIdx, sizeof(SBlock), compareKeyBlock, TD_GE); // pIdx->numOfBlocks - *blkIdx, sizeof(SBlock), compareKeyBlock, TD_GE);
ASSERT(pCompBlock != NULL); // ASSERT(pCompBlock != NULL);
int tblkIdx = (int32_t)(TSDB_GET_COMPBLOCK_IDX(pHelper, pCompBlock)); // int tblkIdx = (int32_t)(TSDB_GET_COMPBLOCK_IDX(pHelper, pCompBlock));
if (pCompBlock->last) { // if (pCompBlock->last) {
ASSERT(pCompBlock->numOfRows < pCfg->minRowsPerFileBlock && tblkIdx == pIdx->numOfBlocks - 1); // ASSERT(pCompBlock->numOfRows < pCfg->minRowsPerFileBlock && tblkIdx == pIdx->numOfBlocks - 1);
int16_t colId = 0; // int16_t colId = 0;
slIter = *(pCommitIter->pIter); // slIter = *(pCommitIter->pIter);
if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1; // if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1;
ASSERT(pDataCols0->numOfRows == pCompBlock->numOfRows); // ASSERT(pDataCols0->numOfRows == pCompBlock->numOfRows);
int rows1 = defaultRowsInBlock - pCompBlock->numOfRows; // int rows1 = defaultRowsInBlock - pCompBlock->numOfRows;
int rows2 = // int rows2 =
tsdbLoadDataFromCache(pTable, &slIter, maxKey, rows1, NULL, pDataCols0->cols[0].pData, pDataCols0->numOfRows); // tsdbLoadDataFromCache(pTable, &slIter, maxKey, rows1, NULL, pDataCols0->cols[0].pData, pDataCols0->numOfRows);
if (rows2 == 0) { // all data filtered out // if (rows2 == 0) { // all data filtered out
*(pCommitIter->pIter) = slIter; // *(pCommitIter->pIter) = slIter;
} else { // } else {
if (pCompBlock->numOfRows + rows2 < pCfg->minRowsPerFileBlock && // if (pCompBlock->numOfRows + rows2 < pCfg->minRowsPerFileBlock &&
pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) { // pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) {
tdResetDataCols(pDataCols); // tdResetDataCols(pDataCols);
int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, rows1, pDataCols, // int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, rows1, pDataCols,
pDataCols0->cols[0].pData, pDataCols0->numOfRows); // pDataCols0->cols[0].pData, pDataCols0->numOfRows);
ASSERT(rowsRead == rows2 && rowsRead == pDataCols->numOfRows); // ASSERT(rowsRead == rows2 && rowsRead == pDataCols->numOfRows);
if (tsdbWriteBlockToFile(pHelper, helperLastF(pHelper), pDataCols, &compBlock, true, false) < 0) return -1; // if (tsdbWriteBlockToFile(pHelper, helperLastF(pHelper), pDataCols, &compBlock, true, false) < 0) return -1;
if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1; // if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1;
tblkIdx++; // tblkIdx++;
} else { // } else {
if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; // if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1;
int round = 0; // int round = 0;
int dIter = 0; // int dIter = 0;
while (true) { // while (true) {
tdResetDataCols(pDataCols); // tdResetDataCols(pDataCols);
int rowsRead = // int rowsRead =
tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, maxKey, defaultRowsInBlock); // tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, maxKey, defaultRowsInBlock);
if (rowsRead == 0) break; // if (rowsRead == 0) break;
if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1; // if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1;
if (round == 0) { // if (round == 0) {
if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; // if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
} else { // } else {
if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; // if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
} // }
tblkIdx++; // tblkIdx++;
round++; // round++;
} // }
} // }
if (pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false; // if (pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false;
} // }
} else { // } else {
TSKEY keyLimit = (tblkIdx == pIdx->numOfBlocks - 1) ? maxKey : (pCompBlock[1].keyFirst - 1); // TSKEY keyLimit = (tblkIdx == pIdx->numOfBlocks - 1) ? maxKey : (pCompBlock[1].keyFirst - 1);
TSKEY blkKeyFirst = pCompBlock->keyFirst; // TSKEY blkKeyFirst = pCompBlock->keyFirst;
TSKEY blkKeyLast = pCompBlock->keyLast; // TSKEY blkKeyLast = pCompBlock->keyLast;
if (keyFirst < blkKeyFirst) { // if (keyFirst < blkKeyFirst) {
while (true) { // while (true) {
tdResetDataCols(pDataCols); // tdResetDataCols(pDataCols);
int rowsRead = // int rowsRead =
tsdbLoadDataFromCache(pTable, pCommitIter->pIter, blkKeyFirst - 1, defaultRowsInBlock, pDataCols, NULL, 0); // tsdbLoadDataFromCache(pTable, pCommitIter->pIter, blkKeyFirst - 1, defaultRowsInBlock, pDataCols, NULL, 0);
if (rowsRead == 0) break; // if (rowsRead == 0) break;
ASSERT(rowsRead == pDataCols->numOfRows); // ASSERT(rowsRead == pDataCols->numOfRows);
if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1; // if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1;
if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; // if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
tblkIdx++; // tblkIdx++;
} // }
ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || // ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 ||
tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); // tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast));
} else { // } else {
ASSERT(keyFirst <= blkKeyLast); // ASSERT(keyFirst <= blkKeyLast);
int16_t colId = 0; // int16_t colId = 0;
if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1; // if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1;
slIter = *(pCommitIter->pIter); // slIter = *(pCommitIter->pIter);
int rows1 = (pCfg->maxRowsPerFileBlock - pCompBlock->numOfRows); // int rows1 = (pCfg->maxRowsPerFileBlock - pCompBlock->numOfRows);
int rows2 = tsdbLoadDataFromCache(pTable, &slIter, blkKeyLast, INT_MAX, NULL, pDataCols0->cols[0].pData, // int rows2 = tsdbLoadDataFromCache(pTable, &slIter, blkKeyLast, INT_MAX, NULL, pDataCols0->cols[0].pData,
pDataCols0->numOfRows); // pDataCols0->numOfRows);
if (rows2 == 0) { // all filtered out // if (rows2 == 0) { // all filtered out
*(pCommitIter->pIter) = slIter; // *(pCommitIter->pIter) = slIter;
ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || // ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 ||
tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); // tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast));
} else { // } else {
int rows3 = tsdbLoadDataFromCache(pTable, &slIter, keyLimit, INT_MAX, NULL, NULL, 0) + rows2; // int rows3 = tsdbLoadDataFromCache(pTable, &slIter, keyLimit, INT_MAX, NULL, NULL, 0) + rows2;
if (pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && rows1 >= rows2) { // if (pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && rows1 >= rows2) {
int rows = (rows1 >= rows3) ? rows3 : rows2; // int rows = (rows1 >= rows3) ? rows3 : rows2;
tdResetDataCols(pDataCols); // tdResetDataCols(pDataCols);
int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, rows, pDataCols, // int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, rows, pDataCols,
pDataCols0->cols[0].pData, pDataCols0->numOfRows); // pDataCols0->cols[0].pData, pDataCols0->numOfRows);
ASSERT(rowsRead == rows && rowsRead == pDataCols->numOfRows); // ASSERT(rowsRead == rows && rowsRead == pDataCols->numOfRows);
if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, false) < 0) // if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, false) < 0)
return -1; // return -1;
if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1; // if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1;
tblkIdx++; // tblkIdx++;
ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || // ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 ||
tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); // tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast));
} else { // } else {
if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; // if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1;
int round = 0; // int round = 0;
int dIter = 0; // int dIter = 0;
while (true) { // while (true) {
int rowsRead = // int rowsRead =
tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, keyLimit, defaultRowsInBlock); // tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, keyLimit, defaultRowsInBlock);
if (rowsRead == 0) break; // if (rowsRead == 0) break;
if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) // if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0)
return -1; // return -1;
if (round == 0) { // if (round == 0) {
if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; // if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
} else { // } else {
if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; // if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
} // }
round++; // round++;
tblkIdx++; // tblkIdx++;
} // }
ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || // ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 ||
tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); // tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast));
} // }
} // }
} // }
} // }
*blkIdx = tblkIdx; // *blkIdx = tblkIdx;
return 0; // return 0;
} // }
static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget,
TSKEY maxKey, int maxRows) { TSKEY maxKey, int maxRows) {
...@@ -1097,19 +1086,18 @@ static int tsdbSetAndOpenCommitFGroup(STSCommitHandle *pTSCh, STsdbRepo *pOldGro ...@@ -1097,19 +1086,18 @@ static int tsdbSetAndOpenCommitFGroup(STSCommitHandle *pTSCh, STsdbRepo *pOldGro
if (pOldFile->fname[0] == '\0' || if (pOldFile->fname[0] == '\0' ||
strncmp(pOldFile->fname, pNewFile->fname, TSDB_FILENAME_LEN) != 0) { // new file is created strncmp(pOldFile->fname, pNewFile->fname, TSDB_FILENAME_LEN) != 0) { // new file is created
if (tsdbUpdateFileHeader(pNewFile) < 0) { if (tsdbUpdateFileHeader(pNewFile) < 0) {
tsdbError("vgId:%d failed to update file %s header since %s", REPO_ID(pRepo), pNewFile->fname, strerror(errno)); tsdbError("vgId:%d failed to update file %s header since %s", REPO_ID(pRepo), pNewFile->fname, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
tsdbCloseAndUnsetCommitFGroup(pTSCh, true /*hasError = true*/); tsdbCloseAndUnsetCommitFGroup(pTSCh, true /*hasError = true*/);
return -1; return -1;
} }
} }
} }
pTSCh->pFGroup = pNewGroup; pTSCh->pFGroup = pNewGroup;
pTSCh->nBlockIdx = 0;
return 0; return 0;
_err:
} }
static void tsdbCloseAndUnsetCommitFGroup(STSCommitHandle *pTSCh, bool hasError) { static void tsdbCloseAndUnsetCommitFGroup(STSCommitHandle *pTSCh, bool hasError) {
...@@ -1128,3 +1116,245 @@ static void tsdbCloseAndUnsetCommitFGroup(STSCommitHandle *pTSCh, bool hasError) ...@@ -1128,3 +1116,245 @@ static void tsdbCloseAndUnsetCommitFGroup(STSCommitHandle *pTSCh, bool hasError)
} }
} }
} }
static int tsdbWriteBlockInfo(STSCommitHandle *pTSCh) {
// TODO
return 0;
}
static int tsdbWriteBlockIdx(STSCommitHandle *pTSCh) {
// TODO
return 0;
}
static int tsdbSetCommitTable(STSCommitHandle *pTSCh, STable *pTable) {
if (tsdbSetReadTable(pTSCh->pReadH, pTable) < 0) return -1;
if (tdInitDataCols(pTSCh->pDataCols, tsdbGetTableSchemaImpl(pTable, false, false, -1)) < 0) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
return 0;
}
static int tsdbCommitTableDataImpl(STSCommitHandle *pTSCh, int tid) {
SCommitIter *pIter = pTSCh->pIters + tid;
SReadHandle *pReadH = pTSCh->pReadH;
SDataCols * pDataCols = pTSCh->pDataCols;
SBlockIdx * pOldIdx = pReadH->pCurBlockIdx;
ASSERT(pOldIdx == NULL || pOldIdx->numOfBlocks > 0);
int sidx = 0;
int eidx = (pOldIdx == NULL) ? 0 : pOldIdx->numOfBlocks;
while (true) {
TSKEY keyNext = tsdbNextIterKey(pIter->pIter);
if (keyNext > maxKey) break;
void *ptr = taosbsearch((void *)keyNext, (void *)(pReadH->pBlockInfo->blocks + sidx), eidx - sidx, sizeof(SBlock),
NULL, TD_GE);
if (ptr == NULL) {
if (sidx < eidx && pOldIdx->hasLast) {
ptr = pReadH->pBlockInfo->blocks + eidx - 1;
}
}
int bidx = 0;
if (ptr == NULL) {
bidx = eidx;
} else {
bidx = POINTER_DISTANCE(ptr, (void *)pReadH->pBlockInfo->blocks) / sizeof(SBlock);
}
if (tsdbCopyBlocks(pTSCh, sidx, bidx) < 0) return -1;
sidx = bidx;
if (ptr == NULL) {
if (tsdbAppendCommit(pTSCh) < 0) return -1;
} else {
if (tsdbMergeCommit(pTSCh, (SBlock *)ptr) < 0) return -1;
sidx++;
}
}
if (tsdbCopyBlocks(pTSCh, sidx, eidx) < 0) return -1;
return 0;
}
static int tsdbCopyBlocks(STSCommitHandle *pTSCh, int sidx, int eidx) {
ASSERT(sidx <= eidx);
for (int idx = sidx; idx < eidx; idx++) {
// TODO
}
return 0;
}
static int tsdbAppendCommit(STSCommitHandle *pTSCh) {
SDataCols *pDataCols = pTSCh->pDataCols;
SBlock block = {0};
SBlock * pBlock = &block;
tdResetDataCols(pDataCols);
int rowsToRead = tsdbLoadDataFromCache();
ASSERT(rowsToRead > 0);
if (tsdbWriteBlockToProperFile(pTSCh, pDataCols, pBlock) < 0) return -1;
if (tsdbAddSuperBlock(pTSCh, pBlock) < 0) return -1;
return -1;
}
static int tsdbMergeCommit(STSCommitHandle *pTSCh, SBlock *pBlock) {
STable * pTable = pTSCh->pReadH->pTable;
SCommitIter *pIter = pTSCh->pIters + TABLE_TID(pTable);
TSKEY nextKey = tsdbNextIterKey(pIter->pIter);
if (pBlock->last) {
} else {
}
return 0;
}
static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, SBlock *pCompBlock,
bool isLast, bool isSuperBlock) {
STsdbCfg * pCfg = &(pHelper->pRepo->config);
SBlockData *pCompData = (SBlockData *)(pHelper->pBuffer);
int64_t offset = 0;
int rowsToWrite = pDataCols->numOfRows;
ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRowsPerFileBlock);
ASSERT(isLast ? rowsToWrite < pCfg->minRowsPerFileBlock : true);
offset = lseek(pFile->fd, 0, SEEK_END);
if (offset < 0) {
tsdbError("vgId:%d failed to write block to file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
int nColsNotAllNull = 0;
for (int ncol = 1; ncol < pDataCols->numOfCols; ncol++) { // ncol from 1, we skip the timestamp column
SDataCol *pDataCol = pDataCols->cols + ncol;
SBlockCol *pCompCol = pCompData->cols + nColsNotAllNull;
if (isNEleNull(pDataCol, rowsToWrite)) { // all data to commit are NULL, just ignore it
continue;
}
memset(pCompCol, 0, sizeof(*pCompCol));
pCompCol->colId = pDataCol->colId;
pCompCol->type = pDataCol->type;
if (tDataTypeDesc[pDataCol->type].getStatisFunc) {
(*tDataTypeDesc[pDataCol->type].getStatisFunc)(
(TSKEY *)(pDataCols->cols[0].pData), pDataCol->pData, rowsToWrite, &(pCompCol->min), &(pCompCol->max),
&(pCompCol->sum), &(pCompCol->minIndex), &(pCompCol->maxIndex), &(pCompCol->numOfNull));
}
nColsNotAllNull++;
}
ASSERT(nColsNotAllNull >= 0 && nColsNotAllNull <= pDataCols->numOfCols);
// Compress the data if neccessary
int tcol = 0;
int32_t toffset = 0;
int32_t tsize = TSDB_GET_COMPCOL_LEN(nColsNotAllNull);
int32_t lsize = tsize;
int32_t keyLen = 0;
for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) {
if (ncol != 0 && tcol >= nColsNotAllNull) break;
SDataCol *pDataCol = pDataCols->cols + ncol;
SBlockCol *pCompCol = pCompData->cols + tcol;
if (ncol != 0 && (pDataCol->colId != pCompCol->colId)) continue;
void *tptr = POINTER_SHIFT(pCompData, lsize);
int32_t flen = 0; // final length
int32_t tlen = dataColGetNEleLen(pDataCol, rowsToWrite);
if (pCfg->compression) {
if (pCfg->compression == TWO_STAGE_COMP) {
pHelper->compBuffer = taosTRealloc(pHelper->compBuffer, tlen + COMP_OVERFLOW_BYTES);
if (pHelper->compBuffer == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
}
}
flen = (*(tDataTypeDesc[pDataCol->type].compFunc))((char *)pDataCol->pData, tlen, rowsToWrite, tptr,
(int32_t)taosTSizeof(pHelper->pBuffer) - lsize, pCfg->compression,
pHelper->compBuffer, (int32_t)taosTSizeof(pHelper->compBuffer));
} else {
flen = tlen;
memcpy(tptr, pDataCol->pData, flen);
}
// Add checksum
ASSERT(flen > 0);
flen += sizeof(TSCKSUM);
taosCalcChecksumAppend(0, (uint8_t *)tptr, flen);
pFile->info.magic =
taosCalcChecksum(pFile->info.magic, (uint8_t *)POINTER_SHIFT(tptr, flen - sizeof(TSCKSUM)), sizeof(TSCKSUM));
if (ncol != 0) {
pCompCol->offset = toffset;
pCompCol->len = flen;
tcol++;
} else {
keyLen = flen;
}
toffset += flen;
lsize += flen;
}
pCompData->delimiter = TSDB_FILE_DELIMITER;
pCompData->uid = pHelper->tableInfo.uid;
pCompData->numOfCols = nColsNotAllNull;
taosCalcChecksumAppend(0, (uint8_t *)pCompData, tsize);
pFile->info.magic = taosCalcChecksum(pFile->info.magic, (uint8_t *)POINTER_SHIFT(pCompData, tsize - sizeof(TSCKSUM)),
sizeof(TSCKSUM));
// Write the whole block to file
if (taosTWrite(pFile->fd, (void *)pCompData, lsize) < lsize) {
tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(helperRepo(pHelper)), lsize, pFile->fname,
strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
goto _err;
}
// Update pCompBlock membership vairables
pCompBlock->last = isLast;
pCompBlock->offset = offset;
pCompBlock->algorithm = pCfg->compression;
pCompBlock->numOfRows = rowsToWrite;
pCompBlock->len = lsize;
pCompBlock->keyLen = keyLen;
pCompBlock->numOfSubBlocks = isSuperBlock ? 1 : 0;
pCompBlock->numOfCols = nColsNotAllNull;
pCompBlock->keyFirst = dataColsKeyFirst(pDataCols);
pCompBlock->keyLast = dataColsKeyAt(pDataCols, rowsToWrite - 1);
tsdbDebug("vgId:%d tid:%d a block of data is written to file %s, offset %" PRId64
" numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64,
REPO_ID(helperRepo(pHelper)), pHelper->tableInfo.tid, pFile->fname, (int64_t)(pCompBlock->offset),
(int)(pCompBlock->numOfRows), pCompBlock->len, pCompBlock->numOfCols, pCompBlock->keyFirst,
pCompBlock->keyLast);
pFile->info.size += pCompBlock->len;
return 0;
_err:
return -1;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册