提交 1edb6d5b 编写于 作者: H Hongze Cheng

more code

上级 b99e501a
...@@ -14,6 +14,7 @@ target_sources( ...@@ -14,6 +14,7 @@ target_sources(
"src/vnd/vnodeSvr.c" "src/vnd/vnodeSvr.c"
"src/vnd/vnodeSync.c" "src/vnd/vnodeSync.c"
"src/vnd/vnodeSnapshot.c" "src/vnd/vnodeSnapshot.c"
"src/vnd/vnodeCompact.c"
# meta # meta
"src/meta/metaOpen.c" "src/meta/metaOpen.c"
......
...@@ -94,6 +94,10 @@ int32_t vnodeSyncCommit(SVnode* pVnode); ...@@ -94,6 +94,10 @@ int32_t vnodeSyncCommit(SVnode* pVnode);
int32_t vnodeAsyncCommit(SVnode* pVnode); int32_t vnodeAsyncCommit(SVnode* pVnode);
bool vnodeShouldRollback(SVnode* pVnode); bool vnodeShouldRollback(SVnode* pVnode);
// vnodeCompact.c
int32_t vnodeAsyncCompact(SVnode* pVnode);
int32_t vnodeSyncCompact(SVnode* pVnode);
// vnodeSync.c // vnodeSync.c
int32_t vnodeSyncOpen(SVnode* pVnode, char* path); int32_t vnodeSyncOpen(SVnode* pVnode, char* path);
int32_t vnodeSyncStart(SVnode* pVnode); int32_t vnodeSyncStart(SVnode* pVnode);
......
...@@ -76,6 +76,7 @@ typedef struct SRSmaSnapReader SRSmaSnapReader; ...@@ -76,6 +76,7 @@ typedef struct SRSmaSnapReader SRSmaSnapReader;
typedef struct SRSmaSnapWriter SRSmaSnapWriter; typedef struct SRSmaSnapWriter SRSmaSnapWriter;
typedef struct SSnapDataHdr SSnapDataHdr; typedef struct SSnapDataHdr SSnapDataHdr;
typedef struct SCommitInfo SCommitInfo; typedef struct SCommitInfo SCommitInfo;
typedef struct SCompactInfo SCompactInfo;
#define VNODE_META_DIR "meta" #define VNODE_META_DIR "meta"
#define VNODE_TSDB_DIR "tsdb" #define VNODE_TSDB_DIR "tsdb"
...@@ -428,6 +429,11 @@ struct SCommitInfo { ...@@ -428,6 +429,11 @@ struct SCommitInfo {
SVnode* pVnode; SVnode* pVnode;
TXN* txn; TXN* txn;
}; };
struct SCompactInfo {
SVnodeInfo info;
SVnode* pVnode;
TXN* txn;
};
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -15,8 +15,8 @@ ...@@ -15,8 +15,8 @@
#include "tsdb.h" #include "tsdb.h"
typedef struct { // typedef struct {
} SMemDIter; // } SMemDIter;
typedef struct { typedef struct {
SArray *aBlockIdx; // SArray<SBlockIdx> SArray *aBlockIdx; // SArray<SBlockIdx>
...@@ -34,6 +34,12 @@ typedef struct { ...@@ -34,6 +34,12 @@ typedef struct {
int32_t iRow; int32_t iRow;
} SSttDIter; } SSttDIter;
typedef struct {
int32_t flag;
SRowInfo rowInfo;
char handle[];
} STsdbDataIter;
typedef struct { typedef struct {
STsdb *pTsdb; STsdb *pTsdb;
STsdbFS fs; STsdbFS fs;
...@@ -47,6 +53,25 @@ typedef struct { ...@@ -47,6 +53,25 @@ typedef struct {
#define TSDB_FLG_DEEP_COMPACT 0x1 #define TSDB_FLG_DEEP_COMPACT 0x1
// ITER ========================= // ITER =========================
static int32_t tsdbDataIterOpen(STsdbDataIter *pIter) {
int32_t code = 0;
int32_t lino = 0;
// TODO
_exit:
return code;
}
static void tsdbDataIterClose(STsdbDataIter *pIter) {
// TODO
}
static int32_t tsdbDataIterNext(STsdbDataIter *pIter) {
int32_t code = 0;
int32_t lino = 0;
// TODO
_exit:
return code;
}
// COMPACT ========================= // COMPACT =========================
static int32_t tsdbBeginCompact(STsdb *pTsdb, STsdbCompactor *pCompactor) { static int32_t tsdbBeginCompact(STsdb *pTsdb, STsdbCompactor *pCompactor) {
...@@ -106,8 +131,6 @@ static int32_t tsdbDeepCompact(STsdbCompactor *pCompactor) { ...@@ -106,8 +131,6 @@ static int32_t tsdbDeepCompact(STsdbCompactor *pCompactor) {
code = tsdbDataFReaderOpen(&pCompactor->pReader, pTsdb, pCompactor->pDFileSet); code = tsdbDataFReaderOpen(&pCompactor->pReader, pTsdb, pCompactor->pDFileSet);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
//
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
......
...@@ -184,7 +184,7 @@ _err: ...@@ -184,7 +184,7 @@ _err:
return -1; return -1;
} }
static void vnodePrepareCommit(SVnode *pVnode) { void vnodePrepareCommit(SVnode *pVnode) {
tsem_wait(&pVnode->canCommit); tsem_wait(&pVnode->canCommit);
tsdbPrepareCommit(pVnode->pTsdb); tsdbPrepareCommit(pVnode->pTsdb);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnd.h"
extern void vnodePrepareCommit(SVnode *pVnode);
#define vnodePrepareCompact vnodePrepareCommit
static int32_t vnodeCompactImpl(SCompactInfo *pInfo) {
int32_t code = 0;
// TODO
return code;
}
static int32_t vnodeCompactTask(void *param) {
int32_t code = 0;
SCompactInfo *pInfo = (SCompactInfo *)param;
// compact
vnodeCompactImpl(pInfo);
// end compact
tsem_post(&pInfo->pVnode->canCommit);
_exit:
taosMemoryFree(pInfo);
return code;
}
int32_t vnodeAsyncCompact(SVnode *pVnode) {
int32_t code = 0;
// prepare
vnodePrepareCompact(pVnode);
// schedule compact task
SCompactInfo *pInfo = taosMemoryCalloc(1, sizeof(*pInfo));
if (NULL == pInfo) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
pInfo->info.config = pVnode->config;
pInfo->info.state.committed = pVnode->state.applied;
pInfo->info.state.commitTerm = pVnode->state.applyTerm;
pInfo->info.state.commitID = pVnode->state.commitID;
pInfo->pVnode = pVnode;
pInfo->txn = metaGetTxn(pVnode->pMeta);
vnodeScheduleTask(vnodeCompactTask, pInfo);
_exit:
if (code) {
vError("vgId:%d %s failed since %s", TD_VID(pInfo->pVnode), __func__, tstrerror(code));
}
return code;
}
int32_t vnodeSyncCompact(SVnode *pVnode) {
vnodeAsyncCompact(pVnode);
tsem_wait(&pVnode->canCommit);
tsem_post(&pVnode->canCommit);
return 0;
}
\ No newline at end of file
...@@ -299,6 +299,10 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp ...@@ -299,6 +299,10 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
vnodeSyncCommit(pVnode); vnodeSyncCommit(pVnode);
vnodeBegin(pVnode); vnodeBegin(pVnode);
goto _exit; goto _exit;
case TDMT_VND_COMPACT:
vnodeAsyncCompact(pVnode);
vnodeBegin(pVnode);
goto _exit;
default: default:
vError("vgId:%d, unprocessed msg, %d", TD_VID(pVnode), pMsg->msgType); vError("vgId:%d, unprocessed msg, %d", TD_VID(pVnode), pMsg->msgType);
return -1; return -1;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册