From 941834a659306bf892a5861f351c2a01312396c2 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 10 Nov 2020 05:39:09 +0000 Subject: [PATCH] try to fix TD-2004 --- src/inc/tsdb.h | 3 + src/tsdb/inc/tsdbMain.h | 7 +- src/tsdb/src/tsdbCommitQueue.c | 149 +++++++++++++++++++++++++++++++ src/tsdb/src/tsdbMain.c | 9 +- src/tsdb/src/tsdbMemTable.c | 156 ++++++++++++++------------------- src/vnode/src/vnodeMain.c | 9 ++ 6 files changed, 241 insertions(+), 92 deletions(-) create mode 100644 src/tsdb/src/tsdbCommitQueue.c diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 02ca99e4b8..993ec287a5 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -321,6 +321,9 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle); */ void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage); +int tsdbInitCommitQueue(int nthreads); +void tsdbDestroyCommitQueue(); + #ifdef __cplusplus } #endif diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 7d40d7f00a..0962e7c2cb 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -220,8 +220,7 @@ typedef struct { SMemTable* mem; SMemTable* imem; STsdbFileH* tsdbFileH; - int commit; - pthread_t commitThread; + sem_t readyToCommit; pthread_mutex_t mutex; bool repoLocked; } STsdbRepo; @@ -440,6 +439,7 @@ void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes); int tsdbAsyncCommit(STsdbRepo* pRepo); int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols, TKEY* filterKeys, int nFilterKeys, bool keepDup, SMergeInfo* pMergeInfo); +void* tsdbCommitData(STsdbRepo* pRepo); static FORCE_INLINE SDataRow tsdbNextIterRow(SSkipListIterator* pIter) { if (pIter == NULL) return NULL; @@ -588,6 +588,9 @@ int tsdbScanSCompBlock(STsdbScanHandle* pScanHandle, int idx); int tsdbCloseScanFile(STsdbScanHandle* pScanHandle); void tsdbFreeScanHandle(STsdbScanHandle* pScanHandle); +// ------------------ tsdbCommitQueue.c +int tsdbScheduleCommit(STsdbRepo *pRepo); + #ifdef __cplusplus } #endif diff --git a/src/tsdb/src/tsdbCommitQueue.c b/src/tsdb/src/tsdbCommitQueue.c new file mode 100644 index 0000000000..0272d7b398 --- /dev/null +++ b/src/tsdb/src/tsdbCommitQueue.c @@ -0,0 +1,149 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include + +#include "os.h" +#include "tlist.h" +#include "tsdbMain.h" + +typedef struct { + bool stop; + pthread_mutex_t lock; + pthread_cond_t queueNotEmpty; + int nthreads; + SList * queue; + pthread_t * threads; +} SCommitQueue; + +typedef struct { + STsdbRepo *pRepo; +} SCommitReq; + +static void *tsdbLoopCommit(void *arg); + +SCommitQueue tsCommitQueue = {0}; + +int tsdbInitCommitQueue(int nthreads) { + SCommitQueue *pQueue = &tsCommitQueue; + + if (nthreads < 1) nthreads = 1; + + pQueue->stop = false; + pQueue->nthreads = nthreads; + + pQueue->queue = tdListNew(0); + if (pQueue->queue == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + pQueue->threads = (pthread_t *)calloc(nthreads, sizeof(pthread_t)); + if (pQueue->threads == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + tdListFree(pQueue->queue); + return -1; + } + + pthread_mutex_init(&(pQueue->lock), NULL); + pthread_cond_init(&(pQueue->queueNotEmpty), NULL); + + for (int i = 0; i < nthreads; i++) { + pthread_create(pQueue->threads + i, NULL, tsdbLoopCommit, NULL); + } + + return 0; +} + +void tsdbDestroyCommitQueue() { + SCommitQueue *pQueue = &tsCommitQueue; + + pthread_mutex_lock(&(pQueue->lock)); + + if (pQueue->stop) { + pthread_mutex_unlock(&(pQueue->lock)); + return; + } + + pQueue->stop = true; + pthread_cond_broadcast(&(pQueue->queueNotEmpty)); + + pthread_mutex_unlock(&(pQueue->lock)); + + for (size_t i = 0; i < pQueue->nthreads; i++) { + pthread_join(pQueue->threads[i], NULL); + } + + free(pQueue->threads); + tdListFree(pQueue->queue); + pthread_cond_destroy(&(pQueue->queueNotEmpty)); + pthread_mutex_destroy(&(pQueue->lock)); +} + +int tsdbScheduleCommit(STsdbRepo *pRepo) { + SCommitQueue *pQueue = &tsCommitQueue; + + SListNode *pNode = (SListNode *)calloc(1, sizeof(SListNode) + sizeof(SCommitReq)); + if (pNode == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + ((SCommitReq *)pNode->data)->pRepo = pRepo; + + pthread_mutex_lock(&(pQueue->lock)); + + ASSERT(!pQueue->stop); + + tdListAppendNode(pQueue->queue, pNode); + pthread_cond_signal(&(pQueue->queueNotEmpty)); + + pthread_mutex_unlock(&(pQueue->lock)); + return 0; +} + +static void *tsdbLoopCommit(void *arg) { + SCommitQueue *pQueue = &tsCommitQueue; + SListNode * pNode = NULL; + STsdbRepo * pRepo = NULL; + + while (true) { + pthread_mutex_lock(&(pQueue->lock)); + + while (true) { + pNode = tdListPopHead(pQueue->queue); + if (pNode == NULL) { + if (pQueue->stop) { + pthread_mutex_unlock(&(pQueue->lock)); + goto _exit; + } else { + pthread_cond_wait(&(pQueue->queueNotEmpty), &(pQueue->lock)); + } + } else { + break; + } + } + + pthread_mutex_unlock(&(pQueue->lock)); + + pRepo = ((SCommitReq *)pNode->data)->pRepo; + + tsdbCommitData(pRepo); + listNodeFree(pNode); + } + +_exit: + return NULL; +} diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 9a7c2db3d3..677ef20416 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -163,7 +163,7 @@ void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) { if (toCommit) { tsdbAsyncCommit(pRepo); - if (pRepo->commit) pthread_join(pRepo->commitThread, NULL); + sem_wait(&(pRepo->readyToCommit)); } tsdbUnRefMemTable(pRepo, pRepo->mem); tsdbUnRefMemTable(pRepo, pRepo->imem); @@ -675,6 +675,12 @@ static STsdbRepo *tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg) { goto _err; } + code = sem_init(&(pRepo->readyToCommit), 0, 1); + if (code != 0) { + terrno = TAOS_SYSTEM_ERROR(code); + goto _err; + } + pRepo->repoLocked = false; pRepo->rootDir = strdup(rootDir); @@ -719,6 +725,7 @@ static void tsdbFreeRepo(STsdbRepo *pRepo) { // tsdbFreeMemTable(pRepo->mem); // tsdbFreeMemTable(pRepo->imem); taosTFree(pRepo->rootDir); + sem_destroy(&(pRepo->readyToCommit)); pthread_mutex_destroy(&pRepo->mutex); free(pRepo); } diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 150bda3b80..7b3c617a90 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -24,7 +24,6 @@ static void tsdbFreeMemTable(SMemTable *pMemTable); static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable); static void tsdbFreeTableData(STableData *pTableData); static char * tsdbGetTsTupleKey(const void *data); -static void * tsdbCommitData(void *arg); static int tsdbCommitMeta(STsdbRepo *pRepo); static void tsdbEndCommit(STsdbRepo *pRepo); static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey); @@ -262,39 +261,19 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { int tsdbAsyncCommit(STsdbRepo *pRepo) { SMemTable *pIMem = pRepo->imem; - int code = 0; - if (pIMem != NULL) { - ASSERT(pRepo->commit); - tsdbDebug("vgId:%d waiting for the commit thread", REPO_ID(pRepo)); - code = pthread_join(pRepo->commitThread, NULL); - tsdbDebug("vgId:%d commit thread is finished", REPO_ID(pRepo)); - if (code != 0) { - tsdbError("vgId:%d failed to thread join since %s", REPO_ID(pRepo), strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - return -1; - } - pRepo->commit = 0; - } - - ASSERT(pRepo->commit == 0); if (pRepo->mem != NULL) { + sem_wait(&(pRepo->readyToCommit)); + if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START); if (tsdbLockRepo(pRepo) < 0) return -1; pRepo->imem = pRepo->mem; pRepo->mem = NULL; - pRepo->commit = 1; - code = pthread_create(&pRepo->commitThread, NULL, tsdbCommitData, (void *)pRepo); - if (code != 0) { - tsdbError("vgId:%d failed to create commit thread since %s", REPO_ID(pRepo), strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(code); - tsdbUnlockRepo(pRepo); - return -1; - } + tsdbScheduleCommit(pRepo); if (tsdbUnlockRepo(pRepo) < 0) return -1; } - if (pIMem && tsdbUnRefMemTable(pRepo, pIMem) < 0) return -1; + if (tsdbUnRefMemTable(pRepo, pIMem) < 0) return -1; return 0; } @@ -419,6 +398,68 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey return 0; } +void *tsdbCommitData(STsdbRepo *pRepo) { + SMemTable * pMem = pRepo->imem; + STsdbCfg * pCfg = &pRepo->config; + SDataCols * pDataCols = NULL; + STsdbMeta * pMeta = pRepo->tsdbMeta; + SCommitIter *iters = NULL; + SRWHelper whelper = {0}; + ASSERT(pMem != NULL); + + tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64, REPO_ID(pRepo), + pMem->keyFirst, pMem->keyLast, pMem->numOfRows); + + // Create the iterator to read from cache + if (pMem->numOfRows > 0) { + iters = tsdbCreateCommitIters(pRepo); + if (iters == NULL) { + tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno)); + goto _exit; + } + + if (tsdbInitWriteHelper(&whelper, pRepo) < 0) { + tsdbError("vgId:%d failed to init write helper since %s", REPO_ID(pRepo), tstrerror(terrno)); + goto _exit; + } + + if ((pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s", + REPO_ID(pRepo), pMeta->maxCols, pMeta->maxRowBytes, pCfg->maxRowsPerFileBlock, tstrerror(terrno)); + goto _exit; + } + + int sfid = (int)(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision)); + int efid = (int)(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision)); + + // Loop to commit to each file + for (int fid = sfid; fid <= efid; fid++) { + if (tsdbCommitToFile(pRepo, fid, iters, &whelper, pDataCols) < 0) { + tsdbError("vgId:%d failed to commit to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); + goto _exit; + } + } + } + + // Commit to update meta file + if (tsdbCommitMeta(pRepo) < 0) { + tsdbError("vgId:%d failed to commit data while committing meta data since %s", REPO_ID(pRepo), tstrerror(terrno)); + goto _exit; + } + + tsdbFitRetention(pRepo); + +_exit: + tdFreeDataCols(pDataCols); + tsdbDestroyCommitIters(iters, pMem->maxTables); + tsdbDestroyHelper(&whelper); + tsdbEndCommit(pRepo); + tsdbInfo("vgId:%d commit over", pRepo->config.tsdbId); + + return NULL; +} + // ---------------- LOCAL FUNCTIONS ---------------- static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes) { ASSERT(pRepo->mem != NULL); @@ -529,69 +570,6 @@ static void tsdbFreeTableData(STableData *pTableData) { static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple((SDataRow)data); } -static void *tsdbCommitData(void *arg) { - STsdbRepo * pRepo = (STsdbRepo *)arg; - SMemTable * pMem = pRepo->imem; - STsdbCfg * pCfg = &pRepo->config; - SDataCols * pDataCols = NULL; - STsdbMeta * pMeta = pRepo->tsdbMeta; - SCommitIter *iters = NULL; - SRWHelper whelper = {0}; - ASSERT(pRepo->commit == 1); - ASSERT(pMem != NULL); - - tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64, REPO_ID(pRepo), - pMem->keyFirst, pMem->keyLast, pMem->numOfRows); - - // Create the iterator to read from cache - if (pMem->numOfRows > 0) { - iters = tsdbCreateCommitIters(pRepo); - if (iters == NULL) { - tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _exit; - } - - if (tsdbInitWriteHelper(&whelper, pRepo) < 0) { - tsdbError("vgId:%d failed to init write helper since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _exit; - } - - if ((pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s", - REPO_ID(pRepo), pMeta->maxCols, pMeta->maxRowBytes, pCfg->maxRowsPerFileBlock, tstrerror(terrno)); - goto _exit; - } - - int sfid = (int)(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision)); - int efid = (int)(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision)); - - // Loop to commit to each file - for (int fid = sfid; fid <= efid; fid++) { - if (tsdbCommitToFile(pRepo, fid, iters, &whelper, pDataCols) < 0) { - tsdbError("vgId:%d failed to commit to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); - goto _exit; - } - } - } - - // Commit to update meta file - if (tsdbCommitMeta(pRepo) < 0) { - tsdbError("vgId:%d failed to commit data while committing meta data since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _exit; - } - - tsdbFitRetention(pRepo); - -_exit: - tdFreeDataCols(pDataCols); - tsdbDestroyCommitIters(iters, pMem->maxTables); - tsdbDestroyHelper(&whelper); - tsdbEndCommit(pRepo); - tsdbInfo("vgId:%d commit over", pRepo->config.tsdbId); - - return NULL; -} static int tsdbCommitMeta(STsdbRepo *pRepo) { SMemTable *pMem = pRepo->imem; @@ -642,8 +620,8 @@ _err: } static void tsdbEndCommit(STsdbRepo *pRepo) { - ASSERT(pRepo->commit == 1); if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER); + sem_post(&(pRepo->readyToCommit)); } static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) { diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index f90b9701fe..17a8581bd2 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -28,6 +28,8 @@ #include "vnodeCfg.h" #include "vnodeVersion.h" +#define DEFAULT_COMMIT_THREADS 1 + static SHashObj*tsVnodesHash; static void vnodeCleanUp(SVnodeObj *pVnode); static int vnodeProcessTsdbStatus(void *arg, int status); @@ -67,10 +69,17 @@ int32_t vnodeInitResources() { return TSDB_CODE_VND_OUT_OF_MEMORY; } + if (tsdbInitCommitQueue(DEFAULT_COMMIT_THREADS) < 0) { + vError("failed to init vnode commit queue"); + return terrno; + } + return TSDB_CODE_SUCCESS; } void vnodeCleanupResources() { + tsdbDestroyCommitQueue(); + if (tsVnodesHash != NULL) { vDebug("vnode list is cleanup"); taosHashCleanup(tsVnodesHash); -- GitLab