提交 a7941cf2 编写于 作者: H Hongze Cheng

refact meta 2

上级 88793e2f
......@@ -86,9 +86,7 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
pCfg->vgId = pCreate->vgId;
strcpy(pCfg->dbname, pCreate->db);
pCfg->wsize = pCreate->cacheBlockSize * 1024 * 1024;
pCfg->ssize = 1024;
pCfg->lsize = 1024 * 1024;
pCfg->szBuf = pCreate->cacheBlockSize * 1024 * 1024;
pCfg->streamMode = pCreate->streamMode;
pCfg->isWeak = true;
pCfg->tsdbCfg.keep2 = pCreate->daysToKeep0;
......
......@@ -5,9 +5,9 @@ target_sources(
PRIVATE
# vnode
"src/vnd/vnodeOpen.c"
"src/vnd/vnodeArenaMAImpl.c"
"src/vnd/vnodeBufferPool.c"
# "src/vnd/vnodeBufferPool2.c"
# "src/vnd/vnodeArenaMAImpl.c"
# "src/vnd/vnodeBufferPool.c"
"src/vnd/vnodeBufPool.c"
"src/vnd/vnodeCfg.c"
"src/vnd/vnodeCommit.c"
"src/vnd/vnodeInt.c"
......@@ -27,6 +27,7 @@ target_sources(
# tsdb
"src/tsdb/tsdbTDBImpl.c"
"src/tsdb/tsdbCommit.c"
"src/tsdb/tsdbCommit2.c"
"src/tsdb/tsdbCompact.c"
"src/tsdb/tsdbFile.c"
"src/tsdb/tsdbFS.c"
......
......@@ -123,6 +123,7 @@ struct STsdbCfg {
int8_t precision;
int8_t update;
int8_t compression;
int8_t slLevel;
int32_t days;
int32_t minRows;
int32_t maxRows;
......@@ -139,10 +140,8 @@ struct SVnodeCfg {
uint64_t dbId;
int32_t szPage;
int32_t szCache;
uint64_t wsize;
uint64_t ssize;
uint64_t lsize;
bool isHeapAllocator;
uint64_t szBuf;
bool isHeap;
uint32_t ttl;
uint32_t keep;
int8_t streamMode;
......
......@@ -250,7 +250,7 @@ int tqInit();
void tqCleanUp();
// open in each vnode
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pMeta, SMemAllocatorFactory* allocFac);
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pMeta);
void tqClose(STQ*);
// required by vnode
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version);
......
......@@ -30,21 +30,38 @@ extern "C" {
#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSDB ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on
// tsdbMemTable ================
typedef struct STbData STbData;
typedef struct STsdbMemTable STsdbMemTable;
typedef struct SMergeInfo SMergeInfo;
typedef struct STable STable;
int tsdbMemTableCreate(STsdb *pTsdb, STsdbMemTable **ppMemTable);
void tsdbMemTableDestroy(STsdb *pTsdb, STsdbMemTable *pMemTable);
int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitReq *pMsg, SSubmitRsp *pRsp);
int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols,
TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo);
// tsdbCommit ================
int tsdbBegin(STsdb *pTsdb);
#if 1
typedef struct SSmaStat SSmaStat;
typedef struct SSmaEnv SSmaEnv;
typedef struct SSmaEnvs SSmaEnvs;
typedef struct STable {
struct STable {
uint64_t tid;
uint64_t uid;
STSchema *pSchema;
} STable;
};
#define TABLE_TID(t) (t)->tid
#define TABLE_UID(t) (t)->uid
STsdb *tsdbOpen(const char *path, SVnode *pVnode, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF);
void tsdbClose(STsdb *);
int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb);
int tsdbClose(STsdb *pTsdb);
int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp);
int tsdbPrepareCommit(STsdb *pTsdb);
int tsdbCommit(STsdb *pTsdb);
......@@ -110,25 +127,28 @@ typedef struct {
TSKEY minKey;
} SRtn;
typedef struct STbData {
struct STbData {
tb_uid_t uid;
TSKEY keyMin;
TSKEY keyMax;
int64_t minVer;
int64_t maxVer;
int64_t nrows;
SSkipList *pData;
} STbData;
};
typedef struct STsdbMemTable {
struct STsdbMemTable {
SVBufPool *pPool;
T_REF_DECLARE()
SRWLatch latch;
TSKEY keyMin;
TSKEY keyMax;
uint64_t nRow;
SMemAllocator *pMA;
// Container
SRWLatch latch;
TSKEY keyMin;
TSKEY keyMax;
int64_t minVer;
int64_t maxVer;
int64_t nRow;
SSkipList *pSlIdx; // SSkiplist<STbData>
SHashObj *pHashIdx;
} STsdbMemTable;
};
typedef struct {
uint32_t version; // Commit version from 0 to increase
......@@ -154,18 +174,17 @@ typedef struct {
} STsdbFS;
struct STsdb {
int32_t vgId;
SVnode *pVnode;
bool repoLocked;
TdThreadMutex mutex;
char *path;
STsdbCfg config;
STsdbMemTable *mem;
STsdbMemTable *imem;
SRtn rtn;
SMemAllocatorFactory *pmaf;
STsdbFS *fs;
SSmaEnvs smaEnvs;
char *path;
SVnode *pVnode;
int32_t vgId;
bool repoLocked;
TdThreadMutex mutex;
STsdbCfg config;
STsdbMemTable *mem;
STsdbMemTable *imem;
SRtn rtn;
STsdbFS *fs;
SSmaEnvs smaEnvs;
};
#define REPO_ID(r) ((r)->vgId)
......@@ -187,7 +206,7 @@ static FORCE_INLINE STSchema *tsdbGetTableSchemaImpl(STable *pTable, bool lock,
}
// tsdbMemTable.h
typedef struct {
struct SMergeInfo {
int rowsInserted;
int rowsUpdated;
int rowsDeleteSucceed;
......@@ -195,7 +214,7 @@ typedef struct {
int nOperations;
TSKEY keyFirst;
TSKEY keyLast;
} SMergeInfo;
};
static void *taosTMalloc(size_t size);
static void *taosTCalloc(size_t nmemb, size_t size);
......@@ -204,12 +223,6 @@ static void *taosTZfree(void *ptr);
static size_t taosTSizeof(void *ptr);
static void taosTMemset(void *ptr, int c);
STsdbMemTable *tsdbNewMemTable(STsdb *pTsdb);
void tsdbFreeMemTable(STsdb *pTsdb, STsdbMemTable *pMemTable);
int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitReq *pMsg, SSubmitRsp *pRsp);
int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols,
TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo);
static FORCE_INLINE STSRow *tsdbNextIterRow(SSkipListIterator *pIter) {
if (pIter == NULL) return NULL;
......@@ -988,6 +1001,8 @@ static FORCE_INLINE int32_t tsdbEncodeTSmaKey(int64_t groupId, TSKEY tsKey, void
return len;
}
#endif
#ifdef __cplusplus
}
#endif
......
......@@ -40,22 +40,31 @@ int vnodeDecodeConfig(const SJson* pJson, void* pObj);
// vnodeModule ====================
int vnodeScheduleTask(int (*execute)(void*), void* arg);
// vnodeQuery ====================
int vnodeQueryOpen(SVnode* pVnode);
void vnodeQueryClose(SVnode* pVnode);
int vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg);
// vnodeCommit ====================
int vnodeBegin(SVnode* pVnode);
int vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg);
int vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo);
int vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo);
int vnodeSyncCommit(SVnode* pVnode);
int vnodeAsyncCommit(SVnode* pVnode);
#define vnodeShouldCommit vnodeBufPoolIsFull
// vnodeBufPool ====================
#if 1
typedef struct SVBufPoolNode SVBufPoolNode;
struct SVBufPoolNode {
SVBufPoolNode* prev;
SVBufPoolNode** pnext;
int64_t size;
uint8_t data[];
};
struct SVBufPool {
SVBufPool* next;
int64_t nRef;
int64_t size;
uint8_t* ptr;
SVBufPoolNode* pTail;
SVBufPoolNode node;
};
int vnodeOpenBufPool(SVnode* pVnode, int64_t size);
int vnodeCloseBufPool(SVnode* pVnode);
void vnodeBufPoolReset(SVBufPool* pPool);
void* vnodeBufPoolMalloc(SVBufPool* pPool, int size);
void vnodeBufPoolFree(SVBufPool* pPool, void* p);
#else
// SVBufPool
int vnodeOpenBufPool(SVnode* pVnode);
void vnodeCloseBufPool(SVnode* pVnode);
......@@ -90,9 +99,21 @@ void vmaReset(SVMemAllocator* pVMA);
void* vmaMalloc(SVMemAllocator* pVMA, uint64_t size);
void vmaFree(SVMemAllocator* pVMA, void* ptr);
bool vmaIsFull(SVMemAllocator* pVMA);
#endif
// vnodeQuery ====================
int vnodeQueryOpen(SVnode* pVnode);
void vnodeQueryClose(SVnode* pVnode);
int vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg);
// vnodeCommit ====================
int vnodeBegin(SVnode* pVnode);
int vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg);
int vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo);
int vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo);
int vnodeSyncCommit(SVnode* pVnode);
int vnodeAsyncCommit(SVnode* pVnode);
#ifdef __cplusplus
}
#endif
......
......@@ -91,7 +91,10 @@ struct SVnode {
SVState state;
STfs* pTfs;
SMsgCb msgCb;
SVBufPool* pBufPool;
SVBufPool* pPool;
SVBufPool* inUse;
SVBufPool* onCommit;
SVBufPool* onRecycle;
SMeta* pMeta;
STsdb* pTsdb;
SWal* pWal;
......
......@@ -16,6 +16,9 @@
#include "vnodeInt.h"
int metaBegin(SMeta *pMeta) {
// TODO
if (tdbBegin(pMeta->pEnv, NULL) < 0) {
return -1;
}
return 0;
}
......@@ -19,7 +19,7 @@ int32_t tqInit() { return tqPushMgrInit(); }
void tqCleanUp() { tqPushMgrCleanUp(); }
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, SMemAllocatorFactory* allocFac) {
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta) {
STQ* pTq = taosMemoryMalloc(sizeof(STQ));
if (pTq == NULL) {
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
......@@ -29,13 +29,6 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, SMe
pTq->pVnode = pVnode;
pTq->pWal = pWal;
pTq->pVnodeMeta = pVnodeMeta;
#if 0
pTq->tqMemRef.pAllocatorFactory = allocFac;
pTq->tqMemRef.pAllocator = allocFac->create(allocFac);
if (pTq->tqMemRef.pAllocator == NULL) {
// TODO: error code of buffer pool
}
#endif
pTq->tqMeta = tqStoreOpen(pTq, path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer,
(FTqDelete)taosMemoryFree, 0);
if (pTq->tqMeta == NULL) {
......
......@@ -239,7 +239,7 @@ static void tsdbStartCommit(STsdb *pRepo) {
static void tsdbEndCommit(STsdb *pTsdb, int eno) {
tsdbEndFSTxn(pTsdb);
tsdbFreeMemTable(pTsdb, pTsdb->imem);
tsdbMemTableDestroy(pTsdb, pTsdb->imem);
pTsdb->imem = NULL;
tsdbInfo("vgId:%d commit over, %s", REPO_ID(pTsdb), (eno == TSDB_CODE_SUCCESS) ? "succeed" : "failed");
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeInt.h"
int tsdbBegin(STsdb *pTsdb) {
STsdbMemTable *pMem;
if (tsdbMemTableCreate(pTsdb, &pTsdb->mem) < 0) {
return -1;
}
return 0;
}
......@@ -15,94 +15,56 @@
#include "vnodeInt.h"
static STsdb *tsdbNew(const char *path, SVnode *pVnode, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF);
static void tsdbFree(STsdb *pTsdb);
static int tsdbOpenImpl(STsdb *pTsdb);
static void tsdbCloseImpl(STsdb *pTsdb);
STsdb *tsdbOpen(const char *path, SVnode *pVnode, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF) {
int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb) {
STsdb *pTsdb = NULL;
int slen = 0;
// Set default TSDB Options
// if (pTsdbCfg == NULL) {
pTsdbCfg = &defautlTsdbOptions;
// }
*ppTsdb = NULL;
slen = strlen(tfsGetPrimaryPath(pVnode->pTfs)) + strlen(pVnode->path) + strlen(VNODE_TSDB_DIR) + 3;
// Validate the options
if (tsdbValidateOptions(pTsdbCfg) < 0) {
// TODO: handle error
return NULL;
}
// Create the handle
pTsdb = tsdbNew(path, pVnode, pTsdbCfg, pMAF);
// create handle
pTsdb = (STsdb *)taosMemoryCalloc(1, sizeof(*pTsdb) + slen);
if (pTsdb == NULL) {
// TODO: handle error
return NULL;
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
taosMkDir(path);
// Open the TSDB
if (tsdbOpenImpl(pTsdb) < 0) {
// TODO: handle error
return NULL;
}
pTsdb->path = (char *)&pTsdb[1];
sprintf(pTsdb->path, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP,
VNODE_TSDB_DIR);
pTsdb->pVnode = pVnode;
pTsdb->vgId = TD_VID(pVnode);
pTsdb->repoLocked = false;
tdbMutexInit(&pTsdb->mutex, NULL);
pTsdb->config = pVnode->config.tsdbCfg;
pTsdb->fs = tsdbNewFS(&pTsdb->config);
return pTsdb;
}
// create dir (TODO: use tfsMkdir)
taosMkDir(pTsdb->path);
void tsdbClose(STsdb *pTsdb) {
if (pTsdb) {
tsdbCloseImpl(pTsdb);
tsdbFree(pTsdb);
// open tsdb
if (tsdbOpenFS(pTsdb) < 0) {
goto _err;
}
}
/* ------------------------ STATIC METHODS ------------------------ */
static STsdb *tsdbNew(const char *path, SVnode *pVnode, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF) {
STsdb *pTsdb = NULL;
pTsdb = (STsdb *)taosMemoryCalloc(1, sizeof(STsdb));
if (pTsdb == NULL) {
// TODO: handle error
return NULL;
}
tsdbDebug("vgId: %d tsdb is opened", TD_VID(pVnode));
pTsdb->path = strdup(path);
pTsdb->vgId = TD_VID(pVnode);
pTsdb->pVnode = pVnode;
tsdbOptionsCopy(&(pTsdb->config), pTsdbCfg);
pTsdb->pmaf = pMAF;
pTsdb->fs = tsdbNewFS(pTsdbCfg);
*ppTsdb = pTsdb;
return 0;
return pTsdb;
_err:
taosMemoryFree(pTsdb);
return -1;
}
static void tsdbFree(STsdb *pTsdb) {
int tsdbClose(STsdb *pTsdb) {
if (pTsdb) {
// tsdbFreeSmaEnv(REPO_TSMA_ENV(pTsdb));
// tsdbFreeSmaEnv(REPO_RSMA_ENV(pTsdb));
tsdbFreeFS(pTsdb->fs);
taosMemoryFreeClear(pTsdb->path);
tsdbCloseFS(pTsdb);
taosMemoryFree(pTsdb);
}
}
static int tsdbOpenImpl(STsdb *pTsdb) {
tsdbOpenFS(pTsdb);
// tsdbInitSma(pTsdb);
// TODO
return 0;
}
static void tsdbCloseImpl(STsdb *pTsdb) {
tsdbCloseFS(pTsdb);
// TODO
}
int tsdbLockRepo(STsdb *pTsdb) {
int code = taosThreadMutexLock(&pTsdb->mutex);
if (code != 0) {
......
......@@ -24,51 +24,46 @@ static int tsdbTbDataComp(const void *arg1, const void *arg2);
static char *tsdbTbDataGetUid(const void *arg);
static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row);
STsdbMemTable *tsdbNewMemTable(STsdb *pTsdb) {
STsdbMemTable *pMemTable = (STsdbMemTable *)taosMemoryCalloc(1, sizeof(*pMemTable));
int tsdbMemTableCreate(STsdb *pTsdb, STsdbMemTable **ppMemTable) {
STsdbMemTable *pMemTable;
SVnode *pVnode;
*ppMemTable = NULL;
pVnode = pTsdb->pVnode;
// alloc handle
pMemTable = (STsdbMemTable *)taosMemoryCalloc(1, sizeof(*pMemTable));
if (pMemTable == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
return -1;
}
pMemTable->pPool = pTsdb->pVnode->inUse;
T_REF_INIT_VAL(pMemTable, 1);
taosInitRWLatch(&(pMemTable->latch));
pMemTable->keyMax = TSKEY_MIN;
taosInitRWLatch(&pMemTable->latch);
pMemTable->keyMin = TSKEY_MAX;
pMemTable->keyMax = TSKEY_MIN;
pMemTable->nRow = 0;
pMemTable->pMA = pTsdb->pmaf->create(pTsdb->pmaf);
if (pMemTable->pMA == NULL) {
taosMemoryFree(pMemTable);
return NULL;
}
// Initialize the container
pMemTable->pSlIdx =
tSkipListCreate(5, TSDB_DATA_TYPE_BIGINT, sizeof(tb_uid_t), tsdbTbDataComp, SL_DISCARD_DUP_KEY, tsdbTbDataGetUid);
pMemTable->pSlIdx = tSkipListCreate(pVnode->config.tsdbCfg.slLevel, TSDB_DATA_TYPE_BIGINT, sizeof(tb_uid_t),
tsdbTbDataComp, SL_DISCARD_DUP_KEY, tsdbTbDataGetUid);
if (pMemTable->pSlIdx == NULL) {
pTsdb->pmaf->destroy(pTsdb->pmaf, pMemTable->pMA);
taosMemoryFree(pMemTable);
return NULL;
return -1;
}
pMemTable->pHashIdx = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
if (pMemTable->pHashIdx == NULL) {
pTsdb->pmaf->destroy(pTsdb->pmaf, pMemTable->pMA);
tSkipListDestroy(pMemTable->pSlIdx);
taosMemoryFree(pMemTable);
return NULL;
return -1;
}
return pMemTable;
return 0;
}
void tsdbFreeMemTable(STsdb *pTsdb, STsdbMemTable *pMemTable) {
void tsdbMemTableDestroy(STsdb *pTsdb, STsdbMemTable *pMemTable) {
if (pMemTable) {
taosHashCleanup(pMemTable->pHashIdx);
tSkipListDestroy(pMemTable->pSlIdx);
if (pMemTable->pMA) {
pTsdb->pmaf->destroy(pTsdb->pmaf, pMemTable->pMA);
}
taosMemoryFree(pMemTable);
}
}
......
......@@ -25,11 +25,11 @@
*/
int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp) {
// Check if mem is there. If not, create one.
if (pTsdb->mem == NULL) {
pTsdb->mem = tsdbNewMemTable(pTsdb);
if (pTsdb->mem == NULL) {
return -1;
}
}
// if (pTsdb->mem == NULL) {
// pTsdb->mem = tsdbMemTableCreate(pTsdb);
// if (pTsdb->mem == NULL) {
// return -1;
// }
// }
return tsdbMemTableInsert(pTsdb, pTsdb->mem, pMsg, pRsp);
}
\ No newline at end of file
......@@ -17,7 +17,7 @@
/* ------------------------ STRUCTURES ------------------------ */
static int vnodeBufPoolCreate(int size, SVBufPool **ppPool);
static int vnodeBufPoolCreate(int64_t size, SVBufPool **ppPool);
static int vnodeBufPoolDestroy(SVBufPool *pPool);
int vnodeOpenBufPool(SVnode *pVnode, int64_t size) {
......@@ -30,17 +30,17 @@ int vnodeOpenBufPool(SVnode *pVnode, int64_t size) {
// create pool
ret = vnodeBufPoolCreate(size, &pPool);
if (ret < 0) {
vError("vgId:%d failed to open vnode buffer pool since %s", TD_VNODE_ID(pVnode), tstrerror(terrno));
vError("vgId:%d failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno));
vnodeCloseBufPool(pVnode);
return -1;
}
// add pool to queue
// add pool to vnode
pPool->next = pVnode->pPool;
pVnode->pPool = pPool;
}
vDebug("vgId:%d vnode buffer pool is opened, pool size: %" PRId64, TD_VNODE_ID(pVnode), size);
vDebug("vgId:%d vnode buffer pool is opened, pool size: %" PRId64, TD_VID(pVnode), size);
return 0;
}
......@@ -53,7 +53,7 @@ int vnodeCloseBufPool(SVnode *pVnode) {
vnodeBufPoolDestroy(pPool);
}
vDebug("vgId:%d vnode buffer pool is closed", TD_VNODE_ID(pVnode));
vDebug("vgId:%d vnode buffer pool is closed", TD_VID(pVnode));
return 0;
}
......@@ -75,7 +75,7 @@ void vnodeBufPoolReset(SVBufPool *pPool) {
pPool->ptr = pPool->node.data;
}
void *vnodeBufPoolMalloc(SVBufPool *pPool, size_t size) {
void *vnodeBufPoolMalloc(SVBufPool *pPool, int size) {
SVBufPoolNode *pNode;
void *p;
......@@ -120,7 +120,7 @@ void vnodeBufPoolFree(SVBufPool *pPool, void *p) {
}
// STATIC METHODS -------------------
static int vnodeBufPoolCreate(int size, SVBufPool **ppPool) {
static int vnodeBufPoolCreate(int64_t size, SVBufPool **ppPool) {
SVBufPool *pPool;
pPool = taosMemoryMalloc(sizeof(SVBufPool) + size);
......
......@@ -21,10 +21,8 @@ const SVnodeCfg vnodeCfgDefault = {
.dbId = 0,
.szPage = 4096,
.szCache = 256,
.wsize = 96 * 1024 * 1024,
.ssize = 1 * 1024 * 1024,
.lsize = 1024,
.isHeapAllocator = false,
.szBuf = 96 * 1024 * 1024,
.isHeap = false,
.ttl = 0,
.keep = 0,
.streamMode = 0,
......@@ -32,6 +30,7 @@ const SVnodeCfg vnodeCfgDefault = {
.tsdbCfg = {.precision = TWO_STAGE_COMP,
.update = 0,
.compression = 2,
.slLevel = 5,
.days = 10,
.minRows = 100,
.maxRows = 4096,
......@@ -57,10 +56,8 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
if (tjsonAddIntegerToObject(pJson, "dbId", pCfg->dbId) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "szPage", pCfg->szPage) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "szCache", pCfg->szCache) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "wsize", pCfg->wsize) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "ssize", pCfg->ssize) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "lsize", pCfg->lsize) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "isHeap", pCfg->isHeapAllocator) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "szBuf", pCfg->szBuf) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "isHeap", pCfg->isHeap) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "ttl", pCfg->ttl) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "keep", pCfg->keep) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "streamMode", pCfg->streamMode) < 0) return -1;
......@@ -68,6 +65,7 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
if (tjsonAddIntegerToObject(pJson, "precision", pCfg->tsdbCfg.precision) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "update", pCfg->tsdbCfg.update) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "compression", pCfg->tsdbCfg.compression) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "slLevel", pCfg->tsdbCfg.slLevel) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "daysPerFile", pCfg->tsdbCfg.days) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "minRows", pCfg->tsdbCfg.minRows) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "maxRows", pCfg->tsdbCfg.maxRows) < 0) return -1;
......@@ -97,10 +95,8 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
if (tjsonGetNumberValue(pJson, "dbId", pCfg->dbId) < 0) return -1;
if (tjsonGetNumberValue(pJson, "szPage", pCfg->szPage) < 0) return -1;
if (tjsonGetNumberValue(pJson, "szCache", pCfg->szCache) < 0) return -1;
if (tjsonGetNumberValue(pJson, "wsize", pCfg->wsize) < 0) return -1;
if (tjsonGetNumberValue(pJson, "ssize", pCfg->ssize) < 0) return -1;
if (tjsonGetNumberValue(pJson, "lsize", pCfg->lsize) < 0) return -1;
if (tjsonGetNumberValue(pJson, "isHeap", pCfg->isHeapAllocator) < 0) return -1;
if (tjsonGetNumberValue(pJson, "szBuf", pCfg->szBuf) < 0) return -1;
if (tjsonGetNumberValue(pJson, "isHeap", pCfg->isHeap) < 0) return -1;
if (tjsonGetNumberValue(pJson, "ttl", pCfg->ttl) < 0) return -1;
if (tjsonGetNumberValue(pJson, "keep", pCfg->keep) < 0) return -1;
if (tjsonGetNumberValue(pJson, "streamMode", pCfg->streamMode) < 0) return -1;
......@@ -108,6 +104,7 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
if (tjsonGetNumberValue(pJson, "precision", pCfg->tsdbCfg.precision) < 0) return -1;
if (tjsonGetNumberValue(pJson, "update", pCfg->tsdbCfg.update) < 0) return -1;
if (tjsonGetNumberValue(pJson, "compression", pCfg->tsdbCfg.compression) < 0) return -1;
if (tjsonGetNumberValue(pJson, "slLevel", pCfg->tsdbCfg.slLevel) < 0) return -1;
if (tjsonGetNumberValue(pJson, "daysPerFile", pCfg->tsdbCfg.days) < 0) return -1;
if (tjsonGetNumberValue(pJson, "minRows", pCfg->tsdbCfg.minRows) < 0) return -1;
if (tjsonGetNumberValue(pJson, "maxRows", pCfg->tsdbCfg.maxRows) < 0) return -1;
......
......@@ -26,7 +26,19 @@ static int vnodeCommit(void *arg);
static void vnodeWaitCommit(SVnode *pVnode);
int vnodeBegin(SVnode *pVnode) {
// begin buffer pool
// alloc buffer pool
/* pthread_mutex_lock(); */
while (pVnode->pPool == NULL) {
/* pthread_cond_wait(); */
}
pVnode->inUse = pVnode->pPool;
pVnode->pPool = pVnode->inUse->next;
pVnode->inUse->next = NULL;
/* ref pVnode->inUse buffer pool */
/* pthread_mutex_unlock(); */
// begin meta
if (metaBegin(pVnode->pMeta) < 0) {
......@@ -35,12 +47,10 @@ int vnodeBegin(SVnode *pVnode) {
}
// begin tsdb
#if 0
if (tsdbBegin(pVnode->pTsdb) < 0) {
vError("vgId: %d failed to begin tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
return -1;
}
#endif
return 0;
}
......@@ -162,7 +172,7 @@ _err:
int vnodeAsyncCommit(SVnode *pVnode) {
vnodeWaitCommit(pVnode);
vnodeBufPoolSwitch(pVnode);
// vnodeBufPoolSwitch(pVnode);
tsdbPrepareCommit(pVnode->pTsdb);
vnodeScheduleTask(vnodeCommit, pVnode);
......@@ -184,7 +194,7 @@ static int vnodeCommit(void *arg) {
tqCommit(pVnode->pTq);
tsdbCommit(pVnode->pTsdb);
vnodeBufPoolRecycle(pVnode);
// vnodeBufPoolRecycle(pVnode);
tsem_post(&(pVnode->canCommit));
return 0;
}
......
......@@ -83,7 +83,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
tsem_init(&(pVnode->canCommit), 0, 1);
// open buffer pool
if (vnodeOpenBufPool(pVnode) < 0) {
if (vnodeOpenBufPool(pVnode, pVnode->config.isHeap ? 0 : pVnode->config.szBuf / 3) < 0) {
vError("vgId: %d failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
......@@ -95,9 +95,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
}
// open tsdb
sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_TSDB_DIR);
pVnode->pTsdb = tsdbOpen(tdir, pVnode, &(pVnode->config.tsdbCfg), vBufPoolGetMAF(pVnode));
if (pVnode->pTsdb == NULL) {
if (tsdbOpen(pVnode, &pVnode->pTsdb) < 0) {
vError("vgId: %d failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
......@@ -112,7 +110,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
// open tq
sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_TQ_DIR);
pVnode->pTq = tqOpen(tdir, pVnode, pVnode->pWal, pVnode->pMeta, vBufPoolGetMAF(pVnode));
pVnode->pTq = tqOpen(tdir, pVnode, pVnode->pWal, pVnode->pMeta);
if (pVnode->pTq == NULL) {
vError("vgId: %d failed to open vnode tq since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
......
......@@ -46,7 +46,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
int ret;
if (pVnode->config.streamMode == 0) {
ptr = vnodeMalloc(pVnode, pMsg->contLen);
// ptr = vnodeMalloc(pVnode, pMsg->contLen);
if (ptr == NULL) {
// TODO: handle error
}
......@@ -125,7 +125,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
pVnode->state.applied = version;
// Check if it needs to commit
if (vnodeShouldCommit(pVnode)) {
if (0 /*vnodeShouldCommit(pVnode)*/) {
// tsem_wait(&(pVnode->canCommit));
if (vnodeAsyncCommit(pVnode) < 0) {
// TODO: handle error
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册