提交 e710802f 编写于 作者: H Hongze Cheng

more code

上级 fef89768
...@@ -61,7 +61,7 @@ struct SVBufPoolNode { ...@@ -61,7 +61,7 @@ struct SVBufPoolNode {
}; };
struct SVBufPool { struct SVBufPool {
SVBufPool* next; SVBufPool* freeNext;
SVnode* pVnode; SVnode* pVnode;
TdThreadSpinlock* lock; TdThreadSpinlock* lock;
volatile int32_t nRef; volatile int32_t nRef;
......
...@@ -87,6 +87,8 @@ typedef struct SCommitInfo SCommitInfo; ...@@ -87,6 +87,8 @@ typedef struct SCommitInfo SCommitInfo;
#define VNODE_RSMA1_DIR "rsma1" #define VNODE_RSMA1_DIR "rsma1"
#define VNODE_RSMA2_DIR "rsma2" #define VNODE_RSMA2_DIR "rsma2"
#define VNODE_BUFPOOL_SEGMENTS 3
#define VND_INFO_FNAME "vnode.json" #define VND_INFO_FNAME "vnode.json"
// vnd.h // vnd.h
...@@ -326,16 +328,21 @@ struct STsdbKeepCfg { ...@@ -326,16 +328,21 @@ struct STsdbKeepCfg {
}; };
struct SVnode { struct SVnode {
char* path; char* path;
SVnodeCfg config; SVnodeCfg config;
SVState state; SVState state;
SVStatis statis; SVStatis statis;
STfs* pTfs; STfs* pTfs;
SMsgCb msgCb; SMsgCb msgCb;
// Buffer Pool
TdThreadMutex mutex; TdThreadMutex mutex;
TdThreadCond poolNotEmpty; TdThreadCond poolNotEmpty;
SVBufPool* pPool; SVBufPool* aBufPool[VNODE_BUFPOOL_SEGMENTS];
SVBufPool* freeList;
SVBufPool* inUse; SVBufPool* inUse;
SVBufPool* recycling;
SMeta* pMeta; SMeta* pMeta;
SSma* pSma; SSma* pSma;
STsdb* pTsdb; STsdb* pTsdb;
......
...@@ -16,8 +16,6 @@ ...@@ -16,8 +16,6 @@
#include "vnd.h" #include "vnd.h"
/* ------------------------ STRUCTURES ------------------------ */ /* ------------------------ STRUCTURES ------------------------ */
#define VNODE_BUFPOOL_SEGMENTS 3
static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool) { static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool) {
SVBufPool *pPool; SVBufPool *pPool;
...@@ -44,7 +42,7 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool) ...@@ -44,7 +42,7 @@ static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool)
pPool->lock = NULL; pPool->lock = NULL;
} }
pPool->next = NULL; pPool->freeNext = NULL;
pPool->pVnode = pVnode; pPool->pVnode = pVnode;
pPool->nRef = 0; pPool->nRef = 0;
pPool->size = 0; pPool->size = 0;
...@@ -69,22 +67,21 @@ static int vnodeBufPoolDestroy(SVBufPool *pPool) { ...@@ -69,22 +67,21 @@ static int vnodeBufPoolDestroy(SVBufPool *pPool) {
} }
int vnodeOpenBufPool(SVnode *pVnode) { int vnodeOpenBufPool(SVnode *pVnode) {
SVBufPool *pPool = NULL; int64_t size = pVnode->config.szBuf / VNODE_BUFPOOL_SEGMENTS;
int64_t size = pVnode->config.szBuf / VNODE_BUFPOOL_SEGMENTS;
ASSERT(pVnode->pPool == NULL); ASSERT(pVnode->freeList == NULL);
for (int i = 0; i < VNODE_BUFPOOL_SEGMENTS; i++) { for (int i = 0; i < VNODE_BUFPOOL_SEGMENTS; i++) {
// create pool // create pool
if (vnodeBufPoolCreate(pVnode, size, &pPool)) { if (vnodeBufPoolCreate(pVnode, size, &pVnode->aBufPool[i])) {
vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno));
vnodeCloseBufPool(pVnode); vnodeCloseBufPool(pVnode);
return -1; return -1;
} }
// add pool to vnode // add to free list
pPool->next = pVnode->pPool; pVnode->aBufPool[i]->freeNext = pVnode->freeList;
pVnode->pPool = pPool; pVnode->freeList = pVnode->aBufPool[i];
} }
vDebug("vgId:%d, vnode buffer pool is opened, size:%" PRId64, TD_VID(pVnode), size); vDebug("vgId:%d, vnode buffer pool is opened, size:%" PRId64, TD_VID(pVnode), size);
...@@ -92,19 +89,18 @@ int vnodeOpenBufPool(SVnode *pVnode) { ...@@ -92,19 +89,18 @@ int vnodeOpenBufPool(SVnode *pVnode) {
} }
int vnodeCloseBufPool(SVnode *pVnode) { int vnodeCloseBufPool(SVnode *pVnode) {
SVBufPool *pPool; for (int32_t i = 0; i < VNODE_BUFPOOL_SEGMENTS; i++) {
if (pVnode->aBufPool[i]) {
for (pPool = pVnode->pPool; pPool; pPool = pVnode->pPool) { vnodeBufPoolDestroy(pVnode->aBufPool[i]);
pVnode->pPool = pPool->next; pVnode->aBufPool[i] = NULL;
vnodeBufPoolDestroy(pPool); }
} }
if (pVnode->inUse) { pVnode->freeList = NULL;
vnodeBufPoolDestroy(pVnode->inUse); ASSERT(pVnode->inUse == NULL);
pVnode->inUse = NULL; ASSERT(pVnode->recycling == NULL);
}
vDebug("vgId:%d, vnode buffer pool is closed", TD_VID(pVnode));
vDebug("vgId:%d, vnode buffer pool is closed", TD_VID(pVnode));
return 0; return 0;
} }
...@@ -240,8 +236,8 @@ void vnodeBufPoolUnRef(SVBufPool *pPool) { ...@@ -240,8 +236,8 @@ void vnodeBufPoolUnRef(SVBufPool *pPool) {
} }
} }
pPool->next = pVnode->pPool; pPool->freeNext = pVnode->freeList;
pVnode->pPool = pPool; pVnode->freeList = pPool;
taosThreadCondSignal(&pVnode->poolNotEmpty); taosThreadCondSignal(&pVnode->poolNotEmpty);
taosThreadMutexUnlock(&pVnode->mutex); taosThreadMutexUnlock(&pVnode->mutex);
......
...@@ -25,14 +25,14 @@ int vnodeBegin(SVnode *pVnode) { ...@@ -25,14 +25,14 @@ int vnodeBegin(SVnode *pVnode) {
// alloc buffer pool // alloc buffer pool
taosThreadMutexLock(&pVnode->mutex); taosThreadMutexLock(&pVnode->mutex);
while (pVnode->pPool == NULL) { while (pVnode->freeList == NULL) {
taosThreadCondWait(&pVnode->poolNotEmpty, &pVnode->mutex); taosThreadCondWait(&pVnode->poolNotEmpty, &pVnode->mutex);
} }
pVnode->inUse = pVnode->pPool; pVnode->inUse = pVnode->freeList;
pVnode->inUse->nRef = 1; pVnode->inUse->nRef = 1;
pVnode->pPool = pVnode->inUse->next; pVnode->freeList = pVnode->inUse->freeNext;
pVnode->inUse->next = NULL; pVnode->inUse->freeNext = NULL;
taosThreadMutexUnlock(&pVnode->mutex); taosThreadMutexUnlock(&pVnode->mutex);
......
...@@ -238,7 +238,7 @@ _err: ...@@ -238,7 +238,7 @@ _err:
if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb); if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
if (pVnode->pSma) smaClose(pVnode->pSma); if (pVnode->pSma) smaClose(pVnode->pSma);
if (pVnode->pMeta) metaClose(pVnode->pMeta); if (pVnode->pMeta) metaClose(pVnode->pMeta);
if (pVnode->pPool) vnodeCloseBufPool(pVnode); if (pVnode->freeList) vnodeCloseBufPool(pVnode);
tsem_destroy(&(pVnode->canCommit)); tsem_destroy(&(pVnode->canCommit));
taosMemoryFree(pVnode); taosMemoryFree(pVnode);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册