From a5d6851cbeeab12bda42348b58cd6370ecfc88f1 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 24 Feb 2021 13:15:23 +0800 Subject: [PATCH] implement compact interfaces --- src/inc/tsdb.h | 3 +++ src/tsdb/inc/tsdbCommitQueue.h | 4 +++- src/tsdb/inc/tsdbCompact.h | 28 ++++++++++++++++++++++++++++ src/tsdb/inc/tsdbint.h | 2 ++ src/tsdb/src/tsdbCommitQueue.c | 23 +++++++++++++++++------ src/tsdb/src/tsdbCompact.c | 6 +++++- src/tsdb/src/tsdbMemTable.c | 2 +- 7 files changed, 59 insertions(+), 9 deletions(-) create mode 100644 src/tsdb/inc/tsdbCompact.h diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index 78cd2927c7..d6c9fed971 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -355,6 +355,9 @@ void tsdbDecCommitRef(int vgId); int tsdbSyncSend(void *pRepo, SOCKET socketFd); int tsdbSyncRecv(void *pRepo, SOCKET socketFd); +// For TSDB Compact +int tsdbCompact(STsdbRepo *pRepo); + #ifdef __cplusplus } #endif diff --git a/src/tsdb/inc/tsdbCommitQueue.h b/src/tsdb/inc/tsdbCommitQueue.h index c2353391f9..6342c036b7 100644 --- a/src/tsdb/inc/tsdbCommitQueue.h +++ b/src/tsdb/inc/tsdbCommitQueue.h @@ -16,6 +16,8 @@ #ifndef _TD_TSDB_COMMIT_QUEUE_H_ #define _TD_TSDB_COMMIT_QUEUE_H_ -int tsdbScheduleCommit(STsdbRepo *pRepo); +typedef enum { COMMIT_REQ, COMPACT_REQ } TSDB_REQ_T; + +int tsdbScheduleCommit(STsdbRepo *pRepo, TSDB_REQ_T req); #endif /* _TD_TSDB_COMMIT_QUEUE_H_ */ \ No newline at end of file diff --git a/src/tsdb/inc/tsdbCompact.h b/src/tsdb/inc/tsdbCompact.h new file mode 100644 index 0000000000..5a382de5e0 --- /dev/null +++ b/src/tsdb/inc/tsdbCompact.h @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifndef _TD_TSDB_COMPACT_H_ +#define _TD_TSDB_COMPACT_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +void *tsdbCompactImpl(STsdbRepo *pRepo); + +#ifdef __cplusplus +} +#endif + +#endif /* _TD_TSDB_COMPACT_H_ */ \ No newline at end of file diff --git a/src/tsdb/inc/tsdbint.h b/src/tsdb/inc/tsdbint.h index 074ff20f22..0a448d8194 100644 --- a/src/tsdb/inc/tsdbint.h +++ b/src/tsdb/inc/tsdbint.h @@ -64,6 +64,8 @@ extern "C" { #include "tsdbReadImpl.h" // Commit #include "tsdbCommit.h" +// Compact +#include "tsdbCompact.h" // Commit Queue #include "tsdbCommitQueue.h" // Main definitions diff --git a/src/tsdb/src/tsdbCommitQueue.c b/src/tsdb/src/tsdbCommitQueue.c index 9e8e4acd7e..d515faf861 100644 --- a/src/tsdb/src/tsdbCommitQueue.c +++ b/src/tsdb/src/tsdbCommitQueue.c @@ -26,8 +26,9 @@ typedef struct { } SCommitQueue; typedef struct { + TSDB_REQ_T req; STsdbRepo *pRepo; -} SCommitReq; +} SReq; static void *tsdbLoopCommit(void *arg); @@ -90,16 +91,17 @@ void tsdbDestroyCommitQueue() { pthread_mutex_destroy(&(pQueue->lock)); } -int tsdbScheduleCommit(STsdbRepo *pRepo) { +int tsdbScheduleCommit(STsdbRepo *pRepo, TSDB_REQ_T req) { SCommitQueue *pQueue = &tsCommitQueue; - SListNode *pNode = (SListNode *)calloc(1, sizeof(SListNode) + sizeof(SCommitReq)); + SListNode *pNode = (SListNode *)calloc(1, sizeof(SListNode) + sizeof(SReq)); if (pNode == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; } - ((SCommitReq *)pNode->data)->pRepo = pRepo; + ((SReq *)pNode->data)->req = req; + ((SReq *)pNode->data)->pRepo = pRepo; pthread_mutex_lock(&(pQueue->lock)); @@ -116,6 +118,7 @@ static void *tsdbLoopCommit(void *arg) { SCommitQueue *pQueue = &tsCommitQueue; SListNode * pNode = NULL; STsdbRepo * pRepo = NULL; + TSDB_REQ_T req; while (true) { pthread_mutex_lock(&(pQueue->lock)); @@ -136,9 +139,17 @@ static void *tsdbLoopCommit(void *arg) { pthread_mutex_unlock(&(pQueue->lock)); - pRepo = ((SCommitReq *)pNode->data)->pRepo; + req = ((SReq *)pNode->data)->req; + pRepo = ((SReq *)pNode->data)->pRepo; + + if (req == COMMIT_REQ) { + tsdbCommitData(pRepo); + } else if (req == COMPACT_REQ) { + tsdbCompactImpl(pRepo); + } else { + ASSERT(0); + } - tsdbCommitData(pRepo); listNodeFree(pNode); } diff --git a/src/tsdb/src/tsdbCompact.c b/src/tsdb/src/tsdbCompact.c index 6dea4a4e57..d8cd558424 100644 --- a/src/tsdb/src/tsdbCompact.c +++ b/src/tsdb/src/tsdbCompact.c @@ -11,4 +11,8 @@ * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . - */ \ No newline at end of file + */ +#include "tsdb.h" + +int tsdbCompact(STsdbRepo *pRepo) { return 0; } +void *tsdbCompactImpl(STsdbRepo *pRepo) { return NULL; } \ No newline at end of file diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index 73a1270799..2e4a73e56b 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -232,7 +232,7 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) { if (tsdbLockRepo(pRepo) < 0) return -1; pRepo->imem = pRepo->mem; pRepo->mem = NULL; - tsdbScheduleCommit(pRepo); + tsdbScheduleCommit(pRepo, COMMIT_REQ); if (tsdbUnlockRepo(pRepo) < 0) return -1; return 0; -- GitLab