diff --git a/source/dnode/vnode/impl/src/vnodeBufferPool.c b/source/dnode/vnode/impl/src/vnodeBufferPool.c index 2affcbb95df16cb3412848c7f0d5a0854a0e9576..8fcc112a9fbe1ac136aa887459006e6df6679c71 100644 --- a/source/dnode/vnode/impl/src/vnodeBufferPool.c +++ b/source/dnode/vnode/impl/src/vnodeBufferPool.c @@ -19,6 +19,8 @@ #define VNODE_BUF_POOL_SHARDS 3 struct SVBufPool { + pthread_mutex_t mutex; + pthread_cond_t hasFree; TD_DLIST(SVMemAllocator) free; TD_DLIST(SVMemAllocator) incycle; SVMemAllocator *inuse; @@ -110,6 +112,8 @@ void *vnodeMalloc(SVnode *pVnode, uint64_t size) { if (pBufPool->inuse) { tDListPop(&(pBufPool->free), pBufPool->inuse); break; + } else { + // tsem_wait(&(pBufPool->hasFree)); } } } diff --git a/src/tsdb/inc/tsdbCommitQueue.h b/src/tsdb/inc/tsdbCommitQueue.h deleted file mode 100644 index b690e3bdc25d86acf7e5b9d580470a80f3f4316f..0000000000000000000000000000000000000000 --- a/src/tsdb/inc/tsdbCommitQueue.h +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_TSDB_COMMIT_QUEUE_H_ -#define _TD_TSDB_COMMIT_QUEUE_H_ - -typedef enum { COMMIT_REQ, COMPACT_REQ,COMMIT_CONFIG_REQ } TSDB_REQ_T; - -int tsdbScheduleCommit(STsdbRepo *pRepo, TSDB_REQ_T req); - -#endif /* _TD_TSDB_COMMIT_QUEUE_H_ */ \ No newline at end of file diff --git a/src/tsdb/src/tsdbCommitQueue.c b/src/tsdb/src/tsdbCommitQueue.c deleted file mode 100644 index 59fb4f334d3006eb7e8807ce193d61905f2322d2..0000000000000000000000000000000000000000 --- a/src/tsdb/src/tsdbCommitQueue.c +++ /dev/null @@ -1,213 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "tsdbint.h" - -typedef struct { - bool stop; - pthread_mutex_t lock; - pthread_cond_t queueNotEmpty; - int nthreads; - int refCount; - SList * queue; - pthread_t * threads; -} SCommitQueue; - -typedef struct { - TSDB_REQ_T req; - STsdbRepo *pRepo; -} SReq; - -static void *tsdbLoopCommit(void *arg); - -static SCommitQueue tsCommitQueue = {0}; - -int tsdbInitCommitQueue() { - int nthreads = tsNumOfCommitThreads; - SCommitQueue *pQueue = &tsCommitQueue; - - if (nthreads < 1) nthreads = 1; - - pQueue->stop = false; - pQueue->nthreads = nthreads; - - pQueue->queue = tdListNew(0); - if (pQueue->queue == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } - - pQueue->threads = (pthread_t *)calloc(nthreads, sizeof(pthread_t)); - if (pQueue->threads == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - tdListFree(pQueue->queue); - return -1; - } - - pthread_mutex_init(&(pQueue->lock), NULL); - pthread_cond_init(&(pQueue->queueNotEmpty), NULL); - - for (int i = 0; i < nthreads; i++) { - pthread_create(pQueue->threads + i, NULL, tsdbLoopCommit, NULL); - } - - return 0; -} - -void tsdbDestroyCommitQueue() { - SCommitQueue *pQueue = &tsCommitQueue; - - pthread_mutex_lock(&(pQueue->lock)); - - if (pQueue->stop) { - pthread_mutex_unlock(&(pQueue->lock)); - return; - } - - pQueue->stop = true; - pthread_cond_broadcast(&(pQueue->queueNotEmpty)); - - pthread_mutex_unlock(&(pQueue->lock)); - - for (size_t i = 0; i < pQueue->nthreads; i++) { - pthread_join(pQueue->threads[i], NULL); - } - - free(pQueue->threads); - tdListFree(pQueue->queue); - pthread_cond_destroy(&(pQueue->queueNotEmpty)); - pthread_mutex_destroy(&(pQueue->lock)); -} - -int tsdbScheduleCommit(STsdbRepo *pRepo, TSDB_REQ_T req) { - SCommitQueue *pQueue = &tsCommitQueue; - - SListNode *pNode = (SListNode *)calloc(1, sizeof(SListNode) + sizeof(SReq)); - if (pNode == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } - - ((SReq *)pNode->data)->req = req; - ((SReq *)pNode->data)->pRepo = pRepo; - - pthread_mutex_lock(&(pQueue->lock)); - - // ASSERT(pQueue->stop); - - tdListAppendNode(pQueue->queue, pNode); - pthread_cond_signal(&(pQueue->queueNotEmpty)); - - pthread_mutex_unlock(&(pQueue->lock)); - return 0; -} - -static void tsdbApplyRepoConfig(STsdbRepo *pRepo) { - pthread_mutex_lock(&pRepo->save_mutex); - - pRepo->config_changed = false; - STsdbCfg * pSaveCfg = &pRepo->save_config; - STsdbCfg oldCfg; - int32_t oldTotalBlocks = pRepo->config.totalBlocks; - - memcpy(&oldCfg, &(pRepo->config), sizeof(STsdbCfg)); - - pRepo->config.compression = pRepo->save_config.compression; - pRepo->config.keep = pRepo->save_config.keep; - pRepo->config.keep1 = pRepo->save_config.keep1; - pRepo->config.keep2 = pRepo->save_config.keep2; - pRepo->config.cacheLastRow = pRepo->save_config.cacheLastRow; - pRepo->config.totalBlocks = pRepo->save_config.totalBlocks; - - pthread_mutex_unlock(&pRepo->save_mutex); - - tsdbInfo("vgId:%d apply new config: compression(%d), keep(%d,%d,%d), totalBlocks(%d), cacheLastRow(%d->%d),totalBlocks(%d->%d)", - REPO_ID(pRepo), - pSaveCfg->compression, pSaveCfg->keep,pSaveCfg->keep1, pSaveCfg->keep2, - pSaveCfg->totalBlocks, oldCfg.cacheLastRow, pSaveCfg->cacheLastRow, oldTotalBlocks, pSaveCfg->totalBlocks); - - int err = tsdbExpandPool(pRepo, oldTotalBlocks); - if (!TAOS_SUCCEEDED(err)) { - tsdbError("vgId:%d expand pool from %d to %d fail,reason:%s", - REPO_ID(pRepo), oldTotalBlocks, pSaveCfg->totalBlocks, tstrerror(err)); - } - - if (oldCfg.cacheLastRow != pRepo->config.cacheLastRow) { - if (tsdbLockRepo(pRepo) < 0) return; - tsdbCacheLastData(pRepo, &oldCfg); - tsdbUnlockRepo(pRepo); - } - -} - -static void *tsdbLoopCommit(void *arg) { - SCommitQueue *pQueue = &tsCommitQueue; - SListNode * pNode = NULL; - STsdbRepo * pRepo = NULL; - TSDB_REQ_T req; - - setThreadName("tsdbCommit"); - - while (true) { - pthread_mutex_lock(&(pQueue->lock)); - - while (true) { - pNode = tdListPopHead(pQueue->queue); - if (pNode == NULL) { - if (pQueue->stop && pQueue->refCount <= 0) { - pthread_mutex_unlock(&(pQueue->lock)); - goto _exit; - } else { - pthread_cond_wait(&(pQueue->queueNotEmpty), &(pQueue->lock)); - } - } else { - break; - } - } - - pthread_mutex_unlock(&(pQueue->lock)); - - req = ((SReq *)pNode->data)->req; - pRepo = ((SReq *)pNode->data)->pRepo; - - if (req == COMMIT_REQ) { - tsdbCommitData(pRepo); - } else if (req == COMPACT_REQ) { - tsdbCompactImpl(pRepo); - } else if (req == COMMIT_CONFIG_REQ) { - ASSERT(pRepo->config_changed); - tsdbApplyRepoConfig(pRepo); - tsem_post(&(pRepo->readyToCommit)); - } else { - ASSERT(0); - } - - listNodeFree(pNode); - } - -_exit: - return NULL; -} - -void tsdbIncCommitRef(int vgId) { - int refCount = atomic_add_fetch_32(&tsCommitQueue.refCount, 1); - tsdbDebug("vgId:%d, inc commit queue ref to %d", vgId, refCount); -} - -void tsdbDecCommitRef(int vgId) { - int refCount = atomic_sub_fetch_32(&tsCommitQueue.refCount, 1); - pthread_cond_broadcast(&(tsCommitQueue.queueNotEmpty)); - tsdbDebug("vgId:%d, dec commit queue ref to %d", vgId, refCount); -}