From 608cd0446dbfeb3cb9314912d958eeaa4bd84112 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 13 Oct 2020 09:54:43 +0800 Subject: [PATCH] refactor more code --- src/tsdb/inc/tsdbMain.h | 6 +- src/tsdb/src/tsdbCommit.c | 225 ++++++++++--- src/tsdb/src/tsdbManifest.c | 238 -------------- src/tsdb/src/tsdbMemTable.c | 606 ++++++++++++++++++------------------ 4 files changed, 492 insertions(+), 583 deletions(-) delete mode 100644 src/tsdb/src/tsdbManifest.c diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 16f5c93374..0749fac72c 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -181,9 +181,9 @@ typedef struct { } SFile; typedef struct { - int fileId; - int state; // 0 for health, 1 for problem - SFile files[TSDB_FILE_TYPE_MAX]; + int32_t fileId; + int32_t state; // 0 for health, 1 for problem + SFile files[TSDB_FILE_TYPE_MAX]; } SFileGroup; typedef struct { diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index 3ac10f26d4..419d63b6d5 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -107,6 +107,8 @@ _err: } static void tsdbEndCommit(SCommitHandle *pCommitH, bool hasError) { + STsdbRepo *pRepo = pCommitH->pRepo; + // TODO: append commit over flag if (false /* tsdbLogCommitOver(pCommitH) < 0 */) { hasError = true; @@ -127,6 +129,11 @@ static void tsdbEndCommit(SCommitHandle *pCommitH, bool hasError) { pCommitH->fd = -1; remove(pCommitH->fname); tdListFree(pCommitH->pModLog); + + // notify uplayer to delete WAL + if (!hasError && pRepo->appH.notifyStatus) { + pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER); + } return; } @@ -162,27 +169,20 @@ static int tsdbCommitTimeSeriesData(SCommitHandle *pCommitH) { if (!tsdbHasDataToCommit(tsCommitH.pIters, pMem->maxTables, minKey, maxKey)) continue; - { // TODO: Log file change - SFileGroup *pFGroup = tsdbSearchFGroup(pFileH, fid, TD_EQ); - if (pFGroup == NULL) { - - } else { - - } + if (tsdbLogTSFileChange(pCommitH, fid) < 0) { + tsdbDestroyTSCommitHandle(&tsCommitH); + return -1; } if (tsdbCommitToFile(pRepo, fid, &tsCommitH) < 0) { - tsdbError("vgId:%d failed to commit to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); - goto _err; + tsdbError("vgId:%d error occurs while committing to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); + tsdbDestroyTSCommitHandle(&tsCommitH); + return -1; } } tsdbDestroyTSCommitHandle(&tsCommitH); return 0; - -_err: - tsdbDestroyTSCommitHandle(&tsCommitH); - return -1; } // Function to commit meta data @@ -289,7 +289,44 @@ static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables) { free(iters); } -static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, STSCommitHandle *pTSCh) { +static int tsdbCommitToFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup, STSCommitHandle *pTSCh) { + SRWHelper * pWHelper = &(pTSCh->whelper); + SCommitIter *iters = pTSCh->pIters; + + if (tsdbHelperOpenFile(pWHelper) < 0) return -1; + + if (tsdbLoadCompIdx(pWHelper, NULL) < 0) { + tsdbHelperCloseFile(pWHelper, true /* hasError = false */); + return -1; + } + + for (int tid = 1; tid < pTSCh->maxIters; tid++) { + if (tsdbCommitTableData(pTSCh, tid) < 0) { + tsdbHelperCloseFile(pWHelper, true /* hasError = false */); + return -1; + } + + if (tsdbTryMoveLastBlock(pTSCh) < 0) { + tsdbHelperCloseFile(pWHelper, true /* hasError = false */); + return -1; + } + + if (tsdbWriteBlockInfo(pWHelper) < 0) { + tsdbHelperCloseFile(pWHelper, true /* hasError = false */); + return -1; + } + } + + if (tsdbWriteBlockIdx(pWHelper) < 0) { + tsdbHelperCloseFile(pWHelper, true /* hasError = false */); + return -1; + } + + tsdbHelperCloseFile(pWHelper, false /* hasError = false */); + return 0; +} + +static int tsdbCommitToFile(STsdbRepo *pRepo, SFileGroup *pOldFGroup, SFileGroup *pNewFGroup, STSCommitHandle *pTSCh) { char * dataDir = NULL; STsdbCfg * pCfg = &pRepo->config; STsdbFileH * pFileH = pRepo->tsdbFileH; @@ -299,18 +336,9 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, STSCommitHandle *pTSCh) { SCommitIter *iters = pTSCh->pIters; SRWHelper * pHelper = &(pTSCh->whelper); SDataCols * pDataCols = pTSCh->pDataCols; + int fid = pOldFGroup->fileId; - // Create and open files for commit - dataDir = tsdbGetDataDirName(pRepo->rootDir); - if (dataDir == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } - - if ((pGroup = tsdbCreateFGroupIfNeed(pRepo, dataDir, fid)) == NULL) { - tsdbError("vgId:%d failed to create file group %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); - goto _err; - } + ASSERT(pOldFGroup->fileId == pNewFGroup->fileId); // Open files for write/read if (tsdbSetAndOpenHelperFile(pHelper, pGroup) < 0) { @@ -372,21 +400,21 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, STSCommitHandle *pTSCh) { taosTFree(dataDir); tsdbCloseHelperFile(pHelper, 0, pGroup); - pthread_rwlock_wrlock(&(pFileH->fhlock)); + // pthread_rwlock_wrlock(&(pFileH->fhlock)); - (void)rename(helperNewHeadF(pHelper)->fname, helperHeadF(pHelper)->fname); - pGroup->files[TSDB_FILE_TYPE_HEAD].info = helperNewHeadF(pHelper)->info; + // (void)rename(helperNewHeadF(pHelper)->fname, helperHeadF(pHelper)->fname); + // pGroup->files[TSDB_FILE_TYPE_HEAD].info = helperNewHeadF(pHelper)->info; - if (newLast) { - (void)rename(helperNewLastF(pHelper)->fname, helperLastF(pHelper)->fname); - pGroup->files[TSDB_FILE_TYPE_LAST].info = helperNewLastF(pHelper)->info; - } else { - pGroup->files[TSDB_FILE_TYPE_LAST].info = helperLastF(pHelper)->info; - } + // if (newLast) { + // (void)rename(helperNewLastF(pHelper)->fname, helperLastF(pHelper)->fname); + // pGroup->files[TSDB_FILE_TYPE_LAST].info = helperNewLastF(pHelper)->info; + // } else { + // pGroup->files[TSDB_FILE_TYPE_LAST].info = helperLastF(pHelper)->info; + // } - pGroup->files[TSDB_FILE_TYPE_DATA].info = helperDataF(pHelper)->info; + // pGroup->files[TSDB_FILE_TYPE_DATA].info = helperDataF(pHelper)->info; - pthread_rwlock_unlock(&(pFileH->fhlock)); + // pthread_rwlock_unlock(&(pFileH->fhlock)); return 0; @@ -488,12 +516,15 @@ static int tsdbEncodeFileChange(void **buf, STsdbFileChange *pChange) { int tsize = 0; if (pChange->type == TSDB_META_FILE_CHANGE) { SMetaFileChange *pMetaChange = (SMetaFileChange *)pChange->change; + tsize += taosEncodeString(buf, pMetaChange->oname); tsize += taosEncodeString(buf, pMetaChange->nname); tsize += tdEncodeStoreInfo(buf, pMetaChange->info); } else if (pChange->type == TSDB_DATA_FILE_CHANGE) { SDataFileChange *pDataChange = (SDataFileChange *)pChange->change; - // TODO + + tsize += tsdbEncodeSFileGroup(buf, &(pDataChange->ofgroup)); + tsize += tsdbEncodeSFileGroup(buf, &(pDataChange->nfgroup)); } else { ASSERT(false); } @@ -506,6 +537,40 @@ static void *tsdbDecodeFileChange(void *buf, STsdbFileChange *pChange) { return buf; } +static int tsdbLogTSFileChange(SCommitHandle *pCommitH, int fid) { + STsdbRepo * pRepo = pCommitH->pRepo; + STsdbFileH *pFileH = pRepo->tsdbFileH; + + SListNode *pNode = (SListNode *)calloc(1, sizeof(SListNode) + sizeof(STsdbFileChange) + sizeof(SDataFileChange)); + if (pNode == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + STsdbFileChange *pChange = (STsdbFileChange *)pNode->data; + pChange->type = TSDB_DATA_FILE_CHANGE; + + SDataFileChange *pDataFileChange = (SDataFileChange *)pChange->change; + + SFileGroup *pFGroup = tsdbSearchFGroup(pFileH, fid, TD_EQ); + if (pFGroup == NULL) { + pDataFileChange->ofgroup.fileId = fid; + } else { + pDataFileChange->ofgroup = *pFGroup; + } + + tsdbGetNextCommitFileGroup(&(pDataFileChange->ofgroup), &(pDataFileChange->nfgroup)); + + if (tsdbLogFileChange(pCommitH, pChange) < 0) { + free(pNode); + return -1; + } + + tdListAppendNode(pCommitH->pModLog, pNode); + + return 0; +} + static int tsdbLogMetaFileChange(SCommitHandle *pCommitH) { STsdbRepo *pRepo = pCommitH->pRepo; SKVStore * pStore = pRepo->tsdbMeta->pStore; @@ -528,7 +593,7 @@ static int tsdbLogMetaFileChange(SCommitHandle *pCommitH) { free(pNode); return -1; } - tdListPrependNode(pCommitH->pModLog, pNode); + tdListAppendNode(pCommitH->pModLog, pNode); return 0; } @@ -556,7 +621,7 @@ static int tsdbLogRetentionChange(SCommitHandle *pCommitH, int mfid) { free(pNode); return -1; } - tdListPrependNode(pCommitH->pModLog, &pChange); + tdListAppendNode(pCommitH->pModLog, &pChange); } else { break; } @@ -593,4 +658,86 @@ static void tsdbSeekTSCommitHandle(STSCommitHandle *pTSCh, TSKEY key) { while (tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, key, INT32_MAX, NULL, NULL, 0) != 0) { } } +} + +static int tsdbEncodeSFileGroup(void **buf, SFileGroup *pFGroup) { + int tsize = 0; + + tsize += taosEncodeVariantI32(buf, pFGroup->fileId); + for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { + SFile *pFile = &(pFGroup->files[type]); + + tsize += taosEncodeString(buf, pFile->fname); + tsize += tsdbEncodeSFileInfo(buf, &pFile->info); + } + + return tsize; +} + +static void *tsdbDecodeSFileGroup(void *buf, SFileGroup *pFGroup) { + buf = taosDecodeVariantI32(buf, &(pFGroup->fileId)); + + for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { + SFile *pFile = &(pFGroup->files[type]); + + buf = taosDecodeString(buf, &(pFile->fname)); + buf = tsdbDecodeSFileInfo(buf, &(pFile->info)); + } + + return buf; +} + +static void tsdbGetNextCommitFileGroup(SFileGroup *pOldGroup, SFileGroup *pNewGroup) { + pNewGroup->fileId = pOldGroup->fileId; + + for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) { + SFile *pOldFile = &(pOldGroup->files[type]); + SFile *pNewFile = &(pNewGroup->files[type]); + + size_t len =strlen(pOldFile->fname); + if (len == 0 || pOldFile->fname[len - 1] == '1') { + tsdbGetFileName(pRepo->rootDir, type, vid, pOldGroup->fileId, 0, pNewFile->fname); + } else { + tsdbGetFileName(pRepo->rootDir, type, vid, pOldGroup->fileId, 1, pNewFile->fname); + } + } +} + +static int tsdbCommitTableData(STSCommitHandle *pTSCh, int tid) { + SCommitIter *pIter = pTSCh->pIters + tid; + if (pIter->pTable == NULL) return 0; + + taosRLockLatch(&(pIter->pTable->latch)); + + if (pIter->pIter == NULL) { + // TODO + } + + if (tdInitDataCols(pTSCh->pDataCols, tsdbGetTableSchemaImpl(pIter->pTable, false, false, -1)) < 0) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + goto _err; + } + + if (tsdbReadBlockInfo() < 0) { + goto _err; + } + + while (true) { + TSKEY keyNext = tsdbNextIterKey(pIter->pIter); + if (keyNext < 0 || keyNext > maxKey) break; + + if (/* no block info exists*/ || keyNext > pIdx->maxKey) { + if (tsdbProcessAppendCommit() < 0) goto _err; + } else { + if (tsdbProcessMergeCommit() < 0) goto _err; + } + + } + + taosRUnLockLatch(&(pIter->pTable->latch)); + return 0; + +_err: + taosRUnLockLatch(&(pIter->pTable->latch)); + return -1; } \ No newline at end of file diff --git a/src/tsdb/src/tsdbManifest.c b/src/tsdb/src/tsdbManifest.c deleted file mode 100644 index 17527cdee8..0000000000 --- a/src/tsdb/src/tsdbManifest.c +++ /dev/null @@ -1,238 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -#include -#include -#include -#include -#include - -#include "tchecksum.h" -#include "tsdbMain.h" - -#define TSDB_MANIFEST_FILE_VERSION 0 -#define TSDB_MANIFEST_FILE_HEADER_SIZE 128 -#define TSDB_MANIFEST_END "C0D09F476DEF4A32B694A6A9E7B7B240" -#define TSDB_MANIFEST_END_SIZE 32 - -#define TSDB_MANIFEST_END_RECORD 0 -#define TSDB_MANIFEST_META_RECORD 1 -#define TSDB_MANIFEST_DATA_RECORD 2 - -typedef struct { - int type; - int len; -} SManifestRecord; - -int tsdbInitManifestHandle(STsdbRepo *pRepo, SManifestHandle *pManifest) { - STsdbCfg *pCfg = &(pRepo->config); - - pManifest->pBuffer = NULL; - pManifest->contSize = 0; - - tsdbGetFileName(pRepo->rootDir, TSDB_FILE_TYPE_MANIFEST, pCfg->tsdbId, 0, 0, &(pManifest->fname)); - pManifest->fd = open(pManifest->fname, O_CREAT | O_APPEND, 0755); - if (pManifest->fd < 0) { - tsdbError("vgId:%d failed to open file %s since %s", REPO_ID(pRepo), fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - if (tsdbWriteManifestHeader(pRepo, pManifest) < 0) { - tsdbCloseManifestHandle(pRepo, pManifest); - return -1; - } - - return 0; -} - -void tsdbCloseManifestHandle(SManifestHandle *pManifest) { - if (pManifest != NULL && pManifest->fd > 0) { - close(pManifest->fd); - pManifest->fd = -1; - } - - remove(pManifest->fname); - taosTZfree(pManifest->pBuffer); - pManifest->pBuffer = NULL; - pManifest->contSize = 0; - return 0; -} - -int tsdbAppendManifestRecord(SManifestHandle *pManifest, STsdbRepo *pRepo, int type) { - ASSERT(pManifest->pBuffer != NULL && taosTSizeof(pManifest->pBuffer) >= pManifest->contSize); - - if (pManifest->contSize > 0) { - if (tsdbManifestMakeMoreRoom(pManifest, sizeof(TSCKSUM)) < 0) return -1; - pManifest->contSize += sizeof(TSCKSUM); - taosCalcChecksumAppend(0, (uint8_t *)pManifest->pBuffer, pManifest->contSize); - } - - SManifestRecord mRecord = {.type = type, .len = pManifest->contSize}; - - // Write mRecord part - if (taosTWrite(pManifest->fd, (void *)(&mRecord), sizeof(mRecord)) < sizeof(mRecord)) { - tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pRepo), sizeof(mRecord), pManifest->fname, - strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - // Write buffer part - if (pManifest->contSize > 0 && taosTWrite(pManifest->fd, pManifest->pBuffer, pManifest->contSize) < pManifest->contSize) { - tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pRepo), pManifest->contSize, - pManifest->fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - if (fsync(pManifest->fd) < 0) { - tsdbError("vgId:%d failed to fsync file %s since %s", REPO_ID(pRepo), pManifest->fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - return 0; -} - -int tsdbAppendManifestEnd(SManifestHandle *pManifest, STsdbRepo *pRepo) { - pManifest->contSize = 0; - return tsdbAppendManifestRecord(pManifest, pRepo, TSDB_MANIFEST_END_RECORD); -} - -int tsdbManifestMakeRoom(SManifestHandle *pManifest, int expectedSize) { - pManifest->pBuffer = taosTRealloc(pManifest->pBuffer, expectedSize); - if (pManifest->pBuffer == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } - - return 0; -} - -int tsdbManifestMakeMoreRoom(SManifestHandle *pManifest, int moreSize) { - return tsdbManifestMakeRoom(pManifest, pManifest->contSize + moreSize); -} - -// TODO -bool tsdbIsManifestEnd(SManifestHandle *pManifest) { - SManifestRecord mRecord; - - if (lseek(pManifest->fd, sizeof(mRecord), SEEK_END) < 0) { - tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pRepo), pManifest->fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return false; - } - - if (taosTRead(pManifest->fd, (void *)(&mRecord), sizeof(mRecord)) < 0) { - tsdbError("vgId:%d failed to read manifest end from file %s since %s", REPO_ID(pRepo), pManifest->fname, - strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return false; - } - - return (mRecord.type == TSDB_MANII) -} - -int tsdbManifestRollBackOrForward(SManifestHandle *pManifest, bool isManifestEnd, STsdbRepo *pRepo) { - SManifestRecord mRecord; - - if (lseek(pManifest->fd, TSDB_MANIFEST_FILE_HEADER_SIZE, SEEK_SET) < 0) { - tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pRepo), pManifest->fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return; - } - - while (true) { - ssize_t size = 0; - - size = taosTRead(pManifest->fd, (void *)(&mRecord), sizeof(mRecord)); - if (size < 0) { - tsdbError("vgId:%d failed to read SManifestRecord part from file %s since %s", REPO_ID(pRepo), pManifest->fname, - strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - if (size < sizeof(mRecord)) break; - if ((mRecord.type != TSDB_MANIFEST_DATA_RECORD && mRecord.type != TSDB_MANIFEST_META_RECORD && mRecord.type != TSDB_MANIFEST_END_RECORD) || mRecord.len < 0) { - tsdbError("vgId:%d manifest file %s is broken since invalid mRecord content", REPO_ID(pRepo), pManifest->fname); - terrno = TSDB_CODE_TDB_FILE_CORRUPTED; - return -1; - } - - if (mRecord.type == TSDB_MANIFEST_END_RECORD) { - ASSERT(isManifestEnd && mRecord.len == 0); - break; - } - - if (tsdbManifestMakeRoom(pManifest, mRecord.len) < 0) return -1; - - size = taosTRead(pManifest->fd, pManifest->pBuffer, mRecord.len); - if (size < 0) { - tsdbError("vgId:%d failed to read SManifestRecord content from file %s since %s", REPO_ID(pRepo), pManifest->fname, - strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - if (size < mRecord.len) break; - - if (!taosCheckChecksumWhole((uint8_t *)pManifest->pBuffer, size)) { - tsdbError("vgId:%d manifest file %s is broken since checksum error", REPO_ID(pRepo), pManifest->fname); - terrno = TSDB_CODE_TDB_FILE_CORRUPTED; - return -1; - } - - if (mRecord.type == TSDB_MANIFEST_DATA_RECORD) { - // func1(pManifest->pBuffer, mRecord.len, isManifestEnd); - } else if (mRecord.type == TSDB_MANIFEST_META_RECORD) { - // func2(pManifest->pBuffer, mRecord.len, isManifestEnd); - } else { - ASSERT(0); - } - } - - return 0; - -} - -int tsdbEncodeManifestRecord(SManifestHandle *pManifest) { - pManifest->contSize = 0; - -} - -static int tsdbEncodeManifestHeader(void **buffer) { - int len = taosEncodeFixedU32(buf, TSDB_MANIFEST_FILE_VERSION); - return len; -} - -static void *tsdbDecodeManifestHeader(void *buffer, uint32_t version) { - buffer = taosDecodeFixedU32(buffer, &version); - return buffer; -} - -static int tsdbWriteManifestHeader(STsdbRepo *pRepo, SManifestHandle *pManifest) { - char buffer[TSDB_MANIFEST_FILE_HEADER_SIZE] = "\0"; - tsdbEncodeManifestHeader(&buffer); - - taosCalcChecksumAppend(0, (uint8_t)buffer, TSDB_MANIFEST_FILE_HEADER_SIZE); - if (taosTWrite(pManifest->fd, buffer, TSDB_MANIFEST_FILE_HEADER_SIZE) < 0) { - tsdbError("vgId:%d failed to write file %s since %s", REPO_ID(pRepo), pManifest->fname, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - - return 0; -} \ No newline at end of file diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 8cd3ec6dab..5c3d48f6c9 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -463,315 +463,315 @@ static void tsdbFreeTableData(STableData *pTableData) { static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple(*(SDataRow *)data); } -static void *tsdbCommitData(void *arg) { - STsdbRepo * pRepo = (STsdbRepo *)arg; - SMemTable * pMem = pRepo->imem; - STsdbCfg * pCfg = &pRepo->config; - STsdbMeta * pMeta = pRepo->tsdbMeta; - SCommitHandle commitHandle = {0}; - SCommitHandle *pCommitH = &commitHandle; - - ASSERT(pRepo->commit == 1 && pMem != NULL); - - tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64, REPO_ID(pRepo), - pMem->keyFirst, pMem->keyLast, pMem->numOfRows); - - pCommitH->pRepo = pRepo; - if (tsdbInitManifestHandle(pRepo, &(pCommitH->manifest)) < 0) goto _exit; - - // Create the iterator to read from cache - if (pMem->numOfRows > 0) { - iters = tsdbCreateCommitIters(pRepo); - if (iters == NULL) { - tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _exit; - } - - if (tsdbInitWriteHelper(&(pCommitH->whelper), pRepo) < 0) { - tsdbError("vgId:%d failed to init write helper since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _exit; - } - - if ((pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s", - REPO_ID(pRepo), pMeta->maxCols, pMeta->maxRowBytes, pCfg->maxRowsPerFileBlock, tstrerror(terrno)); - goto _exit; - } - - int sfid = (int)(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision)); - int efid = (int)(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision)); - - // Loop to commit to each file - for (int fid = sfid; fid <= efid; fid++) { - if (tsdbCommitToFile(pRepo, fid, iters, &(pCommitH->whelper), pDataCols) < 0) { - tsdbError("vgId:%d failed to commit to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); - goto _exit; - } - } - } - - // Commit to update meta file - if (tsdbCommitMeta(pRepo) < 0) { - tsdbError("vgId:%d failed to commit data while committing meta data since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _exit; - } - - tsdbFitRetention(pRepo); - - if (tsdbAppendManifestEnd(&pCommitH->manifest, pCommitH->pRepo) < 0) { - // TODO - } - - tsdbApplyManifestAction(&pCommitH->manifest); - -_exit: - tdFreeDataCols(pDataCols); - tsdbDestroyCommitIters(iters, pMem->maxTables); - tsdbCloseManifestHandle(&(pCommitH->manifest)); - tsdbDestroyHelper(&(pCommitH->whelper)); - tsdbEndCommit(pRepo); - tsdbInfo("vgId:%d commit over", pRepo->config.tsdbId); - - return NULL; -} - -static int tsdbCommitMeta(STsdbRepo *pRepo, SManifestHandle *pManifest) { - SMemTable *pMem = pRepo->imem; - STsdbMeta *pMeta = pRepo->tsdbMeta; - SActObj * pAct = NULL; - SActCont * pCont = NULL; - - if (listNEles(pMem->actList) > 0) { - pManifest->contSize = tdEncodeCommitAction(pMeta->pStore, &(pManifest->pBuffer)); - - if (tsdbAppendManifestRecord(pManifest, pRepo, TSDB_MANIFEST_META_RECORD) < 0) goto _err; - - if (tdKVStoreStartCommit(pMeta->pStore) < 0) { - tsdbError("vgId:%d failed to commit data while start commit meta since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _err; - } - - SListNode *pNode = NULL; - - while ((pNode = tdListPopHead(pMem->actList)) != NULL) { - pAct = (SActObj *)pNode->data; - if (pAct->act == TSDB_UPDATE_META) { - pCont = (SActCont *)POINTER_SHIFT(pAct, sizeof(SActObj)); - if (tdUpdateKVStoreRecord(pMeta->pStore, pAct->uid, (void *)(pCont->cont), pCont->len) < 0) { - tsdbError("vgId:%d failed to update meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid, - tstrerror(terrno)); - tdKVStoreEndCommit(pMeta->pStore); - goto _err; - } - } else if (pAct->act == TSDB_DROP_META) { - if (tdDropKVStoreRecord(pMeta->pStore, pAct->uid) < 0) { - tsdbError("vgId:%d failed to drop meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid, - tstrerror(terrno)); - tdKVStoreEndCommit(pMeta->pStore); - goto _err; - } - } else { - ASSERT(false); - } - } - - if (tdKVStoreEndCommit(pMeta->pStore) < 0) { - tsdbError("vgId:%d failed to commit data while end commit meta since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _err; - } - } - - return 0; - -_err: - return -1; -} - -static void tsdbEndCommit(STsdbRepo *pRepo) { - ASSERT(pRepo->commit == 1); - if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER); -} - -static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) { - for (int i = 0; i < nIters; i++) { - TSKEY nextKey = tsdbNextIterKey((iters + i)->pIter); - if (nextKey > 0 && (nextKey >= minKey && nextKey <= maxKey)) return 1; - } - return 0; -} +// static void *tsdbCommitData(void *arg) { +// STsdbRepo * pRepo = (STsdbRepo *)arg; +// SMemTable * pMem = pRepo->imem; +// STsdbCfg * pCfg = &pRepo->config; +// STsdbMeta * pMeta = pRepo->tsdbMeta; +// SCommitHandle commitHandle = {0}; +// SCommitHandle *pCommitH = &commitHandle; + +// ASSERT(pRepo->commit == 1 && pMem != NULL); + +// tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64, REPO_ID(pRepo), +// pMem->keyFirst, pMem->keyLast, pMem->numOfRows); + +// pCommitH->pRepo = pRepo; +// if (tsdbInitManifestHandle(pRepo, &(pCommitH->manifest)) < 0) goto _exit; + +// // Create the iterator to read from cache +// if (pMem->numOfRows > 0) { +// iters = tsdbCreateCommitIters(pRepo); +// if (iters == NULL) { +// tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno)); +// goto _exit; +// } + +// if (tsdbInitWriteHelper(&(pCommitH->whelper), pRepo) < 0) { +// tsdbError("vgId:%d failed to init write helper since %s", REPO_ID(pRepo), tstrerror(terrno)); +// goto _exit; +// } + +// if ((pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) { +// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; +// tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s", +// REPO_ID(pRepo), pMeta->maxCols, pMeta->maxRowBytes, pCfg->maxRowsPerFileBlock, tstrerror(terrno)); +// goto _exit; +// } + +// int sfid = (int)(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision)); +// int efid = (int)(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision)); + +// // Loop to commit to each file +// for (int fid = sfid; fid <= efid; fid++) { +// if (tsdbCommitToFile(pRepo, fid, iters, &(pCommitH->whelper), pDataCols) < 0) { +// tsdbError("vgId:%d failed to commit to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); +// goto _exit; +// } +// } +// } + +// // Commit to update meta file +// if (tsdbCommitMeta(pRepo) < 0) { +// tsdbError("vgId:%d failed to commit data while committing meta data since %s", REPO_ID(pRepo), tstrerror(terrno)); +// goto _exit; +// } + +// tsdbFitRetention(pRepo); + +// if (tsdbAppendManifestEnd(&pCommitH->manifest, pCommitH->pRepo) < 0) { +// // TODO +// } + +// tsdbApplyManifestAction(&pCommitH->manifest); + +// _exit: +// tdFreeDataCols(pDataCols); +// tsdbDestroyCommitIters(iters, pMem->maxTables); +// tsdbCloseManifestHandle(&(pCommitH->manifest)); +// tsdbDestroyHelper(&(pCommitH->whelper)); +// tsdbEndCommit(pRepo); +// tsdbInfo("vgId:%d commit over", pRepo->config.tsdbId); + +// return NULL; +// } + +// static int tsdbCommitMeta(STsdbRepo *pRepo, SManifestHandle *pManifest) { +// SMemTable *pMem = pRepo->imem; +// STsdbMeta *pMeta = pRepo->tsdbMeta; +// SActObj * pAct = NULL; +// SActCont * pCont = NULL; + +// if (listNEles(pMem->actList) > 0) { +// pManifest->contSize = tdEncodeCommitAction(pMeta->pStore, &(pManifest->pBuffer)); + +// if (tsdbAppendManifestRecord(pManifest, pRepo, TSDB_MANIFEST_META_RECORD) < 0) goto _err; + +// if (tdKVStoreStartCommit(pMeta->pStore) < 0) { +// tsdbError("vgId:%d failed to commit data while start commit meta since %s", REPO_ID(pRepo), tstrerror(terrno)); +// goto _err; +// } + +// SListNode *pNode = NULL; + +// while ((pNode = tdListPopHead(pMem->actList)) != NULL) { +// pAct = (SActObj *)pNode->data; +// if (pAct->act == TSDB_UPDATE_META) { +// pCont = (SActCont *)POINTER_SHIFT(pAct, sizeof(SActObj)); +// if (tdUpdateKVStoreRecord(pMeta->pStore, pAct->uid, (void *)(pCont->cont), pCont->len) < 0) { +// tsdbError("vgId:%d failed to update meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid, +// tstrerror(terrno)); +// tdKVStoreEndCommit(pMeta->pStore); +// goto _err; +// } +// } else if (pAct->act == TSDB_DROP_META) { +// if (tdDropKVStoreRecord(pMeta->pStore, pAct->uid) < 0) { +// tsdbError("vgId:%d failed to drop meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid, +// tstrerror(terrno)); +// tdKVStoreEndCommit(pMeta->pStore); +// goto _err; +// } +// } else { +// ASSERT(false); +// } +// } + +// if (tdKVStoreEndCommit(pMeta->pStore) < 0) { +// tsdbError("vgId:%d failed to commit data while end commit meta since %s", REPO_ID(pRepo), tstrerror(terrno)); +// goto _err; +// } +// } + +// return 0; + +// _err: +// return -1; +// } + +// static void tsdbEndCommit(STsdbRepo *pRepo) { +// ASSERT(pRepo->commit == 1); +// if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER); +// } + +// static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) { +// for (int i = 0; i < nIters; i++) { +// TSKEY nextKey = tsdbNextIterKey((iters + i)->pIter); +// if (nextKey > 0 && (nextKey >= minKey && nextKey <= maxKey)) return 1; +// } +// return 0; +// } void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey) { *minKey = fileId * daysPerFile * tsMsPerDay[precision]; *maxKey = *minKey + daysPerFile * tsMsPerDay[precision] - 1; } -static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols) { - char * dataDir = NULL; - STsdbCfg * pCfg = &pRepo->config; - STsdbFileH *pFileH = pRepo->tsdbFileH; - SFileGroup *pGroup = NULL; - SMemTable * pMem = pRepo->imem; - bool newLast = false; - - TSKEY minKey = 0, maxKey = 0; - tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey); - - // Check if there are data to commit to this file - int hasDataToCommit = tsdbHasDataToCommit(iters, pMem->maxTables, minKey, maxKey); - if (!hasDataToCommit) { - tsdbDebug("vgId:%d no data to commit to file %d", REPO_ID(pRepo), fid); - return 0; - } - - // Create and open files for commit - dataDir = tsdbGetDataDirName(pRepo->rootDir); - if (dataDir == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } - - if ((pGroup = tsdbCreateFGroupIfNeed(pRepo, dataDir, fid)) == NULL) { - tsdbError("vgId:%d failed to create file group %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); - goto _err; - } - - // Open files for write/read - if (tsdbSetAndOpenHelperFile(pHelper, pGroup) < 0) { - tsdbError("vgId:%d failed to set helper file since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _err; - } - - newLast = TSDB_NLAST_FILE_OPENED(pHelper); - - if (tsdbLoadCompIdx(pHelper, NULL) < 0) { - tsdbError("vgId:%d failed to load SCompIdx part since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _err; - } - - // Loop to commit data in each table - for (int tid = 1; tid < pMem->maxTables; tid++) { - SCommitIter *pIter = iters + tid; - if (pIter->pTable == NULL) continue; - - taosRLockLatch(&(pIter->pTable->latch)); - - if (tsdbSetHelperTable(pHelper, pIter->pTable, pRepo) < 0) goto _err; - - if (pIter->pIter != NULL) { - if (tdInitDataCols(pDataCols, tsdbGetTableSchemaImpl(pIter->pTable, false, false, -1)) < 0) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - goto _err; - } - - if (tsdbCommitTableData(pHelper, pIter, pDataCols, maxKey) < 0) { - taosRUnLockLatch(&(pIter->pTable->latch)); - tsdbError("vgId:%d failed to write data of table %s tid %d uid %" PRIu64 " since %s", REPO_ID(pRepo), - TABLE_CHAR_NAME(pIter->pTable), TABLE_TID(pIter->pTable), TABLE_UID(pIter->pTable), - tstrerror(terrno)); - goto _err; - } - } - - taosRUnLockLatch(&(pIter->pTable->latch)); - - // Move the last block to the new .l file if neccessary - if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) { - tsdbError("vgId:%d, failed to move last block, since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _err; - } - - // Write the SCompBlock part - if (tsdbWriteCompInfo(pHelper) < 0) { - tsdbError("vgId:%d, failed to write compInfo part since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _err; - } - } - - if (tsdbWriteCompIdx(pHelper) < 0) { - tsdbError("vgId:%d failed to write compIdx part to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); - goto _err; - } - - taosTFree(dataDir); - tsdbCloseHelperFile(pHelper, 0, pGroup); - - pthread_rwlock_wrlock(&(pFileH->fhlock)); - - (void)rename(helperNewHeadF(pHelper)->fname, helperHeadF(pHelper)->fname); - pGroup->files[TSDB_FILE_TYPE_HEAD].info = helperNewHeadF(pHelper)->info; - - if (newLast) { - (void)rename(helperNewLastF(pHelper)->fname, helperLastF(pHelper)->fname); - pGroup->files[TSDB_FILE_TYPE_LAST].info = helperNewLastF(pHelper)->info; - } else { - pGroup->files[TSDB_FILE_TYPE_LAST].info = helperLastF(pHelper)->info; - } - - pGroup->files[TSDB_FILE_TYPE_DATA].info = helperDataF(pHelper)->info; - - pthread_rwlock_unlock(&(pFileH->fhlock)); - - return 0; - -_err: - taosTFree(dataDir); - tsdbCloseHelperFile(pHelper, 1, NULL); - return -1; -} - -static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) { - SMemTable *pMem = pRepo->imem; - STsdbMeta *pMeta = pRepo->tsdbMeta; - - SCommitIter *iters = (SCommitIter *)calloc(pMem->maxTables, sizeof(SCommitIter)); - if (iters == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return NULL; - } - - if (tsdbRLockRepoMeta(pRepo) < 0) goto _err; - - // reference all tables - for (int i = 0; i < pMem->maxTables; i++) { - if (pMeta->tables[i] != NULL) { - tsdbRefTable(pMeta->tables[i]); - iters[i].pTable = pMeta->tables[i]; - } - } - - if (tsdbUnlockRepoMeta(pRepo) < 0) goto _err; - - for (int i = 0; i < pMem->maxTables; i++) { - if ((iters[i].pTable != NULL) && (pMem->tData[i] != NULL) && (TABLE_UID(iters[i].pTable) == pMem->tData[i]->uid)) { - if ((iters[i].pIter = tSkipListCreateIter(pMem->tData[i]->pData)) == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - goto _err; - } - - tSkipListIterNext(iters[i].pIter); - } - } - - return iters; - -_err: - tsdbDestroyCommitIters(iters, pMem->maxTables); - return NULL; -} - -static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables) { - if (iters == NULL) return; - - for (int i = 1; i < maxTables; i++) { - if (iters[i].pTable != NULL) { - tsdbUnRefTable(iters[i].pTable); - tSkipListDestroyIter(iters[i].pIter); - } - } - - free(iters); -} +// static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols) { +// char * dataDir = NULL; +// STsdbCfg * pCfg = &pRepo->config; +// STsdbFileH *pFileH = pRepo->tsdbFileH; +// SFileGroup *pGroup = NULL; +// SMemTable * pMem = pRepo->imem; +// bool newLast = false; + +// TSKEY minKey = 0, maxKey = 0; +// tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey); + +// // Check if there are data to commit to this file +// int hasDataToCommit = tsdbHasDataToCommit(iters, pMem->maxTables, minKey, maxKey); +// if (!hasDataToCommit) { +// tsdbDebug("vgId:%d no data to commit to file %d", REPO_ID(pRepo), fid); +// return 0; +// } + +// // Create and open files for commit +// dataDir = tsdbGetDataDirName(pRepo->rootDir); +// if (dataDir == NULL) { +// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; +// return -1; +// } + +// if ((pGroup = tsdbCreateFGroupIfNeed(pRepo, dataDir, fid)) == NULL) { +// tsdbError("vgId:%d failed to create file group %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); +// goto _err; +// } + +// // Open files for write/read +// if (tsdbSetAndOpenHelperFile(pHelper, pGroup) < 0) { +// tsdbError("vgId:%d failed to set helper file since %s", REPO_ID(pRepo), tstrerror(terrno)); +// goto _err; +// } + +// newLast = TSDB_NLAST_FILE_OPENED(pHelper); + +// if (tsdbLoadCompIdx(pHelper, NULL) < 0) { +// tsdbError("vgId:%d failed to load SCompIdx part since %s", REPO_ID(pRepo), tstrerror(terrno)); +// goto _err; +// } + +// // Loop to commit data in each table +// for (int tid = 1; tid < pMem->maxTables; tid++) { +// SCommitIter *pIter = iters + tid; +// if (pIter->pTable == NULL) continue; + +// taosRLockLatch(&(pIter->pTable->latch)); + +// if (tsdbSetHelperTable(pHelper, pIter->pTable, pRepo) < 0) goto _err; + +// if (pIter->pIter != NULL) { +// if (tdInitDataCols(pDataCols, tsdbGetTableSchemaImpl(pIter->pTable, false, false, -1)) < 0) { +// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; +// goto _err; +// } + +// if (tsdbCommitTableData(pHelper, pIter, pDataCols, maxKey) < 0) { +// taosRUnLockLatch(&(pIter->pTable->latch)); +// tsdbError("vgId:%d failed to write data of table %s tid %d uid %" PRIu64 " since %s", REPO_ID(pRepo), +// TABLE_CHAR_NAME(pIter->pTable), TABLE_TID(pIter->pTable), TABLE_UID(pIter->pTable), +// tstrerror(terrno)); +// goto _err; +// } +// } + +// taosRUnLockLatch(&(pIter->pTable->latch)); + +// // Move the last block to the new .l file if neccessary +// if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) { +// tsdbError("vgId:%d, failed to move last block, since %s", REPO_ID(pRepo), tstrerror(terrno)); +// goto _err; +// } + +// // Write the SCompBlock part +// if (tsdbWriteCompInfo(pHelper) < 0) { +// tsdbError("vgId:%d, failed to write compInfo part since %s", REPO_ID(pRepo), tstrerror(terrno)); +// goto _err; +// } +// } + +// if (tsdbWriteCompIdx(pHelper) < 0) { +// tsdbError("vgId:%d failed to write compIdx part to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); +// goto _err; +// } + +// taosTFree(dataDir); +// tsdbCloseHelperFile(pHelper, 0, pGroup); + +// pthread_rwlock_wrlock(&(pFileH->fhlock)); + +// (void)rename(helperNewHeadF(pHelper)->fname, helperHeadF(pHelper)->fname); +// pGroup->files[TSDB_FILE_TYPE_HEAD].info = helperNewHeadF(pHelper)->info; + +// if (newLast) { +// (void)rename(helperNewLastF(pHelper)->fname, helperLastF(pHelper)->fname); +// pGroup->files[TSDB_FILE_TYPE_LAST].info = helperNewLastF(pHelper)->info; +// } else { +// pGroup->files[TSDB_FILE_TYPE_LAST].info = helperLastF(pHelper)->info; +// } + +// pGroup->files[TSDB_FILE_TYPE_DATA].info = helperDataF(pHelper)->info; + +// pthread_rwlock_unlock(&(pFileH->fhlock)); + +// return 0; + +// _err: +// taosTFree(dataDir); +// tsdbCloseHelperFile(pHelper, 1, NULL); +// return -1; +// } + +// static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) { +// SMemTable *pMem = pRepo->imem; +// STsdbMeta *pMeta = pRepo->tsdbMeta; + +// SCommitIter *iters = (SCommitIter *)calloc(pMem->maxTables, sizeof(SCommitIter)); +// if (iters == NULL) { +// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; +// return NULL; +// } + +// if (tsdbRLockRepoMeta(pRepo) < 0) goto _err; + +// // reference all tables +// for (int i = 0; i < pMem->maxTables; i++) { +// if (pMeta->tables[i] != NULL) { +// tsdbRefTable(pMeta->tables[i]); +// iters[i].pTable = pMeta->tables[i]; +// } +// } + +// if (tsdbUnlockRepoMeta(pRepo) < 0) goto _err; + +// for (int i = 0; i < pMem->maxTables; i++) { +// if ((iters[i].pTable != NULL) && (pMem->tData[i] != NULL) && (TABLE_UID(iters[i].pTable) == pMem->tData[i]->uid)) { +// if ((iters[i].pIter = tSkipListCreateIter(pMem->tData[i]->pData)) == NULL) { +// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; +// goto _err; +// } + +// tSkipListIterNext(iters[i].pIter); +// } +// } + +// return iters; + +// _err: +// tsdbDestroyCommitIters(iters, pMem->maxTables); +// return NULL; +// } + +// static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables) { +// if (iters == NULL) return; + +// for (int i = 1; i < maxTables; i++) { +// if (iters[i].pTable != NULL) { +// tsdbUnRefTable(iters[i].pTable); +// tSkipListDestroyIter(iters[i].pIter); +// } +// } + +// free(iters); +// } static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables) { ASSERT(pMemTable->maxTables < maxTables); -- GitLab