提交 a5d6851c 编写于 作者: H Hongze Cheng

implement compact interfaces

上级 cc5e1ebb
...@@ -355,6 +355,9 @@ void tsdbDecCommitRef(int vgId); ...@@ -355,6 +355,9 @@ void tsdbDecCommitRef(int vgId);
int tsdbSyncSend(void *pRepo, SOCKET socketFd); int tsdbSyncSend(void *pRepo, SOCKET socketFd);
int tsdbSyncRecv(void *pRepo, SOCKET socketFd); int tsdbSyncRecv(void *pRepo, SOCKET socketFd);
// For TSDB Compact
int tsdbCompact(STsdbRepo *pRepo);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -16,6 +16,8 @@ ...@@ -16,6 +16,8 @@
#ifndef _TD_TSDB_COMMIT_QUEUE_H_ #ifndef _TD_TSDB_COMMIT_QUEUE_H_
#define _TD_TSDB_COMMIT_QUEUE_H_ #define _TD_TSDB_COMMIT_QUEUE_H_
int tsdbScheduleCommit(STsdbRepo *pRepo); typedef enum { COMMIT_REQ, COMPACT_REQ } TSDB_REQ_T;
int tsdbScheduleCommit(STsdbRepo *pRepo, TSDB_REQ_T req);
#endif /* _TD_TSDB_COMMIT_QUEUE_H_ */ #endif /* _TD_TSDB_COMMIT_QUEUE_H_ */
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_COMPACT_H_
#define _TD_TSDB_COMPACT_H_
#ifdef __cplusplus
extern "C" {
#endif
void *tsdbCompactImpl(STsdbRepo *pRepo);
#ifdef __cplusplus
}
#endif
#endif /* _TD_TSDB_COMPACT_H_ */
\ No newline at end of file
...@@ -64,6 +64,8 @@ extern "C" { ...@@ -64,6 +64,8 @@ extern "C" {
#include "tsdbReadImpl.h" #include "tsdbReadImpl.h"
// Commit // Commit
#include "tsdbCommit.h" #include "tsdbCommit.h"
// Compact
#include "tsdbCompact.h"
// Commit Queue // Commit Queue
#include "tsdbCommitQueue.h" #include "tsdbCommitQueue.h"
// Main definitions // Main definitions
......
...@@ -26,8 +26,9 @@ typedef struct { ...@@ -26,8 +26,9 @@ typedef struct {
} SCommitQueue; } SCommitQueue;
typedef struct { typedef struct {
TSDB_REQ_T req;
STsdbRepo *pRepo; STsdbRepo *pRepo;
} SCommitReq; } SReq;
static void *tsdbLoopCommit(void *arg); static void *tsdbLoopCommit(void *arg);
...@@ -90,16 +91,17 @@ void tsdbDestroyCommitQueue() { ...@@ -90,16 +91,17 @@ void tsdbDestroyCommitQueue() {
pthread_mutex_destroy(&(pQueue->lock)); pthread_mutex_destroy(&(pQueue->lock));
} }
int tsdbScheduleCommit(STsdbRepo *pRepo) { int tsdbScheduleCommit(STsdbRepo *pRepo, TSDB_REQ_T req) {
SCommitQueue *pQueue = &tsCommitQueue; SCommitQueue *pQueue = &tsCommitQueue;
SListNode *pNode = (SListNode *)calloc(1, sizeof(SListNode) + sizeof(SCommitReq)); SListNode *pNode = (SListNode *)calloc(1, sizeof(SListNode) + sizeof(SReq));
if (pNode == NULL) { if (pNode == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1; return -1;
} }
((SCommitReq *)pNode->data)->pRepo = pRepo; ((SReq *)pNode->data)->req = req;
((SReq *)pNode->data)->pRepo = pRepo;
pthread_mutex_lock(&(pQueue->lock)); pthread_mutex_lock(&(pQueue->lock));
...@@ -116,6 +118,7 @@ static void *tsdbLoopCommit(void *arg) { ...@@ -116,6 +118,7 @@ static void *tsdbLoopCommit(void *arg) {
SCommitQueue *pQueue = &tsCommitQueue; SCommitQueue *pQueue = &tsCommitQueue;
SListNode * pNode = NULL; SListNode * pNode = NULL;
STsdbRepo * pRepo = NULL; STsdbRepo * pRepo = NULL;
TSDB_REQ_T req;
while (true) { while (true) {
pthread_mutex_lock(&(pQueue->lock)); pthread_mutex_lock(&(pQueue->lock));
...@@ -136,9 +139,17 @@ static void *tsdbLoopCommit(void *arg) { ...@@ -136,9 +139,17 @@ static void *tsdbLoopCommit(void *arg) {
pthread_mutex_unlock(&(pQueue->lock)); pthread_mutex_unlock(&(pQueue->lock));
pRepo = ((SCommitReq *)pNode->data)->pRepo; req = ((SReq *)pNode->data)->req;
pRepo = ((SReq *)pNode->data)->pRepo;
if (req == COMMIT_REQ) {
tsdbCommitData(pRepo);
} else if (req == COMPACT_REQ) {
tsdbCompactImpl(pRepo);
} else {
ASSERT(0);
}
tsdbCommitData(pRepo);
listNodeFree(pNode); listNodeFree(pNode);
} }
......
...@@ -11,4 +11,8 @@ ...@@ -11,4 +11,8 @@
* *
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
\ No newline at end of file #include "tsdb.h"
int tsdbCompact(STsdbRepo *pRepo) { return 0; }
void *tsdbCompactImpl(STsdbRepo *pRepo) { return NULL; }
\ No newline at end of file
...@@ -232,7 +232,7 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) { ...@@ -232,7 +232,7 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) {
if (tsdbLockRepo(pRepo) < 0) return -1; if (tsdbLockRepo(pRepo) < 0) return -1;
pRepo->imem = pRepo->mem; pRepo->imem = pRepo->mem;
pRepo->mem = NULL; pRepo->mem = NULL;
tsdbScheduleCommit(pRepo); tsdbScheduleCommit(pRepo, COMMIT_REQ);
if (tsdbUnlockRepo(pRepo) < 0) return -1; if (tsdbUnlockRepo(pRepo) < 0) return -1;
return 0; return 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册