From 1a139a077268923497f598efa26e1fddb3f7f6c7 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 15 Jun 2020 07:38:15 +0000 Subject: [PATCH] TD-353 --- src/inc/taoserror.h | 1 + src/tsdb/inc/tsdbMain.h | 6 +- src/tsdb/src/tsdbFile.c | 5 - src/tsdb/src/tsdbMain.c | 31 ----- src/tsdb/src/tsdbMemTable.c | 227 ++++++++++++++++++++++-------------- src/tsdb/src/tsdbMeta.c | 8 ++ 6 files changed, 155 insertions(+), 123 deletions(-) diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index eb457df624..a5bfb0fa1e 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -191,6 +191,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE, 0, 0x060B, "tsdb times TAOS_DEFINE_ERROR(TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP, 0, 0x060C, "tsdb submit message is messed up") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_ACTION, 0, 0x060D, "tsdb invalid action") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_CREATE_TB_MSG, 0, 0x060E, "tsdb invalid create table message") +TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM, 0, 0x060F, "tsdb no table data in memory skiplist") // query TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, 0, 0x0700, "query invalid handle") diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 03b2bc255b..70cd19d55a 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -295,6 +295,8 @@ int tsdbUpdateTable(STsdbMeta* pMeta, STable* pTable, STableCfg* pCfg); int tsdbWLockRepoMeta(STsdbRepo* pRepo); int tsdbRLockRepoMeta(STsdbRepo* pRepo); int tsdbUnlockRepoMeta(STsdbRepo* pRepo); +void tsdbRefTable(STable* pTable); +void tsdbUnRefTable(STable* pTable); // ------------------ tsdbBuffer.c STsdbBufPool* tsdbNewBufPool(); @@ -317,8 +319,8 @@ int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem); #define TSDB_FGROUP_ITER_FORWARD TSDB_ORDER_ASC #define TSDB_FGROUP_ITER_BACKWARD TSDB_ORDER_DESC -STsdbFileH* tsdbNewFileH(STsdbCfg* pCfg); -void tsdbFreeFileH(STsdbFileH* pFileH); +STsdbFileH* tsdbNewFileH(STsdbCfg* pCfg); +void tsdbFreeFileH(STsdbFileH* pFileH); // ------------------ tsdbRWHelper.c #define TSDB_MAX_SUBBLOCKS 8 diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index e52a72c32a..77e07b54c4 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -325,11 +325,6 @@ int tsdbCreateFile(char *dataDir, int fileId, const char *suffix, SFile *pFile) return 0; } -void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, - TSKEY *maxKey) { - *minKey = fileId * daysPerFile * tsMsPerDay[precision]; - *maxKey = *minKey + daysPerFile * tsMsPerDay[precision] - 1; -} SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid) { if (pFileH->numOfFGroups == 0 || fid < pFileH->fGroup[0].fileId || fid > pFileH->fGroup[pFileH->numOfFGroups - 1].fileId) diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index c783b91ec1..298caff0f6 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -844,35 +844,4 @@ TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid) { return TSDB_GET_TABLE_LAST_KEY(pTable); } - -/** - * Return the next iterator key. - * - * @return the next key if iter has - * -1 if iter not - */ -static TSKEY tsdbNextIterKey(SSkipListIterator *pIter) { - if (pIter == NULL) return -1; - - SSkipListNode *node = tSkipListIterGet(pIter); - if (node == NULL) return -1; - - SDataRow row = SL_GET_NODE_DATA(node); - return dataRowKey(row); -} - -static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minKey, TSKEY maxKey) { - TSKEY nextKey; - for (int i = 0; i < nIters; i++) { - SSkipListIterator *pIter = iters[i]; - nextKey = tsdbNextIterKey(pIter); - if (nextKey > 0 && (nextKey >= minKey && nextKey <= maxKey)) return 1; - } - return 0; -} - - - - - #endif \ No newline at end of file diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 57e7961049..78ce9be30a 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -18,6 +18,11 @@ #define TSDB_DATA_SKIPLIST_LEVEL 5 +typedef struct { + STable * pTable; + SSkipListIterator *pIter; +} SCommitIter; + static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo); static void * tsdbAllocBytes(STsdbRepo *pRepo, int bytes); @@ -337,9 +342,12 @@ static void tsdbFreeTableData(STableData *pTableData) { static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple(data); } static void *tsdbCommitData(void *arg) { - STsdbRepo *pRepo = (STsdbRepo *)arg; - STsdbMeta *pMeta = pRepo->tsdbMeta; - SMemTable *pMem = pRepo->imem; + STsdbRepo * pRepo = (STsdbRepo *)arg; + STsdbMeta * pMeta = pRepo->tsdbMeta; + SMemTable * pMem = pRepo->imem; + STsdbCfg * pCfg = &pRepo->config; + SDataCols * pDataCols = NULL; + SCommitIter *iters = NULL; ASSERT(pRepo->commit == 1); ASSERT(pMem != NULL); @@ -347,69 +355,84 @@ static void *tsdbCommitData(void *arg) { pMem->keyFirst, pMem->keyLast, pMem->numOfRows); // Create the iterator to read from cache - SSkipListIterator **iters = tsdbCreateTableIters(pRepo); + iters = tsdbCreateTableIters(pRepo); if (iters == NULL) { - tsdbError("vgId:%d failed to create table iterators since %s", REPO_ID(pRepo), tstrerror(terrno)); - // TODO: deal with the error here - return NULL; + tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno)); + goto _exit; } if (tsdbInitWriteHelper(&whelper, pRepo) < 0) { tsdbError("vgId:%d failed to init write helper since %s", REPO_ID(pRepo), tstrerror(terrno)); - // TODO goto _exit; } - if ((pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) { - tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d since %s", REPO_ID(pRepo), - pMeta->maxRowBytes, pMeta->maxCols, tstrerror(terrno)); - // TODO + if ((pDataCols = tdNewDataCols(pMem->maxRowBytes, pMem->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s", + REPO_ID(pRepo), pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock, tstrerror(terrno)); goto _exit; } - int sfid = tsdbGetKeyFileId(pCache->imem->keyFirst, pCfg->daysPerFile, pCfg->precision); - int efid = tsdbGetKeyFileId(pCache->imem->keyLast, pCfg->daysPerFile, pCfg->precision); + int sfid = TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision); + int efid = TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision); // Loop to commit to each file for (int fid = sfid; fid <= efid; fid++) { if (tsdbCommitToFile(pRepo, fid, iters, &whelper, pDataCols) < 0) { - ASSERT(false); + tsdbError("vgId:%d failed to commit to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); goto _exit; } } - // Do retention actions + // TODO: Do retention actions tsdbFitRetention(pRepo); - if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER); + + // TODO: Commit action meta data _exit: tdFreeDataCols(pDataCols); tsdbDestroyTableIters(iters, pCfg->maxTables); tsdbDestroyHelper(&whelper); + tsdbEndCommit(pRepo); + tsdbPrint("vgId:%d commit over", pRepo->config.tsdbId); + + return NULL; +} - tsdbLockRepo(arg); - tdListMove(pCache->imem->list, pCache->pool.memPool); - tsdbAdjustCacheBlocks(pCache); - tdListFree(pCache->imem->list); - free(pCache->imem); - pCache->imem = NULL; +static void tsdbEndCommit(STsdbRepo *pRepo) { + ASSERT(pRepo->commit == 1); + tsdbLockRepo(pRepo); pRepo->commit = 0; - for (int i = 1; i < pCfg->maxTables; i++) { - STable *pTable = pMeta->tables[i]; - if (pTable && pTable->imem) { - tsdbFreeMemTable(pTable->imem); - pTable->imem = NULL; - } + tsdbUnlockRepo(pRepo); + if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER); +} + +static TSKEY tsdbNextIterKey(SCommitIter *pIter) { + if (pIter == NULL) return -1; + + SSkipListNode *node = tSkipListIterGet(pIter->pIter); + if (node == NULL) return -1; + + SDataRow row = SL_GET_NODE_DATA(node); + return dataRowKey(row); +} + +static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) { + for (int i = 0; i < nIters; i++) { + TSKEY nextKey = tsdbNextIterKey(iters + i); + if (nextKey > 0 && (nextKey >= minKey && nextKey <= maxKey)) return 1; } - tsdbUnLockRepo(arg); - tsdbPrint("vgId:%d, commit over....", pRepo->config.tsdbId); + return 0; +} - return NULL; +static void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, + TSKEY *maxKey) { + *minKey = fileId * daysPerFile * tsMsPerDay[precision]; + *maxKey = *minKey + daysPerFile * tsMsPerDay[precision] - 1; } -static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SRWHelper *pHelper, - SDataCols *pDataCols) { - char dataDir[128] = {0}; +static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols) { + char * dataDir = NULL; STsdbMeta * pMeta = pRepo->tsdbMeta; STsdbFileH *pFileH = pRepo->tsdbFileH; STsdbCfg * pCfg = &pRepo->config; @@ -420,70 +443,84 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters // Check if there are data to commit to this file int hasDataToCommit = tsdbHasDataToCommit(iters, pCfg->maxTables, minKey, maxKey); - if (!hasDataToCommit) return 0; // No data to commit, just return + if (!hasDataToCommit) { + tsdbTrace("vgId:%d no data to commit to file %d", REPO_ID(pRepo), fid); + return 0; + } // Create and open files for commit - tsdbGetDataDirName(pRepo, dataDir); + dataDir = tsdbGetDataDirName(pRepo->rootDir); + if (dataDir == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + if ((pGroup = tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables)) == NULL) { - tsdbError("vgId:%d, failed to create file group %d", pRepo->config.tsdbId, fid); + tsdbError("vgId:%d failed to create file group %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); goto _err; } + free(dataDir); + // Open files for write/read if (tsdbSetAndOpenHelperFile(pHelper, pGroup) < 0) { - tsdbError("vgId:%d, failed to set helper file", pRepo->config.tsdbId); + tsdbError("vgId:%d failed to set helper file since %s", REPO_ID(pRepo), tstrerror(terrno)); goto _err; } // Loop to commit data in each table for (int tid = 1; tid < pCfg->maxTables; tid++) { - STable *pTable = pMeta->tables[tid]; - if (pTable == NULL) continue; - - SSkipListIterator *pIter = iters[tid]; + SCommitIter *pIter = iters + tid; + if (pIter->pTable == NULL) continue; - // Set the helper and the buffer dataCols object to help to write this table tsdbSetHelperTable(pHelper, pTable, pRepo); - tdInitDataCols(pDataCols, tsdbGetTableSchema(pMeta, pTable)); - - // Loop to write the data in the cache to files. If no data to write, just break the loop - int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5; - int nLoop = 0; - while (true) { - int rowsRead = tsdbReadRowsFromCache(pMeta, pTable, pIter, maxKey, maxRowsToRead, pDataCols); - assert(rowsRead >= 0); - if (pDataCols->numOfRows == 0) break; - nLoop++; - - ASSERT(dataColsKeyFirst(pDataCols) >= minKey && dataColsKeyFirst(pDataCols) <= maxKey); - ASSERT(dataColsKeyLast(pDataCols) >= minKey && dataColsKeyLast(pDataCols) <= maxKey); - - int rowsWritten = tsdbWriteDataBlock(pHelper, pDataCols); - ASSERT(rowsWritten != 0); - if (rowsWritten < 0) goto _err; - ASSERT(rowsWritten <= pDataCols->numOfRows); - - tdPopDataColsPoints(pDataCols, rowsWritten); - maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pDataCols->numOfRows; - } - ASSERT(pDataCols->numOfRows == 0); + if (pIter->pIter != NULL) { + tdInitDataCols(pDataCols, tsdbGetTableSchema(pIter->pTable)); + + int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5; + int nLoop = 0; + while (true) { + int rowsRead = tsdbReadRowsFromCache(pMeta, pIter->pTable, pIter->pIter, maxKey, maxRowsToRead, pDataCols); + ASSERT(rowsRead >= 0); + if (pDataCols->numOfRows == 0) break; + nLoop++; + + ASSERT(dataColsKeyFirst(pDataCols) >= minKey && dataColsKeyFirst(pDataCols) <= maxKey); + ASSERT(dataColsKeyLast(pDataCols) >= minKey && dataColsKeyLast(pDataCols) <= maxKey); + + int rowsWritten = tsdbWriteDataBlock(pHelper, pDataCols); + ASSERT(rowsWritten != 0); + if (rowsWritten < 0) { + tsdbError("vgId:%d failed to write data block to table %s tid %d uid %" PRIu64 " since %s", REPO_ID(pRepo), + TABLE_CHAR_NAME(pIter->pTable), TABLE_TID(pIter->pTable), TABLE_UID(pIter->pTable), + tstrerror(terrno)); + goto _err; + } + ASSERT(rowsWritten <= pDataCols->numOfRows); + + tdPopDataColsPoints(pDataCols, rowsWritten); + maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pDataCols->numOfRows; + } + + ASSERT(pDataCols->numOfRows == 0); + } // Move the last block to the new .l file if neccessary if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) { - tsdbError("vgId:%d, failed to move last block", pRepo->config.tsdbId); + tsdbError("vgId:%d, failed to move last block, since %s", REPO_ID(pRepo), tstrerror(terrno)); goto _err; } // Write the SCompBlock part if (tsdbWriteCompInfo(pHelper) < 0) { - tsdbError("vgId:%d, failed to write compInfo part", pRepo->config.tsdbId); + tsdbError("vgId:%d, failed to write compInfo part since %s", REPO_ID(pRepo), tstrerror(terrno)); goto _err; } } if (tsdbWriteCompIdx(pHelper) < 0) { - tsdbError("vgId:%d, failed to write compIdx part", pRepo->config.tsdbId); + tsdbError("vgId:%d failed to write compIdx part to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); goto _err; } @@ -496,43 +533,63 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters return 0; _err: - ASSERT(false); + // ASSERT(false); tsdbCloseHelperFile(pHelper, 1); return -1; } -static SSkipListIterator **tsdbCreateTableIters(STsdbRepo *pRepo) { - STsdbCfg *pCfg = &(pRepo->config); +static SCommitIter *tsdbCreateTableIters(STsdbRepo *pRepo) { + STsdbCfg * pCfg = &(pRepo->config); + SMemTable *pMem = pRepo->imem; + STsdbMeta *pMeta = pRepo->tsdbMeta; - SSkipListIterator **iters = (SSkipListIterator **)calloc(pCfg->maxTables, sizeof(SSkipListIterator *)); + SCommitIter *iters = (SCommitIter *)calloc(pCfg->maxTables, sizeof(SCommitIter)); if (iters == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return NULL; } - for (int tid = 1; tid < maxTables; tid++) { - STable *pTable = pMeta->tables[tid]; - if (pTable == NULL || pTable->imem == NULL || pTable->imem->numOfRows == 0) continue; + if (tsdbRLockRepoMeta(pRepo) < 0) goto _err; + + // reference all tables + for (int i = 0; i < pCfg->maxTables; i++) { + if (pMeta->tables[i] != NULL) { + tsdbRefTable(pMeta->tables[i]); + iters[i].pTable = pMeta->tables[i]; + } + } - iters[tid] = tSkipListCreateIter(pTable->imem->pData); - if (iters[tid] == NULL) goto _err; + if (tsdbUnlockRepoMeta(pRepo) < 0) goto _err; - if (!tSkipListIterNext(iters[tid])) goto _err; + for (int i = 0; i < pCfg->maxTables; i++) { + if ((iters[i].pTable != NULL) && (pMem->tData[i] != NULL) && (TALBE_UID(iters[i].pTable) == pMem->tData[i]->uid)) { + if ((iters[i].pIter = tSkipListCreateIter(pMem->tData[i]->pData)) == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + goto _err; + } + + if (!tSkipListIterNext(iters[i].pIter)) { + terrno = TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM; + goto _err; + } + } } return iters; _err: - tsdbDestroyTableIters(iters, maxTables); + tsdbDestroyTableIters(iters, pCfg->maxTables); return NULL; } -static void tsdbDestroyTableIters(SSkipListIterator **iters, int maxTables) { +static void tsdbDestroyTableIters(SCommitIter *iters, int maxTables) { if (iters == NULL) return; - for (int tid = 1; tid < maxTables; tid++) { - if (iters[tid] == NULL) continue; - tSkipListDestroyIter(iters[tid]); + for (int i = 1; i < maxTables; i++) { + if (iters[i].pTable != NULL) { + tsdbUnRefTable(iters[i].pTable); + tSkipListDestroyIter(iters[i].pIter); + } } free(iters); diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 10866bbae5..f96b0ae1da 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -516,6 +516,14 @@ int tsdbUnlockRepoMeta(STsdbRepo *pRepo) { return 0; } +void tsdbRefTable(STable *pTable) { T_REF_INC(pTable); } + +void tsdbUnRefTable(STable *pTable) { + if (T_REF_DEC(pTable) == 0) { + tsdbFreeTable(pTable); + } +} + // ------------------ LOCAL FUNCTIONS ------------------ static int tsdbCompareSchemaVersion(const void *key1, const void *key2) { if (*(int16_t *)key1 < schemaVersion(*(STSchema **)key2)) { -- GitLab