提交 706b081f 编写于 作者: H Hongze Cheng

fix: r/w concurrency

上级 f3204ed9
...@@ -62,12 +62,13 @@ struct SVBufPoolNode { ...@@ -62,12 +62,13 @@ struct SVBufPoolNode {
}; };
struct SVBufPool { struct SVBufPool {
SVBufPool* next; SVBufPool* next;
int64_t nRef; SVnode* pVnode;
int64_t size; volatile int32_t nRef;
uint8_t* ptr; int64_t size;
SVBufPoolNode* pTail; uint8_t* ptr;
SVBufPoolNode node; SVBufPoolNode* pTail;
SVBufPoolNode node;
}; };
int32_t vnodeOpenBufPool(SVnode* pVnode, int64_t size); int32_t vnodeOpenBufPool(SVnode* pVnode, int64_t size);
...@@ -78,7 +79,7 @@ void vnodeBufPoolReset(SVBufPool* pPool); ...@@ -78,7 +79,7 @@ void vnodeBufPoolReset(SVBufPool* pPool);
int32_t vnodeQueryOpen(SVnode* pVnode); int32_t vnodeQueryOpen(SVnode* pVnode);
void vnodeQueryClose(SVnode* pVnode); void vnodeQueryClose(SVnode* pVnode);
int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg); int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg);
int vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg); int vnodeGetTableCfg(SVnode* pVnode, SRpcMsg* pMsg);
// vnodeCommit.c // vnodeCommit.c
int32_t vnodeBegin(SVnode* pVnode); int32_t vnodeBegin(SVnode* pVnode);
......
...@@ -77,6 +77,8 @@ typedef struct SSnapDataHdr SSnapDataHdr; ...@@ -77,6 +77,8 @@ typedef struct SSnapDataHdr SSnapDataHdr;
// vnd.h // vnd.h
void* vnodeBufPoolMalloc(SVBufPool* pPool, int size); void* vnodeBufPoolMalloc(SVBufPool* pPool, int size);
void vnodeBufPoolFree(SVBufPool* pPool, void* p); void vnodeBufPoolFree(SVBufPool* pPool, void* p);
void vnodeBufPoolRef(SVBufPool* pPool);
void vnodeBufPoolUnRef(SVBufPool* pPool);
// meta // meta
typedef struct SMCtbCursor SMCtbCursor; typedef struct SMCtbCursor SMCtbCursor;
...@@ -247,26 +249,26 @@ struct STsdbKeepCfg { ...@@ -247,26 +249,26 @@ struct STsdbKeepCfg {
}; };
struct SVnode { struct SVnode {
char* path; char* path;
SVnodeCfg config; SVnodeCfg config;
SVState state; SVState state;
STfs* pTfs; STfs* pTfs;
SMsgCb msgCb; SMsgCb msgCb;
SVBufPool* pPool; TdThreadMutex mutex;
SVBufPool* inUse; TdThreadCond poolNotEmpty;
SVBufPool* onCommit; SVBufPool* pPool;
SVBufPool* onRecycle; SVBufPool* inUse;
SMeta* pMeta; SMeta* pMeta;
SSma* pSma; SSma* pSma;
STsdb* pTsdb; STsdb* pTsdb;
SWal* pWal; SWal* pWal;
STQ* pTq; STQ* pTq;
SSink* pSink; SSink* pSink;
tsem_t canCommit; tsem_t canCommit;
int64_t sync; int64_t sync;
int32_t blockCount; int32_t blockCount;
tsem_t syncSem; tsem_t syncSem;
SQHandle* pQuery; SQHandle* pQuery;
}; };
#define TD_VID(PVNODE) ((PVNODE)->config.vgId) #define TD_VID(PVNODE) ((PVNODE)->config.vgId)
......
...@@ -55,6 +55,7 @@ int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) { ...@@ -55,6 +55,7 @@ int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) {
taosMemoryFree(pMemTable); taosMemoryFree(pMemTable);
goto _err; goto _err;
} }
vnodeBufPoolRef(pMemTable->pPool);
*ppMemTable = pMemTable; *ppMemTable = pMemTable;
return code; return code;
...@@ -66,6 +67,7 @@ _err: ...@@ -66,6 +67,7 @@ _err:
void tsdbMemTableDestroy(SMemTable *pMemTable) { void tsdbMemTableDestroy(SMemTable *pMemTable) {
if (pMemTable) { if (pMemTable) {
vnodeBufPoolUnRef(pMemTable->pPool);
taosArrayDestroy(pMemTable->aTbData); taosArrayDestroy(pMemTable->aTbData);
taosMemoryFree(pMemTable); taosMemoryFree(pMemTable);
} }
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
/* ------------------------ STRUCTURES ------------------------ */ /* ------------------------ STRUCTURES ------------------------ */
static int vnodeBufPoolCreate(int64_t size, SVBufPool **ppPool); static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool);
static int vnodeBufPoolDestroy(SVBufPool *pPool); static int vnodeBufPoolDestroy(SVBufPool *pPool);
int vnodeOpenBufPool(SVnode *pVnode, int64_t size) { int vnodeOpenBufPool(SVnode *pVnode, int64_t size) {
...@@ -28,7 +28,7 @@ int vnodeOpenBufPool(SVnode *pVnode, int64_t size) { ...@@ -28,7 +28,7 @@ int vnodeOpenBufPool(SVnode *pVnode, int64_t size) {
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
// create pool // create pool
ret = vnodeBufPoolCreate(size, &pPool); ret = vnodeBufPoolCreate(pVnode, size, &pPool);
if (ret < 0) { if (ret < 0) {
vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d, failed to open vnode buffer pool since %s", TD_VID(pVnode), tstrerror(terrno));
vnodeCloseBufPool(pVnode); vnodeCloseBufPool(pVnode);
...@@ -120,7 +120,7 @@ void vnodeBufPoolFree(SVBufPool *pPool, void *p) { ...@@ -120,7 +120,7 @@ void vnodeBufPoolFree(SVBufPool *pPool, void *p) {
} }
// STATIC METHODS ------------------- // STATIC METHODS -------------------
static int vnodeBufPoolCreate(int64_t size, SVBufPool **ppPool) { static int vnodeBufPoolCreate(SVnode *pVnode, int64_t size, SVBufPool **ppPool) {
SVBufPool *pPool; SVBufPool *pPool;
pPool = taosMemoryMalloc(sizeof(SVBufPool) + size); pPool = taosMemoryMalloc(sizeof(SVBufPool) + size);
...@@ -130,6 +130,7 @@ static int vnodeBufPoolCreate(int64_t size, SVBufPool **ppPool) { ...@@ -130,6 +130,7 @@ static int vnodeBufPoolCreate(int64_t size, SVBufPool **ppPool) {
} }
pPool->next = NULL; pPool->next = NULL;
pPool->pVnode = pVnode;
pPool->nRef = 0; pPool->nRef = 0;
pPool->size = 0; pPool->size = 0;
pPool->ptr = pPool->node.data; pPool->ptr = pPool->node.data;
...@@ -146,4 +147,26 @@ static int vnodeBufPoolDestroy(SVBufPool *pPool) { ...@@ -146,4 +147,26 @@ static int vnodeBufPoolDestroy(SVBufPool *pPool) {
vnodeBufPoolReset(pPool); vnodeBufPoolReset(pPool);
taosMemoryFree(pPool); taosMemoryFree(pPool);
return 0; return 0;
}
void vnodeBufPoolRef(SVBufPool *pPool) {
int32_t nRef = atomic_fetch_add_32(&pPool->nRef, 1);
ASSERT(nRef > 0);
}
void vnodeBufPoolUnRef(SVBufPool *pPool) {
int32_t nRef = atomic_sub_fetch_32(&pPool->nRef, 1);
if (nRef == 0) {
SVnode *pVnode = pPool->pVnode;
vnodeBufPoolReset(pPool);
taosThreadMutexLock(&pVnode->mutex);
pPool->next = pVnode->pPool;
pVnode->pPool = pPool;
taosThreadCondSignal(&pVnode->poolNotEmpty);
taosThreadMutexUnlock(&pVnode->mutex);
}
} }
\ No newline at end of file
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
#include "vnd.h" #include "vnd.h"
#define VND_INFO_FNAME "vnode.json" #define VND_INFO_FNAME "vnode.json"
#define VND_INFO_FNAME_TMP "vnode_tmp.json" #define VND_INFO_FNAME_TMP "vnode_tmp.json"
static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData); static int vnodeEncodeInfo(const SVnodeInfo *pInfo, char **ppData);
...@@ -27,18 +27,18 @@ static void vnodeWaitCommit(SVnode *pVnode); ...@@ -27,18 +27,18 @@ static void vnodeWaitCommit(SVnode *pVnode);
int vnodeBegin(SVnode *pVnode) { int vnodeBegin(SVnode *pVnode) {
// alloc buffer pool // alloc buffer pool
/* pthread_mutex_lock(); */ taosThreadMutexLock(&pVnode->mutex);
while (pVnode->pPool == NULL) { while (pVnode->pPool == NULL) {
/* pthread_cond_wait(); */ taosThreadCondWait(&pVnode->poolNotEmpty, &pVnode->mutex);
} }
pVnode->inUse = pVnode->pPool; pVnode->inUse = pVnode->pPool;
pVnode->inUse->nRef = 1;
pVnode->pPool = pVnode->inUse->next; pVnode->pPool = pVnode->inUse->next;
pVnode->inUse->next = NULL; pVnode->inUse->next = NULL;
/* ref pVnode->inUse buffer pool */
/* pthread_mutex_unlock(); */ taosThreadMutexUnlock(&pVnode->mutex);
pVnode->state.commitID++; pVnode->state.commitID++;
// begin meta // begin meta
...@@ -217,7 +217,7 @@ int vnodeCommit(SVnode *pVnode) { ...@@ -217,7 +217,7 @@ int vnodeCommit(SVnode *pVnode) {
vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64, TD_VID(pVnode), pVnode->state.commitID, vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64, TD_VID(pVnode), pVnode->state.commitID,
pVnode->state.applied); pVnode->state.applied);
pVnode->onCommit = pVnode->inUse; vnodeBufPoolUnRef(pVnode->inUse);
pVnode->inUse = NULL; pVnode->inUse = NULL;
// save info // save info
...@@ -284,10 +284,6 @@ int vnodeCommit(SVnode *pVnode) { ...@@ -284,10 +284,6 @@ int vnodeCommit(SVnode *pVnode) {
// apply the commit (TODO) // apply the commit (TODO)
walEndSnapshot(pVnode->pWal); walEndSnapshot(pVnode->pWal);
vnodeBufPoolReset(pVnode->onCommit);
pVnode->onCommit->next = pVnode->pPool;
pVnode->pPool = pVnode->onCommit;
pVnode->onCommit = NULL;
vInfo("vgId:%d, commit over", TD_VID(pVnode)); vInfo("vgId:%d, commit over", TD_VID(pVnode));
......
...@@ -89,6 +89,8 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { ...@@ -89,6 +89,8 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
tsem_init(&pVnode->syncSem, 0, 0); tsem_init(&pVnode->syncSem, 0, 0);
tsem_init(&(pVnode->canCommit), 0, 1); tsem_init(&(pVnode->canCommit), 0, 1);
taosThreadMutexInit(&pVnode->mutex, NULL);
taosThreadCondInit(&pVnode->poolNotEmpty, NULL);
// open buffer pool // open buffer pool
if (vnodeOpenBufPool(pVnode, pVnode->config.isHeap ? 0 : pVnode->config.szBuf / 3) < 0) { if (vnodeOpenBufPool(pVnode, pVnode->config.isHeap ? 0 : pVnode->config.szBuf / 3) < 0) {
...@@ -195,6 +197,8 @@ void vnodeClose(SVnode *pVnode) { ...@@ -195,6 +197,8 @@ void vnodeClose(SVnode *pVnode) {
// destroy handle // destroy handle
tsem_destroy(&(pVnode->canCommit)); tsem_destroy(&(pVnode->canCommit));
tsem_destroy(&pVnode->syncSem); tsem_destroy(&pVnode->syncSem);
taosThreadCondDestroy(&pVnode->poolNotEmpty);
taosThreadMutexDestroy(&pVnode->mutex);
taosMemoryFree(pVnode); taosMemoryFree(pVnode);
} }
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册