未验证 提交 aae32164 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #4193 from taosdata/feature/TD-2004

Feature/td 2004
......@@ -321,6 +321,10 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle);
*/
void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage);
int tsdbInitCommitQueue(int nthreads);
void tsdbDestroyCommitQueue();
int tsdbSyncCommit(TSDB_REPO_T *repo);
#ifdef __cplusplus
}
#endif
......
......@@ -220,8 +220,7 @@ typedef struct {
SMemTable* mem;
SMemTable* imem;
STsdbFileH* tsdbFileH;
int commit;
pthread_t commitThread;
sem_t readyToCommit;
pthread_mutex_t mutex;
bool repoLocked;
} STsdbRepo;
......@@ -440,6 +439,7 @@ void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes);
int tsdbAsyncCommit(STsdbRepo* pRepo);
int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols,
TKEY* filterKeys, int nFilterKeys, bool keepDup, SMergeInfo* pMergeInfo);
void* tsdbCommitData(STsdbRepo* pRepo);
static FORCE_INLINE SDataRow tsdbNextIterRow(SSkipListIterator* pIter) {
if (pIter == NULL) return NULL;
......@@ -588,6 +588,9 @@ int tsdbScanSCompBlock(STsdbScanHandle* pScanHandle, int idx);
int tsdbCloseScanFile(STsdbScanHandle* pScanHandle);
void tsdbFreeScanHandle(STsdbScanHandle* pScanHandle);
// ------------------ tsdbCommitQueue.c
int tsdbScheduleCommit(STsdbRepo *pRepo);
#ifdef __cplusplus
}
#endif
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tlist.h"
#include "tsdbMain.h"
typedef struct {
bool stop;
pthread_mutex_t lock;
pthread_cond_t queueNotEmpty;
int nthreads;
SList * queue;
pthread_t * threads;
} SCommitQueue;
typedef struct {
STsdbRepo *pRepo;
} SCommitReq;
static void *tsdbLoopCommit(void *arg);
SCommitQueue tsCommitQueue = {0};
int tsdbInitCommitQueue(int nthreads) {
SCommitQueue *pQueue = &tsCommitQueue;
if (nthreads < 1) nthreads = 1;
pQueue->stop = false;
pQueue->nthreads = nthreads;
pQueue->queue = tdListNew(0);
if (pQueue->queue == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
pQueue->threads = (pthread_t *)calloc(nthreads, sizeof(pthread_t));
if (pQueue->threads == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tdListFree(pQueue->queue);
return -1;
}
pthread_mutex_init(&(pQueue->lock), NULL);
pthread_cond_init(&(pQueue->queueNotEmpty), NULL);
for (int i = 0; i < nthreads; i++) {
pthread_create(pQueue->threads + i, NULL, tsdbLoopCommit, NULL);
}
return 0;
}
void tsdbDestroyCommitQueue() {
SCommitQueue *pQueue = &tsCommitQueue;
pthread_mutex_lock(&(pQueue->lock));
if (pQueue->stop) {
pthread_mutex_unlock(&(pQueue->lock));
return;
}
pQueue->stop = true;
pthread_cond_broadcast(&(pQueue->queueNotEmpty));
pthread_mutex_unlock(&(pQueue->lock));
for (size_t i = 0; i < pQueue->nthreads; i++) {
pthread_join(pQueue->threads[i], NULL);
}
free(pQueue->threads);
tdListFree(pQueue->queue);
pthread_cond_destroy(&(pQueue->queueNotEmpty));
pthread_mutex_destroy(&(pQueue->lock));
}
int tsdbScheduleCommit(STsdbRepo *pRepo) {
SCommitQueue *pQueue = &tsCommitQueue;
SListNode *pNode = (SListNode *)calloc(1, sizeof(SListNode) + sizeof(SCommitReq));
if (pNode == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
((SCommitReq *)pNode->data)->pRepo = pRepo;
pthread_mutex_lock(&(pQueue->lock));
ASSERT(!pQueue->stop);
tdListAppendNode(pQueue->queue, pNode);
pthread_cond_signal(&(pQueue->queueNotEmpty));
pthread_mutex_unlock(&(pQueue->lock));
return 0;
}
static void *tsdbLoopCommit(void *arg) {
SCommitQueue *pQueue = &tsCommitQueue;
SListNode * pNode = NULL;
STsdbRepo * pRepo = NULL;
while (true) {
pthread_mutex_lock(&(pQueue->lock));
while (true) {
pNode = tdListPopHead(pQueue->queue);
if (pNode == NULL) {
if (pQueue->stop) {
pthread_mutex_unlock(&(pQueue->lock));
goto _exit;
} else {
pthread_cond_wait(&(pQueue->queueNotEmpty), &(pQueue->lock));
}
} else {
break;
}
}
pthread_mutex_unlock(&(pQueue->lock));
pRepo = ((SCommitReq *)pNode->data)->pRepo;
tsdbCommitData(pRepo);
listNodeFree(pNode);
}
_exit:
return NULL;
}
......@@ -163,7 +163,7 @@ void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) {
if (toCommit) {
tsdbAsyncCommit(pRepo);
if (pRepo->commit) pthread_join(pRepo->commitThread, NULL);
sem_wait(&(pRepo->readyToCommit));
}
tsdbUnRefMemTable(pRepo, pRepo->mem);
tsdbUnRefMemTable(pRepo, pRepo->imem);
......@@ -675,6 +675,12 @@ static STsdbRepo *tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg) {
goto _err;
}
code = sem_init(&(pRepo->readyToCommit), 0, 1);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
goto _err;
}
pRepo->repoLocked = false;
pRepo->rootDir = strdup(rootDir);
......@@ -719,6 +725,7 @@ static void tsdbFreeRepo(STsdbRepo *pRepo) {
// tsdbFreeMemTable(pRepo->mem);
// tsdbFreeMemTable(pRepo->imem);
tfree(pRepo->rootDir);
sem_destroy(&(pRepo->readyToCommit));
pthread_mutex_destroy(&pRepo->mutex);
free(pRepo);
}
......
......@@ -24,7 +24,6 @@ static void tsdbFreeMemTable(SMemTable *pMemTable);
static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable);
static void tsdbFreeTableData(STableData *pTableData);
static char * tsdbGetTsTupleKey(const void *data);
static void * tsdbCommitData(void *arg);
static int tsdbCommitMeta(STsdbRepo *pRepo);
static void tsdbEndCommit(STsdbRepo *pRepo);
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey);
......@@ -262,40 +261,28 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
int tsdbAsyncCommit(STsdbRepo *pRepo) {
SMemTable *pIMem = pRepo->imem;
int code = 0;
if (pIMem != NULL) {
ASSERT(pRepo->commit);
tsdbDebug("vgId:%d waiting for the commit thread", REPO_ID(pRepo));
code = pthread_join(pRepo->commitThread, NULL);
tsdbDebug("vgId:%d commit thread is finished", REPO_ID(pRepo));
if (code != 0) {
tsdbError("vgId:%d failed to thread join since %s", REPO_ID(pRepo), strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
pRepo->commit = 0;
}
ASSERT(pRepo->commit == 0);
if (pRepo->mem != NULL) {
sem_wait(&(pRepo->readyToCommit));
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START);
if (tsdbLockRepo(pRepo) < 0) return -1;
pRepo->imem = pRepo->mem;
pRepo->mem = NULL;
pRepo->commit = 1;
code = pthread_create(&pRepo->commitThread, NULL, tsdbCommitData, (void *)pRepo);
if (code != 0) {
tsdbError("vgId:%d failed to create commit thread since %s", REPO_ID(pRepo), strerror(errno));
terrno = TAOS_SYSTEM_ERROR(code);
tsdbUnlockRepo(pRepo);
return -1;
}
tsdbScheduleCommit(pRepo);
if (tsdbUnlockRepo(pRepo) < 0) return -1;
}
if (pIMem && tsdbUnRefMemTable(pRepo, pIMem) < 0) return -1;
if (tsdbUnRefMemTable(pRepo, pIMem) < 0) return -1;
return 0;
}
int tsdbSyncCommit(TSDB_REPO_T *repo) {
STsdbRepo *pRepo = (STsdbRepo *)repo;
tsdbAsyncCommit(pRepo);
sem_wait(&(pRepo->readyToCommit));
sem_post(&(pRepo->readyToCommit));
return 0;
}
......@@ -419,6 +406,68 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
return 0;
}
void *tsdbCommitData(STsdbRepo *pRepo) {
SMemTable * pMem = pRepo->imem;
STsdbCfg * pCfg = &pRepo->config;
SDataCols * pDataCols = NULL;
STsdbMeta * pMeta = pRepo->tsdbMeta;
SCommitIter *iters = NULL;
SRWHelper whelper = {0};
ASSERT(pMem != NULL);
tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64, REPO_ID(pRepo),
pMem->keyFirst, pMem->keyLast, pMem->numOfRows);
// Create the iterator to read from cache
if (pMem->numOfRows > 0) {
iters = tsdbCreateCommitIters(pRepo);
if (iters == NULL) {
tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _exit;
}
if (tsdbInitWriteHelper(&whelper, pRepo) < 0) {
tsdbError("vgId:%d failed to init write helper since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _exit;
}
if ((pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s",
REPO_ID(pRepo), pMeta->maxCols, pMeta->maxRowBytes, pCfg->maxRowsPerFileBlock, tstrerror(terrno));
goto _exit;
}
int sfid = (int)(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision));
int efid = (int)(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision));
// Loop to commit to each file
for (int fid = sfid; fid <= efid; fid++) {
if (tsdbCommitToFile(pRepo, fid, iters, &whelper, pDataCols) < 0) {
tsdbError("vgId:%d failed to commit to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
goto _exit;
}
}
}
// Commit to update meta file
if (tsdbCommitMeta(pRepo) < 0) {
tsdbError("vgId:%d failed to commit data while committing meta data since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _exit;
}
tsdbFitRetention(pRepo);
_exit:
tdFreeDataCols(pDataCols);
tsdbDestroyCommitIters(iters, pMem->maxTables);
tsdbDestroyHelper(&whelper);
tsdbEndCommit(pRepo);
tsdbInfo("vgId:%d commit over", pRepo->config.tsdbId);
return NULL;
}
// ---------------- LOCAL FUNCTIONS ----------------
static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes) {
ASSERT(pRepo->mem != NULL);
......@@ -529,69 +578,6 @@ static void tsdbFreeTableData(STableData *pTableData) {
static char *tsdbGetTsTupleKey(const void *data) { return dataRowTuple((SDataRow)data); }
static void *tsdbCommitData(void *arg) {
STsdbRepo * pRepo = (STsdbRepo *)arg;
SMemTable * pMem = pRepo->imem;
STsdbCfg * pCfg = &pRepo->config;
SDataCols * pDataCols = NULL;
STsdbMeta * pMeta = pRepo->tsdbMeta;
SCommitIter *iters = NULL;
SRWHelper whelper = {0};
ASSERT(pRepo->commit == 1);
ASSERT(pMem != NULL);
tsdbInfo("vgId:%d start to commit! keyFirst %" PRId64 " keyLast %" PRId64 " numOfRows %" PRId64, REPO_ID(pRepo),
pMem->keyFirst, pMem->keyLast, pMem->numOfRows);
// Create the iterator to read from cache
if (pMem->numOfRows > 0) {
iters = tsdbCreateCommitIters(pRepo);
if (iters == NULL) {
tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _exit;
}
if (tsdbInitWriteHelper(&whelper, pRepo) < 0) {
tsdbError("vgId:%d failed to init write helper since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _exit;
}
if ((pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s",
REPO_ID(pRepo), pMeta->maxCols, pMeta->maxRowBytes, pCfg->maxRowsPerFileBlock, tstrerror(terrno));
goto _exit;
}
int sfid = (int)(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision));
int efid = (int)(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision));
// Loop to commit to each file
for (int fid = sfid; fid <= efid; fid++) {
if (tsdbCommitToFile(pRepo, fid, iters, &whelper, pDataCols) < 0) {
tsdbError("vgId:%d failed to commit to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
goto _exit;
}
}
}
// Commit to update meta file
if (tsdbCommitMeta(pRepo) < 0) {
tsdbError("vgId:%d failed to commit data while committing meta data since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _exit;
}
tsdbFitRetention(pRepo);
_exit:
tdFreeDataCols(pDataCols);
tsdbDestroyCommitIters(iters, pMem->maxTables);
tsdbDestroyHelper(&whelper);
tsdbEndCommit(pRepo);
tsdbInfo("vgId:%d commit over", pRepo->config.tsdbId);
return NULL;
}
static int tsdbCommitMeta(STsdbRepo *pRepo) {
SMemTable *pMem = pRepo->imem;
......@@ -642,8 +628,8 @@ _err:
}
static void tsdbEndCommit(STsdbRepo *pRepo) {
ASSERT(pRepo->commit == 1);
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_OVER);
sem_post(&(pRepo->readyToCommit));
}
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) {
......
......@@ -28,6 +28,8 @@
#include "vnodeCfg.h"
#include "vnodeVersion.h"
#define DEFAULT_COMMIT_THREADS 1
static SHashObj*tsVnodesHash;
static void vnodeCleanUp(SVnodeObj *pVnode);
static int vnodeProcessTsdbStatus(void *arg, int status);
......@@ -67,10 +69,17 @@ int32_t vnodeInitResources() {
return TSDB_CODE_VND_OUT_OF_MEMORY;
}
if (tsdbInitCommitQueue(DEFAULT_COMMIT_THREADS) < 0) {
vError("failed to init vnode commit queue");
return terrno;
}
return TSDB_CODE_SUCCESS;
}
void vnodeCleanupResources() {
tsdbDestroyCommitQueue();
if (tsVnodesHash != NULL) {
vDebug("vnode list is cleanup");
taosHashCleanup(tsVnodesHash);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册