From 58ca1acd0a87edc23f1472d1e6ca989672e4c2fb Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sun, 18 Oct 2020 21:07:13 +0800 Subject: [PATCH] refactor more code --- src/tsdb/inc/tsdbMain.h | 2 + src/tsdb/src/tsdbCommit.c | 170 ++++++++++++++++++++++++++++++++---- src/tsdb/src/tsdbFile.c | 9 +- src/tsdb/src/tsdbMemTable.c | 8 -- src/tsdb/src/tsdbReadUtil.c | 4 +- src/util/src/tkvstore.c | 17 ++-- 6 files changed, 165 insertions(+), 45 deletions(-) diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index a6fc636e10..b2b68bfe7f 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -525,11 +525,13 @@ static FORCE_INLINE int tsdbAllocBuf(void **ppBuf, uint32_t size) { *ppBuf = taosTRealloc(pBuf, tsize); if (*ppBuf == NULL) return -1; + return 0; } int tsdbEncodeBlockIdx(void** buf, SBlockIdx* pBlockIdx); void* tsdbDecodeBlockIdx(void* buf, SBlockIdx* pBlockIdx); int tsdbLoadKeyCol(SReadHandle* pReadH, SBlockInfo* pBlockInfo, SBlock* pBlock); +int tsdbGetCurrMinFid(int8_t precision, int32_t keep, int32_t days); #ifdef __cplusplus } diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index ff918e396b..b2deb38721 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include #include #include #include @@ -44,6 +45,7 @@ typedef struct { SBlock * pSubBlock; int nSubBlocks; SDataCols * pDataCols; + int miter; } STSCommitHandle; typedef struct { @@ -63,6 +65,52 @@ typedef struct { SFileGroup nfgroup; } SDataFileChange; +static int tsdbStartCommit(SCommitHandle *pCommitH); +static void tsdbEndCommit(SCommitHandle *pCommitH, bool hasError); +static int tsdbCommitTimeSeriesData(SCommitHandle *pCommitH); +static int tsdbCommitMetaData(SCommitHandle *pCommitH); +static SCommitIter * tsdbCreateCommitIters(STsdbRepo *pRepo); +static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables); +static int tsdbCommitToFileGroup(STSCommitHandle *pTSCh, SFileGroup *pOldGroup, SFileGroup *pNewGroup); +static int tsdbHasDataToCommit(STSCommitHandle *pTSCh, TSKEY minKey, TSKEY maxKey); +static STSCommitHandle *tsdbNewTSCommitHandle(STsdbRepo *pRepo); +static void tsdbFreeTSCommitHandle(STSCommitHandle *pTSCh); +static int tsdbLogFileChange(SCommitHandle *pCommitH, STsdbFileChange *pChange); +static int tsdbEncodeFileChange(void **buf, STsdbFileChange *pChange); +static void * tsdbDecodeFileChange(void *buf, STsdbFileChange *pChange); +static int tsdbLogTSFileChange(SCommitHandle *pCommitH, int fid); +static int tsdbLogMetaFileChange(SCommitHandle *pCommitH); +static int tsdbLogRetentionChange(SCommitHandle *pCommitH, int mfid); +static int tsdbApplyFileChange(STsdbFileChange *pChange, bool isCommitEnd); +static void tsdbSeekTSCommitHandle(STSCommitHandle *pTSCh, TSKEY key); +static int tsdbEncodeSFileGroup(void **buf, SFileGroup *pFGroup); +static void * tsdbDecodeSFileGroup(void *buf, SFileGroup *pFGroup); +static void tsdbGetNextCommitFileGroup(STsdbRepo *pRepo, int vid, SFileGroup *pOldGroup, SFileGroup *pNewGroup); +static int tsdbCommitTableData(STSCommitHandle *pTSCh, int tid); +static int tsdbWriteBlockToRightFile(STSCommitHandle *pTSCh, SDataCols *pDataCols, SBlock *pBlock); +static int tsdbSetAndOpenCommitFGroup(STSCommitHandle *pTSCh, SFileGroup *pOldGroup, SFileGroup *pNewGroup); +static void tsdbCloseAndUnsetCommitFGroup(STSCommitHandle *pTSCh, bool hasError); +static int tsdbWriteBlockInfo(STSCommitHandle *pTSCh); +static int tsdbWriteBlockIdx(STSCommitHandle *pTSCh); +static int tsdbSetCommitTable(STSCommitHandle *pTSCh, STable *pTable); +static int tsdbCommitTableDataImpl(STSCommitHandle *pTSCh, int tid); +static int tsdbCopyBlocks(STSCommitHandle *pTSCh, int sidx, int eidx); +static int tsdbAppendCommit(STSCommitHandle *pTSCh); +static int tsdbMergeCommit(STSCommitHandle *pTSCh, SBlock *pBlock); +static int tsdbWriteBlockToFile(STSCommitHandle *pTSCh, int ftype, SDataCols *pDataCols, SBlock *pBlock, + bool isSuperBlock); +static int tsdbEncodeBlockIdxArray(STSCommitHandle *pTSCh); +static int tsdbUpdateFileGroupInfo(SFileGroup *pFileGroup); +static int tsdbAppendBlockIdx(STSCommitHandle *pTSCh); +static int tsdbCopyBlock(STSCommitHandle *pTSCh, SBlock *pBlock); +static int compareKeyBlock(const void *arg1, const void *arg2); +static int tsdbAddSuperBlock(STSCommitHandle *pTSCh, SBlock *pBlock); +static int tsdbAddSubBlocks(STSCommitHandle *pTSCh, SBlock *pBlocks, int nBlocks); +static int tsdbMergeLastBlock(STSCommitHandle *pTSCh, SBlock *pBlock); +static int tsdbMergeDataBlock(STSCommitHandle *pTSCh, SBlock *pBlock); +static void tsdbLoadMergeFromCache(STSCommitHandle *pTSCh, TSKEY maxKey); +static int tsdbInsertSubBlock(STSCommitHandle *pTSCh, SBlock *pBlock); + int tsdbCommitData(STsdbRepo *pRepo) { ASSERT(pRepo->commit == 1 && pRepo->imem != NULL); @@ -103,7 +151,7 @@ static int tsdbStartCommit(SCommitHandle *pCommitH) { pCommitH->fd = -1; - tsdbGetFileName(pRepo->rootDir, TSDB_FILE_TYPE_MANIFEST, pCfg->tsdbId, 0, 0, pCommitH->fname); + tsdbGetFileName(pRepo->rootDir, TSDB_FILE_TYPE_MANIFEST, pCfg->tsdbId, 0, 0, &(pCommitH->fname)); pCommitH->fd = open(pCommitH->fname, O_CREAT | O_WRONLY | O_APPEND, 0755); if (pCommitH->fd < 0) { tsdbError("vgId:%d failed to open file %s since %s", REPO_ID(pRepo), pCommitH->fname, strerror(errno)); @@ -450,7 +498,7 @@ static int tsdbEncodeFileChange(void **buf, STsdbFileChange *pChange) { tsize += taosEncodeString(buf, pMetaChange->oname); tsize += taosEncodeString(buf, pMetaChange->nname); - tsize += tdEncodeStoreInfo(buf, pMetaChange->info); + tsize += tdEncodeStoreInfo(buf, &(pMetaChange->info)); } else if (pChange->type == TSDB_DATA_FILE_CHANGE) { SDataFileChange *pDataChange = (SDataFileChange *)pChange->change; @@ -463,7 +511,7 @@ static int tsdbEncodeFileChange(void **buf, STsdbFileChange *pChange) { return tsize; } -static void *tsdbDecodeFileChange(void *buf, STsdbFileChange *pChange) { +static UNUSED_FUNC void *tsdbDecodeFileChange(void *buf, STsdbFileChange *pChange) { // TODO return buf; } @@ -534,7 +582,7 @@ static int tsdbLogRetentionChange(SCommitHandle *pCommitH, int mfid) { STsdbFileH *pFileH = pRepo->tsdbFileH; for (int i = 0; i < pFileH->nFGroups; i++) { - SFileGroup *pFGroup = pFileH->pFGroup[i]; + SFileGroup *pFGroup = pFileH->pFGroup + i; if (pFGroup->fileId < mfid) { SListNode *pNode = (SListNode *)calloc(1, sizeof(SListNode) + sizeof(STsdbFileChange) + sizeof(SDataFileChange)); if (pNode == NULL) { @@ -546,7 +594,7 @@ static int tsdbLogRetentionChange(SCommitHandle *pCommitH, int mfid) { pChange->type = TSDB_DATA_FILE_CHANGE; SDataFileChange *pDataFileChange = (SDataFileChange *)pChange->change; - pDataFileChange->ofgroup = pFGroup; + pDataFileChange->ofgroup = *pFGroup; if (tsdbLogFileChange(pCommitH, pChange) < 0) { free(pNode); @@ -566,7 +614,7 @@ static int tsdbApplyFileChange(STsdbFileChange *pChange, bool isCommitEnd) { SMetaFileChange *pMetaChange = (SMetaFileChange *)pChange->change; if (isCommitEnd) { - if (strncmp(pMetaChange->oname, pMetaChange->nname) != 0) { + if (strncmp(pMetaChange->oname, pMetaChange->nname, TSDB_FILENAME_LEN) != 0) { (void)remove(pMetaChange->oname); } } else { // roll back @@ -618,7 +666,7 @@ static void *tsdbDecodeSFileGroup(void *buf, SFileGroup *pFGroup) { return buf; } -static void tsdbGetNextCommitFileGroup(SFileGroup *pOldGroup, SFileGroup *pNewGroup) { +static void tsdbGetNextCommitFileGroup(STsdbRepo *pRepo, int vid, SFileGroup *pOldGroup, SFileGroup *pNewGroup) { pNewGroup->fileId = pOldGroup->fileId; for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { @@ -1243,7 +1291,7 @@ static int tsdbMergeLastBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { if (pBlock->numOfRows + pDataCols->numOfRows < pCfg->minRowsPerFileBlock && pBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && true /*TODO: check if same file*/) { if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_TYPE_LAST, pDataCols, &newBlock, false) < 0) return -1; - // TODO: refactor code here + if (tsdbCopyBlock(pTSCh, pBlock) < 0) return -1; if (tsdbInsertSubBlock(pTSCh, &newBlock) < 0) return -1; } else { if (tsdbLoadBlockData(pReadH, pBlock, NULL) < 0) return -1; @@ -1270,13 +1318,13 @@ static int tsdbMergeLastBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { ASSERT(pDataCols->numOfRows == rows); if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_TYPE_LAST, pDataCols, &newBlock, false) < 0) return -1; if (tsdbCopyBlock(pTSCh, pBlock) < 0) return -1; - if (tsdbInsertSubBlock() < 0) return -1; // TODO + if (tsdbInsertSubBlock(pTSCh, &newBlock) < 0) return -1; } else { if (tsdbLoadBlockData(pReadH, pBlock, NULL) < 0) return -1; while (true) { - tdResetDataCols(pDataCols); - rows = tsdbLoadMergeFromCache(pTSCh, pTSCh->maxKey); - if (rows == 0) break; + pTSCh->miter = 0; + tsdbLoadMergeFromCache(pTSCh, pTSCh->maxKey); + if (pDataCols->numOfRows == 0) break; if (tsdbWriteBlockToRightFile(pTSCh, pDataCols, &newBlock) < 0) return -1; if (tsdbAddSuperBlock(pTSCh, &newBlock) < 0) return -1; } @@ -1342,13 +1390,13 @@ static int tsdbMergeDataBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { ASSERT(pDataCols->numOfRows == rows); if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_TYPE_DATA, pDataCols, &newBlock, false) < 0) return -1; if (tsdbCopyBlock(pTSCh, pBlock) < 0) return -1; - if (tsdbInsertSubBlock() < 0) return -1; // TODO + if (tsdbInsertSubBlock(pTSCh, &newBlock) < 0) return -1; } else { if (tsdbLoadBlockData(pReadH, pBlock, NULL) < 0) return -1; while (true) { - tdResetDataCols(pDataCols); - rows = tsdbLoadMergeFromCache(pTSCh, keyLimit); - if (rows == 0) break; + pTSCh->miter = 0; + tsdbLoadMergeFromCache(pTSCh, keyLimit); + if (pDataCols->numOfRows == 0) break; if (tsdbWriteBlockToFile(pTSCh, TSDB_FILE_TYPE_DATA, pDataCols, &newBlock, true) < 0) return -1; if (tsdbAddSuperBlock(pTSCh, &newBlock) < 0) return -1; } @@ -1357,7 +1405,93 @@ static int tsdbMergeDataBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { return 0; } -static int tsdbLoadMergeFromCache(STSCommitHandle *pTSCh, TSKEY maxKey) { - // TODO +static void tsdbLoadMergeFromCache(STSCommitHandle *pTSCh, TSKEY maxKey) { + SReadHandle *pReadH = pTSCh->pReadH; + STsdbRepo * pRepo = pReadH->pRepo; + SDataCols * pMCols = pReadH->pDataCols[0]; + SDataCols * pDataCols = pTSCh->pDataCols; + int dbrows = TSDB_DEFAULT_ROWS_TO_COMMIT(pRepo->config.maxRowsPerFileBlock); + SCommitIter *pIter = pTSCh->pIters + TABLE_TID(pReadH->pTable); + TSKEY key1 = 0; + TSKEY key2 = 0; + SDataRow row = NULL; + TSKEY keyNext = 0; + STSchema * pSchema = NULL; + + tdResetDataCols(pDataCols); + + if (pTSCh->miter >= pMCols->numOfRows) { + key1 = INT64_MAX; + } else { + key1 = dataColsKeyAt(pMCols, pTSCh->miter); + } + + keyNext = tsdbNextIterKey(pIter->pIter); + if (TSDB_KEY_BEYOND_RANGE(keyNext, maxKey)) { + key2 = INT64_MAX; + } else { + row = tsdbNextIterRow(pIter->pIter); + key2 = keyNext; + } + + while (true) { + if ((key1 == INT64_MAX && key2 == INT64_MAX) || pDataCols->numOfRows >= dbrows) break; + + if (key1 <= key2) { + for (int i = 0; i < pMCols->numOfCols; i++) { + dataColAppendVal(pDataCols->cols + i, tdGetColDataOfRow(pMCols->cols + i, pTSCh->miter), pDataCols->numOfRows, + pDataCols->maxPoints); + } + pDataCols->numOfRows++; + pTSCh->miter++; + if (key1 == key2) { + tSkipListIterNext(pIter->pIter); + keyNext = tsdbNextIterKey(pIter->pIter); + if (TSDB_KEY_BEYOND_RANGE(keyNext, maxKey)) { + key2 = INT64_MAX; + } else { + row = tsdbNextIterRow(pIter->pIter); + key2 = keyNext; + } + } + } else { + if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) { + pSchema = tsdbGetTableSchemaImpl(pIter->pTable, false, false, dataRowVersion(row)); + ASSERT(pSchema != NULL); + } + + tdAppendDataRowToDataCol(row, pSchema, pDataCols); + tSkipListIterNext(pIter->pIter); + + // update row and key2 + keyNext = tsdbNextIterKey(pIter->pIter); + if (TSDB_KEY_BEYOND_RANGE(keyNext, maxKey)) { + key2 = INT64_MAX; + } else { + row = tsdbNextIterRow(pIter->pIter); + key2 = keyNext; + } + } + } +} + +static int tsdbInsertSubBlock(STSCommitHandle *pTSCh, SBlock *pBlock) { + ASSERT(pBlock->numOfSubBlocks == 0 && pTSCh->nBlocks > 0); + + SBlock *pSuperBlock = pTSCh->pBlockInfo->blocks + pTSCh->nBlocks - 1; + + ASSERT(pSuperBlock->numOfSubBlocks > 0 && pSuperBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS); + if (pSuperBlock->numOfSubBlocks == 1) { + SBlock oBlock = *pSuperBlock; + oBlock.numOfSubBlocks = 0; + pSuperBlock->offset = sizeof(SBlock) * pTSCh->nSubBlocks; + if (tsdbAddSubBlocks(pTSCh, &oBlock, 1) < 0) return -1; + } + pSuperBlock->numOfSubBlocks++; + pSuperBlock->numOfRows += pBlock->numOfRows; + pSuperBlock->keyFirst = MIN(pSuperBlock->keyFirst, pBlock->keyFirst); + pSuperBlock->keyLast = MAX(pSuperBlock->keyLast, pBlock->keyLast); + if (tsdbAddSubBlocks(pTSCh, pBlock, 1) < 0) return -1; + return 0; } \ No newline at end of file diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index b11f23bb02..6c9cae687d 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ #define _DEFAULT_SOURCE +#include #include #define TAOS_RANDOM_FILE_FAIL_TEST @@ -29,7 +30,6 @@ static int compFGroup(const void *arg1, const void *arg2); static int keyFGroupCompFunc(const void *key, const void *fgroup); static void tsdbInitFileGroup(SFileGroup *pFGroup, STsdbRepo *pRepo); static TSKEY tsdbGetCurrMinKey(int8_t precision, int32_t keep); -static int tsdbGetCurrMinFid(int8_t precision, int32_t keep, int32_t days); // ---------------- INTERNAL FUNCTIONS ---------------- STsdbFileH *tsdbNewFileH(STsdbCfg *pCfg) { @@ -518,6 +518,9 @@ _err: *size = 0; } +int tsdbGetCurrMinFid(int8_t precision, int32_t keep, int32_t days) { + return (int)(TSDB_KEY_FILEID(tsdbGetCurrMinKey(precision, keep), days, precision)); +} // ---------------- LOCAL FUNCTIONS ---------------- static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) { uint32_t version; @@ -590,7 +593,3 @@ static void tsdbInitFileGroup(SFileGroup *pFGroup, STsdbRepo *pRepo) { static TSKEY tsdbGetCurrMinKey(int8_t precision, int32_t keep) { return (TSKEY)(taosGetTimestamp(precision) - keep * tsMsPerDay[precision]); } - -static int tsdbGetCurrMinFid(int8_t precision, int32_t keep, int32_t days) { - return (int)(TSDB_KEY_FILEID(tsdbGetCurrMinKey(precision, keep), days, precision)); -} \ No newline at end of file diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 291590f7ff..ea3cef659d 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -24,14 +24,6 @@ static void tsdbFreeMemTable(SMemTable *pMemTable); static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable); static void tsdbFreeTableData(STableData *pTableData); static char * tsdbGetTsTupleKey(const void *data); -static void * tsdbCommitData(void *arg); -static int tsdbCommitMeta(STsdbRepo *pRepo); -static void tsdbEndCommit(STsdbRepo *pRepo); -static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey); -static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols); -static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo); -static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables); -static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables); // ---------------- INTERNAL FUNCTIONS ---------------- int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) { diff --git a/src/tsdb/src/tsdbReadUtil.c b/src/tsdb/src/tsdbReadUtil.c index 9b6c855e9a..dc9de4b016 100644 --- a/src/tsdb/src/tsdbReadUtil.c +++ b/src/tsdb/src/tsdbReadUtil.c @@ -31,7 +31,8 @@ static int tsdbDecodeBlockIdxArray(SReadHandle *pReadH); static int tsdbVerifyBlockInfo(SBlockInfo *pBlockInfo, SBlockIdx *pBlockIdx); static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32_t len, int8_t comp, int numOfRows, int maxPoints, char *buffer, int bsize); -static int tsdbLoadColData(SReadHandle *pReadH, SFile *pFile, SBlock *pBlock, SBlockCol *pBlockCol, SDataCol *pDataCol); +static int tsdbLoadColData(SReadHandle *pReadH, SFile *pFile, SBlock *pBlock, SBlockCol *pBlockCol, + SDataCol *pDataCol); SReadHandle *tsdbNewReadHandle(STsdbRepo *pRepo) { SReadHandle *pReadH = (SReadHandle *)calloc(1, sizeof(*pReadH)); @@ -75,7 +76,6 @@ int tsdbSetAndOpenReadFGroup(SReadHandle *pReadH, SFileGroup *pFGroup) { ASSERT(pReadH != NULL && pFGroup != NULL); STsdbRepo *pRepo = pReadH->pRepo; - STsdbCfg * pCfg = &(pRepo->config); pReadH->fGroup = *pFGroup; diff --git a/src/util/src/tkvstore.c b/src/util/src/tkvstore.c index c54a0f8a3a..776c8bc23c 100644 --- a/src/util/src/tkvstore.c +++ b/src/util/src/tkvstore.c @@ -17,6 +17,11 @@ #define TAOS_RANDOM_FILE_FAIL_TEST +#include +#include +#include +#include + #include "os.h" #include "hash.h" #include "taoserror.h" @@ -40,7 +45,6 @@ typedef struct { static int tdInitKVStoreHeader(int fd, char *fname); static SKVStore *tdNewKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH); -static char * tdGetKVStoreSnapshotFname(char *fdata); static char * tdGetKVStoreNewFname(char *fdata); static void tdFreeKVStore(SKVStore *pStore); static int tdUpdateKVStoreHeader(int fd, char *fname, SStoreInfo *pInfo); @@ -420,17 +424,6 @@ static void tdFreeKVStore(SKVStore *pStore) { } } -static char *tdGetKVStoreSnapshotFname(char *fdata) { - size_t size = strlen(fdata) + strlen(TD_KVSTORE_SNAP_SUFFIX) + 1; - char * fname = malloc(size); - if (fname == NULL) { - terrno = TSDB_CODE_COM_OUT_OF_MEMORY; - return NULL; - } - sprintf(fname, "%s%s", fdata, TD_KVSTORE_SNAP_SUFFIX); - return fname; -} - static char *tdGetKVStoreNewFname(char *fdata) { size_t size = strlen(fdata) + strlen(TD_KVSTORE_NEW_SUFFIX) + 1; char * fname = malloc(size); -- GitLab