From f41b47e109c51ab70e51587bbd4e2d0508e4f947 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Fri, 16 Oct 2020 09:43:02 +0800 Subject: [PATCH] refactor more code --- src/tsdb/inc/tsdbMain.h | 2 + src/tsdb/src/tsdbCommit.c | 321 +++++++----------------------------- src/tsdb/src/tsdbReadUtil.c | 2 +- 3 files changed, 67 insertions(+), 258 deletions(-) diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index d0b6388227..7d3f1c530d 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -609,6 +609,8 @@ int tsdbLoadBlockDataInfo(SReadHandle* pReadH, SBlock* pBlock); #define TSDB_FILE_IN_FGROUP(pGroup, type) (&((pGroup)->files[(type)])) +int tsdbAllocBuf(void** ppBuf, int size); + #ifdef __cplusplus } #endif diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 0bb548744a..aea38372b2 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -636,200 +636,6 @@ static int tsdbCommitTableData(STSCommitHandle *pTSCh, int tid) { return 0; } -// static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey) { -// STsdbCfg * pCfg = &(pHelper->pRepo->config); -// STable * pTable = pCommitIter->pTable; -// SBlockIdx * pIdx = &(pHelper->curCompIdx); -// TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter); -// int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5; -// SBlock compBlock = {0}; - -// ASSERT(pIdx->len <= 0 || keyFirst > pIdx->maxKey); -// if (pIdx->hasLast) { // append to with last block -// ASSERT(pIdx->len > 0); -// SBlock *pCompBlock = blockAtIdx(pHelper, pIdx->numOfBlocks - 1); -// ASSERT(pCompBlock->last && pCompBlock->numOfRows < pCfg->minRowsPerFileBlock); -// tdResetDataCols(pDataCols); -// int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock - pCompBlock->numOfRows, -// pDataCols, NULL, 0); -// ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows); -// if (rowsRead + pCompBlock->numOfRows < pCfg->minRowsPerFileBlock && -// pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) { -// if (tsdbWriteBlockToFile(pHelper, helperLastF(pHelper), pDataCols, &compBlock, true, false) < 0) return -1; -// if (tsdbAddSubBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1, rowsRead) < 0) return -1; -// } else { -// if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; -// ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows); - -// if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, pDataCols->numOfRows) < 0) return -1; -// ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows + pDataCols->numOfRows); - -// if (tsdbWriteBlockToProperFile(pHelper, pHelper->pDataCols[0], &compBlock) < 0) return -1; -// if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1; -// } - -// if (pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false; -// } else { -// ASSERT(!pHelper->hasOldLastBlock); -// tdResetDataCols(pDataCols); -// int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, defaultRowsInBlock, pDataCols, NULL, 0); -// ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows); - -// if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1; -// if (tsdbInsertSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks) < 0) return -1; -// } - -// #ifndef NDEBUG -// TSKEY keyNext = tsdbNextIterKey(pCommitIter->pIter); -// ASSERT(keyNext < 0 || keyNext > pIdx->maxKey); -// #endif - -// return 0; -// } - -// static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter, SDataCols *pDataCols, TSKEY maxKey, -// int *blkIdx) { -// STsdbCfg * pCfg = &(pHelper->pRepo->config); -// STable * pTable = pCommitIter->pTable; -// SBlockIdx * pIdx = &(pHelper->curCompIdx); -// SBlock compBlock = {0}; -// TSKEY keyFirst = tsdbNextIterKey(pCommitIter->pIter); -// int defaultRowsInBlock = pCfg->maxRowsPerFileBlock * 4 / 5; -// SDataCols *pDataCols0 = pHelper->pDataCols[0]; - -// SSkipListIterator slIter = {0}; - -// ASSERT(keyFirst <= pIdx->maxKey); - -// SBlock *pCompBlock = taosbsearch((void *)(&keyFirst), (void *)blockAtIdx(pHelper, *blkIdx), -// pIdx->numOfBlocks - *blkIdx, sizeof(SBlock), compareKeyBlock, TD_GE); -// ASSERT(pCompBlock != NULL); -// int tblkIdx = (int32_t)(TSDB_GET_COMPBLOCK_IDX(pHelper, pCompBlock)); - -// if (pCompBlock->last) { -// ASSERT(pCompBlock->numOfRows < pCfg->minRowsPerFileBlock && tblkIdx == pIdx->numOfBlocks - 1); -// int16_t colId = 0; -// slIter = *(pCommitIter->pIter); -// if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1; -// ASSERT(pDataCols0->numOfRows == pCompBlock->numOfRows); - -// int rows1 = defaultRowsInBlock - pCompBlock->numOfRows; -// int rows2 = -// tsdbLoadDataFromCache(pTable, &slIter, maxKey, rows1, NULL, pDataCols0->cols[0].pData, pDataCols0->numOfRows); -// if (rows2 == 0) { // all data filtered out -// *(pCommitIter->pIter) = slIter; -// } else { -// if (pCompBlock->numOfRows + rows2 < pCfg->minRowsPerFileBlock && -// pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) { -// tdResetDataCols(pDataCols); -// int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, rows1, pDataCols, -// pDataCols0->cols[0].pData, pDataCols0->numOfRows); -// ASSERT(rowsRead == rows2 && rowsRead == pDataCols->numOfRows); -// if (tsdbWriteBlockToFile(pHelper, helperLastF(pHelper), pDataCols, &compBlock, true, false) < 0) return -1; -// if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1; -// tblkIdx++; -// } else { -// if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; -// int round = 0; -// int dIter = 0; -// while (true) { -// tdResetDataCols(pDataCols); -// int rowsRead = -// tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, maxKey, defaultRowsInBlock); -// if (rowsRead == 0) break; - -// if (tsdbWriteBlockToProperFile(pHelper, pDataCols, &compBlock) < 0) return -1; -// if (round == 0) { -// if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; -// } else { -// if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; -// } - -// tblkIdx++; -// round++; -// } -// } -// if (pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false; -// } -// } else { -// TSKEY keyLimit = (tblkIdx == pIdx->numOfBlocks - 1) ? maxKey : (pCompBlock[1].keyFirst - 1); -// TSKEY blkKeyFirst = pCompBlock->keyFirst; -// TSKEY blkKeyLast = pCompBlock->keyLast; - -// if (keyFirst < blkKeyFirst) { -// while (true) { -// tdResetDataCols(pDataCols); -// int rowsRead = -// tsdbLoadDataFromCache(pTable, pCommitIter->pIter, blkKeyFirst - 1, defaultRowsInBlock, pDataCols, NULL, 0); -// if (rowsRead == 0) break; - -// ASSERT(rowsRead == pDataCols->numOfRows); -// if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1; -// if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; -// tblkIdx++; -// } -// ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || -// tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); -// } else { -// ASSERT(keyFirst <= blkKeyLast); -// int16_t colId = 0; -// if (tsdbLoadBlockDataCols(pHelper, pCompBlock, NULL, &colId, 1) < 0) return -1; - -// slIter = *(pCommitIter->pIter); -// int rows1 = (pCfg->maxRowsPerFileBlock - pCompBlock->numOfRows); -// int rows2 = tsdbLoadDataFromCache(pTable, &slIter, blkKeyLast, INT_MAX, NULL, pDataCols0->cols[0].pData, -// pDataCols0->numOfRows); - -// if (rows2 == 0) { // all filtered out -// *(pCommitIter->pIter) = slIter; -// ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || -// tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); -// } else { -// int rows3 = tsdbLoadDataFromCache(pTable, &slIter, keyLimit, INT_MAX, NULL, NULL, 0) + rows2; - -// if (pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && rows1 >= rows2) { -// int rows = (rows1 >= rows3) ? rows3 : rows2; -// tdResetDataCols(pDataCols); -// int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, rows, pDataCols, -// pDataCols0->cols[0].pData, pDataCols0->numOfRows); -// ASSERT(rowsRead == rows && rowsRead == pDataCols->numOfRows); -// if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, false) < 0) -// return -1; -// if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1; -// tblkIdx++; -// ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || -// tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); -// } else { -// if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1; -// int round = 0; -// int dIter = 0; -// while (true) { -// int rowsRead = -// tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, keyLimit, defaultRowsInBlock); -// if (rowsRead == 0) break; - -// if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) -// return -1; -// if (round == 0) { -// if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; -// } else { -// if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1; -// } - -// round++; -// tblkIdx++; -// } -// ASSERT(tblkIdx == 0 || (tsdbNextIterKey(pCommitIter->pIter) < 0 || -// tsdbNextIterKey(pCommitIter->pIter) > blockAtIdx(pHelper, tblkIdx - 1)->keyLast)); -// } -// } -// } -// } - -// *blkIdx = tblkIdx; -// return 0; -// } - static int tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, TSKEY maxKey, int maxRows) { int numOfRows = 0; @@ -1123,7 +929,7 @@ static int tsdbWriteBlockInfo(STSCommitHandle *pTSCh) { } static int tsdbWriteBlockIdx(STSCommitHandle *pTSCh) { - // TODO + SFile *pFile = TSDB_FILE_IN_FGROUP(pTSCh->pFGroup, TSDB_FILE_TYPE_HEAD); return 0; } @@ -1221,62 +1027,66 @@ static int tsdbMergeCommit(STSCommitHandle *pTSCh, SBlock *pBlock) { return 0; } +static int tsdbWriteBlockToFile(STSCommitHandle *pTSCh, SFile *pFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast, + bool isSuperBlock) { + SReadHandle *pReadH = pTSCh->pReadH; + STsdbRepo * pRepo = pReadH->pRepo; + STsdbCfg * pCfg = &(pRepo->config); + int64_t offset = 0; + SBlockData * pBlockData = NULL; + int nColsNotAllNull = 0; -static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDataCols, SBlock *pCompBlock, - bool isLast, bool isSuperBlock) { - STsdbCfg * pCfg = &(pHelper->pRepo->config); - SBlockData *pCompData = (SBlockData *)(pHelper->pBuffer); - int64_t offset = 0; - int rowsToWrite = pDataCols->numOfRows; - - ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRowsPerFileBlock); - ASSERT(isLast ? rowsToWrite < pCfg->minRowsPerFileBlock : true); + ASSERT(pFile->info.size == lseek(pFile->fd, 0, SEEK_END)); + ASSERT(pDataCols->numOfRows > 0 && pDataCols->numOfRows <= pCfg->maxRowsPerFileBlock); + ASSERT(isLast ? pDataCols->numOfRows < pCfg->minRowsPerFileBlock : true); - offset = lseek(pFile->fd, 0, SEEK_END); - if (offset < 0) { - tsdbError("vgId:%d failed to write block to file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, - strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - goto _err; - } + offset = pFile->info.size; + int32_t bsize = TSDB_BLOCK_DATA_LEN(0); // total block size + int32_t csize = 0; // SBlockCol part size - int nColsNotAllNull = 0; for (int ncol = 1; ncol < pDataCols->numOfCols; ncol++) { // ncol from 1, we skip the timestamp column - SDataCol *pDataCol = pDataCols->cols + ncol; - SBlockCol *pCompCol = pCompData->cols + nColsNotAllNull; + SDataCol * pDataCol = pDataCols->cols + ncol; - if (isNEleNull(pDataCol, rowsToWrite)) { // all data to commit are NULL, just ignore it + if (isNEleNull(pDataCol, pDataCols->numOfRows)) { // all data to commit are NULL, just ignore it continue; } - memset(pCompCol, 0, sizeof(*pCompCol)); + nColsNotAllNull++; + bsize = TSDB_BLOCK_DATA_LEN(nColsNotAllNull); + if (tsdbAllocBuf(&(pReadH->pBuf), bsize) < 0) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + pBlockData = (SBlockData *)(pReadH->pBuf); + SBlockCol *pBlockCol = pBlockData->cols + (nColsNotAllNull - 1); + + memset(pBlockCol, 0, sizeof(*pBlockCol)); - pCompCol->colId = pDataCol->colId; - pCompCol->type = pDataCol->type; + pBlockCol->colId = pDataCol->colId; + pBlockCol->type = pDataCol->type; if (tDataTypeDesc[pDataCol->type].getStatisFunc) { (*tDataTypeDesc[pDataCol->type].getStatisFunc)( - (TSKEY *)(pDataCols->cols[0].pData), pDataCol->pData, rowsToWrite, &(pCompCol->min), &(pCompCol->max), - &(pCompCol->sum), &(pCompCol->minIndex), &(pCompCol->maxIndex), &(pCompCol->numOfNull)); + (TSKEY *)(pDataCols->cols[0].pData), pDataCol->pData, rowsToWrite, &(pBlockCol->min), &(pBlockCol->max), + &(pBlockCol->sum), &(pBlockCol->minIndex), &(pBlockCol->maxIndex), &(pBlockCol->numOfNull)); } - nColsNotAllNull++; } + csize = bsize; + ASSERT(nColsNotAllNull >= 0 && nColsNotAllNull <= pDataCols->numOfCols); // Compress the data if neccessary int tcol = 0; int32_t toffset = 0; - int32_t tsize = TSDB_GET_COMPCOL_LEN(nColsNotAllNull); - int32_t lsize = tsize; int32_t keyLen = 0; for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) { if (ncol != 0 && tcol >= nColsNotAllNull) break; - SDataCol *pDataCol = pDataCols->cols + ncol; - SBlockCol *pCompCol = pCompData->cols + tcol; + SDataCol * pDataCol = pDataCols->cols + ncol; + SBlockCol *pBlockCol = pBlockData->cols + tcol; - if (ncol != 0 && (pDataCol->colId != pCompCol->colId)) continue; - void *tptr = POINTER_SHIFT(pCompData, lsize); + if (ncol != 0 && (pDataCol->colId != pBlockCol->colId)) continue; + void *tptr = POINTER_SHIFT(pBlockData, lsize); int32_t flen = 0; // final length int32_t tlen = dataColGetNEleLen(pDataCol, rowsToWrite); @@ -1290,9 +1100,9 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa } } - flen = (*(tDataTypeDesc[pDataCol->type].compFunc))((char *)pDataCol->pData, tlen, rowsToWrite, tptr, - (int32_t)taosTSizeof(pHelper->pBuffer) - lsize, pCfg->compression, - pHelper->compBuffer, (int32_t)taosTSizeof(pHelper->compBuffer)); + flen = (*(tDataTypeDesc[pDataCol->type].compFunc))( + (char *)pDataCol->pData, tlen, rowsToWrite, tptr, (int32_t)taosTSizeof(pHelper->pBuffer) - lsize, + pCfg->compression, pHelper->compBuffer, (int32_t)taosTSizeof(pHelper->compBuffer)); } else { flen = tlen; memcpy(tptr, pDataCol->pData, flen); @@ -1306,8 +1116,8 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa taosCalcChecksum(pFile->info.magic, (uint8_t *)POINTER_SHIFT(tptr, flen - sizeof(TSCKSUM)), sizeof(TSCKSUM)); if (ncol != 0) { - pCompCol->offset = toffset; - pCompCol->len = flen; + pBlockCol->offset = toffset; + pBlockCol->len = flen; tcol++; } else { keyLen = flen; @@ -1317,44 +1127,41 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa lsize += flen; } - pCompData->delimiter = TSDB_FILE_DELIMITER; - pCompData->uid = pHelper->tableInfo.uid; - pCompData->numOfCols = nColsNotAllNull; + pBlockData->delimiter = TSDB_FILE_DELIMITER; + pBlockData->uid = pHelper->tableInfo.uid; + pBlockData->numOfCols = nColsNotAllNull; - taosCalcChecksumAppend(0, (uint8_t *)pCompData, tsize); - pFile->info.magic = taosCalcChecksum(pFile->info.magic, (uint8_t *)POINTER_SHIFT(pCompData, tsize - sizeof(TSCKSUM)), + taosCalcChecksumAppend(0, (uint8_t *)pBlockData, tsize); + pFile->info.magic = taosCalcChecksum(pFile->info.magic, (uint8_t *)POINTER_SHIFT(pBlockData, tsize - sizeof(TSCKSUM)), sizeof(TSCKSUM)); // Write the whole block to file - if (taosTWrite(pFile->fd, (void *)pCompData, lsize) < lsize) { + if (taosTWrite(pFile->fd, (void *)pBlockData, lsize) < lsize) { tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(helperRepo(pHelper)), lsize, pFile->fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); - goto _err; + return -1; } - // Update pCompBlock membership vairables - pCompBlock->last = isLast; - pCompBlock->offset = offset; - pCompBlock->algorithm = pCfg->compression; - pCompBlock->numOfRows = rowsToWrite; - pCompBlock->len = lsize; - pCompBlock->keyLen = keyLen; - pCompBlock->numOfSubBlocks = isSuperBlock ? 1 : 0; - pCompBlock->numOfCols = nColsNotAllNull; - pCompBlock->keyFirst = dataColsKeyFirst(pDataCols); - pCompBlock->keyLast = dataColsKeyAt(pDataCols, rowsToWrite - 1); + // Update pBlock membership vairables + pBlock->last = isLast; + pBlock->offset = offset; + pBlock->algorithm = pCfg->compression; + pBlock->numOfRows = rowsToWrite; + pBlock->len = lsize; + pBlock->keyLen = keyLen; + pBlock->numOfSubBlocks = isSuperBlock ? 1 : 0; + pBlock->numOfCols = nColsNotAllNull; + pBlock->keyFirst = dataColsKeyFirst(pDataCols); + pBlock->keyLast = dataColsKeyAt(pDataCols, rowsToWrite - 1); tsdbDebug("vgId:%d tid:%d a block of data is written to file %s, offset %" PRId64 " numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64, - REPO_ID(helperRepo(pHelper)), pHelper->tableInfo.tid, pFile->fname, (int64_t)(pCompBlock->offset), - (int)(pCompBlock->numOfRows), pCompBlock->len, pCompBlock->numOfCols, pCompBlock->keyFirst, - pCompBlock->keyLast); + REPO_ID(helperRepo(pHelper)), pHelper->tableInfo.tid, pFile->fname, (int64_t)(pBlock->offset), + (int)(pBlock->numOfRows), pBlock->len, pBlock->numOfCols, pBlock->keyFirst, + pBlock->keyLast); - pFile->info.size += pCompBlock->len; + pFile->info.size += pBlock->len; return 0; - -_err: - return -1; } \ No newline at end of file diff --git a/src/tsdb/src/tsdbReadUtil.c b/src/tsdb/src/tsdbReadUtil.c index a9da2e1374..e8f88ff0be 100644 --- a/src/tsdb/src/tsdbReadUtil.c +++ b/src/tsdb/src/tsdbReadUtil.c @@ -519,7 +519,7 @@ static int tsdbDecodeBlockIdxArray(SReadHandle *pReadH) { return 0; } -static int tsdbAllocBuf(void **ppBuf, int size) { +int tsdbAllocBuf(void **ppBuf, int size) { void *pBuf = *pBuf; int tsize = taosTSizeof(pBuf); -- GitLab